Mercurial > repos > petrn > repeatexplorer
comparison lib/parallel/parallel.py @ 0:f6ebec6e235e draft
Uploaded
| author | petrn |
|---|---|
| date | Thu, 19 Dec 2019 13:46:43 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:f6ebec6e235e |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 import multiprocessing | |
| 3 import os | |
| 4 import time | |
| 5 from itertools import cycle | |
| 6 ''' | |
| 7 functions for parallel processing of data chunks using worker function | |
| 8 ''' | |
| 9 | |
| 10 | |
def run_multiple_pbs_jobs(cmds, status_files, qsub_params=""):
    '''
    Submit several commands to the pbs cluster at once and wait for
    all of them to finish.

    Example of qsub_params:
    -l walltime=1000:00:00,nodes=1:ppn=8,mem=15G
    -l walltime=150:00:00,nodes=1:ppn=1

    Returns a dict with two parallel lists:
    'function' - exit codes of the local submitting processes
    'command'  - status strings read back from the status files
    '''
    submitters = [pbs_send_job(cmd, sf, qsub_params)
                  for cmd, sf in zip(cmds, status_files)]
    function_status = []
    for submitter in submitters:
        submitter.join()
        function_status.append(submitter.exitcode)
    # collect pbs run status written by each finished job
    command_status = []
    for sf in status_files:
        with open(sf) as handle:
            command_status.append(handle.read().strip())
    return {'function': function_status, 'command': command_status}
| 32 | |
| 33 | |
def pbs_send_job(cmd, status_file, qsub_params):
    '''Submit cmd to the pbs cluster from a child process.

    Requires a writable status file path; returns the started
    multiprocessing.Process so the caller can join() it.
    '''
    submitter = multiprocessing.Process(
        target=pbs_run,
        args=(cmd, status_file, qsub_params),
    )
    submitter.start()
    return submitter
| 40 | |
| 41 | |
def pbs_run(cmd, status_file, qsub_params):
    '''
    Run shell command cmd on the pbs cluster via qsub, wait for the job
    to finish and return its status string.

    cmd         : shell command the job executes
    status_file : path the job writes "OK" or "ERROR" to on completion;
                  its appearance on disk signals that the job finished
                  (assumes a filesystem shared with the cluster nodes -
                  TODO confirm)
    qsub_params : extra parameters passed through to qsub

    Raises IOError when the status/error file paths are not writable.
    '''
    print(status_file)
    error_file = status_file + ".e"
    # test if writable; fail early, preserving the original OS error
    try:
        open(status_file, 'w').close()
        open(error_file, 'w').close()
    except IOError:
        print("cannot write to status files, make sure path exists")
        # re-raise the caught exception instead of a bare new IOError,
        # which would discard the errno/message of the real failure
        raise

    # the status file must not exist before submission - the polling
    # loop below treats its existence as "job done"
    if os.path.exists(status_file):
        print("removing old status file")
        os.remove(status_file)
    # NOTE(review): cmd and the file paths are interpolated into a shell
    # string - do not pass untrusted input here (shell injection)
    cmd_full = ("echo '{cmd} && echo \"OK\" > {status_file} || echo \"ERROR\""
                " > {status_file}' | qsub -e {err}"
                " {qsub_params} ").format(cmd=cmd, status_file=status_file,
                                          err=error_file,
                                          qsub_params=qsub_params)
    os.system(cmd_full)

    # poll until the submitted job reports back
    while True:
        if os.path.exists(status_file):
            break
        else:
            time.sleep(3)
    with open(status_file) as f:
        status = f.read().strip()
    return status
| 75 | |
| 76 | |
def spawn(f):
    '''Wrap f into a worker-process entry point.

    The returned callable applies f to its argument, pushes the result
    through the given pipe connection and closes that connection.
    '''
    def _runner(conn, arg):
        conn.send(f(arg))
        conn.close()
    return _runner
| 82 | |
| 83 | |
def get_max_proc():
    '''Return the number of CPUs to use.

    Lookup order: PROC from config.py if importable, then a module
    global PROC, then the PROC environment variable, otherwise the
    system CPU count.
    '''
    try:
        from config import PROC as max_proc
    except ImportError:
        if "PROC" in globals():
            max_proc = PROC
        elif "PROC" in os.environ:
            max_proc = int(os.environ["PROC"])

        else:
            max_proc = multiprocessing.cpu_count()
    return max_proc
