Mercurial > repos > petrn > repeatexplorer
comparison lib/assembly_tools.py @ 0:f6ebec6e235e draft
Uploaded
| author | petrn | 
|---|---|
| date | Thu, 19 Dec 2019 13:46:43 +0000 | 
| parents | |
| children | 
   comparison
  equal
  deleted
  inserted
  replaced
| -1:000000000000 | 0:f6ebec6e235e | 
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 import sys | |
| 3 import logging | |
| 4 import subprocess | |
| 5 import os | |
| 6 import tempfile | |
| 7 import config | |
| 8 import shutil | |
| 9 import itertools | |
| 10 import pickle | |
| 11 import shlex | |
| 12 from lib.parallel.parallel import parallel2 as parallel | |
| 13 from lib.parallel.parallel import get_max_proc | |
| 14 | |
| 15 REQUIRED_VERSION = (3, 4) | |
| 16 MAX_BUFFER_SIZE = 100000 | |
| 17 if sys.version_info < REQUIRED_VERSION: | |
| 18 raise Exception("\n\npython 3.4 or higher is required!\n") | |
| 19 LOGGER = logging.getLogger(__name__) | |
| 20 MAX_FASTA_IN_DIRECTORY = 1000 | |
| 21 | |
| 22 def assembly(sequences, hitsort, clusters_info, assembly_dir, | |
| 23 contigs_file, min_size_of_cluster_for_assembly = 0): | |
| 24 ''' | |
| 25 Runs assembly on sequences (SequenceSet). Assembly is | |
| 26 performed on each cluster separatelly, clusters are taken | |
| 27 from hitsort(Graph) | |
| 28 Cluster_listing - list of Clusters | |
| 29 if cluster.tandem_rank is 1 or 2 - no assembly is performed!! | |
| 30 ''' | |
| 31 | |
| 32 # iterate over large clusters, assembly is performed on sequences stored in | |
| 33 # cluster_little[index].fasta_file_full - for annotated clusters | |
| 34 # sequences of small clusters are retrieved from database | |
| 35 fasta_seqs = [ | |
| 36 i.fasta_file_full | |
| 37 for i in clusters_info | |
| 38 if not i.tandem_rank in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS | |
| 39 ] | |
| 40 prefixes = ["CL{}".format(i.index) | |
| 41 for i in clusters_info | |
| 42 if not i.tandem_rank in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS] | |
| 43 LOGGER.info("Number of clusters for assembly: {}".format(hitsort.number_of_clusters)) | |
| 44 LOGGER.info("Assembling large clusters") | |
| 45 assembly_files = parallel(cap3worker, fasta_seqs, prefixes) | |
| 46 LOGGER.info("Large clusters assembled") | |
| 47 # some clusters - tanem rank 1 assembled | |
| 48 j = 0 | |
| 49 for cl in clusters_info: | |
| 50 if cl.tandem_rank in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS: | |
| 51 cl.assembly_files = {i: None for i in config.CAP3_FILES_MAPPING} | |
| 52 consensus_file = cl.dir + "/tarean_contigs.fasta" | |
| 53 cl.assembly_files["{}.{}.contigs"] = consensus_file | |
| 54 with open(cl.dir_tarean + "/tarean_contigs.fasta", | |
| 55 'r') as fin, open(consensus_file, 'w') as fout: | |
| 56 for line in fin: | |
| 57 if line[0] == ">": | |
| 58 line = ">CL{}Contig{}".format(cl.index, line[1:]) | |
| 59 fout.write(line) | |
| 60 else: | |
| 61 cl.assembly_files = assembly_files[j] | |
| 62 j += 1 | |
| 63 # assembly of small clusters: | |
| 64 # connection to files were results will be concatenated | |
| 65 LOGGER.info("Assembly of small cluster - making tmp files") | |
| 66 | |
| 67 tmp_dir_root = tempfile.mkdtemp() | |
| 68 tmp_subdir = tempfile.mkdtemp(dir=tmp_dir_root) | |
| 69 nproc = get_max_proc() | |
| 70 prefixes = [[] for i in range(nproc)] | |
| 71 tmp_seq_files = [[] for i in range(nproc)] | |
| 72 | |
| 73 LOGGER.info("Assembly of small clusters - saving small cluster to tmp files") | |
| 74 seq_dictionary = sequences.toDict() | |
| 75 fasta_count = 0 | |
| 76 chunk_counter = itertools.cycle(range(nproc)) | |
| 77 for index in range(len(clusters_info) + 1, hitsort.number_of_clusters): | |
| 78 chunk = next(chunk_counter) | |
| 79 ids = hitsort.get_cluster_reads(index) | |
| 80 if len(ids) < min_size_of_cluster_for_assembly: | |
| 81 break | |
| 82 prefixes[chunk].append("CL{}".format(index)) | |
| 83 fasta_count += 1 | |
| 84 if fasta_count > MAX_FASTA_IN_DIRECTORY: | |
| 85 # create new subdir to keep number of files in directory low | |
| 86 fasta_count = 1 | |
| 87 tmp_subdir = tempfile.mkdtemp(dir=tmp_dir_root) | |
| 88 fasta_file_name = "{}/{}".format(tmp_subdir, index) | |
| 89 write_seqDict_to_fasta(file_name=fasta_file_name, sequences=seq_dictionary, subset=ids) | |
| 90 tmp_seq_files[chunk].append(fasta_file_name) | |
| 91 del seq_dictionary | |
| 92 LOGGER.info("Assembly of small clusters running") | |
| 93 pickled_fparts_small_contigs = parallel(cap3worker_multiple, tmp_seq_files, prefixes) | |
| 94 LOGGER.info("Assembly of small clusters done") | |
| 95 # append to all results | |
| 96 LOGGER.info("Assembly of small clusters - appending results") | |
| 97 all_con = {} | |
| 98 for fkey in config.CAP3_FILES_MAPPING: | |
| 99 if config.CAP3_FILES_MAPPING[fkey]: | |
| 100 all_con[fkey] = open("{}/{}".format( | |
| 101 assembly_dir, config.CAP3_FILES_MAPPING[fkey]), 'w') | |
| 102 else: | |
| 103 all_con[fkey] = None | |
| 104 for pickled_fparts_multiple in pickled_fparts_small_contigs: | |
| 105 with open(pickled_fparts_multiple, 'rb') as fp: | |
| 106 fparts_multiple = pickle.load(fp) | |
| 107 os.unlink(pickled_fparts_multiple) | |
| 108 for fparts in fparts_multiple: | |
| 109 for fkey in fparts: | |
| 110 if all_con[fkey]: | |
| 111 with open(fparts[fkey]) as f: | |
| 112 all_con[fkey].write(f.read()) | |
| 113 os.unlink(fparts[fkey]) | |
| 114 else: | |
| 115 os.unlink(fparts[fkey]) | |
| 116 | |
| 117 for fkey in all_con: | |
| 118 if all_con[fkey]: | |
| 119 all_con[fkey].close() | |
| 120 LOGGER.info("Assembling - copying files to final location") | |
| 121 # copy all contigs to one location: - contigs_file | |
| 122 with open(contigs_file, 'w') as fout: | |
| 123 for cl in clusters_info: | |
| 124 # append contig from large clusters | |
| 125 with open(cl.assembly_files["{}.{}.contigs"], 'r') as fin: | |
| 126 for i in fin: | |
| 127 fout.write(i) | |
| 128 # append contigs from small clusters | |
| 129 small_aln_file = "{}/{}".format( | |
| 130 assembly_dir, config.CAP3_FILES_MAPPING['{}.{}.aln']) | |
| 131 # calculate profile on aln file of small clusters | |
| 132 file_base_name = small_aln_file[:-4] | |
| 133 os.system("align_parsing.pl -i {fn} -o {out}.info.fasta -p {out}.profile 2>&1".format(fn=small_aln_file, out=file_base_name)) | |
| 134 os.system("select_and_sort_contigs.pl {fn}.info.fasta 5 2>&1".format(fn=file_base_name)) | |
| 135 small_contig_file = file_base_name + ".info.fasta" | |
| 136 with open(small_contig_file, 'r') as fin: | |
| 137 for i in fin: | |
| 138 fout.write(i) | |
| 139 shutil.rmtree(tmp_dir_root) | |
| 140 | |
| 141 def write_seqDict_to_fasta(file_name, sequences, subset): | |
| 142 with open(file_name, 'w') as f: | |
| 143 for i in subset: | |
| 144 f.write(">{}\n{}\n".format(i, sequences[i])) | |
| 145 | |
| 146 | |
| 147 | |
| 148 | |
| 149 def cap3worker(seqfile, prefix="cap", cap3args=" -p 80 -o 40 "): | |
| 150 prefix2 = "cap" | |
| 151 cmd = "cap3 " + seqfile + cap3args + " -x " + prefix2 | |
| 152 with open(seqfile + "." + prefix2 + ".aln", "w") as aln: | |
| 153 subprocess.check_call(shlex.split(cmd), shell=False, stdout=aln) | |
| 154 # this generate three files | |
| 155 files_dict = {} | |
| 156 for fkey in config.CAP3_FILENAMES: | |
| 157 fn = fkey.format(seqfile, prefix2) | |
| 158 fn_tmp = "{}.tmp".format(fn) | |
| 159 files_dict[fkey] = fn | |
| 160 if config.CAP3_PATTERNS_REPLACE[fkey]: | |
| 161 pattern, replacement = config.CAP3_PATTERNS_REPLACE[fkey] | |
| 162 with open(fn, "r") as con_in, open(fn_tmp, 'w') as con_out: | |
| 163 for line in con_in: | |
| 164 con_out.write(line.replace(pattern, replacement.format( | |
| 165 prefix))) | |
| 166 os.rename(fn_tmp, fn) | |
| 167 | |
| 168 # make new meaningful names here | |
| 169 | |
| 170 for fkey in config.CAP3_FILES_GOODNAMES: | |
| 171 config.CAP3_FILES_GOODNAMES[fkey] | |
| 172 fn_goodname = os.path.dirname(files_dict[fkey]) + "/" + config.CAP3_FILES_GOODNAMES[fkey] | |
| 173 os.rename(files_dict[fkey], fn_goodname) | |
| 174 files_dict[fkey] = fn_goodname | |
| 175 | |
| 176 aln_file = files_dict["{}.{}.aln"] | |
| 177 file_base_name = aln_file[:-4] | |
| 178 os.system("align_parsing.pl -i {fn} -o {out}.info.fasta -p {out}.profile 2>&1".format(fn=aln_file, out=file_base_name)) | |
| 179 os.system("select_and_sort_contigs.pl {fn}.info.fasta 5".format(fn=file_base_name)) | |
| 180 # TODO -add new file to files_dict | |
| 181 # replace simple fasta with info.fasta | |
| 182 files_dict["{}.{}.contigs"] = file_base_name + ".info.fasta" | |
| 183 return files_dict | |
| 184 | |
| 185 def cap3worker_multiple(many_seqfile, many_prefixes, cap3args=" -p 80 -o 40 "): | |
| 186 ''' | |
| 187 purpose of this script is to run multiple assemblies within single process | |
| 188 avoiding running high number of short parallel subprocesses | |
| 189 As starting subprocess for each cap3 assembly was very ineffective, | |
| 190 all ap3 commands are written to single file and run using single subprocess | |
| 191 command - | |
| 192 ''' | |
| 193 cmd_file = tempfile.NamedTemporaryFile(mode="w",delete=False).name | |
| 194 with open(cmd_file,'w') as cmdf: | |
| 195 for seqfile, prefix in zip(many_seqfile, many_prefixes): | |
| 196 cmd = "cap3 " + seqfile + cap3args + " -x " + prefix + " > " + seqfile + "." + prefix + ".aln\n" | |
| 197 cmdf.write(cmd) | |
| 198 os.system("sh "+ cmd_file) | |
| 199 # collect results: | |
| 200 files_dict_many = [] | |
| 201 for seqfile, prefix in zip(many_seqfile, many_prefixes): | |
| 202 files_dict = {} | |
| 203 for fkey in config.CAP3_FILENAMES: | |
| 204 fn = fkey.format(seqfile, prefix) | |
| 205 fn_tmp = "{}.tmp".format(fn) | |
| 206 files_dict[fkey] = fn | |
| 207 if config.CAP3_PATTERNS_REPLACE[fkey]: | |
| 208 pattern, replacement = config.CAP3_PATTERNS_REPLACE[fkey] | |
| 209 with open(fn, "r") as con_in, open(fn_tmp, 'w') as con_out: | |
| 210 for line in con_in: | |
| 211 con_out.write(line.replace(pattern, replacement.format( | |
| 212 prefix))) | |
| 213 os.rename(fn_tmp, fn) | |
| 214 files_dict_many.append(files_dict) | |
| 215 # this is too large to be return directly - use picking | |
| 216 f = tempfile.NamedTemporaryFile(delete=False) | |
| 217 os.unlink(cmd_file) | |
| 218 pickle.dump(files_dict_many,file=f) | |
| 219 return f.name | |
| 220 | |
| 221 | 
