Mercurial > repos > trinity_ctat > trinity
comparison trinity_wrapper.py @ 0:f571ca145e9d draft
Initial commit with concatenate and trinity tools. Still testing.
| author | trinity_ctat |
|---|---|
| date | Mon, 11 Sep 2017 16:42:11 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:f571ca145e9d |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 | |
| 3 ''' | |
| 4 trinity_wrapper.py | |
| 5 This program is used as a wrapper for Trinity to allow an automatic rerun of failed jobs. It takes arguments for a typical Trinity run: | |
| 6 ~ Required args ~ | |
| 7 Input files - single or paired (left and right) | |
| 8 File type (fasta, fastq) | |
| 9 Max memory - this I need to derive somehow from the dynamic runner using Galaxy slots | |
| 10 | |
| 11 ~ Optional args ~ | |
| 12 Output directory - this allows users to run the same job over in case it walltime'd out or failed for recoverable reasons. | |
| 13 | |
| 14 -- | |
| 15 Created Tuesday, 7 March 2017. | |
| 16 Carrie Ganote | |
| 17 | |
| 18 Licensed to Indiana University under Creative Commons 3.0 | |
| 19 ''' | |
| 20 import subprocess32 | |
| 21 import argparse | |
| 22 import logging as log | |
| 23 import sys | |
| 24 import os | |
| 25 import errno | |
| 26 from datetime import datetime | |
| 27 | |
# Name of the output subdirectory; main() appends it to both the original
# job working directory and the rerun directory when building the paths to
# Trinity.fasta.
| 28 TRINITY_OUT_DIR = "trinity_out_dir" | |
| 29 | |
def _ensure_dir(path):
    """Create *path* (with parents) unless it already exists as a directory.

    Raises:
        OSError: if creation fails for any reason other than the directory
            already being present.
    """
    try:
        os.makedirs(path)
        print("Created dir  %s" % path)
    except OSError as exc:
        if not (exc.errno == errno.EEXIST and os.path.isdir(path)):
            raise


def _sanitize_dirname(name):
    """Return *name* with every character from the unsafe set replaced by "_"."""
    unsafe = "\\`*_{}[]()>#+-.!$&;| "
    return "".join("_" if ch in unsafe else ch for ch in name)


def _memory_args(opts):
    """Return the ``--max_memory`` arguments for the Trinity command line.

    Exactly one of ``opts.mem_per_cpu`` and ``opts.max_memory`` must be set.
    With ``mem_per_cpu`` the total is that value times ``opts.CPU`` (2 when
    no CPU count was given), expressed in gigabytes.

    Raises:
        Exception: if both or neither of the two memory options are given.
    """
    if opts.mem_per_cpu and not opts.max_memory:
        cpus = int(opts.CPU) if opts.CPU else 2
        return ["--max_memory", "%dG" % (cpus * int(opts.mem_per_cpu))]
    if opts.max_memory and not opts.mem_per_cpu:
        return ["--max_memory", opts.max_memory]
    raise Exception("Please pick Memory per cpu, or max mem, but not both.")


