view column_maker.py @ 6:0aeda7a81b46 draft

"planemo upload for repository https://github.com/galaxyproject/tools-devteam/tree/master/tools/column_maker commit a993d43d9d1702a6cf584683cf72527a3f999236"
author devteam
date Wed, 30 Dec 2020 00:49:52 +0000
parents c6fdc1118036
children e7c273e8d4d6
line wrap: on
line source

#!/usr/bin/env python
"""
This tool takes a tab-delimited textfile as input and creates another column in
the file which is the result of a computation performed on every row in the
original file. The tool will skip over invalid lines within the file,
informing the user about the number of lines skipped.
"""

import argparse
import json
import re

# Command-line interface.  'input' and 'output' are required file arguments;
# the remaining positionals are optional (nargs='?'), and --load_json names a
# JSON file used to overwrite the parsed arguments.
yes_no = ['yes', 'no']

parser = argparse.ArgumentParser()
parser.add_argument(
    'input',
    type=argparse.FileType('r'),
    help="input file",
)
parser.add_argument(
    'output',
    type=argparse.FileType('wt'),
    help="output file",
)
parser.add_argument(
    'cond',
    nargs='?',
    type=str,
    help="expression",
)
parser.add_argument(
    'round',
    nargs='?',
    type=str,
    choices=yes_no,
    help="round result",
)
parser.add_argument(
    'columns',
    nargs='?',
    type=int,
    help="number of columns",
)
parser.add_argument(
    'column_types',
    nargs='?',
    type=str,
    help="comma separated list of column types",
)
parser.add_argument(
    'avoid_scientific_notation',
    nargs='?',
    type=str,
    choices=yes_no,
    help="avoid scientific notation",
)
parser.add_argument(
    '--load_json',
    default=None,
    type=argparse.FileType('r'),
    help="overwrite parsed arguments from json file",
)
args = parser.parse_args()

# Treat the parsed arguments as a plain dict so that the values read from the
# optional JSON file can replace them key by key.
argparse_dict = vars(args)
if args.load_json:
    argparse_dict.update(json.load(args.load_json))

# Settings that need no validation.
fh, out = argparse_dict['input'], argparse_dict['output']
expr = argparse_dict['cond']
round_result = argparse_dict['round']
avoid_scientific_notation = argparse_dict['avoid_scientific_notation']

# Shared tail of every metadata error message below.
_autodetect_hint = (
    ", click the pencil icon in the history item and select the Auto-detect"
    " option to correct it.  This tool can only be used with tab-delimited"
    " data."
)
bad_columns_msg = "Missing or invalid 'columns' metadata value" + _autodetect_hint

# The column count must be convertible to an integer ...
try:
    in_columns = int(argparse_dict['columns'])
except Exception:
    exit(bad_columns_msg)
# ... and be at least two: to be considered tabular, data must fulfill the
# requirements of the sniff.is_column_based() method.
if in_columns < 2:
    exit(bad_columns_msg)

# The column types arrive as one comma-separated string; anything that cannot
# be split (for example a missing value) is reported as invalid metadata.
try:
    in_column_types = argparse_dict['column_types'].split(',')
except Exception:
    exit("Missing or invalid 'column_types' metadata value" + _autodetect_hint)

# Both metadata values have to describe the same number of columns.
if in_columns != len(in_column_types):
    exit("The 'columns' metadata setting does not conform to the 'column_types' metadata setting" + _autodetect_hint)

# Undo the escaping of comparison operators and quotes, in case the
# expression was passed in escaped form.
mapped_str = {
    '__lt__': '<',
    '__le__': '<=',
    '__eq__': '==',
    '__ne__': '!=',
    '__gt__': '>',
    '__ge__': '>=',
    '__sq__': '\'',
    '__dq__': '"',
}
for escaped_token, plain_text in mapped_str.items():
    expr = expr.replace(escaped_token, plain_text)

# The expression is executed later on, so it is only accepted when it consists
# entirely of the characters and words listed here: column references (c + digits),
# arithmetic / comparison punctuation, a few operators, selected builtins and
# math functions, and the public method names of str and list.
operators = 'is|not|or|and'
builtin_and_math_functions = 'abs|all|any|bin|chr|cmp|complex|divmod|float|bool|hex|int|len|long|max|min|oct|ord|pow|range|reversed|round|sorted|str|sum|type|unichr|unicode|log|log10|exp|sqrt|ceil|floor'
string_and_list_methods = [
    attr
    for attr in dir('') + dir([])
    if not attr.startswith('_')
]
method_alternatives = '|'.join(string_and_list_methods)
whitelist = r"^([c0-9\+\-\*\/\(\)\.\'\"><=,:! ]|%s|%s|%s)*$" % (operators, builtin_and_math_functions, method_alternatives)
if re.match(whitelist, expr) is None:
    exit("Invalid expression")

