comparison CHAP/models/map.py @ 0:cbbe42422d56 draft

planemo upload for repository https://github.com/CHESSComputing/ChessAnalysisPipeline/tree/galaxy commit 1401a7e1ae007a6bda260d147f9b879e789b73e0-dirty
author kls286
date Tue, 28 Mar 2023 15:07:30 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:cbbe42422d56
1 from functools import cache, lru_cache
2 import os
3 from typing import Literal, Optional, Union
4
5 import numpy as np
6 from pydantic import (BaseModel,
7 conint,
8 conlist,
9 confloat,
10 constr,
11 FilePath,
12 PrivateAttr,
13 ValidationError,
14 validator)
15 from pyspec.file.spec import FileSpec
16
class Sample(BaseModel):
    """Metadata describing the sample that was measured.

    :ivar name: Name of the sample; must contain at least one character.
    :type name: str
    :ivar description: Free-form description of the sample.
    :type description: Optional[str]
    """
    name: constr(min_length=1)
    description: Optional[str]
28
class SpecScans(BaseModel):
    """Class representing a set of scans from a single SPEC file.

    :ivar spec_file: Path to the SPEC file. After validation this holds
        the absolute path returned by `os.path.abspath`.
    :type spec_file: str
    :ivar scan_numbers: List of scan numbers to use (each > 0, at least
        one required).
    :type scan_numbers: list[int]
    """
    spec_file: FilePath
    scan_numbers: conlist(item_type=conint(gt=0), min_items=1)

    @validator('spec_file', allow_reuse=True)
    def validate_spec_file(cls, spec_file):
        """Validate the specified SPEC file.

        :param spec_file: Path to the SPEC file.
        :type spec_file: str
        :raises ValueError: If the path cannot be resolved or the file
            cannot be parsed by `FileSpec`.
        :return: Absolute path to the SPEC file, if it is valid.
        :rtype: str
        """
        try:
            spec_file = os.path.abspath(spec_file)
            # Parsing is done only to prove the file is readable as SPEC
            # data; the parsed object itself is not needed here.
            FileSpec(spec_file)
        except Exception as exc:
            # `Exception` (rather than a bare `except`) lets
            # KeyboardInterrupt / SystemExit propagate untouched.
            raise ValueError(f'Invalid SPEC file {spec_file}') from exc
        return spec_file

    @validator('scan_numbers', allow_reuse=True)
    def validate_scan_numbers(cls, scan_numbers, values):
        """Validate the specified list of scan numbers.

        The check is skipped when `spec_file` is absent from `values`.

        :param scan_numbers: List of scan numbers.
        :type scan_numbers: list of int
        :param values: Dictionary of values for all fields of the model.
        :type values: dict
        :raises ValueError: If a specified scan number is not found in
            the SPEC file.
        :return: List of scan numbers.
        :rtype: list of int
        """
        spec_file = values.get('spec_file')
        if spec_file is not None:
            spec_scans = FileSpec(spec_file)
            for scan_number in scan_numbers:
                if spec_scans.get_scan_by_number(scan_number) is None:
                    raise ValueError(f'There is no scan number {scan_number} in {spec_file}')
        return scan_numbers

    @property
    def scanparsers(self):
        """A list of `ScanParser`s, one for each of the scans specified
        by the SPEC file and scan numbers belonging to this instance of
        `SpecScans`.
        """
        return [self.get_scanparser(scan_no) for scan_no in self.scan_numbers]

    def get_scanparser(self, scan_number):
        """Return a `ScanParser` for the specified scan number in this
        instance's SPEC file.

        :param scan_number: Scan number to get a `ScanParser` for
        :type scan_number: int
        :return: `ScanParser` for the specified scan number
        :rtype: ScanParser
        """
        # Delegates to the module-level, cached function of the same name.
        return get_scanparser(self.spec_file, scan_number)

    def get_index(self, scan_number:int, scan_step_index:int, map_config):
        """Return a tuple representing the index of a specific step in a
        specific spec scan within a map.

        :param scan_number: Scan number to get index for
        :type scan_number: int
        :param scan_step_index: Scan step index to get index for
        :type scan_step_index: int
        :param map_config: Map configuration to get index for
        :type map_config: MapConfig
        :raises ValueError: If the value recorded for a dimension at this
            step is not among the map's coordinate values (raised by
            `list.index`).
        :return: Index for the specified scan number and scan step index
            within the specified map configuration
        :rtype: tuple
        """
        index = ()
        for dimension in map_config.independent_dimensions:
            coordinate_values = list(map_config.coords[dimension.label])
            value = dimension.get_value(self, scan_number, scan_step_index)
            # Prepend, so the resulting tuple is ordered opposite to
            # `map_config.independent_dimensions`.
            index = (coordinate_values.index(value), *index)
        return index

    def get_detector_data(self, detectors:list, scan_number:int, scan_step_index:int):
        """Return the raw data from the specified detectors at the
        specified scan number and scan step index.

        :param detectors: List of detectors to get raw data for; only the
            `prefix` attribute of each item is used.
        :type detectors: list
        :param scan_number: Scan number to get data for
        :type scan_number: int
        :param scan_step_index: Scan step index to get data for
        :type scan_step_index: int
        :return: Data from the specified detectors for the specified scan
            number and scan step index
        :rtype: list[np.ndarray]
        """
        # A tuple is passed because the module-level function is wrapped
        # in `lru_cache`, which requires hashable arguments.
        prefixes = tuple(detector.prefix for detector in detectors)
        return get_detector_data(prefixes, self.spec_file, scan_number, scan_step_index)
