diff data_manager/bakta_build_database.py @ 1:faae5d8ce0cb draft
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/data_managers/data_manager_bakta commit 4c11a77fb4c0bb74c0c4ae381c85971e01ed973e-dirty
author    pimarin
date      Thu, 27 Oct 2022 13:54:13 +0000
parents   9d08486abf8e
children  9c65e5da43e2
--- a/data_manager/bakta_build_database.py	Wed Aug 17 14:42:22 2022 +0000
+++ b/data_manager/bakta_build_database.py	Thu Oct 27 13:54:13 2022 +0000
@@ -1,358 +1,257 @@
+import argparse
 import hashlib
 import json
-import logging
 import os
-import shutil
 import subprocess as sp
-import stat
 import sys
 import tarfile
-
+from datetime import datetime
 from pathlib import Path
-from alive_progress import alive_bar
-import requests
-
-import bakta
-import bakta.config as cfg
 import bakta.constants as bc
 import bakta.utils as bu
-
-
-log = logging.getLogger('DB')
-
-
-FILE_NAMES = [
-    'antifam.h3f',
-    'antifam.h3i',
-    'antifam.h3m',
-    'antifam.h3p',
-    'bakta.db',
-    'expert-protein-sequences.dmnd',
-    'ncRNA-genes.i1f',
-    'ncRNA-genes.i1i',
-    'ncRNA-genes.i1m',
-    'ncRNA-genes.i1p',
-    'ncRNA-regions.i1f',
-    'ncRNA-regions.i1i',
-    'ncRNA-regions.i1m',
-    'ncRNA-regions.i1p',
-    'oric.fna',
-    'orit.fna',
-    'pfam.h3f',
-    'pfam.h3i',
-    'pfam.h3m',
-    'pfam.h3p',
-    'psc.dmnd',
-    'rfam-go.tsv',
-    'rRNA.i1f',
-    'rRNA.i1i',
-    'rRNA.i1m',
-    'rRNA.i1p',
-    'sorf.dmnd'
-    ]
+import requests
+from alive_progress import alive_bar
-def check(db_path: Path) -> dict:
-    """Check if database directory exists, is accessible and contains necessary files."""
+class GetBaktaDatabaseInfo:
+    """
+    Extract bakta database information to make a json file for data_manager
+    """
-    if(db_path is None):
-        log.error('directory neither provided nor detected!')
-        sys.exit('ERROR: database directory not provided nor detected! Please provide a valid path to the database directory.')
+    def __init__(self,
+                 data_table_name="bakta_database",
+                 db_name=Path.cwd().joinpath("db"),
+                 db_version="latest"):
+        self.bakta_table_list = None
+        self.db_url = None
+        self.data_table_entry = None
+        self.data_table_name = data_table_name
+        self.db_name = db_name
+        self.db_version = db_version
-    if(not os.access(str(db_path), os.R_OK & os.X_OK)):
-        log.error('directory (%s) not readable/accessible!', db_path)
-        sys.exit(f'ERROR: database directory ({db_path}) not readable/accessible!')
-
-    version_path = db_path.joinpath('version.json')
-    if(not os.access(str(version_path), os.R_OK) or not version_path.is_file()):
-        log.error('version file not readable!')
-        sys.exit('ERROR: database version file (version.json) not readable!')
+    def get_data_table_format(self):
+        """
+        Build a data table format for galaxy
+        using the bakta database information
+        @str database_value: string of the database name
+        @str database_date: string of the database date of build (YY-M-D)
+        @str database_bakta_version: string of the version of bakta tool
+        to apply a filter on version compatibility
+        @str database_path: string of the database path
+        for the database location
+        return: a data table formatted for json output
+        """
+        self.data_table_entry = {
+            "data_tables": {
+                self.data_table_name: {}
+            }
+        }
+        return self.data_table_entry
-    try:
-        with version_path.open() as fh:
-            db_info = json.load(fh)
-    except:
-        log.exception('could not parse database version file!')
-        sys.exit('ERROR: could not parse database version file!')
-
-    for key in ['date', 'major', 'minor']:
-        if(key not in db_info):
-            log.error('wrong db version info file content! missed key=%s', key)
-            sys.exit(f"ERROR: wrong db version info file format! Missed key '{key}' in JSON structure.")
+    def fetch_db_versions(self, db_version="latest"):
+        """
+        Use method from bakta tool to extract database info
+        db_version: a string of the version number
+        in the galaxy wrapper list or just latest
+        return: info for the select or the latest bakta db version
+        """
+        try:
+            with requests.get(bc.DB_VERSIONS_URL) as resp:
+                versions = json.loads(resp.content)
+        except IOError as e:
+            print(e, file=sys.stderr)
+            raise e
+        else:
+            if db_version == "latest":
+                db_date_list = []
+                for db_dic in versions:
+                    db_date_list.append(datetime.strptime(db_dic["date"],
+                                                          '%Y-%m-%d').date())
+                max(db_date_list)
+                filtered_version = next(item for item in versions
+                                        if max(db_date_list))
+            elif db_version == "test":
+                filtered_version = {"date": "date_test",
+                                    "major": "0",
+                                    "minor": "0",
+                                    "doi": "10.5281/zenodo.7197299",
+                                    "record": "7197299",
+                                    "md5": "8b0250c17078742fc12207d4efb0fc1a",
+                                    "software-min": {"major": "0",
+                                                     "minor": "0"}
+                                    }
+            else:
+                major_version = str(db_version.split(sep=".")[0])
+                minor_version = str(db_version.split(sep=".")[1])
+                try:
+                    filtered_version = next(
+                        item for item in versions
+                        if str(item["major"]) == major_version
+                        and str(item["minor"]) == minor_version)
+                except StopIteration:
+                    print("No available version detected in the list")
+                    filtered_version = None
+            if filtered_version is not None:
+                self.db_url = f"https://zenodo.org/record/" \
+                              f"{filtered_version['record']}/files/db.tar.gz"
+                self.db_version = db_version
+            return filtered_version
-    log.info('detected: major=%i, minor=%i, date=%s', db_info['major'], db_info['minor'], db_info['date'])
-    if(db_info['major'] < bakta.__db_schema_version__):
-        log.error('wrong database version detected! required=%i, detected=%i', bakta.__db_schema_version__, db_info['major'])
-        sys.exit(f"ERROR: wrong database version detected!\nBakta version {bakta.__version__} requires database version {bakta.__db_schema_version__}.x, but {db_info['major']}.{db_info['minor']} was detected. Please, update the database from https://doi.org/10.5281/zenodo.4247253")
-    elif(db_info['major'] > bakta.__db_schema_version__):
-        log.error('wrong database version detected! required=%i, detected=%i', bakta.__db_schema_version__, db_info['major'])
-        sys.exit(f"ERROR: wrong database version detected!\nBakta version {bakta.__version__} requires database version {bakta.__db_schema_version__}.x, but {db_info['major']}.{db_info['minor']} was detected. Please, update Bakta or download a compatible database version from https://doi.org/10.5281/zenodo.4247253")
-
-    for file_name in FILE_NAMES:
-        path = db_path.joinpath(file_name)
-        if(not os.access(str(path), os.R_OK) or not path.is_file()):
-            log.error('file not readable! file=%s', file_name)
-            sys.exit(f'ERROR: database file ({file_name}) not readable!')
-
-    return db_info
+    def get_data_manager(self, bakta_database_info, output_path):
+        self.bakta_table_list = self.get_data_table_format()
+        bakta_value = f"bakta_{bakta_database_info['major']}." \
+                      f"{bakta_database_info['minor']}"
+        data_info = dict(value=bakta_value,
+                         dbkey=bakta_database_info['date'],
+                         database_record=bakta_database_info['record'],
+                         bakta_version=str(
+                             f"{bakta_database_info['software-min']['major']}."
+                             f"{bakta_database_info['software-min']['minor']}"
+                         ), path=output_path)
+        self.bakta_table_list["data_tables"][self.data_table_name] = data_info
+        return self.bakta_table_list
-def fetch_db_versions():
-    try:
-        with requests.get(bc.DB_VERSIONS_URL) as resp:
-            versions = json.loads(resp.content)
-    except IOError as e:
-        print(e, file=sys.stderr)
-        raise e
-    else:
-        return versions
+class InstallBaktaDatabase(GetBaktaDatabaseInfo):
+    """
+    Download the bakta database,
+    check md5 sum,
+    untar the download db and update for the amrfinderplus database
+    """
+
+    def __init__(self,
+                 db_dir=Path.cwd(),
+                 db_name="bakta",
+                 tarball_name="db.tar.gz"):
+        super().__init__()
+        self.md5 = None
+        self.db_dir = db_dir
+        self.db_name = db_name
+        self.tarball_name = tarball_name
+        self.tarball_path = None
+        bu.test_dependency(bu.DEPENDENCY_AMRFINDERPLUS)
+
+    def download(self):
+        self.db_name = f'{self.db_name}_{self.db_version}'
+        bakta_path = Path(self.db_dir).joinpath(self.tarball_name)
+        try:
+            with bakta_path.open('wb') as fh_out, \
+                    requests.get(self.db_url, stream=True) as resp:
+                total_length = resp.headers.get('content-length')
+                if total_length is None:  # no content length header
+                    with alive_bar() as bar:
+                        for data in resp.iter_content(chunk_size=1024 * 1024):
+                            fh_out.write(data)
+                            bar()
+                else:
+                    total_length = int(int(total_length) / 1024)
+                    with alive_bar(total=total_length) as bar:
+                        for data in resp.iter_content(chunk_size=1024 * 1024):
+                            fh_out.write(data)
+                            bar(incr=len(data) / 1024)
+            print(f'Download bakta database {self.db_version}')
+            self.tarball_path = bakta_path
+        except IOError:
+            print(f'ERROR: Could not download file from Zenodo!'
+                  f' url={self.db_url}, path={self.tarball_name}')
+
+    def untar(self):
+        db_path = Path(self.db_dir).joinpath(self.db_name)
+        try:
+            with self.tarball_path.open('rb') as fh_in, \
+                    tarfile.open(fileobj=fh_in, mode='r:gz') as tar_file:
+                tar_file.extractall(path=str(db_path))
+            print(f'Untar the database in {db_path}')
+            return db_path.as_posix()
+        except OSError:
+            sys.exit(f'ERROR: Could not extract {self.tarball_name} '
+                     f'to {self.db_name}')
+
+    def calc_md5_sum(self, buffer_size=1048576):
+        tarball_path = Path(self.db_dir).joinpath(self.tarball_name)
+        self.md5 = self.fetch_db_versions(db_version=self.db_version)["md5"]
+        md5 = hashlib.md5()
+        with tarball_path.open('rb') as fh:
+            data = fh.read(buffer_size)
+            while data:
+                md5.update(data)
+                data = fh.read(buffer_size)
+        if md5.hexdigest() == self.md5:
+            print('\t...md5 control database OK')
+        else:
+            print(f"Error: corrupt database file! "
+                  f"calculated md5 = {md5.hexdigest()}"
+                  f" different from {self.md5} ")
+
+    def update_amrfinderplus_db(self):
+        amrfinderplus_db_path = f"{self.db_dir}/{self.db_name}/db/amrfinderplus-db"
+        if self.db_version == "test":
+            cmd = [
+                'amrfinder_update',
+                '--database', str(amrfinderplus_db_path),
+                '--force_update',
+                '--help'
+            ]
+        else:
+            cmd = [
+                'amrfinder_update',
+                '--database', str(amrfinderplus_db_path),
+                '--force_update'
+            ]
+        proc = sp.run(
+            cmd,
+            universal_newlines=True
+        )
+        if proc.returncode != 0:
+            print(f"ERROR: AMRFinderPlus failed! "
+                  f"command: 'amrfinder_update --force_update"
+                  f" --database {amrfinderplus_db_path}'")
+        else:
+            print("AMRFinderPlus database download")
-def download(db_url: str, tarball_path: Path):
-    try:
-        with tarball_path.open('wb') as fh_out, requests.get(db_url, stream=True) as resp:
-            total_length = resp.headers.get('content-length')
-            if(total_length is None):  # no content length header
-                with alive_bar() as bar:
-                    for data in resp.iter_content(chunk_size=1024*1024):
-                        fh_out.write(data)
-                        bar()
-            else:
-                total_length = int(int(total_length)/1024)  # length in Kb
-                with alive_bar(total=total_length) as bar:
-                    for data in resp.iter_content(chunk_size=1024*1024):
-                        fh_out.write(data)
-                        bar(incr=len(data)/1024)
-    except IOError:
-        sys.exit(f'ERROR: Could not download file from Zenodo! url={db_url}, path={tarball_path}')
-
-
-def calc_md5_sum(tarball_path: Path, buffer_size: int=1024*1024) -> str:
-    md5 = hashlib.md5()
-    with tarball_path.open('rb') as fh:
-        data = fh.read(buffer_size)
-        while data:
-            md5.update(data)
-            data = fh.read(buffer_size)
-    return md5.hexdigest()
-
-
-def untar(tarball_path: Path, output_path: Path):
-    try:
-        with tarball_path.open('rb') as fh_in, tarfile.open(fileobj=fh_in, mode='r:gz') as tar_file:
-            tar_file.extractall(path=str(output_path))
-    except OSError:
-        sys.exit(f'ERROR: Could not extract {tarball_path} to {output_path}')
+def parse_arguments():
+    # parse options and arguments
+    arg_parser = argparse.ArgumentParser()
+    arg_parser.add_argument("data_manager_json")
+    arg_parser.add_argument("-d", "--database_version",
+                            help='Select the database version '
+                                 '(major and minor eg. 4.0),'
+                                 'default is the latest version',
+                            default="latest",
+                            required=True)
+    return arg_parser.parse_args()
 def main():
