comparison validate_fasta.py @ 0:67c179acafdd draft default tip

"planemo upload for repository https://github.com/usegalaxy-au/galaxy-local-tools commit a510e97ebd604a5e30b1f16e5031f62074f23e86-dirty"
author galaxy-australia
date Thu, 03 Mar 2022 02:54:20 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:67c179acafdd
1 """Validate input FASTA sequence."""
2
3 import re
4 import argparse
5 from typing import List, TextIO
6
7
class Fasta:
    """One sequence record: a header line and the sequence it labels."""

    def __init__(self, header_str: str, seq_str: str):
        """Keep the header and sequence strings exactly as passed in."""
        self.header, self.aa_seq = header_str, seq_str
12
13
class FastaLoader:
    """Parse bare or FASTA formatted sequence text into ``Fasta`` records.

    After construction ``self.fastas`` holds one ``Fasta`` per non-empty
    sequence found in the input file.
    """

    def __init__(self, fasta_path: str):
        """Initialize from FASTA file.

        Loads ``fasta_path`` and prints every parsed header and sequence
        to stdout.
        """
        self.fastas = []
        self.load(fasta_path)
        print("Loaded FASTA sequences:")
        for f in self.fastas:
            print(f.header)
            print(f.aa_seq)

    def load(self, fasta_path: str):
        """Load bare or FASTA formatted sequence.

        The file content is split into lines, header lines start a new
        record, and every other line is appended to the current sequence.
        Records are stored through ``update_fastas``.

        Args:
            fasta_path: Path of the text file to read.
        """
        with open(fasta_path, 'r') as f:
            self.content = f.read()

        if "__cn__" in self.content:
            # Pasted content with escaped characters
            # (presumably produced upstream by input sanitization -- verify)
            self.newline = '__cn__'
            self.caret = '__gt__'
        else:
            # Uploaded file with normal content
            self.newline = '\n'
            self.caret = '>'

        self.lines = self.content.split(self.newline)
        header, first_chunk = self.interpret_first_line()
        chunks = [first_chunk]

        # interpret_first_line() has already consumed line 0. Re-reading it
        # here would append a headerless first line to the sequence twice,
        # so parsing resumes at the second line.
        for line in self.lines[1:]:
            if line.startswith(self.caret):
                # A new header closes the record collected so far.
                self.update_fastas(header, ''.join(chunks))
                header = '>' + self.strip_header(line)
                chunks = []
            else:
                chunks.append(line.strip('\n '))

        # after reading whole file, header & sequence buffers might be full
        self.update_fastas(header, ''.join(chunks))

    def interpret_first_line(self):
        """Return the ``(header, sequence)`` buffers implied by the first line.

        A header line yields ``('>name', '')``. Anything else is treated as
        the start of a headerless sequence and yields ``('', residues)``,
        stripped the same way as every later sequence line.
        """
        line = self.lines[0]
        if line.startswith(self.caret):
            return '>' + self.strip_header(line), ''
        return '', line.strip('\n ')

    def strip_header(self, line):
        """Strip characters escaped with underscores from pasted text.

        Removes every ``__xx__`` token (two arbitrary characters between
        double underscores) and any leading or trailing ``>``.
        """
        return re.sub(r'\_\_.{2}\_\_', '', line).strip('>')

    def update_fastas(self, header: str, sequence: str):
        """Append a record for ``sequence``; do nothing when it is empty.

        A missing header is replaced by ``>sequence_<n>``, where ``n`` is the
        number of records stored so far.
        """
        if not sequence:
            return

        # create generic header if not exists
        if not header:
            header = f'>sequence_{len(self.fastas)}'

        self.fastas.append(Fasta(header, sequence))
