Mercurial > repos > davidvanzessen > combined_immune_repertoire_pipeline
view uploadzip.py @ 12:316b5467f724 draft
Uploaded
author | davidvanzessen |
---|---|
date | Wed, 08 Jan 2014 06:29:57 -0500 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python #Processes uploads from the user. # WARNING: Changes in this tool (particularly as related to parsing) may need # to be reflected in galaxy.web.controllers.tool_runner and galaxy.tools import urllib, sys, os, gzip, tempfile, shutil, re, gzip, zipfile, codecs, binascii from galaxy import eggs # need to import model before sniff to resolve a circular import dependency import galaxy.model from galaxy.datatypes.checkers import * from galaxy.datatypes import sniff from galaxy.datatypes.binary import * from galaxy.datatypes.images import Pdf from galaxy.datatypes.registry import Registry from galaxy import util from galaxy.datatypes.util.image_util import * from galaxy.util.json import * try: import Image as PIL except ImportError: try: from PIL import Image as PIL except: PIL = None try: import bz2 except: bz2 = None assert sys.version_info[:2] >= ( 2, 4 ) def stop_err( msg, ret=1 ): sys.stderr.write( msg ) sys.exit( ret ) def file_err( msg, dataset, json_file ): json_file.write( to_json_string( dict( type = 'dataset', ext = 'data', dataset_id = dataset.dataset_id, stderr = msg ) ) + "\n" ) # never remove a server-side upload if dataset.type in ( 'server_dir', 'path_paste' ): return try: os.remove( dataset.path ) except: pass def safe_dict(d): """ Recursively clone json structure with UTF-8 dictionary keys http://mellowmachines.com/blog/2009/06/exploding-dictionary-with-unicode-keys-as-python-arguments/ """ if isinstance(d, dict): return dict([(k.encode('utf-8'), safe_dict(v)) for k,v in d.iteritems()]) elif isinstance(d, list): return [safe_dict(x) for x in d] else: return d def parse_outputs( args ): rval = {} for arg in args: id, files_path, path = arg.split( ':', 2 ) rval[int( id )] = ( path, files_path ) return rval def add_file( dataset, registry, json_file, output_path ): data_type = None line_count = None converted_path = None stdout = None link_data_only = dataset.get( 'link_data_only', 'copy_files' ) in_place = dataset.get( 'in_place', True ) try: ext = dataset.file_type except AttributeError: file_err( 'Unable to process uploaded file, missing file_type parameter.', dataset, json_file ) return if dataset.type == 'url': try: page = urllib.urlopen( dataset.path ) #page will be .close()ed by sniff methods temp_name, dataset.is_multi_byte = sniff.stream_to_file( page, prefix='url_paste', source_encoding=util.get_charset_from_http_headers( page.headers ) ) except Exception, e: file_err( 'Unable to fetch %s\n%s' % ( dataset.path, str( e ) ), dataset, json_file ) return dataset.path = temp_name # See if we have an empty file if not os.path.exists( dataset.path ): file_err( 'Uploaded temporary file (%s) does not exist.' % dataset.path, dataset, json_file ) return if not os.path.getsize( dataset.path ) > 0: file_err( 'The uploaded file is empty', dataset, json_file ) return if not dataset.type == 'url': # Already set is_multi_byte above if type == 'url' try: dataset.is_multi_byte = util.is_multi_byte( codecs.open( dataset.path, 'r', 'utf-8' ).read( 100 ) ) except UnicodeDecodeError, e: dataset.is_multi_byte = False # Is dataset an image? image = check_image( dataset.path ) if image: if not PIL: image = None # get_image_ext() returns None if nor a supported Image type ext = get_image_ext( dataset.path, image ) data_type = ext # Is dataset content multi-byte? elif dataset.is_multi_byte: data_type = 'multi-byte char' ext = sniff.guess_ext( dataset.path, is_multi_byte=True ) # Is dataset content supported sniffable binary? else: type_info = Binary.is_sniffable_binary( dataset.path ) if type_info: data_type = type_info[0] ext = type_info[1] if not data_type: shutil.move( dataset.path, output_path ) #data_type = "data" # Save job info for the framework if ext == 'auto' and dataset.ext: ext = dataset.ext if ext == 'auto': ext = 'data' datatype = registry.get_datatype_by_extension( ext ) if dataset.type in ( 'server_dir', 'path_paste' ) and link_data_only == 'link_to_files': # Never alter a file that will not be copied to Galaxy's local file store. if datatype.dataset_content_needs_grooming( dataset.path ): err_msg = 'The uploaded files need grooming, so change your <b>Copy data into Galaxy?</b> selection to be ' + \ '<b>Copy files into Galaxy</b> instead of <b>Link to files without copying into Galaxy</b> so grooming can be performed.' file_err( err_msg, dataset, json_file ) return if link_data_only == 'copy_files' and dataset.type in ( 'server_dir', 'path_paste' ) and data_type not in [ 'gzip', 'bz2', 'zip' ]: # Move the dataset to its "real" path if converted_path is not None: shutil.copy( converted_path, output_path ) try: os.remove( converted_path ) except: pass else: # This should not happen, but it's here just in case shutil.copy( dataset.path, output_path ) elif link_data_only == 'copy_files': if os.path.exists(dataset.path): shutil.move( dataset.path, output_path ) # Write the job info stdout = stdout or 'uploaded %s file' % data_type info = dict( type = 'dataset', dataset_id = dataset.dataset_id, ext = ext, stdout = stdout, name = dataset.name, line_count = line_count ) if dataset.get('uuid', None) is not None: info['uuid'] = dataset.get('uuid') json_file.write( to_json_string( info ) + "\n" ) if link_data_only == 'copy_files' and datatype.dataset_content_needs_grooming( output_path ): # Groom the dataset content if necessary datatype.groom_dataset_content( output_path ) def add_composite_file( dataset, registry, json_file, output_path, files_path ): if dataset.composite_files: os.mkdir( files_path ) for name, value in dataset.composite_files.iteritems(): value = util.bunch.Bunch( **value ) if dataset.composite_file_paths[ value.name ] is None and not value.optional: file_err( 'A required composite data file was not provided (%s)' % name, dataset, json_file ) break elif dataset.composite_file_paths[value.name] is not None: dp = dataset.composite_file_paths[value.name][ 'path' ] isurl = dp.find('://') <> -1 # todo fixme if isurl: try: temp_name, dataset.is_multi_byte = sniff.stream_to_file( urllib.urlopen( dp ), prefix='url_paste' ) except Exception, e: file_err( 'Unable to fetch %s\n%s' % ( dp, str( e ) ), dataset, json_file ) return dataset.path = temp_name dp = temp_name if not value.is_binary: if dataset.composite_file_paths[ value.name ].get( 'space_to_tab', value.space_to_tab ): sniff.convert_newlines_sep2tabs( dp ) else: sniff.convert_newlines( dp ) shutil.move( dp, os.path.join( files_path, name ) ) # Move the dataset to its "real" path shutil.move( dataset.primary_file, output_path ) # Write the job info info = dict( type = 'dataset', dataset_id = dataset.dataset_id, stdout = 'uploaded %s file' % dataset.file_type ) json_file.write( to_json_string( info ) + "\n" ) def __main__(): if len( sys.argv ) < 4: print >>sys.stderr, 'usage: upload.py <root> <datatypes_conf> <json paramfile> <output spec> ...' sys.exit( 1 ) output_paths = parse_outputs( sys.argv[4:] ) json_file = open( 'galaxy.json', 'w' ) registry = Registry() registry.load_datatypes( root_dir=sys.argv[1], config=sys.argv[2] ) for line in open( sys.argv[3], 'r' ): dataset = from_json_string( line ) dataset = util.bunch.Bunch( **safe_dict( dataset ) ) try: output_path = output_paths[int( dataset.dataset_id )][0] except: print >>sys.stderr, 'Output path for dataset %s not found on command line' % dataset.dataset_id sys.exit( 1 ) if dataset.type == 'composite': files_path = output_paths[int( dataset.dataset_id )][1] add_composite_file( dataset, registry, json_file, output_path, files_path ) else: add_file( dataset, registry, json_file, output_path ) # clean up paramfile # TODO: this will not work when running as the actual user unless the # parent directory is writable by the user. try: os.remove( sys.argv[3] ) except: pass if __name__ == '__main__': __main__()