@cache
def get_available_scan_numbers(spec_file:str):
    """Return the scan numbers present in a SPEC file.

    Results are memoized per `spec_file` by `functools.cache`.

    :param spec_file: Path to the SPEC file.
    :type spec_file: str
    :return: The keys of the `scans` attribute of the parsed `FileSpec`.
    :rtype: list
    """
    # NOTE(review): the same list object is handed to every caller because
    # of the cache -- callers should not mutate it.
    return [*FileSpec(spec_file).scans.keys()]
@cache
def get_scanparser(spec_file:str, scan_number:int):
    """Return a `ScanParser` for one scan of a SPEC file.

    Results are memoized per `(spec_file, scan_number)` pair. `ScanParser`
    is looked up as a module global at call time.

    :param spec_file: Path to the SPEC file.
    :type spec_file: str
    :param scan_number: Number of the scan to parse.
    :type scan_number: int
    :return: A `ScanParser` for the scan, or `None` if the scan number is
        not present in the SPEC file.
    :rtype: Optional[ScanParser]
    """
    # NOTE(review): cached parsers are not invalidated if the global
    # `ScanParser` is rebound later -- confirm this is acceptable.
    if scan_number in get_available_scan_numbers(spec_file):
        return ScanParser(spec_file, scan_number)
    return None
@lru_cache(maxsize=10)
def get_detector_data(detector_prefixes:tuple, spec_file:str, scan_number:int, scan_step_index:int):
    """Return the data recorded by several detectors at one scan step.

    At most the 10 most recently used results are kept in the cache.

    :param detector_prefixes: Prefixes of the detectors to read.
    :type detector_prefixes: tuple
    :param spec_file: Path to the SPEC file.
    :type spec_file: str
    :param scan_number: Number of the scan to read from.
    :type scan_number: int
    :param scan_step_index: Index of the scan step to read.
    :type scan_step_index: int
    :return: One entry per prefix, in the order given, each obtained from
        the scan parser's `get_detector_data`.
    :rtype: list
    """
    parser = get_scanparser(spec_file, scan_number)
    return [
        parser.get_detector_data(prefix, scan_step_index)
        for prefix in detector_prefixes
    ]
