view sculpt_sequences.py @ 1:a0cd867780ec draft default tip

planemo upload for repository https://github.com/Edinburgh-Genome-Foundry/Examples/blob/master/templates/template1.ipynb commit 6b6ce806b5d016b3c7f20318180eff2dbe64395a-dirty
author tduigou
date Thu, 17 Jul 2025 13:24:49 +0000
parents 0c7f75a2338b
children
line wrap: on
line source

import argparse
import json
import sys
import os
import re
import dnacauldron
import dnachisel
from Bio import SeqIO
import proglog


def smart_number(val):
        try:
            float_val = float(val)
            if float_val.is_integer():
                return int(float_val)
            else:
                return float_val
        except ValueError:
            raise ValueError(f"Invalid number: {val}")


def sculpt_sequances(files_to_sculpt, file_name_mapping, outdir_scul, outdir_unscul, use_file_names_as_id,
                     avoid_patterns, enforce_gc_content, DnaOptimizationProblemClass,
                     kmer_size,hairpin_constraints):

    files_to_sculpt = files_to_sculpt.split(',')

    records_to_sculpt = dnacauldron.biotools.load_records_from_files(
        files=files_to_sculpt,
        folder=None,
        use_file_names_as_ids=use_file_names_as_id
    )

    constraint_list = []

 # avoid_patterns pearsing
    for pattern in avoid_patterns:
        constraint_list.append(dnachisel.AvoidPattern(pattern))
        print(f"AvoidPattern constraint: {pattern}")

 # gc_constraints pearsing
    for constraint in enforce_gc_content:
        constraints = [c.strip() for c in constraint.split('  ') if c.strip()] 
        for line in constraints:
            if not line:
                continue
            print(f"GC constraint: {line}")

            try:
                pairs = [kv.strip() for kv in line.split(',')]
                params = {}
                for pair in pairs:
                    key, val = pair.split('=')
                    key = key.strip()
                    val = val.strip()
                    params[key] = smart_number(val)

                constraint_list.append(dnachisel.EnforceGCContent(**params))

            except Exception as e:
                print(f"Skipping invalid gc_constraints: {line} ({e})")

 # hairpin_constraints pearsing
    for constraint in hairpin_constraints:
        constraints = [c.strip() for c in constraint.split('  ') if c.strip()]
        for line in constraints:
            if not line:
                continue
            print(f"Hairpin constraint: {line}")

            try:
                pairs = [kv.strip() for kv in line.split(',')]
                params = {}
                for pair in pairs:
                    key, val = pair.split('=')
                    key = key.strip()
                    val = val.strip()
                    params[key] = smart_number(val)
                    
                constraint_list.append(dnachisel.AvoidHairpins(**params))

            except Exception as e:
                print(f"Skipping invalid hairpin_constraints: {line} ({e})")

 # k_size pearsing
    for k_size in kmer_size:
        try:
            constraint_list.append(dnachisel.UniquifyAllKmers(k=int(k_size)))
            print(f"k-mer size is: {k_size}")
        except ValueError:
            print(f"Skipping invalid k-mer size: {k_size}")
    
 #   refine the real record name dict
    if isinstance(file_name_mapping, str):
        file_name_mapping = dict(
            item.split(":") for item in file_name_mapping.split(",")
        )
    real_names = {
        os.path.splitext(os.path.basename(k))[0]: v.replace(".gb", "")
        for k, v in file_name_mapping.items()
    } 

    counter = 0
 #   not_divisible_seqs = []
    for record in records_to_sculpt:

 #       Look up the "real name" from the simplified dict (file_name_mapping)
        real_name = real_names.get(record.id, record.id)
        if real_name is None:
            raise ValueError(f"Error: {record.id} not found in the fragment names dictionary!")

        if DnaOptimizationProblemClass == 'DnaOptimizationProblem':
            problem = dnachisel.DnaOptimizationProblem.from_record(record, extra_constraints=constraint_list)
        elif DnaOptimizationProblemClass == 'CircularDnaOptimizationProblem':
            problem = dnachisel.CircularDnaOptimizationProblem.from_record(record, extra_constraints=constraint_list)

        if not problem.all_constraints_pass():
            counter += 1
            print(real_name)
            len_seq = len(problem.sequence)
            print("Length:", len_seq)
            target_path = os.path.join(outdir_scul, real_name + ".zip")
            problem.optimize_with_report(target=target_path)
            print()
            print(problem.constraints_text_summary())
            print()
        else:  # just save the genbank
            genbank_filename = real_name + ".gb"
            with open(os.path.join(outdir_unscul, genbank_filename), "w") as output_handle:
                SeqIO.write(record, output_handle, "genbank")
    print()
    print("Counter:", counter)
 #   print("Contents of outdir_scul:")
 #   for f in os.listdir(outdir_scul):
 #       print(" -", f)

 #   Unzip all files in scul
    import zipfile
    for file in os.listdir(outdir_scul):
        if file.endswith(".zip"):
            zip_path = os.path.join(outdir_scul, file)
            extract_dir = os.path.join(outdir_scul, os.path.splitext(file)[0])
            os.makedirs(extract_dir, exist_ok=True)
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(extract_dir)
 #   print("Contents of outdir_scul:")
 #   for f in os.listdir(outdir_scul):
 #       print(" -", f)
    reports = os.listdir(outdir_scul)
 #   print (f'report is {reports}')
    from shutil import copyfile
    for report in reports:
        if report.endswith(".zip"):
            continue
        gb_file = os.path.join(outdir_scul, report, "final_sequence.gb")

        target_filename = report + ".gb"
        target_file = os.path.join(outdir_unscul, target_filename)
        print(gb_file, target_file)
        copyfile(gb_file, target_file)
 #   print("Contents of outdir_unscul:")
 #   for f in os.listdir(outdir_unscul):
 #         print(" -", f)

    return outdir_scul, outdir_unscul


