# HG changeset patch
# User rv43
# Date 1659024324 0
# Node ID 26f99fdd8d616f8936e1f383b1c2655ee19b7124
# Parent 059819ea1f0ea164513f3c37e9963c19f88a86cd
"planemo upload for repository https://github.com/rolfverberg/galaxytools commit 4f7738d02f4a3fd91373f43937ed311b6fe11a12"
diff -r 059819ea1f0e -r 26f99fdd8d61 detector.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/detector.py Thu Jul 28 16:05:24 2022 +0000
@@ -0,0 +1,345 @@
+import logging
+import os
+import yaml
+from functools import cache
+from copy import deepcopy
+
+#from .general import *
+from general import illegal_value, is_int, is_num, input_yesno
+
+#from hexrd.instrument import HEDMInstrument, PlanarDetector
+
+class DetectorConfig:
+ def __init__(self, config_source):
+ self._config_source = config_source
+
+ if isinstance(self._config_source, str):
+ self._config_file = self._config_source
+ self._config = self._load_config_file()
+ elif isinstance(self._config_source, dict):
+ self._config_file = None
+ self._config = self._config_source
+ else:
+ self._config_file = None
+ self._config = False
+
+ self._valid = self._validate()
+
+ if not self.valid:
+ logging.error(f'Cannot create a valid instance of {self.__class__.__name__} '+
+ f'from {self._config_source}')
+
+ def __repr__(self):
+ return(f'{self.__class__.__name__}({self._config_source.__repr__()})')
+ def __str__(self):
+ return(f'{self.__class__.__name__} generated from {self._config_source}')
+
+ @property
+ def config_file(self):
+ return(self._config_file)
+
+ @property
+ def config(self):
+ return(deepcopy(self._config))
+
+ @property
+ def valid(self):
+ return(self._valid)
+
+ def load_config_file(self):
+ raise(NotImplementedError)
+
+ def validate(self):
+ raise(NotImplementedError)
+
+ def _load_config_file(self):
+ return(self.load_config_file())
+
+ def _validate(self):
+ if not self.config:
+ logging.error('A configuration must be loaded prior to calling Detector._validate')
+ return(False)
+ else:
+ return(self.validate())
+
+ def _write_to_file(self, out_file):
+ out_file = os.path.abspath(out_file)
+
+ current_config_valid = self.validate()
+ if not current_config_valid:
+ write_invalid_config = input_yesno(s=f'This {self.__class__.__name__} is currently '+
+ f'invalid. Write the configuration to {out_file} anyways?', default='no')
+ if not write_invalid_config:
+ logging.info('In accordance with user input, the invalid configuration will '+
+ f'not be written to {out_file}')
+ return
+
+ if os.access(out_file, os.W_OK):
+ if os.path.exists(out_file):
+ overwrite = input_yesno(s=f'{out_file} already exists. Overwrite?', default='no')
+ if overwrite:
+ self.write_to_file(out_file)
+ else:
+ logging.info(f'In accordance with user input, {out_file} will not be '+
+ 'overwritten')
+ else:
+ self.write_to_file(out_file)
+ else:
+ logging.error(f'Insufficient permissions to write to {out_file}')
+
+ def write_to_file(self, out_file):
+ raise(NotImplementedError)
+
+class YamlDetectorConfig(DetectorConfig):
+ def __init__(self, config_source, validate_yaml_pars=[]):
+ self._validate_yaml_pars = validate_yaml_pars
+ super().__init__(config_source)
+
+ def load_config_file(self):
+ if not os.path.splitext(self._config_file)[1]:
+ if os.path.isfile(f'{self._config_file}.yml'):
+ self._config_file = f'{self._config_file}.yml'
+ if os.path.isfile(f'{self._config_file}.yaml'):
+ self._config_file = f'{self._config_file}.yaml'
+ if not os.path.isfile(self._config_file):
+ logging.error(f'Unable to load {self._config_file}')
+ return(False)
+ with open(self._config_file, 'r') as infile:
+ config = yaml.safe_load(infile)
+ if isinstance(config, dict):
+ return(config)
+ else:
+ logging.error(f'Unable to load {self._config_file} as a dictionary')
+ return(False)
+
+ def validate(self):
+ if not self._validate_yaml_pars:
+ logging.warning('There are no required parameters provided for this detector '+
+ 'configuration')
+ return(True)
+
+ def validate_nested_pars(config, validate_yaml_par):
+ yaml_par_levels = validate_yaml_par.split(':')
+ first_level_par = yaml_par_levels[0]
+ try:
+ first_level_par = int(first_level_par)
+ except:
+ pass
+ try:
+ next_level_config = config[first_level_par]
+ if len(yaml_par_levels) > 1:
+ next_level_pars = ':'.join(yaml_par_levels[1:])
+ return(validate_nested_pars(next_level_config, next_level_pars))
+ else:
+ return(True)
+ except:
+ return(False)
+
+ pars_missing = [p for p in self._validate_yaml_pars
+ if not validate_nested_pars(self.config, p)]
+ if len(pars_missing) > 0:
+ logging.error(f'Missing item(s) in configuration: {", ".join(pars_missing)}')
+ return(False)
+ else:
+ return(True)
+
+ def write_to_file(self, out_file):
+ with open(out_file, 'w') as outf:
+ yaml.dump(self.config, outf)
+
+
+class TomoDetectorConfig(YamlDetectorConfig):
+ def __init__(self, config_source):
+ validate_yaml_pars = ['detector',
+ 'lens_magnification',
+ 'detector:pixels:rows',
+ 'detector:pixels:columns',
+ *[f'detector:pixels:size:{i}' for i in range(2)]]
+ super().__init__(config_source, validate_yaml_pars=validate_yaml_pars)
+
+ @property
+ @cache
+ def lens_magnification(self):
+ lens_magnification = self.config.get('lens_magnification')
+ if not isinstance(lens_magnification, (int, float)) or lens_magnification <= 0.:
+ illegal_value(lens_magnification, 'lens_magnification', 'detector file')
+ logging.warning('Using default lens_magnification value of 1.0')
+ return(1.0)
+ else:
+ return(lens_magnification)
+
+ @property
+ @cache
+ def pixel_size(self):
+ pixel_size = self.config['detector'].get('pixels').get('size')
+ if isinstance(pixel_size, (int, float)):
+ if pixel_size <= 0.:
+ illegal_value(pixel_size, 'pixel_size', 'detector file')
+ return(None)
+ pixel_size /= self.lens_magnification
+ elif isinstance(pixel_size, list):
+ if ((len(pixel_size) > 2) or
+ (len(pixel_size) == 2 and pixel_size[0] != pixel_size[1])):
+ illegal_value(pixel_size, 'pixel size', 'detector file')
+ return(None)
+ elif not is_num(pixel_size[0], 0.):
+ illegal_value(pixel_size, 'pixel size', 'detector file')
+ return(None)
+ else:
+ pixel_size = pixel_size[0]/self.lens_magnification
+ else:
+ illegal_value(pixel_size, 'pixel size', 'detector file')
+ return(None)
+
+ return(pixel_size)
+
+ @property
+ @cache
+ def dimensions(self):
+ pixels = self.config['detector'].get('pixels')
+ num_rows = pixels.get('rows')
+ if not is_int(num_rows, 1):
+ illegal_value(num_rows, 'rows', 'detector file')
+ return(None)
+ num_columns = pixels.get('columns')
+ if not is_int(num_columns, 1):
+ illegal_value(num_columns, 'columns', 'detector file')
+ return(None)
+ return(num_rows, num_columns)
+
+
+class EDDDetectorConfig(YamlDetectorConfig):
+ def __init__(self, config_source):
+ validate_yaml_pars = ['num_bins',
+ 'max_E',
+ # 'angle', # KLS leave this out for now -- I think it has to do with the relative geometry of sample, beam, and detector (not a property of the detector on its own), so may not belong here in the DetectorConfig object?
+ 'tth_angle',
+ 'slope',
+ 'intercept']
+ super().__init__(config_source, validate_yaml_pars=validate_yaml_pars)
+
+ @property
+ @cache
+ def num_bins(self):
+ try:
+ num_bins = int(self.config['num_bins'])
+ if num_bins <= 0:
+ raise(ValueError)
+ else:
+ return(num_bins)
+ except:
+ illegal_value(self.config['num_bins'], 'num_bins')
+ @property
+ @cache
+ def max_E(self):
+ try:
+ max_E = float(self.config['max_E'])
+ if max_E <= 0:
+ raise(ValueError)
+ else:
+ return(max_E)
+ except:
+ illegal_value(self.config['max_E'], 'max_E')
+ return(None)
+
+ @property
+ def bin_energies(self):
+ return(self.slope * np.linspace(0, self.max_E, self.num_bins, endpoint=False) +
+ self.intercept)
+
+ @property
+ def tth_angle(self):
+ try:
+ return(float(self.config['tth_angle']))
+ except:
+ illegal_value(tth_angle, 'tth_angle')
+ return(None)
+ @tth_angle.setter
+ def tth_angle(self, value):
+ try:
+ self._config['tth_angle'] = float(value)
+ except:
+ illegal_value(value, 'tth_angle')
+
+ @property
+ def slope(self):
+ try:
+ return(float(self.config['slope']))
+ except:
+ illegal_value(slope, 'slope')
+ return(None)
+ @slope.setter
+ def slope(self, value):
+ try:
+ self._config['slope'] = float(value)
+ except:
+ illegal_value(value, 'slope')
+
+ @property
+ def intercept(self):
+ try:
+ return(float(self.config['intercept']))
+ except:
+ illegal_value(intercept, 'intercept')
+ return(None)
+ @intercept.setter
+ def intercept(self, value):
+ try:
+ self._config['intercept'] = float(value)
+ except:
+ illegal_value(value, 'intercept')
+
+
+# class HexrdDetectorConfig(YamlDetectorConfig):
+# def __init__(self, config_source, detector_names=[]):
+# self.detector_names = detector_names
+# validate_yaml_pars_each_detector = [*[f'buffer:{i}' for i in range(2)],
+# 'distortion:function_name',
+# *[f'distortion:parameters:{i}' for i in range(6)],
+# 'pixels:columns',
+# 'pixels:rows',
+# *['pixels:size:%i' % i for i in range(2)],
+# 'saturation_level',
+# *[f'transform:tilt:{i}' for i in range(3)],
+# *[f'transform:translation:{i}' for i in range(3)]]
+# validate_yaml_pars = []
+# for detector_name in self.detector_names:
+# validate_yaml_pars += [f'detectors:{detector_name}:{par}' for par in validate_yaml_pars_each_detector]
+
+# super().__init__(config_source, validate_yaml_pars=validate_yaml_pars)
+
+# def validate(self):
+# yaml_valid = YamlDetectorConfig.validate(self)
+# if not yaml_valid:
+# return(False)
+# else:
+# hedm_instrument = HEDMInstrument(instrument_config=self.config)
+# for detector_name in self.detector_names:
+# if detector_name in hedm_instrument.detectors:
+# if isinstance(hedm_instrument.detectors[detector_name], PlanarDetector):
+# continue
+# else:
+# return(False)
+# else:
+# return(False)
+# return(True)
+
+# class SAXSWAXSDetectorConfig(DetectorConfig):
+# def __init__(self, config_source):
+# super().__init__(config_source)
+
+# @property
+# def ai(self):
+# return(self.config)
+# @ai.setter
+# def ai(self, value):
+# if isinstance(value, pyFAI.azimuthalIntegrator.AzimuthalIntegrator):
+# self.config = ai
+# else:
+# illegal_value(value, 'azimuthal integrator')
+
+# # pyFAI will perform its own error-checking for the mask attribute.
+# mask = property(self.ai.get_mask, self.ai,set_mask)
+
+
+
diff -r 059819ea1f0e -r 26f99fdd8d61 fit.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/fit.py Thu Jul 28 16:05:24 2022 +0000
@@ -0,0 +1,663 @@
+#!/usr/bin/env python3
+
+# -*- coding: utf-8 -*-
+"""
+Created on Mon Dec 6 15:36:22 2021
+
+@author: rv43
+"""
+
+import sys
+import re
+import logging
+import numpy as np
+
+from asteval import Interpreter
+from copy import deepcopy
+#from lmfit import Minimizer
+from lmfit import Model, Parameters
+from lmfit.models import ConstantModel, LinearModel, QuadraticModel, PolynomialModel,\
+ StepModel, RectangleModel, GaussianModel, LorentzianModel
+
+#from .general import *
+from general import is_index, index_nearest, quickPlot
+
+# sigma = fwhm_factor*fwhm
+fwhm_factor = {
+ 'gaussian' : f'fwhm/(2*sqrt(2*log(2)))',
+ 'lorentzian' : f'0.5*fwhm',
+ 'splitlorentzian' : f'0.5*fwhm', # sigma = sigma_r
+ 'voight' : f'0.2776*fwhm', # sigma = gamma
+ 'pseudovoight' : f'0.5*fwhm'} # fraction = 0.5
+
+# amplitude = height_factor*height*fwhm
+height_factor = {
+ 'gaussian' : f'height*fwhm*0.5*sqrt(pi/log(2))',
+ 'lorentzian' : f'height*fwhm*0.5*pi',
+ 'splitlorentzian' : f'height*fwhm*0.5*pi', # sigma = sigma_r
+ 'voight' : f'3.334*height*fwhm', # sigma = gamma
+ 'pseudovoight' : f'1.268*height*fwhm'} # fraction = 0.5
+
+class Fit:
+ """Wrapper class for lmfit
+ """
+ def __init__(self, x, y, models=None, **kwargs):
+ self._x = x
+ self._y = y
+ self._model = None
+ self._parameters = Parameters()
+ self._result = None
+ if models is not None:
+ if callable(models) or isinstance(models, str):
+ kwargs = self.add_model(models, **kwargs)
+ elif isinstance(models, (tuple, list)):
+ for model in models:
+ kwargs = self.add_model(model, **kwargs)
+ self.fit(**kwargs)
+
+ @classmethod
+ def fit_data(cls, x, y, models, **kwargs):
+ return cls(x, y, models, **kwargs)
+
+ @property
+ def best_errors(self):
+ if self._result is None:
+ return None
+ errors = {}
+ names = sorted(self._result.params)
+ for name in names:
+ par = self._result.params[name]
+ errors[name] = par.stderr
+ return errors
+
+ @property
+ def best_fit(self):
+ if self._result is None:
+ return None
+ return self._result.best_fit
+
+ @property
+ def best_parameters(self):
+ if self._result is None:
+ return None
+ parameters = []
+ names = sorted(self._result.params)
+ for name in names:
+ par = self._result.params[name]
+ parameters.append({'name' : par.name, 'value' : par.value, 'error' : par.stderr,
+ 'init_value' : par.init_value, 'min' : par.min, 'max' : par.max,
+ 'vary' : par.vary, 'expr' : par.expr})
+ return parameters
+
+ @property
+ def best_values(self):
+ if self._result is None:
+ return None
+ return self._result.params.valuesdict()
+
+ @property
+ def chisqr(self):
+ return self._result.chisqr
+
+ @property
+ def covar(self):
+ return self._result.covar
+
+ @property
+ def init_values(self):
+ if self._result is None:
+ return None
+ return self._result.init_values
+
+ @property
+ def num_func_eval(self):
+ return self._result.nfev
+
+ @property
+ def redchi(self):
+ return self._result.redchi
+
+ @property
+ def residual(self):
+ return self._result.residual
+
+ @property
+ def success(self):
+ if not self._result.success:
+# print(f'ier = {self._result.ier}')
+# print(f'lmdif_message = {self._result.lmdif_message}')
+# print(f'message = {self._result.message}')
+# print(f'nfev = {self._result.nfev}')
+# print(f'redchi = {self._result.redchi}')
+# print(f'success = {self._result.success}')
+ if self._result.ier == 0 or self._result.ier == 5:
+ logging.warning(f'ier = {self._result.ier}: {self._result.message}')
+ else:
+ logging.warning(f'ier = {self._result.ier}: {self._result.message}')
+ return True
+# self.print_fit_report()
+# self.plot()
+ return self._result.success
+
+ @property
+ def var_names(self):
+ """Intended to be used with covar
+ """
+ if self._result is None:
+ return None
+ return self._result.var_names
+
+ def print_fit_report(self, show_correl=False):
+ if self._result is not None:
+ print(self._result.fit_report(show_correl=show_correl))
+
+ def add_parameter(self, **parameter):
+ if not isinstance(parameter, dict):
+ illegal_value(parameter, 'parameter', 'add_parameter')
+ return
+ self._parameters.add(**parameter)
+
+ def add_model(self, model, prefix=None, parameters=None, **kwargs):
+ # Create the new model
+# print('\nAt start adding model:')
+# self._parameters.pretty_print()
+ if prefix is not None and not isinstance(prefix, str):
+ logging.warning('Ignoring illegal prefix: {model} {type(model)}')
+ prefix = None
+ if callable(model):
+ newmodel = Model(model, prefix=prefix)
+ elif isinstance(model, str):
+ if model == 'constant':
+ newmodel = ConstantModel(prefix=prefix)
+ elif model == 'linear':
+ newmodel = LinearModel(prefix=prefix)
+ elif model == 'quadratic':
+ newmodel = QuadraticModel(prefix=prefix)
+ elif model == 'gaussian':
+ newmodel = GaussianModel(prefix=prefix)
+ elif model == 'step':
+ form = kwargs.get('form')
+ if form is not None:
+ del kwargs['form']
+ if form is None or form not in ('linear', 'atan', 'arctan', 'erf', 'logistic'):
+ logging.error(f'Illegal form parameter for build-in step model ({form})')
+ return kwargs
+ newmodel = StepModel(prefix=prefix, form=form)
+ elif model == 'rectangle':
+ form = kwargs.get('form')
+ if form is not None:
+ del kwargs['form']
+ if form is None or form not in ('linear', 'atan', 'arctan', 'erf', 'logistic'):
+ logging.error(f'Illegal form parameter for build-in rectangle model ({form})')
+ return kwargs
+ newmodel = RectangleModel(prefix=prefix, form=form)
+ else:
+ logging.error('Unknown build-in fit model')
+ return kwargs
+ else:
+ illegal_value(model, 'model', 'add_model')
+ return kwargs
+
+ # Add the new model to the current one
+ if self._model is None:
+ self._model = newmodel
+ else:
+ self._model += newmodel
+ if self._parameters is None:
+ self._parameters = newmodel.make_params()
+ else:
+ self._parameters += newmodel.make_params()
+# print('\nAfter adding model:')
+# self._parameters.pretty_print()
+
+ # Initialize the model parameters
+ if prefix is None:
+ prefix = ""
+ if parameters is not None:
+ if not isinstance(parameters, (tuple, list)):
+ illegal_value(parameters, 'parameters', 'add_model')
+ return kwargs
+ for parameter in parameters:
+ if not isinstance(parameter, dict):
+ illegal_value(parameter, 'parameter in parameters', 'add_model')
+ return kwargs
+ parameter['name'] = prefix+parameter['name']
+ self._parameters.add(**parameter)
+ for name, value in kwargs.items():
+ if isinstance(value, (int, float)):
+ self._parameters.add(prefix+name, value=value)
+# print('\nAt end add_model:')
+# self._parameters.pretty_print()
+
+ return kwargs
+
+ def fit(self, interactive=False, guess=False, **kwargs):
+ if self._model is None:
+ logging.error('Undefined fit model')
+ return
+ # Current parameter values
+ pars = self._parameters.valuesdict()
+ # Apply parameter updates through keyword arguments
+ for par in set(pars) & set(kwargs):
+ pars[par] = kwargs.pop(par)
+ self._parameters[par].set(value=pars[par])
+ # Check for uninitialized parameters
+ for par, value in pars.items():
+ if value is None or np.isinf(value) or np.isnan(value):
+ if interactive:
+ self._parameters[par].set(value=
+ input_num(f'Enter an initial value for {par}: '))
+ else:
+ self._parameters[par].set(value=1.0)
+# print('\nAt start actual fit:')
+# print(f'kwargs = {kwargs}')
+# self._parameters.pretty_print()
+# print(f'parameters:\n{self._parameters}')
+# print(f'x = {self._x}')
+# print(f'len(x) = {len(self._x)}')
+# print(f'y = {self._y}')
+# print(f'len(y) = {len(self._y)}')
+ if guess:
+ self._parameters = self._model.guess(self._y, x=self._x)
+ self._result = self._model.fit(self._y, self._parameters, x=self._x, **kwargs)
+# print('\nAt end actual fit:')
+# print(f'var_names:\n{self._result.var_names}')
+# print(f'stderr:\n{np.sqrt(np.diagonal(self._result.covar))}')
+# self._parameters.pretty_print()
+# print(f'parameters:\n{self._parameters}')
+
+ def plot(self):
+ if self._result is None:
+ return
+ components = self._result.eval_components()
+ plots = ((self._x, self._y, '.'), (self._x, self._result.best_fit, 'k-'),
+ (self._x, self._result.init_fit, 'g-'))
+ legend = ['data', 'best fit', 'init']
+ if len(components) > 1:
+ for modelname, y in components.items():
+ if isinstance(y, (int, float)):
+ y *= np.ones(len(self._x))
+ plots += ((self._x, y, '--'),)
+# if modelname[-1] == '_':
+# legend.append(modelname[:-1])
+# else:
+# legend.append(modelname)
+ quickPlot(plots, legend=legend, block=True)
+
+ @staticmethod
+ def guess_init_peak(x, y, *args, center_guess=None, use_max_for_center=True):
+ """ Return a guess for the initial height, center and fwhm for a peak
+ """
+ center_guesses = None
+ if len(x) != len(y):
+ logging.error(f'Illegal x and y lengths ({len(x)}, {len(y)}), skip initial guess')
+ return None, None, None
+ if isinstance(center_guess, (int, float)):
+ if len(args):
+ logging.warning('Ignoring additional arguments for single center_guess value')
+ elif isinstance(center_guess, (tuple, list, np.ndarray)):
+ if len(center_guess) == 1:
+ logging.warning('Ignoring additional arguments for single center_guess value')
+ if not isinstance(center_guess[0], (int, float)):
+ raise ValueError(f'Illegal center_guess type ({type(center_guess[0])})')
+ center_guess = center_guess[0]
+ else:
+ if len(args) != 1:
+ raise ValueError(f'Illegal number of arguments ({len(args)})')
+ n = args[0]
+ if not is_index(n, 0, len(center_guess)):
+ raise ValueError('Illegal argument')
+ center_guesses = center_guess
+ center_guess = center_guesses[n]
+ elif center_guess is not None:
+ raise ValueError(f'Illegal center_guess type ({type(center_guess)})')
+
+ # Sort the inputs
+ index = np.argsort(x)
+ x = x[index]
+ y = y[index]
+ miny = y.min()
+# print(f'miny = {miny}')
+# print(f'x_range = {x[0]} {x[-1]} {len(x)}')
+# print(f'y_range = {y[0]} {y[-1]} {len(y)}')
+
+# xx = x
+# yy = y
+ # Set range for current peak
+# print(f'center_guesses = {center_guesses}')
+ if center_guesses is not None:
+ if n == 0:
+ low = 0
+ upp = index_nearest(x, (center_guesses[0]+center_guesses[1])/2)
+ elif n == len(center_guesses)-1:
+ low = index_nearest(x, (center_guesses[n-1]+center_guesses[n])/2)
+ upp = len(x)
+ else:
+ low = index_nearest(x, (center_guesses[n-1]+center_guesses[n])/2)
+ upp = index_nearest(x, (center_guesses[n]+center_guesses[n+1])/2)
+# print(f'low = {low}')
+# print(f'upp = {upp}')
+ x = x[low:upp]
+ y = y[low:upp]
+# quickPlot(x, y, vlines=(x[0], center_guess, x[-1]), block=True)
+
+ # Estimate FHHM
+ maxy = y.max()
+# print(f'x_range = {x[0]} {x[-1]} {len(x)}')
+# print(f'y_range = {y[0]} {y[-1]} {len(y)} {miny} {maxy}')
+# print(f'center_guess = {center_guess}')
+ if center_guess is None:
+ center_index = np.argmax(y)
+ center = x[center_index]
+ height = maxy-miny
+ else:
+ if use_max_for_center:
+ center_index = np.argmax(y)
+ center = x[center_index]
+ if center_index < 0.1*len(x) or center_index > 0.9*len(x):
+ center_index = index_nearest(x, center_guess)
+ center = center_guess
+ else:
+ center_index = index_nearest(x, center_guess)
+ center = center_guess
+ height = y[center_index]-miny
+# print(f'center_index = {center_index}')
+# print(f'center = {center}')
+# print(f'height = {height}')
+ half_height = miny+0.5*height
+# print(f'half_height = {half_height}')
+ fwhm_index1 = 0
+ for i in range(center_index, fwhm_index1, -1):
+ if y[i] < half_height:
+ fwhm_index1 = i
+ break
+# print(f'fwhm_index1 = {fwhm_index1} {x[fwhm_index1]}')
+ fwhm_index2 = len(x)-1
+ for i in range(center_index, fwhm_index2):
+ if y[i] < half_height:
+ fwhm_index2 = i
+ break
+# print(f'fwhm_index2 = {fwhm_index2} {x[fwhm_index2]}')
+# quickPlot((x,y,'o'), vlines=(x[fwhm_index1], center, x[fwhm_index2]), block=True)
+ if fwhm_index1 == 0 and fwhm_index2 < len(x)-1:
+ fwhm = 2*(x[fwhm_index2]-center)
+ elif fwhm_index1 > 0 and fwhm_index2 == len(x)-1:
+ fwhm = 2*(center-x[fwhm_index1])
+ else:
+ fwhm = x[fwhm_index2]-x[fwhm_index1]
+# print(f'fwhm_index1 = {fwhm_index1} {x[fwhm_index1]}')
+# print(f'fwhm_index2 = {fwhm_index2} {x[fwhm_index2]}')
+# print(f'fwhm = {fwhm}')
+
+ # Return height, center and FWHM
+# quickPlot((x,y,'o'), (xx,yy), vlines=(x[fwhm_index1], center, x[fwhm_index2]), block=True)
+ return height, center, fwhm
+
+
+class FitMultipeak(Fit):
+ """Fit data with multiple peaks
+ """
+ def __init__(self, x, y, normalize=True):
+ super().__init__(x, deepcopy(y))
+ self._norm = None
+ self._fwhm_max = None
+ self._sigma_max = None
+ if normalize:
+ self._normalize()
+ #quickPlot((self._x,self._y), block=True)
+
+ @classmethod
+ def fit_multipeak(cls, x, y, centers, peak_models='gaussian', center_exprs=None, fit_type=None,
+ background_order=None, fwhm_max=None, plot_components=None):
+ """Make sure that centers and fwhm_max are in the correct units and consistent with expr
+ for a uniform fit (fit_type == 'uniform')
+ """
+ fit = cls(x, y)
+ success = fit.fit(centers, fit_type=fit_type, peak_models=peak_models, fwhm_max=fwhm_max,
+ center_exprs=center_exprs, background_order=background_order,
+ plot_components=plot_components)
+ if success:
+ return fit.best_fit, fit.residual, fit.best_values, fit.best_errors, fit.redchi, \
+ fit.success
+ else:
+ return np.array([]), np.array([]), {}, {}, sys.float_info.max, False
+
+ def fit(self, centers, fit_type=None, peak_models=None, center_exprs=None, fwhm_max=None,
+ background_order=None, plot_components=None, param_constraint=False):
+ self._fwhm_max = fwhm_max
+ # Create the multipeak model
+ self._create_model(centers, fit_type, peak_models, center_exprs, background_order,
+ param_constraint)
+
+ # Perform the fit
+ try:
+ if param_constraint:
+ super().fit(fit_kws={'xtol' : 1.e-5, 'ftol' : 1.e-5, 'gtol' : 1.e-5})
+ else:
+ super().fit()
+ except:
+ return False
+
+ # Check for valid fit parameter results
+ fit_failure = self._check_validity()
+ success = True
+ if fit_failure:
+ if param_constraint:
+ logging.warning(' -> Should not happen with param_constraint set, fail the fit')
+ success = False
+ else:
+ logging.info(' -> Retry fitting with constraints')
+ self.fit(centers, fit_type, peak_models, center_exprs, fwhm_max=fwhm_max,
+ background_order=background_order, plot_components=plot_components,
+ param_constraint=True)
+ else:
+ # Renormalize the data and results
+ self._renormalize()
+
+ # Print report and plot components if requested
+ if plot_components is not None:
+ self.print_fit_report()
+ self.plot()
+
+ return success
+
+ def _create_model(self, centers, fit_type=None, peak_models=None, center_exprs=None,
+ background_order=None, param_constraint=False):
+ """Create the multipeak model
+ """
+ if isinstance(centers, (int, float)):
+ centers = [centers]
+ num_peaks = len(centers)
+ if peak_models is None:
+ peak_models = num_peaks*['gaussian']
+ elif isinstance(peak_models, str):
+ peak_models = num_peaks*[peak_models]
+ if len(peak_models) != num_peaks:
+ raise ValueError(f'Inconsistent number of peaks in peak_models ({len(peak_models)} vs '+
+ f'{num_peaks})')
+ if num_peaks == 1:
+ if fit_type is not None:
+ logging.warning('Ignoring fit_type input for fitting one peak')
+ fit_type = None
+ if center_exprs is not None:
+ logging.warning('Ignoring center_exprs input for fitting one peak')
+ center_exprs = None
+ else:
+ if fit_type == 'uniform':
+ if center_exprs is None:
+ center_exprs = [f'scale_factor*{cen}' for cen in centers]
+ if len(center_exprs) != num_peaks:
+ raise ValueError(f'Inconsistent number of peaks in center_exprs '+
+ f'({len(center_exprs)} vs {num_peaks})')
+ elif fit_type == 'unconstrained' or fit_type is None:
+ if center_exprs is not None:
+ logging.warning('Ignoring center_exprs input for unconstrained fit')
+ center_exprs = None
+ else:
+ raise ValueError(f'Illegal fit_type in fit_multigaussian {fit_type}')
+ self._sigma_max = None
+ if param_constraint:
+ min_value = sys.float_info.min
+ if self._fwhm_max is not None:
+ self._sigma_max = np.zeros(num_peaks)
+ else:
+ min_value = None
+
+ # Reset the fit
+ self._model = None
+ self._parameters = Parameters()
+ self._result = None
+
+ # Add background model
+ if background_order is not None:
+ if background_order == 0:
+ self.add_model('constant', prefix='background', c=0.0)
+ elif background_order == 1:
+ self.add_model('linear', prefix='background', slope=0.0, intercept=0.0)
+ elif background_order == 2:
+ self.add_model('quadratic', prefix='background', a=0.0, b=0.0, c=0.0)
+ else:
+ raise ValueError(f'background_order = {background_order}')
+
+ # Add peaks and guess initial fit parameters
+ ast = Interpreter()
+ if num_peaks == 1:
+ height_init, cen_init, fwhm_init = self.guess_init_peak(self._x, self._y)
+ if self._fwhm_max is not None and fwhm_init > self._fwhm_max:
+ fwhm_init = self._fwhm_max
+ ast(f'fwhm = {fwhm_init}')
+ ast(f'height = {height_init}')
+ sig_init = ast(fwhm_factor[peak_models[0]])
+ amp_init = ast(height_factor[peak_models[0]])
+ sig_max = None
+ if self._sigma_max is not None:
+ ast(f'fwhm = {self._fwhm_max}')
+ sig_max = ast(fwhm_factor[peak_models[0]])
+ self._sigma_max[0] = sig_max
+ self.add_model(peak_models[0], parameters=(
+ {'name' : 'amplitude', 'value' : amp_init, 'min' : min_value},
+ {'name' : 'center', 'value' : cen_init, 'min' : min_value},
+ {'name' : 'sigma', 'value' : sig_init, 'min' : min_value, 'max' : sig_max}))
+ else:
+ if fit_type == 'uniform':
+ self.add_parameter(name='scale_factor', value=1.0)
+ for i in range(num_peaks):
+ height_init, cen_init, fwhm_init = self.guess_init_peak(self._x, self._y, i,
+ center_guess=centers)
+ if self._fwhm_max is not None and fwhm_init > self._fwhm_max:
+ fwhm_init = self._fwhm_max
+ ast(f'fwhm = {fwhm_init}')
+ ast(f'height = {height_init}')
+ sig_init = ast(fwhm_factor[peak_models[i]])
+ amp_init = ast(height_factor[peak_models[i]])
+ sig_max = None
+ if self._sigma_max is not None:
+ ast(f'fwhm = {self._fwhm_max}')
+ sig_max = ast(fwhm_factor[peak_models[i]])
+ self._sigma_max[i] = sig_max
+ if fit_type == 'uniform':
+ self.add_model(peak_models[i], prefix=f'peak{i+1}_', parameters=(
+ {'name' : 'amplitude', 'value' : amp_init, 'min' : min_value},
+ {'name' : 'center', 'expr' : center_exprs[i], 'min' : min_value},
+ {'name' : 'sigma', 'value' : sig_init, 'min' : min_value,
+ 'max' : sig_max}))
+ else:
+ self.add_model('gaussian', prefix=f'peak{i+1}_', parameters=(
+ {'name' : 'amplitude', 'value' : amp_init, 'min' : min_value},
+ {'name' : 'center', 'value' : cen_init, 'min' : min_value},
+ {'name' : 'sigma', 'value' : sig_init, 'min' : min_value,
+ 'max' : sig_max}))
+
+ def _check_validity(self):
+ """Check for valid fit parameter results
+ """
+ fit_failure = False
+ index = re.compile(r'\d+')
+ for parameter in self.best_parameters:
+ name = parameter['name']
+ if ((('amplitude' in name or 'height' in name) and parameter['value'] <= 0.0) or
+ (('sigma' in name or 'fwhm' in name) and parameter['value'] <= 0.0) or
+ ('center' in name and parameter['value'] <= 0.0) or
+ (name == 'scale_factor' and parameter['value'] <= 0.0)):
+ logging.info(f'Invalid fit result for {name} ({parameter["value"]})')
+ fit_failure = True
+ if 'sigma' in name and self._sigma_max is not None:
+ if name == 'sigma':
+ sigma_max = self._sigma_max[0]
+ else:
+ sigma_max = self._sigma_max[int(index.search(name).group())-1]
+ i = int(index.search(name).group())-1
+ if parameter['value'] > sigma_max:
+ logging.info(f'Invalid fit result for {name} ({parameter["value"]})')
+ fit_failure = True
+ elif parameter['value'] == sigma_max:
+ logging.warning(f'Edge result on for {name} ({parameter["value"]})')
+ if 'fwhm' in name and self._fwhm_max is not None:
+ if parameter['value'] > self._fwhm_max:
+ logging.info(f'Invalid fit result for {name} ({parameter["value"]})')
+ fit_failure = True
+ elif parameter['value'] == self._fwhm_max:
+ logging.warning(f'Edge result on for {name} ({parameter["value"]})')
+ return fit_failure
+
+ def _normalize(self):
+ """Normalize the data
+ """
+ y_min = self._y.min()
+ self._norm = (y_min, self._y.max()-y_min)
+ if self._norm[1] == 0.0:
+ self._norm = None
+ else:
+ self._y = (self._y-self._norm[0])/self._norm[1]
+
+ def _renormalize(self):
+ """Renormalize the data and results
+ """
+ if self._norm is None:
+ return
+ self._y = self._norm[0]+self._norm[1]*self._y
+ self._result.best_fit = self._norm[0]+self._norm[1]*self._result.best_fit
+ for name in self._result.params:
+ par = self._result.params[name]
+ if 'amplitude' in name or 'height' in name or 'background' in name:
+ par.value *= self._norm[1]
+ if par.stderr is not None:
+ par.stderr *= self._norm[1]
+ if par.init_value is not None:
+ par.init_value *= self._norm[1]
+ if par.min is not None and not np.isinf(par.min):
+ par.min *= self._norm[1]
+ if par.max is not None and not np.isinf(par.max):
+ par.max *= self._norm[1]
+ if 'intercept' in name or 'backgroundc' in name:
+ par.value += self._norm[0]
+ if par.init_value is not None:
+ par.init_value += self._norm[0]
+ if par.min is not None and not np.isinf(par.min):
+ par.min += self._norm[0]
+ if par.max is not None and not np.isinf(par.max):
+ par.max += self._norm[0]
+ self._result.init_fit = self._norm[0]+self._norm[1]*self._result.init_fit
+ init_values = {}
+ for name in self._result.init_values:
+ init_values[name] = self._result.init_values[name]
+ if init_values[name] is None:
+ continue
+ if 'amplitude' in name or 'height' in name or 'background' in name:
+ init_values[name] *= self._norm[1]
+ if 'intercept' in name or 'backgroundc' in name:
+ init_values[name] += self._norm[0]
+ self._result.init_values = init_values
+ # Don't renormalized chisqr, it has no useful meaning in physical units
+ #self._result.chisqr *= self._norm[1]*self._norm[1]
+ if self._result.covar is not None:
+ for i, name in enumerate(self._result.var_names):
+ if 'amplitude' in name or 'height' in name or 'background' in name:
+ for j in range(len(self._result.var_names)):
+ if self._result.covar[i,j] is not None:
+ self._result.covar[i,j] *= self._norm[1]
+ if self._result.covar[j,i] is not None:
+ self._result.covar[j,i] *= self._norm[1]
+ # Don't renormalized redchi, it has no useful meaning in physical units
+ #self._result.redchi *= self._norm[1]*self._norm[1]
+ self._result.residual *= self._norm[1]
diff -r 059819ea1f0e -r 26f99fdd8d61 general.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/general.py Thu Jul 28 16:05:24 2022 +0000
@@ -0,0 +1,1374 @@
+#!/usr/bin/env python3
+
+# -*- coding: utf-8 -*-
+"""
+Created on Mon Dec 6 15:36:22 2021
+
+@author: rv43
+"""
+
+import logging
+
+import os
+import sys
+import re
+import yaml
+try:
+ import h5py
+except:
+ pass
+import numpy as np
+try:
+ import matplotlib.pyplot as plt
+ from matplotlib.widgets import Button
+except:
+ pass
+
+from ast import literal_eval
+from copy import deepcopy
+from time import time
+
+
+def depth_list(L): return isinstance(L, list) and max(map(depth_list, L))+1
+def depth_tuple(T): return isinstance(T, tuple) and max(map(depth_tuple, T))+1
+def unwrap_tuple(T):
+ if depth_tuple(T) > 1 and len(T) == 1:
+ T = unwrap_tuple(*T)
+ return T
+
+def illegal_value(value, name, location=None, exit_flag=False):
+ if not isinstance(location, str):
+ location = ''
+ else:
+ location = f'in {location} '
+ if isinstance(name, str):
+ logging.error(f'Illegal value for {name} {location}({value}, {type(value)})')
+ else:
+ logging.error(f'Illegal value {location}({value}, {type(value)})')
+ if exit_flag:
+ raise ValueError
+
+def is_int(v, v_min=None, v_max=None):
+ """Value is an integer in range v_min <= v <= v_max.
+ """
+ if not isinstance(v, int):
+ return False
+ if v_min is not None and not isinstance(v_min, int):
+ illegal_value(v_min, 'v_min', 'is_int')
+ return False
+ if v_max is not None and not isinstance(v_max, int):
+ illegal_value(v_max, 'v_max', 'is_int')
+ return False
+ if v_min is not None and v_max is not None and v_min > v_max:
+ logging.error(f'Illegal v_min, v_max combination ({v_min}, {v_max})')
+ return False
+ if (v_min is not None and v < v_min) or (v_max is not None and v > v_max):
+ return False
+ return True
+
+def is_int_pair(v, v_min=None, v_max=None):
+ """Value is an integer pair, each in range v_min <= v[i] <= v_max or
+ v_min[i] <= v[i] <= v_max[i].
+ """
+ if not (isinstance(v, (tuple, list)) and len(v) == 2 and isinstance(v[0], int) and
+ isinstance(v[1], int)):
+ return False
+ if v_min is not None or v_max is not None:
+ if (v_min is None or isinstance(v_min, int)) and (v_max is None or isinstance(v_max, int)):
+ if True in [True if not is_int(vi, v_min=v_min, v_max=v_max) else False for vi in v]:
+ return False
+ elif is_int_pair(v_min) and is_int_pair(v_max):
+ if True in [True if v_min[i] > v_max[i] else False for i in range(2)]:
+ logging.error(f'Illegal v_min, v_max combination ({v_min}, {v_max})')
+ return False
+ if True in [True if not is_int(v[i], v_min[i], v_max[i]) else False for i in range(2)]:
+ return False
+ elif is_int_pair(v_min) and (v_max is None or isinstance(v_max, int)):
+ if True in [True if not is_int(v[i], v_min=v_min[i], v_max=v_max) else False
+ for i in range(2)]:
+ return False
+ elif (v_min is None or isinstance(v_min, int)) and is_int_pair(v_max):
+ if True in [True if not is_int(v[i], v_min=v_min, v_max=v_max[i]) else False
+ for i in range(2)]:
+ return False
+ else:
+ logging.error(f'Illegal v_min or v_max input ({v_min} {type(v_min)} and '+
+ f'{v_max} {type(v_max)})')
+ return False
+ return True
+
+def is_int_series(l, v_min=None, v_max=None):
+ """Value is a tuple or list of integers, each in range v_min <= l[i] <= v_max.
+ """
+ if v_min is not None and not isinstance(v_min, int):
+ illegal_value(v_min, 'v_min', 'is_int_series')
+ return False
+ if v_max is not None and not isinstance(v_max, int):
+ illegal_value(v_max, 'v_max', 'is_int_series')
+ return False
+ if not isinstance(l, (tuple, list)):
+ return False
+ if True in [True if not is_int(v, v_min=v_min, v_max=v_max) else False for v in l]:
+ return False
+ return True
+
+def is_num(v, v_min=None, v_max=None):
+ """Value is a number in range v_min <= v <= v_max.
+ """
+ if not isinstance(v, (int, float)):
+ return False
+ if v_min is not None and not isinstance(v_min, (int, float)):
+ illegal_value(v_min, 'v_min', 'is_num')
+ return False
+ if v_max is not None and not isinstance(v_max, (int, float)):
+ illegal_value(v_max, 'v_max', 'is_num')
+ return False
+ if v_min is not None and v_max is not None and v_min > v_max:
+ logging.error(f'Illegal v_min, v_max combination ({v_min}, {v_max})')
+ return False
+ if (v_min is not None and v < v_min) or (v_max is not None and v > v_max):
+ return False
+ return True
+
+def is_num_pair(v, v_min=None, v_max=None):
+ """Value is a number pair, each in range v_min <= v[i] <= v_max or
+ v_min[i] <= v[i] <= v_max[i].
+ """
+ if not (isinstance(v, (tuple, list)) and len(v) == 2 and isinstance(v[0], (int, float)) and
+ isinstance(v[1], (int, float))):
+ return False
+ if v_min is not None or v_max is not None:
+ if ((v_min is None or isinstance(v_min, (int, float))) and
+ (v_max is None or isinstance(v_max, (int, float)))):
+ if True in [True if not is_num(vi, v_min=v_min, v_max=v_max) else False for vi in v]:
+ return False
+ elif is_num_pair(v_min) and is_num_pair(v_max):
+ if True in [True if v_min[i] > v_max[i] else False for i in range(2)]:
+ logging.error(f'Illegal v_min, v_max combination ({v_min}, {v_max})')
+ return False
+ if True in [True if not is_num(v[i], v_min[i], v_max[i]) else False for i in range(2)]:
+ return False
+ elif is_num_pair(v_min) and (v_max is None or isinstance(v_max, (int, float))):
+ if True in [True if not is_num(v[i], v_min=v_min[i], v_max=v_max) else False
+ for i in range(2)]:
+ return False
+ elif (v_min is None or isinstance(v_min, (int, float))) and is_num_pair(v_max):
+ if True in [True if not is_num(v[i], v_min=v_min, v_max=v_max[i]) else False
+ for i in range(2)]:
+ return False
+ else:
+ logging.error(f'Illegal v_min or v_max input ({v_min} {type(v_min)} and '+
+ f'{v_max} {type(v_max)})')
+ return False
+ return True
+
+def is_num_series(l, v_min=None, v_max=None):
+ """Value is a tuple or list of numbers, each in range v_min <= l[i] <= v_max.
+ """
+ if v_min is not None and not isinstance(v_min, (int, float)):
+ illegal_value(v_min, 'v_min', 'is_num_series')
+ return False
+ if v_max is not None and not isinstance(v_max, (int, float)):
+ illegal_value(v_max, 'v_max', 'is_num_series')
+ return False
+ if not isinstance(l, (tuple, list)):
+ return False
+ if True in [True if not is_num(v, v_min=v_min, v_max=v_max) else False for v in l]:
+ return False
+ return True
+
+def is_index(v, v_min=0, v_max=None):
+ """Value is an array index in range v_min <= v < v_max.
+ NOTE v_max IS NOT included!
+ """
+ if isinstance(v_max, int):
+ if v_max <= v_min:
+ logging.error(f'Illegal v_min, v_max combination ({v_min}, {v_max})')
+ return False
+ v_max -= 1
+ return is_int(v, v_min, v_max)
+
+def is_index_range(v, v_min=0, v_max=None):
+ """Value is an array index range in range v_min <= v[0] <= v[1] <= v_max.
+ NOTE v_max IS included!
+ """
+ if not is_int_pair(v):
+ return False
+ if not isinstance(v_min, int):
+ illegal_value(v_min, 'v_min', 'is_index_range')
+ return False
+ if v_max is not None:
+ if not isinstance(v_max, int):
+ illegal_value(v_max, 'v_max', 'is_index_range')
+ return False
+ if v_max < v_min:
+ logging.error(f'Illegal v_min, v_max combination ({v_min}, {v_max})')
+ return False
+ if not v_min <= v[0] <= v[1] or (v_max is not None and v[1] > v_max):
+ return False
+ return True
+
+def index_nearest(a, value):
+ a = np.asarray(a)
+ if a.ndim > 1:
+ logging.warning(f'Illegal input array ({a}, {type(a)})')
+ # Round up for .5
+ value *= 1.0+sys.float_info.epsilon
+ return (int)(np.argmin(np.abs(a-value)))
+
+def index_nearest_low(a, value):
+ a = np.asarray(a)
+ if a.ndim > 1:
+ logging.warning(f'Illegal input array ({a}, {type(a)})')
+ index = int(np.argmin(np.abs(a-value)))
+ if value < a[index] and index > 0:
+ index -= 1
+ return index
+
+def index_nearest_upp(a, value):
+ a = np.asarray(a)
+ if a.ndim > 1:
+ logging.warning(f'Illegal input array ({a}, {type(a)})')
+ index = int(np.argmin(np.abs(a-value)))
+ if value > a[index] and index < a.size-1:
+ index += 1
+ return index
+
+def round_to_n(x, n=1):
+ if x == 0.0:
+ return 0
+ else:
+ return round(x, n-1-int(np.floor(np.log10(abs(x)))))
+
+def round_up_to_n(x, n=1):
+ xr = round_to_n(x, n)
+ if abs(x/xr) > 1.0:
+ xr += np.sign(x)*10**(np.floor(np.log10(abs(x)))+1-n)
+ return xr
+
+def trunc_to_n(x, n=1):
+ xr = round_to_n(x, n)
+ if abs(xr/x) > 1.0:
+ xr -= np.sign(x)*10**(np.floor(np.log10(abs(x)))+1-n)
+ return xr
+
+def string_to_list(s):
+ """Return a list of numbers by splitting/expanding a string on any combination of
+ dashes, commas, and/or whitespaces
+ e.g: '1, 3, 5-8,12 ' -> [1, 3, 5, 6, 7, 8, 12]
+ """
+ if not isinstance(s, str):
+ illegal_value(s, location='string_to_list')
+ return None
+ if not len(s):
+ return []
+ try:
+ list1 = [x for x in re.split('\s+,\s+|\s+,|,\s+|\s+|,', s.strip())]
+ except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
+ return None
+ try:
+ l = []
+ for l1 in list1:
+ l2 = [literal_eval(x) for x in re.split('\s+-\s+|\s+-|-\s+|\s+|-', l1)]
+ if len(l2) == 1:
+ l += l2
+ elif len(l2) == 2 and l2[1] > l2[0]:
+ l += [i for i in range(l2[0], l2[1]+1)]
+ else:
+ raise ValueError
+ except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
+ return None
+ return sorted(set(l))
+
+def get_trailing_int(string):
+ indexRegex = re.compile(r'\d+$')
+ mo = indexRegex.search(string)
+ if mo is None:
+ return None
+ else:
+ return int(mo.group())
+
+def input_int(s=None, v_min=None, v_max=None, default=None, inset=None):
+ if default is not None:
+ if not isinstance(default, int):
+ illegal_value(default, 'default', 'input_int')
+ return None
+ default_string = f' [{default}]'
+ else:
+ default_string = ''
+ if v_min is not None:
+ if not isinstance(v_min, int):
+ illegal_value(v_min, 'v_min', 'input_int')
+ return None
+ if default is not None and default < v_min:
+ logging.error('Illegal v_min, default combination ({v_min}, {default})')
+ return None
+ if v_max is not None:
+ if not isinstance(v_max, int):
+ illegal_value(v_max, 'v_max', 'input_int')
+ return None
+ if v_min is not None and v_min > v_max:
+ logging.error(f'Illegal v_min, v_max combination ({v_min}, {v_max})')
+ return None
+ if default is not None and default > v_max:
+ logging.error('Illegal default, v_max combination ({default}, {v_max})')
+ return None
+ if inset is not None:
+ if (not isinstance(inset, (tuple, list)) or False in [True if isinstance(i, int) else
+ False for i in inset]):
+ illegal_value(inset, 'inset', 'input_int')
+ return None
+ if v_min is not None and v_max is not None:
+ v_range = f' ({v_min}, {v_max})'
+ elif v_min is not None:
+ v_range = f' (>= {v_min})'
+ elif v_max is not None:
+ v_range = f' (<= {v_max})'
+ else:
+ v_range = ''
+ if s is None:
+ print(f'Enter an integer{v_range}{default_string}: ')
+ else:
+ print(f'{s}{v_range}{default_string}: ')
+ try:
+ i = input()
+ if isinstance(i, str) and not len(i):
+ v = default
+ else:
+ v = literal_eval(i)
+ if inset and v not in inset:
+ raise ValueError(f'{v} not part of the set {inset}')
+ except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
+ v = None
+ except:
+ print('Unexpected error')
+ raise
+ if not is_int(v, v_min, v_max):
+ print('Illegal input, enter a valid integer')
+ v = input_int(s, v_min, v_max, default)
+ return v
+
+def input_num(s=None, v_min=None, v_max=None, default=None):
+ if default is not None:
+ if not isinstance(default, (int, float)):
+ illegal_value(default, 'default', 'input_num')
+ return None
+ default_string = f' [{default}]'
+ else:
+ default_string = ''
+ if v_min is not None:
+ if not isinstance(v_min, (int, float)):
+ illegal_value(vmin, 'vmin', 'input_num')
+ return None
+ if default is not None and default < v_min:
+ logging.error('Illegal v_min, default combination ({v_min}, {default})')
+ return None
+ if v_max is not None:
+ if not isinstance(v_max, (int, float)):
+ illegal_value(vmax, 'vmax', 'input_num')
+ return None
+ if v_min is not None and v_max < v_min:
+ logging.error(f'Illegal v_min, v_max combination ({v_min}, {v_max})')
+ return None
+ if default is not None and default > v_max:
+ logging.error('Illegal default, v_max combination ({default}, {v_max})')
+ return None
+ if v_min is not None and v_max is not None:
+ v_range = f' ({v_min}, {v_max})'
+ elif v_min is not None:
+ v_range = f' (>= {v_min})'
+ elif v_max is not None:
+ v_range = f' (<= {v_max})'
+ else:
+ v_range = ''
+ if s is None:
+ print(f'Enter a number{v_range}{default_string}: ')
+ else:
+ print(f'{s}{v_range}{default_string}: ')
+ try:
+ i = input()
+ if isinstance(i, str) and not len(i):
+ v = default
+ else:
+ v = literal_eval(i)
+ except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
+ v = None
+ except:
+ print('Unexpected error')
+ raise
+ if not is_num(v, v_min, v_max):
+ print('Illegal input, enter a valid number')
+ v = input_num(s, v_min, v_max, default)
+ return v
+
+def input_int_list(s=None, v_min=None, v_max=None):
+ if v_min is not None and not isinstance(v_min, int):
+ illegal_value(vmin, 'vmin', 'input_int_list')
+ return None
+ if v_max is not None:
+ if not isinstance(v_max, int):
+ illegal_value(vmax, 'vmax', 'input_int_list')
+ return None
+ if v_max < v_min:
+ logging.error(f'Illegal v_min, v_max combination ({v_min}, {v_max})')
+ return None
+ if v_min is not None and v_max is not None:
+ v_range = f' (each value in ({v_min}, {v_max}))'
+ elif v_min is not None:
+ v_range = f' (each value >= {v_min})'
+ elif v_max is not None:
+ v_range = f' (each value <= {v_max})'
+ else:
+ v_range = ''
+ if s is None:
+ print(f'Enter a series of integers{v_range}: ')
+ else:
+ print(f'{s}{v_range}: ')
+ try:
+ l = string_to_list(input())
+ except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
+ l = None
+ except:
+ print('Unexpected error')
+ raise
+ if (not isinstance(l, list) or
+ True in [True if not is_int(v, v_min, v_max) else False for v in l]):
+ print('Illegal input: enter a valid set of dash/comma/whitespace separated integers '+
+ 'e.g. 2,3,5-8,10')
+ l = input_int_list(s, v_min, v_max)
+ return l
+
+def input_yesno(s=None, default=None):
+ if default is not None:
+ if not isinstance(default, str):
+ illegal_value(default, 'default', 'input_yesno')
+ return None
+ if default.lower() in 'yes':
+ default = 'y'
+ elif default.lower() in 'no':
+ default = 'n'
+ else:
+ illegal_value(default, 'default', 'input_yesno')
+ return None
+ default_string = f' [{default}]'
+ else:
+ default_string = ''
+ if s is None:
+ print(f'Enter yes or no{default_string}: ')
+ else:
+ print(f'{s}{default_string}: ')
+ i = input()
+ if isinstance(i, str) and not len(i):
+ i = default
+ if i is not None and i.lower() in 'yes':
+ v = True
+ elif i is not None and i.lower() in 'no':
+ v = False
+ else:
+ print('Illegal input, enter yes or no')
+ v = input_yesno(s, default)
+ return v
+
+def input_menu(items, default=None, header=None):
+ if not isinstance(items, (tuple, list)) or False in [True if isinstance(i, str) else False
+ for i in items]:
+ illegal_value(items, 'items', 'input_menu')
+ return None
+ if default is not None:
+ if not (isinstance(default, str) and default in items):
+ logging.error(f'Illegal value for default ({default}), must be in {items}')
+ return None
+ default_string = f' [{items.index(default)+1}]'
+ else:
+ default_string = ''
+ if header is None:
+ print(f'Choose one of the following items (1, {len(items)}){default_string}:')
+ else:
+ print(f'{header} (1, {len(items)}){default_string}:')
+ for i, choice in enumerate(items):
+ print(f' {i+1}: {choice}')
+ choice = input()
+ if isinstance(choice, str) and not len(choice):
+ choice = items.index(default)
+ else:
+ choice = literal_eval(choice)
+ if isinstance(choice, int) and 1 <= choice <= len(items):
+ choice -= 1
+ else:
+ print(f'Illegal choice, enter a number between 1 and {len(items)}')
+ choice = input_menu(items, default)
+ return choice
+
+def create_mask(x, bounds=None, reverse_mask=False, current_mask=None):
+ # bounds is a pair of number in the same units a x
+ if not isinstance(x, (tuple, list, np.ndarray)) or not len(x):
+ logging.warning(f'Illegal input array ({x}, {type(x)})')
+ return None
+ if bounds is not None and not is_num_pair(bounds):
+ logging.warning(f'Illegal bounds parameter ({bounds} {type(bounds)}, input ignored')
+ bounds = None
+ if bounds is not None:
+ if not reverse_mask:
+ mask = np.logical_and(x > min(bounds), x < max(bounds))
+ else:
+ mask = np.logical_or(x < min(bounds), x > max(bounds))
+ else:
+ mask = np.ones(len(x), dtype=bool)
+ if current_mask is not None:
+ if not isinstance(current_mask, (tuple, list, np.ndarray)) or len(current_mask) != len(x):
+ logging.warning(f'Illegal current_mask ({current_mask}, {type(current_mask)}), '+
+ 'input ignored')
+ else:
+ mask = np.logical_and(mask, current_mask)
+ if not True in mask:
+ logging.warning('Entire data array is masked')
+ return mask
+
+def draw_mask_1d(ydata, xdata=None, current_index_ranges=None, current_mask=None,
+ select_mask=True, num_index_ranges_max=None, title=None, legend=None):
+ def draw_selections(ax):
+ ax.clear()
+ ax.set_title(title)
+ ax.legend([legend])
+ ax.plot(xdata, ydata, 'k')
+ for (low, upp) in current_include:
+ xlow = 0.5*(xdata[max(0, low-1)]+xdata[low])
+ xupp = 0.5*(xdata[upp]+xdata[min(num_data-1, upp+1)])
+ ax.axvspan(xlow, xupp, facecolor='green', alpha=0.5)
+ for (low, upp) in current_exclude:
+ xlow = 0.5*(xdata[max(0, low-1)]+xdata[low])
+ xupp = 0.5*(xdata[upp]+xdata[min(num_data-1, upp+1)])
+ ax.axvspan(xlow, xupp, facecolor='red', alpha=0.5)
+ for (low, upp) in selected_index_ranges:
+ xlow = 0.5*(xdata[max(0, low-1)]+xdata[low])
+ xupp = 0.5*(xdata[upp]+xdata[min(num_data-1, upp+1)])
+ ax.axvspan(xlow, xupp, facecolor=selection_color, alpha=0.5)
+ ax.get_figure().canvas.draw()
+
+ def onclick(event):
+ if event.inaxes in [fig.axes[0]]:
+ selected_index_ranges.append(index_nearest_upp(xdata, event.xdata))
+
+ def onrelease(event):
+ if len(selected_index_ranges) > 0:
+ if isinstance(selected_index_ranges[-1], int):
+ if event.inaxes in [fig.axes[0]]:
+ event.xdata = index_nearest_low(xdata, event.xdata)
+ if selected_index_ranges[-1] <= event.xdata:
+ selected_index_ranges[-1] = (selected_index_ranges[-1], event.xdata)
+ else:
+ selected_index_ranges[-1] = (event.xdata, selected_index_ranges[-1])
+ draw_selections(event.inaxes)
+ else:
+ selected_index_ranges.pop(-1)
+
+ def confirm_selection(event):
+ plt.close()
+
+ def clear_last_selection(event):
+ if len(selected_index_ranges):
+ selected_index_ranges.pop(-1)
+ draw_selections(ax)
+
+ def update_mask(mask):
+ for (low, upp) in selected_index_ranges:
+ selected_mask = np.logical_and(xdata >= xdata[low], xdata <= xdata[upp])
+ mask = np.logical_or(mask, selected_mask)
+ for (low, upp) in unselected_index_ranges:
+ unselected_mask = np.logical_and(xdata >= xdata[low], xdata <= xdata[upp])
+ mask[unselected_mask] = False
+ return mask
+
+ def update_index_ranges(mask):
+ # Update the currently included index ranges (where mask is True)
+ current_include = []
+ for i, m in enumerate(mask):
+ if m == True:
+ if len(current_include) == 0 or type(current_include[-1]) == tuple:
+ current_include.append(i)
+ else:
+ if len(current_include) > 0 and isinstance(current_include[-1], int):
+ current_include[-1] = (current_include[-1], i-1)
+ if len(current_include) > 0 and isinstance(current_include[-1], int):
+ current_include[-1] = (current_include[-1], num_data-1)
+ return current_include
+
+ # Check for valid inputs
+ ydata = np.asarray(ydata)
+ if ydata.ndim > 1:
+ logging.warning(f'Illegal ydata dimension ({ydata.ndim})')
+ return None, None
+ num_data = ydata.size
+ if xdata is None:
+ xdata = np.arange(num_data)
+ else:
+ xdata = np.asarray(xdata, dtype=np.float64)
+ if xdata.ndim > 1 or xdata.size != num_data:
+ logging.warning(f'Illegal xdata shape ({xdata.shape})')
+ return None, None
+ if not np.all(xdata[:-1] < xdata[1:]):
+ logging.warning('Illegal xdata: must be monotonically increasing')
+ return None, None
+ if current_index_ranges is not None:
+ if not isinstance(current_index_ranges, (tuple, list)):
+ logging.warning('Illegal current_index_ranges parameter ({current_index_ranges}, '+
+ f'{type(current_index_ranges)})')
+ return None, None
+ if not isinstance(select_mask, bool):
+ logging.warning('Illegal select_mask parameter ({select_mask}, {type(select_mask)})')
+ return None, None
+ if num_index_ranges_max is not None:
+ logging.warning('num_index_ranges_max input not yet implemented in draw_mask_1d')
+ if title is None:
+ title = 'select ranges of data'
+ elif not isinstance(title, str):
+ illegal(title, 'title')
+ title = ''
+ if legend is None and not isinstance(title, str):
+ illegal(legend, 'legend')
+ legend = None
+
+ if select_mask:
+ title = f'Click and drag to {title} you wish to include'
+ selection_color = 'green'
+ else:
+ title = f'Click and drag to {title} you wish to exclude'
+ selection_color = 'red'
+
+ # Set initial selected mask and the selected/unselected index ranges as needed
+ selected_index_ranges = []
+ unselected_index_ranges = []
+ selected_mask = np.full(xdata.shape, False, dtype=bool)
+ if current_index_ranges is None:
+ if current_mask is None:
+ if not select_mask:
+ selected_index_ranges = [(0, num_data-1)]
+ selected_mask = np.full(xdata.shape, True, dtype=bool)
+ else:
+ selected_mask = np.copy(np.asarray(current_mask, dtype=bool))
+ if current_index_ranges is not None and len(current_index_ranges):
+ current_index_ranges = sorted([(low, upp) for (low, upp) in current_index_ranges])
+ for (low, upp) in current_index_ranges:
+ if low > upp or low >= num_data or upp < 0:
+ continue
+ if low < 0:
+ low = 0
+ if upp >= num_data:
+ upp = num_data-1
+ selected_index_ranges.append((low, upp))
+ selected_mask = update_mask(selected_mask)
+ if current_index_ranges is not None and current_mask is not None:
+ selected_mask = np.logical_and(current_mask, selected_mask)
+ if current_mask is not None:
+ selected_index_ranges = update_index_ranges(selected_mask)
+
+ # Set up range selections for display
+ current_include = selected_index_ranges
+ current_exclude = []
+ selected_index_ranges = []
+ if not len(current_include):
+ if select_mask:
+ current_exclude = [(0, num_data-1)]
+ else:
+ current_include = [(0, num_data-1)]
+ else:
+ if current_include[0][0] > 0:
+ current_exclude.append((0, current_include[0][0]-1))
+ for i in range(1, len(current_include)):
+ current_exclude.append((current_include[i-1][1]+1, current_include[i][0]-1))
+ if current_include[-1][1] < num_data-1:
+ current_exclude.append((current_include[-1][1]+1, num_data-1))
+
+ # Set up matplotlib figure
+ plt.close('all')
+ fig, ax = plt.subplots()
+ plt.subplots_adjust(bottom=0.2)
+ draw_selections(ax)
+
+ # Set up event handling for click-and-drag range selection
+ cid_click = fig.canvas.mpl_connect('button_press_event', onclick)
+ cid_release = fig.canvas.mpl_connect('button_release_event', onrelease)
+
+ # Set up confirm / clear range selection buttons
+ confirm_b = Button(plt.axes([0.75, 0.05, 0.15, 0.075]), 'Confirm')
+ clear_b = Button(plt.axes([0.59, 0.05, 0.15, 0.075]), 'Clear')
+ cid_confirm = confirm_b.on_clicked(confirm_selection)
+ cid_clear = clear_b.on_clicked(clear_last_selection)
+
+ # Show figure
+ plt.show(block=True)
+
+ # Disconnect callbacks when figure is closed
+ fig.canvas.mpl_disconnect(cid_click)
+ fig.canvas.mpl_disconnect(cid_release)
+ confirm_b.disconnect(cid_confirm)
+ clear_b.disconnect(cid_clear)
+
+ # Swap selection depending on select_mask
+ if not select_mask:
+ selected_index_ranges, unselected_index_ranges = unselected_index_ranges, \
+ selected_index_ranges
+
+ # Update the mask with the currently selected/unselected x-ranges
+ selected_mask = update_mask(selected_mask)
+
+ # Update the currently included index ranges (where mask is True)
+ current_include = update_index_ranges(selected_mask)
+
+ return selected_mask, current_include
+
+def findImageFiles(path, filetype, name=None):
+ if isinstance(name, str):
+ name = f' {name} '
+ else:
+ name = ' '
+ # Find available index range
+ if filetype == 'tif':
+ if not isinstance(path, str) or not os.path.isdir(path):
+ illegal_value(path, 'path', 'findImageRange')
+ return -1, 0, []
+ indexRegex = re.compile(r'\d+')
+ # At this point only tiffs
+ files = sorted([f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f)) and
+ f.endswith('.tif') and indexRegex.search(f)])
+ num_imgs = len(files)
+ if num_imgs < 1:
+ logging.warning('No available'+name+'files')
+ return -1, 0, []
+ first_index = indexRegex.search(files[0]).group()
+ last_index = indexRegex.search(files[-1]).group()
+ if first_index is None or last_index is None:
+ logging.error('Unable to find correctly indexed'+name+'images')
+ return -1, 0, []
+ first_index = int(first_index)
+ last_index = int(last_index)
+ if num_imgs != last_index-first_index+1:
+ logging.error('Non-consecutive set of indices for'+name+'images')
+ return -1, 0, []
+ paths = [os.path.join(path, f) for f in files]
+ elif filetype == 'h5':
+ if not isinstance(path, str) or not os.path.isfile(path):
+ illegal_value(path, 'path', 'findImageRange')
+ return -1, 0, []
+ # At this point only h5 in alamo2 detector style
+ first_index = 0
+ with h5py.File(path, 'r') as f:
+ num_imgs = f['entry/instrument/detector/data'].shape[0]
+ last_index = num_imgs-1
+ paths = [path]
+ else:
+ illegal_value(filetype, 'filetype', 'findImageRange')
+ return -1, 0, []
+ logging.debug('\nNumber of available'+name+f'images: {num_imgs}')
+ logging.debug('Index range of available'+name+f'images: [{first_index}, '+
+ f'{last_index}]')
+
+ return first_index, num_imgs, paths
+
+def selectImageRange(first_index, offset, num_imgs, name=None, num_required=None):
+ if isinstance(name, str):
+ name = f' {name} '
+ else:
+ name = ' '
+ # Check existing values
+ use_input = False
+ if (is_int(first_index, 0) and is_int(offset, 0) and is_int(num_imgs, 1)):
+ if offset < 0:
+ use_input = input_yesno(f'\nCurrent{name}first index = {first_index}, '+
+ 'use this value (y/n)?', 'y')
+ else:
+ use_input = input_yesno(f'\nCurrent{name}first index/offset = '+
+ f'{first_index}/{offset}, use these values (y/n)?', 'y')
+ if num_required is None:
+ if use_input:
+ use_input = input_yesno(f'Current number of{name}images = '+
+ f'{num_imgs}, use this value (y/n)? ', 'y')
+ if use_input:
+ return first_index, offset, num_imgs
+
+ # Check range against requirements
+ if num_imgs < 1:
+ logging.warning('No available'+name+'images')
+ return -1, -1, 0
+ if num_required is None:
+ if num_imgs == 1:
+ return first_index, 0, 1
+ else:
+ if not is_int(num_required, 1):
+ illegal_value(num_required, 'num_required', 'selectImageRange')
+ return -1, -1, 0
+ if num_imgs < num_required:
+ logging.error('Unable to find the required'+name+
+ f'images ({num_imgs} out of {num_required})')
+ return -1, -1, 0
+
+ # Select index range
+ print('\nThe number of available'+name+f'images is {num_imgs}')
+ if num_required is None:
+ last_index = first_index+num_imgs
+ use_all = f'Use all ([{first_index}, {last_index}])'
+ pick_offset = 'Pick a first index offset and a number of images'
+ pick_bounds = 'Pick the first and last index'
+ choice = input_menu([use_all, pick_offset, pick_bounds], default=pick_offset)
+ if not choice:
+ offset = 0
+ elif choice == 1:
+ offset = input_int('Enter the first index offset', 0, last_index-first_index)
+ first_index += offset
+ if first_index == last_index:
+ num_imgs = 1
+ else:
+ num_imgs = input_int('Enter the number of images', 1, num_imgs-offset)
+ else:
+ offset = input_int('Enter the first index', first_index, last_index)
+ first_index += offset
+ num_imgs = input_int('Enter the last index', first_index, last_index)-first_index+1
+ else:
+ use_all = f'Use ([{first_index}, {first_index+num_required-1}])'
+ pick_offset = 'Pick the first index offset'
+ choice = input_menu([use_all, pick_offset], pick_offset)
+ offset = 0
+ if choice == 1:
+ offset = input_int('Enter the first index offset', 0, num_imgs-num_required)
+ first_index += offset
+ num_imgs = num_required
+
+ return first_index, offset, num_imgs
+
+def loadImage(f, img_x_bounds=None, img_y_bounds=None):
+ """Load a single image from file.
+ """
+ if not os.path.isfile(f):
+ logging.error(f'Unable to load {f}')
+ return None
+ img_read = plt.imread(f)
+ if not img_x_bounds:
+ img_x_bounds = (0, img_read.shape[0])
+ else:
+ if (not isinstance(img_x_bounds, (tuple, list)) or len(img_x_bounds) != 2 or
+ not (0 <= img_x_bounds[0] < img_x_bounds[1] <= img_read.shape[0])):
+ logging.error(f'inconsistent row dimension in {f}')
+ return None
+ if not img_y_bounds:
+ img_y_bounds = (0, img_read.shape[1])
+ else:
+ if (not isinstance(img_y_bounds, list) or len(img_y_bounds) != 2 or
+ not (0 <= img_y_bounds[0] < img_y_bounds[1] <= img_read.shape[1])):
+ logging.error(f'inconsistent column dimension in {f}')
+ return None
+ return img_read[img_x_bounds[0]:img_x_bounds[1],img_y_bounds[0]:img_y_bounds[1]]
+
+def loadImageStack(files, filetype, img_offset, num_imgs, num_img_skip=0,
+ img_x_bounds=None, img_y_bounds=None):
+ """Load a set of images and return them as a stack.
+ """
+ logging.debug(f'img_offset = {img_offset}')
+ logging.debug(f'num_imgs = {num_imgs}')
+ logging.debug(f'num_img_skip = {num_img_skip}')
+ logging.debug(f'\nfiles:\n{files}\n')
+ img_stack = np.array([])
+ if filetype == 'tif':
+ img_read_stack = []
+ i = 1
+ t0 = time()
+ for f in files[img_offset:img_offset+num_imgs:num_img_skip+1]:
+ if not i%20:
+ logging.info(f' loading {i}/{num_imgs}: {f}')
+ else:
+ logging.debug(f' loading {i}/{num_imgs}: {f}')
+ img_read = loadImage(f, img_x_bounds, img_y_bounds)
+ img_read_stack.append(img_read)
+ i += num_img_skip+1
+ img_stack = np.stack([img_read for img_read in img_read_stack])
+ logging.info(f'... done in {time()-t0:.2f} seconds!')
+ logging.debug(f'img_stack shape = {np.shape(img_stack)}')
+ del img_read_stack, img_read
+ elif filetype == 'h5':
+ if not isinstance(files[0], str) and not os.path.isfile(files[0]):
+ illegal_value(files[0], 'files[0]', 'loadImageStack')
+ return img_stack
+ t0 = time()
+ logging.info(f'Loading {files[0]}')
+ with h5py.File(files[0], 'r') as f:
+ shape = f['entry/instrument/detector/data'].shape
+ if len(shape) != 3:
+ logging.error(f'inconsistent dimensions in {files[0]}')
+ if not img_x_bounds:
+ img_x_bounds = (0, shape[1])
+ else:
+ if (not isinstance(img_x_bounds, (tuple, list)) or len(img_x_bounds) != 2 or
+ not (0 <= img_x_bounds[0] < img_x_bounds[1] <= shape[1])):
+ logging.error(f'inconsistent row dimension in {files[0]} {img_x_bounds} '+
+ f'{shape[1]}')
+ if not img_y_bounds:
+ img_y_bounds = (0, shape[2])
+ else:
+ if (not isinstance(img_y_bounds, list) or len(img_y_bounds) != 2 or
+ not (0 <= img_y_bounds[0] < img_y_bounds[1] <= shape[2])):
+ logging.error(f'inconsistent column dimension in {files[0]}')
+ img_stack = f.get('entry/instrument/detector/data')[
+ img_offset:img_offset+num_imgs:num_img_skip+1,
+ img_x_bounds[0]:img_x_bounds[1],img_y_bounds[0]:img_y_bounds[1]]
+ logging.info(f'... done in {time()-t0:.2f} seconds!')
+ else:
+ illegal_value(filetype, 'filetype', 'loadImageStack')
+ return img_stack
+
+def combine_tiffs_in_h5(files, num_imgs, h5_filename):
+ img_stack = loadImageStack(files, 'tif', 0, num_imgs)
+ with h5py.File(h5_filename, 'w') as f:
+ f.create_dataset('entry/instrument/detector/data', data=img_stack)
+ del img_stack
+ return [h5_filename]
+
+def clearImshow(title=None):
+ plt.ioff()
+ if title is None:
+ title = 'quick imshow'
+ elif not isinstance(title, str):
+ illegal_value(title, 'title', 'clearImshow')
+ return
+ plt.close(fig=title)
+
+def clearPlot(title=None):
+ plt.ioff()
+ if title is None:
+ title = 'quick plot'
+ elif not isinstance(title, str):
+ illegal_value(title, 'title', 'clearPlot')
+ return
+ plt.close(fig=title)
+
+def quickImshow(a, title=None, path=None, name=None, save_fig=False, save_only=False,
+ clear=True, extent=None, show_grid=False, grid_color='w', grid_linewidth=1, **kwargs):
+ if title is not None and not isinstance(title, str):
+ illegal_value(title, 'title', 'quickImshow')
+ return
+ if path is not None and not isinstance(path, str):
+ illegal_value(path, 'path', 'quickImshow')
+ return
+ if not isinstance(save_fig, bool):
+ illegal_value(save_fig, 'save_fig', 'quickImshow')
+ return
+ if not isinstance(save_only, bool):
+ illegal_value(save_only, 'save_only', 'quickImshow')
+ return
+ if not isinstance(clear, bool):
+ illegal_value(clear, 'clear', 'quickImshow')
+ return
+ if not title:
+ title='quick imshow'
+# else:
+# title = re.sub(r"\s+", '_', title)
+ if name is None:
+ ttitle = re.sub(r"\s+", '_', title)
+ if path is None:
+ path = f'{ttitle}.png'
+ else:
+ path = f'{path}/{ttitle}.png'
+ else:
+ if path is None:
+ path = name
+ else:
+ path = f'{path}/{name}'
+ if extent is None:
+ extent = (0, a.shape[1], a.shape[0], 0)
+ if clear:
+ plt.close(fig=title)
+ if not save_only:
+ plt.ion()
+ plt.figure(title)
+ plt.imshow(a, extent=extent, **kwargs)
+ if show_grid:
+ ax = plt.gca()
+ ax.grid(color=grid_color, linewidth=grid_linewidth)
+# if title != 'quick imshow':
+# plt.title = title
+ if save_only:
+ plt.savefig(path)
+ plt.close(fig=title)
+ else:
+ if save_fig:
+ plt.savefig(path)
+
+def quickPlot(*args, xerr=None, yerr=None, vlines=None, title=None, xlim=None, ylim=None,
+ xlabel=None, ylabel=None, legend=None, path=None, name=None, show_grid=False,
+ save_fig=False, save_only=False, clear=True, block=False, **kwargs):
+ if title is not None and not isinstance(title, str):
+ illegal_value(title, 'title', 'quickPlot')
+ title = None
+ if xlim is not None and not isinstance(xlim, (tuple, list)) and len(xlim) != 2:
+ illegal_value(xlim, 'xlim', 'quickPlot')
+ xlim = None
+ if ylim is not None and not isinstance(ylim, (tuple, list)) and len(ylim) != 2:
+ illegal_value(ylim, 'ylim', 'quickPlot')
+ ylim = None
+ if xlabel is not None and not isinstance(xlabel, str):
+ illegal_value(xlabel, 'xlabel', 'quickPlot')
+ xlabel = None
+ if ylabel is not None and not isinstance(ylabel, str):
+ illegal_value(ylabel, 'ylabel', 'quickPlot')
+ ylabel = None
+ if legend is not None and not isinstance(legend, (tuple, list)):
+ illegal_value(legend, 'legend', 'quickPlot')
+ legend = None
+ if path is not None and not isinstance(path, str):
+ illegal_value(path, 'path', 'quickPlot')
+ return
+ if not isinstance(show_grid, bool):
+ illegal_value(show_grid, 'show_grid', 'quickPlot')
+ return
+ if not isinstance(save_fig, bool):
+ illegal_value(save_fig, 'save_fig', 'quickPlot')
+ return
+ if not isinstance(save_only, bool):
+ illegal_value(save_only, 'save_only', 'quickPlot')
+ return
+ if not isinstance(clear, bool):
+ illegal_value(clear, 'clear', 'quickPlot')
+ return
+ if not isinstance(block, bool):
+ illegal_value(block, 'block', 'quickPlot')
+ return
+ if title is None:
+ title = 'quick plot'
+# else:
+# title = re.sub(r"\s+", '_', title)
+ if name is None:
+ ttitle = re.sub(r"\s+", '_', title)
+ if path is None:
+ path = f'{ttitle}.png'
+ else:
+ path = f'{path}/{ttitle}.png'
+ else:
+ if path is None:
+ path = name
+ else:
+ path = f'{path}/{name}'
+ if clear:
+ plt.close(fig=title)
+ args = unwrap_tuple(args)
+ if depth_tuple(args) > 1 and (xerr is not None or yerr is not None):
+ logging.warning('Error bars ignored form multiple curves')
+ if not save_only:
+ if block:
+ plt.ioff()
+ else:
+ plt.ion()
+ plt.figure(title)
+ if depth_tuple(args) > 1:
+ for y in args:
+ plt.plot(*y, **kwargs)
+ else:
+ if xerr is None and yerr is None:
+ plt.plot(*args, **kwargs)
+ else:
+ plt.errorbar(*args, xerr=xerr, yerr=yerr, **kwargs)
+ if vlines is not None:
+ for v in vlines:
+ plt.axvline(v, color='r', linestyle='--', **kwargs)
+# if vlines is not None:
+# for s in tuple(([x, x], list(plt.gca().get_ylim())) for x in vlines):
+# plt.plot(*s, color='red', **kwargs)
+ if xlim is not None:
+ plt.xlim(xlim)
+ if ylim is not None:
+ plt.ylim(ylim)
+ if xlabel is not None:
+ plt.xlabel(xlabel)
+ if ylabel is not None:
+ plt.ylabel(ylabel)
+ if show_grid:
+ ax = plt.gca()
+ ax.grid(color='k')#, linewidth=1)
+ if legend is not None:
+ plt.legend(legend)
+ if save_only:
+ plt.savefig(path)
+ plt.close(fig=title)
+ else:
+ if save_fig:
+ plt.savefig(path)
+ if block:
+ plt.show(block=block)
+
+def selectArrayBounds(a, x_low=None, x_upp=None, num_x_min=None, ask_bounds=False,
+ title='select array bounds'):
+ """Interactively select the lower and upper data bounds for a numpy array.
+ """
+ if isinstance(a, (tuple, list)):
+ a = np.array(a)
+ if not isinstance(a, np.ndarray) or a.ndim != 1:
+ illegal_value(a.ndim, 'array type or dimension', 'selectArrayBounds')
+ return None
+ len_a = len(a)
+ if num_x_min is None:
+ num_x_min = 1
+ else:
+ if num_x_min < 2 or num_x_min > len_a:
+ logging.warning('Illegal value for num_x_min in selectArrayBounds, input ignored')
+ num_x_min = 1
+
+ # Ask to use current bounds
+ if ask_bounds and (x_low is not None or x_upp is not None):
+ if x_low is None:
+ x_low = 0
+ if not is_int(x_low, 0, len_a-num_x_min):
+ illegal_value(x_low, 'x_low', 'selectArrayBounds')
+ return None
+ if x_upp is None:
+ x_upp = len_a
+ if not is_int(x_upp, x_low+num_x_min, len_a):
+ illegal_value(x_upp, 'x_upp', 'selectArrayBounds')
+ return None
+ quickPlot((range(len_a), a), vlines=(x_low,x_upp), title=title)
+ if not input_yesno(f'\nCurrent array bounds: [{x_low}, {x_upp}] '+
+ 'use these values (y/n)?', 'y'):
+ x_low = None
+ x_upp = None
+ else:
+ clearPlot(title)
+ return x_low, x_upp
+
+ if x_low is None:
+ x_min = 0
+ x_max = len_a
+ x_low_max = len_a-num_x_min
+ while True:
+ quickPlot(range(x_min, x_max), a[x_min:x_max], title=title)
+ zoom_flag = input_yesno('Set lower data bound (y) or zoom in (n)?', 'y')
+ if zoom_flag:
+ x_low = input_int(' Set lower data bound', 0, x_low_max)
+ break
+ else:
+ x_min = input_int(' Set lower zoom index', 0, x_low_max)
+ x_max = input_int(' Set upper zoom index', x_min+1, x_low_max+1)
+ else:
+ if not is_int(x_low, 0, len_a-num_x_min):
+ illegal_value(x_low, 'x_low', 'selectArrayBounds')
+ return None
+ if x_upp is None:
+ x_min = x_low+num_x_min
+ x_max = len_a
+ x_upp_min = x_min
+ while True:
+ quickPlot(range(x_min, x_max), a[x_min:x_max], title=title)
+ zoom_flag = input_yesno('Set upper data bound (y) or zoom in (n)?', 'y')
+ if zoom_flag:
+ x_upp = input_int(' Set upper data bound', x_upp_min, len_a)
+ break
+ else:
+ x_min = input_int(' Set upper zoom index', x_upp_min, len_a-1)
+ x_max = input_int(' Set upper zoom index', x_min+1, len_a)
+ else:
+ if not is_int(x_upp, x_low+num_x_min, len_a):
+ illegal_value(x_upp, 'x_upp', 'selectArrayBounds')
+ return None
+ print(f'lower bound = {x_low} (inclusive)\nupper bound = {x_upp} (exclusive)]')
+ quickPlot((range(len_a), a), vlines=(x_low,x_upp), title=title)
+ if not input_yesno('Accept these bounds (y/n)?', 'y'):
+ x_low, x_upp = selectArrayBounds(a, None, None, num_x_min, title=title)
+ clearPlot(title)
+ return x_low, x_upp
+
+def selectImageBounds(a, axis, low=None, upp=None, num_min=None,
+ title='select array bounds'):
+ """Interactively select the lower and upper data bounds for a 2D numpy array.
+ """
+ if isinstance(a, np.ndarray):
+ if a.ndim != 2:
+ illegal_value(a.ndim, 'array dimension', 'selectImageBounds')
+ return None
+ elif isinstance(a, (tuple, list)):
+ if len(a) != 2:
+ illegal_value(len(a), 'array dimension', 'selectImageBounds')
+ return None
+ if len(a[0]) != len(a[1]) or not (isinstance(a[0], (tuple, list, np.ndarray)) and
+ isinstance(a[1], (tuple, list, np.ndarray))):
+ logging.error(f'Illegal array type in selectImageBounds ({type(a[0])} {type(a[1])})')
+ return None
+ a = np.array(a)
+ else:
+ illegal_value(a, 'array type', 'selectImageBounds')
+ return None
+ if axis < 0 or axis >= a.ndim:
+ illegal_value(axis, 'axis', 'selectImageBounds')
+ return None
+ low_save = low
+ upp_save = upp
+ num_min_save = num_min
+ if num_min is None:
+ num_min = 1
+ else:
+ if num_min < 2 or num_min > a.shape[axis]:
+ logging.warning('Illegal input for num_min in selectImageBounds, input ignored')
+ num_min = 1
+ if low is None:
+ min_ = 0
+ max_ = a.shape[axis]
+ low_max = a.shape[axis]-num_min
+ while True:
+ if axis:
+ quickImshow(a[:,min_:max_], title=title, aspect='auto',
+ extent=[min_,max_,a.shape[0],0])
+ else:
+ quickImshow(a[min_:max_,:], title=title, aspect='auto',
+ extent=[0,a.shape[1], max_,min_])
+ zoom_flag = input_yesno('Set lower data bound (y) or zoom in (n)?', 'y')
+ if zoom_flag:
+ low = input_int(' Set lower data bound', 0, low_max)
+ break
+ else:
+ min_ = input_int(' Set lower zoom index', 0, low_max)
+ max_ = input_int(' Set upper zoom index', min_+1, low_max+1)
+ else:
+ if not is_int(low, 0, a.shape[axis]-num_min):
+ illegal_value(low, 'low', 'selectImageBounds')
+ return None
+ if upp is None:
+ min_ = low+num_min
+ max_ = a.shape[axis]
+ upp_min = min_
+ while True:
+ if axis:
+ quickImshow(a[:,min_:max_], title=title, aspect='auto',
+ extent=[min_,max_,a.shape[0],0])
+ else:
+ quickImshow(a[min_:max_,:], title=title, aspect='auto',
+ extent=[0,a.shape[1], max_,min_])
+ zoom_flag = input_yesno('Set upper data bound (y) or zoom in (n)?', 'y')
+ if zoom_flag:
+ upp = input_int(' Set upper data bound', upp_min, a.shape[axis])
+ break
+ else:
+ min_ = input_int(' Set upper zoom index', upp_min, a.shape[axis]-1)
+ max_ = input_int(' Set upper zoom index', min_+1, a.shape[axis])
+ else:
+ if not is_int(upp, low+num_min, a.shape[axis]):
+ illegal_value(upp, 'upp', 'selectImageBounds')
+ return None
+ bounds = (low, upp)
+ a_tmp = np.copy(a)
+ a_tmp_max = a.max()
+ if axis:
+ a_tmp[:,bounds[0]] = a_tmp_max
+ a_tmp[:,bounds[1]-1] = a_tmp_max
+ else:
+ a_tmp[bounds[0],:] = a_tmp_max
+ a_tmp[bounds[1]-1,:] = a_tmp_max
+ print(f'lower bound = {low} (inclusive)\nupper bound = {upp} (exclusive)')
+ quickImshow(a_tmp, title=title)
+ del a_tmp
+ if not input_yesno('Accept these bounds (y/n)?', 'y'):
+ bounds = selectImageBounds(a, axis, low=low_save, upp=upp_save, num_min=num_min_save,
+ title=title)
+ return bounds
+
+
+class Config:
+ """Base class for processing a config file or dictionary.
+ """
+ def __init__(self, config_file=None, config_dict=None):
+ self.config = {}
+ self.load_flag = False
+ self.suffix = None
+
+ # Load config file
+ if config_file is not None and config_dict is not None:
+ logging.warning('Ignoring config_dict (both config_file and config_dict are specified)')
+ if config_file is not None:
+ self.loadFile(config_file)
+ elif config_dict is not None:
+ self.loadDict(config_dict)
+
+ def loadFile(self, config_file):
+ """Load a config file.
+ """
+ if self.load_flag:
+ logging.warning('Overwriting any previously loaded config file')
+ self.config = {}
+
+ # Ensure config file exists
+ if not os.path.isfile(config_file):
+ logging.error(f'Unable to load {config_file}')
+ return
+
+ # Load config file (for now for Galaxy, allow .dat extension)
+ self.suffix = os.path.splitext(config_file)[1]
+ if self.suffix == '.yml' or self.suffix == '.yaml' or self.suffix == '.dat':
+ with open(config_file, 'r') as f:
+ self.config = yaml.safe_load(f)
+ elif self.suffix == '.txt':
+ with open(config_file, 'r') as f:
+ lines = f.read().splitlines()
+ self.config = {item[0].strip():literal_eval(item[1].strip()) for item in
+ [line.split('#')[0].split('=') for line in lines if '=' in line.split('#')[0]]}
+ else:
+ illegal_value(self.suffix, 'config file extension', 'Config.loadFile')
+
+ # Make sure config file was correctly loaded
+ if isinstance(self.config, dict):
+ self.load_flag = True
+ else:
+ logging.error(f'Unable to load dictionary from config file: {config_file}')
+ self.config = {}
+
+ def loadDict(self, config_dict):
+ """Takes a dictionary and places it into self.config.
+ """
+ if self.load_flag:
+ logging.warning('Overwriting the previously loaded config file')
+
+ if isinstance(config_dict, dict):
+ self.config = config_dict
+ self.load_flag = True
+ else:
+ illegal_value(config_dict, 'dictionary config object', 'Config.loadDict')
+ self.config = {}
+
+ def saveFile(self, config_file):
+ """Save the config file (as a yaml file only right now).
+ """
+ suffix = os.path.splitext(config_file)[1]
+ if suffix != '.yml' and suffix != '.yaml':
+ illegal_value(suffix, 'config file extension', 'Config.saveFile')
+
+ # Check if config file exists
+ if os.path.isfile(config_file):
+ logging.info(f'Updating {config_file}')
+ else:
+ logging.info(f'Saving {config_file}')
+
+ # Save config file
+ with open(config_file, 'w') as f:
+ yaml.safe_dump(self.config, f)
+
+ def validate(self, pars_required, pars_missing=None):
+ """Returns False if any required keys are missing.
+ """
+ if not self.load_flag:
+ logging.error('Load a config file prior to calling Config.validate')
+
+ def validate_nested_pars(config, par):
+ par_levels = par.split(':')
+ first_level_par = par_levels[0]
+ try:
+ first_level_par = int(first_level_par)
+ except:
+ pass
+ try:
+ next_level_config = config[first_level_par]
+ if len(par_levels) > 1:
+ next_level_par = ':'.join(par_levels[1:])
+ return validate_nested_pars(next_level_config, next_level_par)
+ else:
+ return True
+ except:
+ return False
+
+ pars_missing = [p for p in pars_required if not validate_nested_pars(self.config, p)]
+ if len(pars_missing) > 0:
+ logging.error(f'Missing item(s) in configuration: {", ".join(pars_missing)}')
+ return False
+ else:
+ return True
diff -r 059819ea1f0e -r 26f99fdd8d61 msnc_tools.py
--- a/msnc_tools.py Wed Apr 27 17:28:26 2022 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,900 +0,0 @@
-#!/usr/bin/env python3
-
-# -*- coding: utf-8 -*-
-"""
-Created on Mon Dec 6 15:36:22 2021
-
-@author: rv43
-"""
-
-import logging
-
-import os
-import sys
-import re
-import yaml
-import h5py
-try:
- import pyinputplus as pyip
-except:
- pass
-import numpy as np
-import imageio as img
-import matplotlib.pyplot as plt
-from time import time
-from ast import literal_eval
-try:
- from lmfit.models import StepModel, RectangleModel
-except:
- pass
-
-def depth_list(L): return isinstance(L, list) and max(map(depth_list, L))+1
-def depth_tuple(T): return isinstance(T, tuple) and max(map(depth_tuple, T))+1
-
-def is_int(v, v_min=None, v_max=None):
- """Value is an integer in range v_min <= v <= v_max.
- """
- if not isinstance(v, int):
- return False
- if (v_min is not None and v < v_min) or (v_max is not None and v > v_max):
- return False
- return True
-
-def is_num(v, v_min=None, v_max=None):
- """Value is a number in range v_min <= v <= v_max.
- """
- if not isinstance(v, (int,float)):
- return False
- if (v_min is not None and v < v_min) or (v_max is not None and v > v_max):
- return False
- return True
-
-def is_index(v, v_min=0, v_max=None):
- """Value is an array index in range v_min <= v < v_max.
- """
- if not isinstance(v, int):
- return False
- if v < v_min or (v_max is not None and v >= v_max):
- return False
- return True
-
-def is_index_range(v, v_min=0, v_max=None):
- """Value is an array index range in range v_min <= v[0] <= v[1] <= v_max.
- """
- if not (isinstance(v, list) and len(v) == 2 and isinstance(v[0], int) and
- isinstance(v[1], int)):
- return False
- if not 0 <= v[0] <= v[1] or (v_max is not None and v[1] > v_max):
- return False
- return True
-
-def illegal_value(name, value, location=None, exit_flag=False):
- if not isinstance(location, str):
- location = ''
- else:
- location = f'in {location} '
- if isinstance(name, str):
- logging.error(f'Illegal value for {name} {location}({value}, {type(value)})')
- else:
- logging.error(f'Illegal value {location}({value}, {type(value)})')
- if exit_flag:
- exit(1)
-
-def get_trailing_int(string):
- indexRegex = re.compile(r'\d+$')
- mo = indexRegex.search(string)
- if mo is None:
- return None
- else:
- return int(mo.group())
-
-def findImageFiles(path, filetype, name=None):
- if isinstance(name, str):
- name = f' {name} '
- else:
- name = ' '
- # Find available index range
- if filetype == 'tif':
- if not isinstance(path, str) and not os.path.isdir(path):
- illegal_value('path', path, 'findImageRange')
- return -1, 0, []
- indexRegex = re.compile(r'\d+')
- # At this point only tiffs
- files = sorted([f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f)) and
- f.endswith('.tif') and indexRegex.search(f)])
- num_imgs = len(files)
- if num_imgs < 1:
- logging.warning('No available'+name+'files')
- return -1, 0, []
- first_index = indexRegex.search(files[0]).group()
- last_index = indexRegex.search(files[-1]).group()
- if first_index is None or last_index is None:
- logging.error('Unable to find correctly indexed'+name+'images')
- return -1, 0, []
- first_index = int(first_index)
- last_index = int(last_index)
- if num_imgs != last_index-first_index+1:
- logging.error('Non-consecutive set of indices for'+name+'images')
- return -1, 0, []
- paths = [os.path.join(path, f) for f in files]
- elif filetype == 'h5':
- if not isinstance(path, str) or not os.path.isfile(path):
- illegal_value('path', path, 'findImageRange')
- return -1, 0, []
- # At this point only h5 in alamo2 detector style
- first_index = 0
- with h5py.File(path, 'r') as f:
- num_imgs = f['entry/instrument/detector/data'].shape[0]
- last_index = num_imgs-1
- paths = [path]
- else:
- illegal_value('filetype', filetype, 'findImageRange')
- return -1, 0, []
- logging.debug('\nNumber of available'+name+f'images: {num_imgs}')
- logging.debug('Index range of available'+name+f'images: [{first_index}, '+
- f'{last_index}]')
-
- return first_index, num_imgs, paths
-
-def selectImageRange(first_index, offset, num_imgs, name=None, num_required=None):
- if isinstance(name, str):
- name = f' {name} '
- else:
- name = ' '
- # Check existing values
- use_input = 'no'
- if (is_int(first_index, 0) and is_int(offset, 0) and is_int(num_imgs, 1)):
- if offset < 0:
- use_input = pyip.inputYesNo('\nCurrent'+name+f'first index = {first_index}, '+
- 'use this value ([y]/n)? ', blank=True)
- else:
- use_input = pyip.inputYesNo('\nCurrent'+name+'first index/offset = '+
- f'{first_index}/{offset}, use these values ([y]/n)? ',
- blank=True)
- if num_required is None:
- if use_input != 'no':
- use_input = pyip.inputYesNo('Current number of'+name+'images = '+
- f'{num_imgs}, use this value ([y]/n)? ',
- blank=True)
- if use_input != 'no':
- return first_index, offset, num_imgs
-
- # Check range against requirements
- if num_imgs < 1:
- logging.warning('No available'+name+'images')
- return -1, -1, 0
- if num_required is None:
- if num_imgs == 1:
- return first_index, 0, 1
- else:
- if not is_int(num_required, 1):
- illegal_value('num_required', num_required, 'selectImageRange')
- return -1, -1, 0
- if num_imgs < num_required:
- logging.error('Unable to find the required'+name+
- f'images ({num_imgs} out of {num_required})')
- return -1, -1, 0
-
- # Select index range
- print('\nThe number of available'+name+f'images is {num_imgs}')
- if num_required is None:
- last_index = first_index+num_imgs
- use_all = f'Use all ([{first_index}, {last_index}])'
- pick_offset = 'Pick a first index offset and a number of images'
- pick_bounds = 'Pick the first and last index'
- menuchoice = pyip.inputMenu([use_all, pick_offset, pick_bounds], numbered=True)
- if menuchoice == use_all:
- offset = 0
- elif menuchoice == pick_offset:
- offset = pyip.inputInt('Enter the first index offset'+
- f' [0, {last_index-first_index}]: ', min=0, max=last_index-first_index)
- first_index += offset
- if first_index == last_index:
- num_imgs = 1
- else:
- num_imgs = pyip.inputInt(f'Enter the number of images [1, {num_imgs-offset}]: ',
- min=1, max=num_imgs-offset)
- else:
- offset = pyip.inputInt(f'Enter the first index [{first_index}, {last_index}]: ',
- min=first_index, max=last_index)-first_index
- first_index += offset
- num_imgs = pyip.inputInt(f'Enter the last index [{first_index}, {last_index}]: ',
- min=first_index, max=last_index)-first_index+1
- else:
- use_all = f'Use ([{first_index}, {first_index+num_required-1}])'
- pick_offset = 'Pick the first index offset'
- menuchoice = pyip.inputMenu([use_all, pick_offset], numbered=True)
- offset = 0
- if menuchoice == pick_offset:
- offset = pyip.inputInt('Enter the first index offset'+
- f'[0, {num_imgs-num_required}]: ', min=0, max=num_imgs-num_required)
- first_index += offset
- num_imgs = num_required
-
- return first_index, offset, num_imgs
-
-def loadImage(f, img_x_bounds=None, img_y_bounds=None):
- """Load a single image from file.
- """
- if not os.path.isfile(f):
- logging.error(f'Unable to load {f}')
- return None
- img_read = img.imread(f)
- if not img_x_bounds:
- img_x_bounds = [0, img_read.shape[0]]
- else:
- if (not isinstance(img_x_bounds, list) or len(img_x_bounds) != 2 or
- not (0 <= img_x_bounds[0] < img_x_bounds[1] <= img_read.shape[0])):
- logging.error(f'inconsistent row dimension in {f}')
- return None
- if not img_y_bounds:
- img_y_bounds = [0, img_read.shape[1]]
- else:
- if (not isinstance(img_y_bounds, list) or len(img_y_bounds) != 2 or
- not (0 <= img_y_bounds[0] < img_y_bounds[1] <= img_read.shape[1])):
- logging.error(f'inconsistent column dimension in {f}')
- return None
- return img_read[img_x_bounds[0]:img_x_bounds[1],img_y_bounds[0]:img_y_bounds[1]]
-
-def loadImageStack(files, filetype, img_offset, num_imgs, num_img_skip=0,
- img_x_bounds=None, img_y_bounds=None):
- """Load a set of images and return them as a stack.
- """
- logging.debug(f'img_offset = {img_offset}')
- logging.debug(f'num_imgs = {num_imgs}')
- logging.debug(f'num_img_skip = {num_img_skip}')
- logging.debug(f'\nfiles:\n{files}\n')
- img_stack = np.array([])
- if filetype == 'tif':
- img_read_stack = []
- i = 1
- t0 = time()
- for f in files[img_offset:img_offset+num_imgs:num_img_skip+1]:
- if not i%20:
- logging.info(f' loading {i}/{num_imgs}: {f}')
- else:
- logging.debug(f' loading {i}/{num_imgs}: {f}')
- img_read = loadImage(f, img_x_bounds, img_y_bounds)
- img_read_stack.append(img_read)
- i += num_img_skip+1
- img_stack = np.stack([img_read for img_read in img_read_stack])
- logging.info(f'... done in {time()-t0:.2f} seconds!')
- logging.debug(f'img_stack shape = {np.shape(img_stack)}')
- del img_read_stack, img_read
- elif filetype == 'h5':
- if not isinstance(files[0], str) and not os.path.isfile(files[0]):
- illegal_value('files[0]', files[0], 'loadImageStack')
- return img_stack
- t0 = time()
- logging.info(f'Loading {files[0]}')
- with h5py.File(files[0], 'r') as f:
- shape = f['entry/instrument/detector/data'].shape
- if len(shape) != 3:
- logging.error(f'inconsistent dimensions in {files[0]}')
- if not img_x_bounds:
- img_x_bounds = [0, shape[1]]
- else:
- if (not isinstance(img_x_bounds, list) or len(img_x_bounds) != 2 or
- not (0 <= img_x_bounds[0] < img_x_bounds[1] <= shape[1])):
- logging.error(f'inconsistent row dimension in {files[0]}')
- if not img_y_bounds:
- img_y_bounds = [0, shape[2]]
- else:
- if (not isinstance(img_y_bounds, list) or len(img_y_bounds) != 2 or
- not (0 <= img_y_bounds[0] < img_y_bounds[1] <= shape[2])):
- logging.error(f'inconsistent column dimension in {files[0]}')
- img_stack = f.get('entry/instrument/detector/data')[
- img_offset:img_offset+num_imgs:num_img_skip+1,
- img_x_bounds[0]:img_x_bounds[1],img_y_bounds[0]:img_y_bounds[1]]
- logging.info(f'... done in {time()-t0:.2f} seconds!')
- else:
- illegal_value('filetype', filetype, 'findImageRange')
- return img_stack
-
-def combine_tiffs_in_h5(files, num_imgs, h5_filename):
- img_stack = loadImageStack(files, 'tif', 0, num_imgs)
- with h5py.File(h5_filename, 'w') as f:
- f.create_dataset('entry/instrument/detector/data', data=img_stack)
- del img_stack
- return [h5_filename]
-
-def clearFig(title):
- if not isinstance(title, str):
- illegal_value('title', title, 'clearFig')
- return
- plt.close(fig=re.sub(r"\s+", '_', title))
-
-def quickImshow(a, title=None, path=None, name=None, save_fig=False, save_only=False,
- clear=True, extent=None, show_grid=False, grid_color='w', grid_linewidth=1, **kwargs):
- if title is not None and not isinstance(title, str):
- illegal_value('title', title, 'quickImshow')
- return
- if path is not None and not isinstance(path, str):
- illegal_value('path', path, 'quickImshow')
- return
- if not isinstance(save_fig, bool):
- illegal_value('save_fig', save_fig, 'quickImshow')
- return
- if not isinstance(save_only, bool):
- illegal_value('save_only', save_only, 'quickImshow')
- return
- if not isinstance(clear, bool):
- illegal_value('clear', clear, 'quickImshow')
- return
- if not title:
- title='quick_imshow'
- else:
- title = re.sub(r"\s+", '_', title)
- if name is None:
- if path is None:
- path = f'{title}.png'
- else:
- path = f'{path}/{title}.png'
- else:
- if path is None:
- path = name
- else:
- path = f'{path}/{name}'
- if extent is None:
- extent = (0, a.shape[1], a.shape[0], 0)
- if clear:
- plt.close(fig=title)
- if save_only:
- plt.figure(title)
- plt.imshow(a, extent=extent, **kwargs)
- if show_grid:
- ax = plt.gca()
- ax.grid(color=grid_color, linewidth=grid_linewidth)
- if title != 'quick_imshow':
- plt.title = title
- plt.savefig(path)
- plt.close(fig=title)
- #plt.imsave(f'{title}.png', a, **kwargs)
- else:
- plt.ion()
- plt.figure(title)
- plt.imshow(a, extent=extent, **kwargs)
- if show_grid:
- ax = plt.gca()
- ax.grid(color=grid_color, linewidth=grid_linewidth)
- if title != 'quick_imshow':
- plt.title = title
- if save_fig:
- plt.savefig(path)
- plt.pause(1)
-
-def quickPlot(*args, title=None, path=None, name=None, save_fig=False, save_only=False,
- clear=True, show_grid=False, **kwargs):
- if title is not None and not isinstance(title, str):
- illegal_value('title', title, 'quickPlot')
- return
- if path is not None and not isinstance(path, str):
- illegal_value('path', path, 'quickPlot')
- return
- if not isinstance(save_fig, bool):
- illegal_value('save_fig', save_fig, 'quickPlot')
- return
- if not isinstance(save_only, bool):
- illegal_value('save_only', save_only, 'quickPlot')
- return
- if not isinstance(clear, bool):
- illegal_value('clear', clear, 'quickPlot')
- return
- if not title:
- title = 'quick_plot'
- else:
- title = re.sub(r"\s+", '_', title)
- if name is None:
- if path is None:
- path = f'{title}.png'
- else:
- path = f'{path}/{title}.png'
- else:
- if path is None:
- path = name
- else:
- path = f'{path}/{name}'
- if clear:
- plt.close(fig=title)
- if save_only:
- plt.figure(title)
- if depth_tuple(args) > 1:
- for y in args:
- plt.plot(*y, **kwargs)
- else:
- plt.plot(*args, **kwargs)
- if show_grid:
- ax = plt.gca()
- ax.grid(color='k')#, linewidth=1)
- if title != 'quick_plot':
- plt.title = title
- plt.savefig(path)
- plt.close(fig=title)
- else:
- plt.ion()
- plt.figure(title)
- if depth_tuple(args) > 1:
- for y in args:
- plt.plot(*y, **kwargs)
- else:
- plt.plot(*args, **kwargs)
- if show_grid:
- ax = plt.gca()
- ax.grid(color='k')#, linewidth=1)
- if title != 'quick_plot':
- plt.title = title
- if save_fig:
- plt.savefig(path)
- plt.pause(1)
-
-def selectArrayBounds(a, x_low=None, x_upp=None, num_x_min=None,
- title='select array bounds'):
- """Interactively select the lower and upper data bounds for a numpy array.
- """
- if not isinstance(a, np.ndarray) or a.ndim != 1:
- logging.error('Illegal array type or dimension in selectArrayBounds')
- return None
- x_low_save = x_low
- x_upp_save = x_upp
- num_x_min_save = num_x_min
- if num_x_min is None:
- num_x_min = 1
- else:
- if num_x_min < 2 or num_x_min > a.size:
- logging.warning('Illegal input for num_x_min in selectArrayBounds, input ignored')
- num_x_min = 1
- if x_low is None:
- x_min = 0
- x_max = a.size
- x_low_max = a.size-num_x_min
- while True:
- quickPlot(range(x_min, x_max), a[x_min:x_max], title=title)
- zoom_flag = pyip.inputInt('Set lower data bound (0) or zoom in (1)?: ',
- min=0, max=1)
- if zoom_flag:
- x_min = pyip.inputInt(f' Set lower zoom index [0, {x_low_max}]: ',
- min=0, max=x_low_max)
- x_max = pyip.inputInt(f' Set upper zoom index [{x_min+1}, {x_low_max+1}]: ',
- min=x_min+1, max=x_low_max+1)
- else:
- x_low = pyip.inputInt(f' Set lower data bound [0, {x_low_max}]: ',
- min=0, max=x_low_max)
- break
- else:
- if not is_int(x_low, 0, a.size-num_x_min):
- illegal_value('x_low', x_low, 'selectArrayBounds')
- return None
- if x_upp is None:
- x_min = x_low+num_x_min
- x_max = a.size
- x_upp_min = x_min
- while True:
- quickPlot(range(x_min, x_max), a[x_min:x_max], title=title)
- zoom_flag = pyip.inputInt('Set upper data bound (0) or zoom in (1)?: ',
- min=0, max=1)
- if zoom_flag:
- x_min = pyip.inputInt(f' Set upper zoom index [{x_upp_min}, {a.size-1}]: ',
- min=x_upp_min, max=a.size-1)
- x_max = pyip.inputInt(f' Set upper zoom index [{x_min+1}, {a.size}]: ',
- min=x_min+1, max=a.size)
- else:
- x_upp = pyip.inputInt(f' Set upper data bound [{x_upp_min}, {a.size}]: ',
- min=x_upp_min, max=a.size)
- break
- else:
- if not is_int(x_upp, x_low+num_x_min, a.size):
- illegal_value('x_upp', x_upp, 'selectArrayBounds')
- return None
- bounds = [x_low, x_upp]
- print(f'lower bound = {x_low} (inclusive)\nupper bound = {x_upp} (exclusive)]')
- #quickPlot(range(bounds[0], bounds[1]), a[bounds[0]:bounds[1]], title=title)
- quickPlot((range(a.size), a), ([bounds[0], bounds[0]], [a.min(), a.max()], 'r-'),
- ([bounds[1], bounds[1]], [a.min(), a.max()], 'r-'), title=title)
- if pyip.inputYesNo('Accept these bounds ([y]/n)?: ', blank=True) == 'no':
- bounds = selectArrayBounds(a, x_low_save, x_upp_save, num_x_min_save, title=title)
- return bounds
-
-def selectImageBounds(a, axis, low=None, upp=None, num_min=None,
- title='select array bounds'):
- """Interactively select the lower and upper data bounds for a 2D numpy array.
- """
- if not isinstance(a, np.ndarray) or a.ndim != 2:
- logging.error('Illegal array type or dimension in selectImageBounds')
- return None
- if axis < 0 or axis >= a.ndim:
- illegal_value('axis', axis, 'selectImageBounds')
- return None
- low_save = low
- upp_save = upp
- num_min_save = num_min
- if num_min is None:
- num_min = 1
- else:
- if num_min < 2 or num_min > a.shape[axis]:
- logging.warning('Illegal input for num_min in selectImageBounds, input ignored')
- num_min = 1
- if low is None:
- min_ = 0
- max_ = a.shape[axis]
- low_max = a.shape[axis]-num_min
- while True:
- if axis:
- quickImshow(a[:,min_:max_], title=title, aspect='auto',
- extent=[min_,max_,a.shape[0],0])
- else:
- quickImshow(a[min_:max_,:], title=title, aspect='auto',
- extent=[0,a.shape[1], max_,min_])
- zoom_flag = pyip.inputInt('Set lower data bound (0) or zoom in (1)?: ',
- min=0, max=1)
- if zoom_flag:
- min_ = pyip.inputInt(f' Set lower zoom index [0, {low_max}]: ',
- min=0, max=low_max)
- max_ = pyip.inputInt(f' Set upper zoom index [{min_+1}, {low_max+1}]: ',
- min=min_+1, max=low_max+1)
- else:
- low = pyip.inputInt(f' Set lower data bound [0, {low_max}]: ',
- min=0, max=low_max)
- break
- else:
- if not is_int(low, 0, a.shape[axis]-num_min):
- illegal_value('low', low, 'selectImageBounds')
- return None
- if upp is None:
- min_ = low+num_min
- max_ = a.shape[axis]
- upp_min = min_
- while True:
- if axis:
- quickImshow(a[:,min_:max_], title=title, aspect='auto',
- extent=[min_,max_,a.shape[0],0])
- else:
- quickImshow(a[min_:max_,:], title=title, aspect='auto',
- extent=[0,a.shape[1], max_,min_])
- zoom_flag = pyip.inputInt('Set upper data bound (0) or zoom in (1)?: ',
- min=0, max=1)
- if zoom_flag:
- min_ = pyip.inputInt(f' Set upper zoom index [{upp_min}, {a.shape[axis]-1}]: ',
- min=upp_min, max=a.shape[axis]-1)
- max_ = pyip.inputInt(f' Set upper zoom index [{min_+1}, {a.shape[axis]}]: ',
- min=min_+1, max=a.shape[axis])
- else:
- upp = pyip.inputInt(f' Set upper data bound [{upp_min}, {a.shape[axis]}]: ',
- min=upp_min, max=a.shape[axis])
- break
- else:
- if not is_int(upp, low+num_min, a.shape[axis]):
- illegal_value('upp', upp, 'selectImageBounds')
- return None
- bounds = [low, upp]
- a_tmp = np.copy(a)
- a_tmp_max = a.max()
- if axis:
- a_tmp[:,bounds[0]] = a_tmp_max
- a_tmp[:,bounds[1]-1] = a_tmp_max
- else:
- a_tmp[bounds[0],:] = a_tmp_max
- a_tmp[bounds[1]-1,:] = a_tmp_max
- print(f'lower bound = {low} (inclusive)\nupper bound = {upp} (exclusive)')
- quickImshow(a_tmp, title=title)
- del a_tmp
- if pyip.inputYesNo('Accept these bounds ([y]/n)?: ', blank=True) == 'no':
- bounds = selectImageBounds(a, axis, low=low_save, upp=upp_save, num_min=num_min_save,
- title=title)
- return bounds
-
-def fitStep(x=None, y=None, model='step', form='arctan'):
- if not isinstance(y, np.ndarray) or y.ndim != 1:
- logging.error('Illegal array type or dimension for y in fitStep')
- return
- if isinstance(x, type(None)):
- x = np.array(range(y.size))
- elif not isinstance(x, np.ndarray) or x.ndim != 1 or x.size != y.size:
- logging.error('Illegal array type or dimension for x in fitStep')
- return
- if not isinstance(model, str) or not model in ('step', 'rectangle'):
- illegal_value('model', model, 'fitStepModel')
- return
- if not isinstance(form, str) or not form in ('linear', 'atan', 'arctan', 'erf', 'logistic'):
- illegal_value('form', form, 'fitStepModel')
- return
-
- if model == 'step':
- mod = StepModel(form=form)
- else:
- mod = RectangleModel(form=form)
- pars = mod.guess(y, x=x)
- out = mod.fit(y, pars, x=x)
- #print(f'fit_report:\n{out.fit_report()}')
- #quickPlot((x,y),(x,out.best_fit))
- return out.best_values
-
-class Config:
- """Base class for processing a config file or dictionary.
- """
- def __init__(self, config_file=None, config_dict=None):
- self.config = {}
- self.load_flag = False
- self.suffix = None
-
- # Load config file
- if config_file is not None and config_dict is not None:
- logging.warning('Ignoring config_dict (both config_file and config_dict are specified)')
- if config_file is not None:
- self.loadFile(config_file)
- elif config_dict is not None:
- self.loadDict(config_dict)
-
- def loadFile(self, config_file):
- """Load a config file.
- """
- if self.load_flag:
- logging.warning('Overwriting any previously loaded config file')
- self.config = {}
-
- # Ensure config file exists
- if not os.path.isfile(config_file):
- logging.error(f'Unable to load {config_file}')
- return
-
- # Load config file (for now for Galaxy, allow .dat extension)
- self.suffix = os.path.splitext(config_file)[1]
- if self.suffix == '.yml' or self.suffix == '.yaml' or self.suffix == '.dat':
- with open(config_file, 'r') as f:
- self.config = yaml.safe_load(f)
- elif self.suffix == '.txt':
- with open(config_file, 'r') as f:
- lines = f.read().splitlines()
- self.config = {item[0].strip():literal_eval(item[1].strip()) for item in
- [line.split('#')[0].split('=') for line in lines if '=' in line.split('#')[0]]}
- else:
- logging.error(f'Illegal config file extension: {self.suffix}')
-
- # Make sure config file was correctly loaded
- if isinstance(self.config, dict):
- self.load_flag = True
- else:
- logging.error(f'Unable to load dictionary from config file: {config_file}')
- self.config = {}
-
- def loadDict(self, config_dict):
- """Takes a dictionary and places it into self.config.
- """
- exit('loadDict not tested yet, what format do we follow: txt or yaml?')
- if self.load_flag:
- logging.warning('Overwriting the previously loaded config file')
-
- if isinstance(config_dict, dict):
- self.config = config_dict
- self.load_flag = True
- else:
- logging.error(f'Illegal dictionary config object: {config_dict}')
- self.config = {}
-
- def saveFile(self, config_file):
- """Save the config file (as a yaml file only right now).
- """
- suffix = os.path.splitext(config_file)[1]
- if suffix != '.yml' and suffix != '.yaml':
- logging.error(f'Illegal config file extension: {suffix}')
-
- # Check if config file exists
- if os.path.isfile(config_file):
- logging.info(f'Updating {config_file}')
- else:
- logging.info(f'Saving {config_file}')
-
- # Save config file
- with open(config_file, 'w') as f:
- yaml.dump(self.config, f)
-
- def validate(self, pars_required, pars_missing=None):
- """Returns False if any required first level keys are missing.
- """
- if not self.load_flag:
- logging.error('Load a config file prior to calling Config.validate')
- pars = [p for p in pars_required if p not in self.config]
- if isinstance(pars_missing, list):
- pars_missing.extend(pars)
- elif pars_missing is not None:
- illegal_value('pars_missing', pars_missing, 'Config.validate')
- if len(pars) > 0:
- return False
- return True
-
-#RV FIX this is for a txt file, obsolete?
-# def update_txt(self, config_file, key, value, search_string=None, header=None):
-# if not self.load_flag:
-# logging.error('Load a config file prior to calling Config.update')
-#
-# if not os.path.isfile(config_file):
-# logging.error(f'Unable to load {config_file}')
-# lines = []
-# else:
-# with open(config_file, 'r') as f:
-# lines = f.read().splitlines()
-# config = {item[0].strip():literal_eval(item[1].strip()) for item in
-# [line.split('#')[0].split('=') for line in lines if '=' in line.split('#')[0]]}
-# if not isinstance(key, str):
-# illegal_value('key', key, 'Config.update')
-# return config
-# if isinstance(value, str):
-# newline = f"{key} = '{value}'"
-# else:
-# newline = f'{key} = {value}'
-# if key in config.keys():
-# # Update key with value
-# for index,line in enumerate(lines):
-# if '=' in line:
-# item = line.split('#')[0].split('=')
-# if item[0].strip() == key:
-# lines[index] = newline
-# break
-# else:
-# # Insert new key/value pair
-# if search_string is not None:
-# if isinstance(search_string, str):
-# search_string = [search_string]
-# elif not isinstance(search_string, (tuple, list)):
-# illegal_value('search_string', search_string, 'Config.update')
-# search_string = None
-# update_flag = False
-# if search_string is not None:
-# indices = [[index for index,line in enumerate(lines) if item in line]
-# for item in search_string]
-# for i,index in enumerate(indices):
-# if index:
-# if len(search_string) > 1 and key < search_string[i]:
-# lines.insert(index[0], newline)
-# else:
-# lines.insert(index[0]+1, newline)
-# update_flag = True
-# break
-# if not update_flag:
-# if isinstance(header, str):
-# lines += ['', header, newline]
-# else:
-# lines += ['', newline]
-# # Write updated config file
-# with open(config_file, 'w') as f:
-# for line in lines:
-# f.write(f'{line}\n')
-# # Update loaded config
-# config['key'] = value
-#
-#RV update and bring into Config if needed again
-#def search(config_file, search_string):
-# if not os.path.isfile(config_file):
-# logging.error(f'Unable to load {config_file}')
-# return False
-# with open(config_file, 'r') as f:
-# lines = f.read()
-# if search_string in lines:
-# return True
-# return False
-
-class Detector:
- """Class for processing a detector info file or dictionary.
- """
- def __init__(self, detector_id):
- self.detector = {}
- self.load_flag = False
- self.validate_flag = False
-
- # Load detector file
- self.loadFile(detector_id)
-
- def loadFile(self, detector_id):
- """Load a detector file.
- """
- if self.load_flag:
- logging.warning('Overwriting the previously loaded detector file')
- self.detector = {}
-
- # Ensure detector file exists
- if not isinstance(detector_id, str):
- illegal_value('detector_id', detector_id, 'Detector.loadFile')
- return
- detector_file = f'{detector_id}.yaml'
- if not os.path.isfile(detector_file):
- detector_file = self.config['detector_id']+'.yaml'
- if not os.path.isfile(detector_file):
- logging.error(f'Unable to load detector info file for {detector_id}')
- return
-
- # Load detector file
- with open(detector_file, 'r') as f:
- self.detector = yaml.safe_load(f)
-
- # Make sure detector file was correctly loaded
- if isinstance(self.detector, dict):
- self.load_flag = True
- else:
- logging.error(f'Unable to load dictionary from detector file: {detector_file}')
- self.detector = {}
-
- def validate(self):
- """Returns False if any config parameters is illegal or missing.
- """
- if not self.load_flag:
- logging.error('Load a detector file prior to calling Detector.validate')
-
- # Check for required first-level keys
- pars_required = ['detector', 'lens_magnification']
- pars_missing = [p for p in pars_required if p not in self.detector]
- if len(pars_missing) > 0:
- logging.error(f'Missing item(s) in detector file: {", ".join(pars_missing)}')
- return False
-
- is_valid = True
-
- # Check detector pixel config keys
- pixels = self.detector['detector'].get('pixels')
- if not pixels:
- pars_missing.append('detector:pixels')
- else:
- rows = pixels.get('rows')
- if not rows:
- pars_missing.append('detector:pixels:rows')
- columns = pixels.get('columns')
- if not columns:
- pars_missing.append('detector:pixels:columns')
- size = pixels.get('size')
- if not size:
- pars_missing.append('detector:pixels:size')
-
- if not len(pars_missing):
- self.validate_flag = True
- else:
- is_valid = False
-
- return is_valid
-
- def getPixelSize(self):
- """Returns the detector pixel size.
- """
- if not self.validate_flag:
- logging.error('Validate detector file info prior to calling Detector.getPixelSize')
-
- lens_magnification = self.detector.get('lens_magnification')
- if not isinstance(lens_magnification, (int,float)) or lens_magnification <= 0.:
- illegal_value('lens_magnification', lens_magnification, 'detector file')
- return 0
- pixel_size = self.detector['detector'].get('pixels').get('size')
- if isinstance(pixel_size, (int,float)):
- if pixel_size <= 0.:
- illegal_value('pixel_size', pixel_size, 'detector file')
- return 0
- pixel_size /= lens_magnification
- elif isinstance(pixel_size, list):
- if ((len(pixel_size) > 2) or
- (len(pixel_size) == 2 and pixel_size[0] != pixel_size[1])):
- illegal_value('pixel size', pixel_size, 'detector file')
- return 0
- elif not is_num(pixel_size[0], 0.):
- illegal_value('pixel size', pixel_size, 'detector file')
- return 0
- else:
- pixel_size = pixel_size[0]/lens_magnification
- else:
- illegal_value('pixel size', pixel_size, 'detector file')
- return 0
-
- return pixel_size
-
- def getDimensions(self):
- """Returns the detector pixel dimensions.
- """
- if not self.validate_flag:
- logging.error('Validate detector file info prior to calling Detector.getDimensions')
-
- pixels = self.detector['detector'].get('pixels')
- num_rows = pixels.get('rows')
- if not is_int(num_rows, 1):
- illegal_value('rows', num_rows, 'detector file')
- return (0, 0)
- num_columns = pixels.get('columns')
- if not is_int(num_columns, 1):
- illegal_value('columns', num_columns, 'detector file')
- return (0, 0)
-
- return num_rows, num_columns
diff -r 059819ea1f0e -r 26f99fdd8d61 read_image.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/read_image.py Thu Jul 28 16:05:24 2022 +0000
@@ -0,0 +1,41 @@
+#!/usr/bin/env python3
+
+import logging
+
+import sys
+import argparse
+import numpy as np
+
+def __main__():
+
+ # Parse command line arguments
+ parser = argparse.ArgumentParser(
+ description='Read a reconstructed image')
+ parser.add_argument('-i', '--input_image',
+ help='Reconstructed image file')
+ parser.add_argument('-l', '--log',
+ type=argparse.FileType('w'),
+ default=sys.stdout,
+ help='Log file')
+ args = parser.parse_args()
+
+ # Set basic log configuration
+ logging_format = '%(asctime)s : %(levelname)s - %(module)s : %(funcName)s - %(message)s'
+ log_level = 'INFO'
+ level = getattr(logging, log_level.upper(), None)
+ if not isinstance(level, int):
+ raise ValueError(f'Invalid log_level: {log_level}')
+ logging.basicConfig(format=logging_format, level=level, force=True,
+ handlers=[logging.StreamHandler()])
+
+ logging.info(f'input_image = {args.input_image}')
+ logging.debug(f'log = {args.log}')
+ logging.debug(f'is log stdout? {args.log is sys.stdout}')
+
+ # Load image
+ f = np.load(args.input_image)
+ logging.info(f'f shape = {f.shape}')
+
+if __name__ == "__main__":
+ __main__()
+
diff -r 059819ea1f0e -r 26f99fdd8d61 read_image.xml
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/read_image.xml Thu Jul 28 16:05:24 2022 +0000
@@ -0,0 +1,20 @@
+
+ Read reconstructed image
+
+ tomo_macros.xml
+
+
+
+
+
+
+
+
+
+
diff -r 059819ea1f0e -r 26f99fdd8d61 tomo.py
--- a/tomo.py Wed Apr 27 17:28:26 2022 +0000
+++ b/tomo.py Thu Jul 28 16:05:24 2022 +0000
@@ -14,10 +14,6 @@
import getopt
import re
import io
-try:
- import pyinputplus as pyip
-except:
- pass
import argparse
import numpy as np
try:
@@ -43,7 +39,13 @@
except:
pass
-import msnc_tools as msnc
+from detector import TomoDetectorConfig
+from fit import Fit
+#from msnctools.general import illegal_value, is_int, is_num, is_index_range, get_trailing_int, \
+from general import illegal_value, is_int, is_num, is_index_range, get_trailing_int, \
+ input_int, input_num, input_yesno, input_menu, findImageFiles, loadImageStack, clearPlot, \
+ draw_mask_1d, quickPlot, clearImshow, quickImshow, combine_tiffs_in_h5, Config
+from general import selectArrayBounds,selectImageRange, selectImageBounds
# the following tomopy routines don't run with more than 24 cores on Galaxy-Dev
# - tomopy.find_center_vo
@@ -68,7 +70,7 @@
def __exit__(self, exc_type, exc_value, traceback):
ne.set_num_threads(self.num_core_org)
-class ConfigTomo(msnc.Config):
+class ConfigTomo(Config):
"""Class for processing a config file.
"""
@@ -96,9 +98,9 @@
# Check number of tomography image stacks
self.num_tomo_stacks = self.config.get('num_tomo_stacks', 1)
- if not msnc.is_int(self.num_tomo_stacks, 1):
+ if not is_int(self.num_tomo_stacks, 1):
self.num_tomo_stacks = None
- msnc.illegal_value('num_tomo_stacks', self.num_tomo_stacks, 'config file')
+ illegal_value(self.num_tomo_stacks, 'num_tomo_stacks', 'config file')
return False
logging.info(f'num_tomo_stacks = {self.num_tomo_stacks}')
@@ -109,8 +111,8 @@
logging.error(f'Incorrect number of tomography data path names in config file')
is_valid = False
self.tomo_data_paths = [tomo_data_paths_indices[i][1] for i in range(self.num_tomo_stacks)]
- self.tomo_data_indices = [msnc.get_trailing_int(tomo_data_paths_indices[i][0])
- if msnc.get_trailing_int(tomo_data_paths_indices[i][0]) else None
+ self.tomo_data_indices = [get_trailing_int(tomo_data_paths_indices[i][0])
+ if get_trailing_int(tomo_data_paths_indices[i][0]) else None
for i in range(self.num_tomo_stacks)]
tomo_ref_height_indices = sorted({key:value for key,value in self.config.items()
if 'z_pos' in key}.items())
@@ -125,18 +127,18 @@
# Check tomo angle (theta) range
self.start_theta = self.config.get('start_theta', 0.)
- if not msnc.is_num(self.start_theta, 0.):
- msnc.illegal_value('start_theta', self.start_theta, 'config file')
+ if not is_num(self.start_theta, 0.):
+ illegal_value(self.start_theta, 'start_theta', 'config file')
is_valid = False
logging.debug(f'start_theta = {self.start_theta}')
self.end_theta = self.config.get('end_theta', 180.)
- if not msnc.is_num(self.end_theta, self.start_theta):
- msnc.illegal_value('end_theta', self.end_theta, 'config file')
+ if not is_num(self.end_theta, self.start_theta):
+ illegal_value(self.end_theta, 'end_theta', 'config file')
is_valid = False
logging.debug(f'end_theta = {self.end_theta}')
self.num_thetas = self.config.get('num_thetas')
- if not (self.num_thetas is None or msnc.is_int(self.num_thetas, 1)):
- msnc.illegal_value('num_thetas', self.num_thetas, 'config file')
+ if not (self.num_thetas is None or is_int(self.num_thetas, 1)):
+ illegal_value(self.num_thetas, 'num_thetas', 'config file')
self.num_thetas = None
is_valid = False
logging.debug(f'num_thetas = {self.num_thetas}')
@@ -165,16 +167,16 @@
# Check number of tomography image stacks
stack_info = self.config['stack_info']
self.num_tomo_stacks = stack_info.get('num', 1)
- if not msnc.is_int(self.num_tomo_stacks, 1):
+ if not is_int(self.num_tomo_stacks, 1):
self.num_tomo_stacks = None
- msnc.illegal_value('stack_info:num', self.num_tomo_stacks, 'config file')
+ illegal_value(self.num_tomo_stacks, 'num_tomo_stacks', 'config file')
return False
logging.info(f'num_tomo_stacks = {self.num_tomo_stacks}')
# Find tomography images file/folders and stack parameters
stacks = stack_info.get('stacks')
if stacks is None or len(stacks) is not self.num_tomo_stacks:
- msnc.illegal_value('stack_info:stacks', stacks, 'config file')
+ illegal_value(stacks, 'stacks', 'config file')
return False
self.tomo_data_paths = []
self.tomo_data_indices = []
@@ -192,18 +194,18 @@
self.num_thetas = None
else:
self.start_theta = theta_range.get('start', 0.)
- if not msnc.is_num(self.start_theta, 0.):
- msnc.illegal_value('theta_range:start', self.start_theta, 'config file')
+ if not is_num(self.start_theta, 0.):
+ illegal_value(self.start_theta, 'theta_range:start', 'config file')
is_valid = False
logging.debug(f'start_theta = {self.start_theta}')
self.end_theta = theta_range.get('end', 180.)
- if not msnc.is_num(self.end_theta, self.start_theta):
- msnc.illegal_value('theta_range:end', self.end_theta, 'config file')
+ if not is_num(self.end_theta, self.start_theta):
+ illegal_value(self.end_theta, 'theta_range:end', 'config file')
is_valid = False
logging.debug(f'end_theta = {self.end_theta}')
self.num_thetas = theta_range.get('num')
- if self.num_thetas and not msnc.is_int(self.num_thetas, 1):
- msnc.illegal_value('theta_range:num', self.num_thetas, 'config file')
+ if self.num_thetas and not is_int(self.num_thetas, 1):
+ illegal_value(self.num_thetas, 'theta_range:num', 'config file')
self.num_thetas = None
is_valid = False
logging.debug(f'num_thetas = {self.num_thetas}')
@@ -218,7 +220,7 @@
# Check work_folder (shared by both file formats)
work_folder = os.path.abspath(self.config.get('work_folder', ''))
if not os.path.isdir(work_folder):
- msnc.illegal_value('work_folder', work_folder, 'config file')
+ illegal_value(work_folder, 'work_folder', 'config file')
is_valid = False
logging.info(f'work_folder: {work_folder}')
@@ -226,7 +228,7 @@
self.data_filetype = self.config.get('data_filetype', 'tif')
if not isinstance(self.data_filetype, str) or (self.data_filetype != 'tif' and
self.data_filetype != 'h5'):
- msnc.illegal_value('data_filetype', self.data_filetype, 'config file')
+ illegal_value(self.data_filetype, 'data_filetype', 'config file')
if self.suffix == '.yml' or self.suffix == '.yaml':
is_valid = self._validate_yaml()
@@ -244,7 +246,7 @@
self.tdf_data_path = os.path.abspath(
f'{work_folder}/{self.tdf_data_path}')
else:
- msnc.illegal_value('tdf_data_path', tdf_data_fil, 'config file')
+ illegal_value(tdf_data_fil, 'tdf_data_path', 'config file')
is_valid = False
else:
if isinstance(self.tdf_data_path, int):
@@ -255,7 +257,7 @@
self.tdf_data_path = os.path.abspath(
f'{work_folder}/{self.tdf_data_path}')
else:
- msnc.illegal_value('tdf_data_path', self.tdf_data_path, 'config file')
+ illegal_value(self.tdf_data_path, 'tdf_data_path', 'config file')
is_valid = False
logging.info(f'dark field images path = {self.tdf_data_path}')
@@ -267,7 +269,7 @@
self.tbf_data_path = os.path.abspath(
f'{work_folder}/{self.tbf_data_path}')
else:
- msnc.illegal_value('tbf_data_path', tbf_data_fil, 'config file')
+ illegal_value(tbf_data_fil, 'tbf_data_path', 'config file')
is_valid = False
else:
if isinstance(self.tbf_data_path, int):
@@ -278,7 +280,7 @@
self.tbf_data_path = os.path.abspath(
f'{work_folder}/{self.tbf_data_path}')
else:
- msnc.illegal_value('tbf_data_path', self.tbf_data_path, 'config file')
+ illegal_value(self.tbf_data_path, 'tbf_data_path', 'config file')
is_valid = False
logging.info(f'bright field images path = {self.tbf_data_path}')
@@ -293,7 +295,7 @@
if not os.path.isabs(data_path):
data_path = os.path.abspath(f'{work_folder}/{data_path}')
else:
- msnc.illegal_value(f'stack_info:stacks:data_path', data_path, 'config file')
+ illegal_value(data_path, 'stack_info:stacks:data_path', 'config file')
is_valid = False
data_path = None
else:
@@ -303,7 +305,7 @@
if not os.path.isabs(data_path):
data_path = os.path.abspath(f'{work_folder}/{data_path}')
else:
- msnc.illegal_value(f'stack_info:stacks:data_path', data_path, 'config file')
+ illegal_value(data_path, 'stack_info:stacks:data_path', 'config file')
is_valid = False
data_path = None
tomo_data_paths.append(data_path)
@@ -315,7 +317,7 @@
else:
index = 1
elif not isinstance(index, int):
- msnc.illegal_value(f'stack_info:stacks:index', index, 'config file')
+ illegal_value(index, 'stack_info:stacks:index', 'config file')
is_valid = False
index = None
tomo_data_indices.append(index)
@@ -326,13 +328,13 @@
ref_height = None
else:
ref_height = 0.
- elif not msnc.is_num(ref_height):
- msnc.illegal_value(f'stack_info:stacks:ref_height', ref_height, 'config file')
+ elif not is_num(ref_height):
+ illegal_value(ref_height, 'stack_info:stacks:ref_height', 'config file')
is_valid = False
ref_height = None
# Set reference heights relative to first stack
- if (len(tomo_ref_heights) and msnc.is_num(ref_height) and
- msnc.is_num(tomo_ref_heights[0])):
+ if (len(tomo_ref_heights) and is_num(ref_height) and
+ is_num(tomo_ref_heights[0])):
ref_height = (round(ref_height-tomo_ref_heights[0], 3))
tomo_ref_heights.append(ref_height)
tomo_ref_heights[0] = 0.0
@@ -436,7 +438,7 @@
else:
raise OSError(f'Invalid config_out input {config_out} {type(config_out)}')
if not os.path.isdir(output_folder):
- raise OSError(f'Invalid output_folder input {output_folder} {type(output_folder)}')
+ os.mkdir(os.path.abspath(output_folder))
if isinstance(log_stream, str):
path = os.path.split(log_stream)[0]
if path and not os.path.isdir(path):
@@ -515,12 +517,12 @@
self.is_valid = self.cf.validate()
# Load detector info file
- df = msnc.Detector(self.cf.config['detector']['id'])
+ df = TomoDetectorConfig(self.cf.config['detector']['id'])
# Check detector info file parameters
- if df.validate():
- pixel_size = df.getPixelSize()
- num_rows, num_columns = df.getDimensions()
+ if df.valid:
+ pixel_size = df.pixel_size
+ num_rows, num_columns = df.dimensions
if not pixel_size or not num_rows or not num_columns:
self.is_valid = False
else:
@@ -577,9 +579,9 @@
# Check number of tomography angles/thetas
num_thetas = self.config['theta_range'].get('num')
if num_thetas is None:
- num_thetas = pyip.inputInt('\nEnter the number of thetas (>0): ', greaterThan=0)
- elif not msnc.is_int(num_thetas, 0):
- msnc.illegal_value('num_thetas', num_thetas, 'config file')
+ num_thetas = input_int('\nEnter the number of thetas', 1)
+ elif not is_int(num_thetas, 0):
+ illegal_value(num_thetas, 'num_thetas', 'config file')
self.is_valid = False
return
self.config['theta_range']['num'] = num_thetas
@@ -591,7 +593,7 @@
img_offset = dark_field.get('img_offset', -1)
num_imgs = dark_field.get('num', 0)
if not self.test_mode:
- img_start, img_offset, num_imgs = msnc.selectImageRange(img_start, img_offset,
+ img_start, img_offset, num_imgs = selectImageRange(img_start, img_offset,
num_imgs, 'dark field')
if img_start < 0 or num_imgs < 1:
logging.error('Unable to find suitable dark field images')
@@ -610,7 +612,7 @@
img_offset = bright_field.get('img_offset', -1)
num_imgs = bright_field.get('num', 0)
if not self.test_mode:
- img_start, img_offset, num_imgs = msnc.selectImageRange(img_start, img_offset,
+ img_start, img_offset, num_imgs = selectImageRange(img_start, img_offset,
num_imgs, 'bright field')
if img_start < 0 or num_imgs < 1:
logging.error('Unable to find suitable bright field images')
@@ -632,7 +634,7 @@
img_offset = stack.get('img_offset', -1)
num_imgs = stack.get('num', 0)
if not self.test_mode:
- img_start, img_offset, num_imgs = msnc.selectImageRange(img_start, img_offset,
+ img_start, img_offset, num_imgs = selectImageRange(img_start, img_offset,
num_imgs, f'tomography stack {index}', num_thetas)
if img_start < 0 or num_imgs != num_thetas:
logging.error('Unable to find suitable tomography images')
@@ -656,7 +658,7 @@
# Load the dark field images
logging.debug('Loading dark field...')
dark_field = self.config['dark_field']
- tdf_stack = msnc.loadImageStack(tdf_files, self.config['data_filetype'],
+ tdf_stack = loadImageStack(tdf_files, self.config['data_filetype'],
dark_field['img_offset'], dark_field['num'])
# Take median
@@ -671,10 +673,10 @@
logging.debug(f'tdf_mean = {tdf_mean}')
np.nan_to_num(self.tdf, copy=False, nan=tdf_mean, posinf=tdf_mean, neginf=0.)
if self.galaxy_flag:
- msnc.quickImshow(self.tdf, title='dark field', path='setup_pngs',
+ quickImshow(self.tdf, title='dark field', path='setup_pngs',
save_fig=True, save_only=True)
elif not self.test_mode:
- msnc.quickImshow(self.tdf, title='dark field', path=self.output_folder,
+ quickImshow(self.tdf, title='dark field', path=self.output_folder,
save_fig=self.save_plots, save_only=self.save_plots_only)
def _genBright(self, tbf_files):
@@ -683,7 +685,7 @@
# Load the bright field images
logging.debug('Loading bright field...')
bright_field = self.config['bright_field']
- tbf_stack = msnc.loadImageStack(tbf_files, self.config['data_filetype'],
+ tbf_stack = loadImageStack(tbf_files, self.config['data_filetype'],
bright_field['img_offset'], bright_field['num'])
# Take median
@@ -705,10 +707,10 @@
else:
logging.warning('Dark field unavailable')
if self.galaxy_flag:
- msnc.quickImshow(self.tbf, title='bright field', path='setup_pngs',
+ quickImshow(self.tbf, title='bright field', path='setup_pngs',
save_fig=True, save_only=True)
elif not self.test_mode:
- msnc.quickImshow(self.tbf, title='bright field', path=self.output_folder,
+ quickImshow(self.tbf, title='bright field', path=self.output_folder,
save_fig=self.save_plots, save_only=self.save_plots_only)
def _setDetectorBounds(self, tomo_stack_files):
@@ -721,10 +723,10 @@
img_x_bounds = preprocess.get('img_x_bounds', [0, self.tbf.shape[0]])
if img_x_bounds[0] is not None and img_x_bounds[1] is not None:
if img_x_bounds[0] < 0:
- msnc.illegal_value('img_x_bounds[0]', img_x_bounds[0], 'config file')
+ illegal_value(img_x_bounds[0], 'preprocess:img_x_bounds[0]', 'config file')
img_x_bounds[0] = 0
- if not msnc.is_index_range(img_x_bounds, 0, self.tbf.shape[0]):
- msnc.illegal_value('img_x_bounds[1]', img_x_bounds[1], 'config file')
+ if not is_index_range(img_x_bounds, 0, self.tbf.shape[0]):
+ illegal_value(img_x_bounds[1], 'preprocess:img_x_bounds[1]', 'config file')
img_x_bounds[1] = self.tbf.shape[0]
if self.test_mode:
# Update config and save to file
@@ -763,11 +765,13 @@
x_low = 0
x_upp = x_sum.size
if num_x_min is not None:
- fit = msnc.fitStep(y=x_sum, model='rectangle', form='atan')
- x_low = fit.get('center1', None)
- x_upp = fit.get('center2', None)
- sig_low = fit.get('sigma1', None)
- sig_upp = fit.get('sigma2', None)
+ fit = Fit.fit_data(np.array(range(len(x_sum))), x_sum, models='rectangle',
+ form='atan', guess=True)
+ parameters = fit.best_values
+ x_low = parameters.get('center1', None)
+ x_upp = parameters.get('center2', None)
+ sig_low = parameters.get('sigma1', None)
+ sig_upp = parameters.get('sigma2', None)
if (x_low is not None and x_upp is not None and sig_low is not None and
sig_upp is not None and 0 <= x_low < x_upp <= x_sum.size and
(sig_low+sig_upp)/(x_upp-x_low) < 0.1):
@@ -784,14 +788,14 @@
else:
x_low = 0
x_upp = x_sum.size
- msnc.quickPlot((range(x_sum.size), x_sum),
+ quickPlot((range(x_sum.size), x_sum),
([x_low, x_low], [x_sum_min, x_sum_max], 'r-'),
([x_upp-1, x_upp-1], [x_sum_min, x_sum_max], 'r-'),
title=f'sum bright field over theta/y (row bounds: [{x_low}, {x_upp}])',
path='setup_pngs', name='detectorbounds.png', save_fig=True, save_only=True,
show_grid=True)
for i,stack in enumerate(stacks):
- tomo_stack = msnc.loadImageStack(tomo_stack_files[i], self.config['data_filetype'],
+ tomo_stack = loadImageStack(tomo_stack_files[i], self.config['data_filetype'],
stack['img_offset'], 1)
tomo_stack = tomo_stack[0,:,:]
if num_x_min is not None:
@@ -799,7 +803,7 @@
tomo_stack[x_low,:] = tomo_stack_max
tomo_stack[x_upp-1,:] = tomo_stack_max
title = f'tomography image at theta={self.config["theta_range"]["start"]}'
- msnc.quickImshow(tomo_stack, title=title, path='setup_pngs',
+ quickImshow(tomo_stack, title=title, path='setup_pngs',
name=f'tomo_{stack["index"]}.png', save_fig=True, save_only=True,
show_grid=True)
del tomo_stack
@@ -817,47 +821,47 @@
# For one tomography stack only: load the first image
title = None
- msnc.quickImshow(self.tbf, title='bright field')
+ quickImshow(self.tbf, title='bright field')
if num_tomo_stacks == 1:
- tomo_stack = msnc.loadImageStack(tomo_stack_files[0], self.config['data_filetype'],
+ tomo_stack = loadImageStack(tomo_stack_files[0], self.config['data_filetype'],
stacks[0]['img_offset'], 1)
title = f'tomography image at theta={self.config["theta_range"]["start"]}'
- msnc.quickImshow(tomo_stack[0,:,:], title=title)
- tomo_or_bright = pyip.inputNum('\nSelect image bounds from bright field (0) or '+
- 'from first tomography image (1): ', min=0, max=1)
+ quickImshow(tomo_stack[0,:,:], title=title)
+ tomo_or_bright = input_menu(['bright field', 'first tomography image'],
+ header='\nSelect image bounds from')
else:
print('\nSelect image bounds from bright field')
tomo_or_bright = 0
if tomo_or_bright:
x_sum = np.sum(tomo_stack[0,:,:], 1)
- use_bounds = 'no'
+ use_bounds = False
if img_x_bounds[0] is not None and img_x_bounds[1] is not None:
tmp = np.copy(tomo_stack[0,:,:])
tmp_max = tmp.max()
tmp[img_x_bounds[0],:] = tmp_max
tmp[img_x_bounds[1]-1,:] = tmp_max
title = f'tomography image at theta={self.config["theta_range"]["start"]}'
- msnc.quickImshow(tmp, title=title)
+ quickImshow(tmp, title=title)
del tmp
x_sum_min = x_sum.min()
x_sum_max = x_sum.max()
- msnc.quickPlot((range(x_sum.size), x_sum),
+ quickPlot((range(x_sum.size), x_sum),
([img_x_bounds[0], img_x_bounds[0]], [x_sum_min, x_sum_max], 'r-'),
([img_x_bounds[1]-1, img_x_bounds[1]-1], [x_sum_min, x_sum_max], 'r-'),
title='sum over theta and y')
print(f'lower bound = {img_x_bounds[0]} (inclusive)\n'+
f'upper bound = {img_x_bounds[1]} (exclusive)]')
- use_bounds = pyip.inputYesNo('Accept these bounds ([y]/n)?: ', blank=True)
- if use_bounds == 'no':
- img_x_bounds = msnc.selectImageBounds(tomo_stack[0,:,:], 0,
+ use_bounds = input_yesno('Accept these bounds (y/n)?', 'y')
+ if not use_bounds:
+ img_x_bounds = list(selectImageBounds(tomo_stack[0,:,:], 0,
img_x_bounds[0], img_x_bounds[1], num_x_min,
- f'tomography image at theta={self.config["theta_range"]["start"]}')
+ f'tomography image at theta={self.config["theta_range"]["start"]}'))
if num_x_min is not None and img_x_bounds[1]-img_x_bounds[0]+1 < num_x_min:
logging.warning('Image bounds and pixel size prevent seamless stacking')
title = f'tomography image at theta={self.config["theta_range"]["start"]}'
- msnc.quickImshow(tomo_stack[0,:,:], title=title, path=self.output_folder,
+ quickImshow(tomo_stack[0,:,:], title=title, path=self.output_folder,
save_fig=self.save_plots, save_only=True)
- msnc.quickPlot(range(img_x_bounds[0], img_x_bounds[1]),
+ quickPlot(range(img_x_bounds[0], img_x_bounds[1]),
x_sum[img_x_bounds[0]:img_x_bounds[1]],
title='sum over theta and y', path=self.output_folder,
save_fig=self.save_plots, save_only=True)
@@ -865,29 +869,31 @@
x_sum = np.sum(self.tbf, 1)
x_sum_min = x_sum.min()
x_sum_max = x_sum.max()
- use_bounds = 'no'
+ use_bounds = False
if img_x_bounds[0] is not None and img_x_bounds[1] is not None:
tmp = np.copy(self.tbf)
tmp_max = tmp.max()
tmp[img_x_bounds[0],:] = tmp_max
tmp[img_x_bounds[1]-1,:] = tmp_max
title = 'bright field'
- msnc.quickImshow(tmp, title=title)
+ quickImshow(tmp, title=title)
del tmp
- msnc.quickPlot((range(x_sum.size), x_sum),
+ quickPlot((range(x_sum.size), x_sum),
([img_x_bounds[0], img_x_bounds[0]], [x_sum_min, x_sum_max], 'r-'),
([img_x_bounds[1]-1, img_x_bounds[1]-1], [x_sum_min, x_sum_max], 'r-'),
title='sum over theta and y')
print(f'lower bound = {img_x_bounds[0]} (inclusive)\n'+
f'upper bound = {img_x_bounds[1]} (exclusive)]')
- use_bounds = pyip.inputYesNo('Accept these bounds ([y]/n)?: ', blank=True)
- if use_bounds == 'no':
- use_fit = 'no'
- fit = msnc.fitStep(y=x_sum, model='rectangle', form='atan')
- x_low = fit.get('center1', None)
- x_upp = fit.get('center2', None)
- sig_low = fit.get('sigma1', None)
- sig_upp = fit.get('sigma2', None)
+ use_bounds = input_yesno('Accept these bounds (y/n)?', 'y')
+ if not use_bounds:
+ use_fit = False
+ fit = Fit.fit_data(np.array(range(len(x_sum))), x_sum, models='rectangle',
+ form='atan', guess=True)
+ parameters = fit.best_values
+ x_low = parameters.get('center1', None)
+ x_upp = parameters.get('center2', None)
+ sig_low = parameters.get('sigma1', None)
+ sig_upp = parameters.get('sigma2', None)
if (x_low is not None and x_upp is not None and sig_low is not None and
sig_upp is not None and 0 <= x_low < x_upp <= x_sum.size and
(sig_low+sig_upp)/(x_upp-x_low) < 0.1):
@@ -906,34 +912,50 @@
tmp[x_low,:] = tmp_max
tmp[x_upp-1,:] = tmp_max
title = 'bright field'
- msnc.quickImshow(tmp, title=title)
+ quickImshow(tmp, title=title)
del tmp
- msnc.quickPlot((range(x_sum.size), x_sum),
+ quickPlot((range(x_sum.size), x_sum),
([x_low, x_low], [x_sum_min, x_sum_max], 'r-'),
([x_upp, x_upp], [x_sum_min, x_sum_max], 'r-'),
title='sum over theta and y')
print(f'lower bound = {x_low} (inclusive)')
print(f'upper bound = {x_upp} (exclusive)]')
- use_fit = pyip.inputYesNo('Accept these bounds ([y]/n)?: ', blank=True)
- if use_fit == 'no':
- img_x_bounds = msnc.selectArrayBounds(x_sum, img_x_bounds[0], img_x_bounds[1],
- num_x_min, 'sum over theta and y')
+ use_fit = input_yesno('Accept these bounds (y/n)?', 'y')
+ if use_fit:
+ img_x_bounds = [x_low, x_upp]
else:
- img_x_bounds = [x_low, x_upp]
+ accept = False
+ while not accept:
+ mask, img_x_bounds = draw_mask_1d(x_sum, title='select x data range',
+ legend='sum over theta and y')
+ print(f'img_x_bounds = {img_x_bounds}')
+ while (len(img_x_bounds) != 1 or (len(x_sum) >= num_x_min and
+ img_x_bounds[0][1]-img_x_bounds[0][0]+1 < num_x_min)):
+ exit('Should not be here')
+ print('Please select exactly one continuous range')
+ mask, img_x_bounds = draw_mask_1d(x_sum, title='select x data range',
+ legend='sum over theta and y')
+ img_x_bounds = list(img_x_bounds[0])
+ quickPlot(x_sum, vlines=img_x_bounds, title='sum over theta and y')
+ print(f'img_x_bounds = {img_x_bounds} (lower bound inclusive, upper bound '+
+ 'exclusive)')
+ accept = input_yesno('Accept these bounds (y/n)?', 'y')
+ if not accept:
+ img_x_bounds = None
if num_x_min is not None and img_x_bounds[1]-img_x_bounds[0]+1 < num_x_min:
logging.warning('Image bounds and pixel size prevent seamless stacking')
- #msnc.quickPlot(range(img_x_bounds[0], img_x_bounds[1]),
+ #quickPlot(range(img_x_bounds[0], img_x_bounds[1]),
# x_sum[img_x_bounds[0]:img_x_bounds[1]],
# title='sum over theta and y', path=self.output_folder,
# save_fig=self.save_plots, save_only=True)
- msnc.quickPlot((range(x_sum.size), x_sum),
+ quickPlot((range(x_sum.size), x_sum),
([img_x_bounds[0], img_x_bounds[0]], [x_sum_min, x_sum_max], 'r-'),
([img_x_bounds[1], img_x_bounds[1]], [x_sum_min, x_sum_max], 'r-'),
title='sum over theta and y', path=self.output_folder,
save_fig=self.save_plots, save_only=True)
del x_sum
for i,stack in enumerate(stacks):
- tomo_stack = msnc.loadImageStack(tomo_stack_files[i], self.config['data_filetype'],
+ tomo_stack = loadImageStack(tomo_stack_files[i], self.config['data_filetype'],
stack['img_offset'], 1)
tomo_stack = tomo_stack[0,:,:]
if num_x_min is not None:
@@ -942,21 +964,21 @@
tomo_stack[img_x_bounds[1]-1,:] = tomo_stack_max
title = f'tomography image at theta={self.config["theta_range"]["start"]}'
if self.galaxy_flag:
- msnc.quickImshow(tomo_stack, title=title, path='setup_pngs',
+ quickImshow(tomo_stack, title=title, path='setup_pngs',
name=f'tomo_{stack["index"]}.png', save_fig=True, save_only=True,
show_grid=True)
else:
- msnc.quickImshow(tomo_stack, title=title, path=self.output_folder,
+ quickImshow(tomo_stack, title=title, path=self.output_folder,
name=f'tomo_{stack["index"]}.png', save_fig=self.save_plots,
save_only=True, show_grid=True)
del tomo_stack
logging.debug(f'img_x_bounds: {img_x_bounds}')
if self.save_plots_only:
- msnc.clearFig('bright field')
- msnc.clearFig('sum over theta and y')
+ clearImshow('bright field')
+ clearPlot('sum over theta and y')
if title:
- msnc.clearFig(title)
+ clearPlot(title)
# Update config and save to file
if preprocess is None:
@@ -972,30 +994,26 @@
zoom_perc = 100
if not self.galaxy_flag:
if preprocess is None or 'zoom_perc' not in preprocess:
- if pyip.inputYesNo(
- '\nDo you want to zoom in to reduce memory requirement (y/[n])? ',
- blank=True) == 'yes':
- zoom_perc = pyip.inputInt(' Enter zoom percentage [1, 100]: ',
- min=1, max=100)
+ if input_yesno('\nDo you want to zoom in to reduce memory requirement (y/n)?', 'n'):
+ zoom_perc = input_int(' Enter zoom percentage', 1, 100)
else:
zoom_perc = preprocess['zoom_perc']
- if msnc.is_num(zoom_perc, 1., 100.):
+ if is_num(zoom_perc, 1., 100.):
zoom_perc = int(zoom_perc)
else:
- msnc.illegal_value('zoom_perc', zoom_perc, 'config file')
+ illegal_value(zoom_perc, 'preprocess:zoom_perc', 'config file')
zoom_perc = 100
num_theta_skip = 0
if not self.galaxy_flag:
if preprocess is None or 'num_theta_skip' not in preprocess:
- if pyip.inputYesNo(
- 'Do you want to skip thetas to reduce memory requirement (y/[n])? ',
- blank=True) == 'yes':
- num_theta_skip = pyip.inputInt(' Enter the number skip theta interval'+
- f' [0, {self.num_thetas-1}]: ', min=0, max=self.num_thetas-1)
+ if input_yesno('Do you want to skip thetas to reduce memory requirement (y/n)?',
+ 'n'):
+ num_theta_skip = input_int(' Enter the number skip theta interval', 0,
+ self.num_thetas-1)
else:
num_theta_skip = preprocess['num_theta_skip']
- if not msnc.is_int(num_theta_skip, 0):
- msnc.illegal_value('num_theta_skip', num_theta_skip, 'config file')
+ if not is_int(num_theta_skip, 0):
+ illegal_value(num_theta_skip, 'preprocess:num_theta_skip', 'config file')
num_theta_skip = 0
logging.debug(f'zoom_perc = {zoom_perc}')
logging.debug(f'num_theta_skip = {num_theta_skip}')
@@ -1023,16 +1041,16 @@
title = f'{base_name} {zoom_perc}p'
title += f'_{index}'
tomo_file = re.sub(r"\s+", '_', f'{self.output_folder}/{title}.npy')
- load_flag = 'no'
+ load_flag = False
available = False
if os.path.isfile(tomo_file):
available = True
if required:
- load_flag = 'yes'
+ load_flag = True
else:
- load_flag = pyip.inputYesNo(f'\nDo you want to load {tomo_file} (y/n)? ')
+ load_flag = input_yesno(f'\nDo you want to load {tomo_file} (y/n)?')
stack = np.array([])
- if load_flag == 'yes':
+ if load_flag:
t0 = time()
logging.info(f'Loading {tomo_file} ...')
try:
@@ -1042,7 +1060,7 @@
logging.error(f'Error loading {tomo_file}')
logging.info(f'... done in {time()-t0:.2f} seconds!')
if stack.size:
- msnc.quickImshow(stack[:,0,:], title=title, path=self.output_folder,
+ quickImshow(stack[:,0,:], title=title, path=self.output_folder,
save_fig=self.save_plots, save_only=self.save_plots_only)
return stack, available
@@ -1106,7 +1124,7 @@
# Load a stack of tomography images
index = stack['index']
t0 = time()
- tomo_stack = msnc.loadImageStack(tomo_stack_files[i], self.config['data_filetype'],
+ tomo_stack = loadImageStack(tomo_stack_files[i], self.config['data_filetype'],
stack['img_offset'], self.config['theta_range']['num'], num_theta_skip,
img_x_bounds, img_y_bounds)
tomo_stack = tomo_stack.astype('float64')
@@ -1145,7 +1163,7 @@
if not self.galaxy_flag:
title = f'red stack fullres {index}'
if not self.test_mode:
- msnc.quickImshow(tomo_stack[0,:,:], title=title, path=self.output_folder,
+ quickImshow(tomo_stack[0,:,:], title=title, path=self.output_folder,
save_fig=self.save_plots, save_only=self.save_plots_only)
if zoom_perc != 100:
t0 = time()
@@ -1160,7 +1178,7 @@
if not self.galaxy_flag:
title = f'red stack {zoom_perc}p {index}'
if not self.test_mode:
- msnc.quickImshow(tomo_stack[0,:,:], title=title, path=self.output_folder,
+ quickImshow(tomo_stack[0,:,:], title=title, path=self.output_folder,
save_fig=self.save_plots, save_only=self.save_plots_only)
# Convert tomography stack from theta,row,column to row,theta,column
@@ -1214,7 +1232,7 @@
logging.debug(f'sinogram range = [{dist_from_edge}, {two_offset-dist_from_edge}]')
sinogram = tomo_plane_T[dist_from_edge:two_offset-dist_from_edge,:]
if plot_sinogram and not self.test_mode:
- msnc.quickImshow(sinogram.T, f'sinogram center offset{center_offset:.2f}',
+ quickImshow(sinogram.T, f'sinogram center offset{center_offset:.2f}',
path=self.output_folder, save_fig=self.save_plots,
save_only=self.save_plots_only, aspect='auto')
@@ -1244,14 +1262,14 @@
vmax = np.max(edges[0,:,:])
vmin = -vmax
if self.galaxy_flag:
- msnc.quickImshow(edges[0,:,:], title, path=path, save_fig=True, save_only=True,
+ quickImshow(edges[0,:,:], title, path=path, save_fig=True, save_only=True,
cmap='gray', vmin=vmin, vmax=vmax)
else:
if path is None:
path=self.output_folder
- msnc.quickImshow(edges[0,:,:], f'{title} coolwarm', path=path,
+ quickImshow(edges[0,:,:], f'{title} coolwarm', path=path,
save_fig=self.save_plots, save_only=self.save_plots_only, cmap='coolwarm')
- msnc.quickImshow(edges[0,:,:], f'{title} gray', path=path,
+ quickImshow(edges[0,:,:], f'{title} gray', path=path,
save_fig=self.save_plots, save_only=self.save_plots_only, cmap='gray',
vmin=vmin, vmax=vmax)
del edges
@@ -1303,8 +1321,7 @@
title = f'edges row{row} center offset{center_offset_vo:.2f} Vo'
self._plotEdgesOnePlane(recon_plane, title)
if not self.galaxy_flag:
- if (pyip.inputYesNo('Try finding center using phase correlation '+
- '(y/[n])? ', blank=True) == 'yes'):
+ if input_yesno('Try finding center using phase correlation (y/n)?', 'n'):
tomo_center = tomopy.find_center_pc(sinogram, sinogram, tol=0.1,
rotc_guess=tomo_center)
error = 1.
@@ -1319,13 +1336,9 @@
eff_pixel_size, cross_sectional_dim, False, num_core)
title = f'edges row{row} center_offset{center_offset:.2f} PC'
self._plotEdgesOnePlane(recon_plane, title)
- if (pyip.inputYesNo('Accept a center location ([y]) or continue '+
- 'search (n)? ', blank=True) != 'no'):
- center_offset = pyip.inputNum(
- f' Enter chosen center offset [{-int(center)}, {int(center)}] '+
- f'([{center_offset_vo:.2f}])): ', blank=True)
- if center_offset == '':
- center_offset = center_offset_vo
+ if input_yesno('Accept a center location (y) or continue search (n)?', 'y'):
+ center_offset = input_num(' Enter chosen center offset', -center, center,
+ center_offset_vo)
del sinogram_T
del recon_plane
return float(center_offset)
@@ -1338,7 +1351,7 @@
set_center = galaxy_param['set_center']
set_range = galaxy_param['set_range']
center_offset_step = galaxy_param['set_step']
- if (not msnc.is_num(set_range, 0) or not msnc.is_num(center_offset_step) or
+ if (not is_num(set_range, 0) or not is_num(center_offset_step) or
center_offset_step <= 0):
logging.warning('Illegal center finding search parameter, skip search')
del sinogram_T
@@ -1347,17 +1360,15 @@
center_offset_low = set_center-set_range
center_offset_upp = set_center+set_range
else:
- center_offset_low = pyip.inputInt('\nEnter lower bound for center offset '+
- f'[{-int(center)}, {int(center)}]: ', min=-int(center), max=int(center))
- center_offset_upp = pyip.inputInt('Enter upper bound for center offset '+
- f'[{center_offset_low}, {int(center)}]: ',
- min=center_offset_low, max=int(center))
+ center_offset_low = input_int('\nEnter lower bound for center offset',
+ -center, center)
+ center_offset_upp = input_int('Enter upper bound for center offset',
+ center_offset_low, center)
if center_offset_upp == center_offset_low:
center_offset_step = 1
else:
- center_offset_step = pyip.inputInt('Enter step size for center offset search '+
- f'[1, {center_offset_upp-center_offset_low}]: ',
- min=1, max=center_offset_upp-center_offset_low)
+ center_offset_step = input_int('Enter step size for center offset search',
+ 1, center_offset_upp-center_offset_low)
num_center_offset = 1+int((center_offset_upp-center_offset_low)/center_offset_step)
center_offsets = np.linspace(center_offset_low, center_offset_upp, num_center_offset)
for center_offset in center_offsets:
@@ -1373,8 +1384,7 @@
self._plotEdgesOnePlane(recon_plane, title, path='find_center_pngs')
else:
self._plotEdgesOnePlane(recon_plane, title)
- if self.galaxy_flag or pyip.inputInt('\nContinue (0) or end the search (1): ',
- min=0, max=1):
+ if self.galaxy_flag or input_int('\nContinue (0) or end the search (1)', 0, 1):
break
del sinogram_T
@@ -1382,8 +1392,7 @@
if self.galaxy_flag:
center_offset = center_offset_vo
else:
- center_offset = pyip.inputNum(f' Enter chosen center offset '+
- f'[{-int(center)}, {int(center)}]: ', min=-int(center), max=int(center))
+ center_offset = input_num(' Enter chosen center offset', -center, center)
return float(center_offset)
def _reconstructOneTomoStack(self, tomo_stack, thetas, row_bounds=None,
@@ -1402,7 +1411,7 @@
if row_bounds is None:
row_bounds = [0, tomo_stack.shape[0]]
else:
- if not msnc.is_index_range(row_bounds, 0, tomo_stack.shape[0]):
+ if not is_index_range(row_bounds, 0, tomo_stack.shape[0]):
raise ValueError('Illegal row bounds in reconstructOneTomoStack')
if thetas.size != tomo_stack.shape[1]:
raise ValueError('theta dimension mismatch in reconstructOneTomoStack')
@@ -1467,7 +1476,7 @@
# Find dark field images
dark_field = self.config['dark_field']
- img_start, num_imgs, dark_files = msnc.findImageFiles(
+ img_start, num_imgs, dark_files = findImageFiles(
dark_field['data_path'], self.config['data_filetype'], 'dark field')
if img_start < 0 or num_imgs < 1:
logging.error('Unable to find suitable dark field images')
@@ -1492,13 +1501,13 @@
logging.info(f'Number of dark field images = {dark_field["num"]}')
logging.info(f'Dark field image start index = {dark_field["img_start"]}')
if num_imgs and tiff_to_h5_flag and self.config['data_filetype'] == 'tif':
- dark_files = msnc.combine_tiffs_in_h5(dark_files, num_imgs,
+ dark_files = combine_tiffs_in_h5(dark_files, num_imgs,
f'{self.config["work_folder"]}/dark_field.h5')
dark_field['data_path'] = dark_files[0]
# Find bright field images
bright_field = self.config['bright_field']
- img_start, num_imgs, bright_files = msnc.findImageFiles(
+ img_start, num_imgs, bright_files = findImageFiles(
bright_field['data_path'], self.config['data_filetype'], 'bright field')
if img_start < 0 or num_imgs < 1:
logging.error('Unable to find suitable bright field images')
@@ -1520,7 +1529,7 @@
logging.info(f'Number of bright field images = {bright_field["num"]}')
logging.info(f'Bright field image start index = {bright_field["img_start"]}')
if num_imgs and tiff_to_h5_flag and self.config['data_filetype'] == 'tif':
- bright_files = msnc.combine_tiffs_in_h5(bright_files, num_imgs,
+ bright_files = combine_tiffs_in_h5(bright_files, num_imgs,
f'{self.config["work_folder"]}/bright_field.h5')
bright_field['data_path'] = bright_files[0]
@@ -1528,7 +1537,7 @@
tomo_stack_files = []
for stack in self.config['stack_info']['stacks']:
index = stack['index']
- img_start, num_imgs, tomo_files = msnc.findImageFiles(
+ img_start, num_imgs, tomo_files = findImageFiles(
stack['data_path'], self.config['data_filetype'], f'tomography set {index}')
if img_start < 0 or num_imgs < 1:
logging.error('Unable to find suitable tomography images')
@@ -1550,7 +1559,7 @@
logging.info(f'Number of tomography images for set {index} = {stack["num"]}')
logging.info(f'Tomography set {index} image start index = {stack["img_start"]}')
if num_imgs and tiff_to_h5_flag and self.config['data_filetype'] == 'tif':
- tomo_files = msnc.combine_tiffs_in_h5(tomo_files, num_imgs,
+ tomo_files = combine_tiffs_in_h5(tomo_files, num_imgs,
f'{self.config["work_folder"]}/tomo_field_{index}.h5')
stack['data_path'] = tomo_files[0]
tomo_stack_files.append(tomo_files)
@@ -1709,15 +1718,15 @@
center_stack_index = find_center['center_stack_index']
if (not isinstance(center_stack_index, int) or
center_stack_index not in available_stacks):
- msnc.illegal_value('center_stack_index', center_stack_index, 'config file')
+ illegal_value(center_stack_index, 'find_center:center_stack_index', 'config file')
else:
if self.test_mode:
assert(find_center.get('completed', False) == False)
else:
print('\nFound calibration center offset info for stack '+
f'{center_stack_index}')
- if (pyip.inputYesNo('Do you want to use this again ([y]/n)? ',
- blank=True) != 'no' and find_center.get('completed', False) == True):
+ if (input_yesno('Do you want to use this again (y/n)?', 'y')
+ and find_center.get('completed', False) == True):
return
# Load the required preprocessed stack
@@ -1749,11 +1758,11 @@
else:
while True:
if not center_stack_index:
- center_stack_index = pyip.inputInt('\nEnter tomography stack index to get '
- f'rotation axis centers {available_stacks}: ')
+ center_stack_index = input_int('\nEnter tomography stack index to get '
+ f'rotation axis centers ({available_stacks})', inset=available_stacks)
while center_stack_index not in available_stacks:
- center_stack_index = pyip.inputInt('\nEnter tomography stack index to get '
- f'rotation axis centers {available_stacks}: ')
+ center_stack_index = input_int('\nEnter tomography stack index to get '
+ f'rotation axis centers ({available_stacks})', inset=available_stacks)
tomo_stack_index = available_stacks.index(center_stack_index)
if not self.tomo_stacks[tomo_stack_index].size:
self.tomo_stacks[tomo_stack_index], available = self._loadTomo(
@@ -1785,8 +1794,8 @@
center_rows[0] = 0
if center_rows[1] == -1:
center_rows[1] = center_stack.shape[0]-1
- if not msnc.is_index_range(row_bounds, 0, center_stack.shape[0]):
- msnc.illegal_value('row_bounds', row_bounds)
+ if not is_index_range(row_bounds, 0, center_stack.shape[0]):
+ illegal_value(row_bounds, 'row_bounds', 'Tomo:findCenters')
return
# Set thetas (in degrees)
@@ -1806,27 +1815,27 @@
eff_pixel_size = 100.*pixel_size/zoom_perc
logging.debug(f'eff_pixel_size = {eff_pixel_size}')
if num_tomo_stacks == 1:
- accept = 'yes'
+ accept = True
if not self.test_mode and not self.galaxy_flag:
- accept = 'no'
+ accept = False
print('\nSelect bounds for image reconstruction')
- if msnc.is_index_range(row_bounds, 0, center_stack.shape[0]):
+ if is_index_range(row_bounds, 0, center_stack.shape[0]):
a_tmp = np.copy(center_stack[:,0,:])
a_tmp_max = a_tmp.max()
a_tmp[row_bounds[0],:] = a_tmp_max
a_tmp[row_bounds[1]-1,:] = a_tmp_max
print(f'lower bound = {row_bounds[0]} (inclusive)')
print(f'upper bound = {row_bounds[1]} (exclusive)')
- msnc.quickImshow(a_tmp, title=f'center stack theta={theta_start}',
+ quickImshow(a_tmp, title=f'center stack theta={theta_start}',
aspect='auto')
del a_tmp
- accept = pyip.inputYesNo('Accept these bounds ([y]/n)?: ', blank=True)
- if accept == 'no':
- (n1, n2) = msnc.selectImageBounds(center_stack[:,0,:], 0,
- title=f'center stack theta={theta_start}')
- else:
+ accept = input_yesno('Accept these bounds (y/n)?', 'y')
+ if accept:
n1 = row_bounds[0]
n2 = row_bounds[1]
+ else:
+ n1, n2 = selectImageBounds(center_stack[:,0,:], 0,
+ title=f'center stack theta={theta_start}')
else:
tomo_ref_heights = [stack['ref_height'] for stack in stacks]
n1 = int((1.+(tomo_ref_heights[0]+center_stack.shape[0]*eff_pixel_size-
@@ -1838,16 +1847,16 @@
tmp_max = tmp.max()
tmp[n1,:] = tmp_max
tmp[n2-1,:] = tmp_max
- if msnc.is_index_range(center_rows, 0, tmp.shape[0]):
+ if is_index_range(center_rows, 0, tmp.shape[0]):
tmp[center_rows[0],:] = tmp_max
tmp[center_rows[1]-1,:] = tmp_max
extent = [0, tmp.shape[1], tmp.shape[0], 0]
- msnc.quickImshow(tmp, title=f'center stack theta={theta_start}',
+ quickImshow(tmp, title=f'center stack theta={theta_start}',
path=self.output_folder, extent=extent, save_fig=self.save_plots,
save_only=self.save_plots_only, aspect='auto')
del tmp
#extent = [0, center_stack.shape[2], n2, n1]
- #msnc.quickImshow(center_stack[n1:n2,0,:], title=f'center stack theta={theta_start}',
+ #quickImshow(center_stack[n1:n2,0,:], title=f'center stack theta={theta_start}',
# path=self.output_folder, extent=extent, save_fig=self.save_plots,
# save_only=self.save_plots_only, show_grid=True, aspect='auto')
@@ -1859,36 +1868,35 @@
logging.info('Determine center offset at sample row boundaries')
# Lower row center
- use_row = 'no'
- use_center = 'no'
+ use_row = False
+ use_center = False
row = center_rows[0]
if self.test_mode or self.galaxy_flag:
- assert(msnc.is_int(row, n1, n2-2))
- if msnc.is_int(row, n1, n2-2):
+ assert(is_int(row, n1, n2-2))
+ if is_int(row, n1, n2-2):
if self.test_mode or self.galaxy_flag:
- use_row = 'yes'
+ use_row = True
else:
- msnc.quickImshow(center_stack[:,0,:], title=f'theta={theta_start}', aspect='auto')
- use_row = pyip.inputYesNo('\nCurrent row index for lower center = '
- f'{row}, use this value ([y]/n)? ', blank=True)
+ quickImshow(center_stack[:,0,:], title=f'theta={theta_start}', aspect='auto')
+ use_row = input_yesno('\nCurrent row index for lower center = '
+ f'{row}, use this value (y/n)?', 'y')
if self.save_plots_only:
- msnc.clearFig(f'theta={theta_start}')
- if use_row != 'no':
+ clearImshow(f'theta={theta_start}')
+ if use_row:
center_offset = find_center.get('lower_center_offset')
- if msnc.is_num(center_offset):
- use_center = pyip.inputYesNo('Current lower center offset = '+
- f'{center_offset}, use this value ([y]/n)? ', blank=True)
- if use_center == 'no':
- if use_row == 'no':
+ if is_num(center_offset):
+ use_center = input_yesno('Current lower center offset = '+
+ f'{center_offset}, use this value (y/n)?', 'y')
+ if not use_center:
+ if not use_row:
if not self.test_mode:
- msnc.quickImshow(center_stack[:,0,:], title=f'theta={theta_start}',
+ quickImshow(center_stack[:,0,:], title=f'theta={theta_start}',
aspect='auto')
- row = pyip.inputInt('\nEnter row index to find lower center '+
- f'[[{n1}], {n2-2}]: ', min=n1, max=n2-2, blank=True)
+ row = input_int('\nEnter row index to find lower center', n1, n2-2, n1)
if row == '':
row = n1
if self.save_plots_only:
- msnc.clearFig(f'theta={theta_start}')
+ clearImshow(f'theta={theta_start}')
# center_stack order: row,theta,column
center_offset = self._findCenterOnePlane(center_stack[row,:,:], row, thetas_deg,
eff_pixel_size, cross_sectional_dim, num_core=num_core,
@@ -1903,36 +1911,35 @@
lower_row = row
# Upper row center
- use_row = 'no'
- use_center = 'no'
+ use_row = False
+ use_center = False
row = center_rows[1]
if self.test_mode or self.galaxy_flag:
- assert(msnc.is_int(row, lower_row+1, n2-1))
- if msnc.is_int(row, lower_row+1, n2-1):
+ assert(is_int(row, lower_row+1, n2-1))
+ if is_int(row, lower_row+1, n2-1):
if self.test_mode or self.galaxy_flag:
- use_row = 'yes'
+ use_row = True
else:
- msnc.quickImshow(center_stack[:,0,:], title=f'theta={theta_start}', aspect='auto')
- use_row = pyip.inputYesNo('\nCurrent row index for upper center = '
- f'{row}, use this value ([y]/n)? ', blank=True)
+ quickImshow(center_stack[:,0,:], title=f'theta={theta_start}', aspect='auto')
+ use_row = input_yesno('\nCurrent row index for upper center = '
+ f'{row}, use this value (y/n)?', 'y')
if self.save_plots_only:
- msnc.clearFig(f'theta={theta_start}')
- if use_row != 'no':
+ clearImshow(f'theta={theta_start}')
+ if use_row:
center_offset = find_center.get('upper_center_offset')
- if msnc.is_num(center_offset):
- use_center = pyip.inputYesNo('Current upper center offset = '+
- f'{center_offset}, use this value ([y]/n)? ', blank=True)
- if use_center == 'no':
- if use_row == 'no':
+ if is_num(center_offset):
+ use_center = input_yesrnNo('Current upper center offset = '+
+ f'{center_offset}, use this value (y/n)?', 'y')
+ if not use_center:
+ if not use_row:
if not self.test_mode:
- msnc.quickImshow(center_stack[:,0,:], title=f'theta={theta_start}',
+ quickImshow(center_stack[:,0,:], title=f'theta={theta_start}',
aspect='auto')
- row = pyip.inputInt('\nEnter row index to find upper center '+
- f'[{lower_row+1}, [{n2-1}]]: ', min=lower_row+1, max=n2-1, blank=True)
+ row = input_int('\nEnter row index to find upper center', lower_row+1, n2-1, n2-1)
if row == '':
row = n2-1
if self.save_plots_only:
- msnc.clearFig(f'theta={theta_start}')
+ clearImshow(f'theta={theta_start}')
# center_stack order: row,theta,column
center_offset = self._findCenterOnePlane(center_stack[row,:,:], row, thetas_deg,
eff_pixel_size, cross_sectional_dim, num_core=num_core,
@@ -1992,7 +1999,7 @@
for i in range(-2, 3):
shift_i = shift+2*i
plane1_shifted = spi.shift(plane2, [0, shift_i])
- msnc.quickPlot((plane1[0,:],), (plane1_shifted[0,:],),
+ quickPlot((plane1[0,:],), (plane1_shifted[0,:],),
title=f'stacks {stack1} {stack2} shifted {2*i} theta={self.start_theta}',
path=self.output_folder, save_fig=self.save_plots,
save_only=self.save_plots_only)
@@ -2011,14 +2018,14 @@
for i in range(-2, 3):
shift_i = -shift+2*i
plane1_shifted = spi.shift(plane2, [0, shift_i])
- msnc.quickPlot((plane1[0,:],), (plane1_shifted[0,:],),
+ quickPlot((plane1[0,:],), (plane1_shifted[0,:],),
title=f'stacks {stack1} {stack2} shifted {2*i} theta={start_theta}',
path=self.output_folder, save_fig=self.save_plots,
save_only=self.save_plots_only)
del plane1, plane2, plane1_shifted
# Update config file
- self.config = msnc.update('config.txt', 'check_centers', True, 'find_centers')
+ self.config = update('config.txt', 'check_centers', True, 'find_centers')
def reconstructTomoStacks(self, galaxy_param=None, num_core=None):
"""Reconstruct tomography stacks.
@@ -2041,9 +2048,9 @@
center_offsets = galaxy_param['center_offsets']
assert(isinstance(center_offsets, list) and len(center_offsets) == 2)
lower_center_offset = center_offsets[0]
- assert(msnc.is_num(lower_center_offset))
+ assert(is_num(lower_center_offset))
upper_center_offset = center_offsets[1]
- assert(msnc.is_num(upper_center_offset))
+ assert(is_num(upper_center_offset))
else:
if galaxy_param:
logging.warning('Ignoring galaxy_param in reconstructTomoStacks (only for Galaxy)')
@@ -2128,23 +2135,23 @@
if self.galaxy_flag:
x_slice = int(self.tomo_stacks[i].shape[0]/2)
title = f'{basetitle} {index} xslice{x_slice}'
- msnc.quickImshow(self.tomo_recon_stacks[i][x_slice,:,:], title=title,
+ quickImshow(self.tomo_recon_stacks[i][x_slice,:,:], title=title,
path='center_slice_pngs', save_fig=True, save_only=True)
y_slice = int(self.tomo_stacks[i].shape[0]/2)
title = f'{basetitle} {index} yslice{y_slice}'
- msnc.quickImshow(self.tomo_recon_stacks[i][:,y_slice,:], title=title,
+ quickImshow(self.tomo_recon_stacks[i][:,y_slice,:], title=title,
path='center_slice_pngs', save_fig=True, save_only=True)
z_slice = int(self.tomo_stacks[i].shape[0]/2)
title = f'{basetitle} {index} zslice{z_slice}'
- msnc.quickImshow(self.tomo_recon_stacks[i][:,:,z_slice], title=title,
+ quickImshow(self.tomo_recon_stacks[i][:,:,z_slice], title=title,
path='center_slice_pngs', save_fig=True, save_only=True)
elif not self.test_mode:
x_slice = int(self.tomo_stacks[i].shape[0]/2)
title = f'{basetitle} {index} xslice{x_slice}'
- msnc.quickImshow(self.tomo_recon_stacks[i][x_slice,:,:], title=title,
+ quickImshow(self.tomo_recon_stacks[i][x_slice,:,:], title=title,
path=self.output_folder, save_fig=self.save_plots,
save_only=self.save_plots_only)
- msnc.quickPlot(self.tomo_recon_stacks[i]
+ quickPlot(self.tomo_recon_stacks[i]
[x_slice,int(self.tomo_recon_stacks[i].shape[2]/2),:],
title=f'{title} cut{int(self.tomo_recon_stacks[i].shape[2]/2)}',
path=self.output_folder, save_fig=self.save_plots,
@@ -2173,21 +2180,21 @@
tomosum = 0
[tomosum := tomosum+np.sum(tomo_recon_stack, axis=(0,2)) for tomo_recon_stack in
self.tomo_recon_stacks]
- msnc.quickPlot(tomosum, title='recon stack sum yz', path='center_slice_pngs',
+ quickPlot(tomosum, title='recon stack sum yz', path='center_slice_pngs',
save_fig=True, save_only=True)
# Create cross section profile in xz-plane
tomosum = 0
[tomosum := tomosum+np.sum(tomo_recon_stack, axis=(0,1)) for tomo_recon_stack in
self.tomo_recon_stacks]
- msnc.quickPlot(tomosum, title='recon stack sum xz', path='center_slice_pngs',
+ quickPlot(tomosum, title='recon stack sum xz', path='center_slice_pngs',
save_fig=True, save_only=True)
# Create cross section profile in xy-plane
num_tomo_stacks = len(stacks)
row_bounds = self.config['find_center']['row_bounds']
- if not msnc.is_index_range(row_bounds, 0, self.tomo_recon_stacks[0].shape[0]):
- msnc.illegal_value('row_bounds', row_bounds, 'config file')
+ if not is_index_range(row_bounds, 0, self.tomo_recon_stacks[0].shape[0]):
+ illegal_value(row_bounds, 'find_center:row_bounds', 'config file')
return
if num_tomo_stacks == 1:
low_bound = row_bounds[0]
@@ -2202,7 +2209,7 @@
tomosum = np.concatenate([tomosum,
np.sum(self.tomo_recon_stacks[num_tomo_stacks-1][row_bounds[0]:,:,:],
axis=(1,2))])
- msnc.quickPlot(tomosum, title='recon stack sum xy', path='center_slice_pngs',
+ quickPlot(tomosum, title='recon stack sum xy', path='center_slice_pngs',
save_fig=True, save_only=True)
def combineTomoStacks(self, galaxy_param=None):
@@ -2250,8 +2257,8 @@
# Get center stack boundaries
row_bounds = self.config['find_center']['row_bounds']
- if not msnc.is_index_range(row_bounds, 0, self.tomo_recon_stacks[0].shape[0]):
- msnc.illegal_value('row_bounds', row_bounds, 'config file')
+ if not is_index_range(row_bounds, 0, self.tomo_recon_stacks[0].shape[0]):
+ illegal_value(row_bounds, 'find_center:row_bounds', 'config file')
return
# Selecting x bounds (in yz-plane)
@@ -2265,41 +2272,57 @@
x_bounds[0] = 0
if x_bounds[1] == -1:
x_bounds[1] = tomosum.size
- if not msnc.is_index_range(x_bounds, 0, tomosum.size):
- msnc.illegal_value('x_bounds', x_bounds, 'galaxy input')
+ if not is_index_range(x_bounds, 0, tomosum.size):
+ illegal_value(x_bounds, 'x_bounds', 'galaxy input')
tomosum_min = tomosum.min()
tomosum_max = tomosum.max()
- msnc.quickPlot((range(tomosum.size), tomosum),
+ quickPlot((range(tomosum.size), tomosum),
([x_bounds[0], x_bounds[0]], [tomosum_min, tomosum_max], 'r-'),
([x_bounds[1]-1, x_bounds[1]-1], [tomosum_min, tomosum_max], 'r-'),
title=f'recon stack sum yz', path='combine_pngs', save_fig=True, save_only=True)
else:
if combine_stacks and 'x_bounds' in combine_stacks:
x_bounds = combine_stacks['x_bounds']
- if not msnc.is_index_range(x_bounds, 0, tomosum.size):
- msnc.illegal_value('x_bounds', x_bounds, 'config file')
+ if not is_index_range(x_bounds, 0, tomosum.size):
+ illegal_value(x_bounds, 'combine_stacks:x_bounds', 'config file')
elif not self.test_mode:
- msnc.quickPlot(tomosum, title='recon stack sum yz')
- if pyip.inputYesNo('\nCurrent image x-bounds: '+
- f'[{x_bounds[0]}, {x_bounds[1]}], use these values ([y]/n)? ',
- blank=True) == 'no':
- if pyip.inputYesNo(
- 'Do you want to change the image x-bounds ([y]/n)? ',
- blank=True) == 'no':
- x_bounds = [0, tomosum.size]
- else:
- x_bounds = msnc.selectArrayBounds(tomosum, title='recon stack sum yz')
+ accept = False
+ while not accept:
+ mask, x_bounds = draw_mask_1d(tomosum, current_index_ranges=x_bounds,
+ title='select x data range', legend='recon stack sum yz')
+ while len(x_bounds) != 1:
+ print('Please select exactly one continuous range')
+ mask, x_bounds = draw_mask_1d(tomosum, title='select x data range',
+ legend='recon stack sum yz')
+ x_bounds = list(x_bounds[0])
+ quickPlot(tomosum, vlines=x_bounds, title='recon stack sum yz')
+ print(f'x_bounds = {x_bounds} (lower bound inclusive, upper bound '+
+ 'exclusive)')
+ accept = input_yesno('Accept these bounds (y/n)?', 'y')
+ if not accept:
+ x_bounds = None
else:
- msnc.quickPlot(tomosum, title='recon stack sum yz')
- if pyip.inputYesNo('\nDo you want to change the image x-bounds (y/n)? ') == 'no':
+ if not input_yesno('\nDo you want to change the image x-bounds (y/n)?', 'n'):
x_bounds = [0, tomosum.size]
else:
- x_bounds = msnc.selectArrayBounds(tomosum, title='recon stack sum yz')
+ accept = False
+ while not accept:
+ mask, x_bounds = draw_mask_1d(tomosum, title='select x data range',
+ legend='recon stack sum yz')
+ while len(x_bounds) != 1:
+ print('Please select exactly one continuous range')
+ mask, x_bounds = draw_mask_1d(tomosum, title='select x data range',
+ legend='recon stack sum yz')
+ x_bounds = list(x_bounds[0])
+ quickPlot(tomosum, vlines=x_bounds, title='recon stack sum yz')
+ print(f'x_bounds = {x_bounds} (lower bound inclusive, upper bound '+
+ 'exclusive)')
+ accept = input_yesno('Accept these bounds (y/n)?', 'y')
if False and self.test_mode:
np.savetxt(f'{self.output_folder}/recon_stack_sum_yz.txt',
tomosum[x_bounds[0]:x_bounds[1]], fmt='%.6e')
if self.save_plots_only:
- msnc.clearFig('recon stack sum yz')
+ clearPlot('recon stack sum yz')
# Selecting y bounds (in xz-plane)
tomosum = 0
@@ -2311,41 +2334,57 @@
y_bounds[0] = 0
if y_bounds[1] == -1:
y_bounds[1] = tomosum.size
- if not msnc.is_index_range(y_bounds, 0, tomosum.size):
- msnc.illegal_value('y_bounds', y_bounds, 'galaxy input')
+ if not is_index_range(y_bounds, 0, tomosum.size):
+ illegal_value(y_bounds, 'y_bounds', 'galaxy input')
tomosum_min = tomosum.min()
tomosum_max = tomosum.max()
- msnc.quickPlot((range(tomosum.size), tomosum),
+ quickPlot((range(tomosum.size), tomosum),
([y_bounds[0], y_bounds[0]], [tomosum_min, tomosum_max], 'r-'),
([y_bounds[1]-1, y_bounds[1]-1], [tomosum_min, tomosum_max], 'r-'),
title=f'recon stack sum xz', path='combine_pngs', save_fig=True, save_only=True)
else:
if combine_stacks and 'y_bounds' in combine_stacks:
y_bounds = combine_stacks['y_bounds']
- if not msnc.is_index_range(y_bounds, 0, tomosum.size):
- msnc.illegal_value('y_bounds', y_bounds, 'config file')
+ if not is_index_range(y_bounds, 0, tomosum.size):
+ illegal_value(y_bounds, 'combine_stacks:y_bounds', 'config file')
elif not self.test_mode:
- msnc.quickPlot(tomosum, title='recon stack sum xz')
- if pyip.inputYesNo('\nCurrent image y-bounds: '+
- f'[{y_bounds[0]}, {y_bounds[1]}], use these values ([y]/n)? ',
- blank=True) == 'no':
- if pyip.inputYesNo(
- 'Do you want to change the image y-bounds ([y]/n)? ',
- blank=True) == 'no':
- y_bounds = [0, tomosum.size]
- else:
- y_bounds = msnc.selectArrayBounds(tomosum, title='recon stack sum yz')
+ accept = False
+ while not accept:
+ mask, y_bounds = draw_mask_1d(tomosum, current_index_ranges=y_bounds,
+ title='select y data range', legend='recon stack sum xz')
+ while len(y_bounds) != 1:
+ print('Please select exactly one continuous range')
+ mask, y_bounds = draw_mask_1d(tomosum, title='select y data range',
+ legend='recon stack sum xz')
+ y_bounds = list(y_bounds[0])
+ quickPlot(tomosum, vlines=y_bounds, title='recon stack sum xz')
+ print(f'y_bounds = {y_bounds} (lower bound inclusive, upper bound '+
+ 'exclusive)')
+ accept = input_yesno('Accept these bounds (y/n)?', 'y')
+ if not accept:
+ y_bounds = None
else:
- msnc.quickPlot(tomosum, title='recon stack sum xz')
- if pyip.inputYesNo('\nDo you want to change the image y-bounds (y/n)? ') == 'no':
+ if not input_yesno('\nDo you want to change the image y-bounds (y/n)?', 'n'):
y_bounds = [0, tomosum.size]
else:
- y_bounds = msnc.selectArrayBounds(tomosum, title='recon stack sum xz')
+ accept = False
+ while not accept:
+ mask, y_bounds = draw_mask_1d(tomosum, title='select y data range',
+ legend='recon stack sum xz')
+ while len(y_bounds) != 1:
+ print('Please select exactly one continuous range')
+ mask, y_bounds = draw_mask_1d(tomosum, title='select y data range',
+ legend='recon stack sum xz')
+ y_bounds = list(y_bounds[0])
+ quickPlot(tomosum, vlines=y_bounds, title='recon stack sum xz')
+ print(f'y_bounds = {y_bounds} (lower bound inclusive, upper bound '+
+ 'exclusive)')
+ accept = input_yesno('Accept these bounds (y/n)?', 'y')
if False and self.test_mode:
np.savetxt(f'{self.output_folder}/recon_stack_sum_xz.txt',
tomosum[y_bounds[0]:y_bounds[1]], fmt='%.6e')
if self.save_plots_only:
- msnc.clearFig('recon stack sum xz')
+ clearPlot('recon stack sum xz')
# Combine reconstructed tomography stacks
logging.info(f'Combining reconstructed stacks ...')
@@ -2378,7 +2417,7 @@
filename += ' fullres.dat'
else:
filename += f' {zoom_perc}p.dat'
- msnc.quickPlot(tomosum, title='recon combined sum xy',
+ quickPlot(tomosum, title='recon combined sum xy',
path=self.output_folder, save_fig=self.save_plots,
save_only=self.save_plots_only)
if False:
@@ -2404,24 +2443,32 @@
z_bounds[0] = 0
if z_bounds[1] == -1:
z_bounds[1] = tomosum.size
- if not msnc.is_index_range(z_bounds, 0, tomosum.size):
- msnc.illegal_value('z_bounds', z_bounds, 'galaxy input')
+ if not is_index_range(z_bounds, 0, tomosum.size):
+ illegal_value(z_bounds, 'z_bounds', 'galaxy input')
tomosum_min = tomosum.min()
tomosum_max = tomosum.max()
- msnc.quickPlot((range(tomosum.size), tomosum),
+ quickPlot((range(tomosum.size), tomosum),
([z_bounds[0], z_bounds[0]], [tomosum_min, tomosum_max], 'r-'),
([z_bounds[1]-1, z_bounds[1]-1], [tomosum_min, tomosum_max], 'r-'),
title=f'recon stack sum xy', path='combine_pngs', save_fig=True, save_only=True)
else:
- msnc.quickPlot(tomosum, title='recon combined sum xy')
- if pyip.inputYesNo(
- '\nDo you want to change the image z-bounds (y/[n])? ',
- blank=True) != 'yes':
+ if not input_yesno('\nDo you want to change the image z-bounds (y/n)?', 'n'):
z_bounds = [0, tomosum.size]
else:
- z_bounds = msnc.selectArrayBounds(tomosum, title='recon combined sum xy')
+ accept = False
+ while not accept:
+ mask, z_bounds = draw_mask_1d(tomosum, title='select z data range',
+ legend='recon stack sum xy')
+ while len(z_bounds) != 1:
+ print('Please select exactly one continuous range')
+ mask, z_bounds = draw_mask_1d(tomosum, title='select z data range',
+ legend='recon stack sum xy')
+ z_bounds = list(z_bounds[0])
+ quickPlot(tomosum, vlines=z_bounds, title='recon stack sum xy')
+ print(f'z_bounds = {z_bounds} (lower bound inclusive, upper bound exclusive)')
+ accept = input_yesno('Accept these bounds (y/n)?', 'y')
if self.save_plots_only:
- msnc.clearFig('recon combined sum xy')
+ clearPlot('recon combined sum xy')
if z_bounds[0] != 0 or z_bounds[1] != tomosum.size:
tomo_recon_combined = tomo_recon_combined[z_bounds[0]:z_bounds[1],:,:]
@@ -2434,13 +2481,13 @@
path = self.output_folder
save_fig = self.save_plots
save_only = self.save_plots_only
- msnc.quickImshow(tomo_recon_combined[int(tomo_recon_combined.shape[0]/2),:,:],
+ quickImshow(tomo_recon_combined[int(tomo_recon_combined.shape[0]/2),:,:],
title=f'recon combined xslice{int(tomo_recon_combined.shape[0]/2)}',
path=path, save_fig=save_fig, save_only=save_only)
- msnc.quickImshow(tomo_recon_combined[:,int(tomo_recon_combined.shape[1]/2),:],
+ quickImshow(tomo_recon_combined[:,int(tomo_recon_combined.shape[1]/2),:],
title=f'recon combined yslice{int(tomo_recon_combined.shape[1]/2)}',
path=path, save_fig=save_fig, save_only=save_only)
- msnc.quickImshow(tomo_recon_combined[:,:,int(tomo_recon_combined.shape[2]/2)],
+ quickImshow(tomo_recon_combined[:,:,int(tomo_recon_combined.shape[2]/2)],
title=f'recon combined zslice{int(tomo_recon_combined.shape[2]/2)}',
path=path, save_fig=save_fig, save_only=save_only)
diff -r 059819ea1f0e -r 26f99fdd8d61 tomo_combine.xml
--- a/tomo_combine.xml Wed Apr 27 17:28:26 2022 +0000
+++ b/tomo_combine.xml Thu Jul 28 16:05:24 2022 +0000
@@ -1,4 +1,4 @@
-
+
Combine reconstructed tomography stacks
tomo_macros.xml
@@ -38,7 +38,7 @@
-
+
+
Find the center axis for a tomography reconstruction
tomo_macros.xml
@@ -38,12 +38,14 @@
+
+
diff -r 059819ea1f0e -r 26f99fdd8d61 tomo_reconstruct.xml
--- a/tomo_reconstruct.xml Wed Apr 27 17:28:26 2022 +0000
+++ b/tomo_reconstruct.xml Thu Jul 28 16:05:24 2022 +0000
@@ -1,4 +1,4 @@
-
+
Perform a tomography reconstruction
tomo_macros.xml
diff -r 059819ea1f0e -r 26f99fdd8d61 tomo_setup.py
--- a/tomo_setup.py Wed Apr 27 17:28:26 2022 +0000
+++ b/tomo_setup.py Thu Jul 28 16:05:24 2022 +0000
@@ -11,7 +11,6 @@
import tracemalloc
from tomo import Tomo
-import msnc_tools as msnc
#from memory_profiler import profile
#@profile
@@ -21,20 +20,25 @@
parser = argparse.ArgumentParser(
description='Setup tomography reconstruction')
parser.add_argument('-i', '--inputfiles',
+ nargs='+',
default='inputfiles.txt',
- help='Input file collections')
+ help='Input file datasets or collections')
parser.add_argument('-c', '--config',
help='Input config')
+ parser.add_argument('-l', '--log',
+ type=argparse.FileType('w'),
+ default=sys.stdout,
+ help='Log file')
+ parser.add_argument('-t', '--inputfile_types',
+ nargs='+',
+ default='collection',
+ help='Input files type (collection or a list of set types: dark, bright, or data)')
parser.add_argument('--theta_range',
help='Theta range (lower bound, upper bound, number of angles)')
parser.add_argument('--output_config',
help='Output config')
parser.add_argument('--output_data',
help='Preprocessed tomography data')
- parser.add_argument('-l', '--log',
- type=argparse.FileType('w'),
- default=sys.stdout,
- help='Log file')
parser.add_argument('tomo_ranges', metavar='N', type=int, nargs='+')
args = parser.parse_args()
@@ -51,43 +55,59 @@
handlers=[logging.StreamHandler()])
logging.debug(f'config = {args.config}')
+ logging.debug(f'inputfiles = {args.inputfiles}')
+ logging.debug(f'inputfile_types = {args.inputfile_types}')
+ logging.debug(f'log = {args.log}')
+ logging.debug(f'is log stdout? {args.log is sys.stdout}')
logging.debug(f'theta_range = {args.theta_range.split()}')
logging.debug(f'output_config = {args.output_config}')
logging.debug(f'output_data = {args.output_data}')
- logging.debug(f'log = {args.log}')
- logging.debug(f'is log stdout? {args.log is sys.stdout}')
logging.debug(f'tomoranges = {args.tomo_ranges}')
- # Read input files and collect data files info
+ # Check input file type
+ if isinstance(args.inputfile_types, str) and args.inputfile_types == 'collection':
+ input_as_collection = True
+ elif isinstance(args.inputfile_types, list):
+ input_as_collection = False
+ else:
+ raise ValueError(f'Invalid args.inputfile_types: {args.inputfile_types} '+
+ f'{type(args.inputfile_types)}')
+
datasets = []
- with open(args.inputfiles) as cf:
- for line in cf:
- if not line.strip() or line.startswith('#'):
- continue
- fields = [x.strip() for x in line.split('\t')]
- filepath = fields[0]
- element_identifier = fields[1] if len(fields) > 1 else fields[0].split('/')[-1]
- datasets.append({'element_identifier' : fields[1], 'filepath' : filepath})
- logging.debug(f'datasets:\n{datasets}')
-
- # Read and sort data files
collections = []
- for dataset in datasets:
- element_identifier = [x.strip() for x in dataset['element_identifier'].split('_')]
- if len(element_identifier) > 1:
- name = element_identifier[0]
- else:
- name = 'other'
- filepath = dataset['filepath']
- if not len(collections):
- collections = [{'name' : name, 'filepaths' : [filepath]}]
- else:
- collection = [c for c in collections if c['name'] == name]
- if len(collection):
- collection[0]['filepaths'].append(filepath)
+ if input_as_collection:
+ # Read input file collections and collect data files info
+ with open(args.inputfiles) as cf:
+ for line in cf:
+ if not line.strip() or line.startswith('#'):
+ continue
+ fields = [x.strip() for x in line.split('\t')]
+ filepath = fields[0]
+ element_identifier = fields[1] if len(fields) > 1 else fields[0].split('/')[-1]
+ datasets.append({'element_identifier' : fields[1], 'filepath' : filepath})
+ logging.debug(f'datasets:\n{datasets}')
+
+ # Read and sort data files
+ for dataset in datasets:
+ element_identifier = [x.strip() for x in dataset['element_identifier'].split('_')]
+ if len(element_identifier) > 1:
+ name = element_identifier[0]
else:
- collection = {'name' : name, 'filepaths' : [filepath]}
- collections.append(collection)
+ name = 'other'
+ filepath = dataset['filepath']
+ if not len(collections):
+ collections = [{'name' : name, 'filepaths' : [filepath]}]
+ else:
+ collection = [c for c in collections if c['name'] == name]
+ if len(collection):
+ collection[0]['filepaths'].append(filepath)
+ else:
+ collection = {'name' : name, 'filepaths' : [filepath]}
+ collections.append(collection)
+ else:
+ # Collect input file datasets info
+ collections = [{'name' : filetype, 'filepaths' : [filepath]}
+ for filetype, filepath in zip(args.inputfile_types, args.inputfiles)]
logging.debug(f'collections:\n{collections}')
if len(args.tomo_ranges) != 2*len(collections):
raise ValueError('Inconsistent tomo ranges size.')
@@ -142,7 +162,12 @@
for stack in stack_info['stacks']:
stack['img_offset'] = args.tomo_ranges[2*num_collections]
stack['num'] = args.tomo_ranges[2*num_collections+1]
- tomo_files = [c['filepaths'] for c in collections if c['name'] == f'set{stack["index"]}']
+ if input_as_collection:
+ tomo_files = [c['filepaths'] for c in collections
+ if c['name'] == f'set{stack["index"]}']
+ else:
+ assert(collections[num_collections]['name'] == 'data')
+ tomo_files = [collections[num_collections]['filepaths']]
if len(tomo_files) != 1 or len(tomo_files[0]) < 1:
exit(f'Unable to obtain tomography images for set {stack["index"]}')
tomo_stack_files.append(tomo_files[0])
diff -r 059819ea1f0e -r 26f99fdd8d61 tomo_setup.xml
--- a/tomo_setup.xml Wed Apr 27 17:28:26 2022 +0000
+++ b/tomo_setup.xml Thu Jul 28 16:05:24 2022 +0000
@@ -1,28 +1,40 @@
-
+
Preprocess tomography images
tomo_macros.xml
-
+
+
+
@@ -32,15 +44,43 @@
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
+ input['type_selector'] == 'collection'
+