| 0 | 1 #!/usr/bin/env python3 | 
|  | 2 import multiprocessing | 
|  | 3 import os | 
|  | 4 import time | 
|  | 5 from itertools import cycle | 
'''
Functions for parallel processing of data chunks using a worker function.
'''
|  | 9 | 
|  | 10 | 
def run_multiple_pbs_jobs(cmds, status_files, qsub_params=""):
    '''
    Submit several commands to the PBS cluster and wait for all of them.

    Example of qsub_params:
    -l walltime=1000:00:00,nodes=1:ppn=8,mem=15G
    -l walltime=150:00:00,nodes=1:ppn=1

    Returns a dict with two parallel lists:
      'function' - exit codes of the local wrapper processes
      'command'  - status strings read back from each status file
    '''
    # launch one wrapper process per command
    submitted = [pbs_send_job(cmd, sf, qsub_params)
                 for cmd, sf in zip(cmds, status_files)]
    exit_codes = []
    for wrapper in submitted:
        wrapper.join()
        exit_codes.append(wrapper.exitcode)
    # collect pbs run status written by each job
    reported = []
    for sf in status_files:
        with open(sf) as handle:
            reported.append(handle.read().strip())
    return {'function': exit_codes, 'command': reported}
|  | 32 | 
|  | 33 | 
def pbs_send_job(cmd, status_file, qsub_params):
    '''Launch pbs_run in a background process; a status file is required.'''
    wrapper = multiprocessing.Process(
        target=pbs_run,
        args=(cmd, status_file, qsub_params),
    )
    wrapper.start()
    return wrapper
|  | 40 | 
|  | 41 | 
def pbs_run(cmd, status_file, qsub_params):
    '''
    Run shell command cmd on a PBS cluster via qsub and block until the
    job finishes.

    The submitted job writes "OK" to status_file on success or "ERROR"
    on failure; this function polls for that file every 3 seconds.

    Returns the stripped contents of status_file ("OK" or "ERROR").
    Raises IOError when status_file or its ".e" error file cannot be
    created.
    '''
    error_file = status_file + ".e"
    # verify both paths are writable before submitting anything
    try:
        open(status_file, 'w').close()
        open(error_file, 'w').close()
    except IOError:
        print("cannot write to status files, make sure path exists")
        raise  # re-raise the original exception, keeping its traceback

    # the status file must not exist before submission - its appearance
    # is the completion signal (the writability test just created it)
    os.remove(status_file)

    # NOTE(review): cmd is interpolated into a shell string passed to
    # os.system - do not feed untrusted input here (shell injection risk)
    cmd_full = ("echo '{cmd} && echo \"OK\" > {status_file} || echo \"ERROR\""
                " > {status_file}' | qsub -e {err}"
                " {qsub_params} ").format(cmd=cmd, status_file=status_file,
                                          err=error_file,
                                          qsub_params=qsub_params)
    os.system(cmd_full)

    # poll until the job (success or failure branch) creates the file
    while not os.path.exists(status_file):
        time.sleep(3)
    with open(status_file) as f:
        status = f.read().strip()
    return status
|  | 75 | 
|  | 76 | 
def spawn(f):
    '''Wrap f for use as a child-process target: the wrapper computes
    f(x) and ships the result back through the given pipe end, then
    closes that end.'''
    def piped(pipe, x):
        result = f(x)
        pipe.send(result)
        pipe.close()
    return piped
|  | 82 | 
|  | 83 | 
def get_max_proc():
    '''Return the number of CPUs to use: taken from config.PROC when a
    config module is importable, otherwise from a global PROC, then from
    the PROC environment variable, falling back to the system CPU count.'''
    try:
        from config import PROC as max_proc
        return max_proc
    except ImportError:
        pass
    if "PROC" in globals():
        return PROC
    if "PROC" in os.environ:
        return int(os.environ["PROC"])
    return multiprocessing.cpu_count()
