Mercurial > repos > tduigou > sculpt_sequences
diff sculpt_sequences.py @ 0:0c7f75a2338b draft
planemo upload for repository https://github.com/Edinburgh-Genome-Foundry/Examples/blob/master/templates/template1.ipynb commit fa4c85dd6ad48d404a28e21667f18b628bbdc702-dirty
| author | tduigou |
|---|---|
| date | Mon, 07 Jul 2025 13:11:42 +0000 |
| parents | |
| children | a0cd867780ec |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sculpt_sequences.py Mon Jul 07 13:11:42 2025 +0000 @@ -0,0 +1,224 @@ +import argparse +import os +import re +import dnacauldron +import dnachisel +from Bio import SeqIO +import proglog + + +def smart_number(val): + try: + float_val = float(val) + if float_val.is_integer(): + return int(float_val) + else: + return float_val + except ValueError: + raise ValueError(f"Invalid number: {val}") + + +def sculpt_sequances(files_to_sculpt, file_name_mapping, outdir_scul, outdir_unscul, use_file_names_as_id, + avoid_patterns, enforce_gc_content, DnaOptimizationProblemClass, + kmer_size,hairpin_constraints): + + files_to_sculpt = files_to_sculpt.split(',') + + records_to_sculpt = dnacauldron.biotools.load_records_from_files( + files=files_to_sculpt, + folder=None, + use_file_names_as_ids=use_file_names_as_id + ) + + constraint_list = [] + + # avoid_patterns pearsing + for pattern in avoid_patterns: + constraint_list.append(dnachisel.AvoidPattern(pattern)) + print(f"AvoidPattern constraint: {pattern}") + + # gc_constraints pearsing + for constraint in enforce_gc_content: + constraints = [c.strip() for c in constraint.split(' ') if c.strip()] + for line in constraints: + if not line: + continue + print(f"GC constraint: {line}") + + try: + pairs = [kv.strip() for kv in line.split(',')] + params = {} + for pair in pairs: + key, val = pair.split('=') + key = key.strip() + val = val.strip() + params[key] = smart_number(val) + + constraint_list.append(dnachisel.EnforceGCContent(**params)) + + except Exception as e: + print(f"Skipping invalid gc_constraints: {line} ({e})") + + # hairpin_constraints pearsing + for constraint in hairpin_constraints: + constraints = [c.strip() for c in constraint.split(' ') if c.strip()] + for line in constraints: + if not line: + continue + print(f"Hairpin constraint: {line}") + + try: + pairs = [kv.strip() for kv in line.split(',')] + params = {} + for pair in pairs: + key, val = pair.split('=') + key = key.strip() + val = val.strip() + params[key] = smart_number(val) + + constraint_list.append(dnachisel.AvoidHairpins(**params)) + + except Exception as e: + print(f"Skipping invalid hairpin_constraints: {line} ({e})") + + # k_size pearsing + for k_size in kmer_size: + try: + constraint_list.append(dnachisel.UniquifyAllKmers(k=int(k_size))) + print(f"k-mer size is: {k_size}") + except ValueError: + print(f"Skipping invalid k-mer size: {k_size}") + + # refine the real record name dict + if isinstance(file_name_mapping, str): + file_name_mapping = dict( + item.split(":") for item in file_name_mapping.split(",") + ) + real_names = { + os.path.splitext(os.path.basename(k))[0]: v.replace(".gb", "") + for k, v in file_name_mapping.items() + } + + counter = 0 + # not_divisible_seqs = [] + for record in records_to_sculpt: + + # Look up the "real name" from the simplified dict (file_name_mapping) + real_name = real_names.get(record.id, record.id) + if real_name is None: + raise ValueError(f"Error: {record.id} not found in the fragment names dictionary!") + + if DnaOptimizationProblemClass == 'DnaOptimizationProblem': + problem = dnachisel.DnaOptimizationProblem.from_record(record, extra_constraints=constraint_list) + elif DnaOptimizationProblemClass == 'CircularDnaOptimizationProblem': + problem = dnachisel.CircularDnaOptimizationProblem.from_record(record, extra_constraints=constraint_list) + + if not problem.all_constraints_pass(): + counter += 1 + print(real_name) + len_seq = len(problem.sequence) + print("Length:", len_seq) + target_path = os.path.join(outdir_scul, real_name + ".zip") + problem.optimize_with_report(target=target_path) + print() + print(problem.constraints_text_summary()) + print() + else: # just save the genbank + genbank_filename = real_name + ".gb" + with open(os.path.join(outdir_unscul, genbank_filename), "w") as output_handle: + SeqIO.write(record, output_handle, "genbank") + print() + print("Counter:", counter) + # print("Contents of outdir_scul:") + # for f in os.listdir(outdir_scul): + # print(" -", f) + + # Unzip all files in scul + import zipfile + for file in os.listdir(outdir_scul): + if file.endswith(".zip"): + zip_path = os.path.join(outdir_scul, file) + extract_dir = os.path.join(outdir_scul, os.path.splitext(file)[0]) + os.makedirs(extract_dir, exist_ok=True) + with zipfile.ZipFile(zip_path, 'r') as zip_ref: + zip_ref.extractall(extract_dir) + # print("Contents of outdir_scul:") + # for f in os.listdir(outdir_scul): + # print(" -", f) + reports = os.listdir(outdir_scul) + # print (f'report is {reports}') + from shutil import copyfile + for report in reports: + if report.endswith(".zip"): + continue + gb_file = os.path.join(outdir_scul, report, "final_sequence.gb") + + target_filename = report + ".gb" + target_file = os.path.join(outdir_unscul, target_filename) + print(gb_file, target_file) + copyfile(gb_file, target_file) + # print("Contents of outdir_unscul:") + # for f in os.listdir(outdir_unscul): + # print(" -", f) + + return outdir_scul, outdir_unscul + + +def parse_command_line_args(): + parser = argparse.ArgumentParser(description="Evaluate manufacturability of DNA sequences.") + + parser.add_argument("--files_to_sculpt", required=True, + help="List of GenBank files (Comma-separated)") + parser.add_argument('--file_name_mapping', type=str, + help='Mapping of Galaxy filenames to original filenames') + parser.add_argument("--outdir_scul", required=True, help="scul file dir") + parser.add_argument("--outdir_unscul", required=True, help="unscul file dir") + parser.add_argument("--use_file_names_as_id", type=lambda x: x.lower() == 'true', default=True, + help="Use file names as IDs (True/False)") + parser.add_argument("--avoid_patterns", required=True, + help="List of patterns to avoid (comma-separated, e.g., 'BsaI_site,BsmBI_site')") + parser.add_argument("--gc_constraints", required=True, + help="GC content constraints as 'min;max;window' (space-separated, e.g., '0.3;0.7;100 0.1;0.3;100')") + parser.add_argument("--DnaOptimizationProblemClass", required=True, + help="the class to use for DnaOptimizationProblem") + parser.add_argument("--hairpin_constraints", required=True, + help="Hairpin constraints as 'stem_size;window_size' (space-separated, e.g., '20;200 30;250')") + parser.add_argument("--kmer_size", required = True, + help="K-mer uniqueness size (e.g., '15')") + + return parser.parse_args() + + +def extract_constraints_from_args(args): + """Extract constraints directly from the command-line arguments.""" + + split_pattern = r'(?:__cn__|\s{2,})' + + # 1. Avoid patterns (split by any whitespace) + avoid_patterns = re.split(split_pattern, args.avoid_patterns.strip()) if args.avoid_patterns.strip() else [] + + # 2. Hairpin constraint: one dictionary (print as string later) + hairpin_constraints = re.split(split_pattern,args.hairpin_constraints.strip()) if args.hairpin_constraints.strip() else [] + + # 3. GC constraints: split by 2+ spaces or newlines + gc_constraints = re.split(split_pattern, args.gc_constraints.strip()) if args.gc_constraints.strip() else [] + + # 4. k-mer size: single value or list + kmer_size = [int(k.strip()) for k in args.kmer_size.strip().split(',') if k.strip()] if args.kmer_size.strip() else [] + + return avoid_patterns, hairpin_constraints, gc_constraints, kmer_size + + + +if __name__ == "__main__": + args = parse_command_line_args() + + avoid_patterns, hairpin_constraints, gc_constraints, kmer_size, = extract_constraints_from_args(args) + + sculpt_sequances( + args.files_to_sculpt, args.file_name_mapping, + args.outdir_scul, args.outdir_unscul, + args.use_file_names_as_id, avoid_patterns, + gc_constraints, args.DnaOptimizationProblemClass, + kmer_size, hairpin_constraints + )
