diff CHAP/models/integration.py @ 0:cbbe42422d56 draft

planemo upload for repository https://github.com/CHESSComputing/ChessAnalysisPipeline/tree/galaxy commit 1401a7e1ae007a6bda260d147f9b879e789b73e0-dirty
author kls286
date Tue, 28 Mar 2023 15:07:30 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/CHAP/models/integration.py	Tue Mar 28 15:07:30 2023 +0000
@@ -0,0 +1,515 @@
+import copy
+from functools import cache, lru_cache
+import json
+import logging
+import os
+from time import time
+from typing import Literal, Optional
+
+# from multiprocessing.pool import ThreadPool
+# from nexusformat.nexus import (NXdata,
+#                                NXdetector,
+#                                NXfield,
+#                                NXprocess,
+#                                NXroot)
+import numpy as np
+from pydantic import (BaseModel,
+                      validator,
+                      constr,
+                      conlist,
+                      conint,
+                      confloat,
+                      FilePath)
+#import pyFAI, pyFAI.multi_geometry, pyFAI.units
+from pyFAI import load as pyfai_load
+from pyFAI.multi_geometry import MultiGeometry
+from pyFAI.units import AZIMUTHAL_UNITS, RADIAL_UNITS
+#from pyspec.file.tiff import TiffFile
+
+#from .map import MapConfig, SpecScans
+
+
+class Detector(BaseModel):
+    """
+    Detector class to represent a single detector used in the experiment.
+    
+    :param prefix: Prefix of the detector in the SPEC file.
+    :type prefix: str
+    :param poni_file: Path to the poni file.
+    :type poni_file: str
+    :param mask_file: Optional path to the mask file.
+    :type mask_file: str, optional
+    """
+    prefix: constr(strip_whitespace=True, min_length=1)
+    poni_file: FilePath
+    mask_file: Optional[FilePath]
+    @validator('poni_file', allow_reuse=True)
+    def validate_poni_file(cls, poni_file):
+        """
+        Validate the poni file by checking if it's a valid PONI file.
+        
+        :param poni_file: Path to the poni file.
+        :type poni_file: str
+        :raises ValueError: If poni_file is not a valid PONI file.
+        :returns: Absolute path to the poni file.
+        :rtype: str
+        """
+        poni_file = os.path.abspath(poni_file)
+        try:
+            ai = azimuthal_integrator(poni_file)
+        except:
+            raise(ValueError(f'{poni_file} is not a valid PONI file'))
+        else:
+            return(poni_file)
+    @validator('mask_file', allow_reuse=True)
+    def validate_mask_file(cls, mask_file, values):
+        """
+        Validate the mask file. If a mask file is provided, it checks if it's a valid TIFF file.
+        
+        :param mask_file: Path to the mask file.
+        :type mask_file: str or None
+        :param values: A dictionary of the Detector fields.
+        :type values: dict
+        :raises ValueError: If mask_file is provided and it's not a valid TIFF file.
+        :raises ValueError: If `'poni_file'` is not provided in `values`.
+        :returns: Absolute path to the mask file or None.
+        :rtype: str or None        
+        """
+        if mask_file is None:
+            return(mask_file)
+        else:
+            mask_file = os.path.abspath(mask_file)
+            poni_file = values.get('poni_file')
+            if poni_file is None:
+                raise(ValueError('Cannot validate mask file without a PONI file.'))
+            else:
+                try:
+                    mask_array = get_mask_array(mask_file, poni_file)
+                except BaseException as e:
+                    raise(ValueError(f'Unable to open {mask_file} as a TIFF file'))
+                else:
+                    return(mask_file)
+    @property
+    def azimuthal_integrator(self):
+        return(azimuthal_integrator(self.poni_file))
+    @property
+    def mask_array(self):
+        return(get_mask_array(self.mask_file, self.poni_file))
+
+@cache
+def azimuthal_integrator(poni_file:str):
+    if not isinstance(poni_file, str):
+        poni_file = str(poni_file)
+    return(pyfai_load(poni_file))
+@cache
+def get_mask_array(mask_file:str, poni_file:str):
+    if mask_file is not None:
+        if not isinstance(mask_file, str):
+            mask_file = str(mask_file)
+
+        from pyspec.file.tiff import TiffFile
+        with TiffFile(mask_file) as tiff:
+            mask_array = tiff.asarray()
+    else:
+        mask_array = np.zeros(azimuthal_integrator(poni_file).detector.shape)
+    return(mask_array)
+
+class IntegrationConfig(BaseModel):
+    """
+    Class representing the configuration for a raw detector data integration.
+
+    :ivar tool_type: type of integration tool; always set to "integration"
+    :type tool_type: str, optional
+    :ivar title: title of the integration
+    :type title: str
+    :ivar integration_type: type of integration, one of "azimuthal", "radial", or "cake"
+    :type integration_type: str
+    :ivar detectors: list of detectors used in the integration
+    :type detectors: List[Detector]
+    :ivar radial_units: radial units for the integration, defaults to `'q_A^-1'`
+    :type radial_units: str, optional
+    :ivar radial_min: minimum radial value for the integration range
+    :type radial_min: float, optional
+    :ivar radial_max: maximum radial value for the integration range
+    :type radial_max: float, optional
+    :ivar radial_npt: number of points in the radial range for the integration
+    :type radial_npt: int, optional
+    :ivar azimuthal_units: azimuthal units for the integration
+    :type azimuthal_units: str, optional
+    :ivar azimuthal_min: minimum azimuthal value for the integration range
+    :type azimuthal_min: float, optional
+    :ivar azimuthal_max: maximum azimuthal value for the integration range
+    :type azimuthal_max: float, optional
+    :ivar azimuthal_npt: number of points in the azimuthal range for the integration
+    :type azimuthal_npt: int, optional
+    :ivar error_model: error model for the integration, one of "poisson" or "azimuthal"
+    :type error_model: str, optional
+    """
+    tool_type: Literal['integration'] = 'integration'
+    title: constr(strip_whitespace=True, min_length=1)
+    integration_type: Literal['azimuthal', 'radial', 'cake']
+    detectors: conlist(item_type=Detector, min_items=1)
+    radial_units: str = 'q_A^-1'
+    radial_min: confloat(ge=0)
+    radial_max: confloat(gt=0)
+    radial_npt: conint(gt=0) = 1800
+    azimuthal_units: str = 'chi_deg'
+    azimuthal_min: confloat(ge=-180) = -180
+    azimuthal_max: confloat(le=360) = 180
+    azimuthal_npt: conint(gt=0) = 3600
+    error_model: Optional[Literal['poisson', 'azimuthal']]
+    sequence_index: Optional[conint(gt=0)]
+    @validator('radial_units', allow_reuse=True)
+    def validate_radial_units(cls, radial_units):
+        """
+        Validate the radial units for the integration.
+
+        :param radial_units: unvalidated radial units for the integration
+        :type radial_units: str
+        :raises ValueError: if radial units are not one of the recognized radial units
+        :return: validated radial units
+        :rtype: str
+        """
+        if radial_units in RADIAL_UNITS.keys():
+            return(radial_units)
+        else:
+            raise(ValueError(f'Invalid radial units: {radial_units}. Must be one of {", ".join(RADIAL_UNITS.keys())}'))
+    @validator('azimuthal_units', allow_reuse=True)
+    def validate_azimuthal_units(cls, azimuthal_units):
+        """
+        Validate that `azimuthal_units` is one of the keys in the
+        `pyFAI.units.AZIMUTHAL_UNITS` dictionary.
+
+        :param azimuthal_units: The string representing the unit to be validated.
+        :type azimuthal_units: str
+        :raises ValueError: If `azimuthal_units` is not one of the keys in `pyFAI.units.AZIMUTHAL_UNITS`
+        :return: The original supplied value, if is one of the keys in `pyFAI.units.AZIMUTHAL_UNITS`.
+        :rtype: str
+        """
+        if azimuthal_units in AZIMUTHAL_UNITS.keys():
+            return(azimuthal_units)
+        else:
+            raise(ValueError(f'Invalid azimuthal units: {azimuthal_units}. Must be one of {", ".join(AZIMUTHAL_UNITS.keys())}'))
+    def validate_range_max(range_name:str):
+        """Validate the maximum value of an integration range.
+
+        :param range_name: The name of the integration range (e.g. radial, azimuthal).
+        :type range_name: str
+        :return: The callable that performs the validation.
+        :rtype: callable
+        """
+        def _validate_range_max(cls, range_max, values):
+            """Check if the maximum value of the integration range is greater than its minimum value.
+
+            :param range_max: The maximum value of the integration range.
+            :type range_max: float
+            :param values: The values of the other fields being validated.
+            :type values: dict
+            :raises ValueError: If the maximum value of the integration range is not greater than its minimum value.
+            :return: The validated maximum range value
+            :rtype: float
+            """
+            range_min = values.get(f'{range_name}_min')
+            if range_min < range_max:
+                return(range_max)
+            else:
+                raise(ValueError(f'Maximum value of integration range must be greater than minimum value of integration range ({range_name}_min={range_min}).'))
+        return(_validate_range_max)
+    _validate_radial_max = validator('radial_max', allow_reuse=True)(validate_range_max('radial'))
+    _validate_azimuthal_max = validator('azimuthal_max', allow_reuse=True)(validate_range_max('azimuthal'))        
+    def validate_for_map_config(self, map_config:BaseModel):
+        """
+        Validate the existence of the detector data file for all scan points in `map_config`.
+        
+        :param map_config: The `MapConfig` instance to validate against.
+        :type map_config: MapConfig
+        :raises RuntimeError: If a detector data file could not be found for a scan point occurring in `map_config`.
+        :return: None
+        :rtype: None
+        """
+        for detector in self.detectors:
+            for scans in map_config.spec_scans:
+                for scan_number in scans.scan_numbers:
+                    scanparser = scans.get_scanparser(scan_number)
+                    for scan_step_index in range(scanparser.spec_scan_npts):
+                        # Make sure the detector data file exists for all scan points
+                        try:
+                            detector_data_file = scanparser.get_detector_data_file(detector.prefix, scan_step_index)
+                        except:
+                            raise(RuntimeError(f'Could not find data file for detector prefix {detector.prefix} on scan number {scan_number} in spec file {scans.spec_file}'))
+    def get_azimuthal_adjustments(self):
+        """To enable a continuous range of integration in the azimuthal direction
+        for radial and cake integration, obtain adjusted values for this 
+        `IntegrationConfig`'s `azimuthal_min` and `azimuthal_max` values, the
+        angle amount by which those values were adjusted, and the proper location
+        of the discontinuity in the azimuthal direction.
+
+        :return: Adjusted chi_min, adjusted chi_max, chi_offset, chi_discontinuity
+        :rtype: tuple[float,float,float,float]
+        """
+        return(get_azimuthal_adjustments(self.azimuthal_min, self.azimuthal_max))
+    def get_azimuthal_integrators(self):
+        """Get a list of `AzimuthalIntegrator`s that correspond to the detector
+        configurations in this instance of `IntegrationConfig`.
+
+        The returned `AzimuthalIntegrator`s are (if need be) artificially rotated
+        in the azimuthal direction to achieve a continuous range of integration
+        in the azimuthal direction.
+
+        :returns: A list of `AzimuthalIntegrator`s appropriate for use by this
+            `IntegrationConfig` tool
+        :rtype: list[pyFAI.azimuthalIntegrator.AzimuthalIntegrator]
+        """
+        chi_min, chi_max, chi_offset, chi_disc = self.get_azimuthal_adjustments()
+        return(get_azimuthal_integrators(tuple([detector.poni_file for detector in self.detectors]), chi_offset=chi_offset))        
+    def get_multi_geometry_integrator(self):
+        """Get a `MultiGeometry` integrator suitable for use by this instance of
+        `IntegrationConfig`.
+
+        :return: A `MultiGeometry` integrator
+        :rtype: pyFAI.multi_geometry.MultiGeometry
+        """
+        poni_files = tuple([detector.poni_file for detector in self.detectors])
+        radial_range = (self.radial_min, self.radial_max)
+        azimuthal_range = (self.azimuthal_min, self.azimuthal_max)
+        return(get_multi_geometry_integrator(poni_files, self.radial_units, radial_range, azimuthal_range))
+    def get_azimuthally_integrated_data(self, spec_scans:BaseModel, scan_number:int, scan_step_index:int):
+        """Return azimuthally-integrated data for the scan step specified.
+        
+        :param spec_scans: An instance of `SpecScans` containing the scan step requested.
+        :type spec_scans: SpecScans
+        :param scan_number: The number of the scan containing the scan step requested.
+        :type scan_number: int
+        :param scan_step_index: The index of the scan step requested.
+        :type scan_step_index: int
+        :return: A 1D array of azimuthally-integrated raw detector intensities.
+        :rtype: np.ndarray
+        """
+        detector_data = spec_scans.get_detector_data(self.detectors, scan_number, scan_step_index)
+        integrator = self.get_multi_geometry_integrator()
+        lst_mask = [detector.mask_array for detector in self.detectors]
+        result = integrator.integrate1d(detector_data, lst_mask=lst_mask, npt=self.radial_npt, error_model=self.error_model)
+        if result.sigma is None:
+            return(result.intensity)
+        else:
+            return(result.intensity, result.sigma)
+    def get_radially_integrated_data(self, spec_scans:BaseModel, scan_number:int, scan_step_index:int):
+        """Return radially-integrated data for the scan step specified.
+        
+        :param spec_scans: An instance of `SpecScans` containing the scan step requested.
+        :type spec_scans: SpecScans
+        :param scan_number: The number of the scan containing the scan step requested.
+        :type scan_number: int
+        :param scan_step_index: The index of the scan step requested.
+        :type scan_step_index: int
+        :return: A 1D array of radially-integrated raw detector intensities.
+        :rtype: np.ndarray
+        """
+        # Handle idiosyncracies of azimuthal ranges in pyFAI
+        # Adjust chi ranges to get a continuous range of iintegrated data
+        chi_min, chi_max, chi_offset, chi_disc = self.get_azimuthal_adjustments()
+        # Perform radial integration on a detector-by-detector basis.
+        I_each_detector = []
+        variance_each_detector = []
+        integrators = self.get_azimuthal_integrators()
+        for i,(integrator,detector) in enumerate(zip(integrators,self.detectors)):
+            detector_data = spec_scans.get_detector_data([detector], scan_number, scan_step_index)[0]
+            result = integrator.integrate_radial(detector_data, self.azimuthal_npt,
+                                                 unit=self.azimuthal_units, azimuth_range=(chi_min,chi_max),
+                                                 radial_unit=self.radial_units, radial_range=(self.radial_min,self.radial_max),
+                                                 mask=detector.mask_array) #, error_model=self.error_model)
+            I_each_detector.append(result.intensity)
+            if result.sigma is not None:
+                variance_each_detector.append(result.sigma**2)
+        # Add the individual detectors' integrated intensities together
+        I = np.nansum(I_each_detector, axis=0)
+        # Ignore data at values of chi for which there was no data
+        I = np.where(I==0, np.nan, I)
+        if len(I_each_detector) != len(variance_each_detector):
+            return(I)
+        else:
+            # Get the standard deviation of the summed detectors' intensities
+            sigma = np.sqrt(np.nansum(variance_each_detector, axis=0))
+            return(I, sigma)
+    def get_cake_integrated_data(self, spec_scans:BaseModel, scan_number:int, scan_step_index:int):
+        """Return cake-integrated data for the scan step specified.
+        
+        :param spec_scans: An instance of `SpecScans` containing the scan step requested.
+        :type spec_scans: SpecScans
+        :param scan_number: The number of the scan containing the scan step requested.
+        :type scan_number: int
+        :param scan_step_index: The index of the scan step requested.
+        :type scan_step_index: int
+        :return: A 2D array of cake-integrated raw detector intensities.
+        :rtype: np.ndarray
+        """
+        detector_data = spec_scans.get_detector_data(self.detectors, scan_number, scan_step_index)
+        integrator = self.get_multi_geometry_integrator()
+        lst_mask = [detector.mask_array for detector in self.detectors]
+        result = integrator.integrate2d(detector_data, lst_mask=lst_mask,
+                                        npt_rad=self.radial_npt, npt_azim=self.azimuthal_npt,
+                                        method='bbox',
+                                        error_model=self.error_model)
+        if result.sigma is None:
+            return(result.intensity)
+        else:
+            return(result.intensity, result.sigma)
+    def get_integrated_data(self, spec_scans:BaseModel, scan_number:int, scan_step_index:int):
+        """Return integrated data for the scan step specified.
+        
+        :param spec_scans: An instance of `SpecScans` containing the scan step requested.
+        :type spec_scans: SpecScans
+        :param scan_number: The number of the scan containing the scan step requested.
+        :type scan_number: int
+        :param scan_step_index: The index of the scan step requested.
+        :type scan_step_index: int
+        :return: An array of integrated raw detector intensities.
+        :rtype: np.ndarray
+        """
+        if self.integration_type == 'azimuthal':
+            return(self.get_azimuthally_integrated_data(spec_scans, scan_number, scan_step_index))
+        elif self.integration_type == 'radial':
+            return(self.get_radially_integrated_data(spec_scans, scan_number, scan_step_index))
+        elif self.integration_type == 'cake':
+            return(self.get_cake_integrated_data(spec_scans, scan_number, scan_step_index))
+
+    @property
+    def integrated_data_coordinates(self):
+        """
+        Return a dictionary of coordinate arrays for navigating the dimension(s)
+        of the integrated data produced by this instance of `IntegrationConfig`.
+        
+        :return: A dictionary with either one or two keys: 'azimuthal' and/or
+            'radial', each of which points to a 1-D `numpy` array of coordinate
+            values.
+        :rtype: dict[str,np.ndarray]
+        """
+        if self.integration_type == 'azimuthal':
+            return(get_integrated_data_coordinates(radial_range=(self.radial_min,self.radial_max),
+                                                   radial_npt=self.radial_npt))
+        elif self.integration_type == 'radial':
+            return(get_integrated_data_coordinates(azimuthal_range=(self.azimuthal_min,self.azimuthal_max),
+                                                   azimuthal_npt=self.azimuthal_npt))
+        elif self.integration_type == 'cake':
+            return(get_integrated_data_coordinates(radial_range=(self.radial_min,self.radial_max),
+                                                   radial_npt=self.radial_npt,
+                                                   azimuthal_range=(self.azimuthal_min,self.azimuthal_max),
+                                                   azimuthal_npt=self.azimuthal_npt))
+    @property
+    def integrated_data_dims(self):
+        """Return a tuple of the coordinate labels for the integrated data
+        produced by this instance of `IntegrationConfig`.
+        """
+        directions = list(self.integrated_data_coordinates.keys())
+        dim_names = [getattr(self, f'{direction}_units') for direction in directions]
+        return(dim_names)
+    @property
+    def integrated_data_shape(self):
+        """Return a tuple representing the shape of the integrated data
+        produced by this instance of `IntegrationConfig` for a single scan step.
+        """
+        return(tuple([len(coordinate_values) for coordinate_name,coordinate_values in self.integrated_data_coordinates.items()]))
+
+@cache
+def get_azimuthal_adjustments(chi_min:float, chi_max:float):
+    """
+    Fix chi discontinuity at 180 degrees and return the adjusted chi range,
+    offset, and discontinuty.
+
+    If the discontinuity is crossed, obtain the offset to artificially rotate
+    detectors to achieve a continuous azimuthal integration range.
+
+    :param chi_min: The minimum value of the azimuthal range.
+    :type chi_min: float
+    :param chi_max: The maximum value of the azimuthal range.
+    :type chi_max: float
+    :return: The following four values: the adjusted minimum value of the
+        azimuthal range, the adjusted maximum value of the azimuthal range, the
+        value by which the chi angle was adjusted, the position of the chi
+        discontinuity.
+    """
+    # Fix chi discontinuity at 180 degrees for now.
+    chi_disc = 180
+    # If the discontinuity is crossed, artificially rotate the detectors to
+    # achieve a continuous azimuthal integration range
+    if chi_min < chi_disc and chi_max > chi_disc:
+        chi_offset = chi_max - chi_disc
+    else:
+        chi_offset = 0
+    return(chi_min-chi_offset, chi_max-chi_offset, chi_offset, chi_disc)
+@cache
+def get_azimuthal_integrators(poni_files:tuple, chi_offset=0):
+    """
+    Return a list of `AzimuthalIntegrator` objects generated from PONI files.
+    
+    :param poni_files: Tuple of strings, each string being a path to a PONI file. : tuple
+    :type poni_files: tuple
+    :param chi_offset: The angle in degrees by which the `AzimuthalIntegrator` objects will be rotated, defaults to 0.
+    :type chi_offset: float, optional
+    :return: List of `AzimuthalIntegrator` objects
+    :rtype: list[pyFAI.azimuthalIntegrator.AzimuthalIntegrator]
+    """
+    ais = []
+    for poni_file in poni_files:
+        ai = copy.deepcopy(azimuthal_integrator(poni_file))
+        ai.rot3 += chi_offset * np.pi/180
+        ais.append(ai)
+    return(ais)
+@cache
+def get_multi_geometry_integrator(poni_files:tuple, radial_unit:str, radial_range:tuple, azimuthal_range:tuple):
+    """Return a `MultiGeometry` instance that can be used for azimuthal or cake
+    integration.
+
+    :param poni_files: Tuple of PONI files that describe the detectors to be
+        integrated.
+    :type poni_files: tuple
+    :param radial_unit: Unit to use for radial integration range.
+    :type radial_unit: str
+    :param radial_range: Tuple describing the range for radial integration.
+    :type radial_range: tuple[float,float]
+    :param azimuthal_range:Tuple describing the range for azimuthal integration.
+    :type azimuthal_range: tuple[float,float]
+    :return: `MultiGeometry` instance that can be used for azimuthal or cake
+        integration.
+    :rtype: pyFAI.multi_geometry.MultiGeometry
+    """
+    chi_min, chi_max, chi_offset, chi_disc = get_azimuthal_adjustments(*azimuthal_range)
+    ais = copy.deepcopy(get_azimuthal_integrators(poni_files, chi_offset=chi_offset))
+    multi_geometry = MultiGeometry(ais,
+                                   unit=radial_unit,
+                                   radial_range=radial_range,
+                                   azimuth_range=(chi_min,chi_max),
+                                   wavelength=sum([ai.wavelength for ai in ais])/len(ais),
+                                   chi_disc=chi_disc)
+    return(multi_geometry)
+@cache
+def get_integrated_data_coordinates(azimuthal_range:tuple=None, azimuthal_npt:int=None, radial_range:tuple=None, radial_npt:int=None):
+    """
+    Return a dictionary of coordinate arrays for the specified radial and/or
+    azimuthal integration ranges.
+        
+    :param azimuthal_range: Tuple specifying the range of azimuthal angles over
+        which to generate coordinates, in the format (min, max), defaults to
+        None.
+    :type azimuthal_range: tuple[float,float], optional
+    :param azimuthal_npt: Number of azimuthal coordinate points to generate,
+        defaults to None.
+    :type azimuthal_npt: int, optional
+    :param radial_range: Tuple specifying the range of radial distances over
+        which to generate coordinates, in the format (min, max), defaults to
+        None.
+    :type radial_range: tuple[float,float], optional
+    :param radial_npt: Number of radial coordinate points to generate, defaults
+        to None.
+    :type radial_npt: int, optional
+    :return: A dictionary with either one or two keys: 'azimuthal' and/or
+        'radial', each of which points to a 1-D `numpy` array of coordinate
+        values.
+    :rtype: dict[str,np.ndarray]
+    """
+    integrated_data_coordinates = {}
+    if azimuthal_range is not None and azimuthal_npt is not None:
+        integrated_data_coordinates['azimuthal'] = np.linspace(*azimuthal_range, azimuthal_npt)
+    if radial_range is not None and radial_npt is not None:
+        integrated_data_coordinates['radial'] = np.linspace(*radial_range, radial_npt)
+    return(integrated_data_coordinates)