#!/usr/bin/env python
#-*- coding: utf-8 -*-
#pylint: disable=
"""
File       : processor.py
Author     : Valentin Kuznetsov <vkuznet AT gmail dot com>
Description: Processor module
"""

# system modules
import argparse
import json
import logging
import sys
from time import time

# local modules
# from pipeline import PipelineObject

class Processor():
    """
    Processor represents a generic processor.
    """
    def __init__(self):
        """
        Processor constructor
        """
        self.__name__ = self.__class__.__name__
        self.logger = logging.getLogger(self.__name__)
        self.logger.propagate = False

    def process(self, data):
        """
        process data API
        """

        t0 = time()
        self.logger.info(f'Executing "process" with type(data)={type(data)}')

        data = self._process(data)

        self.logger.info(f'Finished "process" in {time()-t0:.3f} seconds\n')

        return(data)

    def _process(self, data):
        # If needed, extract data from a returned value of Reader.read
        if isinstance(data, list):
            if data and all(isinstance(d, dict) for d in data):
                data = data[0]['data']
        # process operation is a simple print function
        data += "process part\n"
        # and we return data back to pipeline
        return data
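
    # Minimal usage sketch (the list-of-dicts payload mirrors a value returned
    # by Reader.read; the 'data' key holds the actual content):
    #
    #     processor = Processor()
    #     result = processor.process([{'data': 'read part\n'}])
    #     # result == 'read part\nprocess part\n'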


class TFaaSImageProcessor(Processor):
    '''
    A Processor to get predictions from TFaaS inference server.
    '''
    def process(self, data, url, model, verbose=False):
        """
        process data API
        """

        t0 = time()
        self.logger.info(f'Executing "process" with url {url} model {model}')

        data = self._process(data, url, model, verbose)

        self.logger.info(f'Finished "process" in {time()-t0:.3f} seconds\n')

        return(data)

    def _process(self, data, url, model, verbose):
        '''Get a prediction for the input image data from a TFaaS inference
        server.

        :param data: Input image data, either a file name or actual image data
        :type data: object
        :return: prediction results returned by the TFaaS server
        :rtype: object
        '''
        from MLaaS.tfaas_client import predictImage
        from pathlib import Path
        self.logger.info(f"input data {type(data)}")
        if isinstance(data, str) and Path(data).is_file():
            imgFile = data
            data = predictImage(url, imgFile, model, verbose)
        else:
            import requests
            rdict = data[0]
            img = rdict['data']
            session = requests.Session()
            rurl = url + '/predict/image'
            payload = dict(model=model)
            files = dict(image=img)
            self.logger.info(f"HTTP request {rurl} with image file and {payload} payload")
            req = session.post(rurl, files=files, data=payload)
            data = req.content
            data = data.decode("utf-8").replace('\n', '')
            self.logger.info(f"HTTP response {data}")

        return(data)
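
    # Usage sketch (hypothetical server URL and model name; assumes a TFaaS
    # instance is reachable at that address):
    #
    #     processor = TFaaSImageProcessor()
    #     prediction = processor.process('img.png', url='http://localhost:8083',
    #                                    model='mnist', verbose=True)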

class URLResponseProcessor(Processor):
    def _process(self, data):
        '''Take data returned from URLReader.read and return a decoded version of
        the content.

        :param data: input data (output of URLReader.read)
        :type data: list[dict]
        :return: decoded data contents
        :rtype: object
        '''

        data = data[0]

        content = data['data']
        encoding = data['encoding']

        self.logger.debug(f'Decoding content of type {type(content)} with {encoding}')

        try:
            content = content.decode(encoding)
        except Exception:
            self.logger.warning(f'Failed to decode content of type {type(content)} with {encoding}')

        return(content)
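
    # Expected input sketch: one item of the URLReader.read output, e.g.
    #
    #     [{'data': b'hello', 'encoding': 'utf-8'}]
    #
    # which this processor decodes to the str 'hello'.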

class PrintProcessor(Processor):
    '''A Processor to simply print the input data to stdout and return the
    original input data, unchanged in any way.
    '''

    def _process(self, data):
        '''Print and return the input data.

        :param data: Input data
        :type data: object
        :return: `data`
        :rtype: object
        '''

        print(f'{self.__name__} data :')

        if callable(getattr(data, '_str_tree', None)):
            # If data is likely an NXobject, print its tree representation
            # (since NXobjects' str representations are just their nxname -- not
            # very helpful).
            print(data._str_tree(attrs=True, recursive=True))
        else:
            print(str(data))

        return(data)

class NexusToNumpyProcessor(Processor):
    '''A class to convert the default plottable data in an `NXobject` into a
    `numpy.ndarray`.
    '''

    def _process(self, data):
        '''Return the default plottable data signal in `data` as a
        `numpy.ndarray`.

        :param data: input NeXus structure
        :type data: nexusformat.nexus.tree.NXobject
        :raises ValueError: if `data` has no default plottable data signal
        :return: default plottable data signal in `data`
        :rtype: numpy.ndarray
        '''

        default_data = data.plottable_data

        if default_data is None:
            default_data_path = data.attrs.get('default')
            if default_data_path is not None:
                default_data = data.get(default_data_path)
        if default_data is None:
            raise(ValueError(f'The structure of {data} contains no default data'))

        default_signal = default_data.attrs.get('signal')
        if default_signal is None:
            raise(ValueError(f'The signal of {default_data} is unknown'))
        default_signal = default_signal.nxdata

        np_data = default_data[default_signal].nxdata

        return(np_data)
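
    # Usage sketch (assumes nexusformat is installed; NXdata records its first
    # field as the default signal, which is what this processor extracts --
    # exact nexusformat behavior is the assumption here):
    #
    #     from nexusformat.nexus import NXdata, NXfield
    #     nxdata = NXdata(NXfield([1, 2, 3], name='y'), NXfield([0, 1, 2], name='x'))
    #     arr = NexusToNumpyProcessor().process(nxdata)  # -> array([1, 2, 3])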

class NexusToXarrayProcessor(Processor):
    '''A class to convert the default plottable data in an `NXobject` into an
    `xarray.DataArray`.'''

    def _process(self, data):
        '''Return the default plottable data signal in `data` as an
        `xarray.DataArray`.

        :param data: input NeXus structure
        :type data: nexusformat.nexus.tree.NXobject
        :raises ValueError: if metadata for `xarray` is absent from `data`
        :return: default plottable data signal in `data`
        :rtype: xarray.DataArray
        '''

        from xarray import DataArray

        default_data = data.plottable_data

        if default_data is None:
            default_data_path = data.attrs.get('default')
            if default_data_path is not None:
                default_data = data.get(default_data_path)
        if default_data is None:
            raise(ValueError(f'The structure of {data} contains no default data'))

        default_signal = default_data.attrs.get('signal')
        if default_signal is None:
            raise(ValueError(f'The signal of {default_data} is unknown'))
        default_signal = default_signal.nxdata

        signal_data = default_data[default_signal].nxdata

        axes = default_data.attrs['axes']
        if isinstance(axes, str):
            axes = [axes]
        coords = {}
        for axis_name in axes:
            axis = default_data[axis_name]
            coords[axis_name] = (axis_name,
                                 axis.nxdata,
                                 axis.attrs)

        dims = tuple(axes)

        name = default_signal

        attrs = default_data[default_signal].attrs

        return(DataArray(data=signal_data,
                         coords=coords,
                         dims=dims,
                         name=name,
                         attrs=attrs))

class XarrayToNexusProcessor(Processor):
    '''A class to convert the data in an `xarray` structure to a
    `nexusformat.nexus.NXdata`.
    '''

    def _process(self, data):
        '''Return `data` represented as a `nexusformat.nexus.NXdata`.

        :param data: The input `xarray` structure
        :type data: typing.Union[xarray.DataArray, xarray.Dataset]
        :return: The data and metadata in `data`
        :rtype: nexusformat.nexus.NXdata
        '''

        from nexusformat.nexus import NXdata, NXfield

        signal = NXfield(value=data.data, name=data.name, attrs=data.attrs)

        axes = []
        for name, coord in data.coords.items():
            axes.append(NXfield(value=coord.data, name=name, attrs=coord.attrs))
        axes = tuple(axes)

        return(NXdata(signal=signal, axes=axes))
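
    # Usage sketch (assumes xarray and nexusformat are installed):
    #
    #     import numpy as np
    #     from xarray import DataArray
    #     da = DataArray(np.arange(3), coords={'x': [0.0, 0.5, 1.0]},
    #                    dims=('x',), name='I')
    #     nxdata = XarrayToNexusProcessor().process(da)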

class XarrayToNumpyProcessor(Processor):
    '''A class to convert the data in an `xarray.DataArray` structure to a
    `numpy.ndarray`.
    '''

    def _process(self, data):
        '''Return just the signal values contained in `data`.

        :param data: The input `xarray.DataArray`
        :type data: xarray.DataArray
        :return: The data in `data`
        :rtype: numpy.ndarray
        '''

        return(data.data)

class MapProcessor(Processor):
    '''Class representing a process that takes a map configuration and returns a
    `nexusformat.nexus.NXentry` representing that map's metadata and any
    scalar-valued raw data requested by the supplied map configuration.
    '''

    def _process(self, data):
        '''Process the output of a `Reader` that contains a map configuration and
        return a `nexusformat.nexus.NXentry` representing the map.

        :param data: Result of `Reader.read` where at least one item has the
            value `'MapConfig'` for the `'schema'` key.
        :type data: list[dict[str,object]]
        :return: Map data & metadata (SPEC only, no detector)
        :rtype: nexusformat.nexus.NXentry
        '''

        map_config = self.get_map_config(data)
        nxentry = self.__class__.get_nxentry(map_config)

        return(nxentry)

    def get_map_config(self, data):
        '''Get an instance of `MapConfig` from a returned value of `Reader.read`

        :param data: Result of `Reader.read` where at least one item has the
            value `'MapConfig'` for the `'schema'` key.
        :type data: list[dict[str,object]]
        :raises Exception: If a valid `MapConfig` cannot be constructed from `data`.
        :return: a valid instance of `MapConfig` with field values taken from `data`.
        :rtype: MapConfig
        '''

        from CHAP.models.map import MapConfig

        map_config = False
        if isinstance(data, list):
            for item in data:
                if isinstance(item, dict):
                    if item.get('schema') == 'MapConfig':
                        map_config = item.get('data')
                        break

        if not map_config:
            raise(ValueError('No map configuration found'))

        return(MapConfig(**map_config))
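
    # Expected input sketch (one schema-tagged item from Reader.read; the
    # 'data' value supplies keyword arguments for CHAP.models.map.MapConfig):
    #
    #     [{'schema': 'MapConfig', 'data': {...}}]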
        
    @staticmethod
    def get_nxentry(map_config):
        '''Use a `MapConfig` to construct a `nexusformat.nexus.NXentry`

        :param map_config: a valid map configuration
        :type map_config: MapConfig
        :return: the map's data and metadata contained in a NeXus structure
        :rtype: nexusformat.nexus.NXentry
        '''

        from nexusformat.nexus import (NXcollection,
                                       NXdata,
                                       NXentry,
                                       NXfield,
                                       NXsample)
        import numpy as np

        nxentry = NXentry(name=map_config.title)

        nxentry.map_config = json.dumps(map_config.dict())

        nxentry[map_config.sample.name] = NXsample(**map_config.sample.dict())

        nxentry.attrs['station'] = map_config.station
        
        nxentry.spec_scans = NXcollection()
        for scans in map_config.spec_scans:
            nxentry.spec_scans[scans.scanparsers[0].scan_name] = \
                NXfield(value=scans.scan_numbers,
                        dtype='int64',  # int8 would overflow for scan numbers > 127
                        attrs={'spec_file':str(scans.spec_file)})

        nxentry.data = NXdata()
        nxentry.data.attrs['axes'] = map_config.dims
        for i,dim in enumerate(map_config.independent_dimensions[::-1]):
            nxentry.data[dim.label] = NXfield(value=map_config.coords[dim.label],
                                              units=dim.units,
                                              attrs={'long_name': f'{dim.label} ({dim.units})', 
                                                     'data_type': dim.data_type,
                                                     'local_name': dim.name})
            nxentry.data.attrs[f'{dim.label}_indices'] = i

        signal = False
        auxiliary_signals = []
        for data in map_config.all_scalar_data:
            nxentry.data[data.label] = NXfield(value=np.empty(map_config.shape),
                                               units=data.units,
                                               attrs={'long_name': f'{data.label} ({data.units})',
                                                      'data_type': data.data_type,
                                                      'local_name': data.name})
            if not signal:
                signal = data.label
            else:
                auxiliary_signals.append(data.label)

        if signal:
            nxentry.data.attrs['signal'] = signal
            nxentry.data.attrs['auxiliary_signals'] = auxiliary_signals

        for scans in map_config.spec_scans:
            for scan_number in scans.scan_numbers:
                scanparser = scans.get_scanparser(scan_number)
                for scan_step_index in range(scanparser.spec_scan_npts):
                    map_index = scans.get_index(scan_number, scan_step_index, map_config)
                    for data in map_config.all_scalar_data:
                        nxentry.data[data.label][map_index] = data.get_value(scans, scan_number, scan_step_index)

        return(nxentry)

class IntegrationProcessor(Processor):
    '''Class for integrating 2D detector data
    '''

    def _process(self, data):
        '''Integrate the input data with the integration method and keyword
        arguments supplied and return the results.

        :param data: input data, including raw data, integration method, and
            keyword args for the integration method.
        :type data: tuple[typing.Union[numpy.ndarray, list[numpy.ndarray]],
                          callable,
                          dict]
        :param integration_method: the method of a
            `pyFAI.azimuthalIntegrator.AzimuthalIntegrator` or
            `pyFAI.multi_geometry.MultiGeometry` that returns the desired
            integration results.
        :return: integrated raw data
        :rtype: pyFAI.containers.IntegrateResult
        '''

        detector_data, integration_method, integration_kwargs = data

        return(integration_method(detector_data, **integration_kwargs))
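
    # Usage sketch (`frame` and `ai` are hypothetical names: a 2D detector
    # array and a configured pyFAI AzimuthalIntegrator, respectively):
    #
    #     result = IntegrationProcessor().process(
    #         (frame, ai.integrate1d, {'npt': 1800}))
    #     q, intensity = result.radial, result.intensity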

class IntegrateMapProcessor(Processor):
    '''Class representing a process that takes a map and integration
    configuration and returns a `nexusformat.nexus.NXprocess` containing a map of
    the integrated detector data requested.
    '''

    def _process(self, data):
        '''Process the output of a `Reader` that contains a map and integration
        configuration and return a `nexusformat.nexus.NXprocess` containing a map
        of the integrated detector data requested

        :param data: Result of `Reader.read` where at least one item has the
            value `'MapConfig'` for the `'schema'` key, and at least one item has
            the value `'IntegrationConfig'` for the `'schema'` key.
        :type data: list[dict[str,object]]
        :return: integrated data and process metadata
        :rtype: nexusformat.nexus.NXprocess
        '''

        map_config, integration_config = self.get_configs(data)
        nxprocess = self.get_nxprocess(map_config, integration_config)

        return(nxprocess)

    def get_configs(self, data):
        '''Return valid instances of `MapConfig` and `IntegrationConfig` from the
        input supplied by `MultipleReader`.

        :param data: Result of `Reader.read` where at least one item has the
            value `'MapConfig'` for the `'schema'` key, and at least one item has
            the value `'IntegrationConfig'` for the `'schema'` key.
        :type data: list[dict[str,object]]
        :raises ValueError: if `data` cannot be parsed into map and integration configurations.
        :return: valid map and integration configuration objects.
        :rtype: tuple[MapConfig, IntegrationConfig]
        '''

        self.logger.debug('Getting configuration objects')
        t0 = time()

        from CHAP.models.map import MapConfig
        from CHAP.models.integration import IntegrationConfig

        map_config = False
        integration_config = False
        if isinstance(data, list):
            for item in data:
                if isinstance(item, dict):
                    schema = item.get('schema')
                    if schema == 'MapConfig':
                        map_config = item.get('data')
                    elif schema == 'IntegrationConfig':
                        integration_config = item.get('data')

        if not map_config:
            raise(ValueError('No map configuration found'))
        if not integration_config:
            raise(ValueError('No integration configuration found'))

        map_config = MapConfig(**map_config)
        integration_config = IntegrationConfig(**integration_config)

        self.logger.debug(f'Got configuration objects in {time()-t0:.3f} seconds')

        return(map_config, integration_config)
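
    # Expected input sketch for get_configs (two schema-tagged items, in any
    # order; the 'data' values are keyword arguments for the respective
    # config models):
    #
    #     [{'schema': 'MapConfig', 'data': {...}},
    #      {'schema': 'IntegrationConfig', 'data': {...}}]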

    def get_nxprocess(self, map_config, integration_config):
        '''Use a `MapConfig` and `IntegrationConfig` to construct a
        `nexusformat.nexus.NXprocess`

        :param map_config: a valid map configuration
        :type map_config: MapConfig
        :param integration_config: a valid integration configuration
        :type integration_config" IntegrationConfig
        :return: the integrated detector data and metadata contained in a NeXus
            structure
        :rtype: nexusformat.nexus.NXprocess
        '''

        self.logger.debug('Constructing NXprocess')
        t0 = time()

        from nexusformat.nexus import (NXdata,
                                       NXdetector,
                                       NXfield,
                                       NXprocess)
        import numpy as np
        import pyFAI

        nxprocess = NXprocess(name=integration_config.title)

        nxprocess.map_config = json.dumps(map_config.dict())
        nxprocess.integration_config = json.dumps(integration_config.dict())

        nxprocess.program = 'pyFAI'
        nxprocess.version = pyFAI.version

        for k,v in integration_config.dict().items():
            if k == 'detectors': 
                continue
            nxprocess.attrs[k] = v

        for detector in integration_config.detectors:
            nxprocess[detector.prefix] = NXdetector()
            nxprocess[detector.prefix].local_name = detector.prefix
            nxprocess[detector.prefix].distance = detector.azimuthal_integrator.dist
            nxprocess[detector.prefix].distance.attrs['units'] = 'm'
            nxprocess[detector.prefix].calibration_wavelength = detector.azimuthal_integrator.wavelength
            nxprocess[detector.prefix].calibration_wavelength.attrs['units'] = 'm'
            nxprocess[detector.prefix].attrs['poni_file'] = str(detector.poni_file)
            nxprocess[detector.prefix].attrs['mask_file'] = str(detector.mask_file)
            nxprocess[detector.prefix].raw_data_files = np.full(map_config.shape, '', dtype='|S256')

        nxprocess.data = NXdata()

        nxprocess.data.attrs['axes'] = (*map_config.dims, *integration_config.integrated_data_dims)
        for i,dim in enumerate(map_config.independent_dimensions[::-1]):
            nxprocess.data[dim.label] = NXfield(value=map_config.coords[dim.label],
                                                units=dim.units,
                                                attrs={'long_name': f'{dim.label} ({dim.units})',
                                                       'data_type': dim.data_type,
                                                       'local_name': dim.name})
            nxprocess.data.attrs[f'{dim.label}_indices'] = i

        for i,(coord_name,coord_values) in enumerate(integration_config.integrated_data_coordinates.items()):
            if coord_name == 'radial':
                type_ = pyFAI.units.RADIAL_UNITS
            elif coord_name == 'azimuthal':
                type_ = pyFAI.units.AZIMUTHAL_UNITS
            else:
                raise(ValueError(f'Unknown coordinate name: {coord_name}'))
            coord_units = pyFAI.units.to_unit(getattr(integration_config, f'{coord_name}_units'), type_=type_)
            nxprocess.data[coord_units.name] = coord_values
            nxprocess.data.attrs[f'{coord_units.name}_indices'] = i+len(map_config.coords)
            nxprocess.data[coord_units.name].units = coord_units.unit_symbol
            nxprocess.data[coord_units.name].attrs['long_name'] = coord_units.label

        nxprocess.data.attrs['signal'] = 'I'
        map_shape = tuple(len(coord_values) for coord_values in map_config.coords.values())[::-1]
        nxprocess.data.I = NXfield(value=np.empty((*map_shape, *integration_config.integrated_data_shape)),
                                   units='a.u.',
                                   attrs={'long_name':'Intensity (a.u.)'})

        integrator = integration_config.get_multi_geometry_integrator()
        if integration_config.integration_type == 'azimuthal':
            integration_method = integrator.integrate1d
            integration_kwargs = {
                'lst_mask': [detector.mask_array for detector in integration_config.detectors],
                'npt': integration_config.radial_npt
            }
        elif integration_config.integration_type == 'cake':
            integration_method = integrator.integrate2d
            integration_kwargs = {
                'lst_mask': [detector.mask_array for detector in integration_config.detectors],
                'npt_rad': integration_config.radial_npt,
                'npt_azim': integration_config.azimuthal_npt,
                'method': 'bbox'
            }
        else:
            raise(ValueError(f'Unknown integration type: {integration_config.integration_type}'))

        integration_processor = IntegrationProcessor()
        integration_processor.logger.setLevel(self.logger.getEffectiveLevel())
        integration_processor.logger.addHandler(self.logger.handlers[0])
        lst_args = []
        for scans in map_config.spec_scans:
            for scan_number in scans.scan_numbers:
                scanparser = scans.get_scanparser(scan_number)
                for scan_step_index in range(scanparser.spec_scan_npts):
                    map_index = scans.get_index(scan_number, scan_step_index, map_config)
                    detector_data = scans.get_detector_data(integration_config.detectors, scan_number, scan_step_index)
                    result = integration_processor.process((detector_data, integration_method, integration_kwargs))
                    nxprocess.data.I[map_index] = result.intensity
                    for detector in integration_config.detectors:
                        nxprocess[detector.prefix].raw_data_files[map_index] = scanparser.get_detector_data_file(detector.prefix, scan_step_index)

        self.logger.debug(f'Constructed NXprocess in {time()-t0:.3f} seconds')

        return(nxprocess)

class MCACeriaCalibrationProcessor(Processor):
    '''Class representing the procedure to use a CeO2 scan to obtain tuned values
    for the Bragg diffraction angle and linear correction parameters for MCA
    channel energies for an EDD experimental setup.
    '''

    def _process(self, data):
        '''Return tuned values for 2&theta and linear correction parameters for
        the MCA channel energies.

        :param data: input configuration for the raw data & tuning procedure
        :type data: list[dict[str,object]]
        :return: original configuration dictionary with tuned values added
        :rtype: dict[str,object]
        '''

        calibration_config = self.get_config(data)

        tth, slope, intercept = self.calibrate(calibration_config)

        calibration_config.tth_calibrated = tth
        calibration_config.slope_calibrated = slope
        calibration_config.intercept_calibrated = intercept

        return(calibration_config.dict())

    def get_config(self, data):
        '''Get an instance of the configuration object needed by this
        `Processor` from a returned value of `Reader.read`

        :param data: Result of `Reader.read` where at least one item has the
            value `'MCACeriaCalibrationConfig'` for the `'schema'` key.
        :type data: list[dict[str,object]]
        :raises Exception: If a valid config object cannot be constructed from `data`.
        :return: a valid instance of a configuration object with field values
            taken from `data`.
        :rtype: MCACeriaCalibrationConfig
        '''

        from CHAP.models.edd import MCACeriaCalibrationConfig

        calibration_config = False
        if isinstance(data, list):
            for item in data:
                if isinstance(item, dict):
                    if item.get('schema') == 'MCACeriaCalibrationConfig':
                        calibration_config = item.get('data')
                        break

        if not calibration_config:
            raise(ValueError('No MCA ceria calibration configuration found in input data'))

        return(MCACeriaCalibrationConfig(**calibration_config))

    def calibrate(self, calibration_config):
        '''Iteratively calibrate 2&theta by fitting selected peaks of an MCA
        spectrum until the computed strain is sufficiently small. Use the fitted
        peak locations to determine linear correction parameters for the MCA's
        channel energies.

        :param calibration_config: object configuring the CeO2 calibration procedure
        :type calibration_config: MCACeriaCalibrationConfig
        :return: calibrated values of 2&theta and linear correction parameters
            for MCA channel energies : tth, slope, intercept
        :rtype: float, float, float
        '''

        from msnctools.fit import Fit, FitMultipeak
        import numpy as np
        from scipy.constants import physical_constants

        hc = physical_constants['Planck constant in eV/Hz'][0] * \
             physical_constants['speed of light in vacuum'][0] * \
             1e7 # We'll work in keV and A, not eV and m.

        # Collect raw MCA data of interest
        mca_data = calibration_config.mca_data()
        mca_bin_energies = np.arange(0, calibration_config.num_bins) * \
                           (calibration_config.max_energy_kev / calibration_config.num_bins)

        # Mask out the corrected MCA data for fitting
        mca_mask = calibration_config.mca_mask()
        fit_mca_energies = mca_bin_energies[mca_mask]
        fit_mca_intensities = mca_data[mca_mask]

        # Correct raw MCA data for variable flux at different energies
        flux_correct = calibration_config.flux_correction_interpolation_function()
        mca_intensity_weights = flux_correct(fit_mca_energies)
        fit_mca_intensities = fit_mca_intensities / mca_intensity_weights

        # Get the HKLs and lattice spacings that will be used for fitting
        tth = calibration_config.tth_initial_guess
        fit_hkls, fit_ds = calibration_config.fit_ds()
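        # For a cubic lattice, d = a / sqrt(h^2 + k^2 + l^2); combined with
        # Bragg's law (lambda = 2 d sin(theta), theta = tth/2) and E = hc/lambda,
        # a fitted peak energy E yields the lattice parameter
        # a = hc * sqrt(h^2 + k^2 + l^2) / (2 E sin(theta)),
        # which is the relation used for `unconstrained_a` below.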
        c_1 = fit_hkls[:,0]**2 + fit_hkls[:,1]**2 + fit_hkls[:,2]**2

        for iter_i in range(calibration_config.max_iter):

            ### Perform the uniform fit first ###

            # Get expected peak energy locations for this iteration's starting
            # value of tth
            fit_lambda = 2.0 * fit_ds * np.sin(0.5*np.radians(tth))
            fit_E0 = hc / fit_lambda

            # Run the uniform fit
            best_fit, residual, best_values, best_errors, redchi, success = \
                FitMultipeak.fit_multipeak(fit_mca_intensities,
                                           fit_E0,
                                           x=fit_mca_energies,
                                           fit_type='uniform')

            # Extract values of interest from the best values for the uniform fit
            # parameters
            uniform_fit_centers = [best_values[f'peak{i+1}_center'] for i in range(len(calibration_config.fit_hkls))]
            # uniform_a = best_values['scale_factor']
            # uniform_strain = np.log(uniform_a / calibration_config.lattice_parameter_angstrom)
            # uniform_tth = tth * (1.0 + uniform_strain)
            # uniform_rel_rms_error = np.linalg.norm(residual) / np.linalg.norm(fit_mca_intensities)

            ### Next, perform the unconstrained fit ###

            # Use the peak locations found in the uniform fit as the initial
            # guesses for peak locations in the unconstrained fit
            best_fit, residual, best_values, best_errors, redchi, success = \
                FitMultipeak.fit_multipeak(fit_mca_intensities,
                                           uniform_fit_centers,
                                           x=fit_mca_energies,
                                           fit_type='unconstrained')

            # Extract values of interest from the best values for the
            # unconstrained fit parameters
            unconstrained_fit_centers = np.array([best_values[f'peak{i+1}_center'] for i in range(len(calibration_config.fit_hkls))])
            unconstrained_a = 0.5 * hc * np.sqrt(c_1) / (unconstrained_fit_centers * abs(np.sin(0.5*np.radians(tth))))
            unconstrained_strains = np.log(unconstrained_a / calibration_config.lattice_parameter_angstrom)
            unconstrained_strain = np.mean(unconstrained_strains)
            unconstrained_tth = tth * (1.0 + unconstrained_strain)
            # unconstrained_rel_rms_error = np.linalg.norm(residual) / np.linalg.norm(fit_mca_intensities)


            # Update tth for the next iteration of tuning
            prev_tth = tth
            tth = unconstrained_tth

            # Stop tuning tth at this iteration if differences are small enough
            if abs(tth - prev_tth) < calibration_config.tune_tth_tol:
                break

        # Fit line to expected / computed peak locations from the last
        # unconstrained fit.
        fit = Fit.fit_data(fit_E0,'linear', x=unconstrained_fit_centers, nan_policy='omit')
        slope = fit.best_values['slope']
        intercept = fit.best_values['intercept']

        return(float(tth), float(slope), float(intercept))

class MCADataProcessor(Processor):
    '''Class representing a process to return data from an MCA, restructured to
    incorporate the shape & metadata associated with a map configuration to
    which the MCA data belongs, and linearly transformed according to the
    results of a ceria calibration.
    '''

    def _process(self, data):
        '''Process configurations for a map and MCA detector(s), and return the
        raw MCA data collected over the map.

        :param data: input map configuration and results of ceria calibration
        :type data: list[dict[str,object]]
        :return: calibrated and flux-corrected MCA data
        :rtype: nexusformat.nexus.NXentry
        '''

        map_config, calibration_config = self.get_configs(data)
        nxroot = self.get_nxroot(map_config, calibration_config)

        return(nxroot)

    def get_configs(self, data):
        '''Get instances of the configuration objects needed by this
        `Processor` from a returned value of `Reader.read`

        :param data: Result of `Reader.read` where at least one item has the
            value `'MapConfig'` for the `'schema'` key, and at least one item has
            the value `'MCACeriaCalibrationConfig'` for the `'schema'` key.
        :type data: list[dict[str,object]]
        :raises Exception: If valid config objects cannot be constructed from `data`.
        :return: valid instances of the configuration objects with field values
            taken from `data`.
        :rtype: tuple[MapConfig, MCACeriaCalibrationConfig]
        '''

        from CHAP.models.map import MapConfig
        from CHAP.models.edd import MCACeriaCalibrationConfig

        map_config = False
        calibration_config = False
        if isinstance(data, list):
            for item in data:
                if isinstance(item, dict):
                    schema = item.get('schema')
                    if schema == 'MapConfig':
                        map_config = item.get('data')
                    elif schema == 'MCACeriaCalibrationConfig':
                        calibration_config = item.get('data')

        if not map_config:
            raise(ValueError('No map configuration found in input data'))
        if not calibration_config:
            raise(ValueError('No MCA ceria calibration configuration found in input data'))

        return(MapConfig(**map_config), MCACeriaCalibrationConfig(**calibration_config))

    def get_nxroot(self, map_config, calibration_config):
        '''Get a map of the MCA data collected by the scans in `map_config`. The
        MCA data will be calibrated and flux-corrected according to the
        parameters included in `calibration_config`. The data will be returned
        along with relevant metadata in the form of a NeXus structure.

        :param map_config: the map configuration
        :type map_config: MapConfig
        :param calibration_config: the calibration configuration
        :type calibration_config: MCACeriaCalibrationConfig
        :return: a map of the calibrated and flux-corrected MCA data
        :rtype: nexusformat.nexus.NXroot
        '''

        from nexusformat.nexus import (NXdata,
                                       NXdetector,
                                       NXentry,
                                       NXinstrument,
                                       NXroot)
        import numpy as np

        nxroot = NXroot()

        nxroot[map_config.title] = MapProcessor.get_nxentry(map_config)
        nxentry = nxroot[map_config.title]

        nxentry.instrument = NXinstrument()
        nxentry.instrument.detector = NXdetector()
        nxentry.instrument.detector.calibration_configuration = json.dumps(calibration_config.dict())

        nxentry.instrument.detector.data = NXdata()
        nxdata = nxentry.instrument.detector.data
        nxdata.raw = np.empty((*map_config.shape, calibration_config.num_bins))
        nxdata.raw.attrs['units'] = 'counts'
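        # Apply the linear correction from the ceria calibration: the nominal
        # energy of each bin (bin index * bin width) is mapped through
        # E = slope * E_nominal + intercept.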
        nxdata.channel_energy = calibration_config.slope_calibrated * \
                                np.arange(0, calibration_config.num_bins) * \
                                (calibration_config.max_energy_kev / calibration_config.num_bins) + \
                                calibration_config.intercept_calibrated
        nxdata.channel_energy.attrs['units'] = 'keV'

        for scans in map_config.spec_scans:
            for scan_number in scans.scan_numbers:
                scanparser = scans.get_scanparser(scan_number)
                for scan_step_index in range(scanparser.spec_scan_npts):
                    map_index = scans.get_index(scan_number, scan_step_index, map_config)
                    nxdata.raw[map_index] = scanparser.get_detector_data(calibration_config.detector_name, scan_step_index)

        nxentry.data.makelink(nxdata.raw, name=calibration_config.detector_name)
        nxentry.data.makelink(nxdata.channel_energy, name=f'{calibration_config.detector_name}_channel_energy')
        if isinstance(nxentry.data.attrs['axes'], str):
            nxentry.data.attrs['axes'] = [nxentry.data.attrs['axes'], f'{calibration_config.detector_name}_channel_energy']
        else:
            nxentry.data.attrs['axes'] += [f'{calibration_config.detector_name}_channel_energy']
        nxentry.data.attrs['signal'] = calibration_config.detector_name

        return(nxroot)

class StrainAnalysisProcessor(Processor):
    '''Class representing a process to compute a map of sample strains by fitting
    Bragg peaks in 1D detector data and analyzing the difference between measured
    peak locations and expected peak locations for the sample measured.
    '''

    def _process(self, data):
        '''Process the input map detector data & configuration for the strain
        analysis procedure, and return a map of sample strains.

        :param data: results of `MultipleReader.read` containing input map
            detector data and strain analysis configuration
        :type data: list[dict[str,object]]
        :return: map of sample strains
        :rtype: xarray.Dataset
        '''

        strain_analysis_config = self.get_config(data)

        # NB: the strain fitting itself is not implemented yet; the input data
        # is currently returned unchanged.
        return(data)

    def get_config(self, data):
        '''Get instances of the configuration objects needed by this
        `Processor` from a returned value of `Reader.read`

        :param data: Result of `Reader.read` where at least one item has the
            value `'StrainAnalysisConfig'` for the `'schema'` key.
        :type data: list[dict[str,object]]
        :raises Exception: If valid config objects cannot be constructed from `data`.
        :return: valid instances of the configuration objects with field values
            taken from `data`.
        :rtype: StrainAnalysisConfig
        '''

        strain_analysis_config = False
        if isinstance(data, list):
            for item in data:
                if isinstance(item, dict):
                    if item.get('schema') == 'StrainAnalysisConfig':
                        strain_analysis_config = item.get('data')
                        break

        if not strain_analysis_config:
            raise(ValueError('No strain analysis configuration found in input data'))

        return(strain_analysis_config)


class OptionParser():
    '''User based option parser'''
    def __init__(self):
        self.parser = argparse.ArgumentParser(prog='PROG')
        self.parser.add_argument("--data", action="store",
            dest="data", default="", help="Input data")
        self.parser.add_argument("--processor", action="store",
            dest="processor", default="Processor", help="Processor class name")
        self.parser.add_argument('--log-level', choices=logging._nameToLevel.keys(),
            dest='log_level', default='INFO', help='logging level')

def main():
    '''Main function'''
    optmgr = OptionParser()
    opts = optmgr.parser.parse_args()
    clsName = opts.processor
    try:
        processorCls = getattr(sys.modules[__name__], clsName)
    except AttributeError:
        print(f'Unsupported processor {clsName}')
        sys.exit(1)

    processor = processorCls()
    processor.logger.setLevel(getattr(logging, opts.log_level))
    log_handler = logging.StreamHandler()
    log_handler.setFormatter(logging.Formatter('{name:20}: {message}', style='{'))
    processor.logger.addHandler(log_handler)
    data = processor.process(opts.data)

    print(f"Processor {processor} operates on data {data}")

if __name__ == '__main__':
    main()