Mercurial > repos > kls286 > chap_test_20230328
comparison CHAP/processor.py @ 0:cbbe42422d56 draft
planemo upload for repository https://github.com/CHESSComputing/ChessAnalysisPipeline/tree/galaxy commit 1401a7e1ae007a6bda260d147f9b879e789b73e0-dirty
| author | kls286 | 
|---|---|
| date | Tue, 28 Mar 2023 15:07:30 +0000 | 
| parents | |
| children | 
   comparison
  equal
  deleted
  inserted
  replaced
| -1:000000000000 | 0:cbbe42422d56 | 
|---|---|
| 1 #!/usr/bin/env python | |
| 2 #-*- coding: utf-8 -*- | |
| 3 #pylint: disable= | |
| 4 """ | |
| 5 File : processor.py | |
| 6 Author : Valentin Kuznetsov <vkuznet AT gmail dot com> | |
| 7 Description: Processor module | |
| 8 """ | |
| 9 | |
| 10 # system modules | |
| 11 import argparse | |
| 12 import json | |
| 13 import logging | |
| 14 import sys | |
| 15 from time import time | |
| 16 | |
| 17 # local modules | |
| 18 # from pipeline import PipelineObject | |
| 19 | |
| 20 class Processor(): | |
| 21 """ | |
| 22 Processor represent generic processor | |
| 23 """ | |
| 24 def __init__(self): | |
| 25 """ | |
| 26 Processor constructor | |
| 27 """ | |
| 28 self.__name__ = self.__class__.__name__ | |
| 29 self.logger = logging.getLogger(self.__name__) | |
| 30 self.logger.propagate = False | |
| 31 | |
| 32 def process(self, data): | |
| 33 """ | |
| 34 process data API | |
| 35 """ | |
| 36 | |
| 37 t0 = time() | |
| 38 self.logger.info(f'Executing "process" with type(data)={type(data)}') | |
| 39 | |
| 40 data = self._process(data) | |
| 41 | |
| 42 self.logger.info(f'Finished "process" in {time()-t0:.3f} seconds\n') | |
| 43 | |
| 44 return(data) | |
| 45 | |
| 46 def _process(self, data): | |
| 47 # If needed, extract data from a returned value of Reader.read | |
| 48 if isinstance(data, list): | |
| 49 if all([isinstance(d,dict) for d in data]): | |
| 50 data = data[0]['data'] | |
| 51 # process operation is a simple print function | |
| 52 data += "process part\n" | |
| 53 # and we return data back to pipeline | |
| 54 return data | |
| 55 | |
| 56 | |
| 57 class TFaaSImageProcessor(Processor): | |
| 58 ''' | |
| 59 A Processor to get predictions from TFaaS inference server. | |
| 60 ''' | |
| 61 def process(self, data, url, model, verbose=False): | |
| 62 """ | |
| 63 process data API | |
| 64 """ | |
| 65 | |
| 66 t0 = time() | |
| 67 self.logger.info(f'Executing "process" with url {url} model {model}') | |
| 68 | |
| 69 data = self._process(data, url, model, verbose) | |
| 70 | |
| 71 self.logger.info(f'Finished "process" in {time()-t0:.3f} seconds\n') | |
| 72 | |
| 73 return(data) | |
| 74 | |
| 75 def _process(self, data, url, model, verbose): | |
| 76 '''Print and return the input data. | |
| 77 | |
| 78 :param data: Input image data, either file name or actual image data | |
| 79 :type data: object | |
| 80 :return: `data` | |
| 81 :rtype: object | |
| 82 ''' | |
| 83 from MLaaS.tfaas_client import predictImage | |
| 84 from pathlib import Path | |
| 85 self.logger.info(f"input data {type(data)}") | |
| 86 if isinstance(data, str) and Path(data).is_file(): | |
| 87 imgFile = data | |
| 88 data = predictImage(url, imgFile, model, verbose) | |
| 89 else: | |
| 90 rdict = data[0] | |
| 91 import requests | |
| 92 img = rdict['data'] | |
| 93 session = requests.Session() | |
| 94 rurl = url + '/predict/image' | |
| 95 payload = dict(model=model) | |
| 96 files = dict(image=img) | |
| 97 self.logger.info(f"HTTP request {rurl} with image file and {payload} payload") | |
| 98 req = session.post(rurl, files=files, data=payload ) | |
| 99 data = req.content | |
| 100 data = data.decode("utf-8").replace('\n', '') | |
| 101 self.logger.info(f"HTTP response {data}") | |
| 102 | |
| 103 return(data) | |
| 104 | |
| 105 class URLResponseProcessor(Processor): | |
| 106 def _process(self, data): | |
| 107 '''Take data returned from URLReader.read and return a decoded version of | |
| 108 the content. | |
| 109 | |
| 110 :param data: input data (output of URLReader.read) | |
| 111 :type data: list[dict] | |
| 112 :return: decoded data contents | |
| 113 :rtype: object | |
| 114 ''' | |
| 115 | |
| 116 data = data[0] | |
| 117 | |
| 118 content = data['data'] | |
| 119 encoding = data['encoding'] | |
| 120 | |
| 121 self.logger.debug(f'Decoding content of type {type(content)} with {encoding}') | |
| 122 | |
| 123 try: | |
| 124 content = content.decode(encoding) | |
| 125 except: | |
| 126 self.logger.warning(f'Failed to decode content of type {type(content)} with {encoding}') | |
| 127 | |
| 128 return(content) | |
| 129 | |
| 130 class PrintProcessor(Processor): | |
| 131 '''A Processor to simply print the input data to stdout and return the | |
| 132 original input data, unchanged in any way. | |
| 133 ''' | |
| 134 | |
| 135 def _process(self, data): | |
| 136 '''Print and return the input data. | |
| 137 | |
| 138 :param data: Input data | |
| 139 :type data: object | |
| 140 :return: `data` | |
| 141 :rtype: object | |
| 142 ''' | |
| 143 | |
| 144 print(f'{self.__name__} data :') | |
| 145 | |
| 146 if callable(getattr(data, '_str_tree', None)): | |
| 147 # If data is likely an NXobject, print its tree representation | |
| 148 # (since NXobjects' str representations are just their nxname -- not | |
| 149 # very helpful). | |
| 150 print(data._str_tree(attrs=True, recursive=True)) | |
| 151 else: | |
| 152 print(str(data)) | |
| 153 | |
| 154 return(data) | |
| 155 | |
| 156 class NexusToNumpyProcessor(Processor): | |
| 157 '''A class to convert the default plottable data in an `NXobject` into an | |
| 158 `numpy.ndarray`. | |
| 159 ''' | |
| 160 | |
| 161 def _process(self, data): | |
| 162 '''Return the default plottable data signal in `data` as an | |
| 163 `numpy.ndarray`. | |
| 164 | |
| 165 :param data: input NeXus structure | |
| 166 :type data: nexusformat.nexus.tree.NXobject | |
| 167 :raises ValueError: if `data` has no default plottable data signal | |
| 168 :return: default plottable data signal in `data` | |
| 169 :rtype: numpy.ndarray | |
| 170 ''' | |
| 171 | |
| 172 default_data = data.plottable_data | |
| 173 | |
| 174 if default_data is None: | |
| 175 default_data_path = data.attrs['default'] | |
| 176 default_data = data.get(default_data_path) | |
| 177 if default_data is None: | |
| 178 raise(ValueError(f'The structure of {data} contains no default data')) | |
| 179 | |
| 180 default_signal = default_data.attrs.get('signal') | |
| 181 if default_signal is None: | |
| 182 raise(ValueError(f'The signal of {default_data} is unknown')) | |
| 183 default_signal = default_signal.nxdata | |
| 184 | |
| 185 np_data = default_data[default_signal].nxdata | |
| 186 | |
| 187 return(np_data) | |
| 188 | |
| 189 class NexusToXarrayProcessor(Processor): | |
| 190 '''A class to convert the default plottable data in an `NXobject` into an | |
| 191 `xarray.DataArray`.''' | |
| 192 | |
| 193 def _process(self, data): | |
| 194 '''Return the default plottable data signal in `data` as an | |
| 195 `xarray.DataArray`. | |
| 196 | |
| 197 :param data: input NeXus structure | |
| 198 :type data: nexusformat.nexus.tree.NXobject | |
| 199 :raises ValueError: if metadata for `xarray` is absen from `data` | |
| 200 :return: default plottable data signal in `data` | |
| 201 :rtype: xarray.DataArray | |
| 202 ''' | |
| 203 | |
| 204 from xarray import DataArray | |
| 205 | |
| 206 default_data = data.plottable_data | |
| 207 | |
| 208 if default_data is None: | |
| 209 default_data_path = data.attrs['default'] | |
| 210 default_data = data.get(default_data_path) | |
| 211 if default_data is None: | |
| 212 raise(ValueError(f'The structure of {data} contains no default data')) | |
| 213 | |
| 214 default_signal = default_data.attrs.get('signal') | |
| 215 if default_signal is None: | |
| 216 raise(ValueError(f'The signal of {default_data} is unknown')) | |
| 217 default_signal = default_signal.nxdata | |
| 218 | |
| 219 signal_data = default_data[default_signal].nxdata | |
| 220 | |
| 221 axes = default_data.attrs['axes'] | |
| 222 coords = {} | |
| 223 for axis_name in axes: | |
| 224 axis = default_data[axis_name] | |
| 225 coords[axis_name] = (axis_name, | |
| 226 axis.nxdata, | |
| 227 axis.attrs) | |
| 228 | |
| 229 dims = tuple(axes) | |
| 230 | |
| 231 name = default_signal | |
| 232 | |
| 233 attrs = default_data[default_signal].attrs | |
| 234 | |
| 235 return(DataArray(data=signal_data, | |
| 236 coords=coords, | |
| 237 dims=dims, | |
| 238 name=name, | |
| 239 attrs=attrs)) | |
| 240 | |
| 241 class XarrayToNexusProcessor(Processor): | |
| 242 '''A class to convert the data in an `xarray` structure to an | |
| 243 `nexusformat.nexus.NXdata`. | |
| 244 ''' | |
| 245 | |
| 246 def _process(self, data): | |
| 247 '''Return `data` represented as an `nexusformat.nexus.NXdata`. | |
| 248 | |
| 249 :param data: The input `xarray` structure | |
| 250 :type data: typing.Union[xarray.DataArray, xarray.Dataset] | |
| 251 :return: The data and metadata in `data` | |
| 252 :rtype: nexusformat.nexus.NXdata | |
| 253 ''' | |
| 254 | |
| 255 from nexusformat.nexus import NXdata, NXfield | |
| 256 | |
| 257 signal = NXfield(value=data.data, name=data.name, attrs=data.attrs) | |
| 258 | |
| 259 axes = [] | |
| 260 for name, coord in data.coords.items(): | |
| 261 axes.append(NXfield(value=coord.data, name=name, attrs=coord.attrs)) | |
| 262 axes = tuple(axes) | |
| 263 | |
| 264 return(NXdata(signal=signal, axes=axes)) | |
| 265 | |
| 266 class XarrayToNumpyProcessor(Processor): | |
| 267 '''A class to convert the data in an `xarray.DataArray` structure to an | |
| 268 `numpy.ndarray`. | |
| 269 ''' | |
| 270 | |
| 271 def _process(self, data): | |
| 272 '''Return just the signal values contained in `data`. | |
| 273 | |
| 274 :param data: The input `xarray.DataArray` | |
| 275 :type data: xarray.DataArray | |
| 276 :return: The data in `data` | |
| 277 :rtype: numpy.ndarray | |
| 278 ''' | |
| 279 | |
| 280 return(data.data) | |
| 281 | |
| 282 class MapProcessor(Processor): | |
| 283 '''Class representing a process that takes a map configuration and returns a | |
| 284 `nexusformat.nexus.NXentry` representing that map's metadata and any | |
| 285 scalar-valued raw data requseted by the supplied map configuration. | |
| 286 ''' | |
| 287 | |
| 288 def _process(self, data): | |
| 289 '''Process the output of a `Reader` that contains a map configuration and | |
| 290 return a `nexusformat.nexus.NXentry` representing the map. | |
| 291 | |
| 292 :param data: Result of `Reader.read` where at least one item has the | |
| 293 value `'MapConfig'` for the `'schema'` key. | |
| 294 :type data: list[dict[str,object]] | |
| 295 :return: Map data & metadata (SPEC only, no detector) | |
| 296 :rtype: nexusformat.nexus.NXentry | |
| 297 ''' | |
| 298 | |
| 299 map_config = self.get_map_config(data) | |
| 300 nxentry = self.__class__.get_nxentry(map_config) | |
| 301 | |
| 302 return(nxentry) | |
| 303 | |
| 304 def get_map_config(self, data): | |
| 305 '''Get an instance of `MapConfig` from a returned value of `Reader.read` | |
| 306 | |
| 307 :param data: Result of `Reader.read` where at least one item has the | |
| 308 value `'MapConfig'` for the `'schema'` key. | |
| 309 :type data: list[dict[str,object]] | |
| 310 :raises Exception: If a valid `MapConfig` cannot be constructed from `data`. | |
| 311 :return: a valid instance of `MapConfig` with field values taken from `data`. | |
| 312 :rtype: MapConfig | |
| 313 ''' | |
| 314 | |
| 315 from CHAP.models.map import MapConfig | |
| 316 | |
| 317 map_config = False | |
| 318 if isinstance(data, list): | |
| 319 for item in data: | |
| 320 if isinstance(item, dict): | |
| 321 if item.get('schema') == 'MapConfig': | |
| 322 map_config = item.get('data') | |
| 323 break | |
| 324 | |
| 325 if not map_config: | |
| 326 raise(ValueError('No map configuration found')) | |
| 327 | |
| 328 return(MapConfig(**map_config)) | |
| 329 | |
| 330 @staticmethod | |
| 331 def get_nxentry(map_config): | |
| 332 '''Use a `MapConfig` to construct a `nexusformat.nexus.NXentry` | |
| 333 | |
| 334 :param map_config: a valid map configuration | |
| 335 :type map_config: MapConfig | |
| 336 :return: the map's data and metadata contained in a NeXus structure | |
| 337 :rtype: nexusformat.nexus.NXentry | |
| 338 ''' | |
| 339 | |
| 340 from nexusformat.nexus import (NXcollection, | |
| 341 NXdata, | |
| 342 NXentry, | |
| 343 NXfield, | |
| 344 NXsample) | |
| 345 import numpy as np | |
| 346 | |
| 347 nxentry = NXentry(name=map_config.title) | |
| 348 | |
| 349 nxentry.map_config = json.dumps(map_config.dict()) | |
| 350 | |
| 351 nxentry[map_config.sample.name] = NXsample(**map_config.sample.dict()) | |
| 352 | |
| 353 nxentry.attrs['station'] = map_config.station | |
| 354 | |
| 355 nxentry.spec_scans = NXcollection() | |
| 356 for scans in map_config.spec_scans: | |
| 357 nxentry.spec_scans[scans.scanparsers[0].scan_name] = \ | |
| 358 NXfield(value=scans.scan_numbers, | |
| 359 dtype='int8', | |
| 360 attrs={'spec_file':str(scans.spec_file)}) | |
| 361 | |
| 362 nxentry.data = NXdata() | |
| 363 nxentry.data.attrs['axes'] = map_config.dims | |
| 364 for i,dim in enumerate(map_config.independent_dimensions[::-1]): | |
| 365 nxentry.data[dim.label] = NXfield(value=map_config.coords[dim.label], | |
| 366 units=dim.units, | |
| 367 attrs={'long_name': f'{dim.label} ({dim.units})', | |
| 368 'data_type': dim.data_type, | |
| 369 'local_name': dim.name}) | |
| 370 nxentry.data.attrs[f'{dim.label}_indices'] = i | |
| 371 | |
| 372 signal = False | |
| 373 auxilliary_signals = [] | |
| 374 for data in map_config.all_scalar_data: | |
| 375 nxentry.data[data.label] = NXfield(value=np.empty(map_config.shape), | |
| 376 units=data.units, | |
| 377 attrs={'long_name': f'{data.label} ({data.units})', | |
| 378 'data_type': data.data_type, | |
| 379 'local_name': data.name}) | |
| 380 if not signal: | |
| 381 signal = data.label | |
| 382 else: | |
| 383 auxilliary_signals.append(data.label) | |
| 384 | |
| 385 if signal: | |
| 386 nxentry.data.attrs['signal'] = signal | |
| 387 nxentry.data.attrs['auxilliary_signals'] = auxilliary_signals | |
| 388 | |
| 389 for scans in map_config.spec_scans: | |
| 390 for scan_number in scans.scan_numbers: | |
| 391 scanparser = scans.get_scanparser(scan_number) | |
| 392 for scan_step_index in range(scanparser.spec_scan_npts): | |
| 393 map_index = scans.get_index(scan_number, scan_step_index, map_config) | |
| 394 for data in map_config.all_scalar_data: | |
| 395 nxentry.data[data.label][map_index] = data.get_value(scans, scan_number, scan_step_index) | |
| 396 | |
| 397 return(nxentry) | |
| 398 | |
| 399 class IntegrationProcessor(Processor): | |
| 400 '''Class for integrating 2D detector data | |
| 401 ''' | |
| 402 | |
| 403 def _process(self, data): | |
| 404 '''Integrate the input data with the integration method and keyword | |
| 405 arguments supplied and return the results. | |
| 406 | |
| 407 :param data: input data, including raw data, integration method, and | |
| 408 keyword args for the integration method. | |
| 409 :type data: tuple[typing.Union[numpy.ndarray, list[numpy.ndarray]], | |
| 410 callable, | |
| 411 dict] | |
| 412 :param integration_method: the method of a | |
| 413 `pyFAI.azimuthalIntegrator.AzimuthalIntegrator` or | |
| 414 `pyFAI.multi_geometry.MultiGeometry` that returns the desired | |
| 415 integration results. | |
| 416 :return: integrated raw data | |
| 417 :rtype: pyFAI.containers.IntegrateResult | |
| 418 ''' | |
| 419 | |
| 420 detector_data, integration_method, integration_kwargs = data | |
| 421 | |
| 422 return(integration_method(detector_data, **integration_kwargs)) | |
| 423 | |
| 424 class IntegrateMapProcessor(Processor): | |
| 425 '''Class representing a process that takes a map and integration | |
| 426 configuration and returns a `nexusformat.nexus.NXprocess` containing a map of | |
| 427 the integrated detector data requested. | |
| 428 ''' | |
| 429 | |
| 430 def _process(self, data): | |
| 431 '''Process the output of a `Reader` that contains a map and integration | |
| 432 configuration and return a `nexusformat.nexus.NXprocess` containing a map | |
| 433 of the integrated detector data requested | |
| 434 | |
| 435 :param data: Result of `Reader.read` where at least one item has the | |
| 436 value `'MapConfig'` for the `'schema'` key, and at least one item has | |
| 437 the value `'IntegrationConfig'` for the `'schema'` key. | |
| 438 :type data: list[dict[str,object]] | |
| 439 :return: integrated data and process metadata | |
| 440 :rtype: nexusformat.nexus.NXprocess | |
| 441 ''' | |
| 442 | |
| 443 map_config, integration_config = self.get_configs(data) | |
| 444 nxprocess = self.get_nxprocess(map_config, integration_config) | |
| 445 | |
| 446 return(nxprocess) | |
| 447 | |
| 448 def get_configs(self, data): | |
| 449 '''Return valid instances of `MapConfig` and `IntegrationConfig` from the | |
| 450 input supplied by `MultipleReader`. | |
| 451 | |
| 452 :param data: Result of `Reader.read` where at least one item has the | |
| 453 value `'MapConfig'` for the `'schema'` key, and at least one item has | |
| 454 the value `'IntegrationConfig'` for the `'schema'` key. | |
| 455 :type data: list[dict[str,object]] | |
| 456 :raises ValueError: if `data` cannot be parsed into map and integration configurations. | |
| 457 :return: valid map and integration configuration objects. | |
| 458 :rtype: tuple[MapConfig, IntegrationConfig] | |
| 459 ''' | |
| 460 | |
| 461 self.logger.debug('Getting configuration objects') | |
| 462 t0 = time() | |
| 463 | |
| 464 from CHAP.models.map import MapConfig | |
| 465 from CHAP.models.integration import IntegrationConfig | |
| 466 | |
| 467 map_config = False | |
| 468 integration_config = False | |
| 469 if isinstance(data, list): | |
| 470 for item in data: | |
| 471 if isinstance(item, dict): | |
| 472 schema = item.get('schema') | |
| 473 if schema == 'MapConfig': | |
| 474 map_config = item.get('data') | |
| 475 elif schema == 'IntegrationConfig': | |
| 476 integration_config = item.get('data') | |
| 477 | |
| 478 if not map_config: | |
| 479 raise(ValueError('No map configuration found')) | |
| 480 if not integration_config: | |
| 481 raise(ValueError('No integration configuration found')) | |
| 482 | |
| 483 map_config = MapConfig(**map_config) | |
| 484 integration_config = IntegrationConfig(**integration_config) | |
| 485 | |
| 486 self.logger.debug(f'Got configuration objects in {time()-t0:.3f} seconds') | |
| 487 | |
| 488 return(map_config, integration_config) | |
| 489 | |
| 490 def get_nxprocess(self, map_config, integration_config): | |
| 491 '''Use a `MapConfig` and `IntegrationConfig` to construct a | |
| 492 `nexusformat.nexus.NXprocess` | |
| 493 | |
| 494 :param map_config: a valid map configuration | |
| 495 :type map_config: MapConfig | |
| 496 :param integration_config: a valid integration configuration | |
| 497 :type integration_config" IntegrationConfig | |
| 498 :return: the integrated detector data and metadata contained in a NeXus | |
| 499 structure | |
| 500 :rtype: nexusformat.nexus.NXprocess | |
| 501 ''' | |
| 502 | |
| 503 self.logger.debug('Constructing NXprocess') | |
| 504 t0 = time() | |
| 505 | |
| 506 from nexusformat.nexus import (NXdata, | |
| 507 NXdetector, | |
| 508 NXfield, | |
| 509 NXprocess) | |
| 510 import numpy as np | |
| 511 import pyFAI | |
| 512 | |
| 513 nxprocess = NXprocess(name=integration_config.title) | |
| 514 | |
| 515 nxprocess.map_config = json.dumps(map_config.dict()) | |
| 516 nxprocess.integration_config = json.dumps(integration_config.dict()) | |
| 517 | |
| 518 nxprocess.program = 'pyFAI' | |
| 519 nxprocess.version = pyFAI.version | |
| 520 | |
| 521 for k,v in integration_config.dict().items(): | |
| 522 if k == 'detectors': | |
| 523 continue | |
| 524 nxprocess.attrs[k] = v | |
| 525 | |
| 526 for detector in integration_config.detectors: | |
| 527 nxprocess[detector.prefix] = NXdetector() | |
| 528 nxprocess[detector.prefix].local_name = detector.prefix | |
| 529 nxprocess[detector.prefix].distance = detector.azimuthal_integrator.dist | |
| 530 nxprocess[detector.prefix].distance.attrs['units'] = 'm' | |
| 531 nxprocess[detector.prefix].calibration_wavelength = detector.azimuthal_integrator.wavelength | |
| 532 nxprocess[detector.prefix].calibration_wavelength.attrs['units'] = 'm' | |
| 533 nxprocess[detector.prefix].attrs['poni_file'] = str(detector.poni_file) | |
| 534 nxprocess[detector.prefix].attrs['mask_file'] = str(detector.mask_file) | |
| 535 nxprocess[detector.prefix].raw_data_files = np.full(map_config.shape, '', dtype='|S256') | |
| 536 | |
| 537 nxprocess.data = NXdata() | |
| 538 | |
| 539 nxprocess.data.attrs['axes'] = (*map_config.dims, *integration_config.integrated_data_dims) | |
| 540 for i,dim in enumerate(map_config.independent_dimensions[::-1]): | |
| 541 nxprocess.data[dim.label] = NXfield(value=map_config.coords[dim.label], | |
| 542 units=dim.units, | |
| 543 attrs={'long_name': f'{dim.label} ({dim.units})', | |
| 544 'data_type': dim.data_type, | |
| 545 'local_name': dim.name}) | |
| 546 nxprocess.data.attrs[f'{dim.label}_indices'] = i | |
| 547 | |
| 548 for i,(coord_name,coord_values) in enumerate(integration_config.integrated_data_coordinates.items()): | |
| 549 if coord_name == 'radial': | |
| 550 type_ = pyFAI.units.RADIAL_UNITS | |
| 551 elif coord_name == 'azimuthal': | |
| 552 type_ = pyFAI.units.AZIMUTHAL_UNITS | |
| 553 coord_units = pyFAI.units.to_unit(getattr(integration_config, f'{coord_name}_units'), type_=type_) | |
| 554 nxprocess.data[coord_units.name] = coord_values | |
| 555 nxprocess.data.attrs[f'{coord_units.name}_indices'] = i+len(map_config.coords) | |
| 556 nxprocess.data[coord_units.name].units = coord_units.unit_symbol | |
| 557 nxprocess.data[coord_units.name].attrs['long_name'] = coord_units.label | |
| 558 | |
| 559 nxprocess.data.attrs['signal'] = 'I' | |
| 560 nxprocess.data.I = NXfield(value=np.empty((*tuple([len(coord_values) for coord_name,coord_values in map_config.coords.items()][::-1]), *integration_config.integrated_data_shape)), | |
| 561 units='a.u', | |
| 562 attrs={'long_name':'Intensity (a.u)'}) | |
| 563 | |
| 564 integrator = integration_config.get_multi_geometry_integrator() | |
| 565 if integration_config.integration_type == 'azimuthal': | |
| 566 integration_method = integrator.integrate1d | |
| 567 integration_kwargs = { | |
| 568 'lst_mask': [detector.mask_array for detector in integration_config.detectors], | |
| 569 'npt': integration_config.radial_npt | |
| 570 } | |
| 571 elif integration_config.integration_type == 'cake': | |
| 572 integration_method = integrator.integrate2d | |
| 573 integration_kwargs = { | |
| 574 'lst_mask': [detector.mask_array for detector in integration_config.detectors], | |
| 575 'npt_rad': integration_config.radial_npt, | |
| 576 'npt_azim': integration_config.azimuthal_npt, | |
| 577 'method': 'bbox' | |
| 578 } | |
| 579 | |
| 580 integration_processor = IntegrationProcessor() | |
| 581 integration_processor.logger.setLevel(self.logger.getEffectiveLevel()) | |
| 582 integration_processor.logger.addHandler(self.logger.handlers[0]) | |
| 583 lst_args = [] | |
| 584 for scans in map_config.spec_scans: | |
| 585 for scan_number in scans.scan_numbers: | |
| 586 scanparser = scans.get_scanparser(scan_number) | |
| 587 for scan_step_index in range(scanparser.spec_scan_npts): | |
| 588 map_index = scans.get_index(scan_number, scan_step_index, map_config) | |
| 589 detector_data = scans.get_detector_data(integration_config.detectors, scan_number, scan_step_index) | |
| 590 result = integration_processor.process((detector_data, integration_method, integration_kwargs)) | |
| 591 nxprocess.data.I[map_index] = result.intensity | |
| 592 for detector in integration_config.detectors: | |
| 593 nxprocess[detector.prefix].raw_data_files[map_index] = scanparser.get_detector_data_file(detector.prefix, scan_step_index) | |
| 594 | |
| 595 self.logger.debug(f'Constructed NXprocess in {time()-t0:.3f} seconds') | |
| 596 | |
| 597 return(nxprocess) | |
| 598 | |
| 599 class MCACeriaCalibrationProcessor(Processor): | |
| 600 '''Class representing the procedure to use a CeO2 scan to obtain tuned values | |
| 601 for the bragg diffraction angle and linear correction parameters for MCA | |
| 602 channel energies for an EDD experimental setup. | |
| 603 ''' | |
| 604 | |
| 605 def _process(self, data): | |
| 606 '''Return tuned values for 2&theta and linear correction parameters for | |
| 607 the MCA channel energies. | |
| 608 | |
| 609 :param data: input configuration for the raw data & tuning procedure | |
| 610 :type data: list[dict[str,object]] | |
| 611 :return: original configuration dictionary with tuned values added | |
| 612 :rtype: dict[str,float] | |
| 613 ''' | |
| 614 | |
| 615 calibration_config = self.get_config(data) | |
| 616 | |
| 617 tth, slope, intercept = self.calibrate(calibration_config) | |
| 618 | |
| 619 calibration_config.tth_calibrated = tth | |
| 620 calibration_config.slope_calibrated = slope | |
| 621 calibration_config.intercept_calibrated = intercept | |
| 622 | |
| 623 return(calibration_config.dict()) | |
| 624 | |
| 625 def get_config(self, data): | |
| 626 '''Get an instance of the configuration object needed by this | |
| 627 `Processor` from a returned value of `Reader.read` | |
| 628 | |
| 629 :param data: Result of `Reader.read` where at least one item has the | |
| 630 value `'MCACeriaCalibrationConfig'` for the `'schema'` key. | |
| 631 :type data: list[dict[str,object]] | |
| 632 :raises Exception: If a valid config object cannot be constructed from `data`. | |
| 633 :return: a valid instance of a configuration object with field values | |
| 634 taken from `data`. | |
| 635 :rtype: MCACeriaCalibrationConfig | |
| 636 ''' | |
| 637 | |
| 638 from CHAP.models.edd import MCACeriaCalibrationConfig | |
| 639 | |
| 640 calibration_config = False | |
| 641 if isinstance(data, list): | |
| 642 for item in data: | |
| 643 if isinstance(item, dict): | |
| 644 if item.get('schema') == 'MCACeriaCalibrationConfig': | |
| 645 calibration_config = item.get('data') | |
| 646 break | |
| 647 | |
| 648 if not calibration_config: | |
| 649 raise(ValueError('No MCA ceria calibration configuration found in input data')) | |
| 650 | |
| 651 return(MCACeriaCalibrationConfig(**calibration_config)) | |
| 652 | |
| 653 def calibrate(self, calibration_config): | |
| 654 '''Iteratively calibrate 2&theta by fitting selected peaks of an MCA | |
| 655 spectrum until the computed strain is sufficiently small. Use the fitted | |
| 656 peak locations to determine linear correction parameters for the MCA's | |
| 657 channel energies. | |
| 658 | |
| 659 :param calibration_config: object configuring the CeO2 calibration procedure | |
| 660 :type calibration_config: MCACeriaCalibrationConfig | |
| 661 :return: calibrated values of 2&theta and linear correction parameters | |
| 662 for MCA channel energies : tth, slope, intercept | |
| 663 :rtype: float, float, float | |
| 664 ''' | |
| 665 | |
| 666 from msnctools.fit import Fit, FitMultipeak | |
| 667 import numpy as np | |
| 668 from scipy.constants import physical_constants | |
| 669 | |
| 670 hc = physical_constants['Planck constant in eV/Hz'][0] * \ | |
| 671 physical_constants['speed of light in vacuum'][0] * \ | |
| 672 1e7 # We'll work in keV and A, not eV and m. | |
| 673 | |
| 674 # Collect raw MCA data of interest | |
| 675 mca_data = calibration_config.mca_data() | |
| 676 mca_bin_energies = np.arange(0, calibration_config.num_bins) * \ | |
| 677 (calibration_config.max_energy_kev / calibration_config.num_bins) | |
| 678 | |
| 679 # Mask out the corrected MCA data for fitting | |
| 680 mca_mask = calibration_config.mca_mask() | |
| 681 fit_mca_energies = mca_bin_energies[mca_mask] | |
| 682 fit_mca_intensities = mca_data[mca_mask] | |
| 683 | |
| 684 # Correct raw MCA data for variable flux at different energies | |
| 685 flux_correct = calibration_config.flux_correction_interpolation_function() | |
| 686 mca_intensity_weights = flux_correct(fit_mca_energies) | |
| 687 fit_mca_intensities = fit_mca_intensities / mca_intensity_weights | |
| 688 | |
| 689 # Get the HKLs and lattice spacings that will be used for fitting | |
| 690 tth = calibration_config.tth_initial_guess | |
| 691 fit_hkls, fit_ds = calibration_config.fit_ds() | |
| 692 c_1 = fit_hkls[:,0]**2 + fit_hkls[:,1]**2 + fit_hkls[:,2]**2 | |
| 693 | |
| 694 for iter_i in range(calibration_config.max_iter): | |
| 695 | |
| 696 ### Perform the uniform fit first ### | |
| 697 | |
| 698 # Get expected peak energy locations for this iteration's starting | |
| 699 # value of tth | |
| 700 fit_lambda = 2.0 * fit_ds * np.sin(0.5*np.radians(tth)) | |
| 701 fit_E0 = hc / fit_lambda | |
| 702 | |
| 703 # Run the uniform fit | |
| 704 best_fit, residual, best_values, best_errors, redchi, success = \ | |
| 705 FitMultipeak.fit_multipeak(fit_mca_intensities, | |
| 706 fit_E0, | |
| 707 x=fit_mca_energies, | |
| 708 fit_type='uniform') | |
| 709 | |
| 710 # Extract values of interest from the best values for the uniform fit | |
| 711 # parameters | |
| 712 uniform_fit_centers = [best_values[f'peak{i+1}_center'] for i in range(len(calibration_config.fit_hkls))] | |
| 713 # uniform_a = best_values['scale_factor'] | |
| 714 # uniform_strain = np.log(uniform_a / calibration_config.lattice_parameter_angstrom) | |
| 715 # uniform_tth = tth * (1.0 + uniform_strain) | |
| 716 # uniform_rel_rms_error = np.linalg.norm(residual) / np.linalg.norm(fit_mca_intensities) | |
| 717 | |
| 718 ### Next, perform the unconstrained fit ### | |
| 719 | |
| 720 # Use the peak locations found in the uniform fit as the initial | |
| 721 # guesses for peak locations in the unconstrained fit | |
| 722 best_fit, residual, best_values, best_errors, redchi, success = \ | |
| 723 FitMultipeak.fit_multipeak(fit_mca_intensities, | |
| 724 uniform_fit_centers, | |
| 725 x=fit_mca_energies, | |
| 726 fit_type='unconstrained') | |
| 727 | |
| 728 # Extract values of interest from the best values for the | |
| 729 # unconstrained fit parameters | |
| 730 unconstrained_fit_centers = np.array([best_values[f'peak{i+1}_center'] for i in range(len(calibration_config.fit_hkls))]) | |
| 731 unconstrained_a = 0.5 * hc * np.sqrt(c_1) / (unconstrained_fit_centers * abs(np.sin(0.5*np.radians(tth)))) | |
| 732 unconstrained_strains = np.log(unconstrained_a / calibration_config.lattice_parameter_angstrom) | |
| 733 unconstrained_strain = np.mean(unconstrained_strains) | |
| 734 unconstrained_tth = tth * (1.0 + unconstrained_strain) | |
| 735 # unconstrained_rel_rms_error = np.linalg.norm(residual) / np.linalg.norm(fit_mca_intensities) | |
| 736 | |
| 737 | |
| 738 # Update tth for the next iteration of tuning | |
| 739 prev_tth = tth | |
| 740 tth = unconstrained_tth | |
| 741 | |
| 742 # Stop tuning tth at this iteration if differences are small enough | |
| 743 if abs(tth - prev_tth) < calibration_config.tune_tth_tol: | |
| 744 break | |
| 745 | |
| 746 # Fit line to expected / computed peak locations from the last | |
| 747 # unconstrained fit. | |
| 748 fit = Fit.fit_data(fit_E0,'linear', x=unconstrained_fit_centers, nan_policy='omit') | |
| 749 slope = fit.best_values['slope'] | |
| 750 intercept = fit.best_values['intercept'] | |
| 751 | |
| 752 return(float(tth), float(slope), float(intercept)) | |
| 753 | |
| 754 class MCADataProcessor(Processor): | |
| 755 '''Class representing a process to return data from a MCA, restuctured to | |
| 756 incorporate the shape & metadata associated with a map configuration to | |
| 757 which the MCA data belongs, and linearly transformed according to the | |
| 758 results of a ceria calibration. | |
| 759 ''' | |
| 760 | |
| 761 def _process(self, data): | |
| 762 '''Process configurations for a map and MCA detector(s), and return the | |
| 763 raw MCA data collected over the map. | |
| 764 | |
| 765 :param data: input map configuration and results of ceria calibration | |
| 766 :type data: list[dict[str,object]] | |
| 767 :return: calibrated and flux-corrected MCA data | |
| 768 :rtype: nexusformat.nexus.NXentry | |
| 769 ''' | |
| 770 | |
| 771 map_config, calibration_config = self.get_configs(data) | |
| 772 nxroot = self.get_nxroot(map_config, calibration_config) | |
| 773 | |
| 774 return(nxroot) | |
| 775 | |
| 776 def get_configs(self, data): | |
| 777 '''Get instances of the configuration objects needed by this | |
| 778 `Processor` from a returned value of `Reader.read` | |
| 779 | |
| 780 :param data: Result of `Reader.read` where at least one item has the | |
| 781 value `'MapConfig'` for the `'schema'` key, and at least one item has | |
| 782 the value `'MCACeriaCalibrationConfig'` for the `'schema'` key. | |
| 783 :type data: list[dict[str,object]] | |
| 784 :raises Exception: If valid config objects cannot be constructed from `data`. | |
| 785 :return: valid instances of the configuration objects with field values | |
| 786 taken from `data`. | |
| 787 :rtype: tuple[MapConfig, MCACeriaCalibrationConfig] | |
| 788 ''' | |
| 789 | |
| 790 from CHAP.models.map import MapConfig | |
| 791 from CHAP.models.edd import MCACeriaCalibrationConfig | |
| 792 | |
| 793 map_config = False | |
| 794 calibration_config = False | |
| 795 if isinstance(data, list): | |
| 796 for item in data: | |
| 797 if isinstance(item, dict): | |
| 798 schema = item.get('schema') | |
| 799 if schema == 'MapConfig': | |
| 800 map_config = item.get('data') | |
| 801 elif schema == 'MCACeriaCalibrationConfig': | |
| 802 calibration_config = item.get('data') | |
| 803 | |
| 804 if not map_config: | |
| 805 raise(ValueError('No map configuration found in input data')) | |
| 806 if not calibration_config: | |
| 807 raise(ValueError('No MCA ceria calibration configuration found in input data')) | |
| 808 | |
| 809 return(MapConfig(**map_config), MCACeriaCalibrationConfig(**calibration_config)) | |
| 810 | |
| 811 def get_nxroot(self, map_config, calibration_config): | |
| 812 '''Get a map of the MCA data collected by the scans in `map_config`. The | |
| 813 MCA data will be calibrated and flux-corrected according to the | |
| 814 parameters included in `calibration_config`. The data will be returned | |
| 815 along with relevant metadata in the form of a NeXus structure. | |
| 816 | |
| 817 :param map_config: the map configuration | |
| 818 :type map_config: MapConfig | |
| 819 :param calibration_config: the calibration configuration | |
| 820 :type calibration_config: MCACeriaCalibrationConfig | |
| 821 :return: a map of the calibrated and flux-corrected MCA data | |
| 822 :rtype: nexusformat.nexus.NXroot | |
| 823 ''' | |
| 824 | |
| 825 from nexusformat.nexus import (NXdata, | |
| 826 NXdetector, | |
| 827 NXentry, | |
| 828 NXinstrument, | |
| 829 NXroot) | |
| 830 import numpy as np | |
| 831 | |
| 832 nxroot = NXroot() | |
| 833 | |
| 834 nxroot[map_config.title] = MapProcessor.get_nxentry(map_config) | |
| 835 nxentry = nxroot[map_config.title] | |
| 836 | |
| 837 nxentry.instrument = NXinstrument() | |
| 838 nxentry.instrument.detector = NXdetector() | |
| 839 nxentry.instrument.detector.calibration_configuration = json.dumps(calibration_config.dict()) | |
| 840 | |
| 841 nxentry.instrument.detector.data = NXdata() | |
| 842 nxdata = nxentry.instrument.detector.data | |
| 843 nxdata.raw = np.empty((*map_config.shape, calibration_config.num_bins)) | |
| 844 nxdata.raw.attrs['units'] = 'counts' | |
| 845 nxdata.channel_energy = calibration_config.slope_calibrated * \ | |
| 846 np.arange(0, calibration_config.num_bins) * \ | |
| 847 (calibration_config.max_energy_kev / calibration_config.num_bins) + \ | |
| 848 calibration_config.intercept_calibrated | |
| 849 nxdata.channel_energy.attrs['units'] = 'keV' | |
| 850 | |
| 851 for scans in map_config.spec_scans: | |
| 852 for scan_number in scans.scan_numbers: | |
| 853 scanparser = scans.get_scanparser(scan_number) | |
| 854 for scan_step_index in range(scanparser.spec_scan_npts): | |
| 855 map_index = scans.get_index(scan_number, scan_step_index, map_config) | |
| 856 nxdata.raw[map_index] = scanparser.get_detector_data(calibration_config.detector_name, scan_step_index) | |
| 857 | |
| 858 nxentry.data.makelink(nxdata.raw, name=calibration_config.detector_name) | |
| 859 nxentry.data.makelink(nxdata.channel_energy, name=f'{calibration_config.detector_name}_channel_energy') | |
| 860 if isinstance(nxentry.data.attrs['axes'], str): | |
| 861 nxentry.data.attrs['axes'] = [nxentry.data.attrs['axes'], f'{calibration_config.detector_name}_channel_energy'] | |
| 862 else: | |
| 863 nxentry.data.attrs['axes'] += [f'{calibration_config.detector_name}_channel_energy'] | |
| 864 nxentry.data.attrs['signal'] = calibration_config.detector_name | |
| 865 | |
| 866 return(nxroot) | |
| 867 | |
| 868 class StrainAnalysisProcessor(Processor): | |
| 869 '''Class representing a process to compute a map of sample strains by fitting | |
| 870 bragg peaks in 1D detector data and analyzing the difference between measured | |
| 871 peak locations and expected peak locations for the sample measured. | |
| 872 ''' | |
| 873 | |
| 874 def _process(self, data): | |
| 875 '''Process the input map detector data & configuration for the strain | |
| 876 analysis procedure, and return a map of sample strains. | |
| 877 | |
| 878 :param data: results of `MutlipleReader.read` containing input map | |
| 879 detector data and strain analysis configuration | |
| 880 :type data: dict[list[str,object]] | |
| 881 :return: map of sample strains | |
| 882 :rtype: xarray.Dataset | |
| 883 ''' | |
| 884 | |
| 885 strain_analysis_config = self.get_config(data) | |
| 886 | |
| 887 return(data) | |
| 888 | |
| 889 def get_config(self, data): | |
| 890 '''Get instances of the configuration objects needed by this | |
| 891 `Processor` from a returned value of `Reader.read` | |
| 892 | |
| 893 :param data: Result of `Reader.read` where at least one item has the | |
| 894 value `'StrainAnalysisConfig'` for the `'schema'` key. | |
| 895 :type data: list[dict[str,object]] | |
| 896 :raises Exception: If valid config objects cannot be constructed from `data`. | |
| 897 :return: valid instances of the configuration objects with field values | |
| 898 taken from `data`. | |
| 899 :rtype: StrainAnalysisConfig | |
| 900 ''' | |
| 901 | |
| 902 strain_analysis_config = False | |
| 903 if isinstance(data, list): | |
| 904 for item in data: | |
| 905 if isinstance(item, dict): | |
| 906 schema = item.get('schema') | |
| 907 if item.get('schema') == 'StrainAnalysisConfig': | |
| 908 strain_analysis_config = item.get('data') | |
| 909 | |
| 910 if not strain_analysis_config: | |
| 911 raise(ValueError('No strain analysis configuration found in input data')) | |
| 912 | |
| 913 return(strain_analysis_config) | |
| 914 | |
| 915 | |
| 916 class OptionParser(): | |
| 917 '''User based option parser''' | |
| 918 def __init__(self): | |
| 919 self.parser = argparse.ArgumentParser(prog='PROG') | |
| 920 self.parser.add_argument("--data", action="store", | |
| 921 dest="data", default="", help="Input data") | |
| 922 self.parser.add_argument("--processor", action="store", | |
| 923 dest="processor", default="Processor", help="Processor class name") | |
| 924 self.parser.add_argument('--log-level', choices=logging._nameToLevel.keys(), | |
| 925 dest='log_level', default='INFO', help='logging level') | |
| 926 | |
| 927 def main(): | |
| 928 '''Main function''' | |
| 929 optmgr = OptionParser() | |
| 930 opts = optmgr.parser.parse_args() | |
| 931 clsName = opts.processor | |
| 932 try: | |
| 933 processorCls = getattr(sys.modules[__name__],clsName) | |
| 934 except: | |
| 935 print(f'Unsupported processor {clsName}') | |
| 936 sys.exit(1) | |
| 937 | |
| 938 processor = processorCls() | |
| 939 processor.logger.setLevel(getattr(logging, opts.log_level)) | |
| 940 log_handler = logging.StreamHandler() | |
| 941 log_handler.setFormatter(logging.Formatter('{name:20}: {message}', style='{')) | |
| 942 processor.logger.addHandler(log_handler) | |
| 943 data = processor.process(opts.data) | |
| 944 | |
| 945 print(f"Processor {processor} operates on data {data}") | |
| 946 | |
| 947 if __name__ == '__main__': | |
| 948 main() | 
