Mercurial > repos > tduigou > get_db_info
diff get_db_info.py @ 0:41ac63b5d221 draft
planemo upload for repository https://github.com/brsynth commit 15dbdd1f0a222a8e1b0fb5c16b36885520a3d005
| author | tduigou |
|---|---|
| date | Thu, 10 Apr 2025 08:45:18 +0000 |
| parents | |
| children | 0443378b44e5 |
line wrap: on
line diff
"""Fetch fragment annotations from a PostgreSQL database and save them as JSON.

Starts (or reuses) a Docker PostgreSQL container named after the target
database, waits until the database accepts connections, then joins a CSV of
backbone/fragment IDs against a database table and writes the annotated
result to a JSON file.
"""

import argparse
import json
import os
import socket
import subprocess
import time

import pandas as pd
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine.url import make_url
from sqlalchemy.exc import OperationalError
from sqlalchemy.sql import text


def fix_db_uri(uri):
    """Replace the Galaxy-escaped '__at__' marker with '@' in the URI."""
    return uri.replace("__at__", "@")


def is_port_in_use(port):
    """Return True if a TCP port is already bound on localhost."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        return s.connect_ex(("localhost", port)) == 0


def extract_db_name(uri):
    """Extract the database name from a SQLAlchemy URI."""
    return make_url(uri).database


def start_postgres_container(db_name):
    """Start (or reuse) a PostgreSQL Docker container named after the database.

    If a container with that name is already running, nothing is done; if it
    exists but is stopped, it is restarted; otherwise a new container is
    created on port 5432 (or 5433 when 5432 is already in use).
    """
    container_name = db_name

    # Use argument lists (shell=False) so a hostile database/container name
    # cannot inject shell syntax into the docker commands.
    running = subprocess.run(
        ["docker", "ps", "-q", "-f", f"name={container_name}"],
        capture_output=True, text=True,
    )
    if running.stdout.strip():
        print(f"Container '{container_name}' is already running.")
        return

    # Container exists but is stopped?
    exists = subprocess.run(
        ["docker", "ps", "-a", "-q", "-f", f"name={container_name}"],
        capture_output=True, text=True,
    )
    if exists.stdout.strip():
        print(f"Starting existing container '{container_name}'...")
        subprocess.run(["docker", "start", container_name], check=True)
        print(f"PostgreSQL Docker container '{container_name}' activated.")
        return

    # Create a fresh container; fall back to 5433 when 5432 is taken.
    port = 5432 if not is_port_in_use(5432) else 5433
    # NOTE(review): hard-coded fallback password — for anything beyond local
    # testing, require POSTGRES_PASSWORD to be set in the environment.
    postgres_password = os.getenv("POSTGRES_PASSWORD", "RK17")

    start_command = [
        "docker", "run", "--name", container_name,
        "-e", f"POSTGRES_PASSWORD={postgres_password}",
        "-p", f"{port}:5432",
        "-d", "postgres",
    ]

    try:
        subprocess.run(start_command, check=True)
        print(f"PostgreSQL Docker container '{container_name}' started on port {port}.")
    except subprocess.CalledProcessError as e:
        print(f"Failed to start Docker container: {e}")


def wait_for_db(uri, timeout=60):
    """Poll the database until a connection succeeds or `timeout` seconds pass.

    Raises:
        Exception: if no connection could be established within the timeout.
    """
    engine = create_engine(uri)
    start_time = time.time()
    try:
        while time.time() - start_time < timeout:
            try:
                with engine.connect():
                    print("Connected to database.")
                    return
            except OperationalError:
                print("Database not ready, retrying...")
                time.sleep(2)
    finally:
        # Release pooled connections whether we succeeded or timed out.
        engine.dispose()
    raise Exception("Database connection failed after timeout.")


def fetch_annotations(csv_file, db_uri, table_name, fragment_column_name, output):
    """Join CSV fragment IDs against a database table and write JSON output.

    Args:
        csv_file: CSV with an 'ID' (backbone) column; every other column is
            assumed to hold fragment identifiers.
        db_uri: SQLAlchemy URI (may still contain the Galaxy '__at__' escape).
        table_name: name of the table holding fragment metadata.
        fragment_column_name: column of `table_name` containing fragment IDs.
        output: path of the JSON file to write.

    Returns:
        The output path on success, or None when the annotation phase failed.
    """
    db_uri = fix_db_uri(db_uri)
    df = pd.read_csv(csv_file, sep=',')

    engine = create_engine(db_uri)
    annotated_data = []

    try:
        with engine.connect() as connection:
            inspector = inspect(engine)

            # The table name is interpolated into the SQL text below, so only
            # accept names that actually exist in the database (guards against
            # SQL injection through the --table argument).
            if table_name not in inspector.get_table_names():
                raise ValueError(f"Unknown table: {table_name}")

            columns = [column['name'] for column in inspector.get_columns(table_name)]

            # BUG FIX: previously the fragment_column_name argument was ignored
            # and the first column was blindly assumed to hold the fragment ID.
            try:
                frag_idx = columns.index(fragment_column_name)
            except ValueError:
                raise ValueError(
                    f"Column '{fragment_column_name}' not found in table '{table_name}'"
                )

            # Fetch the table once and index rows by their fragment ID.
            all_rows = connection.execute(text(f"SELECT * FROM {table_name}")).fetchall()
            fragment_map = {row[frag_idx]: row for row in all_rows}

            for _, row in df.iterrows():
                annotated_row = {"Backbone": row["ID"], "Fragments": []}

                for col in df.columns:
                    if col == "ID":
                        continue
                    fragment = row[col]
                    db_row = fragment_map.get(fragment)

                    if db_row is not None:
                        fragment_data = {"id": fragment}
                        # Attach every column except the fragment-ID column.
                        for i, column_name in enumerate(columns):
                            if i != frag_idx:
                                fragment_data[column_name] = db_row[i]
                    else:
                        fragment_data = {"id": fragment, "metadata": "No data found"}

                    annotated_row["Fragments"].append(fragment_data)

                annotated_data.append(annotated_row)

    except Exception as e:
        print(f"Error occurred during annotation: {e}")
        return
    finally:
        engine.dispose()

    try:
        with open(output, "w") as f:
            json.dump(annotated_data, f, indent=4)
        print(f"Annotation saved to {output}")
    except Exception as e:
        print(f"Error saving output file: {e}")

    return output


def main():
    """Parse CLI arguments, ensure the database is up, and run the annotation."""
    parser = argparse.ArgumentParser(description="Fetch annotations from PostgreSQL database and save as JSON.")
    parser.add_argument("--input", required=True, help="Input CSV file")
    parser.add_argument("--db_uri", required=True, help="Database URI connection string")
    parser.add_argument("--table", required=True, help="Table name in the database")
    parser.add_argument("--fragment_column", required=True, help="Fragment column name in the database")
    parser.add_argument("--output", required=True, help="Output JSON file")
    args = parser.parse_args()

    # Start the Docker container (if not already running)
    db_uri = fix_db_uri(args.db_uri)
    db_name = extract_db_name(db_uri)
    start_postgres_container(db_name)

    # Wait until the database is ready
    wait_for_db(db_uri)

    # Fetch annotations from the database and save as JSON
    fetch_annotations(args.input, db_uri, args.table, args.fragment_column, args.output)


if __name__ == "__main__":
    main()