151
class PointByPointScanData(BaseModel):
    """Class representing a source of raw scalar-valued data for which a
    value was recorded at every point in a `MapConfig`.

    :ivar label: A user-defined label for referring to this data in the
        NeXus file and in other tools.
    :type label: str
    :ivar units: The units in which the data were recorded.
    :type units: str
    :ivar data_type: Represents how these data were recorded at time of
        data collection.
    :type data_type: Literal['spec_motor', 'scan_column', 'smb_par']
    :ivar name: Represents the name with which these raw data were
        recorded at time of data collection.
    :type name: str
    """
    label: constr(min_length=1)
    units: constr(strip_whitespace=True, min_length=1)
    data_type: Literal['spec_motor', 'scan_column', 'smb_par']
    name: constr(strip_whitespace=True, min_length=1)

    @validator('label')
    def validate_label(cls, label):
        """Validate that the supplied `label` does not conflict with any
        of the values for `label` reserved for certain data needed to
        perform corrections.

        Subclasses of `CorrectionsData` are exempt from the check.

        :param label: The value of `label` to validate
        :type label: str
        :raises ValueError: If `label` is one of the reserved values.
        :return: The original supplied value `label`, if it is allowed.
        :rtype: str
        """
        if (not issubclass(cls,CorrectionsData)) and label in CorrectionsData.reserved_labels():
            raise ValueError(f'{cls.__name__}.label may not be any of the following reserved values: {CorrectionsData.reserved_labels()}')
        return label

    def validate_for_station(self, station:str):
        """Validate this instance of `PointByPointScanData` for a certain
        choice of station (beamline).

        :param station: The name of the station (in 'idxx' format).
        :type station: str
        :raises TypeError: If the station is not compatible with the value
            of the `data_type` attribute for this instance of
            PointByPointScanData.
        :return: None
        :rtype: None
        """
        # "smb_par" data is only accepted for stations id1a3 and id3a.
        if station.lower() not in ('id1a3', 'id3a') and self.data_type == 'smb_par':
            raise TypeError(f'{self.__class__.__name__}.data_type may not be "smb_par" when station is "{station}"')

    def validate_for_spec_scans(self, spec_scans:list[SpecScans], scan_step_index:Union[Literal['all'],int]='all'):
        """Validate this instance of `PointByPointScanData` for a list of
        `SpecScans`.

        :param spec_scans: A list of `SpecScans` whose raw data will be
            checked for the presence of the data represented by this
            instance of `PointByPointScanData`
        :type spec_scans: list[SpecScans]
        :param scan_step_index: A specific scan step index to validate,
            defaults to `'all'`.
        :type scan_step_index: Union[Literal['all'],int], optional
        :raises RuntimeError: If the data represented by this instance of
            `PointByPointScanData` is missing for the specified scan steps.
        :return: None
        :rtype: None
        """
        for scans in spec_scans:
            for scan_number in scans.scan_numbers:
                scanparser = scans.get_scanparser(scan_number)
                if scan_step_index == 'all':
                    step_indices = range(scanparser.spec_scan_npts)
                else:
                    step_indices = range(scan_step_index, scan_step_index + 1)
                # The loop variable must not be named `scan_step_index`:
                # rebinding the parameter would make the `== 'all'` test
                # above fail for every scan after the first one.
                for step_index in step_indices:
                    try:
                        self.get_value(scans, scan_number, step_index)
                    except Exception as exc:
                        raise RuntimeError(f'Could not find data for {self.name} (data_type "{self.data_type}") on scan number {scan_number} in spec file {scans.spec_file}') from exc

    def get_value(self, spec_scans:SpecScans, scan_number:int, scan_step_index:int):
        """Return the value recorded for this instance of
        `PointByPointScanData` at a specific scan step.

        :param spec_scans: An instance of `SpecScans` in which the
            requested scan step occurs.
        :type spec_scans: SpecScans
        :param scan_number: The number of the scan in which the requested
            scan step occurs.
        :type scan_number: int
        :param scan_step_index: The index of the requested scan step
            (not used when `data_type` is `'smb_par'`).
        :type scan_step_index: int
        :return: The value recorded of the data represented by this
            instance of `PointByPointScanData` at the scan step requested
        :rtype: float
        """
        if self.data_type == 'spec_motor':
            return get_spec_motor_value(spec_scans.spec_file, scan_number, scan_step_index, self.name)
        if self.data_type == 'scan_column':
            return get_spec_counter_value(spec_scans.spec_file, scan_number, scan_step_index, self.name)
        if self.data_type == 'smb_par':
            return get_smb_par_value(spec_scans.spec_file, scan_number, self.name)
        # Any other data_type falls through and yields None.
        return None