|  | 98 | 
|  | 99 | 
def parmap2(f, X, groups, ppn):
    '''Parallel map with scheduling constraints.

    Applies f to each element of X in separate processes. Jobs that share
    a group id never run at the same time, and the ppn loads of the
    currently running jobs are kept <= 1 in total (a ppn of 0 adds no
    load). At most get_max_proc() jobs run concurrently. Returns the
    results as a list in the order of X; a job that exited non-zero
    yields None (see collect()).
    '''
    max_proc = get_max_proc()
    print("running in parallel using ", max_proc, "cpu(s)")
    process_pool = []
    output = [None] * len(X)
    # prepare processes
    for x, index in zip(X, list(range(len(X)))):
        # status:
        # 0: waiting, 1: running, 2:collected
        process_pool.append({
            'status': 0,
            'proc': None,
            'pipe': None,
            'index': index,
            'group': groups[index],
            'ppn': ppn[index]

        })

    # run processes
    running = 0
    finished = 0
    sleep_time = 0.001
    while True:
        # count alive processes
        if not sleep_time:
            sleep_time = 0.001
        for i in process_pool:
            # a non-None exitcode means the child has terminated
            if i['status'] == 1 and not (i['proc'].exitcode is None):
                sleep_time = 0.0
                # was running now finished --> collect
                i['status'] = 2
                running -= 1
                finished += 1
                output[i['index']] = collect(i['proc'], i['pipe'])
                # drop references so pipe fds / process objects are freed
                del i['pipe']
                del i['proc']
            if i['status'] == 0 and running < max_proc:
                # waiting and free --> run
                # check if this group can be run
                running_groups = [pp['group']
                                  for pp in process_pool if pp['status'] == 1]
                # check max load  of concurent runs:
                current_load = sum([pp['ppn']
                                    for pp in process_pool if pp['status'] == 1])
                cond1 = (i['ppn'] + current_load) <= 1
                cond2 = not i['group'] in running_groups
                if cond1 and cond2:
                    sleep_time = 0.0
                    try:
                        i['pipe'] = multiprocessing.Pipe()
                    except OSError as e:
                        # likely out of file descriptors - leave this job
                        # waiting and retry it on a later pass
                        print('exception occured:',e)
                        continue
                    i['proc'] = multiprocessing.Process(
                        target=spawn(f),
                        args=(i['pipe'][1], X[i['index']]),
                        name=str(i['index']))
                    i['proc'].start()
                    i['status'] = 1
                    running += 1
        if finished == len(process_pool):
            break
        if sleep_time:
            # sleep only if nothing changed in the last cycle
            time.sleep(sleep_time)
            # sleep time gradually increase to 1 sec
            sleep_time = min(2 * sleep_time, 1)
    return output
|  | 169 | 
|  | 170 | 
def print_status(pp):
    '''Print a small status table for a parmap2 process pool.

    Tolerates entries whose 'proc' is still None (job waiting) or whose
    'proc' key was deleted after collection (parmap2 removes it); their
    exitcode column shows None instead of raising.
    '''
    states = ['waiting', 'running', 'collected']
    print("___________________________________")
    print("jobid    status   group   ppn   exitcode")
    print("===================================")
    for i in pp:
        proc = i.get('proc')
        exitcode = proc.exitcode if proc is not None else None
        print(
            i['index'], "    ",
            states[i['status']], "    ",
            i['group'], "    ",
            i['ppn'], "    ",
            exitcode
        )
|  | 184 | 
|  | 185 | 
def collect(pf, pp):
    '''Harvest the result of a finished worker process.

    pf is the multiprocessing.Process, pp the (parent, child) pipe pair.
    Returns the value received through the pipe when the process exited
    cleanly, None (after printing the exit code) when it died non-zero,
    and raises Exception when the process is still alive or never ran.
    '''
    exited_cleanly = pf.pid and not pf.exitcode and not pf.is_alive()
    if exited_cleanly:
        result = pp[0].recv()
        pf.join()
        pp[0].close()
        pp[1].close()
        return result
    if pf.exitcode:
        print("job finished with exit code {}".format(pf.exitcode))
        pf.join()
        pp[0].close()
        pp[1].close()
        return None
    raise Exception('not collected')
|  | 202 | 
|  | 203 | 
def parmap(f, X):
    '''Apply f to every element of X in parallel child processes.

    At most get_max_proc() workers run at any one time; results come
    back through pipes and are returned as a list in the order of X.

    Fix over the original: finished jobs used to be removed with
    `del proc[index]` while iterating `zip(proc, pipe, ...)`, mutating
    the lists under the live iterator - when several jobs finished in
    one pass this could skip entries or delete the wrong ones. Collection
    now scans indices in reverse so deletions cannot shift unvisited
    entries.
    '''
    max_proc = get_max_proc()

    pipe = []
    proc = []
    returnvalue = {}

    def _collect_finished():
        # Harvest cleanly-exited workers and drop them from proc/pipe so
        # their file descriptors are released. Reverse order keeps the
        # remaining indices valid while deleting.
        for k in reversed(range(len(proc))):
            pf, pp = proc[k], pipe[k]
            if pf.pid and not pf.exitcode and not pf.is_alive() \
                    and (pf.name not in returnvalue):
                pf.join()
                returnvalue[str(pf.name)] = pp[0].recv()
                pp[0].close()
                pp[1].close()
                del proc[k]
                del pipe[k]

    for x, index in zip(X, range(len(X))):
        pipe.append(multiprocessing.Pipe())
        proc.append(multiprocessing.Process(target=spawn(f),
                                            args=(pipe[-1][1], x),
                                            name=str(index)))
        p = proc[-1]
        # throttle: wait until fewer than max_proc workers are alive
        while True:
            running = sum(1 for w in proc if w.is_alive())
            if running < max_proc:
                break
            time.sleep(0.1)
        p.start()
        _collect_finished()

    # collect the rest:
    for pf in proc:
        pf.join()
    for pf, pp in zip(proc, pipe):
        if pf.pid and not pf.exitcode and not pf.is_alive() \
                and (pf.name not in returnvalue):
            returnvalue[str(pf.name)] = pp[0].recv()
            pp[0].close()
            pp[1].close()
    # convert to list in input order
    return [returnvalue[str(i)] for i in range(len(X))]
