diff detector.py @ 49:26f99fdd8d61 draft

"planemo upload for repository https://github.com/rolfverberg/galaxytools commit 4f7738d02f4a3fd91373f43937ed311b6fe11a12"
author rv43
date Thu, 28 Jul 2022 16:05:24 +0000
parents
children 98a83f03d91b
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/detector.py	Thu Jul 28 16:05:24 2022 +0000
@@ -0,0 +1,345 @@
+import logging
+import os
+import yaml
+from functools import cache
+from copy import deepcopy
+
+#from .general import *
+from general import illegal_value, is_int, is_num, input_yesno
+
+#from hexrd.instrument import HEDMInstrument, PlanarDetector
+
+class DetectorConfig:
+    def __init__(self, config_source):
+        self._config_source = config_source
+
+        if isinstance(self._config_source, str):
+            self._config_file = self._config_source
+            self._config = self._load_config_file()
+        elif isinstance(self._config_source, dict):
+            self._config_file = None
+            self._config = self._config_source
+        else:
+            self._config_file = None
+            self._config = False
+
+        self._valid = self._validate()
+
+        if not self.valid:
+            logging.error(f'Cannot create a valid instance of {self.__class__.__name__} '+
+                    f'from {self._config_source}')
+
+    def __repr__(self):
+        return(f'{self.__class__.__name__}({self._config_source.__repr__()})')
+    def __str__(self):
+        return(f'{self.__class__.__name__} generated from {self._config_source}')
+
+    @property
+    def config_file(self):
+        return(self._config_file)
+
+    @property
+    def config(self):
+        return(deepcopy(self._config))
+
+    @property
+    def valid(self):
+        return(self._valid)
+
+    def load_config_file(self):
+        raise(NotImplementedError)
+
+    def validate(self):
+        raise(NotImplementedError)
+
+    def _load_config_file(self):
+        return(self.load_config_file())
+
+    def _validate(self):
+        if not self.config:
+            logging.error('A configuration must be loaded prior to calling Detector._validate')
+            return(False)
+        else:
+            return(self.validate())
+
+    def _write_to_file(self, out_file):
+        out_file = os.path.abspath(out_file)
+
+        current_config_valid = self.validate()
+        if not current_config_valid:
+            write_invalid_config = input_yesno(s=f'This {self.__class__.__name__} is currently '+
+                    f'invalid. Write the configuration to {out_file} anyways?', default='no')
+            if not write_invalid_config:
+                logging.info('In accordance with user input, the invalid configuration will '+
+                        f'not be written to {out_file}')
+                return 
+
+        if os.access(out_file, os.W_OK):
+            if os.path.exists(out_file):
+                overwrite = input_yesno(s=f'{out_file} already exists. Overwrite?', default='no')
+                if overwrite:
+                    self.write_to_file(out_file)
+                else:
+                    logging.info(f'In accordance with user input, {out_file} will not be '+
+                            'overwritten')
+            else:
+                self.write_to_file(out_file)
+        else:
+            logging.error(f'Insufficient permissions to write to {out_file}')
+
+    def write_to_file(self, out_file):
+        raise(NotImplementedError)
+
+class YamlDetectorConfig(DetectorConfig):
+    def __init__(self, config_source, validate_yaml_pars=[]):
+        self._validate_yaml_pars = validate_yaml_pars
+        super().__init__(config_source)
+
+    def load_config_file(self):
+        if not os.path.splitext(self._config_file)[1]:
+            if os.path.isfile(f'{self._config_file}.yml'):
+                self._config_file = f'{self._config_file}.yml'
+            if os.path.isfile(f'{self._config_file}.yaml'):
+                self._config_file = f'{self._config_file}.yaml'
+        if not os.path.isfile(self._config_file):
+            logging.error(f'Unable to load {self._config_file}')
+            return(False)
+        with open(self._config_file, 'r') as infile:
+            config = yaml.safe_load(infile)
+        if isinstance(config, dict):
+            return(config)
+        else:
+            logging.error(f'Unable to load {self._config_file} as a dictionary')
+            return(False)
+
+    def validate(self):
+        if not self._validate_yaml_pars:
+            logging.warning('There are no required parameters provided for this detector '+
+                    'configuration')
+            return(True)
+
+        def validate_nested_pars(config, validate_yaml_par):
+            yaml_par_levels = validate_yaml_par.split(':')
+            first_level_par = yaml_par_levels[0]
+            try:
+                first_level_par = int(first_level_par)
+            except:
+                pass
+            try:
+                next_level_config = config[first_level_par]
+                if len(yaml_par_levels) > 1:
+                    next_level_pars = ':'.join(yaml_par_levels[1:])
+                    return(validate_nested_pars(next_level_config, next_level_pars))
+                else:
+                    return(True)
+            except:
+                return(False)
+
+        pars_missing = [p for p in self._validate_yaml_pars 
+                if not validate_nested_pars(self.config, p)]
+        if len(pars_missing) > 0:
+            logging.error(f'Missing item(s) in configuration: {", ".join(pars_missing)}')
+            return(False)
+        else:
+            return(True)
+
+    def write_to_file(self, out_file):
+        with open(out_file, 'w') as outf:
+            yaml.dump(self.config, outf)
+
+            
+class TomoDetectorConfig(YamlDetectorConfig):
+    def __init__(self, config_source):
+        validate_yaml_pars = ['detector',
+                              'lens_magnification',
+                              'detector:pixels:rows',
+                              'detector:pixels:columns',
+                              *[f'detector:pixels:size:{i}' for i in range(2)]]
+        super().__init__(config_source, validate_yaml_pars=validate_yaml_pars)
+
+    @property
+    @cache
+    def lens_magnification(self):
+        lens_magnification = self.config.get('lens_magnification')
+        if not isinstance(lens_magnification, (int, float)) or lens_magnification <= 0.:
+            illegal_value(lens_magnification, 'lens_magnification', 'detector file')
+            logging.warning('Using default lens_magnification value of 1.0')
+            return(1.0)
+        else:
+            return(lens_magnification)
+
+    @property
+    @cache
+    def pixel_size(self):
+        pixel_size = self.config['detector'].get('pixels').get('size')
+        if isinstance(pixel_size, (int, float)):
+            if pixel_size <= 0.:
+                illegal_value(pixel_size, 'pixel_size', 'detector file')
+                return(None)
+            pixel_size /= self.lens_magnification
+        elif isinstance(pixel_size, list):
+            if ((len(pixel_size) > 2) or
+                    (len(pixel_size) == 2 and pixel_size[0] != pixel_size[1])):
+                illegal_value(pixel_size, 'pixel size', 'detector file')
+                return(None)
+            elif not is_num(pixel_size[0], 0.):
+                illegal_value(pixel_size, 'pixel size', 'detector file')
+                return(None)
+            else:
+                pixel_size = pixel_size[0]/self.lens_magnification
+        else:
+            illegal_value(pixel_size, 'pixel size', 'detector file')
+            return(None)
+
+        return(pixel_size)
+
+    @property
+    @cache
+    def dimensions(self):
+        pixels = self.config['detector'].get('pixels')
+        num_rows = pixels.get('rows')
+        if not is_int(num_rows, 1):
+            illegal_value(num_rows, 'rows', 'detector file')
+            return(None)
+        num_columns = pixels.get('columns')
+        if not is_int(num_columns, 1):
+            illegal_value(num_columns, 'columns', 'detector file')
+            return(None)
+        return(num_rows, num_columns)
+
+
+class EDDDetectorConfig(YamlDetectorConfig):
+    def __init__(self, config_source):
+        validate_yaml_pars = ['num_bins',
+                              'max_E',
+                              # 'angle', # KLS leave this out for now -- I think it has to do with the relative geometry of sample, beam, and detector (not a property of the detector on its own), so may not belong here in the DetectorConfig object?
+                              'tth_angle',
+                              'slope',
+                              'intercept']
+        super().__init__(config_source, validate_yaml_pars=validate_yaml_pars)
+
+    @property
+    @cache
+    def num_bins(self):
+        try:
+            num_bins = int(self.config['num_bins'])
+            if num_bins <= 0:
+                raise(ValueError)
+            else:
+                return(num_bins)
+        except:
+            illegal_value(self.config['num_bins'], 'num_bins')
+    @property
+    @cache
+    def max_E(self):
+        try:
+            max_E = float(self.config['max_E'])
+            if max_E <= 0:
+                raise(ValueError)
+            else:
+                return(max_E)
+        except:
+            illegal_value(self.config['max_E'], 'max_E')
+            return(None)
+            
+    @property
+    def bin_energies(self):
+        return(self.slope * np.linspace(0, self.max_E, self.num_bins, endpoint=False) + 
+                self.intercept)
+
+    @property
+    def tth_angle(self):
+        try:
+            return(float(self.config['tth_angle']))
+        except:
+            illegal_value(tth_angle, 'tth_angle')
+            return(None)
+    @tth_angle.setter
+    def tth_angle(self, value):
+        try:
+            self._config['tth_angle'] = float(value)
+        except:
+            illegal_value(value, 'tth_angle')
+
+    @property
+    def slope(self):
+        try:
+            return(float(self.config['slope']))
+        except:
+            illegal_value(slope, 'slope')
+            return(None)
+    @slope.setter
+    def slope(self, value):
+        try:
+            self._config['slope'] = float(value)
+        except:
+            illegal_value(value, 'slope')
+
+    @property
+    def intercept(self):
+        try:
+            return(float(self.config['intercept']))
+        except:
+            illegal_value(intercept, 'intercept')
+            return(None)
+    @intercept.setter
+    def intercept(self, value):
+        try:
+            self._config['intercept'] = float(value)
+        except:
+            illegal_value(value, 'intercept')
+
+
+# class HexrdDetectorConfig(YamlDetectorConfig):
+#     def __init__(self, config_source, detector_names=[]):
+#         self.detector_names = detector_names
+#         validate_yaml_pars_each_detector = [*[f'buffer:{i}' for i in range(2)],
+#                                             'distortion:function_name',
+#                                             *[f'distortion:parameters:{i}' for i in range(6)],
+#                                             'pixels:columns',
+#                                             'pixels:rows',
+#                                             *['pixels:size:%i' % i for i in range(2)],
+#                                             'saturation_level',
+#                                             *[f'transform:tilt:{i}' for i in range(3)],
+#                                             *[f'transform:translation:{i}' for i in range(3)]]
+#         validate_yaml_pars = []
+#         for detector_name in self.detector_names:
+#             validate_yaml_pars += [f'detectors:{detector_name}:{par}' for par in validate_yaml_pars_each_detector]
+
+#         super().__init__(config_source, validate_yaml_pars=validate_yaml_pars)
+
+#     def validate(self):
+#         yaml_valid = YamlDetectorConfig.validate(self)
+#         if not yaml_valid:
+#             return(False)
+#         else:
+#             hedm_instrument = HEDMInstrument(instrument_config=self.config)
+#             for detector_name in self.detector_names:
+#                 if detector_name in hedm_instrument.detectors:
+#                     if isinstance(hedm_instrument.detectors[detector_name], PlanarDetector):
+#                         continue
+#                     else:
+#                         return(False)
+#                 else:
+#                     return(False)
+#             return(True)
+        
+# class SAXSWAXSDetectorConfig(DetectorConfig):
+#     def __init__(self, config_source):
+#         super().__init__(config_source)
+
+#     @property
+#     def ai(self):
+#         return(self.config)
+#     @ai.setter
+#     def ai(self, value):
+#         if isinstance(value, pyFAI.azimuthalIntegrator.AzimuthalIntegrator):
+#             self.config = ai
+#         else:
+#             illegal_value(value, 'azimuthal integrator')
+
+#     # pyFAI will perform its own error-checking for the mask attribute.
+#     mask = property(self.ai.get_mask, self.ai,set_mask)
+
+
+