Mercurial > repos > tduigou > get_db_info
diff get_db_info.py @ 6:56a0938d534d draft
planemo upload for repository https://github.com/brsynth commit 6ae809b563b40bcdb6be2e74fe2a84ddad5484ae
| author | tduigou |
|---|---|
| date | Fri, 18 Apr 2025 12:52:49 +0000 |
| parents | 61158f32e5c3 |
| children | 8984fabea52c |
line wrap: on
line diff
--- a/get_db_info.py Fri Apr 18 09:57:55 2025 +0000 +++ b/get_db_info.py Fri Apr 18 12:52:49 2025 +0000 @@ -5,14 +5,12 @@ import os import re import pandas as pd -from Bio import SeqIO from Bio.Seq import Seq from Bio.SeqRecord import SeqRecord from sqlalchemy import create_engine, inspect from sqlalchemy.sql import text from sqlalchemy.engine.url import make_url from sqlalchemy.exc import OperationalError -from Bio.SeqFeature import SeqFeature, FeatureLocation def fix_db_uri(uri): """Replace __at__ with @ in the URI if needed.""" @@ -83,7 +81,7 @@ time.sleep(2) raise Exception("Database connection failed after timeout.") -def fetch_annotations(csv_file, sequence_column, annotation_columns, db_uri, table_name, fragment_column_name, output, output_missing): +def fetch_annotations(csv_file, sequence_column, annotation_columns, db_uri, table_name, fragment_column_name, output): """Fetch annotations from the database and save the result as GenBank files.""" db_uri = fix_db_uri(db_uri) df = pd.read_csv(csv_file, sep=',') @@ -101,12 +99,12 @@ # Fetch all fragments from the table once if fragment_column_name not in columns: raise ValueError(f"Fragment column '{fragment_column_name}' not found in table '{table_name}'.") - + fragment_column_index = columns.index(fragment_column_name) all_rows = connection.execute(text(f"SELECT * FROM {table_name}")).fetchall() fragment_map = {row[fragment_column_index]: row for row in all_rows} - - # Check if all fragments from CSV are present in DB + + # Compare fragments between CSV and DB csv_fragments = set() for _, row in df.iterrows(): for col in df.columns: @@ -117,15 +115,13 @@ missing_fragments = sorted(list(csv_fragments - db_fragments)) if missing_fragments: - with open(output_missing, "w") as f: - for fragment in missing_fragments: - f.write(f"{fragment}\n") - print(f"Missing fragments written to {output_missing}") - return output_missing # Exit early if fragments are missing - + raise ValueError( + f"Missing fragments in DB: {', '.join(missing_fragments)}" + ) + + # === CONTINUE WITH GB FILE CREATION === for _, row in df.iterrows(): annotated_row = {"Backbone": row["ID"], "Fragments": []} - for col in df.columns: if col != "ID": fragment = row[col] @@ -144,7 +140,7 @@ except Exception as e: print(f"Error occurred during annotation: {e}") - return + raise # Ensures the error exits the script # GenBank file generation per fragment try: @@ -227,7 +223,6 @@ parser.add_argument("--table", required=True, help="Table name in the database") parser.add_argument("--fragment_column", required=True, help="Fragment column name in the database") parser.add_argument("--output", required=True, help="Output dir for gb files") - parser.add_argument("--output_missing", required=True, help="Output txt file for missing fragment in the DB") args = parser.parse_args() # Start the Docker container (if not already running) @@ -239,7 +234,7 @@ wait_for_db(db_uri) # Fetch annotations from the database and save as JSON - fetch_annotations(args.input, args.sequence_column, args.annotation_columns, db_uri, args.table, args.fragment_column, args.output, args.output_missing) + fetch_annotations(args.input, args.sequence_column, args.annotation_columns, db_uri, args.table, args.fragment_column, args.output) if __name__ == "__main__": main()
