Mercurial > repos > jjohnson > ensembl_variant_report
comparison ensemblref.py @ 0:c3a9e63e8c51 draft
planemo upload for repository https://github.com/jj-umn/galaxytools/tree/master/ensembl_variant_report commit 239c1ee096e5fc3e2e929f7bf2d4afba5c677d4b-dirty
| author | jjohnson |
|---|---|
| date | Fri, 06 Jan 2017 16:19:40 -0500 |
| parents | |
| children | 9e83cc05d384 |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:c3a9e63e8c51 |
|---|---|
| 1 """ | |
| 2 with gtf and twobit | |
| 3 get_bed(transcript_id) | |
| 4 """ | |
| 5 from gtf_to_genes import gene, gene_utilities | |
| 6 from twobitreader import TwoBitFile | |
| 7 from Bio.Seq import reverse_complement, translate | |
| 8 import logging | |
| 9 logger = logging.getLogger("test") | |
| 10 | |
| 11 class EnsemblRef(object): | |
| 12 | |
| 13 def __init__(self,gtf_file,twobitfile,read_now=True): | |
| 14 self.gtf_file = gtf_file | |
| 15 self.twobitfile = twobitfile | |
| 16 self.twobit = TwoBitFile(self.twobitfile) | |
| 17 self.gene_dict = None | |
| 18 self.transcript_idx = None | |
| 19 self.name_idx = None | |
| 20 if read_now: | |
| 21 self.get_transcript_idx() | |
| 22 | |
| 23 | |
| 24 def get_gene_dict(self): | |
| 25 if self.gene_dict is None: | |
| 26 gene_structures = gene.t_parse_gtf('test') | |
| 27 self.gene_dict = gene_structures.get_genes(self.gtf_file,logger=logger) | |
| 28 return self.gene_dict | |
| 29 | |
| 30 | |
| 31 def get_transcript_idx(self): | |
| 32 if self.transcript_idx is None: | |
| 33 self.transcript_idx = gene_utilities.index_transcripts(self.get_gene_dict(),by_prot_id=False) | |
| 34 return self.transcript_idx | |
| 35 | |
| 36 | |
| 37 def get_name_idx(self): | |
| 38 if self.name_idx is None: | |
| 39 self.name_idx = dict() | |
| 40 for i,t in self.get_transcript_idx().items(): | |
| 41 for name in t.gene.names: | |
| 42 self.name_idx[name] = t.gene | |
| 43 for name in t.names: | |
| 44 self.name_idx[name] = t | |
| 45 if t.prot_id: | |
| 46 self.name_idx[t.prot_id] = t | |
| 47 return self.name_idx | |
| 48 | |
| 49 | |
| 50 def get_gtf_transcript(self,transcript_id): | |
| 51 idx = self.get_transcript_idx() | |
| 52 return idx[transcript_id] if transcript_id in idx else None | |
| 53 | |
| 54 | |
| 55 def transcript_is_coding(self,transcript_id): | |
| 56 tx = self.get_transcript_idx()[transcript_id] | |
| 57 return len(tx.start_codons) > 0 | |
| 58 | |
| 59 | |
| 60 def get_transcript_start_codon(self,transcript_id): | |
| 61 tx = self.get_transcript_idx()[transcript_id] | |
| 62 return tx.start_codons[0] if len(tx.start_codons) > 0 else None | |
| 63 | |
| 64 | |
| 65 def get_bed_line(self,transcript_id,coding=False): | |
| 66 tx = self.get_transcript_idx()[transcript_id] | |
| 67 chrom = tx.gene.contig | |
| 68 chromStart = tx.coding_beg if coding else tx.beg | |
| 69 chromEnd = tx.coding_end if coding else tx.end | |
| 70 name = transcript_id | |
| 71 score = 0 | |
| 72 strand = '+' if tx.gene.strand else '-' | |
| 73 thickStart = tx.coding_beg if tx.coding_beg else chromStart | |
| 74 thickEnd = tx.coding_end if tx.coding_end else chromEnd | |
| 75 itemRgb = '0,0,0' | |
| 76 exons = tx.get_coding_exons() if coding else tx.get_exons() | |
| 77 blockCount = len(exons) | |
| 78 if tx.gene.strand: | |
| 79 strand = '+' | |
| 80 blockSizes = [abs(e-s) for s,e in exons] | |
| 81 | |
| 82 def get_bed_line(self,transcript_id,coding=False): | |
| 83 tx = self.get_transcript_idx()[transcript_id] | |
| 84 chrom = tx.gene.contig | |
| 85 chromStart = tx.coding_beg if coding else tx.beg | |
| 86 chromEnd = tx.coding_end if coding else tx.end | |
| 87 name = transcript_id | |
| 88 score = 0 | |
| 89 strand = '+' if tx.gene.strand else '-' | |
| 90 thickStart = tx.coding_beg if tx.coding_beg else chromStart | |
| 91 thickEnd = tx.coding_end if tx.coding_end else chromEnd | |
| 92 itemRgb = '0,0,0' | |
| 93 exons = tx.get_coding_exons() if coding else tx.get_exons() | |
| 94 blockCount = len(exons) | |
| 95 if tx.gene.strand: | |
| 96 strand = '+' | |
| 97 blockSizes = [abs(e-s) for s,e in exons] | |
| 98 blockStarts = [s - chromStart for s,e in exons] | |
| 99 else: | |
| 100 strand = '-' | |
| 101 blockSizes = [abs(e-s) for s,e in reversed(exons)] | |
| 102 blockStarts = [s - chromStart for s,e in reversed(exons)] | |
| 103 blockSizes = ','.join([str(x) for x in blockSizes]) | |
| 104 blockStarts = ','.join([str(x) for x in blockStarts]) | |
| 105 return '%s\t%d\t%d\t%s\t%s\t%s\t%d\t%d\t%s\t%d\t%s\t%s' % (chrom,chromStart,chromEnd,name,score,strand,thickStart,thickEnd,itemRgb,blockCount,blockSizes,blockStarts) | |
| 106 | |
| 107 | |
| 108 def transcripts_in_range(self,chrom,startpos,endpos,strand=None): | |
| 109 spos = min(startpos,endpos) if endpos else startpos | |
| 110 epos = max(startpos,endpos) if endpos else startpos | |
| 111 transcripts = [] | |
| 112 for i,t in self.get_transcript_idx().items(): | |
| 113 if t.gene.contig == chrom and t.beg <= epos and spos <= t.end: | |
| 114 if strand and t.gene.strand != strand: | |
| 115 continue | |
| 116 transcripts.append(t) | |
| 117 return transcripts | |
| 118 | |
| 119 | |
| 120 def genes_in_range(self,chrom,startpos,endpos,strand=None,gene_types=None): | |
| 121 spos = min(startpos,endpos) if endpos else startpos | |
| 122 epos = max(startpos,endpos) if endpos else startpos | |
| 123 gene_dict = self.get_gene_dict() | |
| 124 gtypes = set(gene_types) & set(gene_dict.keys()) if gene_types else set(gene_dict.keys()) | |
| 125 genes = [] | |
| 126 for gt in gtypes: | |
| 127 for gene in gene_dict[gt]: | |
| 128 if gene.contig == chrom and gene.beg <= epos and spos <= gene.end: | |
| 129 if strand and gene.strand != strand: | |
| 130 continue | |
| 131 genes.append(gene) | |
| 132 return genes | |
| 133 | |
| 134 | |
| 135 def get_sequence(self,chrom,start,end): | |
| 136 if self.twobit: | |
| 137 if chrom in self.twobit: | |
| 138 return self.twobit[chrom][start:end] | |
| 139 contig = chrom[3:] if chrom.startswith('chr') else 'chr%s' % chrom | |
| 140 if contig in self.twobit: | |
| 141 return self.twobit[contig][start:end] | |
| 142 return None | |
| 143 | |
| 144 | |
| 145 def sequence_sizes(self): | |
| 146 return self.twobit.sequence_sizes() | |
| 147 | |
| 148 | |
| 149 def get_transcript_seq(self,transcript_id,coding=False): | |
| 150 tx = self.get_transcript_idx()[transcript_id] | |
| 151 chrom = tx.gene.contig | |
| 152 exonbnds = tx.get_coding_exons() if coding else tx.get_exons() | |
| 153 if tx.gene.strand: | |
| 154 seqs = [self.get_sequence(chrom,s,e) for s,e in exonbnds] | |
| 155 else: | |
| 156 seqs = [reverse_complement(self.get_sequence(chrom,s,e)) for s,e in exonbnds] | |
| 157 return ''.join(seqs) | |
| 158 | |
| 159 | |
| 160 def get_cdna(self,transcript_id): | |
| 161 return self.get_transcript_seq(transcript_id,coding=False) | |
| 162 | |
| 163 | |
| 164 def get_cds(self,transcript_id): | |
| 165 return self.get_transcript_seq(transcript_id,coding=True) | |
| 166 | |
| 167 | |
| 168 def genome_to_transcript_pos(self,transcript_id,genome_pos,coding=False): | |
| 169 tx = self.get_transcript_idx()[transcript_id] | |
| 170 if not tx.beg <= genome_pos < tx.end: | |
| 171 return None | |
| 172 exonbnds = tx.get_coding_exons() if coding else tx.get_exons() | |
| 173 cdna_pos = 0 | |
| 174 if tx.gene.strand: | |
| 175 for s,e in exonbnds: | |
| 176 if s <= genome_pos < e: | |
| 177 cdna_pos += genome_pos - s | |
| 178 break | |
| 179 else: | |
| 180 cdna_pos += e - s | |
| 181 else: | |
| 182 for s,e in exonbnds: | |
| 183 if s <= genome_pos < e: | |
| 184 cdna_pos += e - genome_pos - 1 | |
| 185 break | |
| 186 else: | |
| 187 cdna_pos += e - s | |
| 188 return cdna_pos | |
| 189 | |
| 190 | |
| 191 def genome_to_cdna_pos(self,transcript_id,genome_pos): | |
| 192 return self.genome_to_transcript_pos(transcript_id,genome_pos,coding=False) | |
| 193 | |
| 194 | |
| 195 def genome_to_cds_pos(self,transcript_id,genome_pos): | |
| 196 return self.genome_to_transcript_pos(transcript_id,genome_pos,coding=True) | |
| 197 | |
| 198 |
