diff workflow/models.py @ 71:1cf15b61cd83 draft

planemo upload for repository https://github.com/rolfverberg/galaxytools commit 366e516aef0735af2998c6ff3af037181c8d5213
author rv43
date Mon, 20 Mar 2023 13:56:57 +0000
parents fba792d5f83b
children
line wrap: on
line diff
--- a/workflow/models.py	Fri Mar 10 16:39:22 2023 +0000
+++ b/workflow/models.py	Mon Mar 20 13:56:57 2023 +0000
@@ -11,9 +11,9 @@
 
 from functools import cache
 from pathlib import PosixPath
-from pydantic import validator, ValidationError, conint, confloat, constr, \
-        conlist, FilePath, PrivateAttr
 from pydantic import BaseModel as PydanticBaseModel
+from pydantic import validator, ValidationError, conint, confloat, constr, conlist, FilePath, \
+        PrivateAttr
 from nexusformat.nexus import *
 from time import time
 from typing import Optional, Literal
@@ -23,17 +23,35 @@
 except:
     pass
 
-from msnctools.general import is_int, is_num, input_int, input_int_list, input_num, input_yesno, \
-        input_menu, index_nearest, string_to_list, file_exists_and_readable
+try:
+    from msnctools.general import is_int, is_num, input_int, input_int_list, input_num, \
+            input_yesno, input_menu, index_nearest, string_to_list, file_exists_and_readable
+except:
+    from general import is_int, is_num, input_int, input_int_list, input_num, \
+            input_yesno, input_menu, index_nearest, string_to_list, file_exists_and_readable
 
 
 def import_scanparser(station):
     if station in ('id1a3', 'id3a'):
-        from msnctools.scanparsers import SMBRotationScanParser
-        globals()['ScanParser'] = SMBRotationScanParser
+        try:
+            from msnctools.scanparsers import SMBRotationScanParser
+            globals()['ScanParser'] = SMBRotationScanParser
+        except:
+            try:
+                from scanparsers import SMBRotationScanParser
+                globals()['ScanParser'] = SMBRotationScanParser
+            except:
+                pass
     elif station in ('id3b'):
-        from msnctools.scanparsers import FMBRotationScanParser
-        globals()['ScanParser'] = FMBRotationScanParser
+        try:
+            from msnctools.scanparsers import FMBRotationScanParser
+            globals()['ScanParser'] = FMBRotationScanParser
+        except:
+            try:
+                from scanparsers import FMBRotationScanParser
+                globals()['ScanParser'] = FMBRotationScanParser
+            except:
+                pass
     else:
         raise RuntimeError(f'Invalid station: {station}')
 
@@ -45,10 +63,9 @@
         try:
             parser = ScanParser(spec_file, scan_number)
             try:
-                scan_type = parser.get_scan_type()
+                scan_type = parser.scan_type
             except:
                 scan_type = None
-                pass
         except:
             scan_numbers.remove(scan_number)
     return(scan_numbers)
@@ -434,15 +451,6 @@
     def get_scanparser(self, scan_number):
         return(get_scanparser(self.spec_file, scan_number))
 
-#    def get_detector_data(self, detector_prefix, scan_number=None, scan_step_index=None):
-#        if scan_number is None:
-#            scan_number = self.scan_numbers[0]
-#        if scan_step_index is None:
-#            scan_info = self.stack_info[self.get_scan_index(scan_number)]
-#            scan_step_index = scan_info['starting_image_offset']
-#        parser = self.get_scanparser(scan_number)
-#        return(parser.get_detector_data(detector_prefix, scan_step_index))
-
     def get_detector_data(self, detector_prefix, scan_number=None, scan_step_index=None):
         image_stacks = []
         if scan_number is None:
@@ -460,10 +468,13 @@
             else:
                 image_stacks.append(parser.get_detector_data(detector_prefix,
                         image_offset+scan_step_index))
-        if len(image_stacks) == 1:
+        if scan_number is not None and scan_step_index is not None:
+            # Return a single image for a specific scan_number and scan_step_index request
             return(image_stacks[0])
         else:
+            # Return a list otherwise
             return(image_stacks)
