view CHAP/pipeline.py @ 0:cbbe42422d56 draft

planemo upload for repository https://github.com/CHESSComputing/ChessAnalysisPipeline/tree/galaxy commit 1401a7e1ae007a6bda260d147f9b879e789b73e0-dirty
author kls286
date Tue, 28 Mar 2023 15:07:30 +0000
parents
children
line wrap: on
line source

#!/usr/bin/env python
#-*- coding: utf-8 -*-
#pylint: disable=
"""
File       : pipeline.py
Author     : Valentin Kuznetsov <vkuznet AT gmail dot com>
Description: Generic pipeline that chains objects providing read, process and write methods
"""

# system modules
import logging
from time import time

class Pipeline():
    """
    Pipeline represents a generic pipeline: an ordered sequence of items
    that are run one after another, each one receiving the data returned
    by the previous step.
    """
    def __init__(self, items=None, kwds=None):
        """
        Pipeline class constructor

        :param items: list of objects; each object may provide any of the
            ``read``, ``process`` and ``write`` methods
        :param kwds: list of dictionaries with the keyword arguments for
            the individual objects (one dictionary per item)
        """
        self.__name__ = self.__class__.__name__

        self.items = items
        self.kwds = kwds

        self.logger = logging.getLogger(self.__name__)
        # Records of this logger are not handed to ancestor loggers.
        self.logger.propagate = False

    def execute(self):
        """
        execute API

        Walk over the items paired with their keyword arguments. For each
        item call, in this order and only if the item has the method,
        ``read(**kwargs)``, ``process(data, **kwargs)`` and
        ``write(data, **kwargs)``; the value returned by each call
        replaces ``data`` for the next one.

        A pipeline without items is a no-op. When no ``kwds`` were given
        every item is called without extra keyword arguments.

        :return: None
        """

        t0 = time()
        self.logger.info('Executing "execute"\n')

        # The constructor defaults are None; normalise them here so that
        # an empty pipeline does not fail inside zip().
        items = [] if self.items is None else list(self.items)
        if self.kwds is None:
            # The same empty dict can be shared: it is only unpacked.
            kwds = [{}] * len(items)
        else:
            kwds = self.kwds

        data = None
        for item, kwargs in zip(items, kwds):
            # Tolerate a missing (None) entry for an item without arguments.
            kwargs = {} if kwargs is None else kwargs
            if hasattr(item, 'read'):
                self.logger.info('Calling "read" on %s', item)
                data = item.read(**kwargs)
            if hasattr(item, 'process'):
                self.logger.info('Calling "process" on %s', item)
                data = item.process(data, **kwargs)
            if hasattr(item, 'write'):
                self.logger.info('Calling "write" on %s', item)
                data = item.write(data, **kwargs)

        self.logger.info('Executed "execute" in %.3f seconds', time() - t0)

class PipelineObject():
    """
    PipelineObject bundles a reader, a writer, a processor and a fitter
    and exposes read, write and process calls that are delegated to the
    corresponding component.
    """
    def __init__(self, reader, writer, processor, fitter):
        """
        PipelineObject class constructor

        :param reader: object providing ``read(filename)``
        :param writer: object providing ``write(data, filename)``
        :param processor: object providing ``process(data)``
        :param fitter: fitter object; it is only stored, no method of
            this class calls it
        """
        self.reader = reader
        self.writer = writer
        self.processor = processor
        # Keep the fitter instead of silently dropping the argument.
        self.fitter = fitter

    def read(self, filename):
        """
        read object API

        :param filename: passed through to the reader
        :return: whatever ``reader.read(filename)`` returns
        """
        return self.reader.read(filename)

    def write(self, data, filename):
        """
        write object API

        :param data: passed through to the writer
        :param filename: passed through to the writer
        :return: whatever ``writer.write(data, filename)`` returns
        """
        return self.writer.write(data, filename)

    def process(self, data):
        """
        process object API

        :param data: passed through to the processor
        :return: whatever ``processor.process(data)`` returns
        """
        return self.processor.process(data)