comparison bedutil.py @ 0:887e111c0919 draft

planemo upload for repository https://github.com/galaxyproteomics/tools-galaxyp/tree/master/tools/proteogenomics/retrieve_ensembl_bed commit 3fd7be931712e7fa5b281bc8c48104c8583ef7f0
author galaxyp
date Sun, 14 Jan 2018 14:11:53 -0500
parents
children c3d600729b6f
comparison
equal deleted inserted replaced
-1:000000000000 0:887e111c0919
1 #!/usr/bin/env python
2 """
3 #
4 #------------------------------------------------------------------------------
5 # University of Minnesota
6 # Copyright 2016, Regents of the University of Minnesota
7 #------------------------------------------------------------------------------
8 # Author:
9 #
10 # James E Johnson
11 #
12 #------------------------------------------------------------------------------
13 """
14
15 import sys
16
17 from Bio.Seq import reverse_complement, translate
18
19
20 def bed_from_line(line, ensembl=False, seq_column=None):
21 fields = line.rstrip('\r\n').split('\t')
22 if len(fields) < 12:
23 return None
24 (chrom, chromStart, chromEnd, name, score, strand,
25 thickStart, thickEnd, itemRgb,
26 blockCount, blockSizes, blockStarts) = fields[0:12]
27 bed_entry = BedEntry(chrom=chrom, chromStart=chromStart, chromEnd=chromEnd,
28 name=name, score=score, strand=strand,
29 thickStart=thickStart, thickEnd=thickEnd,
30 itemRgb=itemRgb,
31 blockCount=blockCount,
32 blockSizes=blockSizes.rstrip(','),
33 blockStarts=blockStarts.rstrip(','))
34 if seq_column is not None and -len(fields) <= seq_column < len(fields):
35 bed_entry.seq = fields[seq_column]
36 if ensembl and len(fields) >= 20:
37 bed_entry.second_name = fields[12]
38 bed_entry.cds_start_status = fields[13]
39 bed_entry.cds_end_status = fields[14]
40 bed_entry.exon_frames = fields[15].rstrip(',')
41 bed_entry.biotype = fields[16]
42 bed_entry.gene_name = fields[17]
43 bed_entry.second_gene_name = fields[18]
44 bed_entry.gene_type = fields[19]
45 return bed_entry
46
47
48 class BedEntry(object):
49 def __init__(self, chrom=None, chromStart=None, chromEnd=None,
50 name=None, score=None, strand=None,
51 thickStart=None, thickEnd=None, itemRgb=None,
52 blockCount=None, blockSizes=None, blockStarts=None):
53 self.chrom = chrom
54 self.chromStart = int(chromStart)
55 self.chromEnd = int(chromEnd)
56 self.name = name
57 self.score = int(score) if score is not None else 0
58 self.strand = '-' if str(strand).startswith('-') else '+'
59 self.thickStart = int(thickStart) if thickStart else self.chromStart
60 self.thickEnd = int(thickEnd) if thickEnd else self.chromEnd
61 self.itemRgb = str(itemRgb) if itemRgb is not None else r'100,100,100'
62 self.blockCount = int(blockCount)
63 if isinstance(blockSizes, str) or isinstance(blockSizes, unicode):
64 self.blockSizes = [int(x) for x in blockSizes.split(',')]
65 elif isinstance(blockSizes, list):
66 self.blockSizes = [int(x) for x in blockSizes]
67 else:
68 self.blockSizes = blockSizes
69 if isinstance(blockStarts, str) or isinstance(blockSizes, unicode):
70 self.blockStarts = [int(x) for x in blockStarts.split(',')]
71 elif isinstance(blockStarts, list):
72 self.blockStarts = [int(x) for x in blockStarts]
73 else:
74 self.blockStarts = blockStarts
75 self.second_name = None
76 self.cds_start_status = None
77 self.cds_end_status = None
78 self.exon_frames = None
79 self.biotype = None
80 self.gene_name = None
81 self.second_gene_name = None
82 self.gene_type = None
83 self.seq = None
84 self.cdna = None
85 self.pep = None
86 # T26C
87 self.aa_change = []
88 # p.Trp26Cys g.<pos><ref>><alt> # g.1304573A>G
89 self.variants = []
90
91 def __str__(self):
92 return '%s\t%d\t%d\t%s\t%d\t%s\t%d\t%d\t%s\t%d\t%s\t%s' % (
93 self.chrom, self.chromStart, self.chromEnd,
94 self.name, self.score, self.strand,
95 self.thickStart, self.thickEnd, str(self.itemRgb), self.blockCount,
96 ','.join([str(x) for x in self.blockSizes]),
97 ','.join([str(x) for x in self.blockStarts]))
98
99 def get_splice_junctions(self):
100 splice_juncs = []
101 for i in range(self.blockCount - 1):
102 splice_junc = "%s:%d_%d"\
103 % (self.chrom,
104 self.chromStart + self.blockSizes[i],
105 self.chromStart + self.blockStarts[i+1])
106 splice_juncs.append(splice_junc)
107 return splice_juncs
108
109 def get_exon_seqs(self):
110 if not self.seq:
111 return None
112 exons = []
113 for i in range(self.blockCount):
114 exons.append(self.seq[self.blockStarts[i]:self.blockStarts[i]
115 + self.blockSizes[i]])
116 if self.strand == '-': # reverse complement
117 exons.reverse()
118 for i, s in enumerate(exons):
119 exons[i] = reverse_complement(s)
120 return exons
121
122 def get_spliced_seq(self, strand=None):
123 if not self.seq:
124 return None
125 seq = ''.join(self.get_exon_seqs())
126 if strand and self.strand != strand:
127 seq = reverse_complement(seq)
128 return seq
129
130 def get_cdna(self):
131 if not self.cdna:
132 self.cdna = self.get_spliced_seq()
133 return self.cdna
134
135 def get_cds(self):
136 cdna = self.get_cdna()
137 if cdna:
138 if self.chromStart == self.thickStart\
139 and self.chromEnd == self.thickEnd:
140 return cdna
141 pos = [self.cdna_offset_of_pos(self.thickStart),
142 self.cdna_offset_of_pos(self.thickEnd)]
143 if 0 <= min(pos) <= max(pos) <= len(cdna):
144 return cdna[min(pos):max(pos)]
145 return None
146
147 def set_cds(self, cdna_start, cdna_end):
148 cdna_len = sum(self.blockSizes)
149 if 0 <= cdna_start < cdna_end <= cdna_len:
150 cds_pos = [self.pos_of_cdna_offet(cdna_start),
151 self.pos_of_cdna_offet(cdna_end)]
152 if all(cds_pos):
153 self.thickStart = min(cds_pos)
154 self.thickEnd = max(cds_pos)
155 return self
156 return None
157
158 def trim_cds(self, basepairs):
159 if self.chromStart <= self.thickStart < self.thickEnd <= self.chromEnd:
160 cds_pos = [self.cdna_offset_of_pos(self.thickStart),
161 self.cdna_offset_of_pos(self.thickEnd)]
162 if basepairs > 0:
163 return self.set_cds(min(cds_pos) + basepairs, max(cds_pos))
164 else:
165 return self.set_cds(min(cds_pos), max(cds_pos) + basepairs)
166 return None
167
168 def get_cigar(self):
169 cigar = ''
170 r = range(self.blockCount)
171 xl = None
172 for x in r:
173 if xl is not None:
174 intronSize = abs(self.blockStarts[x] - self.blockSizes[xl]
175 - self.blockStarts[xl])
176 cigar += '%dN' % intronSize
177 cigar += '%dM' % self.blockSizes[x]
178 xl = x
179 return cigar
180
181 def get_cigar_md(self):
182 cigar = ''
183 md = ''
184 r = range(self.blockCount)
185 xl = None
186 for x in r:
187 if xl is not None:
188 intronSize = abs(self.blockStarts[x] - self.blockSizes[xl]
189 - self.blockStarts[xl])
190 cigar += '%dN' % intronSize
191 cigar += '%dM' % self.blockSizes[x]
192 xl = x
193 md = '%d' % sum(self.blockSizes)
194 return (cigar, md)
195
196 def get_translation(self, sequence=None):
197 translation = None
198 seq = sequence if sequence else self.get_spliced_seq()
199 if seq:
200 seqlen = len(seq) / 3 * 3
201 if seqlen >= 3:
202 translation = translate(seq[:seqlen])
203 return translation
204
205 def get_translations(self):
206 translations = []
207 seq = self.get_spliced_seq()
208 if seq:
209 for i in range(3):
210 translation = self.get_translation(sequence=seq[i:])
211 if translation:
212 translations.append(translation)
213 return translations
214
215 def pos_of_cdna_offet(self, offset):
216 if offset is not None and 0 <= offset < sum(self.blockSizes):
217 r = range(self.blockCount)
218 rev = self.strand == '-'
219 if rev:
220 r.reverse()
221 nlen = 0
222 for x in r:
223 if offset < nlen + self.blockSizes[x]:
224 if rev:
225 return self.chromStart + self.blockStarts[x]\
226 + self.blockSizes[x] - (offset - nlen)
227 else:
228 return self.chromStart + self.blockStarts[x]\
229 + (offset - nlen)
230 nlen += self.blockSizes[x]
231 return None
232
233 def cdna_offset_of_pos(self, pos):
234 if not self.chromStart <= pos < self.chromEnd:
235 return -1
236 r = range(self.blockCount)
237 rev = self.strand == '-'
238 if rev:
239 r.reverse()
240 nlen = 0
241 for x in r:
242 bStart = self.chromStart + self.blockStarts[x]
243 bEnd = bStart + self.blockSizes[x]
244 if bStart <= pos < bEnd:
245 return nlen + (bEnd - pos if rev else pos - bStart)
246 nlen += self.blockSizes[x]
247
248 def apply_variant(self, pos, ref, alt):
249 pos = int(pos)
250 if not ref or not alt:
251 print >> sys.stderr, "variant requires ref and alt sequences"
252 return
253 if not self.chromStart <= pos <= self.chromEnd:
254 print >> sys.stderr, "variant not in entry %s: %s %d < %d < %d"\
255 % (self.name, self.strand, self.chromStart, pos, self.chromEnd)
256 print >> sys.stderr, "%s" % str(self)
257 return
258 if len(ref) != len(alt):
259 print >> sys.stderr, "variant only works for snp: %s %s"\
260 % (ref, alt)
261 return
262 if not self.seq:
263 print >> sys.stderr, "variant entry %s has no seq" % self.name
264 return
265 """
266 if self.strand == '-':
267 ref = reverse_complement(ref)
268 alt = reverse_complement(alt)
269 """
270 bases = list(self.seq)
271 offset = pos - self.chromStart
272 for i in range(len(ref)):
273 # offset = self.cdna_offset_of_pos(pos+i)
274 if offset is not None:
275 bases[offset+i] = alt[i]
276 else:
277 print >> sys.stderr,\
278 "variant offset %s: %s %d < %d < %d"\
279 % (self.name, self.strand, self.chromStart,
280 pos+1, self.chromEnd)
281 print >> sys.stderr, "%s" % str(self)
282 self.seq = ''.join(bases)
283 self.variants.append("g.%d%s>%s" % (pos+1, ref, alt))
284
285 def get_variant_bed(self, pos, ref, alt):
286 pos = int(pos)
287 if not ref or not alt:
288 print >> sys.stderr, "variant requires ref and alt sequences"
289 return None
290 if not self.chromStart <= pos <= self.chromEnd:
291 print >> sys.stderr,\
292 "variant not in entry %s: %s %d < %d < %d"\
293 % (self.name, self.strand, self.chromStart, pos, self.chromEnd)
294 print >> sys.stderr, "%s" % str(self)
295 return None
296 if not self.seq:
297 print >> sys.stderr, "variant entry %s has no seq" % self.name
298 return None
299 tbed = BedEntry(chrom=self.chrom,
300 chromStart=self.chromStart, chromEnd=self.chromEnd,
301 name=self.name, score=self.score, strand=self.strand,
302 thickStart=self.chromStart, thickEnd=self.chromEnd,
303 itemRgb=self.itemRgb,
304 blockCount=self.blockCount,
305 blockSizes=self.blockSizes,
306 blockStarts=self.blockStarts)
307 bases = list(self.seq)
308 offset = pos - self.chromStart
309 tbed.seq = ''.join(bases[:offset] + list(alt)
310 + bases[offset+len(ref):])
311 if len(ref) != len(alt):
312 diff = len(alt) - len(ref)
313 rEnd = pos + len(ref)
314 # need to adjust blocks
315 # change spans blocks,
316 for x in range(tbed.blockCount):
317 bStart = tbed.chromStart + tbed.blockStarts[x]
318 bEnd = bStart + tbed.blockSizes[x]
319 # change within a block or extends (last block)
320 # adjust blocksize
321 # seq: GGGcatGGG
322 # ref c alt tag: GGGtagatGGG
323 # ref cat alt a: GGGaGGG
324 if bStart <= pos < rEnd < bEnd:
325 tbed.blockSizes[x] += diff
326 return tbed
327
328 # (start, end)
329 def get_subrange(self, tstart, tstop, debug=False):
330 chromStart = self.chromStart
331 chromEnd = self.chromEnd
332 if debug:
333 print >> sys.stderr, "%s" % (str(self))
334 r = range(self.blockCount)
335 if self.strand == '-':
336 r.reverse()
337 bStart = 0
338 bEnd = 0
339 for x in r:
340 bEnd = bStart + self.blockSizes[x]
341 if bStart <= tstart < bEnd:
342 if self.strand == '+':
343 chromStart = self.chromStart + self.blockStarts[x] +\
344 (tstart - bStart)
345 else:
346 chromEnd = self.chromStart + self.blockStarts[x] +\
347 self.blockSizes[x] - (tstart - bStart)
348 if bStart <= tstop < bEnd:
349 if self.strand == '+':
350 chromEnd = self.chromStart + self.blockStarts[x] +\
351 (tstop - bStart)
352 else:
353 chromStart = self.chromStart + self.blockStarts[x] +\
354 self.blockSizes[x] - (tstop - bStart)
355 if debug:
356 print >> sys.stderr,\
357 "%3d %s\t%d\t%d\t%d\t%d\t%d\t%d"\
358 % (x, self.strand, bStart, bEnd,
359 tstart, tstop, chromStart, chromEnd)
360 bStart += self.blockSizes[x]
361 return(chromStart, chromEnd)
362
363 # get the blocks for sub range
364 def get_blocks(self, chromStart, chromEnd):
365 tblockCount = 0
366 tblockSizes = []
367 tblockStarts = []
368 for x in range(self.blockCount):
369 bStart = self.chromStart + self.blockStarts[x]
370 bEnd = bStart + self.blockSizes[x]
371 if bStart > chromEnd:
372 break
373 if bEnd < chromStart:
374 continue
375 cStart = max(chromStart, bStart)
376 tblockStarts.append(cStart - chromStart)
377 tblockSizes.append(min(chromEnd, bEnd) - cStart)
378 tblockCount += 1
379 return (tblockCount, tblockSizes, tblockStarts)
380
381 def trim(self, tstart, tstop, debug=False):
382 (tchromStart, tchromEnd) =\
383 self.get_subrange(tstart, tstop, debug=debug)
384 (tblockCount, tblockSizes, tblockStarts) =\
385 self.get_blocks(tchromStart, tchromEnd)
386 tbed = BedEntry(
387 chrom=self.chrom, chromStart=tchromStart, chromEnd=tchromEnd,
388 name=self.name, score=self.score, strand=self.strand,
389 thickStart=tchromStart, thickEnd=tchromEnd, itemRgb=self.itemRgb,
390 blockCount=tblockCount,
391 blockSizes=tblockSizes, blockStarts=tblockStarts)
392 if self.seq:
393 ts = tchromStart-self.chromStart
394 te = tchromEnd - tchromStart + ts
395 tbed.seq = self.seq[ts:te]
396 return tbed
397
398 def get_filtered_translations(self, untrimmed=False, filtering=True,
399 ignore_left_bp=0, ignore_right_bp=0,
400 debug=False):
401 translations = [None, None, None]
402 seq = self.get_spliced_seq()
403 ignore = (ignore_left_bp if self.strand == '+'
404 else ignore_right_bp) / 3
405 block_sum = sum(self.blockSizes)
406 exon_sizes = [x for x in self.blockSizes]
407 if self.strand == '-':
408 exon_sizes.reverse()
409 splice_sites = [sum(exon_sizes[:x]) / 3
410 for x in range(1, len(exon_sizes))]
411 if debug:
412 print >> sys.stderr, "splice_sites: %s" % splice_sites
413 junc = splice_sites[0] if len(splice_sites) > 0 else exon_sizes[0]
414 if seq:
415 for i in range(3):
416 translation = self.get_translation(sequence=seq[i:])
417 if translation:
418 tstart = 0
419 tstop = len(translation)
420 offset = (block_sum - i) % 3
421 if debug:
422 print >> sys.stderr,\
423 "frame: %d\ttstart: %d tstop: %d offset: %d\t%s"\
424 % (i, tstart, tstop, offset, translation)
425 if not untrimmed:
426 tstart = translation.rfind('*', 0, junc) + 1
427 stop = translation.find('*', junc)
428 tstop = stop if stop >= 0 else len(translation)
429 offset = (block_sum - i) % 3
430 trimmed = translation[tstart:tstop]
431 if debug:
432 print >> sys.stderr,\
433 "frame: %d\ttstart: %d tstop: %d offset: %d\t%s"\
434 % (i, tstart, tstop, offset, trimmed)
435 if filtering and tstart > ignore:
436 continue
437 # get genomic locations for start and end
438 if self.strand == '+':
439 chromStart = self.chromStart + i + (tstart * 3)
440 chromEnd = self.chromEnd - offset\
441 - (len(translation) - tstop) * 3
442 else:
443 chromStart = self.chromStart + offset\
444 + (len(translation) - tstop) * 3
445 chromEnd = self.chromEnd - i - (tstart * 3)
446 # get the blocks for this translation
447 (tblockCount, tblockSizes, tblockStarts) =\
448 self.get_blocks(chromStart, chromEnd)
449 translations[i] = (chromStart, chromEnd, trimmed,
450 tblockCount, tblockSizes, tblockStarts)
451 if debug:
452 print >> sys.stderr,\
453 "tblockCount: %d tblockStarts: %s tblockSizes: %s"\
454 % (tblockCount, tblockStarts, tblockSizes)
455 return translations
456
457 def get_seq_id(self, seqtype='unk:unk', reference='', frame=None):
458 # Ensembl fasta ID format
459 # >ID SEQTYPE:STATUS LOCATION GENE TRANSCRIPT
460 # >ENSP00000328693 pep:splice chromosome:NCBI35:1:904515:910768:1\
461 # gene:ENSG00000158815:transcript:ENST00000328693\
462 # gene_biotype:protein_coding transcript_biotype:protein_coding
463 frame_name = ''
464 chromStart = self.chromStart
465 chromEnd = self.chromEnd
466 strand = 1 if self.strand == '+' else -1
467 if frame is not None:
468 block_sum = sum(self.blockSizes)
469 offset = (block_sum - frame) % 3
470 frame_name = '_' + str(frame + 1)
471 if self.strand == '+':
472 chromStart += frame
473 chromEnd -= offset
474 else:
475 chromStart += offset
476 chromEnd -= frame
477 location = "chromosome:%s:%s:%s:%s:%s"\
478 % (reference, self.chrom, chromStart, chromEnd, strand)
479 seq_id = "%s%s %s %s" % (self.name, frame_name, seqtype, location)
480 return seq_id
481
482 def get_line(self, start_offset=0, end_offset=0):
483 if start_offset or end_offset:
484 s_offset = start_offset if start_offset else 0
485 e_offset = end_offset if end_offset else 0
486 if s_offset > self.chromStart:
487 s_offset = self.chromStart
488 chrStart = self.chromStart - s_offset
489 chrEnd = self.chromEnd + e_offset
490 blkSizes = self.blockSizes
491 blkSizes[0] += s_offset
492 blkSizes[-1] += e_offset
493 blkStarts = self.blockStarts
494 for i in range(1, self.blockCount):
495 blkStarts[i] += s_offset
496 items = [str(x) for x in [self.chrom, chrStart, chrEnd, self.name,
497 self.score, self.strand, self.thickStart,
498 self.thickEnd, self.itemRgb,
499 self.blockCount,
500 ','.join([str(x) for x in blkSizes]),
501 ','.join([str(x) for x in blkStarts])]]
502 return '\t'.join(items) + '\n'
503 return self.line