Mercurial > repos > rv43 > tomo
diff workflow/models.py @ 71:1cf15b61cd83 draft
planemo upload for repository https://github.com/rolfverberg/galaxytools commit 366e516aef0735af2998c6ff3af037181c8d5213
author | rv43 |
---|---|
date | Mon, 20 Mar 2023 13:56:57 +0000 |
parents | fba792d5f83b |
children |
line wrap: on
line diff
--- a/workflow/models.py Fri Mar 10 16:39:22 2023 +0000 +++ b/workflow/models.py Mon Mar 20 13:56:57 2023 +0000 @@ -11,9 +11,9 @@ from functools import cache from pathlib import PosixPath -from pydantic import validator, ValidationError, conint, confloat, constr, \ - conlist, FilePath, PrivateAttr from pydantic import BaseModel as PydanticBaseModel +from pydantic import validator, ValidationError, conint, confloat, constr, conlist, FilePath, \ + PrivateAttr from nexusformat.nexus import * from time import time from typing import Optional, Literal @@ -23,17 +23,35 @@ except: pass -from msnctools.general import is_int, is_num, input_int, input_int_list, input_num, input_yesno, \ - input_menu, index_nearest, string_to_list, file_exists_and_readable +try: + from msnctools.general import is_int, is_num, input_int, input_int_list, input_num, \ + input_yesno, input_menu, index_nearest, string_to_list, file_exists_and_readable +except: + from general import is_int, is_num, input_int, input_int_list, input_num, \ + input_yesno, input_menu, index_nearest, string_to_list, file_exists_and_readable def import_scanparser(station): if station in ('id1a3', 'id3a'): - from msnctools.scanparsers import SMBRotationScanParser - globals()['ScanParser'] = SMBRotationScanParser + try: + from msnctools.scanparsers import SMBRotationScanParser + globals()['ScanParser'] = SMBRotationScanParser + except: + try: + from scanparsers import SMBRotationScanParser + globals()['ScanParser'] = SMBRotationScanParser + except: + pass elif station in ('id3b'): - from msnctools.scanparsers import FMBRotationScanParser - globals()['ScanParser'] = FMBRotationScanParser + try: + from msnctools.scanparsers import FMBRotationScanParser + globals()['ScanParser'] = FMBRotationScanParser + except: + try: + from scanparsers import FMBRotationScanParser + globals()['ScanParser'] = FMBRotationScanParser + except: + pass else: raise RuntimeError(f'Invalid station: {station}') @@ -45,10 +63,9 @@ try: parser = ScanParser(spec_file, scan_number) try: - scan_type = parser.get_scan_type() + scan_type = parser.scan_type except: scan_type = None - pass except: scan_numbers.remove(scan_number) return(scan_numbers) @@ -434,15 +451,6 @@ def get_scanparser(self, scan_number): return(get_scanparser(self.spec_file, scan_number)) -# def get_detector_data(self, detector_prefix, scan_number=None, scan_step_index=None): -# if scan_number is None: -# scan_number = self.scan_numbers[0] -# if scan_step_index is None: -# scan_info = self.stack_info[self.get_scan_index(scan_number)] -# scan_step_index = scan_info['starting_image_offset'] -# parser = self.get_scanparser(scan_number) -# return(parser.get_detector_data(detector_prefix, scan_step_index)) - def get_detector_data(self, detector_prefix, scan_number=None, scan_step_index=None): image_stacks = [] if scan_number is None: @@ -460,10 +468,13 @@ else: image_stacks.append(parser.get_detector_data(detector_prefix, image_offset+scan_step_index)) - if len(image_stacks) == 1: + if scan_number is not None and scan_step_index is not None: + # Return a single image for a specific scan_number and scan_step_index request return(image_stacks[0]) else: + # Return a list otherwise return(image_stacks) + return(image_stacks) def scan_numbers_cli(self, attr_desc, **kwargs): available_scan_numbers = self.available_scan_numbers @@ -475,8 +486,11 @@ available_scan_numbers = [] for scan_number in self.available_scan_numbers: parser = self.get_scanparser(scan_number) - if parser.scan_type == scan_type: - available_scan_numbers.append(scan_number) + try: + if parser.scan_type == scan_type: + available_scan_numbers.append(scan_number) + except: + pass elif scan_type == 'df1': tomo_scan_numbers = kwargs['tomo_scan_numbers'] available_scan_numbers = [] @@ -587,8 +601,8 @@ scan_index = self.get_scan_index(scan_number) # Select the image set - last_image_index = image_offset+num_image-1 - print(f'Available good image set index range: [{image_offset}, {last_image_index}]') + last_image_index = image_offset+num_image + print(f'Available good image set index range: [{image_offset}, {last_image_index})') image_set_approved = False if scan_index is not None: scan_info = stack_info[scan_index] @@ -597,16 +611,16 @@ image_set_approved = input_yesno(f'Accept these values (y/n)?', 'y') if not image_set_approved: print(f'Default starting image offset and number of images: '+ - f'{image_offset} and {last_image_index-image_offset}') + f'{image_offset} and {num_image}') image_set_approved = input_yesno(f'Accept these values (y/n)?', 'y') if image_set_approved: offset = image_offset num = last_image_index-offset while not image_set_approved: offset = input_int(f'Enter the starting image offset', ge=image_offset, - le=last_image_index-1)#, default=image_offset) + lt=last_image_index)#, default=image_offset) num = input_int(f'Enter the number of images', ge=1, - le=last_image_index-offset+1)#, default=last_image_index-offset+1) + le=last_image_index-offset)#, default=last_image_index-offset) print(f'Current starting image offset and number of images: {offset} and {num}') image_set_approved = input_yesno(f'Accept these values (y/n)?', 'y') if scan_index is not None: @@ -688,7 +702,7 @@ scan_numbers = [scan_number] for scan_number in scan_numbers: parser = self.get_scanparser(scan_number) - horizontal_shifts.append(parser.get_horizontal_shift()) + horizontal_shifts.append(parser.horizontal_shift) if len(horizontal_shifts) == 1: return(horizontal_shifts[0]) else: @@ -702,7 +716,7 @@ scan_numbers = [scan_number] for scan_number in scan_numbers: parser = self.get_scanparser(scan_number) - vertical_shifts.append(parser.get_vertical_shift()) + vertical_shifts.append(parser.vertical_shift) if len(vertical_shifts) == 1: return(vertical_shifts[0]) else: @@ -728,22 +742,20 @@ return # Select the theta range for the tomo reconstruction from the first scan + theta_range_approved = False thetas = np.linspace(spec_theta_start, spec_theta_end, spec_num_theta) delta_theta = thetas[1]-thetas[0] - theta_range_approved = False - print(f'Theta range obtained from SPEC data: [{spec_theta_start}, {spec_theta_end})') + print(f'Theta range obtained from SPEC data: [{spec_theta_start}, {spec_theta_end}]') print(f'Theta step size = {delta_theta}') print(f'Number of theta values: {spec_num_theta}') - exit('Done') default_start = None default_end = None if station in ('id1a3', 'id3a'): theta_range_approved = input_yesno(f'Accept this theta range (y/n)?', 'y') if theta_range_approved: - theta_start = spec_theta_start - theta_end = spec_theta_end - num_theta = spec_num_theta - theta_index_start = 0 + self.theta_range = {'start': float(spec_theta_start), 'end': float(spec_theta_end), + 'num': int(spec_num_theta), 'start_index': 0} + return elif station in ('id3b'): if spec_theta_start <= 0.0 and spec_theta_end >= 180.0: default_start = 0 @@ -765,7 +777,8 @@ f'{theta_end})') print(f'Number of theta values: {num_theta}') theta_range_approved = input_yesno(f'Accept this theta range (y/n)?', 'y') - self.thetas = np.linspace(theta_start, theta_end, num_theta) + self.theta_range = {'start': float(theta_start), 'end': float(theta_end), + 'num': int(num_theta), 'start_index': int(theta_index_start)} def image_range_cli(self, attr_desc, detector_prefix): stack_info = self.stack_info @@ -805,13 +818,17 @@ btr = cli_kwargs.get('btr') station = cli_kwargs.get('station') detector = cli_kwargs.get('detector') + sample_name = cli_kwargs.get('sample_name') print(f'\n -- Configure the location of the {attr_desc}scan data -- ') if station in ('id1a3', 'id3a'): basedir = f'/nfs/chess/{station}/{cycle}/{btr}' runs = [d for d in os.listdir(basedir) if os.path.isdir(os.path.join(basedir, d))] #RV index = 15-1 #RV index = 7-1 - index = input_menu(runs, header='Choose a sample directory') + if sample_name is not None and sample_name in runs: + index = runs.index(sample_name) + else: + index = input_menu(runs, header='Choose a sample directory') self.spec_file = f'{basedir}/{runs[index]}/spec.log' self.scan_numbers_cli(attr_desc, station=station, scan_type='ts1') else: @@ -845,8 +862,8 @@ def cli(self): print('\n -- Configure the sample metadata -- ') -#RV self.name = 'test' #RV self.name = 'sobhani-3249-A' +#RV self.name = 'tenstom_1304r-1' self.set_single_attr_cli('name', 'the sample name') #RV self.description = 'test sample' self.set_single_attr_cli('description', 'a description of the sample (optional)') @@ -917,17 +934,19 @@ use_detector_config = input_yesno(f'Current detector settings:\n{self.detector}\n'+ f'Keep these settings? (y/n)') if not use_detector_config: -#RV have_detector_config = True - have_detector_config = input_yesno(f'Is a detector configuration file available? (y/n)') - if have_detector_config: -#RV detector_config_file = 'retiga.yaml' -#RV detector_config_file = 'andor2.yaml' - detector_config_file = input(f'Enter detector configuration file name: ') + menu_options = ['not listed', 'andor2', 'manta', 'retiga'] + input_mode = input_menu(menu_options, header='Choose one of the following detector '+ + 'configuration options') + if input_mode: + detector_config_file = f'{menu_options[input_mode]}.yaml' have_detector_config = self.detector.construct_from_yaml(detector_config_file) + else: + have_detector_config = False if not have_detector_config: self.set_single_attr_cli('detector', 'detector') self.set_single_attr_cli('tomo_fields', 'Tomo field', chain_attr_desc=True, - cycle=self.cycle, btr=self.btr, station=self.station, detector=self.detector) + cycle=self.cycle, btr=self.btr, station=self.station, detector=self.detector, + sample_name=self.sample.name) if self.station in ('id1a3', 'id3a'): have_dark_field = True tomo_spec_file = self.tomo_fields.spec_file @@ -1001,7 +1020,7 @@ nxspec_scans[field_name] = field.construct_nxcollection(image_key, thetas, self.detector) if include_raw_data: - image_stacks.append(field.get_detector_data(self.detector.prefix)) + image_stacks += field.get_detector_data(self.detector.prefix) for scan_number in field.scan_numbers: parser = field.get_scanparser(scan_number) scan_info = field.stack_info[field.get_scan_index(scan_number)]