Mercurial > repos > rv43 > tomo
changeset 49:26f99fdd8d61 draft
"planemo upload for repository https://github.com/rolfverberg/galaxytools commit 4f7738d02f4a3fd91373f43937ed311b6fe11a12"
author | rv43 |
---|---|
date | Thu, 28 Jul 2022 16:05:24 +0000 |
parents | 059819ea1f0e |
children | 79c216516ef9 |
files | detector.py fit.py general.py msnc_tools.py read_image.py read_image.xml tomo.py tomo_combine.xml tomo_find_center.xml tomo_reconstruct.xml tomo_setup.py tomo_setup.xml |
diffstat | 12 files changed, 2905 insertions(+), 1248 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/detector.py Thu Jul 28 16:05:24 2022 +0000 @@ -0,0 +1,345 @@ +import logging +import os +import yaml +from functools import cache +from copy import deepcopy + +#from .general import * +from general import illegal_value, is_int, is_num, input_yesno + +#from hexrd.instrument import HEDMInstrument, PlanarDetector + +class DetectorConfig: + def __init__(self, config_source): + self._config_source = config_source + + if isinstance(self._config_source, str): + self._config_file = self._config_source + self._config = self._load_config_file() + elif isinstance(self._config_source, dict): + self._config_file = None + self._config = self._config_source + else: + self._config_file = None + self._config = False + + self._valid = self._validate() + + if not self.valid: + logging.error(f'Cannot create a valid instance of {self.__class__.__name__} '+ + f'from {self._config_source}') + + def __repr__(self): + return(f'{self.__class__.__name__}({self._config_source.__repr__()})') + def __str__(self): + return(f'{self.__class__.__name__} generated from {self._config_source}') + + @property + def config_file(self): + return(self._config_file) + + @property + def config(self): + return(deepcopy(self._config)) + + @property + def valid(self): + return(self._valid) + + def load_config_file(self): + raise(NotImplementedError) + + def validate(self): + raise(NotImplementedError) + + def _load_config_file(self): + return(self.load_config_file()) + + def _validate(self): + if not self.config: + logging.error('A configuration must be loaded prior to calling Detector._validate') + return(False) + else: + return(self.validate()) + + def _write_to_file(self, out_file): + out_file = os.path.abspath(out_file) + + current_config_valid = self.validate() + if not current_config_valid: + write_invalid_config = input_yesno(s=f'This {self.__class__.__name__} is currently '+ + f'invalid. Write the configuration to {out_file} anyways?', default='no') + if not write_invalid_config: + logging.info('In accordance with user input, the invalid configuration will '+ + f'not be written to {out_file}') + return + + if os.access(out_file, os.W_OK): + if os.path.exists(out_file): + overwrite = input_yesno(s=f'{out_file} already exists. Overwrite?', default='no') + if overwrite: + self.write_to_file(out_file) + else: + logging.info(f'In accordance with user input, {out_file} will not be '+ + 'overwritten') + else: + self.write_to_file(out_file) + else: + logging.error(f'Insufficient permissions to write to {out_file}') + + def write_to_file(self, out_file): + raise(NotImplementedError) + +class YamlDetectorConfig(DetectorConfig): + def __init__(self, config_source, validate_yaml_pars=[]): + self._validate_yaml_pars = validate_yaml_pars + super().__init__(config_source) + + def load_config_file(self): + if not os.path.splitext(self._config_file)[1]: + if os.path.isfile(f'{self._config_file}.yml'): + self._config_file = f'{self._config_file}.yml' + if os.path.isfile(f'{self._config_file}.yaml'): + self._config_file = f'{self._config_file}.yaml' + if not os.path.isfile(self._config_file): + logging.error(f'Unable to load {self._config_file}') + return(False) + with open(self._config_file, 'r') as infile: + config = yaml.safe_load(infile) + if isinstance(config, dict): + return(config) + else: + logging.error(f'Unable to load {self._config_file} as a dictionary') + return(False) + + def validate(self): + if not self._validate_yaml_pars: + logging.warning('There are no required parameters provided for this detector '+ + 'configuration') + return(True) + + def validate_nested_pars(config, validate_yaml_par): + yaml_par_levels = validate_yaml_par.split(':') + first_level_par = yaml_par_levels[0] + try: + first_level_par = int(first_level_par) + except: + pass + try: + next_level_config = config[first_level_par] + if len(yaml_par_levels) > 1: + next_level_pars = ':'.join(yaml_par_levels[1:]) + return(validate_nested_pars(next_level_config, next_level_pars)) + else: + return(True) + except: + return(False) + + pars_missing = [p for p in self._validate_yaml_pars + if not validate_nested_pars(self.config, p)] + if len(pars_missing) > 0: + logging.error(f'Missing item(s) in configuration: {", ".join(pars_missing)}') + return(False) + else: + return(True) + + def write_to_file(self, out_file): + with open(out_file, 'w') as outf: + yaml.dump(self.config, outf) + + +class TomoDetectorConfig(YamlDetectorConfig): + def __init__(self, config_source): + validate_yaml_pars = ['detector', + 'lens_magnification', + 'detector:pixels:rows', + 'detector:pixels:columns', + *[f'detector:pixels:size:{i}' for i in range(2)]] + super().__init__(config_source, validate_yaml_pars=validate_yaml_pars) + + @property + @cache + def lens_magnification(self): + lens_magnification = self.config.get('lens_magnification') + if not isinstance(lens_magnification, (int, float)) or lens_magnification <= 0.: + illegal_value(lens_magnification, 'lens_magnification', 'detector file') + logging.warning('Using default lens_magnification value of 1.0') + return(1.0) + else: + return(lens_magnification) + + @property + @cache + def pixel_size(self): + pixel_size = self.config['detector'].get('pixels').get('size') + if isinstance(pixel_size, (int, float)): + if pixel_size <= 0.: + illegal_value(pixel_size, 'pixel_size', 'detector file') + return(None) + pixel_size /= self.lens_magnification + elif isinstance(pixel_size, list): + if ((len(pixel_size) > 2) or + (len(pixel_size) == 2 and pixel_size[0] != pixel_size[1])): + illegal_value(pixel_size, 'pixel size', 'detector file') + return(None) + elif not is_num(pixel_size[0], 0.): + illegal_value(pixel_size, 'pixel size', 'detector file') + return(None) + else: + pixel_size = pixel_size[0]/self.lens_magnification + else: + illegal_value(pixel_size, 'pixel size', 'detector file') + return(None) + + return(pixel_size) + + @property + @cache + def dimensions(self): + pixels = self.config['detector'].get('pixels') + num_rows = pixels.get('rows') + if not is_int(num_rows, 1): + illegal_value(num_rows, 'rows', 'detector file') + return(None) + num_columns = pixels.get('columns') + if not is_int(num_columns, 1): + illegal_value(num_columns, 'columns', 'detector file') + return(None) + return(num_rows, num_columns) + + +class EDDDetectorConfig(YamlDetectorConfig): + def __init__(self, config_source): + validate_yaml_pars = ['num_bins', + 'max_E', + # 'angle', # KLS leave this out for now -- I think it has to do with the relative geometry of sample, beam, and detector (not a property of the detector on its own), so may not belong here in the DetectorConfig object? + 'tth_angle', + 'slope', + 'intercept'] + super().__init__(config_source, validate_yaml_pars=validate_yaml_pars) + + @property + @cache + def num_bins(self): + try: + num_bins = int(self.config['num_bins']) + if num_bins <= 0: + raise(ValueError) + else: + return(num_bins) + except: + illegal_value(self.config['num_bins'], 'num_bins') + @property + @cache + def max_E(self): + try: + max_E = float(self.config['max_E']) + if max_E <= 0: + raise(ValueError) + else: + return(max_E) + except: + illegal_value(self.config['max_E'], 'max_E') + return(None) + + @property + def bin_energies(self): + return(self.slope * np.linspace(0, self.max_E, self.num_bins, endpoint=False) + + self.intercept) + + @property + def tth_angle(self): + try: + return(float(self.config['tth_angle'])) + except: + illegal_value(tth_angle, 'tth_angle') + return(None) + @tth_angle.setter + def tth_angle(self, value): + try: + self._config['tth_angle'] = float(value) + except: + illegal_value(value, 'tth_angle') + + @property + def slope(self): + try: + return(float(self.config['slope'])) + except: + illegal_value(slope, 'slope') + return(None) + @slope.setter + def slope(self, value): + try: + self._config['slope'] = float(value) + except: + illegal_value(value, 'slope') + + @property + def intercept(self): + try: + return(float(self.config['intercept'])) + except: + illegal_value(intercept, 'intercept') + return(None) + @intercept.setter + def intercept(self, value): + try: + self._config['intercept'] = float(value) + except: + illegal_value(value, 'intercept') + + +# class HexrdDetectorConfig(YamlDetectorConfig): +# def __init__(self, config_source, detector_names=[]): +# self.detector_names = detector_names +# validate_yaml_pars_each_detector = [*[f'buffer:{i}' for i in range(2)], +# 'distortion:function_name', +# *[f'distortion:parameters:{i}' for i in range(6)], +# 'pixels:columns', +# 'pixels:rows', +# *['pixels:size:%i' % i for i in range(2)], +# 'saturation_level', +# *[f'transform:tilt:{i}' for i in range(3)], +# *[f'transform:translation:{i}' for i in range(3)]] +# validate_yaml_pars = [] +# for detector_name in self.detector_names: +# validate_yaml_pars += [f'detectors:{detector_name}:{par}' for par in validate_yaml_pars_each_detector] + +# super().__init__(config_source, validate_yaml_pars=validate_yaml_pars) + +# def validate(self): +# yaml_valid = YamlDetectorConfig.validate(self) +# if not yaml_valid: +# return(False) +# else: +# hedm_instrument = HEDMInstrument(instrument_config=self.config) +# for detector_name in self.detector_names: +# if detector_name in hedm_instrument.detectors: +# if isinstance(hedm_instrument.detectors[detector_name], PlanarDetector): +# continue +# else: +# return(False) +# else: +# return(False) +# return(True) + +# class SAXSWAXSDetectorConfig(DetectorConfig): +# def __init__(self, config_source): +# super().__init__(config_source) + +# @property +# def ai(self): +# return(self.config) +# @ai.setter +# def ai(self, value): +# if isinstance(value, pyFAI.azimuthalIntegrator.AzimuthalIntegrator): +# self.config = ai +# else: +# illegal_value(value, 'azimuthal integrator') + +# # pyFAI will perform its own error-checking for the mask attribute. +# mask = property(self.ai.get_mask, self.ai,set_mask) + + +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/fit.py Thu Jul 28 16:05:24 2022 +0000 @@ -0,0 +1,663 @@ +#!/usr/bin/env python3 + +# -*- coding: utf-8 -*- +""" +Created on Mon Dec 6 15:36:22 2021 + +@author: rv43 +""" + +import sys +import re +import logging +import numpy as np + +from asteval import Interpreter +from copy import deepcopy +#from lmfit import Minimizer +from lmfit import Model, Parameters +from lmfit.models import ConstantModel, LinearModel, QuadraticModel, PolynomialModel,\ + StepModel, RectangleModel, GaussianModel, LorentzianModel + +#from .general import * +from general import is_index, index_nearest, quickPlot + +# sigma = fwhm_factor*fwhm +fwhm_factor = { + 'gaussian' : f'fwhm/(2*sqrt(2*log(2)))', + 'lorentzian' : f'0.5*fwhm', + 'splitlorentzian' : f'0.5*fwhm', # sigma = sigma_r + 'voight' : f'0.2776*fwhm', # sigma = gamma + 'pseudovoight' : f'0.5*fwhm'} # fraction = 0.5 + +# amplitude = height_factor*height*fwhm +height_factor = { + 'gaussian' : f'height*fwhm*0.5*sqrt(pi/log(2))', + 'lorentzian' : f'height*fwhm*0.5*pi', + 'splitlorentzian' : f'height*fwhm*0.5*pi', # sigma = sigma_r + 'voight' : f'3.334*height*fwhm', # sigma = gamma + 'pseudovoight' : f'1.268*height*fwhm'} # fraction = 0.5 + +class Fit: + """Wrapper class for lmfit + """ + def __init__(self, x, y, models=None, **kwargs): + self._x = x + self._y = y + self._model = None + self._parameters = Parameters() + self._result = None + if models is not None: + if callable(models) or isinstance(models, str): + kwargs = self.add_model(models, **kwargs) + elif isinstance(models, (tuple, list)): + for model in models: + kwargs = self.add_model(model, **kwargs) + self.fit(**kwargs) + + @classmethod + def fit_data(cls, x, y, models, **kwargs): + return cls(x, y, models, **kwargs) + + @property + def best_errors(self): + if self._result is None: + return None + errors = {} + names = sorted(self._result.params) + for name in names: + par = self._result.params[name] + errors[name] = par.stderr + return errors + + @property + def best_fit(self): + if self._result is None: + return None + return self._result.best_fit + + @property + def best_parameters(self): + if self._result is None: + return None + parameters = [] + names = sorted(self._result.params) + for name in names: + par = self._result.params[name] + parameters.append({'name' : par.name, 'value' : par.value, 'error' : par.stderr, + 'init_value' : par.init_value, 'min' : par.min, 'max' : par.max, + 'vary' : par.vary, 'expr' : par.expr}) + return parameters + + @property + def best_values(self): + if self._result is None: + return None + return self._result.params.valuesdict() + + @property + def chisqr(self): + return self._result.chisqr + + @property + def covar(self): + return self._result.covar + + @property + def init_values(self): + if self._result is None: + return None + return self._result.init_values + + @property + def num_func_eval(self): + return self._result.nfev + + @property + def redchi(self): + return self._result.redchi + + @property + def residual(self): + return self._result.residual + + @property + def success(self): + if not self._result.success: +# print(f'ier = {self._result.ier}') +# print(f'lmdif_message = {self._result.lmdif_message}') +# print(f'message = {self._result.message}') +# print(f'nfev = {self._result.nfev}') +# print(f'redchi = {self._result.redchi}') +# print(f'success = {self._result.success}') + if self._result.ier == 0 or self._result.ier == 5: + logging.warning(f'ier = {self._result.ier}: {self._result.message}') + else: + logging.warning(f'ier = {self._result.ier}: {self._result.message}') + return True +# self.print_fit_report() +# self.plot() + return self._result.success + + @property + def var_names(self): + """Intended to be used with covar + """ + if self._result is None: + return None + return self._result.var_names + + def print_fit_report(self, show_correl=False): + if self._result is not None: + print(self._result.fit_report(show_correl=show_correl)) + + def add_parameter(self, **parameter): + if not isinstance(parameter, dict): + illegal_value(parameter, 'parameter', 'add_parameter') + return + self._parameters.add(**parameter) + + def add_model(self, model, prefix=None, parameters=None, **kwargs): + # Create the new model +# print('\nAt start adding model:') +# self._parameters.pretty_print() + if prefix is not None and not isinstance(prefix, str): + logging.warning('Ignoring illegal prefix: {model} {type(model)}') + prefix = None + if callable(model): + newmodel = Model(model, prefix=prefix) + elif isinstance(model, str): + if model == 'constant': + newmodel = ConstantModel(prefix=prefix) + elif model == 'linear': + newmodel = LinearModel(prefix=prefix) + elif model == 'quadratic': + newmodel = QuadraticModel(prefix=prefix) + elif model == 'gaussian': + newmodel = GaussianModel(prefix=prefix) + elif model == 'step': + form = kwargs.get('form') + if form is not None: + del kwargs['form'] + if form is None or form not in ('linear', 'atan', 'arctan', 'erf', 'logistic'): + logging.error(f'Illegal form parameter for build-in step model ({form})') + return kwargs + newmodel = StepModel(prefix=prefix, form=form) + elif model == 'rectangle': + form = kwargs.get('form') + if form is not None: + del kwargs['form'] + if form is None or form not in ('linear', 'atan', 'arctan', 'erf', 'logistic'): + logging.error(f'Illegal form parameter for build-in rectangle model ({form})') + return kwargs + newmodel = RectangleModel(prefix=prefix, form=form) + else: + logging.error('Unknown build-in fit model') + return kwargs + else: + illegal_value(model, 'model', 'add_model') + return kwargs + + # Add the new model to the current one + if self._model is None: + self._model = newmodel + else: + self._model += newmodel + if self._parameters is None: + self._parameters = newmodel.make_params() + else: + self._parameters += newmodel.make_params() +# print('\nAfter adding model:') +# self._parameters.pretty_print() + + # Initialize the model parameters + if prefix is None: + prefix = "" + if parameters is not None: + if not isinstance(parameters, (tuple, list)): + illegal_value(parameters, 'parameters', 'add_model') + return kwargs + for parameter in parameters: + if not isinstance(parameter, dict): + illegal_value(parameter, 'parameter in parameters', 'add_model') + return kwargs + parameter['name'] = prefix+parameter['name'] + self._parameters.add(**parameter) + for name, value in kwargs.items(): + if isinstance(value, (int, float)): + self._parameters.add(prefix+name, value=value) +# print('\nAt end add_model:') +# self._parameters.pretty_print() + + return kwargs + + def fit(self, interactive=False, guess=False, **kwargs): + if self._model is None: + logging.error('Undefined fit model') + return + # Current parameter values + pars = self._parameters.valuesdict() + # Apply parameter updates through keyword arguments + for par in set(pars) & set(kwargs): + pars[par] = kwargs.pop(par) + self._parameters[par].set(value=pars[par]) + # Check for uninitialized parameters + for par, value in pars.items(): + if value is None or np.isinf(value) or np.isnan(value): + if interactive: + self._parameters[par].set(value= + input_num(f'Enter an initial value for {par}: ')) + else: + self._parameters[par].set(value=1.0) +# print('\nAt start actual fit:') +# print(f'kwargs = {kwargs}') +# self._parameters.pretty_print() +# print(f'parameters:\n{self._parameters}') +# print(f'x = {self._x}') +# print(f'len(x) = {len(self._x)}') +# print(f'y = {self._y}') +# print(f'len(y) = {len(self._y)}') + if guess: + self._parameters = self._model.guess(self._y, x=self._x) + self._result = self._model.fit(self._y, self._parameters, x=self._x, **kwargs) +# print('\nAt end actual fit:') +# print(f'var_names:\n{self._result.var_names}') +# print(f'stderr:\n{np.sqrt(np.diagonal(self._result.covar))}') +# self._parameters.pretty_print() +# print(f'parameters:\n{self._parameters}') + + def plot(self): + if self._result is None: + return + components = self._result.eval_components() + plots = ((self._x, self._y, '.'), (self._x, self._result.best_fit, 'k-'), + (self._x, self._result.init_fit, 'g-')) + legend = ['data', 'best fit', 'init'] + if len(components) > 1: + for modelname, y in components.items(): + if isinstance(y, (int, float)): + y *= np.ones(len(self._x)) + plots += ((self._x, y, '--'),) +# if modelname[-1] == '_': +# legend.append(modelname[:-1]) +# else: +# legend.append(modelname) + quickPlot(plots, legend=legend, block=True) + + @staticmethod + def guess_init_peak(x, y, *args, center_guess=None, use_max_for_center=True): + """ Return a guess for the initial height, center and fwhm for a peak + """ + center_guesses = None + if len(x) != len(y): + logging.error(f'Illegal x and y lengths ({len(x)}, {len(y)}), skip initial guess') + return None, None, None + if isinstance(center_guess, (int, float)): + if len(args): + logging.warning('Ignoring additional arguments for single center_guess value') + elif isinstance(center_guess, (tuple, list, np.ndarray)): + if len(center_guess) == 1: + logging.warning('Ignoring additional arguments for single center_guess value') + if not isinstance(center_guess[0], (int, float)): + raise ValueError(f'Illegal center_guess type ({type(center_guess[0])})') + center_guess = center_guess[0] + else: + if len(args) != 1: + raise ValueError(f'Illegal number of arguments ({len(args)})') + n = args[0] + if not is_index(n, 0, len(center_guess)): + raise ValueError('Illegal argument') + center_guesses = center_guess + center_guess = center_guesses[n] + elif center_guess is not None: + raise ValueError(f'Illegal center_guess type ({type(center_guess)})') + + # Sort the inputs + index = np.argsort(x) + x = x[index] + y = y[index] + miny = y.min() +# print(f'miny = {miny}') +# print(f'x_range = {x[0]} {x[-1]} {len(x)}') +# print(f'y_range = {y[0]} {y[-1]} {len(y)}') + +# xx = x +# yy = y + # Set range for current peak +# print(f'center_guesses = {center_guesses}') + if center_guesses is not None: + if n == 0: + low = 0 + upp = index_nearest(x, (center_guesses[0]+center_guesses[1])/2) + elif n == len(center_guesses)-1: + low = index_nearest(x, (center_guesses[n-1]+center_guesses[n])/2) + upp = len(x) + else: + low = index_nearest(x, (center_guesses[n-1]+center_guesses[n])/2) + upp = index_nearest(x, (center_guesses[n]+center_guesses[n+1])/2) +# print(f'low = {low}') +# print(f'upp = {upp}') + x = x[low:upp] + y = y[low:upp] +# quickPlot(x, y, vlines=(x[0], center_guess, x[-1]), block=True) + + # Estimate FHHM + maxy = y.max() +# print(f'x_range = {x[0]} {x[-1]} {len(x)}') +# print(f'y_range = {y[0]} {y[-1]} {len(y)} {miny} {maxy}') +# print(f'center_guess = {center_guess}') + if center_guess is None: + center_index = np.argmax(y) + center = x[center_index] + height = maxy-miny + else: + if use_max_for_center: + center_index = np.argmax(y) + center = x[center_index] + if center_index < 0.1*len(x) or center_index > 0.9*len(x): + center_index = index_nearest(x, center_guess) + center = center_guess + else: + center_index = index_nearest(x, center_guess) + center = center_guess + height = y[center_index]-miny +# print(f'center_index = {center_index}') +# print(f'center = {center}') +# print(f'height = {height}') + half_height = miny+0.5*height +# print(f'half_height = {half_height}') + fwhm_index1 = 0 + for i in range(center_index, fwhm_index1, -1): + if y[i] < half_height: + fwhm_index1 = i + break +# print(f'fwhm_index1 = {fwhm_index1} {x[fwhm_index1]}') + fwhm_index2 = len(x)-1 + for i in range(center_index, fwhm_index2): + if y[i] < half_height: + fwhm_index2 = i + break +# print(f'fwhm_index2 = {fwhm_index2} {x[fwhm_index2]}') +# quickPlot((x,y,'o'), vlines=(x[fwhm_index1], center, x[fwhm_index2]), block=True) + if fwhm_index1 == 0 and fwhm_index2 < len(x)-1: + fwhm = 2*(x[fwhm_index2]-center) + elif fwhm_index1 > 0 and fwhm_index2 == len(x)-1: + fwhm = 2*(center-x[fwhm_index1]) + else: + fwhm = x[fwhm_index2]-x[fwhm_index1] +# print(f'fwhm_index1 = {fwhm_index1} {x[fwhm_index1]}') +# print(f'fwhm_index2 = {fwhm_index2} {x[fwhm_index2]}') +# print(f'fwhm = {fwhm}') + + # Return height, center and FWHM +# quickPlot((x,y,'o'), (xx,yy), vlines=(x[fwhm_index1], center, x[fwhm_index2]), block=True) + return height, center, fwhm + + +class FitMultipeak(Fit): + """Fit data with multiple peaks + """ + def __init__(self, x, y, normalize=True): + super().__init__(x, deepcopy(y)) + self._norm = None + self._fwhm_max = None + self._sigma_max = None + if normalize: + self._normalize() + #quickPlot((self._x,self._y), block=True) + + @classmethod + def fit_multipeak(cls, x, y, centers, peak_models='gaussian', center_exprs=None, fit_type=None, + background_order=None, fwhm_max=None, plot_components=None): + """Make sure that centers and fwhm_max are in the correct units and consistent with expr + for a uniform fit (fit_type == 'uniform') + """ + fit = cls(x, y) + success = fit.fit(centers, fit_type=fit_type, peak_models=peak_models, fwhm_max=fwhm_max, + center_exprs=center_exprs, background_order=background_order, + plot_components=plot_components) + if success: + return fit.best_fit, fit.residual, fit.best_values, fit.best_errors, fit.redchi, \ + fit.success + else: + return np.array([]), np.array([]), {}, {}, sys.float_info.max, False + + def fit(self, centers, fit_type=None, peak_models=None, center_exprs=None, fwhm_max=None, + background_order=None, plot_components=None, param_constraint=False): + self._fwhm_max = fwhm_max + # Create the multipeak model + self._create_model(centers, fit_type, peak_models, center_exprs, background_order, + param_constraint) + + # Perform the fit + try: + if param_constraint: + super().fit(fit_kws={'xtol' : 1.e-5, 'ftol' : 1.e-5, 'gtol' : 1.e-5}) + else: + super().fit() + except: + return False + + # Check for valid fit parameter results + fit_failure = self._check_validity() + success = True + if fit_failure: + if param_constraint: + logging.warning(' -> Should not happen with param_constraint set, fail the fit') + success = False + else: + logging.info(' -> Retry fitting with constraints') + self.fit(centers, fit_type, peak_models, center_exprs, fwhm_max=fwhm_max, + background_order=background_order, plot_components=plot_components, + param_constraint=True) + else: + # Renormalize the data and results + self._renormalize() + + # Print report and plot components if requested + if plot_components is not None: + self.print_fit_report() + self.plot() + + return success + + def _create_model(self, centers, fit_type=None, peak_models=None, center_exprs=None, + background_order=None, param_constraint=False): + """Create the multipeak model + """ + if isinstance(centers, (int, float)): + centers = [centers] + num_peaks = len(centers) + if peak_models is None: + peak_models = num_peaks*['gaussian'] + elif isinstance(peak_models, str): + peak_models = num_peaks*[peak_models] + if len(peak_models) != num_peaks: + raise ValueError(f'Inconsistent number of peaks in peak_models ({len(peak_models)} vs '+ + f'{num_peaks})') + if num_peaks == 1: + if fit_type is not None: + logging.warning('Ignoring fit_type input for fitting one peak') + fit_type = None + if center_exprs is not None: + logging.warning('Ignoring center_exprs input for fitting one peak') + center_exprs = None + else: + if fit_type == 'uniform': + if center_exprs is None: + center_exprs = [f'scale_factor*{cen}' for cen in centers] + if len(center_exprs) != num_peaks: + raise ValueError(f'Inconsistent number of peaks in center_exprs '+ + f'({len(center_exprs)} vs {num_peaks})') + elif fit_type == 'unconstrained' or fit_type is None: + if center_exprs is not None: + logging.warning('Ignoring center_exprs input for unconstrained fit') + center_exprs = None + else: + raise ValueError(f'Illegal fit_type in fit_multigaussian {fit_type}') + self._sigma_max = None + if param_constraint: + min_value = sys.float_info.min + if self._fwhm_max is not None: + self._sigma_max = np.zeros(num_peaks) + else: + min_value = None + + # Reset the fit + self._model = None + self._parameters = Parameters() + self._result = None + + # Add background model + if background_order is not None: + if background_order == 0: + self.add_model('constant', prefix='background', c=0.0) + elif background_order == 1: + self.add_model('linear', prefix='background', slope=0.0, intercept=0.0) + elif background_order == 2: + self.add_model('quadratic', prefix='background', a=0.0, b=0.0, c=0.0) + else: + raise ValueError(f'background_order = {background_order}') + + # Add peaks and guess initial fit parameters + ast = Interpreter() + if num_peaks == 1: + height_init, cen_init, fwhm_init = self.guess_init_peak(self._x, self._y) + if self._fwhm_max is not None and fwhm_init > self._fwhm_max: + fwhm_init = self._fwhm_max + ast(f'fwhm = {fwhm_init}') + ast(f'height = {height_init}') + sig_init = ast(fwhm_factor[peak_models[0]]) + amp_init = ast(height_factor[peak_models[0]]) + sig_max = None + if self._sigma_max is not None: + ast(f'fwhm = {self._fwhm_max}') + sig_max = ast(fwhm_factor[peak_models[0]]) + self._sigma_max[0] = sig_max + self.add_model(peak_models[0], parameters=( + {'name' : 'amplitude', 'value' : amp_init, 'min' : min_value}, + {'name' : 'center', 'value' : cen_init, 'min' : min_value}, + {'name' : 'sigma', 'value' : sig_init, 'min' : min_value, 'max' : sig_max})) + else: + if fit_type == 'uniform': + self.add_parameter(name='scale_factor', value=1.0) + for i in range(num_peaks): + height_init, cen_init, fwhm_init = self.guess_init_peak(self._x, self._y, i, + center_guess=centers) + if self._fwhm_max is not None and fwhm_init > self._fwhm_max: + fwhm_init = self._fwhm_max + ast(f'fwhm = {fwhm_init}') + ast(f'height = {height_init}') + sig_init = ast(fwhm_factor[peak_models[i]]) + amp_init = ast(height_factor[peak_models[i]]) + sig_max = None + if self._sigma_max is not None: + ast(f'fwhm = {self._fwhm_max}') + sig_max = ast(fwhm_factor[peak_models[i]]) + self._sigma_max[i] = sig_max + if fit_type == 'uniform': + self.add_model(peak_models[i], prefix=f'peak{i+1}_', parameters=( + {'name' : 'amplitude', 'value' : amp_init, 'min' : min_value}, + {'name' : 'center', 'expr' : center_exprs[i], 'min' : min_value}, + {'name' : 'sigma', 'value' : sig_init, 'min' : min_value, + 'max' : sig_max})) + else: + self.add_model('gaussian', prefix=f'peak{i+1}_', parameters=( + {'name' : 'amplitude', 'value' : amp_init, 'min' : min_value}, + {'name' : 'center', 'value' : cen_init, 'min' : min_value}, + {'name' : 'sigma', 'value' : sig_init, 'min' : min_value, + 'max' : sig_max})) + + def _check_validity(self): + """Check for valid fit parameter results + """ + fit_failure = False + index = re.compile(r'\d+') + for parameter in self.best_parameters: + name = parameter['name'] + if ((('amplitude' in name or 'height' in name) and parameter['value'] <= 0.0) or + (('sigma' in name or 'fwhm' in name) and parameter['value'] <= 0.0) or + ('center' in name and parameter['value'] <= 0.0) or + (name == 'scale_factor' and parameter['value'] <= 0.0)): + logging.info(f'Invalid fit result for {name} ({parameter["value"]})') + fit_failure = True + if 'sigma' in name and self._sigma_max is not None: + if name == 'sigma': + sigma_max = self._sigma_max[0] + else: + sigma_max = self._sigma_max[int(index.search(name).group())-1] + i = int(index.search(name).group())-1 + if parameter['value'] > sigma_max: + logging.info(f'Invalid fit result for {name} ({parameter["value"]})') + fit_failure = True + elif parameter['value'] == sigma_max: + logging.warning(f'Edge result on for {name} ({parameter["value"]})') + if 'fwhm' in name and self._fwhm_max is not None: + if parameter['value'] > self._fwhm_max: + logging.info(f'Invalid fit result for {name} ({parameter["value"]})') + fit_failure = True + elif parameter['value'] == self._fwhm_max: + logging.warning(f'Edge result on for {name} ({parameter["value"]})') + return fit_failure + + def _normalize(self): + """Normalize the data + """ + y_min = self._y.min() + self._norm = (y_min, self._y.max()-y_min) + if self._norm[1] == 0.0: + self._norm = None + else: + self._y = (self._y-self._norm[0])/self._norm[1] + + def _renormalize(self): + """Renormalize the data and results + """ + if self._norm is None: + return + self._y = self._norm[0]+self._norm[1]*self._y + self._result.best_fit = self._norm[0]+self._norm[1]*self._result.best_fit + for name in self._result.params: + par = self._result.params[name] + if 'amplitude' in name or 'height' in name or 'background' in name: + par.value *= self._norm[1] + if par.stderr is not None: + par.stderr *= self._norm[1] + if par.init_value is not None: + par.init_value *= self._norm[1] + if par.min is not None and not np.isinf(par.min): + par.min *= self._norm[1] + if par.max is not None and not np.isinf(par.max): + par.max *= self._norm[1] + if 'intercept' in name or 'backgroundc' in name: + par.value += self._norm[0] + if par.init_value is not None: + par.init_value += self._norm[0] + if par.min is not None and not np.isinf(par.min): + par.min += self._norm[0] + if par.max is not None and not np.isinf(par.max): + par.max += self._norm[0] + self._result.init_fit = self._norm[0]+self._norm[1]*self._result.init_fit + init_values = {} + for name in self._result.init_values: + init_values[name] = self._result.init_values[name] + if init_values[name] is None: + continue + if 'amplitude' in name or 'height' in name or 'background' in name: + init_values[name] *= self._norm[1] + if 'intercept' in name or 'backgroundc' in name: + init_values[name] += self._norm[0] + self._result.init_values = init_values + # Don't renormalized chisqr, it has no useful meaning in physical units + #self._result.chisqr *= self._norm[1]*self._norm[1] + if self._result.covar is not None: + for i, name in enumerate(self._result.var_names): + if 'amplitude' in name or 'height' in name or 'background' in name: + for j in range(len(self._result.var_names)): + if self._result.covar[i,j] is not None: + self._result.covar[i,j] *= self._norm[1] + if self._result.covar[j,i] is not None: + self._result.covar[j,i] *= self._norm[1] + # Don't renormalized redchi, it has no useful meaning in physical units + #self._result.redchi *= self._norm[1]*self._norm[1] + self._result.residual *= self._norm[1]
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/general.py Thu Jul 28 16:05:24 2022 +0000 @@ -0,0 +1,1374 @@ +#!/usr/bin/env python3 + +# -*- coding: utf-8 -*- +""" +Created on Mon Dec 6 15:36:22 2021 + +@author: rv43 +""" + +import logging + +import os +import sys +import re +import yaml +try: + import h5py +except: + pass +import numpy as np +try: + import matplotlib.pyplot as plt + from matplotlib.widgets import Button +except: + pass + +from ast import literal_eval +from copy import deepcopy +from time import time + + +def depth_list(L): return isinstance(L, list) and max(map(depth_list, L))+1 +def depth_tuple(T): return isinstance(T, tuple) and max(map(depth_tuple, T))+1 +def unwrap_tuple(T): + if depth_tuple(T) > 1 and len(T) == 1: + T = unwrap_tuple(*T) + return T + +def illegal_value(value, name, location=None, exit_flag=False): + if not isinstance(location, str): + location = '' + else: + location = f'in {location} ' + if isinstance(name, str): + logging.error(f'Illegal value for {name} {location}({value}, {type(value)})') + else: + logging.error(f'Illegal value {location}({value}, {type(value)})') + if exit_flag: + raise ValueError + +def is_int(v, v_min=None, v_max=None): + """Value is an integer in range v_min <= v <= v_max. + """ + if not isinstance(v, int): + return False + if v_min is not None and not isinstance(v_min, int): + illegal_value(v_min, 'v_min', 'is_int') + return False + if v_max is not None and not isinstance(v_max, int): + illegal_value(v_max, 'v_max', 'is_int') + return False + if v_min is not None and v_max is not None and v_min > v_max: + logging.error(f'Illegal v_min, v_max combination ({v_min}, {v_max})') + return False + if (v_min is not None and v < v_min) or (v_max is not None and v > v_max): + return False + return True + +def is_int_pair(v, v_min=None, v_max=None): + """Value is an integer pair, each in range v_min <= v[i] <= v_max or + v_min[i] <= v[i] <= v_max[i]. + """ + if not (isinstance(v, (tuple, list)) and len(v) == 2 and isinstance(v[0], int) and + isinstance(v[1], int)): + return False + if v_min is not None or v_max is not None: + if (v_min is None or isinstance(v_min, int)) and (v_max is None or isinstance(v_max, int)): + if True in [True if not is_int(vi, v_min=v_min, v_max=v_max) else False for vi in v]: + return False + elif is_int_pair(v_min) and is_int_pair(v_max): + if True in [True if v_min[i] > v_max[i] else False for i in range(2)]: + logging.error(f'Illegal v_min, v_max combination ({v_min}, {v_max})') + return False + if True in [True if not is_int(v[i], v_min[i], v_max[i]) else False for i in range(2)]: + return False + elif is_int_pair(v_min) and (v_max is None or isinstance(v_max, int)): + if True in [True if not is_int(v[i], v_min=v_min[i], v_max=v_max) else False + for i in range(2)]: + return False + elif (v_min is None or isinstance(v_min, int)) and is_int_pair(v_max): + if True in [True if not is_int(v[i], v_min=v_min, v_max=v_max[i]) else False + for i in range(2)]: + return False + else: + logging.error(f'Illegal v_min or v_max input ({v_min} {type(v_min)} and '+ + f'{v_max} {type(v_max)})') + return False + return True + +def is_int_series(l, v_min=None, v_max=None): + """Value is a tuple or list of integers, each in range v_min <= l[i] <= v_max. + """ + if v_min is not None and not isinstance(v_min, int): + illegal_value(v_min, 'v_min', 'is_int_series') + return False + if v_max is not None and not isinstance(v_max, int): + illegal_value(v_max, 'v_max', 'is_int_series') + return False + if not isinstance(l, (tuple, list)): + return False + if True in [True if not is_int(v, v_min=v_min, v_max=v_max) else False for v in l]: + return False + return True + +def is_num(v, v_min=None, v_max=None): + """Value is a number in range v_min <= v <= v_max. + """ + if not isinstance(v, (int, float)): + return False + if v_min is not None and not isinstance(v_min, (int, float)): + illegal_value(v_min, 'v_min', 'is_num') + return False + if v_max is not None and not isinstance(v_max, (int, float)): + illegal_value(v_max, 'v_max', 'is_num') + return False + if v_min is not None and v_max is not None and v_min > v_max: + logging.error(f'Illegal v_min, v_max combination ({v_min}, {v_max})') + return False + if (v_min is not None and v < v_min) or (v_max is not None and v > v_max): + return False + return True + +def is_num_pair(v, v_min=None, v_max=None): + """Value is a number pair, each in range v_min <= v[i] <= v_max or + v_min[i] <= v[i] <= v_max[i]. + """ + if not (isinstance(v, (tuple, list)) and len(v) == 2 and isinstance(v[0], (int, float)) and + isinstance(v[1], (int, float))): + return False + if v_min is not None or v_max is not None: + if ((v_min is None or isinstance(v_min, (int, float))) and + (v_max is None or isinstance(v_max, (int, float)))): + if True in [True if not is_num(vi, v_min=v_min, v_max=v_max) else False for vi in v]: + return False + elif is_num_pair(v_min) and is_num_pair(v_max): + if True in [True if v_min[i] > v_max[i] else False for i in range(2)]: + logging.error(f'Illegal v_min, v_max combination ({v_min}, {v_max})') + return False + if True in [True if not is_num(v[i], v_min[i], v_max[i]) else False for i in range(2)]: + return False + elif is_num_pair(v_min) and (v_max is None or isinstance(v_max, (int, float))): + if True in [True if not is_num(v[i], v_min=v_min[i], v_max=v_max) else False + for i in range(2)]: + return False + elif (v_min is None or isinstance(v_min, (int, float))) and is_num_pair(v_max): + if True in [True if not is_num(v[i], v_min=v_min, v_max=v_max[i]) else False + for i in range(2)]: + return False + else: + logging.error(f'Illegal v_min or v_max input ({v_min} {type(v_min)} and '+ + f'{v_max} {type(v_max)})') + return False + return True + +def is_num_series(l, v_min=None, v_max=None): + """Value is a tuple or list of numbers, each in range v_min <= l[i] <= v_max. + """ + if v_min is not None and not isinstance(v_min, (int, float)): + illegal_value(v_min, 'v_min', 'is_num_series') + return False + if v_max is not None and not isinstance(v_max, (int, float)): + illegal_value(v_max, 'v_max', 'is_num_series') + return False + if not isinstance(l, (tuple, list)): + return False + if True in [True if not is_num(v, v_min=v_min, v_max=v_max) else False for v in l]: + return False + return True + +def is_index(v, v_min=0, v_max=None): + """Value is an array index in range v_min <= v < v_max. + NOTE v_max IS NOT included! + """ + if isinstance(v_max, int): + if v_max <= v_min: + logging.error(f'Illegal v_min, v_max combination ({v_min}, {v_max})') + return False + v_max -= 1 + return is_int(v, v_min, v_max) + +def is_index_range(v, v_min=0, v_max=None): + """Value is an array index range in range v_min <= v[0] <= v[1] <= v_max. + NOTE v_max IS included! + """ + if not is_int_pair(v): + return False + if not isinstance(v_min, int): + illegal_value(v_min, 'v_min', 'is_index_range') + return False + if v_max is not None: + if not isinstance(v_max, int): + illegal_value(v_max, 'v_max', 'is_index_range') + return False + if v_max < v_min: + logging.error(f'Illegal v_min, v_max combination ({v_min}, {v_max})') + return False + if not v_min <= v[0] <= v[1] or (v_max is not None and v[1] > v_max): + return False + return True + +def index_nearest(a, value): + a = np.asarray(a) + if a.ndim > 1: + logging.warning(f'Illegal input array ({a}, {type(a)})') + # Round up for .5 + value *= 1.0+sys.float_info.epsilon + return (int)(np.argmin(np.abs(a-value))) + +def index_nearest_low(a, value): + a = np.asarray(a) + if a.ndim > 1: + logging.warning(f'Illegal input array ({a}, {type(a)})') + index = int(np.argmin(np.abs(a-value))) + if value < a[index] and index > 0: + index -= 1 + return index + +def index_nearest_upp(a, value): + a = np.asarray(a) + if a.ndim > 1: + logging.warning(f'Illegal input array ({a}, {type(a)})') + index = int(np.argmin(np.abs(a-value))) + if value > a[index] and index < a.size-1: + index += 1 + return index + +def round_to_n(x, n=1): + if x == 0.0: + return 0 + else: + return round(x, n-1-int(np.floor(np.log10(abs(x))))) + +def round_up_to_n(x, n=1): + xr = round_to_n(x, n) + if abs(x/xr) > 1.0: + xr += np.sign(x)*10**(np.floor(np.log10(abs(x)))+1-n) + return xr + +def trunc_to_n(x, n=1): + xr = round_to_n(x, n) + if abs(xr/x) > 1.0: + xr -= np.sign(x)*10**(np.floor(np.log10(abs(x)))+1-n) + return xr + +def string_to_list(s): + """Return a list of numbers by splitting/expanding a string on any combination of + dashes, commas, and/or whitespaces + e.g: '1, 3, 5-8,12 ' -> [1, 3, 5, 6, 7, 8, 12] + """ + if not isinstance(s, str): + illegal_value(s, location='string_to_list') + return None + if not len(s): + return [] + try: + list1 = [x for x in re.split('\s+,\s+|\s+,|,\s+|\s+|,', s.strip())] + except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError): + return None + try: + l = [] + for l1 in list1: + l2 = [literal_eval(x) for x in re.split('\s+-\s+|\s+-|-\s+|\s+|-', l1)] + if len(l2) == 1: + l += l2 + elif len(l2) == 2 and l2[1] > l2[0]: + l += [i for i in range(l2[0], l2[1]+1)] + else: + raise ValueError + except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError): + return None + return sorted(set(l)) + +def get_trailing_int(string): + indexRegex = re.compile(r'\d+$') + mo = indexRegex.search(string) + if mo is None: + return None + else: + return int(mo.group()) + +def input_int(s=None, v_min=None, v_max=None, default=None, inset=None): + if default is not None: + if not isinstance(default, int): + illegal_value(default, 'default', 'input_int') + return None + default_string = f' [{default}]' + else: + default_string = '' + if v_min is not None: + if not isinstance(v_min, int): + illegal_value(v_min, 'v_min', 'input_int') + return None + if default is not None and default < v_min: + logging.error('Illegal v_min, default combination ({v_min}, {default})') + return None + if v_max is not None: + if not isinstance(v_max, int): + illegal_value(v_max, 'v_max', 'input_int') + return None + if v_min is not None and v_min > v_max: + logging.error(f'Illegal v_min, v_max combination ({v_min}, {v_max})') + return None + if default is not None and default > v_max: + logging.error('Illegal default, v_max combination ({default}, {v_max})') + return None + if inset is not None: + if (not isinstance(inset, (tuple, list)) or False in [True if isinstance(i, int) else + False for i in inset]): + illegal_value(inset, 'inset', 'input_int') + return None + if v_min is not None and v_max is not None: + v_range = f' ({v_min}, {v_max})' + elif v_min is not None: + v_range = f' (>= {v_min})' + elif v_max is not None: + v_range = f' (<= {v_max})' + else: + v_range = '' + if s is None: + print(f'Enter an integer{v_range}{default_string}: ') + else: + print(f'{s}{v_range}{default_string}: ') + try: + i = input() + if isinstance(i, str) and not len(i): + v = default + else: + v = literal_eval(i) + if inset and v not in inset: + raise ValueError(f'{v} not part of the set {inset}') + except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError): + v = None + except: + print('Unexpected error') + raise + if not is_int(v, v_min, v_max): + print('Illegal input, enter a valid integer') + v = input_int(s, v_min, v_max, default) + return v + +def input_num(s=None, v_min=None, v_max=None, default=None): + if default is not None: + if not isinstance(default, (int, float)): + illegal_value(default, 'default', 'input_num') + return None + default_string = f' [{default}]' + else: + default_string = '' + if v_min is not None: + if not isinstance(v_min, (int, float)): + illegal_value(vmin, 'vmin', 'input_num') + return None + if default is not None and default < v_min: + logging.error('Illegal v_min, default combination ({v_min}, {default})') + return None + if v_max is not None: + if not isinstance(v_max, (int, float)): + illegal_value(vmax, 'vmax', 'input_num') + return None + if v_min is not None and v_max < v_min: + logging.error(f'Illegal v_min, v_max combination ({v_min}, {v_max})') + return None + if default is not None and default > v_max: + logging.error('Illegal default, v_max combination ({default}, {v_max})') + return None + if v_min is not None and v_max is not None: + v_range = f' ({v_min}, {v_max})' + elif v_min is not None: + v_range = f' (>= {v_min})' + elif v_max is not None: + v_range = f' (<= {v_max})' + else: + v_range = '' + if s is None: + print(f'Enter a number{v_range}{default_string}: ') + else: + print(f'{s}{v_range}{default_string}: ') + try: + i = input() + if isinstance(i, str) and not len(i): + v = default + else: + v = literal_eval(i) + except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError): + v = None + except: + print('Unexpected error') + raise + if not is_num(v, v_min, v_max): + print('Illegal input, enter a valid number') + v = input_num(s, v_min, v_max, default) + return v + +def input_int_list(s=None, v_min=None, v_max=None): + if v_min is not None and not isinstance(v_min, int): + illegal_value(vmin, 'vmin', 'input_int_list') + return None + if v_max is not None: + if not isinstance(v_max, int): + illegal_value(vmax, 'vmax', 'input_int_list') + return None + if v_max < v_min: + logging.error(f'Illegal v_min, v_max combination ({v_min}, {v_max})') + return None + if v_min is not None and v_max is not None: + v_range = f' (each value in ({v_min}, {v_max}))' + elif v_min is not None: + v_range = f' (each value >= {v_min})' + elif v_max is not None: + v_range = f' (each value <= {v_max})' + else: + v_range = '' + if s is None: + print(f'Enter a series of integers{v_range}: ') + else: + print(f'{s}{v_range}: ') + try: + l = string_to_list(input()) + except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError): + l = None + except: + print('Unexpected error') + raise + if (not isinstance(l, list) or + True in [True if not is_int(v, v_min, v_max) else False for v in l]): + print('Illegal input: enter a valid set of dash/comma/whitespace separated integers '+ + 'e.g. 2,3,5-8,10') + l = input_int_list(s, v_min, v_max) + return l + +def input_yesno(s=None, default=None): + if default is not None: + if not isinstance(default, str): + illegal_value(default, 'default', 'input_yesno') + return None + if default.lower() in 'yes': + default = 'y' + elif default.lower() in 'no': + default = 'n' + else: + illegal_value(default, 'default', 'input_yesno') + return None + default_string = f' [{default}]' + else: + default_string = '' + if s is None: + print(f'Enter yes or no{default_string}: ') + else: + print(f'{s}{default_string}: ') + i = input() + if isinstance(i, str) and not len(i): + i = default + if i is not None and i.lower() in 'yes': + v = True + elif i is not None and i.lower() in 'no': + v = False + else: + print('Illegal input, enter yes or no') + v = input_yesno(s, default) + return v + +def input_menu(items, default=None, header=None): + if not isinstance(items, (tuple, list)) or False in [True if isinstance(i, str) else False + for i in items]: + illegal_value(items, 'items', 'input_menu') + return None + if default is not None: + if not (isinstance(default, str) and default in items): + logging.error(f'Illegal value for default ({default}), must be in {items}') + return None + default_string = f' [{items.index(default)+1}]' + else: + default_string = '' + if header is None: + print(f'Choose one of the following items (1, {len(items)}){default_string}:') + else: + print(f'{header} (1, {len(items)}){default_string}:') + for i, choice in enumerate(items): + print(f' {i+1}: {choice}') + choice = input() + if isinstance(choice, str) and not len(choice): + choice = items.index(default) + else: + choice = literal_eval(choice) + if isinstance(choice, int) and 1 <= choice <= len(items): + choice -= 1 + else: + print(f'Illegal choice, enter a number between 1 and {len(items)}') + choice = input_menu(items, default) + return choice + +def create_mask(x, bounds=None, reverse_mask=False, current_mask=None): + # bounds is a pair of number in the same units a x + if not isinstance(x, (tuple, list, np.ndarray)) or not len(x): + logging.warning(f'Illegal input array ({x}, {type(x)})') + return None + if bounds is not None and not is_num_pair(bounds): + logging.warning(f'Illegal bounds parameter ({bounds} {type(bounds)}, input ignored') + bounds = None + if bounds is not None: + if not reverse_mask: + mask = np.logical_and(x > min(bounds), x < max(bounds)) + else: + mask = np.logical_or(x < min(bounds), x > max(bounds)) + else: + mask = np.ones(len(x), dtype=bool) + if current_mask is not None: + if not isinstance(current_mask, (tuple, list, np.ndarray)) or len(current_mask) != len(x): + logging.warning(f'Illegal current_mask ({current_mask}, {type(current_mask)}), '+ + 'input ignored') + else: + mask = np.logical_and(mask, current_mask) + if not True in mask: + logging.warning('Entire data array is masked') + return mask + +def draw_mask_1d(ydata, xdata=None, current_index_ranges=None, current_mask=None, + select_mask=True, num_index_ranges_max=None, title=None, legend=None): + def draw_selections(ax): + ax.clear() + ax.set_title(title) + ax.legend([legend]) + ax.plot(xdata, ydata, 'k') + for (low, upp) in current_include: + xlow = 0.5*(xdata[max(0, low-1)]+xdata[low]) + xupp = 0.5*(xdata[upp]+xdata[min(num_data-1, upp+1)]) + ax.axvspan(xlow, xupp, facecolor='green', alpha=0.5) + for (low, upp) in current_exclude: + xlow = 0.5*(xdata[max(0, low-1)]+xdata[low]) + xupp = 0.5*(xdata[upp]+xdata[min(num_data-1, upp+1)]) + ax.axvspan(xlow, xupp, facecolor='red', alpha=0.5) + for (low, upp) in selected_index_ranges: + xlow = 0.5*(xdata[max(0, low-1)]+xdata[low]) + xupp = 0.5*(xdata[upp]+xdata[min(num_data-1, upp+1)]) + ax.axvspan(xlow, xupp, facecolor=selection_color, alpha=0.5) + ax.get_figure().canvas.draw() + + def onclick(event): + if event.inaxes in [fig.axes[0]]: + selected_index_ranges.append(index_nearest_upp(xdata, event.xdata)) + + def onrelease(event): + if len(selected_index_ranges) > 0: + if isinstance(selected_index_ranges[-1], int): + if event.inaxes in [fig.axes[0]]: + event.xdata = index_nearest_low(xdata, event.xdata) + if selected_index_ranges[-1] <= event.xdata: + selected_index_ranges[-1] = (selected_index_ranges[-1], event.xdata) + else: + selected_index_ranges[-1] = (event.xdata, selected_index_ranges[-1]) + draw_selections(event.inaxes) + else: + selected_index_ranges.pop(-1) + + def confirm_selection(event): + plt.close() + + def clear_last_selection(event): + if len(selected_index_ranges): + selected_index_ranges.pop(-1) + draw_selections(ax) + + def update_mask(mask): + for (low, upp) in selected_index_ranges: + selected_mask = np.logical_and(xdata >= xdata[low], xdata <= xdata[upp]) + mask = np.logical_or(mask, selected_mask) + for (low, upp) in unselected_index_ranges: + unselected_mask = np.logical_and(xdata >= xdata[low], xdata <= xdata[upp]) + mask[unselected_mask] = False + return mask + + def update_index_ranges(mask): + # Update the currently included index ranges (where mask is True) + current_include = [] + for i, m in enumerate(mask): + if m == True: + if len(current_include) == 0 or type(current_include[-1]) == tuple: + current_include.append(i) + else: + if len(current_include) > 0 and isinstance(current_include[-1], int): + current_include[-1] = (current_include[-1], i-1) + if len(current_include) > 0 and isinstance(current_include[-1], int): + current_include[-1] = (current_include[-1], num_data-1) + return current_include + + # Check for valid inputs + ydata = np.asarray(ydata) + if ydata.ndim > 1: + logging.warning(f'Illegal ydata dimension ({ydata.ndim})') + return None, None + num_data = ydata.size + if xdata is None: + xdata = np.arange(num_data) + else: + xdata = np.asarray(xdata, dtype=np.float64) + if xdata.ndim > 1 or xdata.size != num_data: + logging.warning(f'Illegal xdata shape ({xdata.shape})') + return None, None + if not np.all(xdata[:-1] < xdata[1:]): + logging.warning('Illegal xdata: must be monotonically increasing') + return None, None + if current_index_ranges is not None: + if not isinstance(current_index_ranges, (tuple, list)): + logging.warning('Illegal current_index_ranges parameter ({current_index_ranges}, '+ + f'{type(current_index_ranges)})') + return None, None + if not isinstance(select_mask, bool): + logging.warning('Illegal select_mask parameter ({select_mask}, {type(select_mask)})') + return None, None + if num_index_ranges_max is not None: + logging.warning('num_index_ranges_max input not yet implemented in draw_mask_1d') + if title is None: + title = 'select ranges of data' + elif not isinstance(title, str): + illegal(title, 'title') + title = '' + if legend is None and not isinstance(title, str): + illegal(legend, 'legend') + legend = None + + if select_mask: + title = f'Click and drag to {title} you wish to include' + selection_color = 'green' + else: + title = f'Click and drag to {title} you wish to exclude' + selection_color = 'red' + + # Set initial selected mask and the selected/unselected index ranges as needed + selected_index_ranges = [] + unselected_index_ranges = [] + selected_mask = np.full(xdata.shape, False, dtype=bool) + if current_index_ranges is None: + if current_mask is None: + if not select_mask: + selected_index_ranges = [(0, num_data-1)] + selected_mask = np.full(xdata.shape, True, dtype=bool) + else: + selected_mask = np.copy(np.asarray(current_mask, dtype=bool)) + if current_index_ranges is not None and len(current_index_ranges): + current_index_ranges = sorted([(low, upp) for (low, upp) in current_index_ranges]) + for (low, upp) in current_index_ranges: + if low > upp or low >= num_data or upp < 0: + continue + if low < 0: + low = 0 + if upp >= num_data: + upp = num_data-1 + selected_index_ranges.append((low, upp)) + selected_mask = update_mask(selected_mask) + if current_index_ranges is not None and current_mask is not None: + selected_mask = np.logical_and(current_mask, selected_mask) + if current_mask is not None: + selected_index_ranges = update_index_ranges(selected_mask) + + # Set up range selections for display + current_include = selected_index_ranges + current_exclude = [] + selected_index_ranges = [] + if not len(current_include): + if select_mask: + current_exclude = [(0, num_data-1)] + else: + current_include = [(0, num_data-1)] + else: + if current_include[0][0] > 0: + current_exclude.append((0, current_include[0][0]-1)) + for i in range(1, len(current_include)): + current_exclude.append((current_include[i-1][1]+1, current_include[i][0]-1)) + if current_include[-1][1] < num_data-1: + current_exclude.append((current_include[-1][1]+1, num_data-1)) + + # Set up matplotlib figure + plt.close('all') + fig, ax = plt.subplots() + plt.subplots_adjust(bottom=0.2) + draw_selections(ax) + + # Set up event handling for click-and-drag range selection + cid_click = fig.canvas.mpl_connect('button_press_event', onclick) + cid_release = fig.canvas.mpl_connect('button_release_event', onrelease) + + # Set up confirm / clear range selection buttons + confirm_b = Button(plt.axes([0.75, 0.05, 0.15, 0.075]), 'Confirm') + clear_b = Button(plt.axes([0.59, 0.05, 0.15, 0.075]), 'Clear') + cid_confirm = confirm_b.on_clicked(confirm_selection) + cid_clear = clear_b.on_clicked(clear_last_selection) + + # Show figure + plt.show(block=True) + + # Disconnect callbacks when figure is closed + fig.canvas.mpl_disconnect(cid_click) + fig.canvas.mpl_disconnect(cid_release) + confirm_b.disconnect(cid_confirm) + clear_b.disconnect(cid_clear) + + # Swap selection depending on select_mask + if not select_mask: + selected_index_ranges, unselected_index_ranges = unselected_index_ranges, \ + selected_index_ranges + + # Update the mask with the currently selected/unselected x-ranges + selected_mask = update_mask(selected_mask) + + # Update the currently included index ranges (where mask is True) + current_include = update_index_ranges(selected_mask) + + return selected_mask, current_include + +def findImageFiles(path, filetype, name=None): + if isinstance(name, str): + name = f' {name} ' + else: + name = ' ' + # Find available index range + if filetype == 'tif': + if not isinstance(path, str) or not os.path.isdir(path): + illegal_value(path, 'path', 'findImageRange') + return -1, 0, [] + indexRegex = re.compile(r'\d+') + # At this point only tiffs + files = sorted([f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f)) and + f.endswith('.tif') and indexRegex.search(f)]) + num_imgs = len(files) + if num_imgs < 1: + logging.warning('No available'+name+'files') + return -1, 0, [] + first_index = indexRegex.search(files[0]).group() + last_index = indexRegex.search(files[-1]).group() + if first_index is None or last_index is None: + logging.error('Unable to find correctly indexed'+name+'images') + return -1, 0, [] + first_index = int(first_index) + last_index = int(last_index) + if num_imgs != last_index-first_index+1: + logging.error('Non-consecutive set of indices for'+name+'images') + return -1, 0, [] + paths = [os.path.join(path, f) for f in files] + elif filetype == 'h5': + if not isinstance(path, str) or not os.path.isfile(path): + illegal_value(path, 'path', 'findImageRange') + return -1, 0, [] + # At this point only h5 in alamo2 detector style + first_index = 0 + with h5py.File(path, 'r') as f: + num_imgs = f['entry/instrument/detector/data'].shape[0] + last_index = num_imgs-1 + paths = [path] + else: + illegal_value(filetype, 'filetype', 'findImageRange') + return -1, 0, [] + logging.debug('\nNumber of available'+name+f'images: {num_imgs}') + logging.debug('Index range of available'+name+f'images: [{first_index}, '+ + f'{last_index}]') + + return first_index, num_imgs, paths + +def selectImageRange(first_index, offset, num_imgs, name=None, num_required=None): + if isinstance(name, str): + name = f' {name} ' + else: + name = ' ' + # Check existing values + use_input = False + if (is_int(first_index, 0) and is_int(offset, 0) and is_int(num_imgs, 1)): + if offset < 0: + use_input = input_yesno(f'\nCurrent{name}first index = {first_index}, '+ + 'use this value (y/n)?', 'y') + else: + use_input = input_yesno(f'\nCurrent{name}first index/offset = '+ + f'{first_index}/{offset}, use these values (y/n)?', 'y') + if num_required is None: + if use_input: + use_input = input_yesno(f'Current number of{name}images = '+ + f'{num_imgs}, use this value (y/n)? ', 'y') + if use_input: + return first_index, offset, num_imgs + + # Check range against requirements + if num_imgs < 1: + logging.warning('No available'+name+'images') + return -1, -1, 0 + if num_required is None: + if num_imgs == 1: + return first_index, 0, 1 + else: + if not is_int(num_required, 1): + illegal_value(num_required, 'num_required', 'selectImageRange') + return -1, -1, 0 + if num_imgs < num_required: + logging.error('Unable to find the required'+name+ + f'images ({num_imgs} out of {num_required})') + return -1, -1, 0 + + # Select index range + print('\nThe number of available'+name+f'images is {num_imgs}') + if num_required is None: + last_index = first_index+num_imgs + use_all = f'Use all ([{first_index}, {last_index}])' + pick_offset = 'Pick a first index offset and a number of images' + pick_bounds = 'Pick the first and last index' + choice = input_menu([use_all, pick_offset, pick_bounds], default=pick_offset) + if not choice: + offset = 0 + elif choice == 1: + offset = input_int('Enter the first index offset', 0, last_index-first_index) + first_index += offset + if first_index == last_index: + num_imgs = 1 + else: + num_imgs = input_int('Enter the number of images', 1, num_imgs-offset) + else: + offset = input_int('Enter the first index', first_index, last_index) + first_index += offset + num_imgs = input_int('Enter the last index', first_index, last_index)-first_index+1 + else: + use_all = f'Use ([{first_index}, {first_index+num_required-1}])' + pick_offset = 'Pick the first index offset' + choice = input_menu([use_all, pick_offset], pick_offset) + offset = 0 + if choice == 1: + offset = input_int('Enter the first index offset', 0, num_imgs-num_required) + first_index += offset + num_imgs = num_required + + return first_index, offset, num_imgs + +def loadImage(f, img_x_bounds=None, img_y_bounds=None): + """Load a single image from file. + """ + if not os.path.isfile(f): + logging.error(f'Unable to load {f}') + return None + img_read = plt.imread(f) + if not img_x_bounds: + img_x_bounds = (0, img_read.shape[0]) + else: + if (not isinstance(img_x_bounds, (tuple, list)) or len(img_x_bounds) != 2 or + not (0 <= img_x_bounds[0] < img_x_bounds[1] <= img_read.shape[0])): + logging.error(f'inconsistent row dimension in {f}') + return None + if not img_y_bounds: + img_y_bounds = (0, img_read.shape[1]) + else: + if (not isinstance(img_y_bounds, list) or len(img_y_bounds) != 2 or + not (0 <= img_y_bounds[0] < img_y_bounds[1] <= img_read.shape[1])): + logging.error(f'inconsistent column dimension in {f}') + return None + return img_read[img_x_bounds[0]:img_x_bounds[1],img_y_bounds[0]:img_y_bounds[1]] + +def loadImageStack(files, filetype, img_offset, num_imgs, num_img_skip=0, + img_x_bounds=None, img_y_bounds=None): + """Load a set of images and return them as a stack. + """ + logging.debug(f'img_offset = {img_offset}') + logging.debug(f'num_imgs = {num_imgs}') + logging.debug(f'num_img_skip = {num_img_skip}') + logging.debug(f'\nfiles:\n{files}\n') + img_stack = np.array([]) + if filetype == 'tif': + img_read_stack = [] + i = 1 + t0 = time() + for f in files[img_offset:img_offset+num_imgs:num_img_skip+1]: + if not i%20: + logging.info(f' loading {i}/{num_imgs}: {f}') + else: + logging.debug(f' loading {i}/{num_imgs}: {f}') + img_read = loadImage(f, img_x_bounds, img_y_bounds) + img_read_stack.append(img_read) + i += num_img_skip+1 + img_stack = np.stack([img_read for img_read in img_read_stack]) + logging.info(f'... done in {time()-t0:.2f} seconds!') + logging.debug(f'img_stack shape = {np.shape(img_stack)}') + del img_read_stack, img_read + elif filetype == 'h5': + if not isinstance(files[0], str) and not os.path.isfile(files[0]): + illegal_value(files[0], 'files[0]', 'loadImageStack') + return img_stack + t0 = time() + logging.info(f'Loading {files[0]}') + with h5py.File(files[0], 'r') as f: + shape = f['entry/instrument/detector/data'].shape + if len(shape) != 3: + logging.error(f'inconsistent dimensions in {files[0]}') + if not img_x_bounds: + img_x_bounds = (0, shape[1]) + else: + if (not isinstance(img_x_bounds, (tuple, list)) or len(img_x_bounds) != 2 or + not (0 <= img_x_bounds[0] < img_x_bounds[1] <= shape[1])): + logging.error(f'inconsistent row dimension in {files[0]} {img_x_bounds} '+ + f'{shape[1]}') + if not img_y_bounds: + img_y_bounds = (0, shape[2]) + else: + if (not isinstance(img_y_bounds, list) or len(img_y_bounds) != 2 or + not (0 <= img_y_bounds[0] < img_y_bounds[1] <= shape[2])): + logging.error(f'inconsistent column dimension in {files[0]}') + img_stack = f.get('entry/instrument/detector/data')[ + img_offset:img_offset+num_imgs:num_img_skip+1, + img_x_bounds[0]:img_x_bounds[1],img_y_bounds[0]:img_y_bounds[1]] + logging.info(f'... done in {time()-t0:.2f} seconds!') + else: + illegal_value(filetype, 'filetype', 'loadImageStack') + return img_stack + +def combine_tiffs_in_h5(files, num_imgs, h5_filename): + img_stack = loadImageStack(files, 'tif', 0, num_imgs) + with h5py.File(h5_filename, 'w') as f: + f.create_dataset('entry/instrument/detector/data', data=img_stack) + del img_stack + return [h5_filename] + +def clearImshow(title=None): + plt.ioff() + if title is None: + title = 'quick imshow' + elif not isinstance(title, str): + illegal_value(title, 'title', 'clearImshow') + return + plt.close(fig=title) + +def clearPlot(title=None): + plt.ioff() + if title is None: + title = 'quick plot' + elif not isinstance(title, str): + illegal_value(title, 'title', 'clearPlot') + return + plt.close(fig=title) + +def quickImshow(a, title=None, path=None, name=None, save_fig=False, save_only=False, + clear=True, extent=None, show_grid=False, grid_color='w', grid_linewidth=1, **kwargs): + if title is not None and not isinstance(title, str): + illegal_value(title, 'title', 'quickImshow') + return + if path is not None and not isinstance(path, str): + illegal_value(path, 'path', 'quickImshow') + return + if not isinstance(save_fig, bool): + illegal_value(save_fig, 'save_fig', 'quickImshow') + return + if not isinstance(save_only, bool): + illegal_value(save_only, 'save_only', 'quickImshow') + return + if not isinstance(clear, bool): + illegal_value(clear, 'clear', 'quickImshow') + return + if not title: + title='quick imshow' +# else: +# title = re.sub(r"\s+", '_', title) + if name is None: + ttitle = re.sub(r"\s+", '_', title) + if path is None: + path = f'{ttitle}.png' + else: + path = f'{path}/{ttitle}.png' + else: + if path is None: + path = name + else: + path = f'{path}/{name}' + if extent is None: + extent = (0, a.shape[1], a.shape[0], 0) + if clear: + plt.close(fig=title) + if not save_only: + plt.ion() + plt.figure(title) + plt.imshow(a, extent=extent, **kwargs) + if show_grid: + ax = plt.gca() + ax.grid(color=grid_color, linewidth=grid_linewidth) +# if title != 'quick imshow': +# plt.title = title + if save_only: + plt.savefig(path) + plt.close(fig=title) + else: + if save_fig: + plt.savefig(path) + +def quickPlot(*args, xerr=None, yerr=None, vlines=None, title=None, xlim=None, ylim=None, + xlabel=None, ylabel=None, legend=None, path=None, name=None, show_grid=False, + save_fig=False, save_only=False, clear=True, block=False, **kwargs): + if title is not None and not isinstance(title, str): + illegal_value(title, 'title', 'quickPlot') + title = None + if xlim is not None and not isinstance(xlim, (tuple, list)) and len(xlim) != 2: + illegal_value(xlim, 'xlim', 'quickPlot') + xlim = None + if ylim is not None and not isinstance(ylim, (tuple, list)) and len(ylim) != 2: + illegal_value(ylim, 'ylim', 'quickPlot') + ylim = None + if xlabel is not None and not isinstance(xlabel, str): + illegal_value(xlabel, 'xlabel', 'quickPlot') + xlabel = None + if ylabel is not None and not isinstance(ylabel, str): + illegal_value(ylabel, 'ylabel', 'quickPlot') + ylabel = None + if legend is not None and not isinstance(legend, (tuple, list)): + illegal_value(legend, 'legend', 'quickPlot') + legend = None + if path is not None and not isinstance(path, str): + illegal_value(path, 'path', 'quickPlot') + return + if not isinstance(show_grid, bool): + illegal_value(show_grid, 'show_grid', 'quickPlot') + return + if not isinstance(save_fig, bool): + illegal_value(save_fig, 'save_fig', 'quickPlot') + return + if not isinstance(save_only, bool): + illegal_value(save_only, 'save_only', 'quickPlot') + return + if not isinstance(clear, bool): + illegal_value(clear, 'clear', 'quickPlot') + return + if not isinstance(block, bool): + illegal_value(block, 'block', 'quickPlot') + return + if title is None: + title = 'quick plot' +# else: +# title = re.sub(r"\s+", '_', title) + if name is None: + ttitle = re.sub(r"\s+", '_', title) + if path is None: + path = f'{ttitle}.png' + else: + path = f'{path}/{ttitle}.png' + else: + if path is None: + path = name + else: + path = f'{path}/{name}' + if clear: + plt.close(fig=title) + args = unwrap_tuple(args) + if depth_tuple(args) > 1 and (xerr is not None or yerr is not None): + logging.warning('Error bars ignored form multiple curves') + if not save_only: + if block: + plt.ioff() + else: + plt.ion() + plt.figure(title) + if depth_tuple(args) > 1: + for y in args: + plt.plot(*y, **kwargs) + else: + if xerr is None and yerr is None: + plt.plot(*args, **kwargs) + else: + plt.errorbar(*args, xerr=xerr, yerr=yerr, **kwargs) + if vlines is not None: + for v in vlines: + plt.axvline(v, color='r', linestyle='--', **kwargs) +# if vlines is not None: +# for s in tuple(([x, x], list(plt.gca().get_ylim())) for x in vlines): +# plt.plot(*s, color='red', **kwargs) + if xlim is not None: + plt.xlim(xlim) + if ylim is not None: + plt.ylim(ylim) + if xlabel is not None: + plt.xlabel(xlabel) + if ylabel is not None: + plt.ylabel(ylabel) + if show_grid: + ax = plt.gca() + ax.grid(color='k')#, linewidth=1) + if legend is not None: + plt.legend(legend) + if save_only: + plt.savefig(path) + plt.close(fig=title) + else: + if save_fig: + plt.savefig(path) + if block: + plt.show(block=block) + +def selectArrayBounds(a, x_low=None, x_upp=None, num_x_min=None, ask_bounds=False, + title='select array bounds'): + """Interactively select the lower and upper data bounds for a numpy array. + """ + if isinstance(a, (tuple, list)): + a = np.array(a) + if not isinstance(a, np.ndarray) or a.ndim != 1: + illegal_value(a.ndim, 'array type or dimension', 'selectArrayBounds') + return None + len_a = len(a) + if num_x_min is None: + num_x_min = 1 + else: + if num_x_min < 2 or num_x_min > len_a: + logging.warning('Illegal value for num_x_min in selectArrayBounds, input ignored') + num_x_min = 1 + + # Ask to use current bounds + if ask_bounds and (x_low is not None or x_upp is not None): + if x_low is None: + x_low = 0 + if not is_int(x_low, 0, len_a-num_x_min): + illegal_value(x_low, 'x_low', 'selectArrayBounds') + return None + if x_upp is None: + x_upp = len_a + if not is_int(x_upp, x_low+num_x_min, len_a): + illegal_value(x_upp, 'x_upp', 'selectArrayBounds') + return None + quickPlot((range(len_a), a), vlines=(x_low,x_upp), title=title) + if not input_yesno(f'\nCurrent array bounds: [{x_low}, {x_upp}] '+ + 'use these values (y/n)?', 'y'): + x_low = None + x_upp = None + else: + clearPlot(title) + return x_low, x_upp + + if x_low is None: + x_min = 0 + x_max = len_a + x_low_max = len_a-num_x_min + while True: + quickPlot(range(x_min, x_max), a[x_min:x_max], title=title) + zoom_flag = input_yesno('Set lower data bound (y) or zoom in (n)?', 'y') + if zoom_flag: + x_low = input_int(' Set lower data bound', 0, x_low_max) + break + else: + x_min = input_int(' Set lower zoom index', 0, x_low_max) + x_max = input_int(' Set upper zoom index', x_min+1, x_low_max+1) + else: + if not is_int(x_low, 0, len_a-num_x_min): + illegal_value(x_low, 'x_low', 'selectArrayBounds') + return None + if x_upp is None: + x_min = x_low+num_x_min + x_max = len_a + x_upp_min = x_min + while True: + quickPlot(range(x_min, x_max), a[x_min:x_max], title=title) + zoom_flag = input_yesno('Set upper data bound (y) or zoom in (n)?', 'y') + if zoom_flag: + x_upp = input_int(' Set upper data bound', x_upp_min, len_a) + break + else: + x_min = input_int(' Set upper zoom index', x_upp_min, len_a-1) + x_max = input_int(' Set upper zoom index', x_min+1, len_a) + else: + if not is_int(x_upp, x_low+num_x_min, len_a): + illegal_value(x_upp, 'x_upp', 'selectArrayBounds') + return None + print(f'lower bound = {x_low} (inclusive)\nupper bound = {x_upp} (exclusive)]') + quickPlot((range(len_a), a), vlines=(x_low,x_upp), title=title) + if not input_yesno('Accept these bounds (y/n)?', 'y'): + x_low, x_upp = selectArrayBounds(a, None, None, num_x_min, title=title) + clearPlot(title) + return x_low, x_upp + +def selectImageBounds(a, axis, low=None, upp=None, num_min=None, + title='select array bounds'): + """Interactively select the lower and upper data bounds for a 2D numpy array. + """ + if isinstance(a, np.ndarray): + if a.ndim != 2: + illegal_value(a.ndim, 'array dimension', 'selectImageBounds') + return None + elif isinstance(a, (tuple, list)): + if len(a) != 2: + illegal_value(len(a), 'array dimension', 'selectImageBounds') + return None + if len(a[0]) != len(a[1]) or not (isinstance(a[0], (tuple, list, np.ndarray)) and + isinstance(a[1], (tuple, list, np.ndarray))): + logging.error(f'Illegal array type in selectImageBounds ({type(a[0])} {type(a[1])})') + return None + a = np.array(a) + else: + illegal_value(a, 'array type', 'selectImageBounds') + return None + if axis < 0 or axis >= a.ndim: + illegal_value(axis, 'axis', 'selectImageBounds') + return None + low_save = low + upp_save = upp + num_min_save = num_min + if num_min is None: + num_min = 1 + else: + if num_min < 2 or num_min > a.shape[axis]: + logging.warning('Illegal input for num_min in selectImageBounds, input ignored') + num_min = 1 + if low is None: + min_ = 0 + max_ = a.shape[axis] + low_max = a.shape[axis]-num_min + while True: + if axis: + quickImshow(a[:,min_:max_], title=title, aspect='auto', + extent=[min_,max_,a.shape[0],0]) + else: + quickImshow(a[min_:max_,:], title=title, aspect='auto', + extent=[0,a.shape[1], max_,min_]) + zoom_flag = input_yesno('Set lower data bound (y) or zoom in (n)?', 'y') + if zoom_flag: + low = input_int(' Set lower data bound', 0, low_max) + break + else: + min_ = input_int(' Set lower zoom index', 0, low_max) + max_ = input_int(' Set upper zoom index', min_+1, low_max+1) + else: + if not is_int(low, 0, a.shape[axis]-num_min): + illegal_value(low, 'low', 'selectImageBounds') + return None + if upp is None: + min_ = low+num_min + max_ = a.shape[axis] + upp_min = min_ + while True: + if axis: + quickImshow(a[:,min_:max_], title=title, aspect='auto', + extent=[min_,max_,a.shape[0],0]) + else: + quickImshow(a[min_:max_,:], title=title, aspect='auto', + extent=[0,a.shape[1], max_,min_]) + zoom_flag = input_yesno('Set upper data bound (y) or zoom in (n)?', 'y') + if zoom_flag: + upp = input_int(' Set upper data bound', upp_min, a.shape[axis]) + break + else: + min_ = input_int(' Set upper zoom index', upp_min, a.shape[axis]-1) + max_ = input_int(' Set upper zoom index', min_+1, a.shape[axis]) + else: + if not is_int(upp, low+num_min, a.shape[axis]): + illegal_value(upp, 'upp', 'selectImageBounds') + return None + bounds = (low, upp) + a_tmp = np.copy(a) + a_tmp_max = a.max() + if axis: + a_tmp[:,bounds[0]] = a_tmp_max + a_tmp[:,bounds[1]-1] = a_tmp_max + else: + a_tmp[bounds[0],:] = a_tmp_max + a_tmp[bounds[1]-1,:] = a_tmp_max + print(f'lower bound = {low} (inclusive)\nupper bound = {upp} (exclusive)') + quickImshow(a_tmp, title=title) + del a_tmp + if not input_yesno('Accept these bounds (y/n)?', 'y'): + bounds = selectImageBounds(a, axis, low=low_save, upp=upp_save, num_min=num_min_save, + title=title) + return bounds + + +class Config: + """Base class for processing a config file or dictionary. + """ + def __init__(self, config_file=None, config_dict=None): + self.config = {} + self.load_flag = False + self.suffix = None + + # Load config file + if config_file is not None and config_dict is not None: + logging.warning('Ignoring config_dict (both config_file and config_dict are specified)') + if config_file is not None: + self.loadFile(config_file) + elif config_dict is not None: + self.loadDict(config_dict) + + def loadFile(self, config_file): + """Load a config file. + """ + if self.load_flag: + logging.warning('Overwriting any previously loaded config file') + self.config = {} + + # Ensure config file exists + if not os.path.isfile(config_file): + logging.error(f'Unable to load {config_file}') + return + + # Load config file (for now for Galaxy, allow .dat extension) + self.suffix = os.path.splitext(config_file)[1] + if self.suffix == '.yml' or self.suffix == '.yaml' or self.suffix == '.dat': + with open(config_file, 'r') as f: + self.config = yaml.safe_load(f) + elif self.suffix == '.txt': + with open(config_file, 'r') as f: + lines = f.read().splitlines() + self.config = {item[0].strip():literal_eval(item[1].strip()) for item in + [line.split('#')[0].split('=') for line in lines if '=' in line.split('#')[0]]} + else: + illegal_value(self.suffix, 'config file extension', 'Config.loadFile') + + # Make sure config file was correctly loaded + if isinstance(self.config, dict): + self.load_flag = True + else: + logging.error(f'Unable to load dictionary from config file: {config_file}') + self.config = {} + + def loadDict(self, config_dict): + """Takes a dictionary and places it into self.config. + """ + if self.load_flag: + logging.warning('Overwriting the previously loaded config file') + + if isinstance(config_dict, dict): + self.config = config_dict + self.load_flag = True + else: + illegal_value(config_dict, 'dictionary config object', 'Config.loadDict') + self.config = {} + + def saveFile(self, config_file): + """Save the config file (as a yaml file only right now). + """ + suffix = os.path.splitext(config_file)[1] + if suffix != '.yml' and suffix != '.yaml': + illegal_value(suffix, 'config file extension', 'Config.saveFile') + + # Check if config file exists + if os.path.isfile(config_file): + logging.info(f'Updating {config_file}') + else: + logging.info(f'Saving {config_file}') + + # Save config file + with open(config_file, 'w') as f: + yaml.safe_dump(self.config, f) + + def validate(self, pars_required, pars_missing=None): + """Returns False if any required keys are missing. + """ + if not self.load_flag: + logging.error('Load a config file prior to calling Config.validate') + + def validate_nested_pars(config, par): + par_levels = par.split(':') + first_level_par = par_levels[0] + try: + first_level_par = int(first_level_par) + except: + pass + try: + next_level_config = config[first_level_par] + if len(par_levels) > 1: + next_level_par = ':'.join(par_levels[1:]) + return validate_nested_pars(next_level_config, next_level_par) + else: + return True + except: + return False + + pars_missing = [p for p in pars_required if not validate_nested_pars(self.config, p)] + if len(pars_missing) > 0: + logging.error(f'Missing item(s) in configuration: {", ".join(pars_missing)}') + return False + else: + return True
--- a/msnc_tools.py Wed Apr 27 17:28:26 2022 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,900 +0,0 @@ -#!/usr/bin/env python3 - -# -*- coding: utf-8 -*- -""" -Created on Mon Dec 6 15:36:22 2021 - -@author: rv43 -""" - -import logging - -import os -import sys -import re -import yaml -import h5py -try: - import pyinputplus as pyip -except: - pass -import numpy as np -import imageio as img -import matplotlib.pyplot as plt -from time import time -from ast import literal_eval -try: - from lmfit.models import StepModel, RectangleModel -except: - pass - -def depth_list(L): return isinstance(L, list) and max(map(depth_list, L))+1 -def depth_tuple(T): return isinstance(T, tuple) and max(map(depth_tuple, T))+1 - -def is_int(v, v_min=None, v_max=None): - """Value is an integer in range v_min <= v <= v_max. - """ - if not isinstance(v, int): - return False - if (v_min is not None and v < v_min) or (v_max is not None and v > v_max): - return False - return True - -def is_num(v, v_min=None, v_max=None): - """Value is a number in range v_min <= v <= v_max. - """ - if not isinstance(v, (int,float)): - return False - if (v_min is not None and v < v_min) or (v_max is not None and v > v_max): - return False - return True - -def is_index(v, v_min=0, v_max=None): - """Value is an array index in range v_min <= v < v_max. - """ - if not isinstance(v, int): - return False - if v < v_min or (v_max is not None and v >= v_max): - return False - return True - -def is_index_range(v, v_min=0, v_max=None): - """Value is an array index range in range v_min <= v[0] <= v[1] <= v_max. - """ - if not (isinstance(v, list) and len(v) == 2 and isinstance(v[0], int) and - isinstance(v[1], int)): - return False - if not 0 <= v[0] <= v[1] or (v_max is not None and v[1] > v_max): - return False - return True - -def illegal_value(name, value, location=None, exit_flag=False): - if not isinstance(location, str): - location = '' - else: - location = f'in {location} ' - if isinstance(name, str): - logging.error(f'Illegal value for {name} {location}({value}, {type(value)})') - else: - logging.error(f'Illegal value {location}({value}, {type(value)})') - if exit_flag: - exit(1) - -def get_trailing_int(string): - indexRegex = re.compile(r'\d+$') - mo = indexRegex.search(string) - if mo is None: - return None - else: - return int(mo.group()) - -def findImageFiles(path, filetype, name=None): - if isinstance(name, str): - name = f' {name} ' - else: - name = ' ' - # Find available index range - if filetype == 'tif': - if not isinstance(path, str) and not os.path.isdir(path): - illegal_value('path', path, 'findImageRange') - return -1, 0, [] - indexRegex = re.compile(r'\d+') - # At this point only tiffs - files = sorted([f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f)) and - f.endswith('.tif') and indexRegex.search(f)]) - num_imgs = len(files) - if num_imgs < 1: - logging.warning('No available'+name+'files') - return -1, 0, [] - first_index = indexRegex.search(files[0]).group() - last_index = indexRegex.search(files[-1]).group() - if first_index is None or last_index is None: - logging.error('Unable to find correctly indexed'+name+'images') - return -1, 0, [] - first_index = int(first_index) - last_index = int(last_index) - if num_imgs != last_index-first_index+1: - logging.error('Non-consecutive set of indices for'+name+'images') - return -1, 0, [] - paths = [os.path.join(path, f) for f in files] - elif filetype == 'h5': - if not isinstance(path, str) or not os.path.isfile(path): - illegal_value('path', path, 'findImageRange') - return -1, 0, [] - # At this point only h5 in alamo2 detector style - first_index = 0 - with h5py.File(path, 'r') as f: - num_imgs = f['entry/instrument/detector/data'].shape[0] - last_index = num_imgs-1 - paths = [path] - else: - illegal_value('filetype', filetype, 'findImageRange') - return -1, 0, [] - logging.debug('\nNumber of available'+name+f'images: {num_imgs}') - logging.debug('Index range of available'+name+f'images: [{first_index}, '+ - f'{last_index}]') - - return first_index, num_imgs, paths - -def selectImageRange(first_index, offset, num_imgs, name=None, num_required=None): - if isinstance(name, str): - name = f' {name} ' - else: - name = ' ' - # Check existing values - use_input = 'no' - if (is_int(first_index, 0) and is_int(offset, 0) and is_int(num_imgs, 1)): - if offset < 0: - use_input = pyip.inputYesNo('\nCurrent'+name+f'first index = {first_index}, '+ - 'use this value ([y]/n)? ', blank=True) - else: - use_input = pyip.inputYesNo('\nCurrent'+name+'first index/offset = '+ - f'{first_index}/{offset}, use these values ([y]/n)? ', - blank=True) - if num_required is None: - if use_input != 'no': - use_input = pyip.inputYesNo('Current number of'+name+'images = '+ - f'{num_imgs}, use this value ([y]/n)? ', - blank=True) - if use_input != 'no': - return first_index, offset, num_imgs - - # Check range against requirements - if num_imgs < 1: - logging.warning('No available'+name+'images') - return -1, -1, 0 - if num_required is None: - if num_imgs == 1: - return first_index, 0, 1 - else: - if not is_int(num_required, 1): - illegal_value('num_required', num_required, 'selectImageRange') - return -1, -1, 0 - if num_imgs < num_required: - logging.error('Unable to find the required'+name+ - f'images ({num_imgs} out of {num_required})') - return -1, -1, 0 - - # Select index range - print('\nThe number of available'+name+f'images is {num_imgs}') - if num_required is None: - last_index = first_index+num_imgs - use_all = f'Use all ([{first_index}, {last_index}])' - pick_offset = 'Pick a first index offset and a number of images' - pick_bounds = 'Pick the first and last index' - menuchoice = pyip.inputMenu([use_all, pick_offset, pick_bounds], numbered=True) - if menuchoice == use_all: - offset = 0 - elif menuchoice == pick_offset: - offset = pyip.inputInt('Enter the first index offset'+ - f' [0, {last_index-first_index}]: ', min=0, max=last_index-first_index) - first_index += offset - if first_index == last_index: - num_imgs = 1 - else: - num_imgs = pyip.inputInt(f'Enter the number of images [1, {num_imgs-offset}]: ', - min=1, max=num_imgs-offset) - else: - offset = pyip.inputInt(f'Enter the first index [{first_index}, {last_index}]: ', - min=first_index, max=last_index)-first_index - first_index += offset - num_imgs = pyip.inputInt(f'Enter the last index [{first_index}, {last_index}]: ', - min=first_index, max=last_index)-first_index+1 - else: - use_all = f'Use ([{first_index}, {first_index+num_required-1}])' - pick_offset = 'Pick the first index offset' - menuchoice = pyip.inputMenu([use_all, pick_offset], numbered=True) - offset = 0 - if menuchoice == pick_offset: - offset = pyip.inputInt('Enter the first index offset'+ - f'[0, {num_imgs-num_required}]: ', min=0, max=num_imgs-num_required) - first_index += offset - num_imgs = num_required - - return first_index, offset, num_imgs - -def loadImage(f, img_x_bounds=None, img_y_bounds=None): - """Load a single image from file. - """ - if not os.path.isfile(f): - logging.error(f'Unable to load {f}') - return None - img_read = img.imread(f) - if not img_x_bounds: - img_x_bounds = [0, img_read.shape[0]] - else: - if (not isinstance(img_x_bounds, list) or len(img_x_bounds) != 2 or - not (0 <= img_x_bounds[0] < img_x_bounds[1] <= img_read.shape[0])): - logging.error(f'inconsistent row dimension in {f}') - return None - if not img_y_bounds: - img_y_bounds = [0, img_read.shape[1]] - else: - if (not isinstance(img_y_bounds, list) or len(img_y_bounds) != 2 or - not (0 <= img_y_bounds[0] < img_y_bounds[1] <= img_read.shape[1])): - logging.error(f'inconsistent column dimension in {f}') - return None - return img_read[img_x_bounds[0]:img_x_bounds[1],img_y_bounds[0]:img_y_bounds[1]] - -def loadImageStack(files, filetype, img_offset, num_imgs, num_img_skip=0, - img_x_bounds=None, img_y_bounds=None): - """Load a set of images and return them as a stack. - """ - logging.debug(f'img_offset = {img_offset}') - logging.debug(f'num_imgs = {num_imgs}') - logging.debug(f'num_img_skip = {num_img_skip}') - logging.debug(f'\nfiles:\n{files}\n') - img_stack = np.array([]) - if filetype == 'tif': - img_read_stack = [] - i = 1 - t0 = time() - for f in files[img_offset:img_offset+num_imgs:num_img_skip+1]: - if not i%20: - logging.info(f' loading {i}/{num_imgs}: {f}') - else: - logging.debug(f' loading {i}/{num_imgs}: {f}') - img_read = loadImage(f, img_x_bounds, img_y_bounds) - img_read_stack.append(img_read) - i += num_img_skip+1 - img_stack = np.stack([img_read for img_read in img_read_stack]) - logging.info(f'... done in {time()-t0:.2f} seconds!') - logging.debug(f'img_stack shape = {np.shape(img_stack)}') - del img_read_stack, img_read - elif filetype == 'h5': - if not isinstance(files[0], str) and not os.path.isfile(files[0]): - illegal_value('files[0]', files[0], 'loadImageStack') - return img_stack - t0 = time() - logging.info(f'Loading {files[0]}') - with h5py.File(files[0], 'r') as f: - shape = f['entry/instrument/detector/data'].shape - if len(shape) != 3: - logging.error(f'inconsistent dimensions in {files[0]}') - if not img_x_bounds: - img_x_bounds = [0, shape[1]] - else: - if (not isinstance(img_x_bounds, list) or len(img_x_bounds) != 2 or - not (0 <= img_x_bounds[0] < img_x_bounds[1] <= shape[1])): - logging.error(f'inconsistent row dimension in {files[0]}') - if not img_y_bounds: - img_y_bounds = [0, shape[2]] - else: - if (not isinstance(img_y_bounds, list) or len(img_y_bounds) != 2 or - not (0 <= img_y_bounds[0] < img_y_bounds[1] <= shape[2])): - logging.error(f'inconsistent column dimension in {files[0]}') - img_stack = f.get('entry/instrument/detector/data')[ - img_offset:img_offset+num_imgs:num_img_skip+1, - img_x_bounds[0]:img_x_bounds[1],img_y_bounds[0]:img_y_bounds[1]] - logging.info(f'... done in {time()-t0:.2f} seconds!') - else: - illegal_value('filetype', filetype, 'findImageRange') - return img_stack - -def combine_tiffs_in_h5(files, num_imgs, h5_filename): - img_stack = loadImageStack(files, 'tif', 0, num_imgs) - with h5py.File(h5_filename, 'w') as f: - f.create_dataset('entry/instrument/detector/data', data=img_stack) - del img_stack - return [h5_filename] - -def clearFig(title): - if not isinstance(title, str): - illegal_value('title', title, 'clearFig') - return - plt.close(fig=re.sub(r"\s+", '_', title)) - -def quickImshow(a, title=None, path=None, name=None, save_fig=False, save_only=False, - clear=True, extent=None, show_grid=False, grid_color='w', grid_linewidth=1, **kwargs): - if title is not None and not isinstance(title, str): - illegal_value('title', title, 'quickImshow') - return - if path is not None and not isinstance(path, str): - illegal_value('path', path, 'quickImshow') - return - if not isinstance(save_fig, bool): - illegal_value('save_fig', save_fig, 'quickImshow') - return - if not isinstance(save_only, bool): - illegal_value('save_only', save_only, 'quickImshow') - return - if not isinstance(clear, bool): - illegal_value('clear', clear, 'quickImshow') - return - if not title: - title='quick_imshow' - else: - title = re.sub(r"\s+", '_', title) - if name is None: - if path is None: - path = f'{title}.png' - else: - path = f'{path}/{title}.png' - else: - if path is None: - path = name - else: - path = f'{path}/{name}' - if extent is None: - extent = (0, a.shape[1], a.shape[0], 0) - if clear: - plt.close(fig=title) - if save_only: - plt.figure(title) - plt.imshow(a, extent=extent, **kwargs) - if show_grid: - ax = plt.gca() - ax.grid(color=grid_color, linewidth=grid_linewidth) - if title != 'quick_imshow': - plt.title = title - plt.savefig(path) - plt.close(fig=title) - #plt.imsave(f'{title}.png', a, **kwargs) - else: - plt.ion() - plt.figure(title) - plt.imshow(a, extent=extent, **kwargs) - if show_grid: - ax = plt.gca() - ax.grid(color=grid_color, linewidth=grid_linewidth) - if title != 'quick_imshow': - plt.title = title - if save_fig: - plt.savefig(path) - plt.pause(1) - -def quickPlot(*args, title=None, path=None, name=None, save_fig=False, save_only=False, - clear=True, show_grid=False, **kwargs): - if title is not None and not isinstance(title, str): - illegal_value('title', title, 'quickPlot') - return - if path is not None and not isinstance(path, str): - illegal_value('path', path, 'quickPlot') - return - if not isinstance(save_fig, bool): - illegal_value('save_fig', save_fig, 'quickPlot') - return - if not isinstance(save_only, bool): - illegal_value('save_only', save_only, 'quickPlot') - return - if not isinstance(clear, bool): - illegal_value('clear', clear, 'quickPlot') - return - if not title: - title = 'quick_plot' - else: - title = re.sub(r"\s+", '_', title) - if name is None: - if path is None: - path = f'{title}.png' - else: - path = f'{path}/{title}.png' - else: - if path is None: - path = name - else: - path = f'{path}/{name}' - if clear: - plt.close(fig=title) - if save_only: - plt.figure(title) - if depth_tuple(args) > 1: - for y in args: - plt.plot(*y, **kwargs) - else: - plt.plot(*args, **kwargs) - if show_grid: - ax = plt.gca() - ax.grid(color='k')#, linewidth=1) - if title != 'quick_plot': - plt.title = title - plt.savefig(path) - plt.close(fig=title) - else: - plt.ion() - plt.figure(title) - if depth_tuple(args) > 1: - for y in args: - plt.plot(*y, **kwargs) - else: - plt.plot(*args, **kwargs) - if show_grid: - ax = plt.gca() - ax.grid(color='k')#, linewidth=1) - if title != 'quick_plot': - plt.title = title - if save_fig: - plt.savefig(path) - plt.pause(1) - -def selectArrayBounds(a, x_low=None, x_upp=None, num_x_min=None, - title='select array bounds'): - """Interactively select the lower and upper data bounds for a numpy array. - """ - if not isinstance(a, np.ndarray) or a.ndim != 1: - logging.error('Illegal array type or dimension in selectArrayBounds') - return None - x_low_save = x_low - x_upp_save = x_upp - num_x_min_save = num_x_min - if num_x_min is None: - num_x_min = 1 - else: - if num_x_min < 2 or num_x_min > a.size: - logging.warning('Illegal input for num_x_min in selectArrayBounds, input ignored') - num_x_min = 1 - if x_low is None: - x_min = 0 - x_max = a.size - x_low_max = a.size-num_x_min - while True: - quickPlot(range(x_min, x_max), a[x_min:x_max], title=title) - zoom_flag = pyip.inputInt('Set lower data bound (0) or zoom in (1)?: ', - min=0, max=1) - if zoom_flag: - x_min = pyip.inputInt(f' Set lower zoom index [0, {x_low_max}]: ', - min=0, max=x_low_max) - x_max = pyip.inputInt(f' Set upper zoom index [{x_min+1}, {x_low_max+1}]: ', - min=x_min+1, max=x_low_max+1) - else: - x_low = pyip.inputInt(f' Set lower data bound [0, {x_low_max}]: ', - min=0, max=x_low_max) - break - else: - if not is_int(x_low, 0, a.size-num_x_min): - illegal_value('x_low', x_low, 'selectArrayBounds') - return None - if x_upp is None: - x_min = x_low+num_x_min - x_max = a.size - x_upp_min = x_min - while True: - quickPlot(range(x_min, x_max), a[x_min:x_max], title=title) - zoom_flag = pyip.inputInt('Set upper data bound (0) or zoom in (1)?: ', - min=0, max=1) - if zoom_flag: - x_min = pyip.inputInt(f' Set upper zoom index [{x_upp_min}, {a.size-1}]: ', - min=x_upp_min, max=a.size-1) - x_max = pyip.inputInt(f' Set upper zoom index [{x_min+1}, {a.size}]: ', - min=x_min+1, max=a.size) - else: - x_upp = pyip.inputInt(f' Set upper data bound [{x_upp_min}, {a.size}]: ', - min=x_upp_min, max=a.size) - break - else: - if not is_int(x_upp, x_low+num_x_min, a.size): - illegal_value('x_upp', x_upp, 'selectArrayBounds') - return None - bounds = [x_low, x_upp] - print(f'lower bound = {x_low} (inclusive)\nupper bound = {x_upp} (exclusive)]') - #quickPlot(range(bounds[0], bounds[1]), a[bounds[0]:bounds[1]], title=title) - quickPlot((range(a.size), a), ([bounds[0], bounds[0]], [a.min(), a.max()], 'r-'), - ([bounds[1], bounds[1]], [a.min(), a.max()], 'r-'), title=title) - if pyip.inputYesNo('Accept these bounds ([y]/n)?: ', blank=True) == 'no': - bounds = selectArrayBounds(a, x_low_save, x_upp_save, num_x_min_save, title=title) - return bounds - -def selectImageBounds(a, axis, low=None, upp=None, num_min=None, - title='select array bounds'): - """Interactively select the lower and upper data bounds for a 2D numpy array. - """ - if not isinstance(a, np.ndarray) or a.ndim != 2: - logging.error('Illegal array type or dimension in selectImageBounds') - return None - if axis < 0 or axis >= a.ndim: - illegal_value('axis', axis, 'selectImageBounds') - return None - low_save = low - upp_save = upp - num_min_save = num_min - if num_min is None: - num_min = 1 - else: - if num_min < 2 or num_min > a.shape[axis]: - logging.warning('Illegal input for num_min in selectImageBounds, input ignored') - num_min = 1 - if low is None: - min_ = 0 - max_ = a.shape[axis] - low_max = a.shape[axis]-num_min - while True: - if axis: - quickImshow(a[:,min_:max_], title=title, aspect='auto', - extent=[min_,max_,a.shape[0],0]) - else: - quickImshow(a[min_:max_,:], title=title, aspect='auto', - extent=[0,a.shape[1], max_,min_]) - zoom_flag = pyip.inputInt('Set lower data bound (0) or zoom in (1)?: ', - min=0, max=1) - if zoom_flag: - min_ = pyip.inputInt(f' Set lower zoom index [0, {low_max}]: ', - min=0, max=low_max) - max_ = pyip.inputInt(f' Set upper zoom index [{min_+1}, {low_max+1}]: ', - min=min_+1, max=low_max+1) - else: - low = pyip.inputInt(f' Set lower data bound [0, {low_max}]: ', - min=0, max=low_max) - break - else: - if not is_int(low, 0, a.shape[axis]-num_min): - illegal_value('low', low, 'selectImageBounds') - return None - if upp is None: - min_ = low+num_min - max_ = a.shape[axis] - upp_min = min_ - while True: - if axis: - quickImshow(a[:,min_:max_], title=title, aspect='auto', - extent=[min_,max_,a.shape[0],0]) - else: - quickImshow(a[min_:max_,:], title=title, aspect='auto', - extent=[0,a.shape[1], max_,min_]) - zoom_flag = pyip.inputInt('Set upper data bound (0) or zoom in (1)?: ', - min=0, max=1) - if zoom_flag: - min_ = pyip.inputInt(f' Set upper zoom index [{upp_min}, {a.shape[axis]-1}]: ', - min=upp_min, max=a.shape[axis]-1) - max_ = pyip.inputInt(f' Set upper zoom index [{min_+1}, {a.shape[axis]}]: ', - min=min_+1, max=a.shape[axis]) - else: - upp = pyip.inputInt(f' Set upper data bound [{upp_min}, {a.shape[axis]}]: ', - min=upp_min, max=a.shape[axis]) - break - else: - if not is_int(upp, low+num_min, a.shape[axis]): - illegal_value('upp', upp, 'selectImageBounds') - return None - bounds = [low, upp] - a_tmp = np.copy(a) - a_tmp_max = a.max() - if axis: - a_tmp[:,bounds[0]] = a_tmp_max - a_tmp[:,bounds[1]-1] = a_tmp_max - else: - a_tmp[bounds[0],:] = a_tmp_max - a_tmp[bounds[1]-1,:] = a_tmp_max - print(f'lower bound = {low} (inclusive)\nupper bound = {upp} (exclusive)') - quickImshow(a_tmp, title=title) - del a_tmp - if pyip.inputYesNo('Accept these bounds ([y]/n)?: ', blank=True) == 'no': - bounds = selectImageBounds(a, axis, low=low_save, upp=upp_save, num_min=num_min_save, - title=title) - return bounds - -def fitStep(x=None, y=None, model='step', form='arctan'): - if not isinstance(y, np.ndarray) or y.ndim != 1: - logging.error('Illegal array type or dimension for y in fitStep') - return - if isinstance(x, type(None)): - x = np.array(range(y.size)) - elif not isinstance(x, np.ndarray) or x.ndim != 1 or x.size != y.size: - logging.error('Illegal array type or dimension for x in fitStep') - return - if not isinstance(model, str) or not model in ('step', 'rectangle'): - illegal_value('model', model, 'fitStepModel') - return - if not isinstance(form, str) or not form in ('linear', 'atan', 'arctan', 'erf', 'logistic'): - illegal_value('form', form, 'fitStepModel') - return - - if model == 'step': - mod = StepModel(form=form) - else: - mod = RectangleModel(form=form) - pars = mod.guess(y, x=x) - out = mod.fit(y, pars, x=x) - #print(f'fit_report:\n{out.fit_report()}') - #quickPlot((x,y),(x,out.best_fit)) - return out.best_values - -class Config: - """Base class for processing a config file or dictionary. - """ - def __init__(self, config_file=None, config_dict=None): - self.config = {} - self.load_flag = False - self.suffix = None - - # Load config file - if config_file is not None and config_dict is not None: - logging.warning('Ignoring config_dict (both config_file and config_dict are specified)') - if config_file is not None: - self.loadFile(config_file) - elif config_dict is not None: - self.loadDict(config_dict) - - def loadFile(self, config_file): - """Load a config file. - """ - if self.load_flag: - logging.warning('Overwriting any previously loaded config file') - self.config = {} - - # Ensure config file exists - if not os.path.isfile(config_file): - logging.error(f'Unable to load {config_file}') - return - - # Load config file (for now for Galaxy, allow .dat extension) - self.suffix = os.path.splitext(config_file)[1] - if self.suffix == '.yml' or self.suffix == '.yaml' or self.suffix == '.dat': - with open(config_file, 'r') as f: - self.config = yaml.safe_load(f) - elif self.suffix == '.txt': - with open(config_file, 'r') as f: - lines = f.read().splitlines() - self.config = {item[0].strip():literal_eval(item[1].strip()) for item in - [line.split('#')[0].split('=') for line in lines if '=' in line.split('#')[0]]} - else: - logging.error(f'Illegal config file extension: {self.suffix}') - - # Make sure config file was correctly loaded - if isinstance(self.config, dict): - self.load_flag = True - else: - logging.error(f'Unable to load dictionary from config file: {config_file}') - self.config = {} - - def loadDict(self, config_dict): - """Takes a dictionary and places it into self.config. - """ - exit('loadDict not tested yet, what format do we follow: txt or yaml?') - if self.load_flag: - logging.warning('Overwriting the previously loaded config file') - - if isinstance(config_dict, dict): - self.config = config_dict - self.load_flag = True - else: - logging.error(f'Illegal dictionary config object: {config_dict}') - self.config = {} - - def saveFile(self, config_file): - """Save the config file (as a yaml file only right now). - """ - suffix = os.path.splitext(config_file)[1] - if suffix != '.yml' and suffix != '.yaml': - logging.error(f'Illegal config file extension: {suffix}') - - # Check if config file exists - if os.path.isfile(config_file): - logging.info(f'Updating {config_file}') - else: - logging.info(f'Saving {config_file}') - - # Save config file - with open(config_file, 'w') as f: - yaml.dump(self.config, f) - - def validate(self, pars_required, pars_missing=None): - """Returns False if any required first level keys are missing. - """ - if not self.load_flag: - logging.error('Load a config file prior to calling Config.validate') - pars = [p for p in pars_required if p not in self.config] - if isinstance(pars_missing, list): - pars_missing.extend(pars) - elif pars_missing is not None: - illegal_value('pars_missing', pars_missing, 'Config.validate') - if len(pars) > 0: - return False - return True - -#RV FIX this is for a txt file, obsolete? -# def update_txt(self, config_file, key, value, search_string=None, header=None): -# if not self.load_flag: -# logging.error('Load a config file prior to calling Config.update') -# -# if not os.path.isfile(config_file): -# logging.error(f'Unable to load {config_file}') -# lines = [] -# else: -# with open(config_file, 'r') as f: -# lines = f.read().splitlines() -# config = {item[0].strip():literal_eval(item[1].strip()) for item in -# [line.split('#')[0].split('=') for line in lines if '=' in line.split('#')[0]]} -# if not isinstance(key, str): -# illegal_value('key', key, 'Config.update') -# return config -# if isinstance(value, str): -# newline = f"{key} = '{value}'" -# else: -# newline = f'{key} = {value}' -# if key in config.keys(): -# # Update key with value -# for index,line in enumerate(lines): -# if '=' in line: -# item = line.split('#')[0].split('=') -# if item[0].strip() == key: -# lines[index] = newline -# break -# else: -# # Insert new key/value pair -# if search_string is not None: -# if isinstance(search_string, str): -# search_string = [search_string] -# elif not isinstance(search_string, (tuple, list)): -# illegal_value('search_string', search_string, 'Config.update') -# search_string = None -# update_flag = False -# if search_string is not None: -# indices = [[index for index,line in enumerate(lines) if item in line] -# for item in search_string] -# for i,index in enumerate(indices): -# if index: -# if len(search_string) > 1 and key < search_string[i]: -# lines.insert(index[0], newline) -# else: -# lines.insert(index[0]+1, newline) -# update_flag = True -# break -# if not update_flag: -# if isinstance(header, str): -# lines += ['', header, newline] -# else: -# lines += ['', newline] -# # Write updated config file -# with open(config_file, 'w') as f: -# for line in lines: -# f.write(f'{line}\n') -# # Update loaded config -# config['key'] = value -# -#RV update and bring into Config if needed again -#def search(config_file, search_string): -# if not os.path.isfile(config_file): -# logging.error(f'Unable to load {config_file}') -# return False -# with open(config_file, 'r') as f: -# lines = f.read() -# if search_string in lines: -# return True -# return False - -class Detector: - """Class for processing a detector info file or dictionary. - """ - def __init__(self, detector_id): - self.detector = {} - self.load_flag = False - self.validate_flag = False - - # Load detector file - self.loadFile(detector_id) - - def loadFile(self, detector_id): - """Load a detector file. - """ - if self.load_flag: - logging.warning('Overwriting the previously loaded detector file') - self.detector = {} - - # Ensure detector file exists - if not isinstance(detector_id, str): - illegal_value('detector_id', detector_id, 'Detector.loadFile') - return - detector_file = f'{detector_id}.yaml' - if not os.path.isfile(detector_file): - detector_file = self.config['detector_id']+'.yaml' - if not os.path.isfile(detector_file): - logging.error(f'Unable to load detector info file for {detector_id}') - return - - # Load detector file - with open(detector_file, 'r') as f: - self.detector = yaml.safe_load(f) - - # Make sure detector file was correctly loaded - if isinstance(self.detector, dict): - self.load_flag = True - else: - logging.error(f'Unable to load dictionary from detector file: {detector_file}') - self.detector = {} - - def validate(self): - """Returns False if any config parameters is illegal or missing. - """ - if not self.load_flag: - logging.error('Load a detector file prior to calling Detector.validate') - - # Check for required first-level keys - pars_required = ['detector', 'lens_magnification'] - pars_missing = [p for p in pars_required if p not in self.detector] - if len(pars_missing) > 0: - logging.error(f'Missing item(s) in detector file: {", ".join(pars_missing)}') - return False - - is_valid = True - - # Check detector pixel config keys - pixels = self.detector['detector'].get('pixels') - if not pixels: - pars_missing.append('detector:pixels') - else: - rows = pixels.get('rows') - if not rows: - pars_missing.append('detector:pixels:rows') - columns = pixels.get('columns') - if not columns: - pars_missing.append('detector:pixels:columns') - size = pixels.get('size') - if not size: - pars_missing.append('detector:pixels:size') - - if not len(pars_missing): - self.validate_flag = True - else: - is_valid = False - - return is_valid - - def getPixelSize(self): - """Returns the detector pixel size. - """ - if not self.validate_flag: - logging.error('Validate detector file info prior to calling Detector.getPixelSize') - - lens_magnification = self.detector.get('lens_magnification') - if not isinstance(lens_magnification, (int,float)) or lens_magnification <= 0.: - illegal_value('lens_magnification', lens_magnification, 'detector file') - return 0 - pixel_size = self.detector['detector'].get('pixels').get('size') - if isinstance(pixel_size, (int,float)): - if pixel_size <= 0.: - illegal_value('pixel_size', pixel_size, 'detector file') - return 0 - pixel_size /= lens_magnification - elif isinstance(pixel_size, list): - if ((len(pixel_size) > 2) or - (len(pixel_size) == 2 and pixel_size[0] != pixel_size[1])): - illegal_value('pixel size', pixel_size, 'detector file') - return 0 - elif not is_num(pixel_size[0], 0.): - illegal_value('pixel size', pixel_size, 'detector file') - return 0 - else: - pixel_size = pixel_size[0]/lens_magnification - else: - illegal_value('pixel size', pixel_size, 'detector file') - return 0 - - return pixel_size - - def getDimensions(self): - """Returns the detector pixel dimensions. - """ - if not self.validate_flag: - logging.error('Validate detector file info prior to calling Detector.getDimensions') - - pixels = self.detector['detector'].get('pixels') - num_rows = pixels.get('rows') - if not is_int(num_rows, 1): - illegal_value('rows', num_rows, 'detector file') - return (0, 0) - num_columns = pixels.get('columns') - if not is_int(num_columns, 1): - illegal_value('columns', num_columns, 'detector file') - return (0, 0) - - return num_rows, num_columns
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/read_image.py Thu Jul 28 16:05:24 2022 +0000 @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 + +import logging + +import sys +import argparse +import numpy as np + +def __main__(): + + # Parse command line arguments + parser = argparse.ArgumentParser( + description='Read a reconstructed image') + parser.add_argument('-i', '--input_image', + help='Reconstructed image file') + parser.add_argument('-l', '--log', + type=argparse.FileType('w'), + default=sys.stdout, + help='Log file') + args = parser.parse_args() + + # Set basic log configuration + logging_format = '%(asctime)s : %(levelname)s - %(module)s : %(funcName)s - %(message)s' + log_level = 'INFO' + level = getattr(logging, log_level.upper(), None) + if not isinstance(level, int): + raise ValueError(f'Invalid log_level: {log_level}') + logging.basicConfig(format=logging_format, level=level, force=True, + handlers=[logging.StreamHandler()]) + + logging.info(f'input_image = {args.input_image}') + logging.debug(f'log = {args.log}') + logging.debug(f'is log stdout? {args.log is sys.stdout}') + + # Load image + f = np.load(args.input_image) + logging.info(f'f shape = {f.shape}') + +if __name__ == "__main__": + __main__() +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/read_image.xml Thu Jul 28 16:05:24 2022 +0000 @@ -0,0 +1,20 @@ +<tool id="read_image" name="Read Reconstructed Image" version="0.1.0" python_template_version="3.9"> + <description>Read reconstructed image</description> + <macros> + <import>tomo_macros.xml</import> + </macros> + <command detect_errors="exit_code"><![CDATA[ + $__tool_directory__/read_image.py + -i '$recon_image' + -l '$log' + ]]></command> + <inputs> + <param name="recon_image" type='data' format='npz' optional='false' label="Reconstructed image"/> + </inputs> + <outputs> + </outputs> + <help><![CDATA[ + Read reconstructed tomography image. + ]]></help> + <expand macro="citations"/> +</tool>
--- a/tomo.py Wed Apr 27 17:28:26 2022 +0000 +++ b/tomo.py Thu Jul 28 16:05:24 2022 +0000 @@ -14,10 +14,6 @@ import getopt import re import io -try: - import pyinputplus as pyip -except: - pass import argparse import numpy as np try: @@ -43,7 +39,13 @@ except: pass -import msnc_tools as msnc +from detector import TomoDetectorConfig +from fit import Fit +#from msnctools.general import illegal_value, is_int, is_num, is_index_range, get_trailing_int, \ +from general import illegal_value, is_int, is_num, is_index_range, get_trailing_int, \ + input_int, input_num, input_yesno, input_menu, findImageFiles, loadImageStack, clearPlot, \ + draw_mask_1d, quickPlot, clearImshow, quickImshow, combine_tiffs_in_h5, Config +from general import selectArrayBounds,selectImageRange, selectImageBounds # the following tomopy routines don't run with more than 24 cores on Galaxy-Dev # - tomopy.find_center_vo @@ -68,7 +70,7 @@ def __exit__(self, exc_type, exc_value, traceback): ne.set_num_threads(self.num_core_org) -class ConfigTomo(msnc.Config): +class ConfigTomo(Config): """Class for processing a config file. """ @@ -96,9 +98,9 @@ # Check number of tomography image stacks self.num_tomo_stacks = self.config.get('num_tomo_stacks', 1) - if not msnc.is_int(self.num_tomo_stacks, 1): + if not is_int(self.num_tomo_stacks, 1): self.num_tomo_stacks = None - msnc.illegal_value('num_tomo_stacks', self.num_tomo_stacks, 'config file') + illegal_value(self.num_tomo_stacks, 'num_tomo_stacks', 'config file') return False logging.info(f'num_tomo_stacks = {self.num_tomo_stacks}') @@ -109,8 +111,8 @@ logging.error(f'Incorrect number of tomography data path names in config file') is_valid = False self.tomo_data_paths = [tomo_data_paths_indices[i][1] for i in range(self.num_tomo_stacks)] - self.tomo_data_indices = [msnc.get_trailing_int(tomo_data_paths_indices[i][0]) - if msnc.get_trailing_int(tomo_data_paths_indices[i][0]) else None + self.tomo_data_indices = [get_trailing_int(tomo_data_paths_indices[i][0]) + if get_trailing_int(tomo_data_paths_indices[i][0]) else None for i in range(self.num_tomo_stacks)] tomo_ref_height_indices = sorted({key:value for key,value in self.config.items() if 'z_pos' in key}.items()) @@ -125,18 +127,18 @@ # Check tomo angle (theta) range self.start_theta = self.config.get('start_theta', 0.) - if not msnc.is_num(self.start_theta, 0.): - msnc.illegal_value('start_theta', self.start_theta, 'config file') + if not is_num(self.start_theta, 0.): + illegal_value(self.start_theta, 'start_theta', 'config file') is_valid = False logging.debug(f'start_theta = {self.start_theta}') self.end_theta = self.config.get('end_theta', 180.) - if not msnc.is_num(self.end_theta, self.start_theta): - msnc.illegal_value('end_theta', self.end_theta, 'config file') + if not is_num(self.end_theta, self.start_theta): + illegal_value(self.end_theta, 'end_theta', 'config file') is_valid = False logging.debug(f'end_theta = {self.end_theta}') self.num_thetas = self.config.get('num_thetas') - if not (self.num_thetas is None or msnc.is_int(self.num_thetas, 1)): - msnc.illegal_value('num_thetas', self.num_thetas, 'config file') + if not (self.num_thetas is None or is_int(self.num_thetas, 1)): + illegal_value(self.num_thetas, 'num_thetas', 'config file') self.num_thetas = None is_valid = False logging.debug(f'num_thetas = {self.num_thetas}') @@ -165,16 +167,16 @@ # Check number of tomography image stacks stack_info = self.config['stack_info'] self.num_tomo_stacks = stack_info.get('num', 1) - if not msnc.is_int(self.num_tomo_stacks, 1): + if not is_int(self.num_tomo_stacks, 1): self.num_tomo_stacks = None - msnc.illegal_value('stack_info:num', self.num_tomo_stacks, 'config file') + illegal_value(self.num_tomo_stacks, 'num_tomo_stacks', 'config file') return False logging.info(f'num_tomo_stacks = {self.num_tomo_stacks}') # Find tomography images file/folders and stack parameters stacks = stack_info.get('stacks') if stacks is None or len(stacks) is not self.num_tomo_stacks: - msnc.illegal_value('stack_info:stacks', stacks, 'config file') + illegal_value(stacks, 'stacks', 'config file') return False self.tomo_data_paths = [] self.tomo_data_indices = [] @@ -192,18 +194,18 @@ self.num_thetas = None else: self.start_theta = theta_range.get('start', 0.) - if not msnc.is_num(self.start_theta, 0.): - msnc.illegal_value('theta_range:start', self.start_theta, 'config file') + if not is_num(self.start_theta, 0.): + illegal_value(self.start_theta, 'theta_range:start', 'config file') is_valid = False logging.debug(f'start_theta = {self.start_theta}') self.end_theta = theta_range.get('end', 180.) - if not msnc.is_num(self.end_theta, self.start_theta): - msnc.illegal_value('theta_range:end', self.end_theta, 'config file') + if not is_num(self.end_theta, self.start_theta): + illegal_value(self.end_theta, 'theta_range:end', 'config file') is_valid = False logging.debug(f'end_theta = {self.end_theta}') self.num_thetas = theta_range.get('num') - if self.num_thetas and not msnc.is_int(self.num_thetas, 1): - msnc.illegal_value('theta_range:num', self.num_thetas, 'config file') + if self.num_thetas and not is_int(self.num_thetas, 1): + illegal_value(self.num_thetas, 'theta_range:num', 'config file') self.num_thetas = None is_valid = False logging.debug(f'num_thetas = {self.num_thetas}') @@ -218,7 +220,7 @@ # Check work_folder (shared by both file formats) work_folder = os.path.abspath(self.config.get('work_folder', '')) if not os.path.isdir(work_folder): - msnc.illegal_value('work_folder', work_folder, 'config file') + illegal_value(work_folder, 'work_folder', 'config file') is_valid = False logging.info(f'work_folder: {work_folder}') @@ -226,7 +228,7 @@ self.data_filetype = self.config.get('data_filetype', 'tif') if not isinstance(self.data_filetype, str) or (self.data_filetype != 'tif' and self.data_filetype != 'h5'): - msnc.illegal_value('data_filetype', self.data_filetype, 'config file') + illegal_value(self.data_filetype, 'data_filetype', 'config file') if self.suffix == '.yml' or self.suffix == '.yaml': is_valid = self._validate_yaml() @@ -244,7 +246,7 @@ self.tdf_data_path = os.path.abspath( f'{work_folder}/{self.tdf_data_path}') else: - msnc.illegal_value('tdf_data_path', tdf_data_fil, 'config file') + illegal_value(tdf_data_fil, 'tdf_data_path', 'config file') is_valid = False else: if isinstance(self.tdf_data_path, int): @@ -255,7 +257,7 @@ self.tdf_data_path = os.path.abspath( f'{work_folder}/{self.tdf_data_path}') else: - msnc.illegal_value('tdf_data_path', self.tdf_data_path, 'config file') + illegal_value(self.tdf_data_path, 'tdf_data_path', 'config file') is_valid = False logging.info(f'dark field images path = {self.tdf_data_path}') @@ -267,7 +269,7 @@ self.tbf_data_path = os.path.abspath( f'{work_folder}/{self.tbf_data_path}') else: - msnc.illegal_value('tbf_data_path', tbf_data_fil, 'config file') + illegal_value(tbf_data_fil, 'tbf_data_path', 'config file') is_valid = False else: if isinstance(self.tbf_data_path, int): @@ -278,7 +280,7 @@ self.tbf_data_path = os.path.abspath( f'{work_folder}/{self.tbf_data_path}') else: - msnc.illegal_value('tbf_data_path', self.tbf_data_path, 'config file') + illegal_value(self.tbf_data_path, 'tbf_data_path', 'config file') is_valid = False logging.info(f'bright field images path = {self.tbf_data_path}') @@ -293,7 +295,7 @@ if not os.path.isabs(data_path): data_path = os.path.abspath(f'{work_folder}/{data_path}') else: - msnc.illegal_value(f'stack_info:stacks:data_path', data_path, 'config file') + illegal_value(data_path, 'stack_info:stacks:data_path', 'config file') is_valid = False data_path = None else: @@ -303,7 +305,7 @@ if not os.path.isabs(data_path): data_path = os.path.abspath(f'{work_folder}/{data_path}') else: - msnc.illegal_value(f'stack_info:stacks:data_path', data_path, 'config file') + illegal_value(data_path, 'stack_info:stacks:data_path', 'config file') is_valid = False data_path = None tomo_data_paths.append(data_path) @@ -315,7 +317,7 @@ else: index = 1 elif not isinstance(index, int): - msnc.illegal_value(f'stack_info:stacks:index', index, 'config file') + illegal_value(index, 'stack_info:stacks:index', 'config file') is_valid = False index = None tomo_data_indices.append(index) @@ -326,13 +328,13 @@ ref_height = None else: ref_height = 0. - elif not msnc.is_num(ref_height): - msnc.illegal_value(f'stack_info:stacks:ref_height', ref_height, 'config file') + elif not is_num(ref_height): + illegal_value(ref_height, 'stack_info:stacks:ref_height', 'config file') is_valid = False ref_height = None # Set reference heights relative to first stack - if (len(tomo_ref_heights) and msnc.is_num(ref_height) and - msnc.is_num(tomo_ref_heights[0])): + if (len(tomo_ref_heights) and is_num(ref_height) and + is_num(tomo_ref_heights[0])): ref_height = (round(ref_height-tomo_ref_heights[0], 3)) tomo_ref_heights.append(ref_height) tomo_ref_heights[0] = 0.0 @@ -436,7 +438,7 @@ else: raise OSError(f'Invalid config_out input {config_out} {type(config_out)}') if not os.path.isdir(output_folder): - raise OSError(f'Invalid output_folder input {output_folder} {type(output_folder)}') + os.mkdir(os.path.abspath(output_folder)) if isinstance(log_stream, str): path = os.path.split(log_stream)[0] if path and not os.path.isdir(path): @@ -515,12 +517,12 @@ self.is_valid = self.cf.validate() # Load detector info file - df = msnc.Detector(self.cf.config['detector']['id']) + df = TomoDetectorConfig(self.cf.config['detector']['id']) # Check detector info file parameters - if df.validate(): - pixel_size = df.getPixelSize() - num_rows, num_columns = df.getDimensions() + if df.valid: + pixel_size = df.pixel_size + num_rows, num_columns = df.dimensions if not pixel_size or not num_rows or not num_columns: self.is_valid = False else: @@ -577,9 +579,9 @@ # Check number of tomography angles/thetas num_thetas = self.config['theta_range'].get('num') if num_thetas is None: - num_thetas = pyip.inputInt('\nEnter the number of thetas (>0): ', greaterThan=0) - elif not msnc.is_int(num_thetas, 0): - msnc.illegal_value('num_thetas', num_thetas, 'config file') + num_thetas = input_int('\nEnter the number of thetas', 1) + elif not is_int(num_thetas, 0): + illegal_value(num_thetas, 'num_thetas', 'config file') self.is_valid = False return self.config['theta_range']['num'] = num_thetas @@ -591,7 +593,7 @@ img_offset = dark_field.get('img_offset', -1) num_imgs = dark_field.get('num', 0) if not self.test_mode: - img_start, img_offset, num_imgs = msnc.selectImageRange(img_start, img_offset, + img_start, img_offset, num_imgs = selectImageRange(img_start, img_offset, num_imgs, 'dark field') if img_start < 0 or num_imgs < 1: logging.error('Unable to find suitable dark field images') @@ -610,7 +612,7 @@ img_offset = bright_field.get('img_offset', -1) num_imgs = bright_field.get('num', 0) if not self.test_mode: - img_start, img_offset, num_imgs = msnc.selectImageRange(img_start, img_offset, + img_start, img_offset, num_imgs = selectImageRange(img_start, img_offset, num_imgs, 'bright field') if img_start < 0 or num_imgs < 1: logging.error('Unable to find suitable bright field images') @@ -632,7 +634,7 @@ img_offset = stack.get('img_offset', -1) num_imgs = stack.get('num', 0) if not self.test_mode: - img_start, img_offset, num_imgs = msnc.selectImageRange(img_start, img_offset, + img_start, img_offset, num_imgs = selectImageRange(img_start, img_offset, num_imgs, f'tomography stack {index}', num_thetas) if img_start < 0 or num_imgs != num_thetas: logging.error('Unable to find suitable tomography images') @@ -656,7 +658,7 @@ # Load the dark field images logging.debug('Loading dark field...') dark_field = self.config['dark_field'] - tdf_stack = msnc.loadImageStack(tdf_files, self.config['data_filetype'], + tdf_stack = loadImageStack(tdf_files, self.config['data_filetype'], dark_field['img_offset'], dark_field['num']) # Take median @@ -671,10 +673,10 @@ logging.debug(f'tdf_mean = {tdf_mean}') np.nan_to_num(self.tdf, copy=False, nan=tdf_mean, posinf=tdf_mean, neginf=0.) if self.galaxy_flag: - msnc.quickImshow(self.tdf, title='dark field', path='setup_pngs', + quickImshow(self.tdf, title='dark field', path='setup_pngs', save_fig=True, save_only=True) elif not self.test_mode: - msnc.quickImshow(self.tdf, title='dark field', path=self.output_folder, + quickImshow(self.tdf, title='dark field', path=self.output_folder, save_fig=self.save_plots, save_only=self.save_plots_only) def _genBright(self, tbf_files): @@ -683,7 +685,7 @@ # Load the bright field images logging.debug('Loading bright field...') bright_field = self.config['bright_field'] - tbf_stack = msnc.loadImageStack(tbf_files, self.config['data_filetype'], + tbf_stack = loadImageStack(tbf_files, self.config['data_filetype'], bright_field['img_offset'], bright_field['num']) # Take median @@ -705,10 +707,10 @@ else: logging.warning('Dark field unavailable') if self.galaxy_flag: - msnc.quickImshow(self.tbf, title='bright field', path='setup_pngs', + quickImshow(self.tbf, title='bright field', path='setup_pngs', save_fig=True, save_only=True) elif not self.test_mode: - msnc.quickImshow(self.tbf, title='bright field', path=self.output_folder, + quickImshow(self.tbf, title='bright field', path=self.output_folder, save_fig=self.save_plots, save_only=self.save_plots_only) def _setDetectorBounds(self, tomo_stack_files): @@ -721,10 +723,10 @@ img_x_bounds = preprocess.get('img_x_bounds', [0, self.tbf.shape[0]]) if img_x_bounds[0] is not None and img_x_bounds[1] is not None: if img_x_bounds[0] < 0: - msnc.illegal_value('img_x_bounds[0]', img_x_bounds[0], 'config file') + illegal_value(img_x_bounds[0], 'preprocess:img_x_bounds[0]', 'config file') img_x_bounds[0] = 0 - if not msnc.is_index_range(img_x_bounds, 0, self.tbf.shape[0]): - msnc.illegal_value('img_x_bounds[1]', img_x_bounds[1], 'config file') + if not is_index_range(img_x_bounds, 0, self.tbf.shape[0]): + illegal_value(img_x_bounds[1], 'preprocess:img_x_bounds[1]', 'config file') img_x_bounds[1] = self.tbf.shape[0] if self.test_mode: # Update config and save to file @@ -763,11 +765,13 @@ x_low = 0 x_upp = x_sum.size if num_x_min is not None: - fit = msnc.fitStep(y=x_sum, model='rectangle', form='atan') - x_low = fit.get('center1', None) - x_upp = fit.get('center2', None) - sig_low = fit.get('sigma1', None) - sig_upp = fit.get('sigma2', None) + fit = Fit.fit_data(np.array(range(len(x_sum))), x_sum, models='rectangle', + form='atan', guess=True) + parameters = fit.best_values + x_low = parameters.get('center1', None) + x_upp = parameters.get('center2', None) + sig_low = parameters.get('sigma1', None) + sig_upp = parameters.get('sigma2', None) if (x_low is not None and x_upp is not None and sig_low is not None and sig_upp is not None and 0 <= x_low < x_upp <= x_sum.size and (sig_low+sig_upp)/(x_upp-x_low) < 0.1): @@ -784,14 +788,14 @@ else: x_low = 0 x_upp = x_sum.size - msnc.quickPlot((range(x_sum.size), x_sum), + quickPlot((range(x_sum.size), x_sum), ([x_low, x_low], [x_sum_min, x_sum_max], 'r-'), ([x_upp-1, x_upp-1], [x_sum_min, x_sum_max], 'r-'), title=f'sum bright field over theta/y (row bounds: [{x_low}, {x_upp}])', path='setup_pngs', name='detectorbounds.png', save_fig=True, save_only=True, show_grid=True) for i,stack in enumerate(stacks): - tomo_stack = msnc.loadImageStack(tomo_stack_files[i], self.config['data_filetype'], + tomo_stack = loadImageStack(tomo_stack_files[i], self.config['data_filetype'], stack['img_offset'], 1) tomo_stack = tomo_stack[0,:,:] if num_x_min is not None: @@ -799,7 +803,7 @@ tomo_stack[x_low,:] = tomo_stack_max tomo_stack[x_upp-1,:] = tomo_stack_max title = f'tomography image at theta={self.config["theta_range"]["start"]}' - msnc.quickImshow(tomo_stack, title=title, path='setup_pngs', + quickImshow(tomo_stack, title=title, path='setup_pngs', name=f'tomo_{stack["index"]}.png', save_fig=True, save_only=True, show_grid=True) del tomo_stack @@ -817,47 +821,47 @@ # For one tomography stack only: load the first image title = None - msnc.quickImshow(self.tbf, title='bright field') + quickImshow(self.tbf, title='bright field') if num_tomo_stacks == 1: - tomo_stack = msnc.loadImageStack(tomo_stack_files[0], self.config['data_filetype'], + tomo_stack = loadImageStack(tomo_stack_files[0], self.config['data_filetype'], stacks[0]['img_offset'], 1) title = f'tomography image at theta={self.config["theta_range"]["start"]}' - msnc.quickImshow(tomo_stack[0,:,:], title=title) - tomo_or_bright = pyip.inputNum('\nSelect image bounds from bright field (0) or '+ - 'from first tomography image (1): ', min=0, max=1) + quickImshow(tomo_stack[0,:,:], title=title) + tomo_or_bright = input_menu(['bright field', 'first tomography image'], + header='\nSelect image bounds from') else: print('\nSelect image bounds from bright field') tomo_or_bright = 0 if tomo_or_bright: x_sum = np.sum(tomo_stack[0,:,:], 1) - use_bounds = 'no' + use_bounds = False if img_x_bounds[0] is not None and img_x_bounds[1] is not None: tmp = np.copy(tomo_stack[0,:,:]) tmp_max = tmp.max() tmp[img_x_bounds[0],:] = tmp_max tmp[img_x_bounds[1]-1,:] = tmp_max title = f'tomography image at theta={self.config["theta_range"]["start"]}' - msnc.quickImshow(tmp, title=title) + quickImshow(tmp, title=title) del tmp x_sum_min = x_sum.min() x_sum_max = x_sum.max() - msnc.quickPlot((range(x_sum.size), x_sum), + quickPlot((range(x_sum.size), x_sum), ([img_x_bounds[0], img_x_bounds[0]], [x_sum_min, x_sum_max], 'r-'), ([img_x_bounds[1]-1, img_x_bounds[1]-1], [x_sum_min, x_sum_max], 'r-'), title='sum over theta and y') print(f'lower bound = {img_x_bounds[0]} (inclusive)\n'+ f'upper bound = {img_x_bounds[1]} (exclusive)]') - use_bounds = pyip.inputYesNo('Accept these bounds ([y]/n)?: ', blank=True) - if use_bounds == 'no': - img_x_bounds = msnc.selectImageBounds(tomo_stack[0,:,:], 0, + use_bounds = input_yesno('Accept these bounds (y/n)?', 'y') + if not use_bounds: + img_x_bounds = list(selectImageBounds(tomo_stack[0,:,:], 0, img_x_bounds[0], img_x_bounds[1], num_x_min, - f'tomography image at theta={self.config["theta_range"]["start"]}') + f'tomography image at theta={self.config["theta_range"]["start"]}')) if num_x_min is not None and img_x_bounds[1]-img_x_bounds[0]+1 < num_x_min: logging.warning('Image bounds and pixel size prevent seamless stacking') title = f'tomography image at theta={self.config["theta_range"]["start"]}' - msnc.quickImshow(tomo_stack[0,:,:], title=title, path=self.output_folder, + quickImshow(tomo_stack[0,:,:], title=title, path=self.output_folder, save_fig=self.save_plots, save_only=True) - msnc.quickPlot(range(img_x_bounds[0], img_x_bounds[1]), + quickPlot(range(img_x_bounds[0], img_x_bounds[1]), x_sum[img_x_bounds[0]:img_x_bounds[1]], title='sum over theta and y', path=self.output_folder, save_fig=self.save_plots, save_only=True) @@ -865,29 +869,31 @@ x_sum = np.sum(self.tbf, 1) x_sum_min = x_sum.min() x_sum_max = x_sum.max() - use_bounds = 'no' + use_bounds = False if img_x_bounds[0] is not None and img_x_bounds[1] is not None: tmp = np.copy(self.tbf) tmp_max = tmp.max() tmp[img_x_bounds[0],:] = tmp_max tmp[img_x_bounds[1]-1,:] = tmp_max title = 'bright field' - msnc.quickImshow(tmp, title=title) + quickImshow(tmp, title=title) del tmp - msnc.quickPlot((range(x_sum.size), x_sum), + quickPlot((range(x_sum.size), x_sum), ([img_x_bounds[0], img_x_bounds[0]], [x_sum_min, x_sum_max], 'r-'), ([img_x_bounds[1]-1, img_x_bounds[1]-1], [x_sum_min, x_sum_max], 'r-'), title='sum over theta and y') print(f'lower bound = {img_x_bounds[0]} (inclusive)\n'+ f'upper bound = {img_x_bounds[1]} (exclusive)]') - use_bounds = pyip.inputYesNo('Accept these bounds ([y]/n)?: ', blank=True) - if use_bounds == 'no': - use_fit = 'no' - fit = msnc.fitStep(y=x_sum, model='rectangle', form='atan') - x_low = fit.get('center1', None) - x_upp = fit.get('center2', None) - sig_low = fit.get('sigma1', None) - sig_upp = fit.get('sigma2', None) + use_bounds = input_yesno('Accept these bounds (y/n)?', 'y') + if not use_bounds: + use_fit = False + fit = Fit.fit_data(np.array(range(len(x_sum))), x_sum, models='rectangle', + form='atan', guess=True) + parameters = fit.best_values + x_low = parameters.get('center1', None) + x_upp = parameters.get('center2', None) + sig_low = parameters.get('sigma1', None) + sig_upp = parameters.get('sigma2', None) if (x_low is not None and x_upp is not None and sig_low is not None and sig_upp is not None and 0 <= x_low < x_upp <= x_sum.size and (sig_low+sig_upp)/(x_upp-x_low) < 0.1): @@ -906,34 +912,50 @@ tmp[x_low,:] = tmp_max tmp[x_upp-1,:] = tmp_max title = 'bright field' - msnc.quickImshow(tmp, title=title) + quickImshow(tmp, title=title) del tmp - msnc.quickPlot((range(x_sum.size), x_sum), + quickPlot((range(x_sum.size), x_sum), ([x_low, x_low], [x_sum_min, x_sum_max], 'r-'), ([x_upp, x_upp], [x_sum_min, x_sum_max], 'r-'), title='sum over theta and y') print(f'lower bound = {x_low} (inclusive)') print(f'upper bound = {x_upp} (exclusive)]') - use_fit = pyip.inputYesNo('Accept these bounds ([y]/n)?: ', blank=True) - if use_fit == 'no': - img_x_bounds = msnc.selectArrayBounds(x_sum, img_x_bounds[0], img_x_bounds[1], - num_x_min, 'sum over theta and y') + use_fit = input_yesno('Accept these bounds (y/n)?', 'y') + if use_fit: + img_x_bounds = [x_low, x_upp] else: - img_x_bounds = [x_low, x_upp] + accept = False + while not accept: + mask, img_x_bounds = draw_mask_1d(x_sum, title='select x data range', + legend='sum over theta and y') + print(f'img_x_bounds = {img_x_bounds}') + while (len(img_x_bounds) != 1 or (len(x_sum) >= num_x_min and + img_x_bounds[0][1]-img_x_bounds[0][0]+1 < num_x_min)): + exit('Should not be here') + print('Please select exactly one continuous range') + mask, img_x_bounds = draw_mask_1d(x_sum, title='select x data range', + legend='sum over theta and y') + img_x_bounds = list(img_x_bounds[0]) + quickPlot(x_sum, vlines=img_x_bounds, title='sum over theta and y') + print(f'img_x_bounds = {img_x_bounds} (lower bound inclusive, upper bound '+ + 'exclusive)') + accept = input_yesno('Accept these bounds (y/n)?', 'y') + if not accept: + img_x_bounds = None if num_x_min is not None and img_x_bounds[1]-img_x_bounds[0]+1 < num_x_min: logging.warning('Image bounds and pixel size prevent seamless stacking') - #msnc.quickPlot(range(img_x_bounds[0], img_x_bounds[1]), + #quickPlot(range(img_x_bounds[0], img_x_bounds[1]), # x_sum[img_x_bounds[0]:img_x_bounds[1]], # title='sum over theta and y', path=self.output_folder, # save_fig=self.save_plots, save_only=True) - msnc.quickPlot((range(x_sum.size), x_sum), + quickPlot((range(x_sum.size), x_sum), ([img_x_bounds[0], img_x_bounds[0]], [x_sum_min, x_sum_max], 'r-'), ([img_x_bounds[1], img_x_bounds[1]], [x_sum_min, x_sum_max], 'r-'), title='sum over theta and y', path=self.output_folder, save_fig=self.save_plots, save_only=True) del x_sum for i,stack in enumerate(stacks): - tomo_stack = msnc.loadImageStack(tomo_stack_files[i], self.config['data_filetype'], + tomo_stack = loadImageStack(tomo_stack_files[i], self.config['data_filetype'], stack['img_offset'], 1) tomo_stack = tomo_stack[0,:,:] if num_x_min is not None: @@ -942,21 +964,21 @@ tomo_stack[img_x_bounds[1]-1,:] = tomo_stack_max title = f'tomography image at theta={self.config["theta_range"]["start"]}' if self.galaxy_flag: - msnc.quickImshow(tomo_stack, title=title, path='setup_pngs', + quickImshow(tomo_stack, title=title, path='setup_pngs', name=f'tomo_{stack["index"]}.png', save_fig=True, save_only=True, show_grid=True) else: - msnc.quickImshow(tomo_stack, title=title, path=self.output_folder, + quickImshow(tomo_stack, title=title, path=self.output_folder, name=f'tomo_{stack["index"]}.png', save_fig=self.save_plots, save_only=True, show_grid=True) del tomo_stack logging.debug(f'img_x_bounds: {img_x_bounds}') if self.save_plots_only: - msnc.clearFig('bright field') - msnc.clearFig('sum over theta and y') + clearImshow('bright field') + clearPlot('sum over theta and y') if title: - msnc.clearFig(title) + clearPlot(title) # Update config and save to file if preprocess is None: @@ -972,30 +994,26 @@ zoom_perc = 100 if not self.galaxy_flag: if preprocess is None or 'zoom_perc' not in preprocess: - if pyip.inputYesNo( - '\nDo you want to zoom in to reduce memory requirement (y/[n])? ', - blank=True) == 'yes': - zoom_perc = pyip.inputInt(' Enter zoom percentage [1, 100]: ', - min=1, max=100) + if input_yesno('\nDo you want to zoom in to reduce memory requirement (y/n)?', 'n'): + zoom_perc = input_int(' Enter zoom percentage', 1, 100) else: zoom_perc = preprocess['zoom_perc'] - if msnc.is_num(zoom_perc, 1., 100.): + if is_num(zoom_perc, 1., 100.): zoom_perc = int(zoom_perc) else: - msnc.illegal_value('zoom_perc', zoom_perc, 'config file') + illegal_value(zoom_perc, 'preprocess:zoom_perc', 'config file') zoom_perc = 100 num_theta_skip = 0 if not self.galaxy_flag: if preprocess is None or 'num_theta_skip' not in preprocess: - if pyip.inputYesNo( - 'Do you want to skip thetas to reduce memory requirement (y/[n])? ', - blank=True) == 'yes': - num_theta_skip = pyip.inputInt(' Enter the number skip theta interval'+ - f' [0, {self.num_thetas-1}]: ', min=0, max=self.num_thetas-1) + if input_yesno('Do you want to skip thetas to reduce memory requirement (y/n)?', + 'n'): + num_theta_skip = input_int(' Enter the number skip theta interval', 0, + self.num_thetas-1) else: num_theta_skip = preprocess['num_theta_skip'] - if not msnc.is_int(num_theta_skip, 0): - msnc.illegal_value('num_theta_skip', num_theta_skip, 'config file') + if not is_int(num_theta_skip, 0): + illegal_value(num_theta_skip, 'preprocess:num_theta_skip', 'config file') num_theta_skip = 0 logging.debug(f'zoom_perc = {zoom_perc}') logging.debug(f'num_theta_skip = {num_theta_skip}') @@ -1023,16 +1041,16 @@ title = f'{base_name} {zoom_perc}p' title += f'_{index}' tomo_file = re.sub(r"\s+", '_', f'{self.output_folder}/{title}.npy') - load_flag = 'no' + load_flag = False available = False if os.path.isfile(tomo_file): available = True if required: - load_flag = 'yes' + load_flag = True else: - load_flag = pyip.inputYesNo(f'\nDo you want to load {tomo_file} (y/n)? ') + load_flag = input_yesno(f'\nDo you want to load {tomo_file} (y/n)?') stack = np.array([]) - if load_flag == 'yes': + if load_flag: t0 = time() logging.info(f'Loading {tomo_file} ...') try: @@ -1042,7 +1060,7 @@ logging.error(f'Error loading {tomo_file}') logging.info(f'... done in {time()-t0:.2f} seconds!') if stack.size: - msnc.quickImshow(stack[:,0,:], title=title, path=self.output_folder, + quickImshow(stack[:,0,:], title=title, path=self.output_folder, save_fig=self.save_plots, save_only=self.save_plots_only) return stack, available @@ -1106,7 +1124,7 @@ # Load a stack of tomography images index = stack['index'] t0 = time() - tomo_stack = msnc.loadImageStack(tomo_stack_files[i], self.config['data_filetype'], + tomo_stack = loadImageStack(tomo_stack_files[i], self.config['data_filetype'], stack['img_offset'], self.config['theta_range']['num'], num_theta_skip, img_x_bounds, img_y_bounds) tomo_stack = tomo_stack.astype('float64') @@ -1145,7 +1163,7 @@ if not self.galaxy_flag: title = f'red stack fullres {index}' if not self.test_mode: - msnc.quickImshow(tomo_stack[0,:,:], title=title, path=self.output_folder, + quickImshow(tomo_stack[0,:,:], title=title, path=self.output_folder, save_fig=self.save_plots, save_only=self.save_plots_only) if zoom_perc != 100: t0 = time() @@ -1160,7 +1178,7 @@ if not self.galaxy_flag: title = f'red stack {zoom_perc}p {index}' if not self.test_mode: - msnc.quickImshow(tomo_stack[0,:,:], title=title, path=self.output_folder, + quickImshow(tomo_stack[0,:,:], title=title, path=self.output_folder, save_fig=self.save_plots, save_only=self.save_plots_only) # Convert tomography stack from theta,row,column to row,theta,column @@ -1214,7 +1232,7 @@ logging.debug(f'sinogram range = [{dist_from_edge}, {two_offset-dist_from_edge}]') sinogram = tomo_plane_T[dist_from_edge:two_offset-dist_from_edge,:] if plot_sinogram and not self.test_mode: - msnc.quickImshow(sinogram.T, f'sinogram center offset{center_offset:.2f}', + quickImshow(sinogram.T, f'sinogram center offset{center_offset:.2f}', path=self.output_folder, save_fig=self.save_plots, save_only=self.save_plots_only, aspect='auto') @@ -1244,14 +1262,14 @@ vmax = np.max(edges[0,:,:]) vmin = -vmax if self.galaxy_flag: - msnc.quickImshow(edges[0,:,:], title, path=path, save_fig=True, save_only=True, + quickImshow(edges[0,:,:], title, path=path, save_fig=True, save_only=True, cmap='gray', vmin=vmin, vmax=vmax) else: if path is None: path=self.output_folder - msnc.quickImshow(edges[0,:,:], f'{title} coolwarm', path=path, + quickImshow(edges[0,:,:], f'{title} coolwarm', path=path, save_fig=self.save_plots, save_only=self.save_plots_only, cmap='coolwarm') - msnc.quickImshow(edges[0,:,:], f'{title} gray', path=path, + quickImshow(edges[0,:,:], f'{title} gray', path=path, save_fig=self.save_plots, save_only=self.save_plots_only, cmap='gray', vmin=vmin, vmax=vmax) del edges @@ -1303,8 +1321,7 @@ title = f'edges row{row} center offset{center_offset_vo:.2f} Vo' self._plotEdgesOnePlane(recon_plane, title) if not self.galaxy_flag: - if (pyip.inputYesNo('Try finding center using phase correlation '+ - '(y/[n])? ', blank=True) == 'yes'): + if input_yesno('Try finding center using phase correlation (y/n)?', 'n'): tomo_center = tomopy.find_center_pc(sinogram, sinogram, tol=0.1, rotc_guess=tomo_center) error = 1. @@ -1319,13 +1336,9 @@ eff_pixel_size, cross_sectional_dim, False, num_core) title = f'edges row{row} center_offset{center_offset:.2f} PC' self._plotEdgesOnePlane(recon_plane, title) - if (pyip.inputYesNo('Accept a center location ([y]) or continue '+ - 'search (n)? ', blank=True) != 'no'): - center_offset = pyip.inputNum( - f' Enter chosen center offset [{-int(center)}, {int(center)}] '+ - f'([{center_offset_vo:.2f}])): ', blank=True) - if center_offset == '': - center_offset = center_offset_vo + if input_yesno('Accept a center location (y) or continue search (n)?', 'y'): + center_offset = input_num(' Enter chosen center offset', -center, center, + center_offset_vo) del sinogram_T del recon_plane return float(center_offset) @@ -1338,7 +1351,7 @@ set_center = galaxy_param['set_center'] set_range = galaxy_param['set_range'] center_offset_step = galaxy_param['set_step'] - if (not msnc.is_num(set_range, 0) or not msnc.is_num(center_offset_step) or + if (not is_num(set_range, 0) or not is_num(center_offset_step) or center_offset_step <= 0): logging.warning('Illegal center finding search parameter, skip search') del sinogram_T @@ -1347,17 +1360,15 @@ center_offset_low = set_center-set_range center_offset_upp = set_center+set_range else: - center_offset_low = pyip.inputInt('\nEnter lower bound for center offset '+ - f'[{-int(center)}, {int(center)}]: ', min=-int(center), max=int(center)) - center_offset_upp = pyip.inputInt('Enter upper bound for center offset '+ - f'[{center_offset_low}, {int(center)}]: ', - min=center_offset_low, max=int(center)) + center_offset_low = input_int('\nEnter lower bound for center offset', + -center, center) + center_offset_upp = input_int('Enter upper bound for center offset', + center_offset_low, center) if center_offset_upp == center_offset_low: center_offset_step = 1 else: - center_offset_step = pyip.inputInt('Enter step size for center offset search '+ - f'[1, {center_offset_upp-center_offset_low}]: ', - min=1, max=center_offset_upp-center_offset_low) + center_offset_step = input_int('Enter step size for center offset search', + 1, center_offset_upp-center_offset_low) num_center_offset = 1+int((center_offset_upp-center_offset_low)/center_offset_step) center_offsets = np.linspace(center_offset_low, center_offset_upp, num_center_offset) for center_offset in center_offsets: @@ -1373,8 +1384,7 @@ self._plotEdgesOnePlane(recon_plane, title, path='find_center_pngs') else: self._plotEdgesOnePlane(recon_plane, title) - if self.galaxy_flag or pyip.inputInt('\nContinue (0) or end the search (1): ', - min=0, max=1): + if self.galaxy_flag or input_int('\nContinue (0) or end the search (1)', 0, 1): break del sinogram_T @@ -1382,8 +1392,7 @@ if self.galaxy_flag: center_offset = center_offset_vo else: - center_offset = pyip.inputNum(f' Enter chosen center offset '+ - f'[{-int(center)}, {int(center)}]: ', min=-int(center), max=int(center)) + center_offset = input_num(' Enter chosen center offset', -center, center) return float(center_offset) def _reconstructOneTomoStack(self, tomo_stack, thetas, row_bounds=None, @@ -1402,7 +1411,7 @@ if row_bounds is None: row_bounds = [0, tomo_stack.shape[0]] else: - if not msnc.is_index_range(row_bounds, 0, tomo_stack.shape[0]): + if not is_index_range(row_bounds, 0, tomo_stack.shape[0]): raise ValueError('Illegal row bounds in reconstructOneTomoStack') if thetas.size != tomo_stack.shape[1]: raise ValueError('theta dimension mismatch in reconstructOneTomoStack') @@ -1467,7 +1476,7 @@ # Find dark field images dark_field = self.config['dark_field'] - img_start, num_imgs, dark_files = msnc.findImageFiles( + img_start, num_imgs, dark_files = findImageFiles( dark_field['data_path'], self.config['data_filetype'], 'dark field') if img_start < 0 or num_imgs < 1: logging.error('Unable to find suitable dark field images') @@ -1492,13 +1501,13 @@ logging.info(f'Number of dark field images = {dark_field["num"]}') logging.info(f'Dark field image start index = {dark_field["img_start"]}') if num_imgs and tiff_to_h5_flag and self.config['data_filetype'] == 'tif': - dark_files = msnc.combine_tiffs_in_h5(dark_files, num_imgs, + dark_files = combine_tiffs_in_h5(dark_files, num_imgs, f'{self.config["work_folder"]}/dark_field.h5') dark_field['data_path'] = dark_files[0] # Find bright field images bright_field = self.config['bright_field'] - img_start, num_imgs, bright_files = msnc.findImageFiles( + img_start, num_imgs, bright_files = findImageFiles( bright_field['data_path'], self.config['data_filetype'], 'bright field') if img_start < 0 or num_imgs < 1: logging.error('Unable to find suitable bright field images') @@ -1520,7 +1529,7 @@ logging.info(f'Number of bright field images = {bright_field["num"]}') logging.info(f'Bright field image start index = {bright_field["img_start"]}') if num_imgs and tiff_to_h5_flag and self.config['data_filetype'] == 'tif': - bright_files = msnc.combine_tiffs_in_h5(bright_files, num_imgs, + bright_files = combine_tiffs_in_h5(bright_files, num_imgs, f'{self.config["work_folder"]}/bright_field.h5') bright_field['data_path'] = bright_files[0] @@ -1528,7 +1537,7 @@ tomo_stack_files = [] for stack in self.config['stack_info']['stacks']: index = stack['index'] - img_start, num_imgs, tomo_files = msnc.findImageFiles( + img_start, num_imgs, tomo_files = findImageFiles( stack['data_path'], self.config['data_filetype'], f'tomography set {index}') if img_start < 0 or num_imgs < 1: logging.error('Unable to find suitable tomography images') @@ -1550,7 +1559,7 @@ logging.info(f'Number of tomography images for set {index} = {stack["num"]}') logging.info(f'Tomography set {index} image start index = {stack["img_start"]}') if num_imgs and tiff_to_h5_flag and self.config['data_filetype'] == 'tif': - tomo_files = msnc.combine_tiffs_in_h5(tomo_files, num_imgs, + tomo_files = combine_tiffs_in_h5(tomo_files, num_imgs, f'{self.config["work_folder"]}/tomo_field_{index}.h5') stack['data_path'] = tomo_files[0] tomo_stack_files.append(tomo_files) @@ -1709,15 +1718,15 @@ center_stack_index = find_center['center_stack_index'] if (not isinstance(center_stack_index, int) or center_stack_index not in available_stacks): - msnc.illegal_value('center_stack_index', center_stack_index, 'config file') + illegal_value(center_stack_index, 'find_center:center_stack_index', 'config file') else: if self.test_mode: assert(find_center.get('completed', False) == False) else: print('\nFound calibration center offset info for stack '+ f'{center_stack_index}') - if (pyip.inputYesNo('Do you want to use this again ([y]/n)? ', - blank=True) != 'no' and find_center.get('completed', False) == True): + if (input_yesno('Do you want to use this again (y/n)?', 'y') + and find_center.get('completed', False) == True): return # Load the required preprocessed stack @@ -1749,11 +1758,11 @@ else: while True: if not center_stack_index: - center_stack_index = pyip.inputInt('\nEnter tomography stack index to get ' - f'rotation axis centers {available_stacks}: ') + center_stack_index = input_int('\nEnter tomography stack index to get ' + f'rotation axis centers ({available_stacks})', inset=available_stacks) while center_stack_index not in available_stacks: - center_stack_index = pyip.inputInt('\nEnter tomography stack index to get ' - f'rotation axis centers {available_stacks}: ') + center_stack_index = input_int('\nEnter tomography stack index to get ' + f'rotation axis centers ({available_stacks})', inset=available_stacks) tomo_stack_index = available_stacks.index(center_stack_index) if not self.tomo_stacks[tomo_stack_index].size: self.tomo_stacks[tomo_stack_index], available = self._loadTomo( @@ -1785,8 +1794,8 @@ center_rows[0] = 0 if center_rows[1] == -1: center_rows[1] = center_stack.shape[0]-1 - if not msnc.is_index_range(row_bounds, 0, center_stack.shape[0]): - msnc.illegal_value('row_bounds', row_bounds) + if not is_index_range(row_bounds, 0, center_stack.shape[0]): + illegal_value(row_bounds, 'row_bounds', 'Tomo:findCenters') return # Set thetas (in degrees) @@ -1806,27 +1815,27 @@ eff_pixel_size = 100.*pixel_size/zoom_perc logging.debug(f'eff_pixel_size = {eff_pixel_size}') if num_tomo_stacks == 1: - accept = 'yes' + accept = True if not self.test_mode and not self.galaxy_flag: - accept = 'no' + accept = False print('\nSelect bounds for image reconstruction') - if msnc.is_index_range(row_bounds, 0, center_stack.shape[0]): + if is_index_range(row_bounds, 0, center_stack.shape[0]): a_tmp = np.copy(center_stack[:,0,:]) a_tmp_max = a_tmp.max() a_tmp[row_bounds[0],:] = a_tmp_max a_tmp[row_bounds[1]-1,:] = a_tmp_max print(f'lower bound = {row_bounds[0]} (inclusive)') print(f'upper bound = {row_bounds[1]} (exclusive)') - msnc.quickImshow(a_tmp, title=f'center stack theta={theta_start}', + quickImshow(a_tmp, title=f'center stack theta={theta_start}', aspect='auto') del a_tmp - accept = pyip.inputYesNo('Accept these bounds ([y]/n)?: ', blank=True) - if accept == 'no': - (n1, n2) = msnc.selectImageBounds(center_stack[:,0,:], 0, - title=f'center stack theta={theta_start}') - else: + accept = input_yesno('Accept these bounds (y/n)?', 'y') + if accept: n1 = row_bounds[0] n2 = row_bounds[1] + else: + n1, n2 = selectImageBounds(center_stack[:,0,:], 0, + title=f'center stack theta={theta_start}') else: tomo_ref_heights = [stack['ref_height'] for stack in stacks] n1 = int((1.+(tomo_ref_heights[0]+center_stack.shape[0]*eff_pixel_size- @@ -1838,16 +1847,16 @@ tmp_max = tmp.max() tmp[n1,:] = tmp_max tmp[n2-1,:] = tmp_max - if msnc.is_index_range(center_rows, 0, tmp.shape[0]): + if is_index_range(center_rows, 0, tmp.shape[0]): tmp[center_rows[0],:] = tmp_max tmp[center_rows[1]-1,:] = tmp_max extent = [0, tmp.shape[1], tmp.shape[0], 0] - msnc.quickImshow(tmp, title=f'center stack theta={theta_start}', + quickImshow(tmp, title=f'center stack theta={theta_start}', path=self.output_folder, extent=extent, save_fig=self.save_plots, save_only=self.save_plots_only, aspect='auto') del tmp #extent = [0, center_stack.shape[2], n2, n1] - #msnc.quickImshow(center_stack[n1:n2,0,:], title=f'center stack theta={theta_start}', + #quickImshow(center_stack[n1:n2,0,:], title=f'center stack theta={theta_start}', # path=self.output_folder, extent=extent, save_fig=self.save_plots, # save_only=self.save_plots_only, show_grid=True, aspect='auto') @@ -1859,36 +1868,35 @@ logging.info('Determine center offset at sample row boundaries') # Lower row center - use_row = 'no' - use_center = 'no' + use_row = False + use_center = False row = center_rows[0] if self.test_mode or self.galaxy_flag: - assert(msnc.is_int(row, n1, n2-2)) - if msnc.is_int(row, n1, n2-2): + assert(is_int(row, n1, n2-2)) + if is_int(row, n1, n2-2): if self.test_mode or self.galaxy_flag: - use_row = 'yes' + use_row = True else: - msnc.quickImshow(center_stack[:,0,:], title=f'theta={theta_start}', aspect='auto') - use_row = pyip.inputYesNo('\nCurrent row index for lower center = ' - f'{row}, use this value ([y]/n)? ', blank=True) + quickImshow(center_stack[:,0,:], title=f'theta={theta_start}', aspect='auto') + use_row = input_yesno('\nCurrent row index for lower center = ' + f'{row}, use this value (y/n)?', 'y') if self.save_plots_only: - msnc.clearFig(f'theta={theta_start}') - if use_row != 'no': + clearImshow(f'theta={theta_start}') + if use_row: center_offset = find_center.get('lower_center_offset') - if msnc.is_num(center_offset): - use_center = pyip.inputYesNo('Current lower center offset = '+ - f'{center_offset}, use this value ([y]/n)? ', blank=True) - if use_center == 'no': - if use_row == 'no': + if is_num(center_offset): + use_center = input_yesno('Current lower center offset = '+ + f'{center_offset}, use this value (y/n)?', 'y') + if not use_center: + if not use_row: if not self.test_mode: - msnc.quickImshow(center_stack[:,0,:], title=f'theta={theta_start}', + quickImshow(center_stack[:,0,:], title=f'theta={theta_start}', aspect='auto') - row = pyip.inputInt('\nEnter row index to find lower center '+ - f'[[{n1}], {n2-2}]: ', min=n1, max=n2-2, blank=True) + row = input_int('\nEnter row index to find lower center', n1, n2-2, n1) if row == '': row = n1 if self.save_plots_only: - msnc.clearFig(f'theta={theta_start}') + clearImshow(f'theta={theta_start}') # center_stack order: row,theta,column center_offset = self._findCenterOnePlane(center_stack[row,:,:], row, thetas_deg, eff_pixel_size, cross_sectional_dim, num_core=num_core, @@ -1903,36 +1911,35 @@ lower_row = row # Upper row center - use_row = 'no' - use_center = 'no' + use_row = False + use_center = False row = center_rows[1] if self.test_mode or self.galaxy_flag: - assert(msnc.is_int(row, lower_row+1, n2-1)) - if msnc.is_int(row, lower_row+1, n2-1): + assert(is_int(row, lower_row+1, n2-1)) + if is_int(row, lower_row+1, n2-1): if self.test_mode or self.galaxy_flag: - use_row = 'yes' + use_row = True else: - msnc.quickImshow(center_stack[:,0,:], title=f'theta={theta_start}', aspect='auto') - use_row = pyip.inputYesNo('\nCurrent row index for upper center = ' - f'{row}, use this value ([y]/n)? ', blank=True) + quickImshow(center_stack[:,0,:], title=f'theta={theta_start}', aspect='auto') + use_row = input_yesno('\nCurrent row index for upper center = ' + f'{row}, use this value (y/n)?', 'y') if self.save_plots_only: - msnc.clearFig(f'theta={theta_start}') - if use_row != 'no': + clearImshow(f'theta={theta_start}') + if use_row: center_offset = find_center.get('upper_center_offset') - if msnc.is_num(center_offset): - use_center = pyip.inputYesNo('Current upper center offset = '+ - f'{center_offset}, use this value ([y]/n)? ', blank=True) - if use_center == 'no': - if use_row == 'no': + if is_num(center_offset): + use_center = input_yesrnNo('Current upper center offset = '+ + f'{center_offset}, use this value (y/n)?', 'y') + if not use_center: + if not use_row: if not self.test_mode: - msnc.quickImshow(center_stack[:,0,:], title=f'theta={theta_start}', + quickImshow(center_stack[:,0,:], title=f'theta={theta_start}', aspect='auto') - row = pyip.inputInt('\nEnter row index to find upper center '+ - f'[{lower_row+1}, [{n2-1}]]: ', min=lower_row+1, max=n2-1, blank=True) + row = input_int('\nEnter row index to find upper center', lower_row+1, n2-1, n2-1) if row == '': row = n2-1 if self.save_plots_only: - msnc.clearFig(f'theta={theta_start}') + clearImshow(f'theta={theta_start}') # center_stack order: row,theta,column center_offset = self._findCenterOnePlane(center_stack[row,:,:], row, thetas_deg, eff_pixel_size, cross_sectional_dim, num_core=num_core, @@ -1992,7 +1999,7 @@ for i in range(-2, 3): shift_i = shift+2*i plane1_shifted = spi.shift(plane2, [0, shift_i]) - msnc.quickPlot((plane1[0,:],), (plane1_shifted[0,:],), + quickPlot((plane1[0,:],), (plane1_shifted[0,:],), title=f'stacks {stack1} {stack2} shifted {2*i} theta={self.start_theta}', path=self.output_folder, save_fig=self.save_plots, save_only=self.save_plots_only) @@ -2011,14 +2018,14 @@ for i in range(-2, 3): shift_i = -shift+2*i plane1_shifted = spi.shift(plane2, [0, shift_i]) - msnc.quickPlot((plane1[0,:],), (plane1_shifted[0,:],), + quickPlot((plane1[0,:],), (plane1_shifted[0,:],), title=f'stacks {stack1} {stack2} shifted {2*i} theta={start_theta}', path=self.output_folder, save_fig=self.save_plots, save_only=self.save_plots_only) del plane1, plane2, plane1_shifted # Update config file - self.config = msnc.update('config.txt', 'check_centers', True, 'find_centers') + self.config = update('config.txt', 'check_centers', True, 'find_centers') def reconstructTomoStacks(self, galaxy_param=None, num_core=None): """Reconstruct tomography stacks. @@ -2041,9 +2048,9 @@ center_offsets = galaxy_param['center_offsets'] assert(isinstance(center_offsets, list) and len(center_offsets) == 2) lower_center_offset = center_offsets[0] - assert(msnc.is_num(lower_center_offset)) + assert(is_num(lower_center_offset)) upper_center_offset = center_offsets[1] - assert(msnc.is_num(upper_center_offset)) + assert(is_num(upper_center_offset)) else: if galaxy_param: logging.warning('Ignoring galaxy_param in reconstructTomoStacks (only for Galaxy)') @@ -2128,23 +2135,23 @@ if self.galaxy_flag: x_slice = int(self.tomo_stacks[i].shape[0]/2) title = f'{basetitle} {index} xslice{x_slice}' - msnc.quickImshow(self.tomo_recon_stacks[i][x_slice,:,:], title=title, + quickImshow(self.tomo_recon_stacks[i][x_slice,:,:], title=title, path='center_slice_pngs', save_fig=True, save_only=True) y_slice = int(self.tomo_stacks[i].shape[0]/2) title = f'{basetitle} {index} yslice{y_slice}' - msnc.quickImshow(self.tomo_recon_stacks[i][:,y_slice,:], title=title, + quickImshow(self.tomo_recon_stacks[i][:,y_slice,:], title=title, path='center_slice_pngs', save_fig=True, save_only=True) z_slice = int(self.tomo_stacks[i].shape[0]/2) title = f'{basetitle} {index} zslice{z_slice}' - msnc.quickImshow(self.tomo_recon_stacks[i][:,:,z_slice], title=title, + quickImshow(self.tomo_recon_stacks[i][:,:,z_slice], title=title, path='center_slice_pngs', save_fig=True, save_only=True) elif not self.test_mode: x_slice = int(self.tomo_stacks[i].shape[0]/2) title = f'{basetitle} {index} xslice{x_slice}' - msnc.quickImshow(self.tomo_recon_stacks[i][x_slice,:,:], title=title, + quickImshow(self.tomo_recon_stacks[i][x_slice,:,:], title=title, path=self.output_folder, save_fig=self.save_plots, save_only=self.save_plots_only) - msnc.quickPlot(self.tomo_recon_stacks[i] + quickPlot(self.tomo_recon_stacks[i] [x_slice,int(self.tomo_recon_stacks[i].shape[2]/2),:], title=f'{title} cut{int(self.tomo_recon_stacks[i].shape[2]/2)}', path=self.output_folder, save_fig=self.save_plots, @@ -2173,21 +2180,21 @@ tomosum = 0 [tomosum := tomosum+np.sum(tomo_recon_stack, axis=(0,2)) for tomo_recon_stack in self.tomo_recon_stacks] - msnc.quickPlot(tomosum, title='recon stack sum yz', path='center_slice_pngs', + quickPlot(tomosum, title='recon stack sum yz', path='center_slice_pngs', save_fig=True, save_only=True) # Create cross section profile in xz-plane tomosum = 0 [tomosum := tomosum+np.sum(tomo_recon_stack, axis=(0,1)) for tomo_recon_stack in self.tomo_recon_stacks] - msnc.quickPlot(tomosum, title='recon stack sum xz', path='center_slice_pngs', + quickPlot(tomosum, title='recon stack sum xz', path='center_slice_pngs', save_fig=True, save_only=True) # Create cross section profile in xy-plane num_tomo_stacks = len(stacks) row_bounds = self.config['find_center']['row_bounds'] - if not msnc.is_index_range(row_bounds, 0, self.tomo_recon_stacks[0].shape[0]): - msnc.illegal_value('row_bounds', row_bounds, 'config file') + if not is_index_range(row_bounds, 0, self.tomo_recon_stacks[0].shape[0]): + illegal_value(row_bounds, 'find_center:row_bounds', 'config file') return if num_tomo_stacks == 1: low_bound = row_bounds[0] @@ -2202,7 +2209,7 @@ tomosum = np.concatenate([tomosum, np.sum(self.tomo_recon_stacks[num_tomo_stacks-1][row_bounds[0]:,:,:], axis=(1,2))]) - msnc.quickPlot(tomosum, title='recon stack sum xy', path='center_slice_pngs', + quickPlot(tomosum, title='recon stack sum xy', path='center_slice_pngs', save_fig=True, save_only=True) def combineTomoStacks(self, galaxy_param=None): @@ -2250,8 +2257,8 @@ # Get center stack boundaries row_bounds = self.config['find_center']['row_bounds'] - if not msnc.is_index_range(row_bounds, 0, self.tomo_recon_stacks[0].shape[0]): - msnc.illegal_value('row_bounds', row_bounds, 'config file') + if not is_index_range(row_bounds, 0, self.tomo_recon_stacks[0].shape[0]): + illegal_value(row_bounds, 'find_center:row_bounds', 'config file') return # Selecting x bounds (in yz-plane) @@ -2265,41 +2272,57 @@ x_bounds[0] = 0 if x_bounds[1] == -1: x_bounds[1] = tomosum.size - if not msnc.is_index_range(x_bounds, 0, tomosum.size): - msnc.illegal_value('x_bounds', x_bounds, 'galaxy input') + if not is_index_range(x_bounds, 0, tomosum.size): + illegal_value(x_bounds, 'x_bounds', 'galaxy input') tomosum_min = tomosum.min() tomosum_max = tomosum.max() - msnc.quickPlot((range(tomosum.size), tomosum), + quickPlot((range(tomosum.size), tomosum), ([x_bounds[0], x_bounds[0]], [tomosum_min, tomosum_max], 'r-'), ([x_bounds[1]-1, x_bounds[1]-1], [tomosum_min, tomosum_max], 'r-'), title=f'recon stack sum yz', path='combine_pngs', save_fig=True, save_only=True) else: if combine_stacks and 'x_bounds' in combine_stacks: x_bounds = combine_stacks['x_bounds'] - if not msnc.is_index_range(x_bounds, 0, tomosum.size): - msnc.illegal_value('x_bounds', x_bounds, 'config file') + if not is_index_range(x_bounds, 0, tomosum.size): + illegal_value(x_bounds, 'combine_stacks:x_bounds', 'config file') elif not self.test_mode: - msnc.quickPlot(tomosum, title='recon stack sum yz') - if pyip.inputYesNo('\nCurrent image x-bounds: '+ - f'[{x_bounds[0]}, {x_bounds[1]}], use these values ([y]/n)? ', - blank=True) == 'no': - if pyip.inputYesNo( - 'Do you want to change the image x-bounds ([y]/n)? ', - blank=True) == 'no': - x_bounds = [0, tomosum.size] - else: - x_bounds = msnc.selectArrayBounds(tomosum, title='recon stack sum yz') + accept = False + while not accept: + mask, x_bounds = draw_mask_1d(tomosum, current_index_ranges=x_bounds, + title='select x data range', legend='recon stack sum yz') + while len(x_bounds) != 1: + print('Please select exactly one continuous range') + mask, x_bounds = draw_mask_1d(tomosum, title='select x data range', + legend='recon stack sum yz') + x_bounds = list(x_bounds[0]) + quickPlot(tomosum, vlines=x_bounds, title='recon stack sum yz') + print(f'x_bounds = {x_bounds} (lower bound inclusive, upper bound '+ + 'exclusive)') + accept = input_yesno('Accept these bounds (y/n)?', 'y') + if not accept: + x_bounds = None else: - msnc.quickPlot(tomosum, title='recon stack sum yz') - if pyip.inputYesNo('\nDo you want to change the image x-bounds (y/n)? ') == 'no': + if not input_yesno('\nDo you want to change the image x-bounds (y/n)?', 'n'): x_bounds = [0, tomosum.size] else: - x_bounds = msnc.selectArrayBounds(tomosum, title='recon stack sum yz') + accept = False + while not accept: + mask, x_bounds = draw_mask_1d(tomosum, title='select x data range', + legend='recon stack sum yz') + while len(x_bounds) != 1: + print('Please select exactly one continuous range') + mask, x_bounds = draw_mask_1d(tomosum, title='select x data range', + legend='recon stack sum yz') + x_bounds = list(x_bounds[0]) + quickPlot(tomosum, vlines=x_bounds, title='recon stack sum yz') + print(f'x_bounds = {x_bounds} (lower bound inclusive, upper bound '+ + 'exclusive)') + accept = input_yesno('Accept these bounds (y/n)?', 'y') if False and self.test_mode: np.savetxt(f'{self.output_folder}/recon_stack_sum_yz.txt', tomosum[x_bounds[0]:x_bounds[1]], fmt='%.6e') if self.save_plots_only: - msnc.clearFig('recon stack sum yz') + clearPlot('recon stack sum yz') # Selecting y bounds (in xz-plane) tomosum = 0 @@ -2311,41 +2334,57 @@ y_bounds[0] = 0 if y_bounds[1] == -1: y_bounds[1] = tomosum.size - if not msnc.is_index_range(y_bounds, 0, tomosum.size): - msnc.illegal_value('y_bounds', y_bounds, 'galaxy input') + if not is_index_range(y_bounds, 0, tomosum.size): + illegal_value(y_bounds, 'y_bounds', 'galaxy input') tomosum_min = tomosum.min() tomosum_max = tomosum.max() - msnc.quickPlot((range(tomosum.size), tomosum), + quickPlot((range(tomosum.size), tomosum), ([y_bounds[0], y_bounds[0]], [tomosum_min, tomosum_max], 'r-'), ([y_bounds[1]-1, y_bounds[1]-1], [tomosum_min, tomosum_max], 'r-'), title=f'recon stack sum xz', path='combine_pngs', save_fig=True, save_only=True) else: if combine_stacks and 'y_bounds' in combine_stacks: y_bounds = combine_stacks['y_bounds'] - if not msnc.is_index_range(y_bounds, 0, tomosum.size): - msnc.illegal_value('y_bounds', y_bounds, 'config file') + if not is_index_range(y_bounds, 0, tomosum.size): + illegal_value(y_bounds, 'combine_stacks:y_bounds', 'config file') elif not self.test_mode: - msnc.quickPlot(tomosum, title='recon stack sum xz') - if pyip.inputYesNo('\nCurrent image y-bounds: '+ - f'[{y_bounds[0]}, {y_bounds[1]}], use these values ([y]/n)? ', - blank=True) == 'no': - if pyip.inputYesNo( - 'Do you want to change the image y-bounds ([y]/n)? ', - blank=True) == 'no': - y_bounds = [0, tomosum.size] - else: - y_bounds = msnc.selectArrayBounds(tomosum, title='recon stack sum yz') + accept = False + while not accept: + mask, y_bounds = draw_mask_1d(tomosum, current_index_ranges=y_bounds, + title='select y data range', legend='recon stack sum xz') + while len(y_bounds) != 1: + print('Please select exactly one continuous range') + mask, y_bounds = draw_mask_1d(tomosum, title='select y data range', + legend='recon stack sum xz') + y_bounds = list(y_bounds[0]) + quickPlot(tomosum, vlines=y_bounds, title='recon stack sum xz') + print(f'y_bounds = {y_bounds} (lower bound inclusive, upper bound '+ + 'exclusive)') + accept = input_yesno('Accept these bounds (y/n)?', 'y') + if not accept: + y_bounds = None else: - msnc.quickPlot(tomosum, title='recon stack sum xz') - if pyip.inputYesNo('\nDo you want to change the image y-bounds (y/n)? ') == 'no': + if not input_yesno('\nDo you want to change the image y-bounds (y/n)?', 'n'): y_bounds = [0, tomosum.size] else: - y_bounds = msnc.selectArrayBounds(tomosum, title='recon stack sum xz') + accept = False + while not accept: + mask, y_bounds = draw_mask_1d(tomosum, title='select y data range', + legend='recon stack sum xz') + while len(y_bounds) != 1: + print('Please select exactly one continuous range') + mask, y_bounds = draw_mask_1d(tomosum, title='select y data range', + legend='recon stack sum xz') + y_bounds = list(y_bounds[0]) + quickPlot(tomosum, vlines=y_bounds, title='recon stack sum xz') + print(f'y_bounds = {y_bounds} (lower bound inclusive, upper bound '+ + 'exclusive)') + accept = input_yesno('Accept these bounds (y/n)?', 'y') if False and self.test_mode: np.savetxt(f'{self.output_folder}/recon_stack_sum_xz.txt', tomosum[y_bounds[0]:y_bounds[1]], fmt='%.6e') if self.save_plots_only: - msnc.clearFig('recon stack sum xz') + clearPlot('recon stack sum xz') # Combine reconstructed tomography stacks logging.info(f'Combining reconstructed stacks ...') @@ -2378,7 +2417,7 @@ filename += ' fullres.dat' else: filename += f' {zoom_perc}p.dat' - msnc.quickPlot(tomosum, title='recon combined sum xy', + quickPlot(tomosum, title='recon combined sum xy', path=self.output_folder, save_fig=self.save_plots, save_only=self.save_plots_only) if False: @@ -2404,24 +2443,32 @@ z_bounds[0] = 0 if z_bounds[1] == -1: z_bounds[1] = tomosum.size - if not msnc.is_index_range(z_bounds, 0, tomosum.size): - msnc.illegal_value('z_bounds', z_bounds, 'galaxy input') + if not is_index_range(z_bounds, 0, tomosum.size): + illegal_value(z_bounds, 'z_bounds', 'galaxy input') tomosum_min = tomosum.min() tomosum_max = tomosum.max() - msnc.quickPlot((range(tomosum.size), tomosum), + quickPlot((range(tomosum.size), tomosum), ([z_bounds[0], z_bounds[0]], [tomosum_min, tomosum_max], 'r-'), ([z_bounds[1]-1, z_bounds[1]-1], [tomosum_min, tomosum_max], 'r-'), title=f'recon stack sum xy', path='combine_pngs', save_fig=True, save_only=True) else: - msnc.quickPlot(tomosum, title='recon combined sum xy') - if pyip.inputYesNo( - '\nDo you want to change the image z-bounds (y/[n])? ', - blank=True) != 'yes': + if not input_yesno('\nDo you want to change the image z-bounds (y/n)?', 'n'): z_bounds = [0, tomosum.size] else: - z_bounds = msnc.selectArrayBounds(tomosum, title='recon combined sum xy') + accept = False + while not accept: + mask, z_bounds = draw_mask_1d(tomosum, title='select z data range', + legend='recon stack sum xy') + while len(z_bounds) != 1: + print('Please select exactly one continuous range') + mask, z_bounds = draw_mask_1d(tomosum, title='select z data range', + legend='recon stack sum xy') + z_bounds = list(z_bounds[0]) + quickPlot(tomosum, vlines=z_bounds, title='recon stack sum xy') + print(f'z_bounds = {z_bounds} (lower bound inclusive, upper bound exclusive)') + accept = input_yesno('Accept these bounds (y/n)?', 'y') if self.save_plots_only: - msnc.clearFig('recon combined sum xy') + clearPlot('recon combined sum xy') if z_bounds[0] != 0 or z_bounds[1] != tomosum.size: tomo_recon_combined = tomo_recon_combined[z_bounds[0]:z_bounds[1],:,:] @@ -2434,13 +2481,13 @@ path = self.output_folder save_fig = self.save_plots save_only = self.save_plots_only - msnc.quickImshow(tomo_recon_combined[int(tomo_recon_combined.shape[0]/2),:,:], + quickImshow(tomo_recon_combined[int(tomo_recon_combined.shape[0]/2),:,:], title=f'recon combined xslice{int(tomo_recon_combined.shape[0]/2)}', path=path, save_fig=save_fig, save_only=save_only) - msnc.quickImshow(tomo_recon_combined[:,int(tomo_recon_combined.shape[1]/2),:], + quickImshow(tomo_recon_combined[:,int(tomo_recon_combined.shape[1]/2),:], title=f'recon combined yslice{int(tomo_recon_combined.shape[1]/2)}', path=path, save_fig=save_fig, save_only=save_only) - msnc.quickImshow(tomo_recon_combined[:,:,int(tomo_recon_combined.shape[2]/2)], + quickImshow(tomo_recon_combined[:,:,int(tomo_recon_combined.shape[2]/2)], title=f'recon combined zslice{int(tomo_recon_combined.shape[2]/2)}', path=path, save_fig=save_fig, save_only=save_only)
--- a/tomo_combine.xml Wed Apr 27 17:28:26 2022 +0000 +++ b/tomo_combine.xml Thu Jul 28 16:05:24 2022 +0000 @@ -1,4 +1,4 @@ -<tool id="tomo_combine" name="Tomo Combine Reconstructed Stacks" version="0.1.0" python_template_version="3.9"> +<tool id="tomo_combine" name="Tomo Combine Reconstructed Stacks" version="0.2.0" python_template_version="3.9"> <description>Combine reconstructed tomography stacks</description> <macros> <import>tomo_macros.xml</import> @@ -38,7 +38,7 @@ <collection name="combine_pngs" type="list" label="Recontructed slices midway in each combined dimension"> <discover_datasets pattern="__name_and_ext__" directory="combine_pngs"/> </collection> - <data name="output_config" format="yaml" label="Output config reconstruction" from_work_dir="output_config.yaml"/> + <data name="output_config" format="yaml" label="Output config combine reconstruction" from_work_dir="output_config.yaml"/> </outputs> <help><![CDATA[ Combine reconstructed tomography images.
--- a/tomo_find_center.xml Wed Apr 27 17:28:26 2022 +0000 +++ b/tomo_find_center.xml Thu Jul 28 16:05:24 2022 +0000 @@ -1,4 +1,4 @@ -<tool id="tomo_find_center" name="Tomo Find Center Axis" version="0.1.2" python_template_version="3.9"> +<tool id="tomo_find_center" name="Tomo Find Center Axis" version="0.2.0" python_template_version="3.9"> <description>Find the center axis for a tomography reconstruction</description> <macros> <import>tomo_macros.xml</import> @@ -38,12 +38,14 @@ <option value="no" selected="true">No</option> <option value="yes">Yes</option> </param> + <when value="no"/> <when value="yes"> <conditional name="center_type"> <param name="center_type_selector" argument="--center_type_selector" type="select" label="Choose the center (C) of the set"> <option value="vo" selected="true">Use the center obtained by Nghia Vo’s method</option> <option value="user">Enter the center of the set</option> </param> + <when value="vo"/> <when value="user"> <param name="set_center" argument="--set_center" type="integer" value="0" label="Center (C) of the set in detector pixels (0: center of the detector row)"/> </when>
--- a/tomo_reconstruct.xml Wed Apr 27 17:28:26 2022 +0000 +++ b/tomo_reconstruct.xml Thu Jul 28 16:05:24 2022 +0000 @@ -1,4 +1,4 @@ -<tool id="tomo_reconstruct" name="Tomo Reconstruction" version="0.1.1" python_template_version="3.9"> +<tool id="tomo_reconstruct" name="Tomo Reconstruction" version="0.2.0" python_template_version="3.9"> <description>Perform a tomography reconstruction</description> <macros> <import>tomo_macros.xml</import>
--- a/tomo_setup.py Wed Apr 27 17:28:26 2022 +0000 +++ b/tomo_setup.py Thu Jul 28 16:05:24 2022 +0000 @@ -11,7 +11,6 @@ import tracemalloc from tomo import Tomo -import msnc_tools as msnc #from memory_profiler import profile #@profile @@ -21,20 +20,25 @@ parser = argparse.ArgumentParser( description='Setup tomography reconstruction') parser.add_argument('-i', '--inputfiles', + nargs='+', default='inputfiles.txt', - help='Input file collections') + help='Input file datasets or collections') parser.add_argument('-c', '--config', help='Input config') + parser.add_argument('-l', '--log', + type=argparse.FileType('w'), + default=sys.stdout, + help='Log file') + parser.add_argument('-t', '--inputfile_types', + nargs='+', + default='collection', + help='Input files type (collection or a list of set types: dark, bright, or data)') parser.add_argument('--theta_range', help='Theta range (lower bound, upper bound, number of angles)') parser.add_argument('--output_config', help='Output config') parser.add_argument('--output_data', help='Preprocessed tomography data') - parser.add_argument('-l', '--log', - type=argparse.FileType('w'), - default=sys.stdout, - help='Log file') parser.add_argument('tomo_ranges', metavar='N', type=int, nargs='+') args = parser.parse_args() @@ -51,43 +55,59 @@ handlers=[logging.StreamHandler()]) logging.debug(f'config = {args.config}') + logging.debug(f'inputfiles = {args.inputfiles}') + logging.debug(f'inputfile_types = {args.inputfile_types}') + logging.debug(f'log = {args.log}') + logging.debug(f'is log stdout? {args.log is sys.stdout}') logging.debug(f'theta_range = {args.theta_range.split()}') logging.debug(f'output_config = {args.output_config}') logging.debug(f'output_data = {args.output_data}') - logging.debug(f'log = {args.log}') - logging.debug(f'is log stdout? {args.log is sys.stdout}') logging.debug(f'tomoranges = {args.tomo_ranges}') - # Read input files and collect data files info + # Check input file type + if isinstance(args.inputfile_types, str) and args.inputfile_types == 'collection': + input_as_collection = True + elif isinstance(args.inputfile_types, list): + input_as_collection = False + else: + raise ValueError(f'Invalid args.inputfile_types: {args.inputfile_types} '+ + f'{type(args.inputfile_types)}') + datasets = [] - with open(args.inputfiles) as cf: - for line in cf: - if not line.strip() or line.startswith('#'): - continue - fields = [x.strip() for x in line.split('\t')] - filepath = fields[0] - element_identifier = fields[1] if len(fields) > 1 else fields[0].split('/')[-1] - datasets.append({'element_identifier' : fields[1], 'filepath' : filepath}) - logging.debug(f'datasets:\n{datasets}') - - # Read and sort data files collections = [] - for dataset in datasets: - element_identifier = [x.strip() for x in dataset['element_identifier'].split('_')] - if len(element_identifier) > 1: - name = element_identifier[0] - else: - name = 'other' - filepath = dataset['filepath'] - if not len(collections): - collections = [{'name' : name, 'filepaths' : [filepath]}] - else: - collection = [c for c in collections if c['name'] == name] - if len(collection): - collection[0]['filepaths'].append(filepath) + if input_as_collection: + # Read input file collections and collect data files info + with open(args.inputfiles) as cf: + for line in cf: + if not line.strip() or line.startswith('#'): + continue + fields = [x.strip() for x in line.split('\t')] + filepath = fields[0] + element_identifier = fields[1] if len(fields) > 1 else fields[0].split('/')[-1] + datasets.append({'element_identifier' : fields[1], 'filepath' : filepath}) + logging.debug(f'datasets:\n{datasets}') + + # Read and sort data files + for dataset in datasets: + element_identifier = [x.strip() for x in dataset['element_identifier'].split('_')] + if len(element_identifier) > 1: + name = element_identifier[0] else: - collection = {'name' : name, 'filepaths' : [filepath]} - collections.append(collection) + name = 'other' + filepath = dataset['filepath'] + if not len(collections): + collections = [{'name' : name, 'filepaths' : [filepath]}] + else: + collection = [c for c in collections if c['name'] == name] + if len(collection): + collection[0]['filepaths'].append(filepath) + else: + collection = {'name' : name, 'filepaths' : [filepath]} + collections.append(collection) + else: + # Collect input file datasets info + collections = [{'name' : filetype, 'filepaths' : [filepath]} + for filetype, filepath in zip(args.inputfile_types, args.inputfiles)] logging.debug(f'collections:\n{collections}') if len(args.tomo_ranges) != 2*len(collections): raise ValueError('Inconsistent tomo ranges size.') @@ -142,7 +162,12 @@ for stack in stack_info['stacks']: stack['img_offset'] = args.tomo_ranges[2*num_collections] stack['num'] = args.tomo_ranges[2*num_collections+1] - tomo_files = [c['filepaths'] for c in collections if c['name'] == f'set{stack["index"]}'] + if input_as_collection: + tomo_files = [c['filepaths'] for c in collections + if c['name'] == f'set{stack["index"]}'] + else: + assert(collections[num_collections]['name'] == 'data') + tomo_files = [collections[num_collections]['filepaths']] if len(tomo_files) != 1 or len(tomo_files[0]) < 1: exit(f'Unable to obtain tomography images for set {stack["index"]}') tomo_stack_files.append(tomo_files[0])
--- a/tomo_setup.xml Wed Apr 27 17:28:26 2022 +0000 +++ b/tomo_setup.xml Thu Jul 28 16:05:24 2022 +0000 @@ -1,28 +1,40 @@ -<tool id="tomo_setup" name="Tomo Setup" version="0.1.2" python_template_version="3.9"> +<tool id="tomo_setup" name="Tomo Setup" version="0.2.0" python_template_version="3.9"> <description>Preprocess tomography images</description> <macros> <import>tomo_macros.xml</import> </macros> <expand macro="requirements" /> - <command detect_errors="exit_code"><![CDATA[ - mkdir setup_pngs; - cp '$inputfiles' inputfiles.txt && - $__tool_directory__/tomo_setup.py - -i inputfiles.txt - -c '$config' - --theta_range '$thetas.theta_start $thetas.theta_end $thetas.num_thetas' - --output_data 'output_data.npz' - --output_config 'output_config.yaml' - -l '$log' -#for $s in $tomo_sets# ${s.offset} ${s.num} #end for - ]]></command> + <command detect_errors="exit_code"> + <![CDATA[ + mkdir setup_pngs; + #if str( $input.type_selector ) == "collection" + cp '$inputfiles' inputfiles.txt && + #end if + $__tool_directory__/tomo_setup.py + #if str( $input.type_selector ) == "collection" + -i inputfiles.txt + -t 'collection' + #else: + -i #for $s in $input.tomo_sets# '${s.inputs}' #end for# + -t #for $s in $input.tomo_sets# '${s.set_type.type_selector}' #end for# + #end if + -c '$config' + --theta_range '$thetas.theta_start $thetas.theta_end $thetas.num_thetas' + --output_data 'output_data.npz' + --output_config 'output_config.yaml' + -l '$log' + #for $s in $input.tomo_sets# ${s.offset} ${s.num} #end for + ]]> + </command> <configfiles> <configfile name="inputfiles"><![CDATA[#slurp -#for $s in $tomo_sets +#if str( $input.type_selector ) == "collection" +#for $s in $input.tomo_sets #for $input in $s.inputs #echo str($input) + '\t' + $input.element_identifier # #end for #end for +#end if ]]></configfile> </configfiles> <inputs> @@ -32,15 +44,43 @@ <param name="theta_end" type="float" min="0.0" max="360.0" value="0.0" label="Upper bound"/> <param name="num_thetas" type="integer" min="1" value="1" label="Number of angles"/> </section> - <repeat name='tomo_sets' title="Tomography image sets"> - <param name="inputs" type="data_collection" label="Image file collection"/> - <param name="offset" type="integer" min="0" value="0" label="Image index offset"/> - <param name="num" type="integer" min="1" value="1" label="Number of images"/> - </repeat> + + <conditional name="input"> + <param name="type_selector" type="select" label="Choose the dataset format"> + <option value="collection">datasets as collections</option> + <option value="file">datasets as files</option> + </param> + <when value="collection"> + <repeat name='tomo_sets' title="Tomography image collections"> + <param name="inputs" type="data_collection" label="Image file collection"/> + <param name="offset" type="integer" min="0" value="0" label="Image index offset"/> + <param name="num" type="integer" min="1" value="1" label="Number of images"/> + </repeat> + </when> + <when value="file"> + <repeat name='tomo_sets' title="Tomography image datasets"> + <conditional name="set_type"> + <param name="type_selector" type="select" label="Choose the dataset type"> + <option value="tdf">dark field</option> + <option value="tbf">bright field</option> + <option value="data">tomography field</option> + </param> + <when value="tdf"/> + <when value="tbf"/> + <when value="data"/> + </conditional> + <param name="inputs" type="data" format='h5' optional='false' label="Image file"/> + <param name="offset" type="integer" min="0" value="0" label="Image index offset"/> + <param name="num" type="integer" min="1" value="1" label="Number of images"/> + </repeat> + </when> + </conditional> </inputs> <outputs> <expand macro="common_outputs"/> - <data name="inputfiles" format="txt" label="Input files" from_work_dir="inputfiles.txt" hidden="true"/> + <data name="inputfiles" format="txt" label="Input files" from_work_dir="inputfiles.txt" hidden="true"> + <filter>input['type_selector'] == 'collection'</filter> + </data> <collection name="setup_pngs" type="list" label="Tomo setup images"> <discover_datasets pattern="__name_and_ext__" directory="setup_pngs"/> </collection>