Mercurial > repos > kls286 > chap_test_20230328
view CHAP/pipeline.py @ 0:cbbe42422d56 draft
planemo upload for repository https://github.com/CHESSComputing/ChessAnalysisPipeline/tree/galaxy commit 1401a7e1ae007a6bda260d147f9b879e789b73e0-dirty
author | kls286 |
---|---|
date | Tue, 28 Mar 2023 15:07:30 +0000 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python
#-*- coding: utf-8 -*-
#pylint: disable=
"""
File       : pipeline.py
Author     : Valentin Kuznetsov <vkuznet AT gmail dot com>
Description: Generic pipeline that runs a sequence of objects in order,
             calling whichever of ``read``, ``process`` and ``write`` each
             object provides and handing the data from one call to the next.
"""

# system modules
import logging
from time import time


class Pipeline():
    """
    Pipeline represents a generic, ordered sequence of processing items
    """
    def __init__(self, items=None, kwds=None):
        """
        Pipeline class constructor

        :param items: list of objects; each may provide any of the
            ``read``, ``process`` and ``write`` methods
        :param kwds: list of dictionaries with the keyword arguments for
            the individual objects, in the same order as ``items``
        """
        self.__name__ = self.__class__.__name__
        self.items = items
        self.kwds = kwds
        self.logger = logging.getLogger(self.__name__)
        # Records are handled only by handlers attached to this logger.
        self.logger.propagate = False

    def execute(self):
        """
        Run every item of the pipeline in order.

        For each item the methods ``read``, ``process`` and ``write`` are
        called, in that order, if the item has them. All of them receive
        the item's keyword arguments; ``process`` and ``write`` also
        receive the current data, and the return value of each call
        becomes the current data.

        ``items=None`` is treated as an empty pipeline and ``kwds=None``
        as "no keyword arguments for any item". When both are given, the
        pairing stops at the shorter of the two sequences.
        """
        t0 = time()
        self.logger.info('Executing "execute"\n')

        items = [] if self.items is None else self.items
        if self.kwds is None:
            # No per-item arguments were supplied: call every item bare.
            steps = ((item, {}) for item in items)
        else:
            steps = zip(items, self.kwds)

        data = None
        for item, kwargs in steps:
            if hasattr(item, 'read'):
                self.logger.info(f'Calling "read" on {item}')
                data = item.read(**kwargs)
            if hasattr(item, 'process'):
                self.logger.info(f'Calling "process" on {item}')
                data = item.process(data, **kwargs)
            if hasattr(item, 'write'):
                self.logger.info(f'Calling "write" on {item}')
                # The writer's return value is carried on to the next item.
                data = item.write(data, **kwargs)

        self.logger.info(f'Executed "execute" in {time()-t0:.3f} seconds')


class PipelineObject():
    """
    PipelineObject bundles a reader, a writer, a processor and a fitter
    and delegates the read/write/process calls to them
    """
    def __init__(self, reader, writer, processor, fitter):
        """
        PipelineObject class constructor

        :param reader: object whose ``read(filename)`` is used by ``read``
        :param writer: object whose ``write(data, filename)`` is used by
            ``write``
        :param processor: object whose ``process(data)`` is used by
            ``process``
        :param fitter: stored as ``self.fitter``; no method of this class
            calls it
        """
        self.reader = reader
        self.writer = writer
        self.processor = processor
        self.fitter = fitter

    def read(self, filename):
        """
        read object API

        :param filename: passed unchanged to the reader
        :return: whatever ``reader.read(filename)`` returns
        """
        return self.reader.read(filename)

    def write(self, data, filename):
        """
        write object API

        :param data: passed unchanged to the writer
        :param filename: passed unchanged to the writer
        :return: whatever ``writer.write(data, filename)`` returns
        """
        return self.writer.write(data, filename)

    def process(self, data):
        """
        process object API

        :param data: passed unchanged to the processor
        :return: whatever ``processor.process(data)`` returns
        """
        return self.processor.process(data)