77
78
class FastaValidator:
    """Check that the parsed input is a single, acceptable sequence.

    Every check raises ``Exception`` with a user-facing message on the
    first problem it finds.
    """

    def __init__(self, fasta_list: List["Fasta"]):
        """Store the records to validate and set the accepted limits.

        Args:
            fasta_list: Parsed records; each must expose ``aa_seq``.
        """
        self.fasta_list = fasta_list
        # Inclusive bounds on sequence length, in residues.
        self.min_length = 30
        self.max_length = 2000
        # Accepted one-letter codes (compared against the upper-cased input).
        self.iupac_characters = {
            'A', 'B', 'C', 'D', 'E', 'F', 'G',
            'H', 'I', 'K', 'L', 'M', 'N', 'P',
            'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
            'Y', 'Z', '-'
        }

    def validate(self):
        """Run the record-count, length and alphabet checks, in that order."""
        self.validate_num_seqs()
        self.validate_length()
        self.validate_alphabet()
        # Not rejecting 'X' residues at the moment;
        # alphafold can throw an error if it doesn't like it.
        # (validate_x() is kept available but is deliberately not called.)

    def validate_num_seqs(self) -> None:
        """Require exactly one record.

        Raises:
            Exception: If there are no records or more than one.
        """
        count = len(self.fasta_list)
        if count > 1:
            raise Exception(f'Error encountered validating fasta: More than 1 sequence detected ({count}). Please use single fasta sequence as input')
        if count == 0:
            raise Exception('Error encountered validating fasta: input file has no fasta sequences')

    def validate_length(self):
        """Confirm the first sequence length lies within the inclusive bounds.

        Raises:
            Exception: If the length is below ``min_length`` or above
                ``max_length``. The message reports the configured limit.
        """
        seq_len = len(self.fasta_list[0].aa_seq)
        if seq_len < self.min_length:
            raise Exception(f'Error encountered validating fasta: Sequence too short ({seq_len}aa). Must be >= {self.min_length}aa')
        if seq_len > self.max_length:
            raise Exception(f'Error encountered validating fasta: Sequence too long ({seq_len}aa). Must be <= {self.max_length}aa')

    def validate_alphabet(self):
        """
        Confirms whether the sequence conforms to IUPAC codes.
        If not, reports the offending character and its 0-based position.

        Raises:
            Exception: On the first character outside ``iupac_characters``.
        """
        fasta = self.fasta_list[0]
        for i, char in enumerate(fasta.aa_seq.upper()):
            if char not in self.iupac_characters:
                raise Exception(f'Error encountered validating fasta: Invalid amino acid found at pos {i}: "{char}"')

    def validate_x(self):
        """Reject sequences containing the residue code X.

        TODO check whether alphafold accepts X residues.

        Raises:
            Exception: On the first ``X`` (case-insensitive), with its
                0-based position.
        """
        fasta = self.fasta_list[0]
        for i, char in enumerate(fasta.aa_seq.upper()):
            if char == 'X':
                raise Exception(f'Error encountered validating fasta: Unsupported aa code "X" found at pos {i}')
130
131
class FastaWriter:
    """Write a single record to disk as line-wrapped FASTA."""

    def __init__(self, outfile: str = 'alphafold.fasta',
                 formatted_line_len: int = 60) -> None:
        """Configure the writer.

        Args:
            outfile: Path of the file to create or overwrite.
            formatted_line_len: Maximum number of residues per output line.
        """
        self.outfile = outfile
        self.formatted_line_len = formatted_line_len

    def write(self, fasta: "Fasta"):
        """Write ``fasta`` to ``self.outfile``, replacing any existing file.

        The header goes on the first line, followed by the wrapped sequence.
        ``format_sequence`` already terminates its last line, so the extra
        newline written here leaves one blank line at the end of the file.
        """
        header = fasta.header
        # Format before opening so a formatting error cannot leave an
        # empty or truncated output file behind.
        seq = self.format_sequence(fasta.aa_seq)
        with open(self.outfile, 'w') as fp:
            fp.write(header + '\n')
            fp.write(seq + '\n')

    def format_sequence(self, aa_seq: str):
        """Wrap ``aa_seq`` into newline-terminated lines.

        Returns:
            The sequence split into chunks of ``formatted_line_len``
            characters, each followed by ``'\\n'``; ``''`` for empty input.
        """
        width = self.formatted_line_len
        return ''.join(
            aa_seq[start:start + width] + '\n'
            for start in range(0, len(aa_seq), width)
        )
149
150
def main():
    """Load the input FASTA, validate it, and write out a cleaned copy."""
    # load fasta file named on the command line
    cli_args = parse_args()
    loader = FastaLoader(cli_args.input_fasta)
    sequences = loader.fastas

    # validate
    FastaValidator(sequences).validate()

    # write cleaned version of the first (only) record
    FastaWriter().write(sequences[0])
163
164
def parse_args() -> argparse.Namespace:
    """Parse the command line: one positional argument, ``input_fasta``."""
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("input_fasta", type=str, help="input fasta file")
    parsed = arg_parser.parse_args()
    return parsed
173
174
175
# Run the validation pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()