@cache
def get_spec_motor_value(spec_file:str, scan_number:int, scan_step_index:int, spec_mnemonic:str):
    """Return the value recorded for a SPEC motor at a specific scan step.

    If the motor is one of the scanned motors, its value at the requested
    step is returned (or all of its values when `scan_step_index` is
    negative). Otherwise the value comes from the scan parser's
    `get_spec_positioner_value`.

    :param spec_file: Location of a SPEC file in which the requested scan
        step occurs.
    :type spec_file: str
    :param scan_number: The number of the scan in which the requested scan
        step occurs.
    :type scan_number: int
    :param scan_step_index: The index of the requested scan step.
    :type scan_step_index: int
    :param spec_mnemonic: The mnemonic of a SPEC motor.
    :type spec_mnemonic: str
    :return: The value of the motor at the scan step requested
    :rtype: float
    """
    parser = get_scanparser(spec_file, scan_number)
    if spec_mnemonic not in parser.spec_scan_motor_mnes:
        # Not a scanned motor: use the positioner value from the parser.
        return parser.get_spec_positioner_value(spec_mnemonic)
    axis = parser.spec_scan_motor_mnes.index(spec_mnemonic)
    if scan_step_index >= 0:
        # Convert the flat step index into per-motor indices
        # (Fortran order) and pick the one for this motor.
        step = np.unravel_index(scan_step_index, parser.spec_scan_shape, order='F')
        return parser.spec_scan_motor_vals[axis][step[axis]]
    return parser.spec_scan_motor_vals[axis]
@cache
def get_spec_counter_value(spec_file:str, scan_number:int, scan_step_index:int, spec_column_label:str):
    """Return the value recorded for a SPEC counter at a specific scan step.

    A negative `scan_step_index` returns the whole data column instead of
    a single entry.

    :param spec_file: Location of a SPEC file in which the requested scan
        step occurs.
    :type spec_file: str
    :param scan_number: The number of the scan in which the requested scan
        step occurs.
    :type scan_number: int
    :param scan_step_index: The index of the requested scan step.
    :type scan_step_index: int
    :param spec_column_label: The label of a SPEC data column.
    :type spec_column_label: str
    :return: The value of the counter at the scan step requested
    :rtype: float
    """
    parser = get_scanparser(spec_file, scan_number)
    if scan_step_index >= 0:
        return parser.spec_scan_data[spec_column_label][scan_step_index]
    return parser.spec_scan_data[spec_column_label]
@cache
def get_smb_par_value(spec_file:str, scan_number:int, par_name:str):
    """Return the value recorded for a specific scan in an SMB-style .par
    file.

    :param spec_file: Location of a SPEC file in which the requested scan
        occurs.
    :type spec_file: str
    :param scan_number: The number of the requested scan.
    :type scan_number: int
    :param par_name: The name of the column in the .par file
    :type par_name: str
    :return: The entry for `par_name` in the scan parser's `pars` mapping.
    :rtype: float
    """
    parser = get_scanparser(spec_file, scan_number)
    return parser.pars[par_name]