| 98 | |
| 99 | |
def parmap2(f, X, groups, ppn):
    '''Run f over every item of X in parallel with scheduling constraints.

    groups : per-job group ids; jobs sharing a group id are never run
        concurrently.
    ppn : per-job load; the summed load of concurrently running jobs is
        kept <= 1 (a job with ppn 0 adds no load).
    Returns the list of f(x) results in the order of X.

    NOTE(review): a single job with ppn > 1 can never satisfy cond1
    below, so the while loop would spin forever; parallel2 validates
    ppn <= 1 upstream - confirm all callers do the same.
    '''
    max_proc = get_max_proc()
    print("running in parallel using ", max_proc, "cpu(s)")
    process_pool = []
    # results are written by job index so output order matches X
    output = [None] * len(X)
    # prepare processes
    for x, index in zip(X, list(range(len(X)))):
        # status:
        # 0: waiting, 1: running, 2:collected
        process_pool.append({
            'status': 0,
            'proc': None,
            'pipe': None,
            'index': index,
            'group': groups[index],
            'ppn': ppn[index]

        })

    # run processes
    running = 0
    finished = 0
    sleep_time = 0.001
    while True:
        # count alive processes
        if not sleep_time:
            sleep_time = 0.001
        for i in process_pool:
            # exitcode is non-None once the child process terminated
            if i['status'] == 1 and not (i['proc'].exitcode is None):
                sleep_time = 0.0
                # was running now finished --> collect
                i['status'] = 2
                running -= 1
                finished += 1
                output[i['index']] = collect(i['proc'], i['pipe'])
                # drop references so the pipe/process resources are freed
                del i['pipe']
                del i['proc']
            if i['status'] == 0 and running < max_proc:
                # waiting and free --> run
                # check if this group can be run
                running_groups = [pp['group']
                                  for pp in process_pool if pp['status'] == 1]
                # check max load of concurent runs:
                current_load = sum([pp['ppn']
                                    for pp in process_pool if pp['status'] == 1])
                cond1 = (i['ppn'] + current_load) <= 1
                cond2 = not i['group'] in running_groups
                if cond1 and cond2:
                    sleep_time = 0.0
                    try:
                        i['pipe'] = multiprocessing.Pipe()
                    except OSError as e:
                        # e.g. out of file descriptors - leave the job
                        # waiting and retry it in a later sweep
                        print('exception occured:',e)
                        continue
                    i['proc'] = multiprocessing.Process(
                        target=spawn(f),
                        args=(i['pipe'][1], X[i['index']]),
                        name=str(i['index']))
                    i['proc'].start()
                    i['status'] = 1
                    running += 1
        if finished == len(process_pool):
            break
        if sleep_time:
            # sleep only if nothing changed in the last cycle
            time.sleep(sleep_time)
            # sleep time gradually increase to 1 sec
            sleep_time = min(2 * sleep_time, 1)
    return output
| 169 | |
| 170 | |
def print_status(pp):
    '''Print a one-line summary for every job record in the pool pp.'''
    state_names = ['waiting', 'running', 'collected']
    print("___________________________________")
    print("jobid status group ppn exitcode")
    print("===================================")
    for job in pp:
        print(job['index'], " ",
              state_names[job['status']], " ",
              job['group'], " ",
              job['ppn'], " ",
              job['proc'].exitcode)
| 184 | |
| 185 | |
def collect(pf, pp):
    '''Fetch the return value of process pf from pipe pair pp.

    Returns the received value when the process finished successfully,
    None when it died with a non-zero exit code; raises Exception when
    the process has not finished yet.
    '''
    finished_ok = pf.pid and not pf.exitcode and not pf.is_alive()
    if finished_ok:
        value = pp[0].recv()
        pf.join()
        pp[0].close()
        pp[1].close()
        return value
    if pf.exitcode:
        print("job finished with exit code {}".format(pf.exitcode))
        pf.join()
        pp[0].close()
        pp[1].close()
        return None
    raise Exception('not collected')
| 202 | |
| 203 | |
def parmap(f, X):
    '''Run f over every item of X in parallel, keeping at most
    get_max_proc() processes alive at once; returns the results as a
    list in the order of X.
    '''

    max_proc = get_max_proc()

    pipe = []
    proc = []
    returnvalue = {}

    for x, index in zip(X, list(range(len(X)))):
        # one pipe + process per item; the process name carries the
        # input index so results can be reordered at the end
        pipe.append(multiprocessing.Pipe())
        proc.append(multiprocessing.Process(target=spawn(f),
                                            args=(pipe[-1][1], x), name=str(index)))
        p = proc[-1]
        # count alive processes
        while True:
            running = 0
            for i in proc:
                if i.is_alive():
                    running += 1
            # print "running:"+str(running)
            if running < max_proc:
                break
            else:
                time.sleep(0.1)
        p.start()
        # print "process started:"+str(p.pid)
        # check for finished

        for pf, pp, index in zip(proc, pipe, range(len(pipe))):
            if pf.pid and not pf.exitcode and not pf.is_alive() and (pf.name not in returnvalue):
                pf.join()
                returnvalue[str(pf.name)] = pp[0].recv()
                pp[0].close()
                pp[1].close()
                # proc must be garbage collected - to free all file connection
                # NOTE(review): deleting from proc/pipe while zip() is
                # iterating them shifts later elements, so `index` can
                # point at the wrong entry and an uncollected process may
                # be dropped from the lists - confirm and rework
                del proc[index]
                del pipe[index]

    # collect the rest:
    [pf.join() for pf in proc]
    for pf, pp in zip(proc, pipe):
        if pf.pid and not pf.exitcode and not pf.is_alive() and (pf.name not in returnvalue):
            returnvalue[str(pf.name)] = pp[0].recv()
            pp[0].close()
            pp[1].close()
    # convert to list in input correct order
    returnvalue = [returnvalue[str(i)] for i in range(len(X))]
    return returnvalue