# The formatting wrapper is added after validation, so it is not itself
# subject to the whitelist.
if avoid_scientific_notation == "yes":
    expr = "format_float_positional(%s)" % expr

# For every input column, build the variable name used in expressions
# (c1, c2, ...) and the call that converts the raw text to the column's type.
cols = ["c%d" % number for number in range(1, in_columns + 1)]
type_casts = []
for col_name, declared_type in zip(cols, in_column_types):
    col_type = declared_type.strip()
    if col_type == 'int' and round_result == 'no':
        # When the result is not rounded, integer columns are cast to float.
        col_type = 'float'
    type_casts.append("%s(%s)" % (col_type, col_name))

# Source fragments that get spliced into the generated per-line loop.
col_str = ', '.join(cols)  # e.g. 'c1, c2, c3, c4'
type_cast_str = ', '.join(type_casts)  # e.g. 'str(c1), int(c2), int(c3), str(c4)'
assign = col_str + " = line.split('\\t')"
wrap = col_str + " = " + type_cast_str

# Bookkeeping updated by the generated loop and reported at the end.
skipped_lines = first_invalid_line = lines_kept = total_lines = 0
invalid_line = None

# Source of the loop that reads the input file, skips invalid lines, and
# computes the new column.  The three placeholders are filled, in order, with:
#   assign - unpacks the tab-separated fields of the line into c1, c2, ...
#   wrap   - casts each of those variables to its column type
#   expr   - the user's expression, whose value becomes the new column
#
# NOTE(security): expr is user input that has only been checked against the
# whitelist regular expression; the resulting source is run with exec().
#
# The first skipped line is recorded by testing "invalid_line is None"
# (its initial value) rather than its truthiness: an empty skipped line is the
# falsy string '' and would otherwise be overwritten by a later invalid line,
# making the final report point at the wrong line.
code = '''
# import here since flake8 complains otherwise
from math import (
    ceil,
    exp,
    floor,
    log,
    log10,
    sqrt
)
from numpy import format_float_positional

for i, line in enumerate(fh):
    total_lines += 1
    line = line.rstrip('\\r\\n')
    if not line or line.startswith('#'):
        skipped_lines += 1
        if invalid_line is None:
            first_invalid_line = i + 1
            invalid_line = line
        continue
    try:
        %s
        %s
        new_val = %s
        if round_result == "yes":
            new_val = int(round(new_val))
        new_line = line + '\\t' + str(new_val) + "\\n"
        out.write(new_line)
        lines_kept += 1
    except Exception:
        skipped_lines += 1
        if invalid_line is None:
            first_invalid_line = i + 1
            invalid_line = line
fh.close()
''' % (assign, wrap, expr)

# Run the generated loop.  Any SyntaxError means the generated source (and
# therefore, most likely, the user's expression) could not be compiled; it is
# caught by type instead of by matching the message text, because the wording
# of syntax errors varies ("invalid syntax", "unexpected EOF while parsing",
# "'(' was never closed", ...).  Every other failure is reported verbatim.
# exit() raises SystemExit, so the summary below is only reached on success.
try:
    exec(code)
except SyntaxError:
    out.close()
    exit('Expression "%s" likely invalid. See tool tips, syntax and examples.' % expr)
except Exception as e:
    out.close()
    exit(str(e))

out.close()
valid_lines = total_lines - skipped_lines
print('Creating column %d with expression %s' % (in_columns + 1, expr))
if valid_lines > 0:
    # Every line is either kept or skipped, so lines_kept equals valid_lines;
    # the percentage is therefore taken relative to total_lines, which is the
    # count named in the message (and is > 0 whenever valid_lines is).
    print('kept %4.2f%% of %d lines.' % (100.0 * lines_kept / total_lines,
                                         total_lines))
else:
    print('Possible invalid expression "%s" or non-existent column referenced. See tool tips, syntax and examples.' % expr)
if skipped_lines > 0:
    print('Skipped %d invalid lines starting at line #%d: "%s"' %
          (skipped_lines, first_invalid_line, invalid_line))