def validate_data_source_for_map_config(data_source, values):
    """Validate a point-by-point data source against the map it belongs to.

    Written to be wrapped by pydantic's `validator` on `MapConfig` fields.
    It selects the scan parser for the map's station and experiment type,
    then checks the data source against the station and the SPEC scans.

    :param data_source: The data source to validate, or `None`.
    :type data_source: Optional[PointByPointScanData]
    :param values: Dictionary of the previously validated fields of the
        model; `station`, `experiment_type` and `spec_scans` are read.
    :type values: dict
    :raises ValueError: If `station`, `experiment_type` or `spec_scans` is
        missing from `values`.
    :return: The unmodified `data_source`.
    :rtype: Optional[PointByPointScanData]
    """
    # An Optional field explicitly set to None has nothing to validate.
    # (pydantic v1 can still invoke whole-field validators with None --
    # TODO confirm against the pinned pydantic version.)
    if data_source is None:
        return data_source

    required = ('station', 'experiment_type', 'spec_scans')
    missing = [key for key in required if values.get(key) is None]
    if missing:
        # Without this guard the calls below fail with AttributeError /
        # TypeError on None instead of a clear validation message.
        raise ValueError(f'Cannot validate data source without valid values for: {missing}')

    station = values.get('station')
    import_scanparser(station, values.get('experiment_type'))
    data_source.validate_for_station(station)
    data_source.validate_for_spec_scans(values.get('spec_scans'))
    return data_source
315
class CorrectionsData(PointByPointScanData):
    """Special kind of `PointByPointScanData` whose `label` is restricted
    to the values reserved for use by `CorrectionConfig` tools.

    :ivar label: One of the reserved values, `'presample_intensity'`,
        `'postsample_intensity'`, or `'dwell_time_actual'`.
    :type label: Literal['presample_intensity','postsample_intensity','dwell_time_actual']
    :ivar units: The units in which the data were recorded.
    :type units: str
    :ivar data_type: How the data were recorded at time of data
        collection; `'spec_motor'` is not permitted here.
    :type data_type: Literal['scan_column', 'smb_par']
    :ivar name: The name with which the raw data were recorded at time of
        data collection.
    :type name: str
    """
    label: Literal['presample_intensity','postsample_intensity','dwell_time_actual']
    data_type: Literal['scan_column','smb_par']

    @classmethod
    def reserved_labels(cls):
        """Return the labels reserved for corrections-related scalar data.

        The labels are read from the `Literal` annotation of this class's
        `label` field.

        :return: A list of reserved labels
        :rtype: list[str]
        """
        # NOTE(review): resolved against `cls`, so a subclass that narrows
        # `label` presumably reports only its own value(s) -- confirm.
        label_type = cls.__fields__['label'].type_
        return [*label_type.__args__]
class PresampleIntensity(CorrectionsData):
    """Source of raw data for the intensity of the beam incident on the
    sample.

    :ivar label: Fixed to `"presample_intensity"` (also the default).
    :type label: Literal["presample_intensity"]
    :ivar units: Fixed to `"counts"` (also the default).
    :type units: Literal["counts"]
    :ivar data_type: How the data were recorded at time of data
        collection.
    :type data_type: Literal['scan_column', 'smb_par']
    :ivar name: The name with which the raw data were recorded at time of
        data collection.
    :type name: str
    """
    label: Literal['presample_intensity'] = 'presample_intensity'
    units: Literal['counts'] = 'counts'
class PostsampleIntensity(CorrectionsData):
    """Source of raw data for the intensity of the beam after it has
    passed through the sample.

    :ivar label: Fixed to `"postsample_intensity"` (also the default).
    :type label: Literal["postsample_intensity"]
    :ivar units: Fixed to `"counts"` (also the default).
    :type units: Literal["counts"]
    :ivar data_type: How the data were recorded at time of data
        collection.
    :type data_type: Literal['scan_column', 'smb_par']
    :ivar name: The name with which the raw data were recorded at time of
        data collection.
    :type name: str
    """
    label: Literal['postsample_intensity'] = 'postsample_intensity'
    units: Literal['counts'] = 'counts'
