diff CHAP/pipeline.py @ 0:cbbe42422d56 draft

planemo upload for repository https://github.com/CHESSComputing/ChessAnalysisPipeline/tree/galaxy commit 1401a7e1ae007a6bda260d147f9b879e789b73e0-dirty
author kls286
date Tue, 28 Mar 2023 15:07:30 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/CHAP/pipeline.py	Tue Mar 28 15:07:30 2023 +0000
@@ -0,0 +1,84 @@
+#!/usr/bin/env python
+#-*- coding: utf-8 -*-
+#pylint: disable=
+"""
+File       : pipeline.py
+Author     : Valentin Kuznetsov <vkuznet AT gmail dot com>
+Description: 
+"""
+
+# system modules
+import logging
+from time import time
+
+class Pipeline():
+    """
+    Pipeline represent generic Pipeline class
+    """
+    def __init__(self, items=None, kwds=None):
+        """
+        Pipeline class constructor
+        
+        :param items: list of objects
+        :param kwds: list of method args for individual objects
+        """
+        self.__name__ = self.__class__.__name__
+
+        self.items = items
+        self.kwds = kwds
+
+        self.logger = logging.getLogger(self.__name__)
+        self.logger.propagate = False
+
+    def execute(self):
+        """
+        execute API
+        """
+
+        t0 = time()
+        self.logger.info(f'Executing "execute"\n')
+
+        data = None
+        for item, kwargs in zip(self.items, self.kwds):
+            if hasattr(item, 'read'):
+                self.logger.info(f'Calling "read" on {item}')
+                data = item.read(**kwargs)
+            if hasattr(item, 'process'):
+                self.logger.info(f'Calling "process" on {item}')
+                data = item.process(data, **kwargs)
+            if hasattr(item, 'write'):
+                self.logger.info(f'Calling "write" on {item}')
+                data = item.write(data, **kwargs)
+
+        self.logger.info(f'Exectuted "exectute" in {time()-t0:.3f} seconds')
+
+class PipelineObject():
+    """
+    PipelineObject represent generic Pipeline class
+    """
+    def __init__(self, reader, writer, processor, fitter):
+        """
+        PipelineObject class constructor
+        """
+        self.reader = reader
+        self.writer = writer
+        self.processor = processor
+
+    def read(self, filename):
+        """
+        read object API
+        """
+        return self.reader.read(filename)
+
+    def write(self, data, filename):
+        """
+        write object API
+        """
+        return self.writer.write(data, filename)
+
+    def process(self, data):
+        """
+        process object API
+        """
+        return self.processor.process(data)
+