+        return(image_stacks)
 
     def scan_numbers_cli(self, attr_desc, **kwargs):
         available_scan_numbers = self.available_scan_numbers
@@ -475,8 +486,11 @@
                 available_scan_numbers = []
                 for scan_number in self.available_scan_numbers:
                     parser = self.get_scanparser(scan_number)
-                    if parser.scan_type == scan_type:
-                        available_scan_numbers.append(scan_number)
+                    try:
+                        if parser.scan_type == scan_type:
+                            available_scan_numbers.append(scan_number)
+                    except:
+                        pass
             elif scan_type == 'df1':
                 tomo_scan_numbers = kwargs['tomo_scan_numbers']
                 available_scan_numbers = []
@@ -587,8 +601,8 @@
             scan_index = self.get_scan_index(scan_number)
 
             # Select the image set
-            last_image_index = image_offset+num_image-1
-            print(f'Available good image set index range: [{image_offset}, {last_image_index}]')
+            last_image_index = image_offset+num_image
+            print(f'Available good image set index range: [{image_offset}, {last_image_index})')
             image_set_approved = False
             if scan_index is not None:
                 scan_info = stack_info[scan_index]
@@ -597,16 +611,16 @@
                 image_set_approved = input_yesno(f'Accept these values (y/n)?', 'y')
             if not image_set_approved:
                 print(f'Default starting image offset and number of images: '+
-                        f'{image_offset} and {last_image_index-image_offset}')
+                        f'{image_offset} and {num_image}')
                 image_set_approved = input_yesno(f'Accept these values (y/n)?', 'y')
                 if image_set_approved:
                     offset = image_offset
                     num = last_image_index-offset
                 while not image_set_approved:
                     offset = input_int(f'Enter the starting image offset', ge=image_offset,
-                            le=last_image_index-1)#, default=image_offset)
+                            lt=last_image_index)#, default=image_offset)
                     num = input_int(f'Enter the number of images', ge=1,
-                            le=last_image_index-offset+1)#, default=last_image_index-offset+1)
+                            le=last_image_index-offset)#, default=last_image_index-offset)
                     print(f'Current starting image offset and number of images: {offset} and {num}')
                     image_set_approved = input_yesno(f'Accept these values (y/n)?', 'y')
                 if scan_index is not None:
@@ -688,7 +702,7 @@
             scan_numbers = [scan_number]
         for scan_number in scan_numbers:
             parser = self.get_scanparser(scan_number)
-            horizontal_shifts.append(parser.get_horizontal_shift())
+            horizontal_shifts.append(parser.horizontal_shift)
         if len(horizontal_shifts) == 1:
             return(horizontal_shifts[0])
         else:
@@ -702,7 +716,7 @@
             scan_numbers = [scan_number]
         for scan_number in scan_numbers:
             parser = self.get_scanparser(scan_number)
-            vertical_shifts.append(parser.get_vertical_shift())
+            vertical_shifts.append(parser.vertical_shift)
         if len(vertical_shifts) == 1:
             return(vertical_shifts[0])
         else:
@@ -728,22 +742,20 @@
             return
 
         # Select the theta range for the tomo reconstruction from the first scan
+        theta_range_approved = False
         thetas = np.linspace(spec_theta_start, spec_theta_end, spec_num_theta)
         delta_theta = thetas[1]-thetas[0]
-        theta_range_approved = False
-        print(f'Theta range obtained from SPEC data: [{spec_theta_start}, {spec_theta_end})')
+        print(f'Theta range obtained from SPEC data: [{spec_theta_start}, {spec_theta_end}]')
         print(f'Theta step size = {delta_theta}')
         print(f'Number of theta values: {spec_num_theta}')
-        exit('Done')
         default_start = None
         default_end = None
         if station in ('id1a3', 'id3a'):
             theta_range_approved = input_yesno(f'Accept this theta range (y/n)?', 'y')
             if theta_range_approved:
-                theta_start = spec_theta_start
-                theta_end = spec_theta_end
-                num_theta = spec_num_theta
-                theta_index_start = 0
+                self.theta_range = {'start': float(spec_theta_start), 'end': float(spec_theta_end),
+                        'num': int(spec_num_theta), 'start_index': 0}
+                return
         elif station in ('id3b'):
             if spec_theta_start <= 0.0 and spec_theta_end >= 180.0:
                 default_start = 0
