| 
0
 | 
     1 #!/usr/bin/env python3
 | 
| 
 | 
     2 import multiprocessing
 | 
| 
 | 
     3 import os
 | 
| 
 | 
     4 import time
 | 
| 
 | 
     5 from itertools import cycle
 | 
| 
 | 
     6 '''
 | 
| 
 | 
     7 functions for parallel processing of data chunks using worker function
 | 
| 
 | 
     8 '''
 | 
| 
 | 
     9 
 | 
| 
 | 
    10 
 | 
| 
 | 
    11 def run_multiple_pbs_jobs(cmds, status_files, qsub_params=""):
 | 
| 
 | 
    12     '''
 | 
| 
 | 
    13     Example of pbs_params:
 | 
| 
 | 
    14     -l walltime=1000:00:00,nodes=1:ppn=8,mem=15G
 | 
| 
 | 
    15     -l walltime=150:00:00,nodes=1:ppn=1
 | 
| 
 | 
    16 
 | 
| 
 | 
    17     '''
 | 
| 
 | 
    18     jobs = []
 | 
| 
 | 
    19     status_function = []
 | 
| 
 | 
    20     status_command = []
 | 
| 
 | 
    21     for cmd, sf in zip(cmds, status_files):
 | 
| 
 | 
    22         jobs.append(pbs_send_job(cmd, sf, qsub_params))
 | 
| 
 | 
    23     for p in jobs:
 | 
| 
 | 
    24         p.join()
 | 
| 
 | 
    25         status_function.append(p.exitcode)
 | 
| 
 | 
    26     # collect pbs run status
 | 
| 
 | 
    27     for sf in status_files:
 | 
| 
 | 
    28         with open(sf) as f:
 | 
| 
 | 
    29             status_command.append(f.read().strip())
 | 
| 
 | 
    30     status = {'function': status_function, 'command': status_command}
 | 
| 
 | 
    31     return status
 | 
| 
 | 
    32 
 | 
| 
 | 
    33 
 | 
| 
 | 
    34 def pbs_send_job(cmd, status_file, qsub_params):
 | 
| 
 | 
    35     ''' send job to pbs cluster, require status file'''
 | 
| 
 | 
    36     p = multiprocessing.Process(target=pbs_run,
 | 
| 
 | 
    37                                 args=(cmd, status_file, qsub_params))
 | 
| 
 | 
    38     p.start()
 | 
| 
 | 
    39     return p
 | 
| 
 | 
    40 
 | 
| 
 | 
    41 
 | 
| 
 | 
    42 def pbs_run(cmd, status_file, qsub_params):
 | 
| 
 | 
    43     '''
 | 
| 
 | 
    44     run shell command cmd on pbs cluster, wait for job to finish
 | 
| 
 | 
    45     and return status
 | 
| 
 | 
    46     '''
 | 
| 
 | 
    47     print(status_file)
 | 
| 
 | 
    48     error_file = status_file + ".e"
 | 
| 
 | 
    49     # test if writable
 | 
| 
 | 
    50     try:
 | 
| 
 | 
    51         f = open(status_file, 'w').close()
 | 
| 
 | 
    52         f = open(error_file, 'w').close()
 | 
| 
 | 
    53     except IOError:
 | 
| 
 | 
    54         print("cannot write to status files, make sure path exists")
 | 
| 
 | 
    55         raise IOError
 | 
| 
 | 
    56 
 | 
| 
 | 
    57     if os.path.exists(status_file):
 | 
| 
 | 
    58         print("removing old status file")
 | 
| 
 | 
    59         os.remove(status_file)
 | 
| 
 | 
    60     cmd_full = ("echo '{cmd} && echo \"OK\" > {status_file} || echo \"ERROR\""
 | 
| 
 | 
    61                 " > {status_file}' | qsub -e {err}"
 | 
| 
 | 
    62                 " {qsub_params} ").format(cmd=cmd, status_file=status_file,
 | 
| 
 | 
    63                                           err=error_file,
 | 
| 
 | 
    64                                           qsub_params=qsub_params)
 | 
