Mercurial > repos > galaxy-australia > alphafold2
diff validate_fasta.py @ 0:67c179acafdd draft default tip
"planemo upload for repository https://github.com/usegalaxy-au/galaxy-local-tools commit a510e97ebd604a5e30b1f16e5031f62074f23e86-dirty"
| author | galaxy-australia |
|---|---|
| date | Thu, 03 Mar 2022 02:54:20 +0000 |
| parents | |
| children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/validate_fasta.py Thu Mar 03 02:54:20 2022 +0000 @@ -0,0 +1,177 @@ +"""Validate input FASTA sequence.""" + +import re +import argparse +from typing import List, TextIO + + +class Fasta: + def __init__(self, header_str: str, seq_str: str): + self.header = header_str + self.aa_seq = seq_str + + +class FastaLoader: + def __init__(self, fasta_path: str): + """Initialize from FASTA file.""" + self.fastas = [] + self.load(fasta_path) + print("Loaded FASTA sequences:") + for f in self.fastas: + print(f.header) + print(f.aa_seq) + + def load(self, fasta_path: str): + """Load bare or FASTA formatted sequence.""" + with open(fasta_path, 'r') as f: + self.content = f.read() + + if "__cn__" in self.content: + # Pasted content with escaped characters + self.newline = '__cn__' + self.caret = '__gt__' + else: + # Uploaded file with normal content + self.newline = '\n' + self.caret = '>' + + self.lines = self.content.split(self.newline) + header, sequence = self.interpret_first_line() + + i = 0 + while i < len(self.lines): + line = self.lines[i] + if line.startswith(self.caret): + self.update_fastas(header, sequence) + header = '>' + self.strip_header(line) + sequence = '' + else: + sequence += line.strip('\n ') + i += 1 + + # after reading whole file, header & sequence buffers might be full + self.update_fastas(header, sequence) + + def interpret_first_line(self): + line = self.lines[0] + if line.startswith(self.caret): + header = '>' + self.strip_header(line) + return header, '' + else: + return '', line + + def strip_header(self, line): + """Strip characters escaped with underscores from pasted text.""" + return re.sub(r'\_\_.{2}\_\_', '', line).strip('>') + + def update_fastas(self, header: str, sequence: str): + # if we have a sequence + if sequence: + # create generic header if not exists + if not header: + fasta_count = len(self.fastas) + header = f'>sequence_{fasta_count}' + + # Create new Fasta + self.fastas.append(Fasta(header, sequence)) + + +class FastaValidator: + def __init__(self, fasta_list: List[Fasta]): + self.fasta_list = fasta_list + self.min_length = 30 + self.max_length = 2000 + self.iupac_characters = { + 'A', 'B', 'C', 'D', 'E', 'F', 'G', + 'H', 'I', 'K', 'L', 'M', 'N', 'P', + 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', + 'Y', 'Z', '-' + } + + def validate(self): + """performs fasta validation""" + self.validate_num_seqs() + self.validate_length() + self.validate_alphabet() + # not checking for 'X' nucleotides at the moment. + # alphafold can throw an error if it doesn't like it. + #self.validate_x() + + def validate_num_seqs(self) -> None: + if len(self.fasta_list) > 1: + raise Exception(f'Error encountered validating fasta: More than 1 sequence detected ({len(self.fasta_list)}). Please use single fasta sequence as input') + elif len(self.fasta_list) == 0: + raise Exception(f'Error encountered validating fasta: input file has no fasta sequences') + + def validate_length(self): + """Confirms whether sequence length is valid. """ + fasta = self.fasta_list[0] + if len(fasta.aa_seq) < self.min_length: + raise Exception(f'Error encountered validating fasta: Sequence too short ({len(fasta.aa_seq)}aa). Must be > 30aa') + if len(fasta.aa_seq) > self.max_length: + raise Exception(f'Error encountered validating fasta: Sequence too long ({len(fasta.aa_seq)}aa). Must be < 2000aa') + + def validate_alphabet(self): + """ + Confirms whether the sequence conforms to IUPAC codes. + If not, reports the offending character and its position. + """ + fasta = self.fasta_list[0] + for i, char in enumerate(fasta.aa_seq.upper()): + if char not in self.iupac_characters: + raise Exception(f'Error encountered validating fasta: Invalid amino acid found at pos {i}: "{char}"') + + def validate_x(self): + """checks if any bases are X. TODO check whether alphafold accepts X bases. """ + fasta = self.fasta_list[0] + for i, char in enumerate(fasta.aa_seq.upper()): + if char == 'X': + raise Exception(f'Error encountered validating fasta: Unsupported aa code "X" found at pos {i}') + + +class FastaWriter: + def __init__(self) -> None: + self.outfile = 'alphafold.fasta' + self.formatted_line_len = 60 + + def write(self, fasta: Fasta): + with open(self.outfile, 'w') as fp: + header = fasta.header + seq = self.format_sequence(fasta.aa_seq) + fp.write(header + '\n') + fp.write(seq + '\n') + + def format_sequence(self, aa_seq: str): + formatted_seq = '' + for i in range(0, len(aa_seq), self.formatted_line_len): + formatted_seq += aa_seq[i: i + self.formatted_line_len] + '\n' + return formatted_seq + + +def main(): + # load fasta file + args = parse_args() + fas = FastaLoader(args.input_fasta) + + # validate + fv = FastaValidator(fas.fastas) + fv.validate() + + # write cleaned version + fw = FastaWriter() + fw.write(fas.fastas[0]) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser() + parser.add_argument( + "input_fasta", + help="input fasta file", + type=str + ) + return parser.parse_args() + + + +if __name__ == '__main__': + main()