class DwellTimeActual(CorrectionsData):
    """Source of raw data for the actual dwell time at each scan point in
    SPEC (with some scan types, this value can vary slightly
    point-to-point from the dwell time specified in the command).

    :ivar label: Fixed to `"dwell_time_actual"` (also the default).
    :type label: Literal["dwell_time_actual"]
    :ivar units: Fixed to `"s"` (also the default).
    :type units: Literal["s"]
    :ivar data_type: How the data were recorded at time of data
        collection.
    :type data_type: Literal['scan_column', 'smb_par']
    :ivar name: The name with which the raw data were recorded at time of
        data collection.
    :type name: str
    """
    label: Literal['dwell_time_actual'] = 'dwell_time_actual'
    units: Literal['s'] = 's'
396
class MapConfig(BaseModel):
    """Class representing an experiment consisting of one or more SPEC
    scans.

    :ivar title: The title for the map configuration.
    :type title: str
    :ivar station: The name of the station at which the map was collected.
    :type station: Literal['id1a3','id3a','id3b']
    :ivar experiment_type: The type of experiment; must be one of the
        types allowed for `station`.
    :type experiment_type: Literal['SAXSWAXS', 'EDD', 'XRF']
    :ivar sample: Metadata for the sample that was measured.
    :type sample: Sample
    :ivar spec_scans: A list of the spec scans that compose the map.
    :type spec_scans: list[SpecScans]
    :ivar independent_dimensions: A list of the sources of data
        representing the raw values of each independent dimension of the
        map.
    :type independent_dimensions: list[PointByPointScanData]
    :ivar presample_intensity: A source of point-by-point presample beam
        intensity data. Required when applying a CorrectionConfig tool.
    :type presample_intensity: Optional[PresampleIntensity]
    :ivar dwell_time_actual: A source of point-by-point actual dwell times
        for spec scans. Required when applying a CorrectionConfig tool.
    :type dwell_time_actual: Optional[DwellTimeActual]
    :ivar postsample_intensity: A source of point-by-point postsample beam
        intensity data. Required when applying a CorrectionConfig tool
        with `correction_type="flux_absorption"` or
        `correction_type="flux_absorption_background"`.
    :type postsample_intensity: Optional[PostsampleIntensity]
    :ivar scalar_data: A list of the sources of data representing other
        scalar raw data values collected at each point on the map. In the
        NeXus file representation of the map, datasets for these values
        will be included.
    :type scalar_data: Optional[list[PointByPointScanData]]
    """
    # Field order matters: validators below read earlier fields
    # (`station`, `experiment_type`, `spec_scans`) from `values`.
    title: constr(strip_whitespace=True, min_length=1)
    station: Literal['id1a3','id3a','id3b']
    experiment_type: Literal['SAXSWAXS', 'EDD', 'XRF']
    sample: Sample
    spec_scans: conlist(item_type=SpecScans, min_items=1)
    independent_dimensions: conlist(item_type=PointByPointScanData, min_items=1)
    presample_intensity: Optional[PresampleIntensity]
    dwell_time_actual: Optional[DwellTimeActual]
    postsample_intensity: Optional[PostsampleIntensity]
    scalar_data: Optional[list[PointByPointScanData]] = []
    # Lazily filled cache for the `coords` property.
    _coords: dict = PrivateAttr()
    _validate_independent_dimensions = validator('independent_dimensions', each_item=True, allow_reuse=True)(validate_data_source_for_map_config)
    _validate_presample_intensity = validator('presample_intensity', allow_reuse=True)(validate_data_source_for_map_config)
    _validate_dwell_time_actual = validator('dwell_time_actual', allow_reuse=True)(validate_data_source_for_map_config)
    _validate_postsample_intensity = validator('postsample_intensity', allow_reuse=True)(validate_data_source_for_map_config)
    _validate_scalar_data = validator('scalar_data', each_item=True, allow_reuse=True)(validate_data_source_for_map_config)

    @validator('experiment_type')
    def validate_experiment_type(cls, value, values):
        """Ensure values for the station and experiment_type fields are
        compatible.

        :param value: The experiment type to validate.
        :type value: str
        :param values: Dictionary of the previously validated fields.
        :type values: dict
        :raises ValueError: If `value` is not allowed for the station (no
            experiment type is allowed when the station is unknown).
        :return: The validated experiment type.
        :rtype: str
        """
        station = values.get('station')
        allowed_by_station = {
            'id1a3': ['SAXSWAXS', 'EDD'],
            'id3a': ['EDD'],
            'id3b': ['SAXSWAXS', 'XRF'],
        }
        allowed_experiment_types = allowed_by_station.get(station, [])
        if value not in allowed_experiment_types:
            raise ValueError(f'For station {station}, allowed experiment types are {allowed_experiment_types} (suuplied experiment type {value} is not allowed)')
        return value

    @property
    def coords(self):
        """Return a dictionary of the values of each independent dimension
        across the map.

        The dictionary is computed on first access and then stored on the
        instance. Each entry is the result of `np.unique` over the values
        recorded for that dimension at every step of every scan.

        :returns: A dictionary of the map's coordinate values, keyed by
            the label of each independent dimension.
        :rtype: dict[str, np.ndarray]
        """
        try:
            return self._coords
        except AttributeError:
            # Not computed yet. Compute outside the handler so failures
            # below are not reported as chained to this AttributeError.
            pass
        coords = {}
        for dimension in self.independent_dimensions:
            recorded = []
            for scans in self.spec_scans:
                for scan_number in scans.scan_numbers:
                    scanparser = scans.get_scanparser(scan_number)
                    for scan_step_index in range(scanparser.spec_scan_npts):
                        recorded.append(dimension.get_value(scans, scan_number, scan_step_index))
            coords[dimension.label] = np.unique(recorded)
        self._coords = coords
        return self._coords

    @property
    def dims(self):
        """Return a list of the independent dimension labels for the map,
        in the reverse of the order of `independent_dimensions`.
        """
        return [dimension.label for dimension in self.independent_dimensions[::-1]]

    @property
    def shape(self):
        """Return the shape of the map -- a tuple representing the number
        of unique values of each dimension across the map, in the reverse
        of the order of the entries of `coords`.
        """
        return tuple(len(values) for values in self.coords.values())[::-1]

    @property
    def all_scalar_data(self):
        """Return a list of all instances of `PointByPointScanData` for
        which this map configuration will collect dataset-like data (as
        opposed to axes-like data).

        This will be any and all of the items in the corrections-data-
        related fields, as well as any additional items in the optional
        `scalar_data` field.
        """
        corrections = (
            getattr(self, label, None)
            for label in CorrectionsData.reserved_labels()
        )
        present = [data for data in corrections if data is not None]
        # `scalar_data` is Optional, so it may be None as well as a list.
        return present + list(self.scalar_data or [])