| 
 | 
    65     os.system(cmd_full)
 | 
| 
 | 
    66 
 | 
| 
 | 
    67     while True:
 | 
| 
 | 
    68         if os.path.exists(status_file):
 | 
| 
 | 
    69             break
 | 
| 
 | 
    70         else:
 | 
| 
 | 
    71             time.sleep(3)
 | 
| 
 | 
    72     with open(status_file) as f:
 | 
| 
 | 
    73         status = f.read().strip()
 | 
| 
 | 
    74     return status
 | 
| 
 | 
    75 
 | 
| 
 | 
    76 
 | 
| 
 | 
    77 def spawn(f):
 | 
| 
 | 
    78     def fun(pipe, x):
 | 
| 
 | 
    79         pipe.send(f(x))
 | 
| 
 | 
    80         pipe.close()
 | 
| 
 | 
    81     return fun
 | 
| 
 | 
    82 
 | 
| 
 | 
    83 
 | 
| 
 | 
    84 def get_max_proc():
 | 
| 
 | 
    85     '''Number of cpu to ise in ether get from config.py is available or
 | 
| 
 | 
    86     from global PROC or from environment variable PRCO or set to system max'''
 | 
| 
 | 
    87     try:
 | 
| 
 | 
    88         from config import PROC as max_proc
 | 
| 
 | 
    89     except ImportError:
 | 
| 
 | 
    90         if "PROC" in globals():
 | 
| 
 | 
    91             max_proc = PROC
 | 
| 
 | 
    92         elif "PROC" in os.environ:
 | 
| 
 | 
    93             max_proc = int(os.environ["PROC"])
 | 
| 
 | 
    94 
 | 
| 
 | 
    95         else:
 | 
| 
 | 
    96             max_proc = multiprocessing.cpu_count()
 | 
| 
 | 
    97     return max_proc
 | 
| 
 | 
    98 
 | 
| 
 | 
    99 
 | 
| 
 | 
def parmap2(f, X, groups, ppn):
    '''
    Parallel map of f over X with scheduling constraints.

    Each job index i belongs to groups[i]; two jobs sharing a group id
    are never run concurrently.  ppn[i] is the fractional "load" of job
    i; jobs are only started while the sum of loads of running jobs
    stays <= 1.  At most get_max_proc() jobs run at once.  Returns the
    results in the order of X (a failed job yields None via collect()).
    '''
    max_proc = get_max_proc()
    print("running in parallel using ", max_proc, "cpu(s)")
    process_pool = []
    output = [None] * len(X)
    # build one bookkeeping record per job
    for x, index in zip(X, list(range(len(X)))):
        # status:
        # 0: waiting, 1: running, 2: collected
        process_pool.append({
            'status': 0,
            'proc': None,
            'pipe': None,
            'index': index,
            'group': groups[index],
            'ppn': ppn[index]

        })

    # scheduler loop: start waiting jobs, collect finished ones
    running = 0
    finished = 0
    sleep_time = 0.001
    while True:
        # sleep_time == 0 means "something changed last pass; scan again
        # immediately"; reset the backoff here
        if not sleep_time:
            sleep_time = 0.001
        for i in process_pool:
            if i['status'] == 1 and not (i['proc'].exitcode is None):
                sleep_time = 0.0
                # was running, now finished --> collect its result
                i['status'] = 2
                running -= 1
                finished += 1
                output[i['index']] = collect(i['proc'], i['pipe'])
                # drop references so the Process/Pipe objects can be
                # garbage collected (frees their file descriptors)
                del i['pipe']
                del i['proc']
            if i['status'] == 0 and running < max_proc:
                # waiting and a slot is free --> try to start it
                # a job may not share a group with any running job
                running_groups = [pp['group']
                                  for pp in process_pool if pp['status'] == 1]
                # total load of concurrently running jobs must stay <= 1
                current_load = sum([pp['ppn']
                                    for pp in process_pool if pp['status'] == 1])
                cond1 = (i['ppn'] + current_load) <= 1
                cond2 = not i['group'] in running_groups
                if cond1 and cond2:
                    sleep_time = 0.0
                    try:
                        i['pipe'] = multiprocessing.Pipe()
                    except OSError as e:
                        # likely out of file descriptors -- skip for now
                        # and retry this job on a later pass
                        print('exception occured:', e)
                        continue
                    i['proc'] = multiprocessing.Process(
                        target=spawn(f),
                        args=(i['pipe'][1], X[i['index']]),
                        name=str(i['index']))
                    i['proc'].start()
                    i['status'] = 1
                    running += 1
        if finished == len(process_pool):
            break
        if sleep_time:
            # sleep only if nothing changed in the last cycle
            time.sleep(sleep_time)
            # back off gradually, capped at 1 second
            sleep_time = min(2 * sleep_time, 1)
    return output
 | 
