Mercurial > repos > rv43 > tomo
view detector.py @ 49:26f99fdd8d61 draft
"planemo upload for repository https://github.com/rolfverberg/galaxytools commit 4f7738d02f4a3fd91373f43937ed311b6fe11a12"
author | rv43 |
---|---|
date | Thu, 28 Jul 2022 16:05:24 +0000 |
parents | |
children | 98a83f03d91b |
line wrap: on
line source
import logging import os import yaml from functools import cache from copy import deepcopy #from .general import * from general import illegal_value, is_int, is_num, input_yesno #from hexrd.instrument import HEDMInstrument, PlanarDetector class DetectorConfig: def __init__(self, config_source): self._config_source = config_source if isinstance(self._config_source, str): self._config_file = self._config_source self._config = self._load_config_file() elif isinstance(self._config_source, dict): self._config_file = None self._config = self._config_source else: self._config_file = None self._config = False self._valid = self._validate() if not self.valid: logging.error(f'Cannot create a valid instance of {self.__class__.__name__} '+ f'from {self._config_source}') def __repr__(self): return(f'{self.__class__.__name__}({self._config_source.__repr__()})') def __str__(self): return(f'{self.__class__.__name__} generated from {self._config_source}') @property def config_file(self): return(self._config_file) @property def config(self): return(deepcopy(self._config)) @property def valid(self): return(self._valid) def load_config_file(self): raise(NotImplementedError) def validate(self): raise(NotImplementedError) def _load_config_file(self): return(self.load_config_file()) def _validate(self): if not self.config: logging.error('A configuration must be loaded prior to calling Detector._validate') return(False) else: return(self.validate()) def _write_to_file(self, out_file): out_file = os.path.abspath(out_file) current_config_valid = self.validate() if not current_config_valid: write_invalid_config = input_yesno(s=f'This {self.__class__.__name__} is currently '+ f'invalid. Write the configuration to {out_file} anyways?', default='no') if not write_invalid_config: logging.info('In accordance with user input, the invalid configuration will '+ f'not be written to {out_file}') return if os.access(out_file, os.W_OK): if os.path.exists(out_file): overwrite = input_yesno(s=f'{out_file} already exists. Overwrite?', default='no') if overwrite: self.write_to_file(out_file) else: logging.info(f'In accordance with user input, {out_file} will not be '+ 'overwritten') else: self.write_to_file(out_file) else: logging.error(f'Insufficient permissions to write to {out_file}') def write_to_file(self, out_file): raise(NotImplementedError) class YamlDetectorConfig(DetectorConfig): def __init__(self, config_source, validate_yaml_pars=[]): self._validate_yaml_pars = validate_yaml_pars super().__init__(config_source) def load_config_file(self): if not os.path.splitext(self._config_file)[1]: if os.path.isfile(f'{self._config_file}.yml'): self._config_file = f'{self._config_file}.yml' if os.path.isfile(f'{self._config_file}.yaml'): self._config_file = f'{self._config_file}.yaml' if not os.path.isfile(self._config_file): logging.error(f'Unable to load {self._config_file}') return(False) with open(self._config_file, 'r') as infile: config = yaml.safe_load(infile) if isinstance(config, dict): return(config) else: logging.error(f'Unable to load {self._config_file} as a dictionary') return(False) def validate(self): if not self._validate_yaml_pars: logging.warning('There are no required parameters provided for this detector '+ 'configuration') return(True) def validate_nested_pars(config, validate_yaml_par): yaml_par_levels = validate_yaml_par.split(':') first_level_par = yaml_par_levels[0] try: first_level_par = int(first_level_par) except: pass try: next_level_config = config[first_level_par] if len(yaml_par_levels) > 1: next_level_pars = ':'.join(yaml_par_levels[1:]) return(validate_nested_pars(next_level_config, next_level_pars)) else: return(True) except: return(False) pars_missing = [p for p in self._validate_yaml_pars if not validate_nested_pars(self.config, p)] if len(pars_missing) > 0: logging.error(f'Missing item(s) in configuration: {", ".join(pars_missing)}') return(False) else: return(True) def write_to_file(self, out_file): with open(out_file, 'w') as outf: yaml.dump(self.config, outf) class TomoDetectorConfig(YamlDetectorConfig): def __init__(self, config_source): validate_yaml_pars = ['detector', 'lens_magnification', 'detector:pixels:rows', 'detector:pixels:columns', *[f'detector:pixels:size:{i}' for i in range(2)]] super().__init__(config_source, validate_yaml_pars=validate_yaml_pars) @property @cache def lens_magnification(self): lens_magnification = self.config.get('lens_magnification') if not isinstance(lens_magnification, (int, float)) or lens_magnification <= 0.: illegal_value(lens_magnification, 'lens_magnification', 'detector file') logging.warning('Using default lens_magnification value of 1.0') return(1.0) else: return(lens_magnification) @property @cache def pixel_size(self): pixel_size = self.config['detector'].get('pixels').get('size') if isinstance(pixel_size, (int, float)): if pixel_size <= 0.: illegal_value(pixel_size, 'pixel_size', 'detector file') return(None) pixel_size /= self.lens_magnification elif isinstance(pixel_size, list): if ((len(pixel_size) > 2) or (len(pixel_size) == 2 and pixel_size[0] != pixel_size[1])): illegal_value(pixel_size, 'pixel size', 'detector file') return(None) elif not is_num(pixel_size[0], 0.): illegal_value(pixel_size, 'pixel size', 'detector file') return(None) else: pixel_size = pixel_size[0]/self.lens_magnification else: illegal_value(pixel_size, 'pixel size', 'detector file') return(None) return(pixel_size) @property @cache def dimensions(self): pixels = self.config['detector'].get('pixels') num_rows = pixels.get('rows') if not is_int(num_rows, 1): illegal_value(num_rows, 'rows', 'detector file') return(None) num_columns = pixels.get('columns') if not is_int(num_columns, 1): illegal_value(num_columns, 'columns', 'detector file') return(None) return(num_rows, num_columns) class EDDDetectorConfig(YamlDetectorConfig): def __init__(self, config_source): validate_yaml_pars = ['num_bins', 'max_E', # 'angle', # KLS leave this out for now -- I think it has to do with the relative geometry of sample, beam, and detector (not a property of the detector on its own), so may not belong here in the DetectorConfig object? 'tth_angle', 'slope', 'intercept'] super().__init__(config_source, validate_yaml_pars=validate_yaml_pars) @property @cache def num_bins(self): try: num_bins = int(self.config['num_bins']) if num_bins <= 0: raise(ValueError) else: return(num_bins) except: illegal_value(self.config['num_bins'], 'num_bins') @property @cache def max_E(self): try: max_E = float(self.config['max_E']) if max_E <= 0: raise(ValueError) else: return(max_E) except: illegal_value(self.config['max_E'], 'max_E') return(None) @property def bin_energies(self): return(self.slope * np.linspace(0, self.max_E, self.num_bins, endpoint=False) + self.intercept) @property def tth_angle(self): try: return(float(self.config['tth_angle'])) except: illegal_value(tth_angle, 'tth_angle') return(None) @tth_angle.setter def tth_angle(self, value): try: self._config['tth_angle'] = float(value) except: illegal_value(value, 'tth_angle') @property def slope(self): try: return(float(self.config['slope'])) except: illegal_value(slope, 'slope') return(None) @slope.setter def slope(self, value): try: self._config['slope'] = float(value) except: illegal_value(value, 'slope') @property def intercept(self): try: return(float(self.config['intercept'])) except: illegal_value(intercept, 'intercept') return(None) @intercept.setter def intercept(self, value): try: self._config['intercept'] = float(value) except: illegal_value(value, 'intercept') # class HexrdDetectorConfig(YamlDetectorConfig): # def __init__(self, config_source, detector_names=[]): # self.detector_names = detector_names # validate_yaml_pars_each_detector = [*[f'buffer:{i}' for i in range(2)], # 'distortion:function_name', # *[f'distortion:parameters:{i}' for i in range(6)], # 'pixels:columns', # 'pixels:rows', # *['pixels:size:%i' % i for i in range(2)], # 'saturation_level', # *[f'transform:tilt:{i}' for i in range(3)], # *[f'transform:translation:{i}' for i in range(3)]] # validate_yaml_pars = [] # for detector_name in self.detector_names: # validate_yaml_pars += [f'detectors:{detector_name}:{par}' for par in validate_yaml_pars_each_detector] # super().__init__(config_source, validate_yaml_pars=validate_yaml_pars) # def validate(self): # yaml_valid = YamlDetectorConfig.validate(self) # if not yaml_valid: # return(False) # else: # hedm_instrument = HEDMInstrument(instrument_config=self.config) # for detector_name in self.detector_names: # if detector_name in hedm_instrument.detectors: # if isinstance(hedm_instrument.detectors[detector_name], PlanarDetector): # continue # else: # return(False) # else: # return(False) # return(True) # class SAXSWAXSDetectorConfig(DetectorConfig): # def __init__(self, config_source): # super().__init__(config_source) # @property # def ai(self): # return(self.config) # @ai.setter # def ai(self, value): # if isinstance(value, pyFAI.azimuthalIntegrator.AzimuthalIntegrator): # self.config = ai # else: # illegal_value(value, 'azimuthal integrator') # # pyFAI will perform its own error-checking for the mask attribute. # mask = property(self.ai.get_mask, self.ai,set_mask)