-    # parse options and arguments
-    parser = bu.init_parser(sub_command='_db')
-    group_runtime = parser.add_argument_group('Runtime & auxiliary options')
-    group_runtime.add_argument('--help', '-h', action='help', help='Show this help message and exit')
-    group_runtime.add_argument('--version', '-V', action='version', version=f'%(prog)s {bakta.__version__}')
-
-    subparsers = parser.add_subparsers(dest='subcommand', help='sub-command help')
-    parser_list = subparsers.add_parser('list', help='List available database versions')  # add list sub-command options
-    parser_list.add_argument('--all', action='store_true', help='Show all versions including incompatible')
-
-    parser_download = subparsers.add_parser('download', help='Download a database')  # add download sub-command options
-    parser_download.add_argument('--output', '-o', action='store', default=Path.cwd(), help='output directory (default = current working directory)')
-    parser_download.add_argument('--minor', '-n', action='store', type=int, default=0, help='Database minor version (default = most recent db minor version)')
-
-    parser_update = subparsers.add_parser('update', help='Update an existing database to the most recent compatible version')  # add download sub-command options
-    parser_update.add_argument('--db', '-d', action='store', default=None, help='Current database path (default = <bakta_path>/db). Can also be provided as BAKTA_DB environment variable.')
-    parser_update.add_argument('--tmp-dir', '-t', action='store', dest='tmp_dir', default=Path.cwd(), help='Temporary directory to download & extract (default = current working directory)')
-
-    args = parser.parse_args()
-    if(args.subcommand == 'list'):
-        print(f'Required database schema version: {bakta.__db_schema_version__}\n')
-        versions = fetch_db_versions()
-        if(not args.all):
-            versions = [v for v in versions if v['major'] == bakta.__db_schema_version__]
-
-        print('Available DB versions:')
-        for v in sorted(versions, key=lambda v: (v['major'], v['minor'])):
-            print(f"{v['major']}.{v['minor']}\t{v['date']}\t{v['doi']}")
-    elif(args.subcommand == 'download'):
-        bu.test_dependency(bu.DEPENDENCY_AMRFINDERPLUS)
-        output_path = cfg.check_output_path(args)
-
-        print('fetch DB versions...')
-        versions = fetch_db_versions()
-        compatible_versions = [v for v in versions if v['major'] == bakta.__db_schema_version__]
-        if(len(compatible_versions) == 0):
-            sys.exit(f'Error: no compatible version available for current major db version {bakta.__db_schema_version__}')
-        else:
-            print(f'\t... compatible DB versions: {len(compatible_versions)}')
+    all_args = parse_arguments()
-        required_version = None
-        if(args.minor > 0):
-            for v in versions:
-                if(v['minor'] == args.minor):
-                    required_version = v
-                    break
-            if(required_version is None):
-                sys.exit(f"requested DB minor version {args.minor} is not available. Please use 'bakta_db list' to get a list of available DB versions")
-        else:
-            compatible_sorted = sorted(compatible_versions, key=lambda v: v['minor'], reverse=True)
-            required_version = compatible_sorted[0]
-
-        tarball_path = output_path.joinpath('db.tar.gz')
-        db_url = f"https://zenodo.org/record/{required_version['record']}/files/db.tar.gz"
-        print(f"download database: v{required_version['major']}.{required_version['minor']}, {required_version['date']}, DOI: {required_version['doi']}, URL: {db_url}...")
-        download(db_url, tarball_path)
-        print('\t... done')
-
-        print('check MD5 sum...')
-        md5_sum = calc_md5_sum(tarball_path)
-        if(md5_sum == required_version['md5']):
-            print(f'\t...database file OK: {md5_sum}')
-        else:
-            sys.exit(f"Error: corrupt database file! MD5 should be '{required_version['md5']}' but is '{md5_sum}'")
+    with open(all_args.data_manager_json) as fh:
+        data_manager_input = json.load(fh)
-        print(f'extract DB tarball: file={tarball_path}, output={output_path}')
-        untar(tarball_path, output_path)
-        tarball_path.unlink()
-
-        db_path = output_path.joinpath('db')
-        db_info = check(db_path)
-        if(db_info['major'] != required_version['major']):
-            sys.exit(f"ERROR: wrong major db detected! required={required_version['major']}, detected={db_info['major']}")
-        elif(db_info['minor'] != required_version['minor']):
-            sys.exit(f"ERROR: wrong minor db detected! required={required_version['minor']}, detected={db_info['minor']}")
-        print('successfully downloaded Bakta database!')
-        print(f"\tversion: {required_version['major']}.{required_version['minor']}")
-        print(f"\tDOI: {required_version['doi']}")
-        print(f'\tpath: {db_path}')
-
-        try:
-            db_path.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)  # set write permissions on old (existing) directory with updated content
-            for db_file_path in db_path.iterdir():
-                db_file_path.chmod(stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
-        except:
-            sys.exit(f'ERROR: cannot set read|execute permissions on new database! path={db_path}, owner={db_path.owner()}, group={db_path.group()}, permissions={oct(db_path.stat().st_mode )[-3:]}')
-
-        print('update AMRFinderPlus database...')
-        update_amrfinderplus_db(db_path)
-        print('\t... done')
+    target_dir = data_manager_input['output_data'][0]['extra_files_path']
+    os.makedirs(target_dir)
-        print(f"\nRun Bakta using '--db {db_path}' or set a BAKTA_DB environment variable: 'export BAKTA_DB={db_path}'")
-    elif(args.subcommand == 'update'):
-        bu.test_dependency(bu.DEPENDENCY_AMRFINDERPLUS)
-        tmp_path = cfg.check_tmp_path(args)
-        db_old_path = cfg.check_db_path(args)
-        db_old_info = check(db_old_path)
-        print(f"existing database: v{db_old_info['major']}.{db_old_info['minor']}")
-        print('fetch DB versions...')
-        versions = fetch_db_versions()
-        compatible_versions = [v for v in versions if v['major'] == bakta.__db_schema_version__]
-        if(len(compatible_versions) == 0):
-            sys.exit(f'Error: no compatible version available for current major db version {bakta.__db_schema_version__}')
-        else:
-            print(f'\t... compatible DB versions: {len(compatible_versions)}')
-
-        compatible_sorted = sorted(compatible_versions, key=lambda v: v['minor'], reverse=True)
-        if(compatible_sorted[0]['minor'] <= db_old_info['minor']):
-            print(f"Database version {db_old_info['major']}.{db_old_info['minor']} is up-to-date")
-            sys.exit()
-        required_version = compatible_sorted[0]
-
-        tarball_path = tmp_path.joinpath('db.tar.gz')
-        db_url = f"https://zenodo.org/record/{required_version['record']}/files/db.tar.gz"
-        print(f"download database: v{required_version['major']}.{required_version['minor']}, {required_version['date']}, DOI: {required_version['doi']}, URL: {db_url}...")
-        download(db_url, tarball_path)
-        print('\t... done')
-
-        print('check MD5 sum...')
-        md5_sum = calc_md5_sum(tarball_path)
-        if(md5_sum == required_version['md5']):
-            print(f'\t...database file OK: {md5_sum}')
-        else:
-            sys.exit(f"Error: corrupt database file! MD5 should be '{required_version['md5']}' but is '{md5_sum}'")
-
-        print(f'extract DB tarball: file={tarball_path}, output-directory={tmp_path}')
-        untar(tarball_path, tmp_path)
-        tarball_path.unlink()
-
-        db_new_path = tmp_path.joinpath('db')
-        db_new_info = check(db_new_path)
-        if(db_new_info['major'] != required_version['major']):
-            sys.exit(f"ERROR: wrong major db detected! required={required_version['major']}, detected={db_new_info['major']}")
-        elif(db_new_info['minor'] != required_version['minor']):
-            sys.exit(f"ERROR: wrong minor db detected! required={required_version['minor']}, detected={db_new_info['minor']}")
-        print('successfully downloaded Bakta DB:')
-        print(f"\tversion: {required_version['major']}.{required_version['minor']}")
-        print(f"\tDOI: {required_version['doi']}")
-        print(f'\tpath: {db_new_path}')
-        print('remove old database...')
-        try:
-            db_old_path.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)  # set write permissions on old directory
-            for db_old_file_path in db_old_path.iterdir():
-                if(db_old_file_path.is_dir()):
-                    db_old_file_path.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
-                else:
-                    db_old_file_path.chmod(stat.S_IRUSR | stat.S_IWUSR)
-        except:
-            sys.exit(f'ERROR: cannot set read|write|execute permissions on old database! path={db_old_path}, owner={db_old_path.owner()}, group={db_old_path.group()}, permissions={oct(db_old_path.stat().st_mode )[-3:]}')
-        try:
-            shutil.rmtree(db_old_path)
-        except:
-            sys.exit(f'ERROR: cannot remove old database! path={db_old_path}, owner={db_old_path.owner()}, group={db_old_path.group()}, permissions={oct(db_old_path.stat().st_mode )[-3:]}')
-        db_old_path.mkdir()
-
-        try:
-            db_new_path.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)  # set write permissions on db_new_path directory
-            for db_new_file_path in db_new_path.iterdir():
-                db_new_file_path.chmod(stat.S_IRUSR | stat.S_IWUSR)
-        except:
-            sys.exit(f'ERROR: cannot set read|write|execute permissions on new database! path={db_new_path}, owner={db_new_path.owner()}, group={db_new_path.group()}, permissions={oct(db_new_path.stat().st_mode )[-3:]}')
-        try:
-            for db_new_file_path in db_new_path.iterdir():  # move new db files into old (existing) db directory
-                file_name = db_new_file_path.name
-                shutil.move(db_new_file_path, db_old_path.joinpath(file_name))
-        except:
-            sys.exit(f'ERROR: cannot move new database to existing path! new-path={db_new_path}, existing-path={db_old_path.parent}')
-        shutil.rmtree(tmp_path)
-
-        try:
-            db_old_path.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)  # set write permissions on old (existing) directory with updated content
-            for db_old_file_path in db_old_path.iterdir():
-                db_old_file_path.chmod(stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
-        except:
-            sys.exit(f'ERROR: cannot set read(|execute) permissions on new database! path={db_old_path}, owner={db_old_path.owner()}, group={db_old_path.group()}, permissions={oct(db_old_path.stat().st_mode )[-3:]}')
-
-        print('\t... done')
-
-        print('update AMRFinderPlus database...')
-        update_amrfinderplus_db(db_old_path)
-        print('\t... done')
-
-        print(f"\nRun Bakta using '--db {db_old_path}' or set a BAKTA_DB environment variable: 'export BAKTA_DB={db_old_path}'")
-    else:
-        parser.print_help()
-        sys.exit('Error: no subcommand provided!')
-
-
-def update_amrfinderplus_db(db_path: Path):
-    amrfinderplus_db_path = db_path.joinpath('amrfinderplus-db')
-    cmd = [
-        'amrfinder_update',
-        '--database', str(amrfinderplus_db_path),
-        '--force_update'
-    ]
-    log.debug('cmd=%s', cmd)
-    proc = sp.run(
-        cmd,
-        stdout=sp.PIPE,
-        stderr=sp.PIPE,
-        universal_newlines=True
-    )
-    if(proc.returncode != 0):
-        log.debug('stdout=\'%s\', stderr=\'%s\'', proc.stdout, proc.stderr)
-        log.warning('AMRFinderPlus failed! amrfinder-error-code=%d', proc.returncode)
-        sys.exit(f"ERROR: AMRFinderPlus failed! command: 'amrfinder_update --force_update --database {amrfinderplus_db_path}', error code: {proc.returncode}")
+    # init the class to download bakta db
+    bakta_upload = InstallBaktaDatabase()
+    # extract the version
+    bakta_db = bakta_upload.fetch_db_versions(
+        db_version=all_args.database_version)
+    # update the path for galaxy
+    bakta_upload.db_dir = target_dir
+    # download the database
+    bakta_upload.download()
+    # check md5 sum
+    bakta_upload.calc_md5_sum()
+    # untar db
+    bakta_extracted_path = bakta_upload.untar()
+    # update for amrfinderplus
+    bakta_upload.update_amrfinderplus_db()
+    # make the data_manager metadata
+    bakta_data_manager = bakta_upload.get_data_manager(bakta_database_info=bakta_db, output_path=bakta_extracted_path)
+    with open(all_args.data_manager_json, 'w') as fh:
+        json.dump(bakta_data_manager, fh, indent=2, sort_keys=True)
 if __name__ == '__main__':
     main()