| 
 | 
   169 
 | 
| 
 | 
   170 
 | 
| 
 | 
   171 def print_status(pp):
 | 
| 
 | 
   172     states = ['waiting', 'running', 'collected']
 | 
| 
 | 
   173     print("___________________________________")
 | 
| 
 | 
   174     print("jobid    status   group   ppn   exitcode")
 | 
| 
 | 
   175     print("===================================")
 | 
| 
 | 
   176     for i in pp:
 | 
| 
 | 
   177         print(
 | 
| 
 | 
   178             i['index'], "    ",
 | 
| 
 | 
   179             states[i['status']], "    ",
 | 
| 
 | 
   180             i['group'], "    ",
 | 
| 
 | 
   181             i['ppn'], "    ",
 | 
| 
 | 
   182             i['proc'].exitcode
 | 
| 
 | 
   183         )
 | 
| 
 | 
   184 
 | 
| 
 | 
   185 
 | 
| 
 | 
   186 def collect(pf, pp):
 | 
| 
 | 
   187     if pf.pid and not pf.exitcode and not pf.is_alive():
 | 
| 
 | 
   188         returnvalue = pp[0].recv()
 | 
| 
 | 
   189         pf.join()
 | 
| 
 | 
   190         pp[0].close()
 | 
| 
 | 
   191         pp[1].close()
 | 
| 
 | 
   192         return returnvalue
 | 
| 
 | 
   193     elif pf.exitcode:
 | 
| 
 | 
   194         print("job finished with exit code {}".format(pf.exitcode))
 | 
| 
 | 
   195         pf.join()
 | 
| 
 | 
   196         pp[0].close()
 | 
| 
 | 
   197         pp[1].close()
 | 
| 
 | 
   198         return None
 | 
| 
 | 
   199     # return None
 | 
| 
 | 
   200     else:
 | 
| 
 | 
   201         raise Exception('not collected')
 | 
| 
 | 
   202 
 | 
| 
 | 
   203 
 | 
| 
 | 
def parmap(f, X):
    '''
    Parallel map of f over X using at most get_max_proc() concurrent
    processes.  Results are keyed by process name (the job index) and
    returned as a list in the order of X.
    '''

    max_proc = get_max_proc()

    pipe = []
    proc = []
    returnvalue = {}

    for x, index in zip(X, list(range(len(X)))):
        pipe.append(multiprocessing.Pipe())
        proc.append(multiprocessing.Process(target=spawn(f),
                                            args=(pipe[-1][1], x), name=str(index)))
        p = proc[-1]
        # throttle: wait until fewer than max_proc workers are alive
        while True:
            running = 0
            for i in proc:
                if i.is_alive():
                    running += 1
          #          print "running:"+str(running)
            if running < max_proc:
                break
            else:
                time.sleep(0.1)
        p.start()
        # print "process started:"+str(p.pid)
        # opportunistically collect any workers that already finished

        # NOTE(review): the loop variable below rebinds `index` from the
        # outer loop, and `del proc[index]`/`del pipe[index]` mutate the
        # lists while zip() is iterating them -- after one deletion the
        # remaining indices are stale, so a wrong entry could be removed.
        # Uncollected jobs are still picked up by the final pass below,
        # but this deletion pattern deserves a closer look.
        for pf, pp, index in zip(proc, pipe, range(len(pipe))):
            if pf.pid and not pf.exitcode and not pf.is_alive() and (pf.name not in returnvalue):
                pf.join()
                returnvalue[str(pf.name)] = pp[0].recv()
                pp[0].close()
                pp[1].close()
                # proc must be garbage collected - to free all file connection
                del proc[index]
                del pipe[index]

    # collect the rest:
    [pf.join() for pf in proc]
    for pf, pp in zip(proc, pipe):
        if pf.pid and not pf.exitcode and not pf.is_alive() and (pf.name not in returnvalue):
            returnvalue[str(pf.name)] = pp[0].recv()
            pp[0].close()
            pp[1].close()
    # convert dict keyed by job name back to a list in input order
    returnvalue = [returnvalue[str(i)] for i in range(len(X))]
    return returnvalue
 | 