def parse_command_line_args():
    parser = argparse.ArgumentParser(description="Evaluate manufacturability of DNA sequences.")

    parser.add_argument("--files_to_sculpt", required=True,
                        help="List of GenBank files (Comma-separated)")
    parser.add_argument('--file_name_mapping', type=str,
                        help='Mapping of Galaxy filenames to original filenames')
    parser.add_argument("--outdir_scul", required=True, help="scul file dir")
    parser.add_argument("--outdir_unscul", required=True, help="unscul file dir")
    parser.add_argument("--use_file_names_as_id", type=lambda x: x.lower() == 'true', default=True,
                        help="Use file names as IDs (True/False)")
    parser.add_argument("--avoid_patterns", required=False,
                        help="List of patterns to avoid (comma-separated, e.g., 'BsaI_site,BsmBI_site')")
    parser.add_argument("--gc_constraints", required=False,
                        help="GC content constraints as 'min;max;window' (space-separated, e.g., '0.3;0.7;100 0.1;0.3;100')")
    parser.add_argument("--DnaOptimizationProblemClass", required=False,
                        help="the class to use for DnaOptimizationProblem")
    parser.add_argument("--hairpin_constraints", required=False,
                        help="Hairpin constraints as 'stem_size;window_size' (space-separated, e.g., '20;200 30;250')")
    parser.add_argument("--kmer_size", required = False,
                        help="K-mer uniqueness size (e.g., '15')")
    parser.add_argument("--json_params", required=False,
                        help="JSON params for the tool")
    parser.add_argument("--use_json_param", required=True,
                            help="If use JSON as param source")

    return parser.parse_args()


