Mercurial > repos > dfornika > kma_result_to_mlst
diff kma_result_to_mlst.py @ 0:934f961a7189 draft default tip
"planemo upload for repository https://github.com/dfornika/galaxy/tree/master/tools/kma_result_to_mlst commit 62e7cd82cb9b209bf3f797ae288916e88bbe8bc6-dirty"
author | dfornika |
---|---|
date | Thu, 31 Oct 2019 14:12:43 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/kma_result_to_mlst.py Thu Oct 31 14:12:43 2019 -0400 @@ -0,0 +1,97 @@ +#!/usr/bin/env python + +from __future__ import print_function + +import argparse +import csv +import json +import os +import sys + +from pprint import pprint + +def parse_res_file(res_file_path): + LOCUS_ALLELE_DELIMITER = '_' + + res_fieldnames = [ + 'template', + 'score', + 'expected', + 'template_length', + 'template_identity', + 'template_coverage', + 'query_identity', + 'query_coverage', + 'depth', + 'q_value', + 'p_value', + ] + + with open(res_file_path, 'r') as f: + loci = {} + reader = csv.DictReader(f, fieldnames=res_fieldnames, dialect="excel-tab") + next(reader) #skip header + for row in reader: + locus, allele = map(str.strip, row['template'].split(LOCUS_ALLELE_DELIMITER)) + if locus in loci: + loci[locus][allele] = { + 'locus_id': locus, + 'allele_id': allele, + 'score': int(row['score'].strip()), + 'expected': int(row['expected'].strip()), + 'template_length': int(row['template_length'].strip()), + 'template_identity': float(row['template_identity'].strip()), + 'template_coverage': float(row['template_coverage'].strip()), + 'query_identity': float(row['query_identity'].strip()), + 'query_coverage': float(row['query_coverage'].strip()), + 'depth': float(row['depth'].strip()), + 'q_value': float(row['q_value'].strip()), + 'p_value': float(row['p_value'].strip()), + } + else: + loci[locus] = {} + loci[locus][allele] = { + 'locus_id': locus, + 'allele_id': allele, + 'score': int(row['score'].strip()), + 'expected': int(row['expected'].strip()), + 'template_length': int(row['template_length'].strip()), + 'template_identity': float(row['template_identity'].strip()), + 'template_coverage': float(row['template_coverage'].strip()), + 'query_identity': float(row['query_identity'].strip()), + 'query_coverage': float(row['query_coverage'].strip()), + 'depth': float(row['depth'].strip()), + 'q_value': float(row['q_value'].strip()), + 'p_value': float(row['p_value'].strip()), + } + + return loci + +def main(args): + + loci = parse_res_file(args.res) + print("\t".join([ + "locus_id", + "allele_id", + "template_identity", + "template_coverage", + "depth", + ])) + + for locus, alleles in loci.items(): + best_allele = sorted(alleles.values(), + key=lambda x: x['score'], reverse=True)[0]['allele_id'] + + print("\t".join([ + alleles[best_allele]['locus_id'], + alleles[best_allele]['allele_id'], + str(alleles[best_allele]['template_identity']), + str(alleles[best_allele]['template_coverage']), + str(alleles[best_allele]['depth']), + ])) + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument("--res", dest="res", help="KMA result overview file") + args = parser.parse_args() + main(args)