| 
 | 
   252 
 | 
| 
 | 
   253 
 | 
| 
 | 
   254 def parallel2(command, *args, groups=None, ppn=None):
 | 
| 
 | 
   255     ''' same as parallel but groups are used to identifie mutually
 | 
| 
 | 
   256     exclusive jobs, jobs with the same goup id are never run together
 | 
| 
 | 
   257     ppn params is 'load' of the job - sum of loads cannot exceed 1
 | 
| 
 | 
   258     '''
 | 
| 
 | 
   259     # check args, expand if necessary
 | 
| 
 | 
   260     args = list(args)
 | 
| 
 | 
   261     N = [len(i) for i in args]  # lengths of lists
 | 
| 
 | 
   262     Mx = max(N)
 | 
| 
 | 
   263     if len(set(N)) == 1:
 | 
| 
 | 
   264         # all good
 | 
| 
 | 
   265         pass
 | 
| 
 | 
   266     elif set(N) == set([1, Mx]):
 | 
| 
 | 
   267         # expand args of length 1
 | 
| 
 | 
   268         for i in range(len(args)):
 | 
| 
 | 
   269             if len(args[i]) == 1:
 | 
| 
 | 
   270                 args[i] = args[i] * Mx
 | 
| 
 | 
   271     else:
 | 
| 
 | 
   272         raise ValueError
 | 
| 
 | 
   273     if not groups:
 | 
| 
 | 
   274         groups = range(Mx)
 | 
| 
 | 
   275     elif len(groups) != Mx:
 | 
| 
 | 
   276         print("length of groups must be same as number of job or None")
 | 
| 
 | 
   277         raise ValueError
 | 
| 
 | 
   278 
 | 
| 
 | 
   279     if not ppn:
 | 
| 
 | 
   280         ppn = [0] * Mx
 | 
| 
 | 
   281     elif len(ppn) != Mx:
 | 
| 
 | 
   282         print("length of ppn must be same as number of job or None")
 | 
| 
 | 
   283         raise ValueError
 | 
| 
 | 
   284     elif max(ppn) > 1 and min(ppn):
 | 
| 
 | 
   285         print("ppn values must be in 0 - 1 range")
 | 
| 
 | 
   286         raise ValueError
 | 
| 
 | 
   287     # convert argument to suitable format - 'transpose'
 | 
| 
 | 
   288     argsTuples = list(zip(*args))
 | 
| 
 | 
   289     args = [list(i) for i in argsTuples]
 | 
| 
 | 
   290 
 | 
| 
 | 
   291     # multiprocessing.Pool()
 | 
| 
 | 
   292 
 | 
| 
 | 
   293     def command_star(args):
 | 
| 
 | 
   294         return(command(*args))
 | 
| 
 | 
   295 
 | 
| 
 | 
   296     x = parmap2(command_star,  argsTuples, groups, ppn)
 | 
| 
 | 
   297     return x
 | 
| 
 | 
   298 
 | 
| 
 | 
   299 
 | 
| 
 | 
   300 def parallel(command, *args):
 | 