| 252 | |
| 253 | |
def parallel2(command, *args, groups=None, ppn=None):
    ''' Same as parallel but with scheduling constraints.

    groups identifies mutually exclusive jobs - jobs with the same
    group id are never run together (defaults to one group per job).
    ppn is the 'load' of each job in the 0-1 range - the sum of loads
    of concurrently running jobs cannot exceed 1 (defaults to 0).

    Raises ValueError when argument lists have incompatible lengths or
    when groups/ppn are inconsistent with the number of jobs.
    '''
    # check args, expand length-1 argument lists if necessary
    args = list(args)
    N = [len(i) for i in args]  # lengths of lists
    Mx = max(N)
    if len(set(N)) == 1:
        # all good
        pass
    elif set(N) == set([1, Mx]):
        # expand args of length 1
        for i in range(len(args)):
            if len(args[i]) == 1:
                args[i] = args[i] * Mx
    else:
        raise ValueError(
            "argument lists must have equal length or length 1")
    if not groups:
        groups = range(Mx)
    elif len(groups) != Mx:
        raise ValueError(
            "length of groups must be same as number of jobs or None")

    if not ppn:
        ppn = [0] * Mx
    elif len(ppn) != Mx:
        raise ValueError(
            "length of ppn must be same as number of jobs or None")
    elif max(ppn) > 1 or min(ppn) < 0:
        # the original check (max > 1 and min truthy) silently accepted
        # out-of-range loads whenever any ppn entry was 0
        raise ValueError("ppn values must be in 0 - 1 range")
    # convert argument to suitable format - 'transpose'
    argsTuples = list(zip(*args))

    def command_star(args):
        # unpack one tuple of arguments into the target command
        return command(*args)

    return parmap2(command_star, argsTuples, groups, ppn)
| 298 | |
| 299 | |
def parallel(command, *args):
    ''' Execute command in parallel using multiprocessing
    command is the function to be executed
    args is list of list of arguments
    execution is :
    command(args[0][0],args[1][0],args[2][0],args[3][0],....)
    command(args[0][1],args[1][1],args[2][1],args[3][1],....)
    command(args[0][2],args[1][2],args[2][2],args[3][2],....)
    ...
    output of command is returned as list, in input order

    Raises ValueError when argument lists are neither all the same
    length nor expandable from length 1.
    '''
    # check args, expand length-1 argument lists if necessary
    args = list(args)
    N = [len(i) for i in args]  # lengths of lists
    Mx = max(N)
    if len(set(N)) == 1:
        # all good
        pass
    elif set(N) == set([1, Mx]):
        # expand args of length 1
        for i in range(len(args)):
            if len(args[i]) == 1:
                args[i] = args[i] * Mx
    else:
        raise ValueError(
            "argument lists must have equal length or length 1")

    # convert argument to suitable format - 'transpose'
    argsTuples = list(zip(*args))
    # (the original instantiated an unused multiprocessing.Pool() here,
    # leaking a pool of idle worker processes - removed)

    def command_star(args):
        # unpack one tuple of arguments into the target command
        return command(*args)

    return parmap(command_star, argsTuples)
| 337 | |
| 338 | |
def worker(*a):
    '''Toy cpu-bound job for the self-test below.

    Burns some cpu time per argument and returns the sum of the
    arguments; an argument equal to 1.1 triggers a deliberate
    ZeroDivisionError to exercise failure handling.
    '''
    total = 0
    x = 0
    for value in a:
        if value == 1.1:
            print("raising exception")
            s = 1 / 0
        total += value
        for _ in range(10):
            x += value
        # busy work so each job takes measurable time
        for j in range(100000):
            x = 1.0 / (float(j) + 1.0)
    return total
| 352 | |
# test
if __name__ == "__main__":
    # smoke test: run `worker` on 15 argument tuples with one group per
    # job and fractional cpu loads
    job_args = [
        [1], [2], [3], [4], [1],
        [1, 2, 3, 7, 10, 1.2, 20, 30, 40, 10, 30, 20, 40, 50, 50],
        [3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 5, 6, 4, 3, 2],
    ]
    job_groups = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
    job_ppn = [0.6, 0.6, 0.2, 0.6, 0.2, 0.2, 0.4,
               0.1, 0.1, 0.3, 0.3, 0.3, 0.1, 0.1, 0.1]
    x = parallel2(worker, *job_args, groups=job_groups, ppn=job_ppn)
    print(x)
