| 3 | 1 #!/usr/bin/python | 
|  | 2 from datetime import datetime, timedelta | 
|  | 3 from io import BytesIO as BIO | 
|  | 4 import logging | 
|  | 5 import os | 
|  | 6 import subprocess | 
|  | 7 import tarfile | 
|  | 8 import time | 
|  | 9 from watchdog.observers import Observer | 
|  | 10 from watchdog.events import FileSystemEventHandler | 
|  | 11 from watchdog.events import PatternMatchingEventHandler | 
|  | 12 | 
class ToolHandler(PatternMatchingEventHandler):
    """React to created/modified ``*.xml`` files under a watched tools directory.

    For each qualifying event the handler runs ``planemo test`` on the changed
    XML file and, when planemo exits with status 0, writes a gzipped tarball of
    the tool directory into ``<galaxy_root>/tooltardir``.
    """

    def __init__(self, watchme):
        """Set up pattern matching and the output directory.

        Args:
            watchme: Path of the tools directory being watched. Its parent
                directory is used as the Galaxy root.
        """
        PatternMatchingEventHandler.__init__(
            self,
            patterns=['*.xml'],
            ignore_directories=False,
            case_sensitive=False,
        )
        # Timestamp of the last event that was acted on; used for debouncing.
        self.last_modified = datetime.now()
        self.tool_dir = watchme
        self.work_dir = os.getcwd()
        self.galaxy_root = os.path.split(watchme)[0]
        logging.info(self.galaxy_root)
        self.tar_dir = os.path.join(self.galaxy_root, 'tooltardir')
        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs(self.tar_dir, exist_ok=True)

    def on_created(self, event):
        """Treat a newly created file exactly like a modified one."""
        self.on_modified(event)

    def on_modified(self, event):
        """Test the changed tool and, if the test passes, build its tarball.

        Events arriving less than one second after the last handled event are
        ignored. Events whose path no longer exists are only logged.
        """
        if datetime.now() - self.last_modified < timedelta(seconds=1):
            return
        if not os.path.exists(event.src_path):
            logging.info('Directory %s deleted' % event.src_path)
            return
        self.last_modified = datetime.now()
        logging.info(f"{event.src_path} was {event.event_type}")
        p = self.planemo_test(event.src_path)
        if p is None:
            # planemo_test found nothing to test and already logged a warning.
            return
        if p.returncode == 0:
            newtarpath = self.makeToolTar(event.src_path)
            logging.info('### Tested toolshed tarball %s written' % newtarpath)
        else:
            logging.debug('### planemo stdout:')
            logging.debug(p.stdout)
            logging.debug('### planemo stderr:')
            logging.debug(p.stderr)
            # The original format string had no placeholder, so the % operator
            # raised TypeError instead of reporting the return code.
            logging.info('### Planemo call return code = %s' % p.returncode)

    def planemo_test(self, xml_path):
        """Run ``planemo test`` on ``xml_path`` from within its directory.

        Args:
            xml_path: Path of the tool XML file that changed.

        Returns:
            The ``subprocess.CompletedProcess`` of the planemo call (stdout and
            stderr captured as UTF-8 text), or ``None`` when the directory
            containing ``xml_path`` holds no ``.xml`` files.
        """
        toolpath, toolfile = os.path.split(xml_path)
        dirlist = os.listdir(toolpath)
        toolname = os.path.basename(toolpath)
        logging.info('### test dirlist %s, path %s toolname %s' % (dirlist, xml_path, toolname))
        # Compare case-insensitively, matching the handler's event pattern.
        xmls = [x for x in dirlist if os.path.splitext(x)[1].lower() == '.xml']
        if not xmls:
            logging.warning('Found no xml files after change to %s' % xml_path)
            return None
        # The report name ends with the suffix that makeToolTar excludes.
        tool_test_output = os.path.join(toolpath, f"{toolname}_planemo_test_report.html")
        cll = [
            "planemo",
            "test",
            "--test_output",
            tool_test_output,
            "--galaxy_root",
            self.galaxy_root,
            "--update_test_data",
            xml_path,
        ]
        logging.info('### calling %s' % ' '.join(cll))
        return subprocess.run(
            cll,
            cwd=toolpath,
            shell=False,
            capture_output=True,
            encoding='utf8',
        )

    def makeToolTar(self, xml_path):
        """Write a gzipped tarball of the tool directory and return its path.

        The archived directory is ``<tool_dir>/<toolname>``, where ``toolname``
        is the XML file name without its extension; i.e. the tool directory is
        assumed to be named after its XML file and to sit directly under the
        watched directory. Files ending in ``_planemo_test_report.html`` are
        left out of the archive.

        Args:
            xml_path: Path of the tool XML file.

        Returns:
            Path of the tarball, ``<tar_dir>/<toolname>_toolshed.gz``.
        """
        excludeme = "_planemo_test_report.html"

        def exclude_function(tarinfo):
            # Returning None tells tarfile to skip this member.
            return None if tarinfo.name.endswith(excludeme) else tarinfo

        xml_file = os.path.basename(xml_path)
        toolname = os.path.splitext(xml_file)[0]
        newtarpath = os.path.join(self.tar_dir, '%s_toolshed.gz' % toolname)
        # Address the source directory explicitly instead of chdir()-ing into
        # tool_dir: the working directory is process-wide state and was left
        # changed (and the archive left open) if adding files raised.
        with tarfile.open(newtarpath, "w:gz") as tf:
            tf.add(
                name=os.path.join(self.tool_dir, toolname),
                arcname=toolname,
                filter=exclude_function,
            )
        return newtarpath
|  | 102 | 
|  | 103 | 
if __name__ == "__main__":
    import sys

    # Directory to watch: optional first command-line argument, otherwise the
    # original hard-coded default.
    watchme = sys.argv[1] if len(sys.argv) > 1 else '/home/ross/gal21/tools'
    # To log to a file instead, also pass e.g.
    # filename=os.path.join(watchme, "toolwatcher.log") and filemode="w".
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    event_handler = ToolHandler(watchme=watchme)
    observer = Observer()
    observer.schedule(event_handler, path=watchme, recursive=True)
    observer.start()
    try:
        # Keep the main thread alive; the observer runs in its own thread.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Stop and join on every exit path, not only on Ctrl-C.
        observer.stop()
        observer.join()
|  | 121 | 
|  | 122 | 
|  | 123 |