Mercurial > repos > kls286 > chap_test_20230328
diff CHAP/pipeline.py @ 0:cbbe42422d56 draft
planemo upload for repository https://github.com/CHESSComputing/ChessAnalysisPipeline/tree/galaxy commit 1401a7e1ae007a6bda260d147f9b879e789b73e0-dirty
author | kls286 |
---|---|
date | Tue, 28 Mar 2023 15:07:30 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CHAP/pipeline.py Tue Mar 28 15:07:30 2023 +0000 @@ -0,0 +1,84 @@ +#!/usr/bin/env python +#-*- coding: utf-8 -*- +#pylint: disable= +""" +File : pipeline.py +Author : Valentin Kuznetsov <vkuznet AT gmail dot com> +Description: +""" + +# system modules +import logging +from time import time + +class Pipeline(): + """ + Pipeline represent generic Pipeline class + """ + def __init__(self, items=None, kwds=None): + """ + Pipeline class constructor + + :param items: list of objects + :param kwds: list of method args for individual objects + """ + self.__name__ = self.__class__.__name__ + + self.items = items + self.kwds = kwds + + self.logger = logging.getLogger(self.__name__) + self.logger.propagate = False + + def execute(self): + """ + execute API + """ + + t0 = time() + self.logger.info(f'Executing "execute"\n') + + data = None + for item, kwargs in zip(self.items, self.kwds): + if hasattr(item, 'read'): + self.logger.info(f'Calling "read" on {item}') + data = item.read(**kwargs) + if hasattr(item, 'process'): + self.logger.info(f'Calling "process" on {item}') + data = item.process(data, **kwargs) + if hasattr(item, 'write'): + self.logger.info(f'Calling "write" on {item}') + data = item.write(data, **kwargs) + + self.logger.info(f'Exectuted "exectute" in {time()-t0:.3f} seconds') + +class PipelineObject(): + """ + PipelineObject represent generic Pipeline class + """ + def __init__(self, reader, writer, processor, fitter): + """ + PipelineObject class constructor + """ + self.reader = reader + self.writer = writer + self.processor = processor + + def read(self, filename): + """ + read object API + """ + return self.reader.read(filename) + + def write(self, data, filename): + """ + write object API + """ + return self.writer.write(data, filename) + + def process(self, data): + """ + process object API + """ + return self.processor.process(data) +