498
def import_scanparser(station, experiment_type):
    """Bind the module-level name `ScanParser` to the parser class that
    matches a station and experiment type.

    The parser class is imported from `msnctools.scanparsers` only once a
    valid combination has been identified.

    :param station: The name of the station; compared case-insensitively.
    :type station: str
    :param experiment_type: The type of experiment; compared exactly.
    :type experiment_type: str
    :raises ValueError: If the station is not recognized, or the
        experiment type is not supported at that station.
    :return: None
    :rtype: None
    """
    if station.lower() in ('id1a3', 'id3a'):
        if experiment_type == 'SAXSWAXS':
            from msnctools.scanparsers import SMBLinearScanParser as parser_class
        elif experiment_type == 'EDD':
            from msnctools.scanparsers import SMBMCAScanParser as parser_class
        else:
            raise ValueError(f'Invalid experiment_type: {experiment_type}')
    elif station.lower() == 'id3b':
        if experiment_type == 'SAXSWAXS':
            from msnctools.scanparsers import FMBSAXSWAXSScanParser as parser_class
        elif experiment_type == 'XRF':
            from msnctools.scanparsers import FMBXRFScanParser as parser_class
        else:
            raise ValueError(f'Invalid experiment_type: {experiment_type}')
    else:
        raise ValueError(f'Invalid station: {station}')
    # Publish the selection for the module-level helpers that look up
    # `ScanParser` as a global.
    globals()['ScanParser'] = parser_class