@@ -765,7 +777,8 @@
                     f'{theta_end})')
             print(f'Number of theta values: {num_theta}')
             theta_range_approved = input_yesno(f'Accept this theta range (y/n)?', 'y')
-        self.thetas = np.linspace(theta_start, theta_end, num_theta)
+        self.theta_range = {'start': float(theta_start), 'end': float(theta_end),
+                'num': int(num_theta), 'start_index': int(theta_index_start)}
 
     def image_range_cli(self, attr_desc, detector_prefix):
         stack_info = self.stack_info
@@ -805,13 +818,17 @@
         btr = cli_kwargs.get('btr')
         station = cli_kwargs.get('station')
         detector = cli_kwargs.get('detector')
+        sample_name = cli_kwargs.get('sample_name')
         print(f'\n -- Configure the location of the {attr_desc}scan data -- ')
         if station in ('id1a3', 'id3a'):
             basedir = f'/nfs/chess/{station}/{cycle}/{btr}'
             runs = [d for d in os.listdir(basedir) if os.path.isdir(os.path.join(basedir, d))]
 #RV            index = 15-1
 #RV            index = 7-1
-            index = input_menu(runs, header='Choose a sample directory')
+            if sample_name is not None and sample_name in runs:
+                index = runs.index(sample_name)
+            else:
+                index = input_menu(runs, header='Choose a sample directory')
             self.spec_file = f'{basedir}/{runs[index]}/spec.log'
             self.scan_numbers_cli(attr_desc, station=station, scan_type='ts1')
         else:
@@ -845,8 +862,8 @@
 
     def cli(self):
         print('\n -- Configure the sample metadata -- ')
-#RV        self.name = 'test'
 #RV        self.name = 'sobhani-3249-A'
+#RV        self.name = 'tenstom_1304r-1'
         self.set_single_attr_cli('name', 'the sample name')
 #RV        self.description = 'test sample'
         self.set_single_attr_cli('description', 'a description of the sample (optional)')
@@ -917,17 +934,19 @@
             use_detector_config = input_yesno(f'Current detector settings:\n{self.detector}\n'+
                     f'Keep these settings? (y/n)')
         if not use_detector_config:
-#RV            have_detector_config = True
-            have_detector_config = input_yesno(f'Is a detector configuration file available? (y/n)')
-            if have_detector_config:
-#RV                detector_config_file = 'retiga.yaml'
-#RV                detector_config_file = 'andor2.yaml'
-                detector_config_file = input(f'Enter detector configuration file name: ')
+            menu_options = ['not listed', 'andor2', 'manta', 'retiga']
+            input_mode = input_menu(menu_options, header='Choose one of the following detector '+
+                    'configuration options')
+            if input_mode:
+                detector_config_file = f'{menu_options[input_mode]}.yaml'
                 have_detector_config = self.detector.construct_from_yaml(detector_config_file)
+            else:
+                have_detector_config = False
             if not have_detector_config:
                 self.set_single_attr_cli('detector', 'detector')
         self.set_single_attr_cli('tomo_fields', 'Tomo field', chain_attr_desc=True,
-                cycle=self.cycle, btr=self.btr, station=self.station, detector=self.detector)
+                cycle=self.cycle, btr=self.btr, station=self.station, detector=self.detector,
+                sample_name=self.sample.name)
         if self.station in ('id1a3', 'id3a'):
             have_dark_field = True
             tomo_spec_file = self.tomo_fields.spec_file
@@ -1001,7 +1020,7 @@
             nxspec_scans[field_name] = field.construct_nxcollection(image_key, thetas,
                     self.detector)
             if include_raw_data:
-                image_stacks.append(field.get_detector_data(self.detector.prefix))
+                image_stacks += field.get_detector_data(self.detector.prefix)
                 for scan_number in field.scan_numbers:
                     parser = field.get_scanparser(scan_number)
                     scan_info = field.stack_info[field.get_scan_index(scan_number)]