|  | 252 | 
|  | 253 | 
def parallel2(command, *args, groups=None, ppn=None):
    ''' Same as parallel() but with scheduling constraints: jobs sharing a
    group id are mutually exclusive and never run together, and each
    job's ppn value is its fractional load - the loads of concurrently
    running jobs never sum above 1.

    args is a list of argument lists; lists of length 1 are broadcast to
    the length of the longest list. Returns the list of command results
    in input order.

    Raises ValueError on inconsistent argument-list lengths, when
    groups/ppn do not match the number of jobs, or when a ppn value lies
    outside the 0-1 range.
    '''
    # check args, expand length-1 argument lists if necessary
    args = list(args)
    N = [len(i) for i in args]  # lengths of lists
    Mx = max(N)
    if len(set(N)) == 1:
        # all argument lists already share one length
        pass
    elif set(N) == set([1, Mx]):
        # broadcast args of length 1
        for i in range(len(args)):
            if len(args[i]) == 1:
                args[i] = args[i] * Mx
    else:
        raise ValueError(
            "argument lists must have length 1 or {}".format(Mx))
    if not groups:
        groups = range(Mx)
    elif len(groups) != Mx:
        raise ValueError(
            "length of groups must be same as number of jobs or None")

    if not ppn:
        ppn = [0] * Mx
    elif len(ppn) != Mx:
        raise ValueError(
            "length of ppn must be same as number of jobs or None")
    elif max(ppn) > 1 or min(ppn) < 0:
        # FIX: the original condition (max > 1 AND min truthy) let
        # out-of-range values such as [2, 0] pass validation
        raise ValueError("ppn values must be in 0 - 1 range")

    # convert arguments to a suitable format - 'transpose': one tuple per job
    argsTuples = list(zip(*args))

    def command_star(job_args):
        # unpack one job's argument tuple into command
        return command(*job_args)

    return parmap2(command_star, argsTuples, groups, ppn)
|  | 298 | 
|  | 299 | 
def parallel(command, *args):
    ''' Execute command in parallel using multiprocessing.

    command is the function to be executed, args is a list of argument
    lists; execution is:
        command(args[0][0], args[1][0], args[2][0], ...)
        command(args[0][1], args[1][1], args[2][1], ...)
        ...
    Argument lists of length 1 are broadcast to the longest length.
    Output of command is returned as a list in input order.
    Raises ValueError when argument-list lengths are inconsistent.
    '''
    # check args, expand length-1 argument lists if necessary
    args = list(args)
    N = [len(i) for i in args]  # lengths of lists
    Mx = max(N)
    if len(set(N)) == 1:
        # all argument lists already share one length
        pass
    elif set(N) == set([1, Mx]):
        # broadcast args of length 1
        for i in range(len(args)):
            if len(args[i]) == 1:
                args[i] = args[i] * Mx
    else:
        raise ValueError(
            "argument lists must have length 1 or {}".format(Mx))

    # convert arguments to a suitable format - 'transpose': one tuple per job
    argsTuples = list(zip(*args))
    # FIX: the original instantiated an unused multiprocessing.Pool()
    # here, leaking a pool of idle worker processes - removed.

    def command_star(job_args):
        # unpack one job's argument tuple into command
        return command(*job_args)

    return parmap(command_star, argsTuples)
|  | 337 | 
|  | 338 | 
def worker(*a):
    '''Toy CPU-bound test function: returns the sum of its arguments
    while burning cycles in nested loops; raises ZeroDivisionError when
    an argument equals 1.1 (used to exercise error propagation).'''
    total = 0
    burn = 0
    for value in a:
        if value == 1.1:
            print("raising exception")
            burn = 1 / 0
        total += value
        # busy-work to keep the child process running for a while
        for _ in range(10):
            burn += value
            for step in range(100000):
                burn = 1.0 / (float(step) + 1.0)
    return total
|  | 352 | 
# test / demo entry point
if __name__ == "__main__":
 #   x = parallel2(worker, [1], [2], [3], [4], [1], [1, 2, 3, 7, 10, 1.1, 20, 30, 40, 10, 30, 20, 40, 50, 50], [
 #       3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 5, 6, 4, 3, 2])

    # Demo: 15 jobs of worker() - the length-1 argument lists are
    # broadcast to 15 entries, each job gets a unique group id (so no
    # mutual exclusion actually triggers) and a per-job load in ppn.
    x = parallel2(
        worker, [1], [2], [3], [4], [1],
        [1, 2, 3, 7, 10, 1.2, 20, 30, 40, 10, 30, 20, 40, 50, 50],
        [3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 5, 6, 4, 3, 2],
        groups=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15],
        ppn=[0.6, 0.6, 0.2, 0.6, 0.2, 0.2, 0.4,
             0.1, 0.1, 0.3, 0.3, 0.3, 0.1, 0.1, 0.1]
    )
    print(x)