| 
 | 
   301     ''' Execute command in parallel using multiprocessing
 | 
| 
 | 
   302     command is the function to be executed
 | 
| 
 | 
   303     args is list of list of arguments
 | 
| 
 | 
   304     execution is :
 | 
| 
 | 
   305         command(args[0][0],args[1][0],args[2][0],args[3][0],....)
 | 
| 
 | 
   306         command(args[0][1],args[1][1],args[2][1],args[3][1],....)
 | 
| 
 | 
   307         command(args[0][2],args[1][2],args[2][2],args[3][2],....)
 | 
| 
 | 
   308         ...
 | 
| 
 | 
   309     output of command is returned as list
 | 
| 
 | 
   310     '''
 | 
| 
 | 
   311     # check args, expand if necessary
 | 
| 
 | 
   312     args = list(args)
 | 
| 
 | 
   313     N = [len(i) for i in args]  # lengths of lists
 | 
| 
 | 
   314     Mx = max(N)
 | 
| 
 | 
   315     if len(set(N)) == 1:
 | 
| 
 | 
   316         # all good
 | 
| 
 | 
   317         pass
 | 
| 
 | 
   318     elif set(N) == set([1, Mx]):
 | 
| 
 | 
   319         # expand args of length 1
 | 
| 
 | 
   320         for i in range(len(args)):
 | 
| 
 | 
   321             if len(args[i]) == 1:
 | 
| 
 | 
   322                 args[i] = args[i] * Mx
 | 
| 
 | 
   323     else:
 | 
| 
 | 
   324         raise ValueError
 | 
| 
 | 
   325 
 | 
| 
 | 
   326     # convert argument to suitable format - 'transpose'
 | 
| 
 | 
   327     argsTuples = list(zip(*args))
 | 
| 
 | 
   328     args = [list(i) for i in argsTuples]
 | 
| 
 | 
   329 
 | 
| 
 | 
   330     multiprocessing.Pool()
 | 
| 
 | 
   331 
 | 
| 
 | 
   332     def command_star(args):
 | 
| 
 | 
   333         return(command(*args))
 | 
| 
 | 
   334 
 | 
| 
 | 
   335     x = parmap(command_star, argsTuples)
 | 
| 
 | 
   336     return x
 | 
| 
 | 
   337 
 | 
| 
 | 
   338 
 | 
| 
 | 
   339 def worker(*a):
 | 
| 
 | 
   340     x = 0
 | 
| 
 | 
   341     y = 0
 | 
| 
 | 
   342     for i in a:
 | 
| 
 | 
   343         if i == 1.1:
 | 
| 
 | 
   344             print("raising exception")
 | 
| 
 | 
   345             s = 1 / 0
 | 
| 
 | 
   346         y += i
 | 
| 
 | 
   347         for j in range(10):
 | 
| 
 | 
   348             x += i
 | 
| 
 | 
   349             for j in range(100000):
 | 
| 
 | 
   350                 x = 1.0 / (float(j) + 1.0)
 | 
| 
 | 
   351     return(y)
 | 
| 
 | 
   352 
 | 
| 
 | 
   353 # test
 | 
| 
 | 
   354 if __name__ == "__main__":
 | 
| 
 | 
   355  #   x = parallel2(worker, [1], [2], [3], [4], [1], [1, 2, 3, 7, 10, 1.1, 20, 30, 40, 10, 30, 20, 40, 50, 50], [
 | 
| 
 | 
   356  #       3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 5, 6, 4, 3, 2])
 | 
| 
 | 
   357 
 | 
| 
 | 
   358     x = parallel2(
 | 
| 
 | 
   359         worker, [1], [2], [3], [4], [1],
 | 
| 
 | 
   360         [1, 2, 3, 7, 10, 1.2, 20, 30, 40, 10, 30, 20, 40, 50, 50],
 | 
| 
 | 
   361         [3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 5, 6, 4, 3, 2],
 | 
| 
 | 
   362         groups=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15],
 | 
| 
 | 
   363         ppn=[0.6, 0.6, 0.2, 0.6, 0.2, 0.2, 0.4,
 | 
| 
 | 
   364              0.1, 0.1, 0.3, 0.3, 0.3, 0.1, 0.1, 0.1]
 | 
| 
 | 
   365     )
 | 
| 
 | 
   366     print(x)
 |