comparison tools/protein_analysis/seq_analysis_utils.py @ 30:6d9d7cdf00fc draft

v0.2.11 Job splitting fast-fail; RXLR tools support HMMER2 from BioConda; capture more version information; misc internal changes
author peterjc
date Thu, 21 Sep 2017 11:15:55 -0400
parents 3cb02adf4326
children 20da7f48b56f
comparison
equal deleted inserted replaced
29:3cb02adf4326 30:6d9d7cdf00fc
5 5
6 Given Galaxy currently supports Python 2.4+ this cannot use the Python module 6 Given Galaxy currently supports Python 2.4+ this cannot use the Python module
7 multiprocessing so the function run_jobs instead is a simple pool approach 7 multiprocessing so the function run_jobs instead is a simple pool approach
8 using just the subprocess library. 8 using just the subprocess library.
9 """ 9 """
10 import sys 10
11 from __future__ import print_function
12
11 import os 13 import os
12 import subprocess 14 import subprocess
15 import sys
16
13 from time import sleep 17 from time import sleep
14 18
15 __version__ = "0.0.2" 19 __version__ = "0.0.2"
16 20
17 try: 21 try:
45 else: 49 else:
46 raise NotImplementedError('cannot determine number of cpus') 50 raise NotImplementedError('cannot determine number of cpus')
47 51
48 52
49 def thread_count(command_line_arg, default=1): 53 def thread_count(command_line_arg, default=1):
54 """Determine number of threads to use from the command line args."""
50 try: 55 try:
51 num = int(command_line_arg) 56 num = int(command_line_arg)
52 except ValueError: 57 except ValueError:
53 num = default 58 num = default
54 if num < 1: 59 if num < 1:
135 for i in range(0, len(seq), 60): 140 for i in range(0, len(seq), 60):
136 handle.write(seq[i:i + 60] + "\n") 141 handle.write(seq[i:i + 60] + "\n")
137 handle.close() 142 handle.close()
138 files.append(new_filename) 143 files.append(new_filename)
139 # print "%i records in %s" % (len(records), new_filename) 144 # print "%i records in %s" % (len(records), new_filename)
140 except ValueError, err: 145 except ValueError as err:
141 # Max length failure from parser - clean up 146 # Max length failure from parser - clean up
142 try: 147 try:
143 handle.close() 148 handle.close()
144 except Exception: 149 except Exception:
145 pass 150 pass
150 for f in files: 155 for f in files:
151 assert os.path.isfile(f), "Missing split file %r (!??)" % f 156 assert os.path.isfile(f), "Missing split file %r (!??)" % f
152 return files 157 return files
153 158
154 159
155 def run_jobs(jobs, threads, pause=10, verbose=False): 160 def run_jobs(jobs, threads, pause=10, verbose=False, fast_fail=True):
156 """Takes list of cmd strings, returns dict with error levels.""" 161 """Takes list of cmd strings, returns dict with error levels."""
157 pending = jobs[:] 162 pending = jobs[:]
158 running = [] 163 running = []
159 results = {} 164 results = {}
165 skipped = []
160 if threads == 1: 166 if threads == 1:
161 # Special case this for speed, don't need the waits 167 # Special case this for speed, don't need the waits
162 for cmd in jobs: 168 for cmd in jobs:
163 results[cmd] = subprocess.call(cmd, shell=True) 169 results[cmd] = subprocess.call(cmd, shell=True)
164 return results 170 return results
171 failed = False
165 while pending or running: 172 while pending or running:
166 # See if any have finished 173 # See if any have finished
167 for (cmd, process) in running: 174 for (cmd, process) in running:
168 return_code = process.poll() # non-blocking 175 return_code = process.poll() # non-blocking
169 if return_code is not None: 176 if return_code is not None:
170 results[cmd] = return_code 177 results[cmd] = return_code
178 if return_code:
179 failed = True
171 running = [(cmd, process) for (cmd, process) in running 180 running = [(cmd, process) for (cmd, process) in running
172 if cmd not in results] 181 if cmd not in results]
173 if verbose: 182 if verbose:
174 print "%i jobs pending, %i running, %i completed" \ 183 print("%i jobs pending, %i running, %i completed" %
175 % (len(pending), len(running), len(results)) 184 (len(pending), len(running), len(results)))
176 # See if we can start any new threads 185 # See if we can start any new threads
186 if pending and failed and fast_fail:
187 # Don't start any more jobs
188 if verbose:
189 print("Failed, will not start remaining %i jobs" % len(pending))
190 skipped = pending
191 pending = []
177 while pending and len(running) < threads: 192 while pending and len(running) < threads:
178 cmd = pending.pop(0) 193 cmd = pending.pop(0)
179 if verbose: 194 if verbose:
180 print cmd 195 print(cmd)
181 process = subprocess.Popen(cmd, shell=True) 196 process = subprocess.Popen(cmd, shell=True)
182 running.append((cmd, process)) 197 running.append((cmd, process))
183 # Loop... 198 # Loop...
184 sleep(pause) 199 sleep(pause)
185 if verbose: 200 if verbose:
186 print "%i jobs completed" % len(results) 201 print("%i jobs completed" % len(results))
187 assert set(jobs) == set(results) 202 assert set(jobs) == set(results).union(skipped)
188 return results 203 return results