Mercurial repository petrn/repeatexplorer: lib/seqtools.py @ 0:f6ebec6e235e (draft)

| author | petrn |
|---|---|
| date | Thu, 19 Dec 2019 13:46:43 +0000 |
| description | Uploaded |
| parents | |
| children | |
```python
#!/usr/bin/env python3
import logging
logger = logging.getLogger(__name__)
import itertools
import os
import sys
import random
import sqlite3
import subprocess
import shlex  # for command line arguments split
from collections import namedtuple
from lib.utils import format_query
from lib.parallel.parallel import parallel2 as parallel
from lib.parallel.parallel import get_max_proc
REQUIRED_VERSION = (3, 4)
MAX_PRINT = 10

if sys.version_info < REQUIRED_VERSION:
    raise Exception("\n\npython 3.4 or higher is required!\n")

# additional functions


def _hitsort_worker(query, blastdb, output, options):

    # output from blast is parsed from stdout
    cmd = options.all2all_search_params.format(query=query, blastdb=blastdb)
    print(cmd)
    min_lcov, min_pid, min_ovl, min_scov, evalue_max = options.filtering_threshold
    pair2insert = ''
    signs = {'plus': '+', 'minus': '-'}
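    # The index-based parsing below assumes a 12-column mgblast tabular line
    # (inferred from how the fields are used here; treat this as an assumption,
    # not a format specification):
    #   0 query id, 1 query length, 2-3 query start/end,
    #   4 subject id, 5 subject length, 6-7 subject start/end,
    #   8 percent identity, 9 score, 10 e-value, 11 strand ('plus'/'minus')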
    with open(output, 'w') as f:
        with subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE) as p:
            for i in p.stdout:
                items = i.decode('utf-8').split()
                if options.filter_self_hits:
                    if items[4] == items[0]:
                        continue
                evalue = float(items[10])
                ovl_q = abs(float(items[2]) - float(items[3])) + 1
                ovl_h = abs(float(items[6]) - float(items[7])) + 1
                if (ovl_q >= min_ovl or ovl_h >= min_ovl) and float(items[8]) >= min_pid:
                    if float(items[1]) >= float(items[5]):
                        lcov = ovl_q * 100.0 / float(items[1])
                        scov = ovl_h * 100.0 / float(items[5])
                    else:
                        lcov = ovl_q * 100.0 / float(items[5])
                        scov = ovl_h * 100.0 / float(items[1])
                    # positive line:
                    if lcov >= min_lcov and scov >= min_scov and evalue_max > evalue:
                        pr = [items[0], items[4]]
                        # write only the first HSP of each query-subject pair
                        if pair2insert != "{0}\t{1}".format(pr[0], pr[1]):
                            pair2insert = "{0}\t{1}".format(pr[0], pr[1])
                            if items[11] in ['plus', 'minus']:
                                items[11] = signs[items[11]]
                            f.write("{0}\t{1}\t{2}\n".format(pair2insert, items[9], "\t".join(
                                items[k] for k in [1, 2, 3, 5, 6, 7, 8, 10, 11])))

def blast_with_filter(fasta_file_filter, blastdb):
    '''
    Return the set of sequence IDs from blastdb that have similarity to the filter.
    It uses mgblast for the search.
    '''
    params = '-p 85 -W18 -UT -X40 -KT -JF -F "m D" -v100000000 -b100000000 -D4 -C 30 -H 30'
    positive_hits = set()
    min_pid = 90
    min_ovl_perc = 90
    cmd = " ".join(['mgblast',
                    '-i', fasta_file_filter,
                    '-d', blastdb,
                    params
                    ])
    with subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE) as p:
        for i in p.stdout:
            items = i.decode('utf-8').split()
            ovl_perc = (abs(float(items[6]) - float(items[7])) + 1) / float(items[5]) * 100
            pid = float(items[8])
            if pid > min_pid and ovl_perc > min_ovl_perc:
                positive_hits.add(items[4])
    return(positive_hits)


# TODO test task
# predefine blast params
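# Illustrative shape of the `params` dict consumed by blast_worker below
# (a sketch inferred from how the keys are used; the exact values used by
# the pipeline may differ, and the 'args' / 'filter_function' entries shown
# here are hypothetical):
#   params = {
#       'program': 'blastn',                       # or 'blastx' / 'diamond blastx'
#       'output_columns': 'qseqid sseqid qlen slen length ppos bitscore',
#       'column_types': [str, str, float, float, float, float, float],
#       'args': '-num_threads {max_proc} -evalue 1e-3',
#       'filter_function': lambda hit: hit.bitscore > 50,
#   }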
def blast_worker(query, blastdb, output, params):
    if params['program'] in ['blastx', 'blastn']:
        default_columns = ' -outfmt "6 {}"'.format(params['output_columns'])
        cmd = "{} -query {} -db {} {} {}".format(
            params['program'], query, blastdb, params['args'], default_columns)
    elif params['program'] == 'diamond blastx':
        # diamond has a slightly different argument format than blastx
        default_columns = ' --outfmt 6 {}'.format(params['output_columns'])
        cmd = "{} -q {} -d {} {} {}".format(
            params['program'], query, blastdb, params['args'], default_columns)
    print(cmd)
    preceding_pair = ['', '']
    BlastValues = namedtuple("BlastValues", params['output_columns'])
    with open(output, 'wb') as f:
        with subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE) as p:
            for i in p.stdout:
                items = i.decode('utf-8').split()
                blast_values = BlastValues(*[func(item) for item, func in zip(items, params['column_types'])])
                # qseqid, sseqid, qlen, slen, length, ppos, bitscore = [
                #     func(item) for item, func in zip(items, params['column_types'])]
                if params['filter_function'](blast_values):
                    if preceding_pair != [blast_values.qseqid, blast_values.sseqid]:
                        f.write(i)
                        preceding_pair = [blast_values.qseqid, blast_values.sseqid]


class Sequence:

    def __init__(self, seq, name, paired=False):
        # the mode of seq storage can be changed later to make it more
        # memory efficient
        self._seq = bytes(str(seq), "ascii")
        self.name = str(name)

    @property
    def seq(self):
        return self._seq.decode("utf-8")

    @seq.setter
    def seq(self, value):
        self._seq = bytes(str(value), "ascii")

    def __str__(self):
        return "{0} : {1}".format(self.name, self.seq)

    @staticmethod
    def read_fasta(fasta_file_name):
        '''
        generator - reads sequences from a fasta file,
        returning them one by one
        '''
        with open(fasta_file_name, 'r') as f:
            header = None
            seqstr = None
            for rawline in f:
                line = rawline.strip()
                if line == "":
                    continue
                if line[0] == ">":
                    if header and seqstr:
                        yield Sequence(seqstr, header)
                        # reset
                        seqstr = None
                        header = line[1:]
                    elif seqstr:
                        logger.warning("sequence was not preceded by a header")
                    else:
                        header = line[1:]
                else:
                    seqstr = line if not seqstr else seqstr + line
            # yield the last record (empty lines were skipped above):
            if header and seqstr:
                yield Sequence(seqstr, header)
            return

    def write2fasta(self, file_object):
        file_object.write(">{0}\n{1}\n".format(self.name, self.seq))

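# Example (sketch): stream records from a fasta file; the path is hypothetical.
#     for rec in Sequence.read_fasta("reads.fasta"):
#         print(rec.name, len(rec.seq))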

class SequenceSet:

    def __init__(self, filename=None, source=None, sample_size=0, new=False,
                 paired=False, prefix_length=0, rename=False, seed=123, fasta=None):
        '''
        filename: path to sqlite database; if None, the database is stored in memory
        source: path to fasta file; if None, an empty database is created
        sample_size: use only a sample of sample_size sequences from the fasta file; if 0, all are used
        new: should an existing database be overwritten?
        paired: paired reads
        prefix_length: int, length of the name prefix recorded in prefix_codes
        rename: use integers as names

        Creates a SequenceSet, either empty or from a fasta file.
        It can be stored in memory or in an sqlite file on disk.
        Only a sample of all sequences can be loaded; if sample_size = 0,
        all sequences are loaded.
        If source is given, a new database is created - if filename exists, it will be deleted!
        '''
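        # sqlite layout for orientation (tables created here and by other
        # methods of this class): sequences(name, sequence) and, when a source
        # fasta is loaded, prefix_codes(prefix, N); _update_database() adds
        # seqinfo, annotate() adds one table per annotation_name,
        # remove_sequences_using_filter() adds ids_kept and drop_sequences()
        # adds omited_sequences.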
        #======================================================================
        # TODO name checking
        # TODO detect unique prefixes
        #======================================================================
        # type of storage
        random.seed(seed)

        self.source = source
        self.sample_size = sample_size
        self.paired = paired
        self.filename = filename
        self.prefix_length = prefix_length
        self.prefix_codes = {}
        self.rename = rename
        self.fasta = fasta
        self._length = None
        self._original_length = None
        self.blastdb = None
        self.blastdb_legacy = None
        self.chunks = None
        self.ids_kept = None
        self.annotations = []
        self.hitsort = None
        if filename:
            logger.debug("Creating database in file")
            if os.path.isfile(filename) and (new or source):
                # remove old database if it exists
                os.remove(filename)
            # sqlite database in file
            self.conn = sqlite3.connect(filename)
        else:
            # sqlite database in memory
            logger.debug("Creating database in memory")
            self.conn = sqlite3.connect(":memory:")
        c = self.conn.cursor()
        c.execute("create table sequences (name text, sequence text)")

        if source:
            self._read_from_fasta()
            c.execute("CREATE TABLE prefix_codes (prefix TEXT, N INTEGER)")
            self.conn.executemany("INSERT INTO prefix_codes (prefix, N) values (?,?)",
                                  self.prefix_codes.items())
            self.conn.commit()
            self._commit_immediately = True
            self.makeblastdb(legacy=True)
            # store some attributes in the database
            self._update_database()
            logger.info("sequences loaded")
            logger.info(self)

    def _update_database(self):
        # pass all attributes which are either float, int or str to an extra table
        c = self.conn.cursor()
        stored_attributes = [
            ("sample_size", "integer"),
            ("_length", "integer"),
            ("_original_length", "integer"),
            ("paired", "integer"),
        ]
        columns = ", ".join(i[0] + " " + i[1] for i in stored_attributes)
        try:
            c.execute((
                "CREATE TABLE seqinfo ( {} )".format(columns)
            ))
        except sqlite3.OperationalError:
            pass  # table already exists
        # get current values
        values = tuple(getattr(self, i[0]) for i in stored_attributes)
        placeholder = ", ".join(["?"] * len(values))
        c.execute("DELETE FROM seqinfo")
        c.execute("INSERT INTO seqinfo VALUES ({})".format(placeholder), values)


    def _read_from_fasta(self):
        # changes will be committed after the whole file is loaded - this is faster
        self._commit_immediately = False
        if self.fasta:
            f = open(self.fasta, mode='w')
        # define sampling
        if self.sample_size:
            logger.debug(
                'sampling sequences: {} sample size'.format(self.sample_size))
            N = self.fasta_length(self.source)
            if self.paired:
                s = itertools.chain(
                    [[i, i + 1] for i in sorted(random.sample(range(1, N, 2), int(self.sample_size / 2)))])

                sample = itertools.chain(
                    [item for sublist in s for item in sublist])

            else:
                sample = itertools.chain(
                    sorted(random.sample(range(1, N + 1), self.sample_size)))
                logger.debug("sampling unpaired reads")
        else:
            # no sampling - use all
            sample = itertools.count(1)

        # define counter:
        if self.rename:
            if self.paired:
                counter = itertools.count(1, 0.5)
            else:
                counter = itertools.count(1, 1)
        else:
            counter = itertools.cycle([""])
        # define pairs:
        if self.paired:
            pair = itertools.cycle(['f', 'r'])
        else:
            pair = itertools.cycle([''])
        position = 0

        seq2write = next(sample)

        for i in Sequence.read_fasta(self.source):
            position += 1
            if position == seq2write:

                # create name:
                prefix = i.name[0:self.prefix_length]  # could be empty ""
                if self.rename:
                    i.name = "{0}{1}{2}".format(
                        prefix,
                        int(next(counter)),
                        next(pair)
                    )
                if self.prefix_length:
                    if prefix in self.prefix_codes:
                        self.prefix_codes[prefix] += 1
                    else:
                        self.prefix_codes[prefix] = 1
                self[i.name] = i.seq
                if self.fasta:
                    i.write2fasta(f)
                try:
                    seq2write = next(sample)
                except StopIteration:
                    # no more sequences to sample
                    break


        self.conn.commit()  # save changes
        if self.fasta:
            f.close()
        self._commit_immediately = True

    def __getitem__(self, item):
        # item cannot be empty!!!
        c = self.conn.cursor()
        c.execute("select * from sequences where name= '{}'".format(item))
        # try:
        name, sequence = c.fetchone()  # should be only one
        # except TypeError:
        #     return None
        return sequence

    def __setitem__(self, item, value):
        c = self.conn.cursor()
        c.execute(
            "insert into sequences values ( '{0}', '{1}')".format(item, value))
        if self._commit_immediately:
            self.conn.commit()  # save changes

    # def __iter__(self):
    #     self.c.execute("select name from sequences")
    #     for i in self.c:
    #         yield i[0]

    def __iter__(self):
        c = self.conn.cursor()
        c.execute("select name from sequences")
        for i in c:
            yield i[0]


    # def __iter__(self):
    #     for i in range(1,len(self)):
    #         yield i
    def items(self):
        c = self.conn.cursor()
        c.execute("select name, sequence from sequences")
        for name, seq in c:
            yield Sequence(seq, name)

    def toDict(self):
        c = self.conn.cursor()
        c.execute("select name, sequence from sequences")
        d = {}
        for name, seq in c:
            d[name] = seq
        return(d)

    def __len__(self):
        c = self.conn.cursor()
        c.execute("select count(*) from sequences")
        length = c.fetchone()[0]
        return(length)

    def __str__(self):
        out = ""
        c = self.conn.cursor()
        c.execute("select * from sequences limit {0}".format(MAX_PRINT))
        for n, s in c:
            out = out + "{0} : {1}\n".format(n, s)
        out = out + "...\n"
        out = out + str(len(self)) + " sequences total\n"
        return out

    def insert(self, sequence, commit=True):
        self._commit_immediately = commit
        self[sequence.name] = sequence.seq
        self._commit_immediately = True

    def keys(self):
        '''
        get the names of all sequences
        '''
        c = self.conn.cursor()
        c.execute('select name from sequences order by rowid')
        names = []
        for i in c.fetchall():
            names.append(i[0])
        return names

    def sample(self, size, seed=123):
        '''
        generator - reproducible sampling of sequences
        '''
        max_index = len(self)
        # sample = seqtools.SequenceSet(source=info.input_sequences, )
        if self.paired:
            size = int(size / 2)
            step = 2
        else:
            step = 1
        random.seed(seed)
        c = self.conn.cursor()
        for i in sorted(random.sample(range(1, max_index, step), size)):
            c.execute(
                "select name, sequence from sequences where rowid={}".format(i))
            name, sequence = c.fetchone()
            yield Sequence(sequence, name)
            if self.paired:
                c.execute(
                    "select name, sequence from sequences where rowid={}".format(i + 1))
                name, sequence = c.fetchone()
                yield Sequence(sequence, name)

    def sample2db(self, db_file=None, fasta_file_name=None, size=100, seed=123):
        if (not db_file) and (not fasta_file_name):
            raise IOError("no db_file nor fasta_file_name were defined")
        db = None  # a SequenceSet is returned only when db_file is given
        # open files
        if db_file:
            db = SequenceSet(filename=db_file, source=None, new=True)
        if fasta_file_name:
            f = open(fasta_file_name, 'w')
        # write in files
        for i in self.sample(size, seed):
            if db_file:
                db.insert(i, commit=False)
            if fasta_file_name:
                i.write2fasta(f)
        # close files
        if fasta_file_name:
            f.close()
        if db_file:
            db.conn.commit()
        return db

    def save2fasta(self, fasta_file_name, keep=True, subset=None):
        if subset:
            with open(fasta_file_name, 'w') as f:
                c = self.conn.cursor()
                c.execute("select name, sequence from sequences where name in ('{}')".format(
                    "','".join(subset)))
                for name, sequence in c:
                    s = Sequence(sequence, name)
                    s.write2fasta(f)
        else:
            with open(fasta_file_name, 'w') as f:
                for i in self.items():
                    i.write2fasta(f)
        if keep:
            self.fasta = fasta_file_name

    def save_annotation(self, annotation_name, subset, dir):
        annotation_file = "{}/{}_annotation.csv".format(dir, annotation_name)
        with open(annotation_file, "w") as f:
            c = self.conn.cursor()
            c.execute(
                "select * from {} where name in ('{}')".format(
                    annotation_name, "','".join(subset)
                )
            )
            header = [description[0] for description in c.description]
            f.write("\t".join(str(j) for j in header) + "\n")
            for i in c:
                f.write("\t".join(str(j) for j in i) + "\n")
        return annotation_file

    def make_chunks(self, file_name=None, chunk_size=5000):
        '''
        split the SequenceSet into chunks and save them to files,
        return the list of file names
        '''
        logger.debug("creating chunks from fasta file: ")
        if not file_name:
            if self.fasta:
                file_name = self.fasta
            else:
                raise Exception('file name for chunks is not defined!')

        seqs = iter(self.items())
        fn_chunk_names = []
        for i in itertools.count():
            try:
                fn = "{0}.{1}".format(file_name, i)
                logger.info("saving chunk {}".format(fn))
                for n in range(chunk_size):
                    s = next(seqs)
                    if not n:
                        f = open(fn, 'w')
                    s.write2fasta(f)
                f.close()
                fn_chunk_names.append(fn)
            except StopIteration:
                if not f.closed:
                    f.close()
                    fn_chunk_names.append(fn)
                break
        self.chunks = fn_chunk_names
        logger.debug(self.chunks)
        return(fn_chunk_names)

    def makeblastdb(self, blastdb=None, dbtype='nucl', legacy=False, lastdb=False):
        '''
        dbtype is either 'nucl' or 'prot'
        '''
        logger.info("creating blast database")
        # check if the fasta file is present on disk
        if not self.fasta:
            try:
                self.save2fasta(blastdb)
            except TypeError:
                print("\nEither blastdb or fasta must be ",
                      "defined when making blast database!!!\n")
                raise
        else:
            blastdb = blastdb if blastdb else self.fasta

        cmd = "makeblastdb -in {0} -out {1} -dbtype {2}".format(self.fasta,
                                                                blastdb,
                                                                dbtype)
        args = shlex.split(cmd)
        if 0 == subprocess.check_call(args, stderr=sys.stdout):
            self.blastdb = blastdb
        else:
            logger.warning("makeblastdb failed")

        if legacy:
            logger.info("creating legacy blast database")
            prot = {'nucl': 'F', 'prot': 'T'}[dbtype]
            cmd = "formatdb -i {0} -p {1} -n {2}.legacy".format(self.fasta,
                                                                prot,
                                                                blastdb
                                                                )
            args = shlex.split(cmd)
            if subprocess.check_call(args) == 0:
                self.blastdb_legacy = "{}.legacy".format(blastdb)
            else:
                logger.warning("formatdb failed")
        if lastdb:
            logger.info("creating lastdb database")
            cmd = "lastdb -u NEAR {fasta} {fasta}".format(fasta=self.fasta)
            args = shlex.split(cmd)
            if subprocess.check_call(args) == 0:
                self.lastdb = self.fasta
            else:
                logger.warning("lastdb failed")


    def create_hitsort(self, options, output=None):
        '''
        Run the all-to-all search of the SequenceSet chunks against the blast
        database of this SequenceSet object. Intended for tabular output only.
        If no output file is specified, it is derived from the fasta file name;
        if more than one chunk is used, multiple partial files are created and
        then merged with duplicates removed.
        '''

        logger.info('running all to all blast')
        query = self.chunks
        if not output:
            output = "{}.blast".format(self.fasta)
        database = getattr(self, options.database)
        if len(query) > 1:
            output_parts = [output + "." + str(i) for i in range(len(query))]
        else:
            output_parts = [output]
            query = [query]

        parallel(_hitsort_worker, query, [database],
                 output_parts, [options])
        logger.info('all to all blast finished')
        logger.info('removing duplicates from all to all blast results')
        self._clean_hitsort(
            hitsort_files=output_parts, filtered_hitsort=output)
        # remove temp files:
        for f in output_parts:
            os.unlink(f)
        self.hitsort = output
        return output

    def _clean_hitsort(self, hitsort_files, filtered_hitsort):
        ''' alternative hitsort duplicate removal '''
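        # Strategy (as implemented below): hit lines are first distributed into
        # several temporary part files keyed by min(query, subject), so that
        # both orientations of a pair end up in the same part; the number of
        # parts grows with the total input size. Each part is then scanned and
        # duplicate lines for the same unordered id pair are collapsed before
        # writing to the final hitsort file.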

        ids = self.keys()
        Nfiles = 5 + int(sum([os.path.getsize(i)
                              for i in hitsort_files]) / 5e9)
        hitsort_parts = [
            "{0}.{1}.parts".format(filtered_hitsort, i) for i in range(Nfiles)]
        # open files
        fs = []
        for i in hitsort_parts:
            fs.append(open(i, 'w'))

        ids_destination = {}
        fs_iter = itertools.cycle(fs)
        for i in ids:
            # ids_destination[i] = fs_iter.next()
            ids_destination[i] = next(fs_iter)
        lines_out = lines_total = 0
        temp_open = False

        for i in hitsort_files:
            f = open(i, 'r')
            while True:
                line = f.readline()
                if not line:
                    break
                lines_total += 1
                line_items = line.split()
                # ids_destination[line_items[0]].write(line)
                # do not assume that it is sorted!
                ids_destination[min(line_items[0:2])].write(line)
            f.close()
        # close all part files
        for i in fs:
            i.close()

        # load the parts one by one and exclude duplicates:
        hit_out = open(filtered_hitsort, 'w')
        for i in hitsort_parts:
            f = open(i, 'r')
            hitsort_shelve = {}
            while True:
                line = f.readline()
                if not line:
                    break
                line_items = line.split()
                key = "\t".join(sorted(line_items[0:2]))
                value = line_items
                bitscore = float(line_items[2])
                if key in hitsort_shelve:
                    if float(hitsort_shelve[key][2]) > bitscore:
                        hitsort_shelve[key] = value
                    hit_out.write("\t".join(hitsort_shelve[key]) + "\n")
                    del hitsort_shelve[key]
                    lines_out += 1
                else:
                    hitsort_shelve[key] = value
            f.close()
            for key in hitsort_shelve:
                hit_out.write("\t".join(hitsort_shelve[key]) + "\n")
                lines_out += 1
        # clean up
        for i in hitsort_parts:
            os.unlink(i)
        hit_out.close()
        return lines_total, lines_out

    def _check_database(self, database):
        '''
        database is a path to a fasta file; a blast database with the same prefix should exist.
        If it does not exist, it should be created as a temporary file.
        '''
        pass
        # TODO

    def remove_sequences_using_filter(self, fasta_file_filter, keep_proportion,
                                      omitted_sequences_file,
                                      kept_sequences_file):
        '''
        Run blast with fasta_file_filter against the legacy blastdb of the SequenceSet
        to detect sequences which are to be removed from the SequenceSet. Complete pairs
        are removed if paired sequences are used.
        '''
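        # Note on the paired branch below: mates are stored under consecutive
        # rowids (the first mate on an odd rowid, its mate on rowid + 1), so a
        # hit to either mate is mapped to the odd rowid of its pair and the
        # keep/omit decision is made per pair; the even mate is added back via
        # rowid + 1 when the names are selected.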
        ids = blast_with_filter(fasta_file_filter, self.blastdb_legacy)
        # check against the database and remove sequences

        c = self.conn.cursor()
        if self.paired:
            c.execute("SELECT rowid FROM sequences WHERE name IN {}".format(
                format_query(ids)
            ))
            odd_rowids = set()
            for i in c.fetchall():
                if i[0] % 2 == 0:
                    odd_rowids.add(i[0] - 1)
                else:
                    odd_rowids.add(i[0])
            odd_rowids_keep = random.sample(odd_rowids, round(keep_proportion * len(odd_rowids)))
            c.execute("SELECT name FROM sequences WHERE rowid IN {} ORDER BY rowid".format(
                format_query(odd_rowids_keep + [i + 1 for i in odd_rowids_keep])
            ))
            self.ids_kept = [i[0] for i in c.fetchall()]
            odd_rowids_omit = list(odd_rowids.difference(odd_rowids_keep))
            c.execute("SELECT name FROM sequences WHERE rowid IN {} ORDER BY rowid".format(
                format_query(odd_rowids_omit + [i + 1 for i in odd_rowids_omit])
            ))
            ids_omit = [i[0] for i in c.fetchall()]

        else:
            self.ids_kept = random.sample(ids, round(keep_proportion * len(ids)))
            ids_omit = list(ids.difference(self.ids_kept))
        self.save2fasta(omitted_sequences_file, keep=False, subset=ids_omit)
        self.save2fasta(kept_sequences_file, keep=False, subset=self.ids_kept)
        self.drop_sequences(ids_omit)
        # save info about omitted sequences
        c.execute("CREATE TABLE ids_kept (name TEXT)")
        c.executemany("INSERT INTO ids_kept (name) values (?)", [(i,) for i in self.ids_kept])
        return len(ids_omit)

    def drop_sequences(self, ids_to_drop):
        '''
        remove sequences from the sequence set based on their IDs
        '''
        self._original_length = len(self)
        c = self.conn.cursor()
        idslist = format_query(ids_to_drop)
        c.execute("CREATE TABLE omited_sequences AS SELECT * FROM sequences WHERE name IN {}".format(idslist))
        c.execute("DELETE FROM sequences WHERE name IN {}".format(
            idslist
        ))
        # new databases must be created now that sequences were omitted!
        self.save2fasta(fasta_file_name=self.fasta)  # this will replace the original file!
        self._length = len(self)
        self.makeblastdb(legacy=True)
        self._update_database()

    def annotate(self, database, annotation_name, directory="", params=""):
        '''
        database: path to a blast-formatted database
        method: type of search to use for the annotation; it must be
        'blastn' or 'blastx'
        Annotation is saved into the directory and also into a table named
        according to the annotation_name variable.
        '''
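        # Note: the parser at the end of this method assumes seven tab-separated
        # columns per hit, in the order qseqid, sseqid, qlen, slen, length,
        # pident, bitscore, so params['output_columns'] passed to blast_worker
        # is expected to request the same columns in the same order (an
        # assumption inferred from the code, not enforced anywhere).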
        logger.info("annotating reads with {} ".format(annotation_name))
        self._check_database(database)
        if "parallelize" in params:
            parallelize = params['parallelize']
        else:
            parallelize = True
        if parallelize:
            if not self.chunks:
                self.make_chunks()
            output = [
                "{}/{}_{}".format(directory, annotation_name, os.path.basename(i)) for i in self.chunks]
            parallel(blast_worker, self.chunks, [database], output, [params])
        else:
            # no explicit parallelization using parallel
            single_output = "{}/{}_hits".format(directory, annotation_name)
            params['args'] = params['args'].format(max_proc=get_max_proc())
            blast_worker(self.fasta, database, single_output, params)
            output = [single_output]

        # store the results as a new attribute and in an sqlite table
        c = self.conn.cursor()
        c.execute("create table {} (name text, db_id text, length real, bitscore real, pid real)".format(
            annotation_name))
        for blast_results in output:
            with open(blast_results, 'r') as f:
                for i in f:
                    items = i.split()
                    types = [str, str, float, float, float, float, float]
                    qseqid, sseqid, qlen, slen, length, pident, bitscore = [
                        func(item) for item, func in zip(items, types)]
                    c.execute('insert into {} values ("{}", "{}", "{}", "{}", "{}")'.format(
                        annotation_name, qseqid, sseqid, length, bitscore, pident))
        self.conn.commit()
        self.annotations.append(annotation_name)

    @staticmethod
    def fasta_length(fasta_file_name):
        '''
        count the number of sequences in a fasta file
        '''
        with open(fasta_file_name, 'r') as f:
            N = 0
            for i in f:
                if i.strip().startswith(">"):
                    N += 1
            return N


# test
if __name__ == "__main__":
    # get root directory

    MAIN_DIR = os.path.realpath(
        os.path.dirname(os.path.realpath(__file__)) + "/..")
    print("MAIN_DIR:")
    print(MAIN_DIR)
    TMP_DIR = "{}/tmp".format(MAIN_DIR)
    TEST_DATA_DIR = "{}/test_data".format(MAIN_DIR)

    # some data:
    fn = "{}/pair_reads.fasta".format(TEST_DATA_DIR)
    os.chdir(TMP_DIR)
    # s = SequenceSet(filename='sqlite.db')
    print("number of sequences in fasta file:")
    print(SequenceSet.fasta_length(fn))

    print("loading sequences...")
    s = SequenceSet(source=fn, paired=True, rename=False,
                    filename='sqlite.db', prefix_length=4, fasta='sample2.fasta')
    print(s)

    print("creating blast database")
    s.makeblastdb()

    print("creating chunks from fasta file: ")
    file_list = s.make_chunks("fasta_chunks", chunk_size=500)
    print(file_list)

    print('creating hitsort')
    s.create_hitsort(file_list, "hitsort")

    print("saving to fasta file")
    s.save2fasta('sampleX.fasta', keep=False)
```
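
For orientation, below is a minimal usage sketch of the module, modeled on the test block above. The file names are placeholders, the `options`/`params` objects required by `create_hitsort` and `annotate` (provided elsewhere in the RepeatExplorer pipeline) are omitted, and the external tools makeblastdb and formatdb must be on PATH.

```python
from lib.seqtools import SequenceSet

# Load paired reads into an on-disk, sqlite-backed sequence set. The
# constructor also writes the fasta copy given by `fasta` and builds the
# BLAST databases (makeblastdb + formatdb are called internally).
seqs = SequenceSet(source="reads.fasta",       # hypothetical input fasta
                   filename="reads.sqlite",    # sqlite storage on disk
                   paired=True,
                   prefix_length=4,
                   fasta="reads_copy.fasta")   # fasta copy written during loading
print(len(seqs), "sequences loaded")

# Split the set into chunk files (derived from the fasta copy) and draw a
# reproducible sample of ten reads.
chunks = seqs.make_chunks(chunk_size=500)
for rec in seqs.sample(10):
    print(rec.name)
```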