def extract_constraints_from_args(args):
    """Extract constraints directly from the command-line arguments."""

    split_pattern = r'(?:__cn__|\s{2,})'

    # 1. Avoid patterns (split by any whitespace)
    avoid_patterns = re.split(split_pattern, args.avoid_patterns.strip()) if args.avoid_patterns.strip() else []

    # 2. Hairpin constraint: one dictionary (print as string later)
    hairpin_constraints = re.split(split_pattern,args.hairpin_constraints.strip()) if args.hairpin_constraints.strip() else []

    # 3. GC constraints: split by 2+ spaces or newlines
    gc_constraints = re.split(split_pattern, args.gc_constraints.strip()) if args.gc_constraints.strip() else []

    # 4. k-mer size: single value or list
    kmer_size = [int(k.strip()) for k in args.kmer_size.strip().split(',') if k.strip()] if args.kmer_size.strip() else []
    
    # 5. DnaOptimizationProblemClass (as string)
    DnaOptimizationProblemClass = args.DnaOptimizationProblemClass if args.DnaOptimizationProblemClass else None
    
    return avoid_patterns, hairpin_constraints, gc_constraints, kmer_size, DnaOptimizationProblemClass


def load_constraints_from_json(json_path):
    with open(json_path, 'r') as f:
        params = json.load(f)

    def split_lines(val):
        if isinstance(val, str):
            return [line.strip() for line in val.strip().split('\n') if line.strip()]
        return val

    avoid_patterns = split_lines(params.get("avoid_patterns", ""))
    hairpin_constraints = split_lines(params.get("hairpin_constraints", ""))
    gc_constraints = split_lines(params.get("gc_constraints", ""))
    kmer_size = [int(k.strip()) for k in str(params.get("kmer_size", "")).split(',') if k.strip()]
    DnaOptimizationProblemClass = params.get("DnaOptimizationProblemClass", None)

    return {
        "avoid_patterns": avoid_patterns,
        "hairpin_constraints": hairpin_constraints,
        "gc_constraints": gc_constraints,
        "kmer_size": kmer_size,
        "DnaOptimizationProblemClass": DnaOptimizationProblemClass
    }

if __name__ == "__main__":
    args = parse_command_line_args()

    avoid_patterns, hairpin_constraints, gc_constraints, kmer_size, DnaOptimizationProblemClass = extract_constraints_from_args(args)

    # Check if the flag --use_json_param is present and set to true
    if "--use_json_param" in sys.argv:
        use_json_index = sys.argv.index("--use_json_param") + 1
        use_json = sys.argv[use_json_index].lower() == "true"
    else:
        use_json = False

    # Now only check --json_params if use_json is True
    if use_json:
        if "--json_params" in sys.argv:
            json_index = sys.argv.index("--json_params") + 1
            json_file = sys.argv[json_index]
            if json_file.lower() != "none":
                json_constraints = load_constraints_from_json(json_file)
                avoid_patterns = json_constraints["avoid_patterns"]
                hairpin_constraints = json_constraints["hairpin_constraints"]
                gc_constraints = json_constraints["gc_constraints"]
                kmer_size = json_constraints["kmer_size"]
                DnaOptimizationProblemClass = json_constraints["DnaOptimizationProblemClass"]

    params = {
        "files_to_sculpt": args.files_to_sculpt,
        "file_name_mapping": args.file_name_mapping,
        "outdir_unscul": args.outdir_unscul,
        "outdir_scul": args.outdir_scul,
        "use_file_names_as_id": args.use_file_names_as_id,
        "avoid_patterns": avoid_patterns,
        "hairpin_constraints": hairpin_constraints,
        "gc_constraints": gc_constraints,
        "kmer_size": kmer_size,
        "DnaOptimizationProblemClass": DnaOptimizationProblemClass
    }


    sculpt_sequances(
        args.files_to_sculpt, args.file_name_mapping,
        args.outdir_scul, args.outdir_unscul,
        args.use_file_names_as_id, avoid_patterns,
        gc_constraints, DnaOptimizationProblemClass,
        kmer_size, hairpin_constraints
    )