def main(*args):
    """Build a Trinity command line, run it (retrying once), and exit.

    Args:
        *args: argv-style arguments; the first element is the program name
            and the rest are parsed as options.  When called with no
            arguments at all, ``sys.argv`` is parsed instead.

    Raises:
        Exception: if the read inputs, the sequence type, the memory
            settings or the log file are missing or inconsistent.
        SystemExit: always raised at the end, carrying the exit status of
            the last Trinity attempt.
    """
    parser = argparse.ArgumentParser(description="")
    parser.add_argument("-o","--output", help="Name of output directory")
    parser.add_argument("-q","--seqType", help="Type of reads; fa or fq")
    parser.add_argument("-m","--max_memory", help="How much memory to allocate? Or maybe how many cpus?")
    parser.add_argument("-p","--mem_per_cpu", help="Memory PER CPU, in GB, in case we want to multiply mem x cpu at runtime")
    parser.add_argument("-s","--single", help="Single read file input")
    parser.add_argument("-l","--left", help="Left read file from paired inputs")
    parser.add_argument("-r","--right", help="Right read file from paired inputs")
    parser.add_argument("-v","--verbose", help="Enable debugging messages to be displayed", action='store_true')
    parser.add_argument("-g","--log", help="Log file")
    parser.add_argument("-t","--timing", help="Timing file, if it exists", default=None)
    parser.add_argument("-d","--dir", help="if supplying a rerunnable job, this is the (hopefully unique) name of the directory to run it in.")
    parser.add_argument("-u","--user", help="Username to run job under")
    parser.add_argument("-f","--fullpath", help="if supplying a rerunnable job, this is the full path (except the user and dir names) to run the job in.")
    parser.add_argument("-c","--CPU", help="CPUs, either a hard coded numer or from Galaxy slots")
    # Honour the argv that was passed in (element 0 is the program name)
    # rather than silently re-reading sys.argv.
    opts = parser.parse_args(list(args[1:]) if args else None)

    if opts.verbose:
        log.basicConfig(format='%(message)s', level=log.DEBUG)
    cmd = ["Trinity"]

    ### Add rerun ability ###########################################
    # job_dir stays None unless a rerun directory is in use; afterwards it
    # holds the original working directory that results are linked back to.
    job_dir = None
    if opts.dir and opts.user and opts.fullpath:
        rerun_path = "%s/%s/%s" % (opts.fullpath, opts.user, _sanitize_dirname(opts.dir))
        print("Rerunpath is  %s" % rerun_path)
        _ensure_dir(rerun_path)
        job_dir = os.getcwd()
        _ensure_dir(os.path.join(job_dir, TRINITY_OUT_DIR))
        os.chdir(rerun_path)

    ### Add information for reads ###################################
    if opts.left and opts.right:
        cmd += ["--left", opts.left, "--right", opts.right]
    elif opts.single:
        cmd += ["--single", opts.single]
    else:
        raise Exception("Need input files in order to run Trinity!")

    ### Add seqtype ##################################################
    if not opts.seqType:
        raise Exception("Please specify a file type for your reads!")
    cmd += ["--seqType", opts.seqType]

    ### Memory and CPU management ####################################
    cmd += _memory_args(opts)
    if opts.CPU:
        cmd += ["--CPU", opts.CPU]

    ### Enough args, let's run it ####################################
    if not opts.log:
        # Fail with a clear message instead of a TypeError from open(None).
        raise Exception("Please specify a log file with --log!")
    print("About to write to %s" % opts.log)
    totalattempts = attempts = 2
    ec = 1
    finish = 1
    with open(opts.log, 'w') as out:
        out.write("Command is:\n%s\n" % (" ".join(cmd)))

        ### There is definitely some value in running the job more than once, especially if it dies for stupid reasons.. ###
        while ec != 0 and attempts > 0 and finish != 0:
            stamp = datetime.now().strftime("%d/%m/%y %H:%M")
            out.write("Beginning attempt %d of Trinity job at %s\n" % (totalattempts - attempts + 1, stamp))
            attempts -= 1
            # The child writes straight to the shared descriptor, so flush
            # our buffered text first to keep the log in chronological order.
            out.flush()
            ec = subprocess32.call(cmd, shell=False, stdin=None, stdout=out, stderr=out, timeout=None)
            out.write("Trinity exited with status %d\n" % ec)
            out.flush()

            # grep returns 0 when Trinity's success banner is in the log.
            with open("greplog", 'w') as greplog:
                finish = subprocess32.call(
                    ["grep", 'All commands completed successfully', opts.log],
                    shell=False, stdin=None, stdout=greplog, stderr=greplog, timeout=None)
            out.write("Finished and found the success command with grep code %d\n" % finish)

        if ec == 0:
            # Link the assembly back into the job directory on every
            # successful rerun, whether or not a timing file was supplied.
            if job_dir is not None:
                src = os.path.join(os.getcwd(), TRINITY_OUT_DIR, "Trinity.fasta")
                dest = os.path.join(job_dir, TRINITY_OUT_DIR, "Trinity.fasta")
                print("copying trinity outputs from %s to %s" % (src, dest))
                os.symlink(src, dest)

            # copy the timing file into the log
            if opts.timing is not None:
                try:
                    with open(opts.timing, 'r') as handle:
                        for line in handle:
                            out.write(line)
                except (OSError, IOError) as e:
                    print("Oops, no timing file found?  %s" % e)

    sys.exit(ec)
| 159 | |
# Script entry point: when executed directly (not imported), forward the
# full process argv, program name included, to main().
| 160 if __name__ == "__main__": | |
| 161 main(*sys.argv) | |
| 162 |
