Mercurial > repos > tduigou > get_db_info
diff get_db_info.py @ 4:61158f32e5c3 draft
planemo upload for repository https://github.com/brsynth commit 6ae809b563b40bcdb6be2e74fe2a84ddad5484ae
| author | tduigou |
|---|---|
| date | Fri, 18 Apr 2025 09:56:13 +0000 |
| parents | 0443378b44e5 |
| children | 56a0938d534d |
line wrap: on
line diff
--- a/get_db_info.py Fri Apr 11 13:22:51 2025 +0000 +++ b/get_db_info.py Fri Apr 18 09:56:13 2025 +0000 @@ -3,12 +3,16 @@ import argparse import socket import os +import re import pandas as pd -import json +from Bio import SeqIO +from Bio.Seq import Seq +from Bio.SeqRecord import SeqRecord from sqlalchemy import create_engine, inspect from sqlalchemy.sql import text from sqlalchemy.engine.url import make_url from sqlalchemy.exc import OperationalError +from Bio.SeqFeature import SeqFeature, FeatureLocation def fix_db_uri(uri): """Replace __at__ with @ in the URI if needed.""" @@ -79,8 +83,8 @@ time.sleep(2) raise Exception("Database connection failed after timeout.") -def fetch_annotations(csv_file, db_uri, table_name, fragment_column_name, output): - """Fetch annotations from the database and save the result as a JSON file.""" +def fetch_annotations(csv_file, sequence_column, annotation_columns, db_uri, table_name, fragment_column_name, output, output_missing): + """Fetch annotations from the database and save the result as GenBank files.""" db_uri = fix_db_uri(db_uri) df = pd.read_csv(csv_file, sep=',') @@ -102,6 +106,23 @@ all_rows = connection.execute(text(f"SELECT * FROM {table_name}")).fetchall() fragment_map = {row[fragment_column_index]: row for row in all_rows} + # Check if all fragments from CSV are present in DB + csv_fragments = set() + for _, row in df.iterrows(): + for col in df.columns: + if col != "ID": + csv_fragments.add(row[col]) + + db_fragments = set(fragment_map.keys()) + missing_fragments = sorted(list(csv_fragments - db_fragments)) + + if missing_fragments: + with open(output_missing, "w") as f: + for fragment in missing_fragments: + f.write(f"{fragment}\n") + print(f"Missing fragments written to {output_missing}") + return output_missing # Exit early if fragments are missing + for _, row in df.iterrows(): annotated_row = {"Backbone": row["ID"], "Fragments": []} @@ -125,22 +146,88 @@ print(f"Error occurred during annotation: {e}") return + # GenBank file generation per fragment try: - with open(output, "w") as f: - json.dump(annotated_data, f, indent=4) - print(f"Annotation saved to {output}") + for annotated_row in annotated_data: + backbone_id = annotated_row["Backbone"] + for fragment in annotated_row["Fragments"]: + fragment_id = fragment["id"] + sequence = fragment.get(sequence_column, "") + annotation = fragment.get(annotation_columns, "") + + # Create the SeqRecord + record = SeqRecord( + Seq(sequence), + id=fragment_id, + name=fragment_id, + description=f"Fragment {fragment_id} from Backbone {backbone_id}" + ) + + # Add annotations to GenBank header + record.annotations = { + k: str(fragment[k]) for k in annotation_columns if k in fragment + } + + # LOCUS line extraction from annotation (copy-paste the LOCUS from annotation) + locus_line_match = re.search(r"LOCUS\s+.+", annotation) + if locus_line_match: + locus_line = locus_line_match.group() + else: + print(f"LOCUS info missing for fragment {fragment_id}") + locus_line = f"LOCUS {fragment_id: <20} {len(sequence)} bp DNA linear UNK 01-JAN-2025" + + # Format sequence as per GenBank standards (with ORIGIN and line breaks) + if "ORIGIN" in sequence: + origin_block = sequence.strip() + else: + # Format sequence as per GenBank standards (with ORIGIN and line breaks) + formatted_sequence = "ORIGIN\n" + seq_str = str(record.seq) + for i in range(0, len(seq_str), 60): # 60 bases per line + line_seq = seq_str[i:i + 60] + formatted_sequence += f"{str(i + 1).rjust(9)} { ' '.join([line_seq[j:j+10] for j in range(0, len(line_seq), 10)]) }\n" + origin_block = formatted_sequence.strip() + + # Find and copy the FEATURES section directly from annotation + features_section = "" + features_start = annotation.find("FEATURES") + if features_start != -1: + features_section = annotation[features_start:] + + # Writing the GenBank file + if not os.path.exists(output): + os.makedirs(output) + + gb_filename = os.path.join(output, f"{fragment_id}.gb") + with open(gb_filename, "w") as f: + # Write the LOCUS line + f.write(locus_line + "\n") + # Write DEFINITION, ACCESSION, and other annotations + f.write(f"DEFINITION {record.description}\n") + f.write(f"ACCESSION {record.id}\n") + f.write(f"VERSION DB\n") + f.write(f"KEYWORDS .\n") + f.write(f"SOURCE .\n") + # Write the FEATURES section directly from annotation + f.write(features_section) + # Write the ORIGIN section + f.write(origin_block + "\n") + f.write("//\n") + except Exception as e: - print(f"Error saving output file: {e}") - - return output + print(f"Error saving GenBank files: {e}") + return def main(): parser = argparse.ArgumentParser(description="Fetch annotations from PostgreSQL database and save as JSON.") parser.add_argument("--input", required=True, help="Input CSV file") + parser.add_argument("--sequence_column", required=True, help="DB column contains sequence for ganbank file") + parser.add_argument("--annotation_columns", required=True, help="DB column contains head for ganbank file") parser.add_argument("--db_uri", required=True, help="Database URI connection string") parser.add_argument("--table", required=True, help="Table name in the database") parser.add_argument("--fragment_column", required=True, help="Fragment column name in the database") - parser.add_argument("--output", required=True, help="Output JSON file") + parser.add_argument("--output", required=True, help="Output dir for gb files") + parser.add_argument("--output_missing", required=True, help="Output txt file for missing fragment in the DB") args = parser.parse_args() # Start the Docker container (if not already running) @@ -152,7 +239,7 @@ wait_for_db(db_uri) # Fetch annotations from the database and save as JSON - fetch_annotations(args.input, db_uri, args.table, args.fragment_column, args.output) + fetch_annotations(args.input, args.sequence_column, args.annotation_columns, db_uri, args.table, args.fragment_column, args.output, args.output_missing) if __name__ == "__main__": main()
