diff fit.py @ 69:fba792d5f83b draft
planemo upload for repository https://github.com/rolfverberg/galaxytools commit ab9f412c362a4ab986d00e21d5185cfcf82485c1
author    rv43
date      Fri, 10 Mar 2023 16:02:04 +0000
parents   f31ef7bfb430
children  1cf15b61cd83
--- a/fit.py Fri Aug 19 20:16:56 2022 +0000 +++ b/fit.py Fri Mar 10 16:02:04 2023 +0000 @@ -7,45 +7,118 @@ @author: rv43 """ -import sys -import re import logging -import numpy as np -from asteval import Interpreter +from asteval import Interpreter, get_ast_names from copy import deepcopy -#from lmfit import Minimizer from lmfit import Model, Parameters +from lmfit.model import ModelResult from lmfit.models import ConstantModel, LinearModel, QuadraticModel, PolynomialModel,\ - StepModel, RectangleModel, GaussianModel, LorentzianModel + ExponentialModel, StepModel, RectangleModel, ExpressionModel, GaussianModel,\ + LorentzianModel +import numpy as np +from os import cpu_count, getpid, listdir, mkdir, path +from re import compile, sub +from shutil import rmtree +from sympy import diff, simplify +try: + from joblib import Parallel, delayed + have_joblib = True +except: + have_joblib = False +try: + import xarray as xr + have_xarray = True +except: + have_xarray = False -from general import is_index, index_nearest, quickPlot +from .general import illegal_value, is_int, is_dict_series, is_index, index_nearest, \ + almost_equal, quick_plot #, eval_expr +#from sys import path as syspath +#syspath.append(f'/nfs/chess/user/rv43/msnctools/msnctools') +#from general import illegal_value, is_int, is_dict_series, is_index, index_nearest, \ +# almost_equal, quick_plot #, eval_expr + +from sys import float_info +float_min = float_info.min +float_max = float_info.max # sigma = fwhm_factor*fwhm fwhm_factor = { - 'gaussian' : f'fwhm/(2*sqrt(2*log(2)))', - 'lorentzian' : f'0.5*fwhm', - 'splitlorentzian' : f'0.5*fwhm', # sigma = sigma_r - 'voight' : f'0.2776*fwhm', # sigma = gamma - 'pseudovoight' : f'0.5*fwhm'} # fraction = 0.5 + 'gaussian': f'fwhm/(2*sqrt(2*log(2)))', + 'lorentzian': f'0.5*fwhm', + 'splitlorentzian': f'0.5*fwhm', # sigma = sigma_r + 'voight': f'0.2776*fwhm', # sigma = gamma + 'pseudovoight': f'0.5*fwhm'} # fraction = 0.5 # amplitude = height_factor*height*fwhm height_factor = { - 'gaussian' : f'height*fwhm*0.5*sqrt(pi/log(2))', - 'lorentzian' : f'height*fwhm*0.5*pi', - 'splitlorentzian' : f'height*fwhm*0.5*pi', # sigma = sigma_r - 'voight' : f'3.334*height*fwhm', # sigma = gamma - 'pseudovoight' : f'1.268*height*fwhm'} # fraction = 0.5 + 'gaussian': f'height*fwhm*0.5*sqrt(pi/log(2))', + 'lorentzian': f'height*fwhm*0.5*pi', + 'splitlorentzian': f'height*fwhm*0.5*pi', # sigma = sigma_r + 'voight': f'3.334*height*fwhm', # sigma = gamma + 'pseudovoight': f'1.268*height*fwhm'} # fraction = 0.5 class Fit: """Wrapper class for lmfit """ - def __init__(self, x, y, models=None, **kwargs): - self._x = x - self._y = y + def __init__(self, y, x=None, models=None, normalize=True, **kwargs): + if not isinstance(normalize, bool): + raise ValueError(f'Invalid parameter normalize ({normalize})') + self._mask = None self._model = None + self._norm = None + self._normalized = False self._parameters = Parameters() + self._parameter_bounds = None + self._parameter_norms = {} + self._linear_parameters = [] + self._nonlinear_parameters = [] self._result = None + self._try_linear_fit = True + self._y = None + self._y_norm = None + self._y_range = None + if 'try_linear_fit' in kwargs: + try_linear_fit = kwargs.pop('try_linear_fit') + if not isinstance(try_linear_fit, bool): + illegal_value(try_linear_fit, 'try_linear_fit', 'Fit.fit', raise_error=True) + self._try_linear_fit = try_linear_fit + if y is not None: + if isinstance(y, (tuple, list, np.ndarray)): + self._x = np.asarray(x) + elif have_xarray and 
isinstance(y, xr.DataArray): + if x is not None: + logging.warning('Ignoring superfluous input x ({x}) in Fit.__init__') + if y.ndim != 1: + illegal_value(y.ndim, 'DataArray dimensions', 'Fit:__init__', raise_error=True) + self._x = np.asarray(y[y.dims[0]]) + else: + illegal_value(y, 'y', 'Fit:__init__', raise_error=True) + self._y = y + if self._x.ndim != 1: + raise ValueError(f'Invalid dimension for input x ({self._x.ndim})') + if self._x.size != self._y.size: + raise ValueError(f'Inconsistent x and y dimensions ({self._x.size} vs '+ + f'{self._y.size})') + if 'mask' in kwargs: + self._mask = kwargs.pop('mask') + if self._mask is None: + y_min = float(self._y.min()) + self._y_range = float(self._y.max())-y_min + if normalize and self._y_range > 0.0: + self._norm = (y_min, self._y_range) + else: + self._mask = np.asarray(self._mask).astype(bool) + if self._x.size != self._mask.size: + raise ValueError(f'Inconsistent x and mask dimensions ({self._x.size} vs '+ + f'{self._mask.size})') + y_masked = np.asarray(self._y)[~self._mask] + y_min = float(y_masked.min()) + self._y_range = float(y_masked.max())-y_min + if normalize and self._y_range > 0.0: + if normalize and self._y_range > 0.0: + self._norm = (y_min, self._y_range) if models is not None: if callable(models) or isinstance(models, str): kwargs = self.add_model(models, **kwargs) @@ -55,73 +128,173 @@ self.fit(**kwargs) @classmethod - def fit_data(cls, x, y, models, **kwargs): - return cls(x, y, models, **kwargs) + def fit_data(cls, y, models, x=None, normalize=True, **kwargs): + return(cls(y, x=x, models=models, normalize=normalize, **kwargs)) @property def best_errors(self): if self._result is None: - return None - errors = {} - names = sorted(self._result.params) - for name in names: - par = self._result.params[name] - errors[name] = par.stderr - return errors + return(None) + return({name:self._result.params[name].stderr for name in sorted(self._result.params) + if name != 'tmp_normalization_offset_c'}) @property def best_fit(self): if self._result is None: - return None - return self._result.best_fit + return(None) + return(self._result.best_fit) @property def best_parameters(self): if self._result is None: - return None - parameters = [] - names = sorted(self._result.params) - for name in names: - par = self._result.params[name] - parameters.append({'name' : par.name, 'value' : par.value, 'error' : par.stderr, - 'init_value' : par.init_value, 'min' : par.min, 'max' : par.max, - 'vary' : par.vary, 'expr' : par.expr}) - return parameters + return(None) + parameters = {} + for name in sorted(self._result.params): + if name != 'tmp_normalization_offset_c': + par = self._result.params[name] + parameters[name] = {'value': par.value, 'error': par.stderr, + 'init_value': par.init_value, 'min': par.min, 'max': par.max, + 'vary': par.vary, 'expr': par.expr} + return(parameters) + + @property + def best_results(self): + """Convert the input data array to a data set and add the fit results. 
+ """ + if self._result is None: + return(None) + if isinstance(self._y, xr.DataArray): + best_results = self._y.to_dataset() + dims = self._y.dims + fit_name = f'{self._y.name}_fit' + else: + coords = {'x': (['x'], self._x)} + dims = ('x') + best_results = xr.Dataset(coords=coords) + best_results['y'] = (dims, self._y) + fit_name = 'y_fit' + best_results[fit_name] = (dims, self.best_fit) + if self._mask is not None: + best_results['mask'] = self._mask + best_results.coords['par_names'] = ('peak', [name for name in self.best_values.keys()]) + best_results['best_values'] = (['par_names'], [v for v in self.best_values.values()]) + best_results['best_errors'] = (['par_names'], [v for v in self.best_errors.values()]) + best_results.attrs['components'] = self.components + return(best_results) @property def best_values(self): if self._result is None: - return None - return self._result.params.valuesdict() + return(None) + return({name:self._result.params[name].value for name in sorted(self._result.params) + if name != 'tmp_normalization_offset_c'}) @property def chisqr(self): - return self._result.chisqr + if self._result is None: + return(None) + return(self._result.chisqr) + + @property + def components(self): + components = {} + if self._result is None: + logging.warning('Unable to collect components in Fit.components') + return(components) + for component in self._result.components: + if 'tmp_normalization_offset_c' in component.param_names: + continue + parameters = {} + for name in component.param_names: + par = self._parameters[name] + parameters[name] = {'free': par.vary, 'value': self._result.params[name].value} + if par.expr is not None: + parameters[name]['expr'] = par.expr + expr = None + if isinstance(component, ExpressionModel): + name = component._name + if name[-1] == '_': + name = name[:-1] + expr = component.expr + else: + prefix = component.prefix + if len(prefix): + if prefix[-1] == '_': + prefix = prefix[:-1] + name = f'{prefix} ({component._name})' + else: + name = f'{component._name}' + if expr is None: + components[name] = {'parameters': parameters} + else: + components[name] = {'expr': expr, 'parameters': parameters} + return(components) @property def covar(self): - return self._result.covar + if self._result is None: + return(None) + return(self._result.covar) + + @property + def init_parameters(self): + if self._result is None or self._result.init_params is None: + return(None) + parameters = {} + for name in sorted(self._result.init_params): + if name != 'tmp_normalization_offset_c': + par = self._result.init_params[name] + parameters[name] = {'value': par.value, 'min': par.min, 'max': par.max, + 'vary': par.vary, 'expr': par.expr} + return(parameters) @property def init_values(self): + if self._result is None or self._result.init_params is None: + return(None) + return({name:self._result.init_params[name].value for name in + sorted(self._result.init_params) if name != 'tmp_normalization_offset_c'}) + + @property + def normalization_offset(self): if self._result is None: - return None - return self._result.init_values + return(None) + if self._norm is None: + return(0.0) + else: + if self._result.init_params is not None: + normalization_offset = self._result.init_params['tmp_normalization_offset_c'] + else: + normalization_offset = self._result.params['tmp_normalization_offset_c'] + return(normalization_offset) @property def num_func_eval(self): - return self._result.nfev + if self._result is None: + return(None) + return(self._result.nfev) + + @property + def 
parameters(self): + return({name:{'min': par.min, 'max': par.max, 'vary': par.vary, 'expr': par.expr} + for name, par in self._parameters.items() if name != 'tmp_normalization_offset_c'}) @property def redchi(self): - return self._result.redchi + if self._result is None: + return(None) + return(self._result.redchi) @property def residual(self): - return self._result.residual + if self._result is None: + return(None) + return(self._result.residual) @property def success(self): + if self._result is None: + return(None) if not self._result.success: # print(f'ier = {self._result.ier}') # print(f'lmdif_message = {self._result.lmdif_message}') @@ -133,183 +306,692 @@ logging.warning(f'ier = {self._result.ier}: {self._result.message}') else: logging.warning(f'ier = {self._result.ier}: {self._result.message}') - return True + return(True) # self.print_fit_report() # self.plot() - return self._result.success + return(self._result.success) @property def var_names(self): """Intended to be used with covar """ if self._result is None: - return None - return self._result.var_names + return(None) + return(getattr(self._result, 'var_names', None)) + + @property + def x(self): + return(self._x) - def print_fit_report(self, show_correl=False): - if self._result is not None: - print(self._result.fit_report(show_correl=show_correl)) + @property + def y(self): + return(self._y) + + def print_fit_report(self, result=None, show_correl=False): + if result is None: + result = self._result + if result is not None: + print(result.fit_report(show_correl=show_correl)) def add_parameter(self, **parameter): if not isinstance(parameter, dict): - illegal_value(parameter, 'parameter', 'add_parameter') - return + raise ValueError(f'Invalid parameter ({parameter})') + if parameter.get('expr') is not None: + raise KeyError(f'Illegal "expr" key in parameter {parameter}') + name = parameter['name'] + if not isinstance(name, str): + raise ValueError(f'Illegal "name" value ({name}) in parameter {parameter}') + if parameter.get('norm') is None: + self._parameter_norms[name] = False + else: + norm = parameter.pop('norm') + if self._norm is None: + logging.warning(f'Ignoring norm in parameter {name} in '+ + f'Fit.add_parameter (normalization is turned off)') + self._parameter_norms[name] = False + else: + if not isinstance(norm, bool): + raise ValueError(f'Illegal "norm" value ({norm}) in parameter {parameter}') + self._parameter_norms[name] = norm + vary = parameter.get('vary') + if vary is not None: + if not isinstance(vary, bool): + raise ValueError(f'Illegal "vary" value ({vary}) in parameter {parameter}') + if not vary: + if 'min' in parameter: + logging.warning(f'Ignoring min in parameter {name} in '+ + f'Fit.add_parameter (vary = {vary})') + parameter.pop('min') + if 'max' in parameter: + logging.warning(f'Ignoring max in parameter {name} in '+ + f'Fit.add_parameter (vary = {vary})') + parameter.pop('max') + if self._norm is not None and name not in self._parameter_norms: + raise ValueError(f'Missing parameter normalization type for paremeter {name}') self._parameters.add(**parameter) - def add_model(self, model, prefix=None, parameters=None, **kwargs): + def add_model(self, model, prefix=None, parameters=None, parameter_norms=None, **kwargs): # Create the new model -# print('\nAt start adding model:') -# self._parameters.pretty_print() +# print(f'at start add_model:\nself._parameters:\n{self._parameters}') +# print(f'at start add_model: kwargs = {kwargs}') +# print(f'parameters = {parameters}') +# print(f'parameter_norms = 
{parameter_norms}') +# if len(self._parameters.keys()): +# print('\nAt start adding model:') +# self._parameters.pretty_print() +# print(f'parameter_norms:\n{self._parameter_norms}') if prefix is not None and not isinstance(prefix, str): logging.warning('Ignoring illegal prefix: {model} {type(model)}') prefix = None + if prefix is None: + pprefix = '' + else: + pprefix = prefix + if parameters is not None: + if isinstance(parameters, dict): + parameters = (parameters, ) + elif not is_dict_series(parameters): + illegal_value(parameters, 'parameters', 'Fit.add_model', raise_error=True) + parameters = deepcopy(parameters) + if parameter_norms is not None: + if isinstance(parameter_norms, dict): + parameter_norms = (parameter_norms, ) + if not is_dict_series(parameter_norms): + illegal_value(parameter_norms, 'parameter_norms', 'Fit.add_model', raise_error=True) + new_parameter_norms = {} if callable(model): + # Linear fit not yet implemented for callable models + self._try_linear_fit = False + if parameter_norms is None: + if parameters is None: + raise ValueError('Either "parameters" or "parameter_norms" is required in '+ + f'{model}') + for par in parameters: + name = par['name'] + if not isinstance(name, str): + raise ValueError(f'Illegal "name" value ({name}) in input parameters') + if par.get('norm') is not None: + norm = par.pop('norm') + if not isinstance(norm, bool): + raise ValueError(f'Illegal "norm" value ({norm}) in input parameters') + new_parameter_norms[f'{pprefix}{name}'] = norm + else: + for par in parameter_norms: + name = par['name'] + if not isinstance(name, str): + raise ValueError(f'Illegal "name" value ({name}) in input parameters') + norm = par.get('norm') + if norm is None or not isinstance(norm, bool): + raise ValueError(f'Illegal "norm" value ({norm}) in input parameters') + new_parameter_norms[f'{pprefix}{name}'] = norm + if parameters is not None: + for par in parameters: + if par.get('expr') is not None: + raise KeyError(f'Illegal "expr" key ({par.get("expr")}) in parameter '+ + f'{name} for a callable model {model}') + name = par['name'] + if not isinstance(name, str): + raise ValueError(f'Illegal "name" value ({name}) in input parameters') +# RV FIX callable model will need partial deriv functions for any linear pars to get the linearized matrix, so for now skip linear solution option newmodel = Model(model, prefix=prefix) elif isinstance(model, str): - if model == 'constant': + if model == 'constant': # Par: c newmodel = ConstantModel(prefix=prefix) - elif model == 'linear': + new_parameter_norms[f'{pprefix}c'] = True + self._linear_parameters.append(f'{pprefix}c') + elif model == 'linear': # Par: slope, intercept newmodel = LinearModel(prefix=prefix) - elif model == 'quadratic': + new_parameter_norms[f'{pprefix}slope'] = True + new_parameter_norms[f'{pprefix}intercept'] = True + self._linear_parameters.append(f'{pprefix}slope') + self._linear_parameters.append(f'{pprefix}intercept') + elif model == 'quadratic': # Par: a, b, c newmodel = QuadraticModel(prefix=prefix) - elif model == 'gaussian': + new_parameter_norms[f'{pprefix}a'] = True + new_parameter_norms[f'{pprefix}b'] = True + new_parameter_norms[f'{pprefix}c'] = True + self._linear_parameters.append(f'{pprefix}a') + self._linear_parameters.append(f'{pprefix}b') + self._linear_parameters.append(f'{pprefix}c') + elif model == 'gaussian': # Par: amplitude, center, sigma (fwhm, height) newmodel = GaussianModel(prefix=prefix) - elif model == 'step': + new_parameter_norms[f'{pprefix}amplitude'] = True + 
new_parameter_norms[f'{pprefix}center'] = False + new_parameter_norms[f'{pprefix}sigma'] = False + self._linear_parameters.append(f'{pprefix}amplitude') + self._nonlinear_parameters.append(f'{pprefix}center') + self._nonlinear_parameters.append(f'{pprefix}sigma') + # parameter norms for height and fwhm are needed to get correct errors + new_parameter_norms[f'{pprefix}height'] = True + new_parameter_norms[f'{pprefix}fwhm'] = False + elif model == 'lorentzian': # Par: amplitude, center, sigma (fwhm, height) + newmodel = LorentzianModel(prefix=prefix) + new_parameter_norms[f'{pprefix}amplitude'] = True + new_parameter_norms[f'{pprefix}center'] = False + new_parameter_norms[f'{pprefix}sigma'] = False + self._linear_parameters.append(f'{pprefix}amplitude') + self._nonlinear_parameters.append(f'{pprefix}center') + self._nonlinear_parameters.append(f'{pprefix}sigma') + # parameter norms for height and fwhm are needed to get correct errors + new_parameter_norms[f'{pprefix}height'] = True + new_parameter_norms[f'{pprefix}fwhm'] = False + elif model == 'exponential': # Par: amplitude, decay + newmodel = ExponentialModel(prefix=prefix) + new_parameter_norms[f'{pprefix}amplitude'] = True + new_parameter_norms[f'{pprefix}decay'] = False + self._linear_parameters.append(f'{pprefix}amplitude') + self._nonlinear_parameters.append(f'{pprefix}decay') + elif model == 'step': # Par: amplitude, center, sigma form = kwargs.get('form') if form is not None: - del kwargs['form'] + kwargs.pop('form') if form is None or form not in ('linear', 'atan', 'arctan', 'erf', 'logistic'): - logging.error(f'Illegal form parameter for build-in step model ({form})') - return kwargs + raise ValueError(f'Invalid parameter form for build-in step model ({form})') newmodel = StepModel(prefix=prefix, form=form) - elif model == 'rectangle': + new_parameter_norms[f'{pprefix}amplitude'] = True + new_parameter_norms[f'{pprefix}center'] = False + new_parameter_norms[f'{pprefix}sigma'] = False + self._linear_parameters.append(f'{pprefix}amplitude') + self._nonlinear_parameters.append(f'{pprefix}center') + self._nonlinear_parameters.append(f'{pprefix}sigma') + elif model == 'rectangle': # Par: amplitude, center1, center2, sigma1, sigma2 form = kwargs.get('form') if form is not None: - del kwargs['form'] + kwargs.pop('form') if form is None or form not in ('linear', 'atan', 'arctan', 'erf', 'logistic'): - logging.error(f'Illegal form parameter for build-in rectangle model ({form})') - return kwargs + raise ValueError('Invalid parameter form for build-in rectangle model '+ + f'({form})') newmodel = RectangleModel(prefix=prefix, form=form) + new_parameter_norms[f'{pprefix}amplitude'] = True + new_parameter_norms[f'{pprefix}center1'] = False + new_parameter_norms[f'{pprefix}center2'] = False + new_parameter_norms[f'{pprefix}sigma1'] = False + new_parameter_norms[f'{pprefix}sigma2'] = False + self._linear_parameters.append(f'{pprefix}amplitude') + self._nonlinear_parameters.append(f'{pprefix}center1') + self._nonlinear_parameters.append(f'{pprefix}center2') + self._nonlinear_parameters.append(f'{pprefix}sigma1') + self._nonlinear_parameters.append(f'{pprefix}sigma2') + elif model == 'expression': # Par: by expression + expr = kwargs['expr'] + if not isinstance(expr, str): + raise ValueError(f'Illegal "expr" value ({expr}) in {model}') + kwargs.pop('expr') + if parameter_norms is not None: + logging.warning('Ignoring parameter_norms (normalization determined from '+ + 'linearity)}') + if parameters is not None: + for par in parameters: + if 
par.get('expr') is not None: + raise KeyError(f'Illegal "expr" key ({par.get("expr")}) in parameter '+ + f'({par}) for an expression model') + if par.get('norm') is not None: + logging.warning(f'Ignoring "norm" key in parameter ({par}) '+ + '(normalization determined from linearity)}') + par.pop('norm') + name = par['name'] + if not isinstance(name, str): + raise ValueError(f'Illegal "name" value ({name}) in input parameters') + ast = Interpreter() + expr_parameters = [name for name in get_ast_names(ast.parse(expr)) + if name != 'x' and name not in self._parameters + and name not in ast.symtable] +# print(f'\nexpr_parameters: {expr_parameters}') +# print(f'expr = {expr}') + if prefix is None: + newmodel = ExpressionModel(expr=expr) + else: + for name in expr_parameters: + expr = sub(rf'\b{name}\b', f'{prefix}{name}', expr) + expr_parameters = [f'{prefix}{name}' for name in expr_parameters] +# print(f'\nexpr_parameters: {expr_parameters}') +# print(f'expr = {expr}') + newmodel = ExpressionModel(expr=expr, name=name) +# print(f'\nnewmodel = {newmodel.__dict__}') +# print(f'params_names = {newmodel._param_names}') +# print(f'params_names = {newmodel.param_names}') + # Remove already existing names + for name in newmodel.param_names.copy(): + if name not in expr_parameters: + newmodel._func_allargs.remove(name) + newmodel._param_names.remove(name) +# print(f'params_names = {newmodel._param_names}') +# print(f'params_names = {newmodel.param_names}') else: - logging.error('Unknown build-in fit model') - return kwargs + raise ValueError(f'Unknown build-in fit model ({model})') else: - illegal_value(model, 'model', 'add_model') - return kwargs + illegal_value(model, 'model', 'Fit.add_model', raise_error=True) # Add the new model to the current one +# print('\nBefore adding model:') +# print(f'\nnewmodel = {newmodel.__dict__}') +# if len(self._parameters): +# self._parameters.pretty_print() if self._model is None: self._model = newmodel else: self._model += newmodel - if self._parameters is None: - self._parameters = newmodel.make_params() - else: - self._parameters += newmodel.make_params() + new_parameters = newmodel.make_params() + self._parameters += new_parameters # print('\nAfter adding model:') +# print(f'\nnewmodel = {newmodel.__dict__}') +# print(f'\nnew_parameters = {new_parameters}') # self._parameters.pretty_print() - # Initialize the model parameters + # Check linearity of expression model paremeters + if isinstance(newmodel, ExpressionModel): + for name in newmodel.param_names: + if not diff(newmodel.expr, name, name): + if name not in self._linear_parameters: + self._linear_parameters.append(name) + new_parameter_norms[name] = True +# print(f'\nADDING {name} TO LINEAR') + else: + if name not in self._nonlinear_parameters: + self._nonlinear_parameters.append(name) + new_parameter_norms[name] = False +# print(f'\nADDING {name} TO NONLINEAR') +# print(f'new_parameter_norms:\n{new_parameter_norms}') + + # Scale the default initial model parameters + if self._norm is not None: + for name, norm in new_parameter_norms.copy().items(): + par = self._parameters.get(name) + if par is None: + new_parameter_norms.pop(name) + continue + if par.expr is None and norm: + value = par.value*self._norm[1] + _min = par.min + _max = par.max + if not np.isinf(_min) and abs(_min) != float_min: + _min *= self._norm[1] + if not np.isinf(_max) and abs(_max) != float_min: + _max *= self._norm[1] + par.set(value=value, min=_min, max=_max) +# print('\nAfter norm defaults:') +# self._parameters.pretty_print() +# 
print(f'parameters:\n{parameters}') +# print(f'all_parameters:\n{list(self.parameters)}') +# print(f'new_parameter_norms:\n{new_parameter_norms}') +# print(f'parameter_norms:\n{self._parameter_norms}') + + # Initialize the model parameters from parameters if prefix is None: prefix = "" if parameters is not None: - if not isinstance(parameters, (tuple, list)): - illegal_value(parameters, 'parameters', 'add_model') - return kwargs for parameter in parameters: - if not isinstance(parameter, dict): - illegal_value(parameter, 'parameter in parameters', 'add_model') - return kwargs - parameter['name'] = prefix+parameter['name'] - self._parameters.add(**parameter) - for name, value in kwargs.items(): - if isinstance(value, (int, float)): - self._parameters.add(prefix+name, value=value) -# print('\nAt end add_model:') + name = parameter['name'] + if not isinstance(name, str): + raise ValueError(f'Illegal "name" value ({name}) in input parameters') + if name not in new_parameters: + name = prefix+name + parameter['name'] = name + if name not in new_parameters: + logging.warning(f'Ignoring superfluous parameter info for {name}') + continue + if name in self._parameters: + parameter.pop('name') + if 'norm' in parameter: + if not isinstance(parameter['norm'], bool): + illegal_value(parameter['norm'], 'norm', 'Fit.add_model', + raise_error=True) + new_parameter_norms[name] = parameter['norm'] + parameter.pop('norm') + if parameter.get('expr') is not None: + if 'value' in parameter: + logging.warning(f'Ignoring value in parameter {name} '+ + f'(set by expression: {parameter["expr"]})') + parameter.pop('value') + if 'vary' in parameter: + logging.warning(f'Ignoring vary in parameter {name} '+ + f'(set by expression: {parameter["expr"]})') + parameter.pop('vary') + if 'min' in parameter: + logging.warning(f'Ignoring min in parameter {name} '+ + f'(set by expression: {parameter["expr"]})') + parameter.pop('min') + if 'max' in parameter: + logging.warning(f'Ignoring max in parameter {name} '+ + f'(set by expression: {parameter["expr"]})') + parameter.pop('max') + if 'vary' in parameter: + if not isinstance(parameter['vary'], bool): + illegal_value(parameter['vary'], 'vary', 'Fit.add_model', + raise_error=True) + if not parameter['vary']: + if 'min' in parameter: + logging.warning(f'Ignoring min in parameter {name} in '+ + f'Fit.add_model (vary = {parameter["vary"]})') + parameter.pop('min') + if 'max' in parameter: + logging.warning(f'Ignoring max in parameter {name} in '+ + f'Fit.add_model (vary = {parameter["vary"]})') + parameter.pop('max') + self._parameters[name].set(**parameter) + parameter['name'] = name + else: + illegal_value(parameter, 'parameter name', 'Fit.model', raise_error=True) + self._parameter_norms = {**self._parameter_norms, **new_parameter_norms} +# print('\nAfter parameter init:') # self._parameters.pretty_print() +# print(f'parameters:\n{parameters}') +# print(f'new_parameter_norms:\n{new_parameter_norms}') +# print(f'parameter_norms:\n{self._parameter_norms}') +# print(f'kwargs:\n{kwargs}') - return kwargs + # Initialize the model parameters from kwargs + for name, value in {**kwargs}.items(): + full_name = f'{pprefix}{name}' + if full_name in new_parameter_norms and isinstance(value, (int, float)): + kwargs.pop(name) + if self._parameters[full_name].expr is None: + self._parameters[full_name].set(value=value) + else: + logging.warning(f'Ignoring parameter {name} in Fit.fit (set by expression: '+ + f'{self._parameters[full_name].expr})') +# print('\nAfter kwargs init:') +# 
self._parameters.pretty_print() +# print(f'parameter_norms:\n{self._parameter_norms}') +# print(f'kwargs:\n{kwargs}') + + # Check parameter norms (also need it for expressions to renormalize the errors) + if self._norm is not None and (callable(model) or model == 'expression'): + missing_norm = False + for name in new_parameters.valuesdict(): + if name not in self._parameter_norms: + print(f'new_parameters:\n{new_parameters.valuesdict()}') + print(f'self._parameter_norms:\n{self._parameter_norms}') + logging.error(f'Missing parameter normalization type for {name} in {model}') + missing_norm = True + if missing_norm: + raise ValueError + +# print(f'at end add_model:\nself._parameters:\n{list(self.parameters)}') +# print(f'at end add_model: kwargs = {kwargs}') +# print(f'\nat end add_model: newmodel:\n{newmodel.__dict__}\n') + return(kwargs) def fit(self, interactive=False, guess=False, **kwargs): + # Check inputs if self._model is None: logging.error('Undefined fit model') return - # Current parameter values - pars = self._parameters.valuesdict() - # Apply parameter updates through keyword arguments - for par in set(pars) & set(kwargs): - pars[par] = kwargs.pop(par) - self._parameters[par].set(value=pars[par]) - # Check for uninitialized parameters - for par, value in pars.items(): - if value is None or np.isinf(value) or np.isnan(value): - if interactive: - self._parameters[par].set(value= - input_num(f'Enter an initial value for {par}: ')) - else: - self._parameters[par].set(value=1.0) -# print('\nAt start actual fit:') -# print(f'kwargs = {kwargs}') -# self._parameters.pretty_print() -# print(f'parameters:\n{self._parameters}') -# print(f'x = {self._x}') -# print(f'len(x) = {len(self._x)}') -# print(f'y = {self._y}') -# print(f'len(y) = {len(self._y)}') + if not isinstance(interactive, bool): + illegal_value(interactive, 'interactive', 'Fit.fit', raise_error=True) + if not isinstance(guess, bool): + illegal_value(guess, 'guess', 'Fit.fit', raise_error=True) + if 'try_linear_fit' in kwargs: + try_linear_fit = kwargs.pop('try_linear_fit') + if not isinstance(try_linear_fit, bool): + illegal_value(try_linear_fit, 'try_linear_fit', 'Fit.fit', raise_error=True) + if not self._try_linear_fit: + logging.warning('Ignore superfluous keyword argument "try_linear_fit" (not '+ + 'yet supported for callable models)') + else: + self._try_linear_fit = try_linear_fit +# if self._result is None: +# if 'parameters' in kwargs: +# raise ValueError('Invalid parameter parameters ({kwargs["parameters"]})') +# else: + if self._result is not None: + if guess: + logging.warning('Ignoring input parameter guess in Fit.fit during refitting') + guess = False + + # Check for circular expressions + # FIX TODO +# for name1, par1 in self._parameters.items(): +# if par1.expr is not None: + + # Apply mask if supplied: + if 'mask' in kwargs: + self._mask = kwargs.pop('mask') + if self._mask is not None: + self._mask = np.asarray(self._mask).astype(bool) + if self._x.size != self._mask.size: + raise ValueError(f'Inconsistent x and mask dimensions ({self._x.size} vs '+ + f'{self._mask.size})') + + # Estimate initial parameters with build-in lmfit guess method (only for a single model) +# print(f'\nat start fit: kwargs = {kwargs}') +#RV print('\nAt start of fit:') +#RV self._parameters.pretty_print() +# print(f'parameter_norms:\n{self._parameter_norms}') if guess: - self._parameters = self._model.guess(self._y, x=self._x) - self._result = self._model.fit(self._y, self._parameters, x=self._x, **kwargs) -# print('\nAt end actual 
fit:') -# print(f'var_names:\n{self._result.var_names}') -# print(f'stderr:\n{np.sqrt(np.diagonal(self._result.covar))}') -# self._parameters.pretty_print() -# print(f'parameters:\n{self._parameters}') + if self._mask is None: + self._parameters = self._model.guess(self._y, x=self._x) + else: + self._parameters = self._model.guess(np.asarray(self._y)[~self._mask], + x=self._x[~self._mask]) +# print('\nAfter guess:') +# self._parameters.pretty_print() + + # Add constant offset for a normalized model + if self._result is None and self._norm is not None and self._norm[0]: + self.add_model('constant', prefix='tmp_normalization_offset_', parameters={'name': 'c', + 'value': -self._norm[0], 'vary': False, 'norm': True}) + #'value': -self._norm[0]/self._norm[1], 'vary': False, 'norm': False}) + + # Adjust existing parameters for refit: + if 'parameters' in kwargs: + parameters = kwargs.pop('parameters') + if isinstance(parameters, dict): + parameters = (parameters, ) + elif not is_dict_series(parameters): + illegal_value(parameters, 'parameters', 'Fit.fit', raise_error=True) + for par in parameters: + name = par['name'] + if name not in self._parameters: + raise ValueError(f'Unable to match {name} parameter {par} to an existing one') + if self._parameters[name].expr is not None: + raise ValueError(f'Unable to modify {name} parameter {par} (currently an '+ + 'expression)') + if par.get('expr') is not None: + raise KeyError(f'Illegal "expr" key in {name} parameter {par}') + self._parameters[name].set(vary=par.get('vary')) + self._parameters[name].set(min=par.get('min')) + self._parameters[name].set(max=par.get('max')) + self._parameters[name].set(value=par.get('value')) +#RV print('\nAfter adjust:') +#RV self._parameters.pretty_print() + + # Apply parameter updates through keyword arguments +# print(f'kwargs = {kwargs}') +# print(f'parameter_norms = {self._parameter_norms}') + for name in set(self._parameters) & set(kwargs): + value = kwargs.pop(name) + if self._parameters[name].expr is None: + self._parameters[name].set(value=value) + else: + logging.warning(f'Ignoring parameter {name} in Fit.fit (set by expression: '+ + f'{self._parameters[name].expr})') + + # Check for uninitialized parameters + for name, par in self._parameters.items(): + if par.expr is None: + value = par.value + if value is None or np.isinf(value) or np.isnan(value): + if interactive: + value = input_num(f'Enter an initial value for {name}', default=1.0) + else: + value = 1.0 + if self._norm is None or name not in self._parameter_norms: + self._parameters[name].set(value=value) + elif self._parameter_norms[name]: + self._parameters[name].set(value=value*self._norm[1]) - def plot(self): - if self._result is None: + # Check if model is linear + try: + linear_model = self._check_linearity_model() + except: + linear_model = False +# print(f'\n\n--------> linear_model = {linear_model}\n') + if kwargs.get('check_only_linearity') is not None: + return(linear_model) + + # Normalize the data and initial parameters +#RV print('\nBefore normalization:') +#RV self._parameters.pretty_print() +# print(f'parameter_norms:\n{self._parameter_norms}') + self._normalize() +# print(f'norm = {self._norm}') +#RV print('\nAfter normalization:') +#RV self._parameters.pretty_print() +# self.print_fit_report() +# print(f'parameter_norms:\n{self._parameter_norms}') + + if linear_model: + # Perform a linear fit by direct matrix solution with numpy + try: + if self._mask is None: + self._fit_linear_model(self._x, self._y_norm) + else: + 
self._fit_linear_model(self._x[~self._mask], + np.asarray(self._y_norm)[~self._mask]) + except: + linear_model = False + if not linear_model: + # Perform a non-linear fit with lmfit + # Prevent initial values from sitting at boundaries + self._parameter_bounds = {name:{'min': par.min, 'max': par.max} for name, par in + self._parameters.items() if par.vary} + for par in self._parameters.values(): + if par.vary: + par.set(value=self._reset_par_at_boundary(par, par.value)) +# print('\nAfter checking boundaries:') +# self._parameters.pretty_print() + + # Perform the fit +# fit_kws = None +# if 'Dfun' in kwargs: +# fit_kws = {'Dfun': kwargs.pop('Dfun')} +# self._result = self._model.fit(self._y_norm, self._parameters, x=self._x, +# fit_kws=fit_kws, **kwargs) + if self._mask is None: + self._result = self._model.fit(self._y_norm, self._parameters, x=self._x, **kwargs) + else: + self._result = self._model.fit(np.asarray(self._y_norm)[~self._mask], + self._parameters, x=self._x[~self._mask], **kwargs) +#RV print('\nAfter fit:') +# print(f'\nself._result ({self._result}):\n\t{self._result.__dict__}') +#RV self._parameters.pretty_print() +# self.print_fit_report() + + # Set internal parameter values to fit results upon success + if self.success: + for name, par in self._parameters.items(): + if par.expr is None and par.vary: + par.set(value=self._result.params[name].value) +# print('\nAfter update parameter values:') +# self._parameters.pretty_print() + + # Renormalize the data and results + self._renormalize() +#RV print('\nAfter renormalization:') +#RV self._parameters.pretty_print() +# self.print_fit_report() + + def plot(self, y=None, y_title=None, result=None, skip_init=False, plot_comp_legends=False, + plot_residual=False, plot_masked_data=True, **kwargs): + if result is None: + result = self._result + if result is None: return - components = self._result.eval_components() - plots = ((self._x, self._y, '.'), (self._x, self._result.best_fit, 'k-'), - (self._x, self._result.init_fit, 'g-')) - legend = ['data', 'best fit', 'init'] - if len(components) > 1: + plots = [] + legend = [] + if self._mask is None: + mask = np.zeros(self._x.size).astype(bool) + plot_masked_data = False + else: + mask = self._mask + if y is not None: + if not isinstance(y, (tuple, list, np.ndarray)): + illegal_value(y, 'y', 'Fit.plot') + if len(y) != len(self._x): + logging.warning('Ignoring parameter y in Fit.plot (wrong dimension)') + y = None + if y is not None: + if y_title is None or not isinstance(y_title, str): + y_title = 'data' + plots += [(self._x, y, '.')] + legend += [y_title] + if self._y is not None: + plots += [(self._x, np.asarray(self._y), 'b.')] + legend += ['data'] + if plot_masked_data: + plots += [(self._x[mask], np.asarray(self._y)[mask], 'bx')] + legend += ['masked data'] + if isinstance(plot_residual, bool) and plot_residual: + plots += [(self._x[~mask], result.residual, 'k-')] + legend += ['residual'] + plots += [(self._x[~mask], result.best_fit, 'k-')] + legend += ['best fit'] + if not skip_init and hasattr(result, 'init_fit'): + plots += [(self._x[~mask], result.init_fit, 'g-')] + legend += ['init'] + components = result.eval_components(x=self._x[~mask]) + num_components = len(components) + if 'tmp_normalization_offset_' in components: + num_components -= 1 + if num_components > 1: + eval_index = 0 for modelname, y in components.items(): + if modelname == 'tmp_normalization_offset_': + continue + if modelname == '_eval': + modelname = f'eval{eval_index}' + if len(modelname) > 20: + modelname = 
f'{modelname[0:16]} ...' if isinstance(y, (int, float)): - y *= np.ones(len(self._x)) - plots += ((self._x, y, '--'),) -# if modelname[-1] == '_': -# legend.append(modelname[:-1]) -# else: -# legend.append(modelname) - quickPlot(plots, legend=legend, block=True) + y *= np.ones(self._x[~mask].size) + plots += [(self._x[~mask], y, '--')] + if plot_comp_legends: + if modelname[-1] == '_': + legend.append(modelname[:-1]) + else: + legend.append(modelname) + title = kwargs.get('title') + if title is not None: + kwargs.pop('title') + quick_plot(tuple(plots), legend=legend, title=title, block=True, **kwargs) @staticmethod def guess_init_peak(x, y, *args, center_guess=None, use_max_for_center=True): """ Return a guess for the initial height, center and fwhm for a peak """ +# print(f'\n\nargs = {args}') +# print(f'center_guess = {center_guess}') +# quick_plot(x, y, vlines=center_guess, block=True) center_guesses = None + x = np.asarray(x) + y = np.asarray(y) if len(x) != len(y): - logging.error(f'Illegal x and y lengths ({len(x)}, {len(y)}), skip initial guess') - return None, None, None + logging.error(f'Invalid x and y lengths ({len(x)}, {len(y)}), skip initial guess') + return(None, None, None) if isinstance(center_guess, (int, float)): if len(args): logging.warning('Ignoring additional arguments for single center_guess value') + center_guesses = [center_guess] elif isinstance(center_guess, (tuple, list, np.ndarray)): if len(center_guess) == 1: logging.warning('Ignoring additional arguments for single center_guess value') if not isinstance(center_guess[0], (int, float)): - raise ValueError(f'Illegal center_guess type ({type(center_guess[0])})') + raise ValueError(f'Invalid parameter center_guess ({type(center_guess[0])})') center_guess = center_guess[0] else: if len(args) != 1: - raise ValueError(f'Illegal number of arguments ({len(args)})') + raise ValueError(f'Invalid number of arguments ({len(args)})') n = args[0] if not is_index(n, 0, len(center_guess)): - raise ValueError('Illegal argument') + raise ValueError('Invalid argument') center_guesses = center_guess center_guess = center_guesses[n] elif center_guess is not None: - raise ValueError(f'Illegal center_guess type ({type(center_guess)})') + raise ValueError(f'Invalid center_guess type ({type(center_guess)})') +# print(f'x = {x}') +# print(f'y = {y}') +# print(f'center_guess = {center_guess}') # Sort the inputs index = np.argsort(x) @@ -319,12 +1001,21 @@ # print(f'miny = {miny}') # print(f'x_range = {x[0]} {x[-1]} {len(x)}') # print(f'y_range = {y[0]} {y[-1]} {len(y)}') +# quick_plot(x, y, vlines=center_guess, block=True) # xx = x # yy = y # Set range for current peak +# print(f'n = {n}') # print(f'center_guesses = {center_guesses}') if center_guesses is not None: + if len(center_guesses) > 1: + index = np.argsort(center_guesses) + n = list(index).index(n) +# print(f'n = {n}') +# print(f'index = {index}') + center_guesses = np.asarray(center_guesses)[index] +# print(f'center_guesses = {center_guesses}') if n == 0: low = 0 upp = index_nearest(x, (center_guesses[0]+center_guesses[1])/2) @@ -338,7 +1029,7 @@ # print(f'upp = {upp}') x = x[low:upp] y = y[low:upp] -# quickPlot(x, y, vlines=(x[0], center_guess, x[-1]), block=True) +# quick_plot(x, y, vlines=(x[0], center_guess, x[-1]), block=True) # Estimate FHHM maxy = y.max() @@ -377,7 +1068,7 @@ fwhm_index2 = i break # print(f'fwhm_index2 = {fwhm_index2} {x[fwhm_index2]}') -# quickPlot((x,y,'o'), vlines=(x[fwhm_index1], center, x[fwhm_index2]), block=True) +# quick_plot((x,y,'o'), 
vlines=(x[fwhm_index1], center, x[fwhm_index2]), block=True) if fwhm_index1 == 0 and fwhm_index2 < len(x)-1: fwhm = 2*(x[fwhm_index2]-center) elif fwhm_index1 > 0 and fwhm_index2 == len(x)-1: @@ -389,53 +1080,442 @@ # print(f'fwhm = {fwhm}') # Return height, center and FWHM -# quickPlot((x,y,'o'), (xx,yy), vlines=(x[fwhm_index1], center, x[fwhm_index2]), block=True) - return height, center, fwhm +# quick_plot((x,y,'o'), (xx,yy), vlines=(x[fwhm_index1], center, x[fwhm_index2]), block=True) + return(height, center, fwhm) + + def _check_linearity_model(self): + """Identify the linearity of all model parameters and check if the model is linear or not + """ + if not self._try_linear_fit: + logging.info('Skip linearity check (not yet supported for callable models)') + return(False) + free_parameters = [name for name, par in self._parameters.items() if par.vary] + for component in self._model.components: + if 'tmp_normalization_offset_c' in component.param_names: + continue + if isinstance(component, ExpressionModel): + for name in free_parameters: + if diff(component.expr, name, name): +# print(f'\t\t{component.expr} is non-linear in {name}') + self._nonlinear_parameters.append(name) + if name in self._linear_parameters: + self._linear_parameters.remove(name) + else: + model_parameters = component.param_names.copy() + for basename, hint in component.param_hints.items(): + name = f'{component.prefix}{basename}' + if hint.get('expr') is not None: + model_parameters.remove(name) + for name in model_parameters: + expr = self._parameters[name].expr + if expr is not None: + for nname in free_parameters: + if name in self._nonlinear_parameters: + if diff(expr, nname): +# print(f'\t\t{component} is non-linear in {nname} (through {name} = "{expr}")') + self._nonlinear_parameters.append(nname) + if nname in self._linear_parameters: + self._linear_parameters.remove(nname) + else: + assert(name in self._linear_parameters) +# print(f'\n\nexpr ({type(expr)}) = {expr}\nnname ({type(nname)}) = {nname}\n\n') + if diff(expr, nname, nname): +# print(f'\t\t{component} is non-linear in {nname} (through {name} = "{expr}")') + self._nonlinear_parameters.append(nname) + if nname in self._linear_parameters: + self._linear_parameters.remove(nname) +# print(f'\nfree parameters:\n\t{free_parameters}') +# print(f'linear parameters:\n\t{self._linear_parameters}') +# print(f'nonlinear parameters:\n\t{self._nonlinear_parameters}\n') + if any(True for name in self._nonlinear_parameters if self._parameters[name].vary): + return(False) + return(True) + + def _fit_linear_model(self, x, y): + """Perform a linear fit by direct matrix solution with numpy + """ + # Construct the matrix and the free parameter vector +# print(f'\nparameters:') +# self._parameters.pretty_print() +# print(f'\nparameter_norms:\n\t{self._parameter_norms}') +# print(f'\nlinear_parameters:\n\t{self._linear_parameters}') +# print(f'nonlinear_parameters:\n\t{self._nonlinear_parameters}') + free_parameters = [name for name, par in self._parameters.items() if par.vary] +# print(f'free parameters:\n\t{free_parameters}\n') + expr_parameters = {name:par.expr for name, par in self._parameters.items() + if par.expr is not None} + model_parameters = [] + for component in self._model.components: + if 'tmp_normalization_offset_c' in component.param_names: + continue + model_parameters += component.param_names + for basename, hint in component.param_hints.items(): + name = f'{component.prefix}{basename}' + if hint.get('expr') is not None: + expr_parameters.pop(name) + 
model_parameters.remove(name) +# print(f'expr parameters:\n{expr_parameters}') +# print(f'model parameters:\n\t{model_parameters}\n') + norm = 1.0 + if self._normalized: + norm = self._norm[1] +# print(f'\n\nself._normalized = {self._normalized}\nnorm = {norm}\nself._norm = {self._norm}\n') + # Add expression parameters to asteval + ast = Interpreter() +# print(f'Adding to asteval sym table:') + for name, expr in expr_parameters.items(): +# print(f'\tadding {name} {expr}') + ast.symtable[name] = expr + # Add constant parameters to asteval + # (renormalize to use correctly in evaluation of expression models) + for name, par in self._parameters.items(): + if par.expr is None and not par.vary: + if self._parameter_norms[name]: +# print(f'\tadding {name} {par.value*norm}') + ast.symtable[name] = par.value*norm + else: +# print(f'\tadding {name} {par.value}') + ast.symtable[name] = par.value + A = np.zeros((len(x), len(free_parameters)), dtype='float64') + y_const = np.zeros(len(x), dtype='float64') + have_expression_model = False + for component in self._model.components: + if isinstance(component, ConstantModel): + name = component.param_names[0] +# print(f'\nConstant model: {name} {self._parameters[name]}\n') + if name in free_parameters: +# print(f'\t\t{name} is a free constant set matrix column {free_parameters.index(name)} to 1.0') + A[:,free_parameters.index(name)] = 1.0 + else: + if self._parameter_norms[name]: + delta_y_const = self._parameters[name]*np.ones(len(x)) + else: + delta_y_const = (self._parameters[name]*norm)*np.ones(len(x)) + y_const += delta_y_const +# print(f'\ndelta_y_const ({type(delta_y_const)}):\n{delta_y_const}\n') + elif isinstance(component, ExpressionModel): + have_expression_model = True + const_expr = component.expr +# print(f'\nExpression model:\nconst_expr: {const_expr}\n') + for name in free_parameters: + dexpr_dname = diff(component.expr, name) + if dexpr_dname: + const_expr = f'{const_expr}-({str(dexpr_dname)})*{name}' +# print(f'\tconst_expr: {const_expr}') + if not self._parameter_norms[name]: + dexpr_dname = f'({dexpr_dname})/{norm}' +# print(f'\t{component.expr} is linear in {name}\n\t\tadd "{str(dexpr_dname)}" to matrix as column {free_parameters.index(name)}') + fx = [(lambda _: ast.eval(str(dexpr_dname)))(ast(f'x={v}')) for v in x] +# print(f'\tfx:\n{fx}') + if len(ast.error): + raise ValueError(f'Unable to evaluate {dexpr_dname}') + A[:,free_parameters.index(name)] += fx +# if self._parameter_norms[name]: +# print(f'\t\t{component.expr} is linear in {name} add "{str(dexpr_dname)}" to matrix as column {free_parameters.index(name)}') +# A[:,free_parameters.index(name)] += fx +# else: +# print(f'\t\t{component.expr} is linear in {name} add "({str(dexpr_dname)})/{norm}" to matrix as column {free_parameters.index(name)}') +# A[:,free_parameters.index(name)] += np.asarray(fx)/norm + # FIX: find another solution if expr not supported by simplify + const_expr = str(simplify(f'({const_expr})/{norm}')) +# print(f'\nconst_expr: {const_expr}') + delta_y_const = [(lambda _: ast.eval(const_expr))(ast(f'x = {v}')) for v in x] + y_const += delta_y_const +# print(f'\ndelta_y_const ({type(delta_y_const)}):\n{delta_y_const}\n') + if len(ast.error): + raise ValueError(f'Unable to evaluate {const_expr}') + else: + free_model_parameters = [name for name in component.param_names + if name in free_parameters or name in expr_parameters] +# print(f'\nBuild-in model ({component}):\nfree_model_parameters: {free_model_parameters}\n') + if not len(free_model_parameters): + 
y_const += component.eval(params=self._parameters, x=x) + elif isinstance(component, LinearModel): + if f'{component.prefix}slope' in free_model_parameters: + A[:,free_parameters.index(f'{component.prefix}slope')] = x + else: + y_const += self._parameters[f'{component.prefix}slope'].value*x + if f'{component.prefix}intercept' in free_model_parameters: + A[:,free_parameters.index(f'{component.prefix}intercept')] = 1.0 + else: + y_const += self._parameters[f'{component.prefix}intercept'].value* \ + np.ones(len(x)) + elif isinstance(component, QuadraticModel): + if f'{component.prefix}a' in free_model_parameters: + A[:,free_parameters.index(f'{component.prefix}a')] = x**2 + else: + y_const += self._parameters[f'{component.prefix}a'].value*x**2 + if f'{component.prefix}b' in free_model_parameters: + A[:,free_parameters.index(f'{component.prefix}b')] = x + else: + y_const += self._parameters[f'{component.prefix}b'].value*x + if f'{component.prefix}c' in free_model_parameters: + A[:,free_parameters.index(f'{component.prefix}c')] = 1.0 + else: + y_const += self._parameters[f'{component.prefix}c'].value*np.ones(len(x)) + else: + # At this point each build-in model must be strictly proportional to each linear + # model parameter. Without this assumption, the model equation is needed + # For the current build-in lmfit models, this can only ever be the amplitude + assert(len(free_model_parameters) == 1) + name = f'{component.prefix}amplitude' + assert(free_model_parameters[0] == name) + assert(self._parameter_norms[name]) + expr = self._parameters[name].expr + if expr is None: +# print(f'\t{component} is linear in {name} add to matrix as column {free_parameters.index(name)}') + parameters = deepcopy(self._parameters) + parameters[name].set(value=1.0) + index = free_parameters.index(name) + A[:,free_parameters.index(name)] += component.eval(params=parameters, x=x) + else: + const_expr = expr +# print(f'\tconst_expr: {const_expr}') + parameters = deepcopy(self._parameters) + parameters[name].set(value=1.0) + dcomp_dname = component.eval(params=parameters, x=x) +# print(f'\tdcomp_dname ({type(dcomp_dname)}):\n{dcomp_dname}') + for nname in free_parameters: + dexpr_dnname = diff(expr, nname) + if dexpr_dnname: + assert(self._parameter_norms[name]) +# print(f'\t\td({expr})/d{nname} = {dexpr_dnname}') +# print(f'\t\t{component} is linear in {nname} (through {name} = "{expr}", add to matrix as column {free_parameters.index(nname)})') + fx = np.asarray(dexpr_dnname*dcomp_dname, dtype='float64') +# print(f'\t\tfx ({type(fx)}): {fx}') +# print(f'free_parameters.index({nname}): {free_parameters.index(nname)}') + if self._parameter_norms[nname]: + A[:,free_parameters.index(nname)] += fx + else: + A[:,free_parameters.index(nname)] += fx/norm + const_expr = f'{const_expr}-({dexpr_dnname})*{nname}' +# print(f'\t\tconst_expr: {const_expr}') + const_expr = str(simplify(f'({const_expr})/{norm}')) +# print(f'\tconst_expr: {const_expr}') + fx = [(lambda _: ast.eval(const_expr))(ast(f'x = {v}')) for v in x] +# print(f'\tfx: {fx}') + delta_y_const = np.multiply(fx, dcomp_dname) + y_const += delta_y_const +# print(f'\ndelta_y_const ({type(delta_y_const)}):\n{delta_y_const}\n') +# print(A) +# print(y_const) + solution, residual, rank, s = np.linalg.lstsq(A, y-y_const, rcond=None) +# print(f'\nsolution ({type(solution)} {solution.shape}):\n\t{solution}') +# print(f'\nresidual ({type(residual)} {residual.shape}):\n\t{residual}') +# print(f'\nrank ({type(rank)} {rank.shape}):\n\t{rank}') +# print(f'\ns ({type(s)} 
{s.shape}):\n\t{s}\n') + + # Assemble result (compensate for normalization in expression models) + for name, value in zip(free_parameters, solution): + self._parameters[name].set(value=value) + if self._normalized and (have_expression_model or len(expr_parameters)): + for name, norm in self._parameter_norms.items(): + par = self._parameters[name] + if par.expr is None and norm: + self._parameters[name].set(value=par.value*self._norm[1]) +# self._parameters.pretty_print() +# print(f'\nself._parameter_norms:\n\t{self._parameter_norms}') + self._result = ModelResult(self._model, deepcopy(self._parameters)) + self._result.best_fit = self._model.eval(params=self._parameters, x=x) + if self._normalized and (have_expression_model or len(expr_parameters)): + if 'tmp_normalization_offset_c' in self._parameters: + offset = self._parameters['tmp_normalization_offset_c'] + else: + offset = 0.0 + self._result.best_fit = (self._result.best_fit-offset-self._norm[0])/self._norm[1] + if self._normalized: + for name, norm in self._parameter_norms.items(): + par = self._parameters[name] + if par.expr is None and norm: + value = par.value/self._norm[1] + self._parameters[name].set(value=value) + self._result.params[name].set(value=value) +# self._parameters.pretty_print() + self._result.residual = self._result.best_fit-y + self._result.components = self._model.components + self._result.init_params = None +# quick_plot((x, y, '.'), (x, y_const, 'g'), (x, self._result.best_fit, 'k'), (x, self._result.residual, 'r'), block=True) + + def _normalize(self): + """Normalize the data and initial parameters + """ + if self._normalized: + return + if self._norm is None: + if self._y is not None and self._y_norm is None: + self._y_norm = np.asarray(self._y) + else: + if self._y is not None and self._y_norm is None: + self._y_norm = (np.asarray(self._y)-self._norm[0])/self._norm[1] + self._y_range = 1.0 + for name, norm in self._parameter_norms.items(): + par = self._parameters[name] + if par.expr is None and norm: + value = par.value/self._norm[1] + _min = par.min + _max = par.max + if not np.isinf(_min) and abs(_min) != float_min: + _min /= self._norm[1] + if not np.isinf(_max) and abs(_max) != float_min: + _max /= self._norm[1] + par.set(value=value, min=_min, max=_max) + self._normalized = True + + def _renormalize(self): + """Renormalize the data and results + """ + if self._norm is None or not self._normalized: + return + self._normalized = False + for name, norm in self._parameter_norms.items(): + par = self._parameters[name] + if par.expr is None and norm: + value = par.value*self._norm[1] + _min = par.min + _max = par.max + if not np.isinf(_min) and abs(_min) != float_min: + _min *= self._norm[1] + if not np.isinf(_max) and abs(_max) != float_min: + _max *= self._norm[1] + par.set(value=value, min=_min, max=_max) + if self._result is None: + return + self._result.best_fit = self._result.best_fit*self._norm[1]+self._norm[0] + for name, par in self._result.params.items(): + if self._parameter_norms.get(name, False): + if par.stderr is not None: + par.stderr *= self._norm[1] + if par.expr is None: + _min = par.min + _max = par.max + value = par.value*self._norm[1] + if par.init_value is not None: + par.init_value *= self._norm[1] + if not np.isinf(_min) and abs(_min) != float_min: + _min *= self._norm[1] + if not np.isinf(_max) and abs(_max) != float_min: + _max *= self._norm[1] + par.set(value=value, min=_min, max=_max) + if hasattr(self._result, 'init_fit'): + self._result.init_fit = 
self._result.init_fit*self._norm[1]+self._norm[0] + if hasattr(self._result, 'init_values'): + init_values = {} + for name, value in self._result.init_values.items(): + if name not in self._parameter_norms or self._parameters[name].expr is not None: + init_values[name] = value + elif self._parameter_norms[name]: + init_values[name] = value*self._norm[1] + self._result.init_values = init_values + for name, par in self._result.init_params.items(): + if par.expr is None and self._parameter_norms.get(name, False): + value = par.value + _min = par.min + _max = par.max + value *= self._norm[1] + if not np.isinf(_min) and abs(_min) != float_min: + _min *= self._norm[1] + if not np.isinf(_max) and abs(_max) != float_min: + _max *= self._norm[1] + par.set(value=value, min=_min, max=_max) + par.init_value = par.value + # Don't renormalize chisqr, it has no useful meaning in physical units + #self._result.chisqr *= self._norm[1]*self._norm[1] + if self._result.covar is not None: + for i, name in enumerate(self._result.var_names): + if self._parameter_norms.get(name, False): + for j in range(len(self._result.var_names)): + if self._result.covar[i,j] is not None: + self._result.covar[i,j] *= self._norm[1] + if self._result.covar[j,i] is not None: + self._result.covar[j,i] *= self._norm[1] + # Don't renormalize redchi, it has no useful meaning in physical units + #self._result.redchi *= self._norm[1]*self._norm[1] + if self._result.residual is not None: + self._result.residual *= self._norm[1] + + def _reset_par_at_boundary(self, par, value): + assert(par.vary) + name = par.name + _min = self._parameter_bounds[name]['min'] + _max = self._parameter_bounds[name]['max'] + if np.isinf(_min): + if not np.isinf(_max): + if self._parameter_norms.get(name, False): + upp = _max-0.1*self._y_range + elif _max == 0.0: + upp = _max-0.1 + else: + upp = _max-0.1*abs(_max) + if value >= upp: + return(upp) + else: + if np.isinf(_max): + if self._parameter_norms.get(name, False): + low = _min+0.1*self._y_range + elif _min == 0.0: + low = _min+0.1 + else: + low = _min+0.1*abs(_min) + if value <= low: + return(low) + else: + low = 0.9*_min+0.1*_max + upp = 0.1*_min+0.9*_max + if value <= low: + return(low) + elif value >= upp: + return(upp) + return(value) class FitMultipeak(Fit): """Fit data with multiple peaks """ - def __init__(self, x, y, normalize=True): - super().__init__(x, deepcopy(y)) - self._norm = None + def __init__(self, y, x=None, normalize=True): + super().__init__(y, x=x, normalize=normalize) self._fwhm_max = None self._sigma_max = None - if normalize: - self._normalize() - #quickPlot((self._x,self._y), block=True) @classmethod - def fit_multipeak(cls, x, y, centers, peak_models='gaussian', center_exprs=None, fit_type=None, - background_order=None, fwhm_max=None, plot_components=None): + def fit_multipeak(cls, y, centers, x=None, normalize=True, peak_models='gaussian', + center_exprs=None, fit_type=None, background_order=None, background_exp=False, + fwhm_max=None, plot_components=False): """Make sure that centers and fwhm_max are in the correct units and consistent with expr for a uniform fit (fit_type == 'uniform') """ - fit = cls(x, y) + fit = cls(y, x=x, normalize=normalize) success = fit.fit(centers, fit_type=fit_type, peak_models=peak_models, fwhm_max=fwhm_max, center_exprs=center_exprs, background_order=background_order, - plot_components=plot_components) + background_exp=background_exp, plot_components=plot_components) if success: - return fit.best_fit, fit.residual, fit.best_values, 

 class FitMultipeak(Fit):
     """Fit data with multiple peaks
     """
-    def __init__(self, x, y, normalize=True):
-        super().__init__(x, deepcopy(y))
-        self._norm = None
+    def __init__(self, y, x=None, normalize=True):
+        super().__init__(y, x=x, normalize=normalize)
         self._fwhm_max = None
         self._sigma_max = None
-        if normalize:
-            self._normalize()
-        #quickPlot((self._x,self._y), block=True)
 
     @classmethod
-    def fit_multipeak(cls, x, y, centers, peak_models='gaussian', center_exprs=None,
-            fit_type=None, background_order=None, fwhm_max=None, plot_components=None):
+    def fit_multipeak(cls, y, centers, x=None, normalize=True, peak_models='gaussian',
+            center_exprs=None, fit_type=None, background_order=None, background_exp=False,
+            fwhm_max=None, plot_components=False):
         """Fit the data with multiple peaks and an optional background.
 
           Note: centers and fwhm_max must be in the correct units and consistent with
           center_exprs for a uniform fit (fit_type == 'uniform')
         """
-        fit = cls(x, y)
+        fit = cls(y, x=x, normalize=normalize)
         success = fit.fit(centers, fit_type=fit_type, peak_models=peak_models, fwhm_max=fwhm_max,
                 center_exprs=center_exprs, background_order=background_order,
-                plot_components=plot_components)
+                background_exp=background_exp, plot_components=plot_components)
         if success:
-            return fit.best_fit, fit.residual, fit.best_values, fit.best_errors, fit.redchi, \
-                    fit.success
+            return(fit.best_fit, fit.residual, fit.best_values, fit.best_errors, fit.redchi,
+                    fit.success)
         else:
-            return np.array([]), np.array([]), {}, {}, sys.float_info.max, False
+            return(np.array([]), np.array([]), {}, {}, float_max, False)
 
     def fit(self, centers, fit_type=None, peak_models=None, center_exprs=None, fwhm_max=None,
-            background_order=None, plot_components=None, param_constraint=False):
+            background_order=None, background_exp=False, plot_components=False,
+            param_constraint=False):
         self._fwhm_max = fwhm_max
         # Create the multipeak model
         self._create_model(centers, fit_type, peak_models, center_exprs, background_order,
-                param_constraint)
+                background_exp, param_constraint)
+
+        # RV: obsolete normalization of the data and results
+#        print('\nBefore fit before normalization in FitMultipeak:')
+#        self._parameters.pretty_print()
+#        self._normalize()
+#        print('\nBefore fit after normalization in FitMultipeak:')
+#        self._parameters.pretty_print()
 
         # Perform the fit
         try:
             if param_constraint:
-                super().fit(fit_kws={'xtol' : 1.e-5, 'ftol' : 1.e-5, 'gtol' : 1.e-5})
+                super().fit(fit_kws={'xtol': 1.e-5, 'ftol': 1.e-5, 'gtol': 1.e-5})
             else:
                 super().fit()
         except Exception:
-            return False
+            return(False)
 
         # Check for valid fit parameter results
         fit_failure = self._check_validity()
@@ -447,21 +1527,27 @@
             else:
                 logging.info('    -> Retry fitting with constraints')
                 self.fit(centers, fit_type, peak_models, center_exprs, fwhm_max=fwhm_max,
-                        background_order=background_order, plot_components=plot_components,
-                        param_constraint=True)
+                        background_order=background_order, background_exp=background_exp,
+                        plot_components=plot_components, param_constraint=True)
         else:
-            # Renormalize the data and results
-            self._renormalize()
+            # RV: obsolete renormalization of the data and results
+#            print('\nAfter fit before renormalization in FitMultipeak:')
+#            self._parameters.pretty_print()
+#            self.print_fit_report()
+#            self._renormalize()
+#            print('\nAfter fit after renormalization in FitMultipeak:')
+#            self._parameters.pretty_print()
+#            self.print_fit_report()
 
             # Print report and plot components if requested
-            if plot_components is not None:
+            if plot_components:
                 self.print_fit_report()
                 self.plot()
 
-        return success
+        return(success)
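
A minimal usage sketch of the fit_multipeak class method above, on synthetic data (all values are hypothetical and convergence details depend on the data):

import numpy as np

x = np.linspace(0.0, 10.0, 501)
y = (100.0*np.exp(-0.5*((x-3.0)/0.2)**2)+60.0*np.exp(-0.5*((x-7.0)/0.3)**2)
        +0.5*x+2.0+np.random.normal(0.0, 1.0, x.size))
best_fit, residual, best_values, best_errors, redchi, success = \
        FitMultipeak.fit_multipeak(y, [3.0, 7.0], x=x, peak_models='gaussian',
                background_order=1, fwhm_max=2.0)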

     def _create_model(self, centers, fit_type=None, peak_models=None, center_exprs=None,
-            background_order=None, param_constraint=False):
+            background_order=None, background_exp=False, param_constraint=False):
         """Create the multipeak model
         """
         if isinstance(centers, (int, float)):
@@ -476,10 +1562,10 @@
                     f'{num_peaks})')
         if num_peaks == 1:
             if fit_type is not None:
-                logging.warning('Ignoring fit_type input for fitting one peak')
+                logging.debug('Ignoring fit_type input for fitting one peak')
                 fit_type = None
             if center_exprs is not None:
-                logging.warning('Ignoring center_exprs input for fitting one peak')
+                logging.debug('Ignoring center_exprs input for fitting one peak')
                 center_exprs = None
         else:
             if fit_type == 'uniform':
@@ -493,10 +1579,10 @@
                     logging.warning('Ignoring center_exprs input for unconstrained fit')
                     center_exprs = None
                 else:
-                    raise ValueError(f'Illegal fit_type in fit_multigaussian {fit_type}')
+                    raise ValueError(f'Invalid fit_type ({fit_type})')
         self._sigma_max = None
         if param_constraint:
-            min_value = sys.float_info.min
+            min_value = float_min
             if self._fwhm_max is not None:
                 self._sigma_max = np.zeros(num_peaks)
         else:
@@ -510,13 +1596,18 @@
         # Add background model
         if background_order is not None:
             if background_order == 0:
-                self.add_model('constant', prefix='background', c=0.0)
+                self.add_model('constant', prefix='background', parameters=
+                        {'name': 'c', 'value': float_min, 'min': min_value})
             elif background_order == 1:
                 self.add_model('linear', prefix='background', slope=0.0, intercept=0.0)
             elif background_order == 2:
                 self.add_model('quadratic', prefix='background', a=0.0, b=0.0, c=0.0)
             else:
-                raise ValueError(f'background_order = {background_order}')
+                raise ValueError(f'Invalid parameter background_order ({background_order})')
+        if background_exp:
+            self.add_model('exponential', prefix='background', parameters=(
+                    {'name': 'amplitude', 'value': float_min, 'min': min_value},
+                    {'name': 'decay', 'value': float_min, 'min': min_value}))
 
         # Add peaks and guess initial fit parameters
         ast = Interpreter()
@@ -534,9 +1625,9 @@
                 sig_max = ast(fwhm_factor[peak_models[0]])
                 self._sigma_max[0] = sig_max
             self.add_model(peak_models[0], parameters=(
-                    {'name' : 'amplitude', 'value' : amp_init, 'min' : min_value},
-                    {'name' : 'center', 'value' : cen_init, 'min' : min_value},
-                    {'name' : 'sigma', 'value' : sig_init, 'min' : min_value, 'max' : sig_max}))
+                    {'name': 'amplitude', 'value': amp_init, 'min': min_value},
+                    {'name': 'center', 'value': cen_init, 'min': min_value},
+                    {'name': 'sigma', 'value': sig_init, 'min': min_value, 'max': sig_max}))
         else:
             if fit_type == 'uniform':
                 self.add_parameter(name='scale_factor', value=1.0)
@@ -556,107 +1647,921 @@
                     self._sigma_max[i] = sig_max
                 if fit_type == 'uniform':
                     self.add_model(peak_models[i], prefix=f'peak{i+1}_', parameters=(
-                            {'name' : 'amplitude', 'value' : amp_init, 'min' : min_value},
-                            {'name' : 'center', 'expr' : center_exprs[i], 'min' : min_value},
-                            {'name' : 'sigma', 'value' : sig_init, 'min' : min_value,
-                             'max' : sig_max}))
+                            {'name': 'amplitude', 'value': amp_init, 'min': min_value},
+                            {'name': 'center', 'expr': center_exprs[i]},
+                            {'name': 'sigma', 'value': sig_init, 'min': min_value,
+                             'max': sig_max}))
                 else:
                     self.add_model('gaussian', prefix=f'peak{i+1}_', parameters=(
-                            {'name' : 'amplitude', 'value' : amp_init, 'min' : min_value},
-                            {'name' : 'center', 'value' : cen_init, 'min' : min_value},
-                            {'name' : 'sigma', 'value' : sig_init, 'min' : min_value,
-                             'max' : sig_max}))
+                            {'name': 'amplitude', 'value': amp_init, 'min': min_value},
+                            {'name': 'center', 'value': cen_init, 'min': min_value},
+                            {'name': 'sigma', 'value': sig_init, 'min': min_value,
+                             'max': sig_max}))
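
The sigma bound used above is derived from the user-facing fwhm_max by evaluating the matching fwhm_factor expression with asteval; for a Gaussian the standard relation is sigma = fwhm/(2*sqrt(2*ln(2))). A sketch of that conversion (the input value is hypothetical):

from asteval import Interpreter

fwhm_max = 2.0                          # hypothetical user input
ast = Interpreter()
ast.symtable['fwhm'] = fwhm_max         # expose fwhm to the expression evaluator
sig_max = ast(fwhm_factor['gaussian'])  # about 0.849 for fwhm_max = 2.0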

     def _check_validity(self):
         """Check for valid fit parameter results
         """
         fit_failure = False
-        index = re.compile(r'\d+')
-        for parameter in self.best_parameters:
-            name = parameter['name']
-            if ((('amplitude' in name or 'height' in name) and parameter['value'] <= 0.0) or
-                    (('sigma' in name or 'fwhm' in name) and parameter['value'] <= 0.0) or
-                    ('center' in name and parameter['value'] <= 0.0) or
-                    (name == 'scale_factor' and parameter['value'] <= 0.0)):
-                logging.info(f'Invalid fit result for {name} ({parameter["value"]})')
+        index = compile(r'\d+')
+        for name, par in self.best_parameters.items():
+            if 'background' in name:
+#                if ((name == 'backgroundc' and par['value'] <= 0.0) or
+#                        (name.endswith('amplitude') and par['value'] <= 0.0) or
+                if ((name.endswith('amplitude') and par['value'] <= 0.0) or
+                        (name.endswith('decay') and par['value'] <= 0.0)):
+                    logging.info(f'Invalid fit result for {name} ({par["value"]})')
+                    fit_failure = True
+            elif (((name.endswith('amplitude') or name.endswith('height')) and
+                        par['value'] <= 0.0) or
+                    ((name.endswith('sigma') or name.endswith('fwhm')) and
+                        par['value'] <= 0.0) or
+                    (name.endswith('center') and par['value'] <= 0.0) or
+                    (name == 'scale_factor' and par['value'] <= 0.0)):
+                logging.info(f'Invalid fit result for {name} ({par["value"]})')
                 fit_failure = True
-            if 'sigma' in name and self._sigma_max is not None:
+            if name.endswith('sigma') and self._sigma_max is not None:
                 if name == 'sigma':
                     sigma_max = self._sigma_max[0]
                 else:
                     sigma_max = self._sigma_max[int(index.search(name).group())-1]
-                i = int(index.search(name).group())-1
-                if parameter['value'] > sigma_max:
-                    logging.info(f'Invalid fit result for {name} ({parameter["value"]})')
+                if par['value'] > sigma_max:
+                    logging.info(f'Invalid fit result for {name} ({par["value"]})')
+                    fit_failure = True
+                elif par['value'] == sigma_max:
+                    logging.warning(f'Edge result for {name} ({par["value"]})')
+            if name.endswith('fwhm') and self._fwhm_max is not None:
+                if par['value'] > self._fwhm_max:
+                    logging.info(f'Invalid fit result for {name} ({par["value"]})')
                     fit_failure = True
-                elif parameter['value'] == sigma_max:
-                    logging.warning(f'Edge result on for {name} ({parameter["value"]})')
-                if 'fwhm' in name and self._fwhm_max is not None:
-                    if parameter['value'] > self._fwhm_max:
-                        logging.info(f'Invalid fit result for {name} ({parameter["value"]})')
-                        fit_failure = True
-                    elif parameter['value'] == self._fwhm_max:
-                        logging.warning(f'Edge result on for {name} ({parameter["value"]})')
-        return fit_failure
+                elif par['value'] == self._fwhm_max:
+                    logging.warning(f'Edge result for {name} ({par["value"]})')
+        return(fit_failure)
 
 
+class FitMap(Fit):
+    """Fit a map of data
+    """
+    def __init__(self, ymap, x=None, models=None, normalize=True, transpose=None, **kwargs):
+        super().__init__(None)
+        self._best_errors = None
+        self._best_fit = None
+        self._best_parameters = None
+        self._best_values = None
+        self._inv_transpose = None
+        self._max_nfev = None
+        self._memfolder = None
+        self._new_parameters = None
+        self._out_of_bounds = None
+        self._plot = False
+        self._print_report = False
+        self._redchi = None
+        self._redchi_cutoff = 0.1
+        self._skip_init = True
+        self._success = None
+        self._transpose = None
+        self._try_no_bounds = True
+
+        # At this point the fastest index should always be the signal dimension so that the
+        # slowest ndim-1 dimensions are the map dimensions
+        if isinstance(ymap, (tuple, list, np.ndarray)):
+            self._x = np.asarray(x)
+        elif have_xarray and isinstance(ymap, xr.DataArray):
+            if x is not None:
+                logging.warning(f'Ignoring superfluous input x ({x}) in FitMap.__init__')
+            self._x = np.asarray(ymap[ymap.dims[-1]])
+        else:
+            illegal_value(ymap, 'ymap', 'FitMap.__init__', raise_error=True)
+        self._ymap = ymap
+
+        # Verify the input parameters
+        if self._x.ndim != 1:
+            raise ValueError(f'Invalid dimension for input x ({self._x.ndim})')
+        if self._ymap.ndim < 2:
+            raise ValueError('Invalid number of dimensions for the input dataset '+
+                    f'({self._ymap.ndim})')
+        if self._x.size != self._ymap.shape[-1]:
+            raise ValueError(f'Inconsistent x and y dimensions ({self._x.size} vs '+
+                    f'{self._ymap.shape[-1]})')
+        if not isinstance(normalize, bool):
+            logging.warning(f'Invalid value for normalize ({normalize}) in FitMap.__init__: '+
+                    'setting normalize to True')
+            normalize = True
+        if isinstance(transpose, bool) and not transpose:
+            transpose = None
+        if transpose is not None and self._ymap.ndim < 3:
+            logging.warning(f'Transpose is meaningless for {self._ymap.ndim-1}D data maps: '+
+                    'ignoring transpose')
+            transpose = None
+        if transpose is not None:
+            if self._ymap.ndim == 3 and isinstance(transpose, bool) and transpose:
+                self._transpose = (1, 0)
+            elif not isinstance(transpose, (tuple, list)):
+                logging.warning(f'Invalid data type for transpose ({transpose}, '+
+                        f'{type(transpose)}) in FitMap.__init__: ignoring transpose')
+            elif len(transpose) != self._ymap.ndim-1:
+                logging.warning(f'Invalid dimension for transpose ({transpose}, must be equal '+
+                        f'to {self._ymap.ndim-1}) in FitMap.__init__: ignoring transpose')
+            elif any(i not in transpose for i in range(len(transpose))):
+                logging.warning(f'Invalid index in transpose ({transpose}) in '+
+                        'FitMap.__init__: ignoring transpose')
+            elif not all(i == transpose[i] for i in range(self._ymap.ndim-1)):
+                self._transpose = transpose
+            if self._transpose is not None:
+                self._inv_transpose = tuple(self._transpose.index(i)
+                        for i in range(len(self._transpose)))
+
+        # Flatten the map (transpose if requested)
+        # Store the flattened map in self._ymap_norm, whether normalized or not
+        if self._transpose is not None:
+            self._ymap_norm = np.transpose(np.asarray(self._ymap), list(self._transpose)+
+                    [len(self._transpose)])
+        else:
+            self._ymap_norm = np.asarray(self._ymap)
+        self._map_dim = int(self._ymap_norm.size/self._x.size)
+        self._map_shape = self._ymap_norm.shape[:-1]
+        self._ymap_norm = np.reshape(self._ymap_norm, (self._map_dim, self._x.size))
+
+        # Check if a mask is provided
+        if 'mask' in kwargs:
+            self._mask = kwargs.pop('mask')
+        if self._mask is None:
+            ymap_min = float(self._ymap_norm.min())
+            ymap_max = float(self._ymap_norm.max())
+        else:
+            self._mask = np.asarray(self._mask).astype(bool)
+            if self._x.size != self._mask.size:
+                raise ValueError(f'Inconsistent mask dimension ({self._x.size} vs '+
+                        f'{self._mask.size})')
+            ymap_masked = np.asarray(self._ymap_norm)[:,~self._mask]
+            ymap_min = float(ymap_masked.min())
+            ymap_max = float(ymap_masked.max())
+
+        # Normalize the data
+        self._y_range = ymap_max-ymap_min
+        if normalize and self._y_range > 0.0:
+            self._norm = (ymap_min, self._y_range)
+            self._ymap_norm = (self._ymap_norm-self._norm[0])/self._norm[1]
+        else:
+            self._redchi_cutoff *= self._y_range**2
+        if models is not None:
+            if callable(models) or isinstance(models, str):
+                kwargs = self.add_model(models, **kwargs)
+            elif isinstance(models, (tuple, list)):
+                for model in models:
+                    kwargs = self.add_model(model, **kwargs)
+            self.fit(**kwargs)
+
+    @classmethod
+    def fit_map(cls, ymap, models, x=None, normalize=True, **kwargs):
+        return(cls(ymap, x=x, models=models, normalize=normalize, **kwargs))
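
A minimal construction sketch for the class above (map shape, model, and values are hypothetical; a real call would normally also pass initial parameter values through the add_model keyword arguments):

import numpy as np

x = np.linspace(0.0, 10.0, 201)
ymap = np.empty((5, 4, x.size))                  # 2D map of 1D signals, signal axis last
for i in range(5):
    for j in range(4):
        ymap[i,j] = 10.0*np.exp(-0.5*((x-5.0-0.1*i)/0.5)**2)
fit = FitMap(ymap, x=x, models='gaussian')       # construct and fit in one step
print(fit.best_values.shape)                     # one map of values per best parameter
print(fit.redchi.shape, fit.success.shape)       # per-point goodness of fit and status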
""" - y_min = self._y.min() - self._norm = (y_min, self._y.max()-y_min) - if self._norm[1] == 0.0: - self._norm = None + if self.best_values is None or self.best_errors is None or self.best_fit is None: + return(None) + if not have_xarray: + logging.warning('Unable to load xarray module') + return(None) + best_values = self.best_values + best_errors = self.best_errors + if isinstance(self._ymap, xr.DataArray): + best_results = self._ymap.to_dataset() + dims = self._ymap.dims + fit_name = f'{self._ymap.name}_fit' else: - self._y = (self._y-self._norm[0])/self._norm[1] + coords = {f'dim{n}_index':([f'dim{n}_index'], range(self._ymap.shape[n])) + for n in range(self._ymap.ndim-1)} + coords['x'] = (['x'], self._x) + dims = list(coords.keys()) + best_results = xr.Dataset(coords=coords) + best_results['y'] = (dims, self._ymap) + fit_name = 'y_fit' + best_results[fit_name] = (dims, self.best_fit) + if self._mask is not None: + best_results['mask'] = self._mask + for n in range(best_values.shape[0]): + best_results[f'{self._best_parameters[n]}_values'] = (dims[:-1], best_values[n]) + best_results[f'{self._best_parameters[n]}_errors'] = (dims[:-1], best_errors[n]) + best_results.attrs['components'] = self.components + return(best_results) + + @property + def best_values(self): + return(self._best_values) + + @property + def chisqr(self): + logging.warning('property chisqr not defined for fit.FitMap') + return(None) + + @property + def components(self): + components = {} + if self._result is None: + logging.warning('Unable to collect components in FitMap.components') + return(components) + for component in self._result.components: + if 'tmp_normalization_offset_c' in component.param_names: + continue + parameters = {} + for name in component.param_names: + if self._parameters[name].vary: + parameters[name] = {'free': True} + elif self._parameters[name].expr is not None: + parameters[name] = {'free': False, 'expr': self._parameters[name].expr} + else: + parameters[name] = {'free': False, 'value': self.init_parameters[name]['value']} + expr = None + if isinstance(component, ExpressionModel): + name = component._name + if name[-1] == '_': + name = name[:-1] + expr = component.expr + else: + prefix = component.prefix + if len(prefix): + if prefix[-1] == '_': + prefix = prefix[:-1] + name = f'{prefix} ({component._name})' + else: + name = f'{component._name}' + if expr is None: + components[name] = {'parameters': parameters} + else: + components[name] = {'expr': expr, 'parameters': parameters} + return(components) + + @property + def covar(self): + logging.warning('property covar not defined for fit.FitMap') + return(None) + + @property + def max_nfev(self): + return(self._max_nfev) + + @property + def num_func_eval(self): + logging.warning('property num_func_eval not defined for fit.FitMap') + return(None) - def _renormalize(self): - """Renormalize the data and results - """ - if self._norm is None: + @property + def out_of_bounds(self): + return(self._out_of_bounds) + + @property + def redchi(self): + return(self._redchi) + + @property + def residual(self): + if self.best_fit is None: + return(None) + if self._mask is None: + return(np.asarray(self._ymap)-self.best_fit) + else: + ymap_flat = np.reshape(np.asarray(self._ymap), (self._map_dim, self._x.size)) + ymap_flat_masked = ymap_flat[:,~self._mask] + ymap_masked = np.reshape(ymap_flat_masked, + list(self._map_shape)+[ymap_flat_masked.shape[-1]]) + return(ymap_masked-self.best_fit) + + @property + def success(self): + return(self._success) + + 
+    @property
+    def var_names(self):
+        logging.warning('property var_names not defined for fit.FitMap')
+        return(None)
+
+    @property
+    def y(self):
+        logging.warning('property y not defined for fit.FitMap')
+        return(None)
+
+    @property
+    def ymap(self):
+        return(self._ymap)
+
+    def best_parameters(self, dims=None):
+        if dims is None:
+            return(self._best_parameters)
+        if not isinstance(dims, (list, tuple)) or len(dims) != len(self._map_shape):
+            illegal_value(dims, 'dims', 'FitMap.best_parameters', raise_error=True)
+        if self.best_values is None or self.best_errors is None:
+            logging.warning(f'Unable to obtain best parameter values for dims = {dims} in '+
+                    'FitMap.best_parameters')
+            return({})
+        # Create current parameters
+        parameters = deepcopy(self._parameters)
+        for n, name in enumerate(self._best_parameters):
+            if self._parameters[name].vary:
+                parameters[name].set(value=self.best_values[n][dims])
+                parameters[name].stderr = self.best_errors[n][dims]
+        parameters_dict = {}
+        for name in sorted(parameters):
+            if name != 'tmp_normalization_offset_c':
+                par = parameters[name]
+                parameters_dict[name] = {'value': par.value, 'error': par.stderr,
+                        'init_value': self.init_parameters[name]['value'], 'min': par.min,
+                        'max': par.max, 'vary': par.vary, 'expr': par.expr}
+        return(parameters_dict)
+
+    def freemem(self):
+        if self._memfolder is None:
+            return
+        try:
+            rmtree(self._memfolder)
+            self._memfolder = None
+        except Exception:
+            logging.warning('Unable to clean up the joblib memory folder automatically')
+
+    def plot(self, dims, y_title=None, plot_residual=False, plot_comp_legends=False,
+            plot_masked_data=True):
+        if not isinstance(dims, (list, tuple)) or len(dims) != len(self._map_shape):
+            illegal_value(dims, 'dims', 'FitMap.plot', raise_error=True)
+        if self._result is None or self.best_fit is None or self.best_values is None:
+            logging.warning(f'Unable to plot fit for dims = {dims} in FitMap.plot')
             return
-        self._y = self._norm[0]+self._norm[1]*self._y
-        self._result.best_fit = self._norm[0]+self._norm[1]*self._result.best_fit
-        for name in self._result.params:
-            par = self._result.params[name]
-            if 'amplitude' in name or 'height' in name or 'background' in name:
-                par.value *= self._norm[1]
-                if par.stderr is not None:
-                    par.stderr *= self._norm[1]
-                if par.init_value is not None:
-                    par.init_value *= self._norm[1]
-                if par.min is not None and not np.isinf(par.min):
-                    par.min *= self._norm[1]
-                if par.max is not None and not np.isinf(par.max):
-                    par.max *= self._norm[1]
-            if 'intercept' in name or 'backgroundc' in name:
-                par.value += self._norm[0]
-                if par.init_value is not None:
-                    par.init_value += self._norm[0]
-                if par.min is not None and not np.isinf(par.min):
-                    par.min += self._norm[0]
-                if par.max is not None and not np.isinf(par.max):
-                    par.max += self._norm[0]
-        self._result.init_fit = self._norm[0]+self._norm[1]*self._result.init_fit
-        init_values = {}
-        for name in self._result.init_values:
-            init_values[name] = self._result.init_values[name]
-            if init_values[name] is None:
+        if y_title is None or not isinstance(y_title, str):
+            y_title = 'data'
+        if self._mask is None:
+            mask = np.zeros(self._x.size).astype(bool)
+            plot_masked_data = False
+        else:
+            mask = self._mask
+        plots = [(self._x, np.asarray(self._ymap[dims]), 'b.')]
+        legend = [y_title]
+        if plot_masked_data:
+            plots += [(self._x[mask], np.asarray(self._ymap)[(*dims,mask)], 'bx')]
+            legend += ['masked data']
+        plots += [(self._x[~mask], self.best_fit[dims], 'k-')]
+        legend += ['best fit']
+        if plot_residual:
+            plots += [(self._x[~mask], self.residual[dims], 'k--')]
+            legend += ['residual']
+        # Create current parameters
+        parameters = deepcopy(self._parameters)
+        for name in self._best_parameters:
+            if self._parameters[name].vary:
+                parameters[name].set(value=
+                        self.best_values[self._best_parameters.index(name)][dims])
+        for component in self._result.components:
+            if 'tmp_normalization_offset_c' in component.param_names:
                 continue
-            if 'amplitude' in name or 'height' in name or 'background' in name:
-                init_values[name] *= self._norm[1]
-            if 'intercept' in name or 'backgroundc' in name:
-                init_values[name] += self._norm[0]
-        self._result.init_values = init_values
-        # Don't renormalized chisqr, it has no useful meaning in physical units
-        #self._result.chisqr *= self._norm[1]*self._norm[1]
-        if self._result.covar is not None:
-            for i, name in enumerate(self._result.var_names):
-                if 'amplitude' in name or 'height' in name or 'background' in name:
-                    for j in range(len(self._result.var_names)):
-                        if self._result.covar[i,j] is not None:
-                            self._result.covar[i,j] *= self._norm[1]
-                        if self._result.covar[j,i] is not None:
-                            self._result.covar[j,i] *= self._norm[1]
-        # Don't renormalized redchi, it has no useful meaning in physical units
-        #self._result.redchi *= self._norm[1]*self._norm[1]
-        self._result.residual *= self._norm[1]
+            if isinstance(component, ExpressionModel):
+                prefix = component._name
+                if prefix[-1] == '_':
+                    prefix = prefix[:-1]
+                modelname = f'{prefix}: {component.expr}'
+            else:
+                prefix = component.prefix
+                if len(prefix):
+                    if prefix[-1] == '_':
+                        prefix = prefix[:-1]
+                    modelname = f'{prefix} ({component._name})'
+                else:
+                    modelname = f'{component._name}'
+            if len(modelname) > 20:
+                modelname = f'{modelname[0:16]} ...'
+            y = component.eval(params=parameters, x=self._x[~mask])
+            if isinstance(y, (int, float)):
+                y *= np.ones(self._x[~mask].size)
+            plots += [(self._x[~mask], y, '--')]
+            if plot_comp_legends:
+                legend.append(modelname)
+        quick_plot(tuple(plots), legend=legend, title=str(dims), block=True)
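
A short sketch of inspecting a single map point with the two methods above, continuing the example (the indices are hypothetical):

point = (2, 3)                                   # map indices, one per map dimension
pars = fit.best_parameters(point)                # dict of value/error/bounds per parameter
for name, info in pars.items():
    print(f"{name}: {info['value']} +/- {info['error']}")
fit.plot(point, plot_residual=True, plot_comp_legends=True)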
+
+    def fit(self, **kwargs):
+#        t0 = time()
+        # Check input parameters
+        if self._model is None:
+            logging.error('Undefined fit model')
+            return
+        if 'num_proc' in kwargs:
+            num_proc = kwargs.pop('num_proc')
+            if not is_int(num_proc, ge=1):
+                illegal_value(num_proc, 'num_proc', 'FitMap.fit', raise_error=True)
+        else:
+            num_proc = cpu_count()
+        if num_proc > 1 and not have_joblib:
+            logging.warning('Missing joblib in the conda environment, running FitMap serially')
+            num_proc = 1
+        if num_proc > cpu_count():
+            logging.warning(f'The requested number of processors ({num_proc}) exceeds the '+
+                    f'maximum number of processors, num_proc reduced to ({cpu_count()})')
+            num_proc = cpu_count()
+        if 'try_no_bounds' in kwargs:
+            self._try_no_bounds = kwargs.pop('try_no_bounds')
+            if not isinstance(self._try_no_bounds, bool):
+                illegal_value(self._try_no_bounds, 'try_no_bounds', 'FitMap.fit',
+                        raise_error=True)
+        if 'redchi_cutoff' in kwargs:
+            self._redchi_cutoff = kwargs.pop('redchi_cutoff')
+            if not isinstance(self._redchi_cutoff, (int, float)) or self._redchi_cutoff <= 0:
+                illegal_value(self._redchi_cutoff, 'redchi_cutoff', 'FitMap.fit',
+                        raise_error=True)
+        if 'print_report' in kwargs:
+            self._print_report = kwargs.pop('print_report')
+            if not isinstance(self._print_report, bool):
+                illegal_value(self._print_report, 'print_report', 'FitMap.fit',
+                        raise_error=True)
+        if 'plot' in kwargs:
+            self._plot = kwargs.pop('plot')
+            if not isinstance(self._plot, bool):
+                illegal_value(self._plot, 'plot', 'FitMap.fit', raise_error=True)
+        if 'skip_init' in kwargs:
+            self._skip_init = kwargs.pop('skip_init')
+            if not isinstance(self._skip_init, bool):
+                illegal_value(self._skip_init, 'skip_init', 'FitMap.fit', raise_error=True)
+
+        # Apply mask if supplied:
+        if 'mask' in kwargs:
+            self._mask = kwargs.pop('mask')
+        if self._mask is not None:
+            self._mask = np.asarray(self._mask).astype(bool)
+            if self._x.size != self._mask.size:
+                raise ValueError(f'Inconsistent x and mask dimensions ({self._x.size} vs '+
+                        f'{self._mask.size})')
+
+        # Add constant offset for a normalized single component model
+        if self._result is None and self._norm is not None and self._norm[0]:
+            self.add_model('constant', prefix='tmp_normalization_offset_', parameters=
+                    {'name': 'c', 'value': -self._norm[0], 'vary': False, 'norm': True})
+                    #'value': -self._norm[0]/self._norm[1], 'vary': False, 'norm': False})
+
+        # Adjust existing parameters for refit:
+        if 'parameters' in kwargs:
+#            print('\nIn FitMap before adjusting existing parameters for refit:')
+#            self._parameters.pretty_print()
+#            if self._result is None:
+#                raise ValueError('Invalid parameter parameters ({parameters})')
+#            if self._best_values is None:
+#                raise ValueError('Valid self._best_values required for refitting in FitMap.fit')
+            parameters = kwargs.pop('parameters')
+#            print(f'\nparameters:\n{parameters}')
+            if isinstance(parameters, dict):
+                parameters = (parameters, )
+            elif not is_dict_series(parameters):
+                illegal_value(parameters, 'parameters', 'FitMap.fit', raise_error=True)
+            for par in parameters:
+                name = par['name']
+                if name not in self._parameters:
+                    raise ValueError(f'Unable to match {name} parameter {par} to an existing one')
+                if self._parameters[name].expr is not None:
+                    raise ValueError(f'Unable to modify {name} parameter {par} (currently an '+
+                            'expression)')
+                value = par.get('value')
+                vary = par.get('vary')
+                if par.get('expr') is not None:
+                    raise KeyError(f'Illegal "expr" key in {name} parameter {par}')
+                self._parameters[name].set(value=value, vary=vary, min=par.get('min'),
+                        max=par.get('max'))
+                # Overwrite existing best values for fixed parameters when a value is specified
+#                print(f'best values before resetting:\n{self._best_values}')
+                if isinstance(value, (int, float)) and vary is False:
+                    for i, nname in enumerate(self._best_parameters):
+                        if nname == name:
+                            self._best_values[i] = value
+#                print(f'best values after resetting (value={value}, vary={vary}):\n{self._best_values}')
+#RV         print('\nIn FitMap after adjusting existing parameters for refit:')
+#RV         self._parameters.pretty_print()
+
+        # Check for uninitialized parameters
+        for name, par in self._parameters.items():
+            if par.expr is None:
+                value = par.value
+                if value is None or np.isinf(value) or np.isnan(value):
+                    value = 1.0
+                    if self._norm is None or name not in self._parameter_norms:
+                        self._parameters[name].set(value=value)
+                    elif self._parameter_norms[name]:
+                        self._parameters[name].set(value=value*self._norm[1])
+
+        # Create the best parameter list, consisting of all varying parameters plus the
+        # expression parameters in order to collect their errors
+        if self._result is None:
+            # Initial fit
+            assert(self._best_parameters is None)
+            self._best_parameters = [name for name, par in self._parameters.items()
+                    if par.vary or par.expr is not None]
+            num_new_parameters = 0
+        else:
+            # Refit
+            assert(len(self._best_parameters))
+            self._new_parameters = [name for name, par in self._parameters.items()
+                    if name != 'tmp_normalization_offset_c' and
+                    name not in self._best_parameters and (par.vary or par.expr is not None)]
+
num_new_parameters = len(self._new_parameters) + num_best_parameters = len(self._best_parameters) + + # Flatten and normalize the best values of the previous fit, remove the remaining results + # of the previous fit + if self._result is not None: +# print('\nBefore flatten and normalize:') +# print(f'self._best_values:\n{self._best_values}') + self._out_of_bounds = None + self._max_nfev = None + self._redchi = None + self._success = None + self._best_fit = None + self._best_errors = None + assert(self._best_values is not None) + assert(self._best_values.shape[0] == num_best_parameters) + assert(self._best_values.shape[1:] == self._map_shape) + if self._transpose is not None: + self._best_values = np.transpose(self._best_values, + [0]+[i+1 for i in self._transpose]) + self._best_values = [np.reshape(self._best_values[i], self._map_dim) + for i in range(num_best_parameters)] + if self._norm is not None: + for i, name in enumerate(self._best_parameters): + if self._parameter_norms.get(name, False): + self._best_values[i] /= self._norm[1] +#RV print('\nAfter flatten and normalize:') +#RV print(f'self._best_values:\n{self._best_values}') + + # Normalize the initial parameters (and best values for a refit) +# print('\nIn FitMap before normalize:') +# self._parameters.pretty_print() +# print(f'\nparameter_norms:\n{self._parameter_norms}\n') + self._normalize() +# print('\nIn FitMap after normalize:') +# self._parameters.pretty_print() +# print(f'\nparameter_norms:\n{self._parameter_norms}\n') + + # Prevent initial values from sitting at boundaries + self._parameter_bounds = {name:{'min': par.min, 'max': par.max} + for name, par in self._parameters.items() if par.vary} + for name, par in self._parameters.items(): + if par.vary: + par.set(value=self._reset_par_at_boundary(par, par.value)) +# print('\nAfter checking boundaries:') +# self._parameters.pretty_print() + + # Set parameter bounds to unbound (only use bounds when fit fails) + if self._try_no_bounds: + for name in self._parameter_bounds.keys(): + self._parameters[name].set(min=-np.inf, max=np.inf) + + # Allocate memory to store fit results + if self._mask is None: + x_size = self._x.size + else: + x_size = self._x[~self._mask].size + if num_proc == 1: + self._out_of_bounds_flat = np.zeros(self._map_dim, dtype=bool) + self._max_nfev_flat = np.zeros(self._map_dim, dtype=bool) + self._redchi_flat = np.zeros(self._map_dim, dtype=np.float64) + self._success_flat = np.zeros(self._map_dim, dtype=bool) + self._best_fit_flat = np.zeros((self._map_dim, x_size), + dtype=self._ymap_norm.dtype) + self._best_errors_flat = [np.zeros(self._map_dim, dtype=np.float64) + for _ in range(num_best_parameters+num_new_parameters)] + if self._result is None: + self._best_values_flat = [np.zeros(self._map_dim, dtype=np.float64) + for _ in range(num_best_parameters)] + else: + self._best_values_flat = self._best_values + self._best_values_flat += [np.zeros(self._map_dim, dtype=np.float64) + for _ in range(num_new_parameters)] + else: + self._memfolder = './joblib_memmap' + try: + mkdir(self._memfolder) + except FileExistsError: + pass + filename_memmap = path.join(self._memfolder, 'out_of_bounds_memmap') + self._out_of_bounds_flat = np.memmap(filename_memmap, dtype=bool, + shape=(self._map_dim), mode='w+') + filename_memmap = path.join(self._memfolder, 'max_nfev_memmap') + self._max_nfev_flat = np.memmap(filename_memmap, dtype=bool, + shape=(self._map_dim), mode='w+') + filename_memmap = path.join(self._memfolder, 'redchi_memmap') + self._redchi_flat = 
np.memmap(filename_memmap, dtype=np.float64,
+                    shape=(self._map_dim), mode='w+')
+            filename_memmap = path.join(self._memfolder, 'success_memmap')
+            self._success_flat = np.memmap(filename_memmap, dtype=bool,
+                    shape=(self._map_dim), mode='w+')
+            filename_memmap = path.join(self._memfolder, 'best_fit_memmap')
+            self._best_fit_flat = np.memmap(filename_memmap, dtype=self._ymap_norm.dtype,
+                    shape=(self._map_dim, x_size), mode='w+')
+            self._best_errors_flat = []
+            for i in range(num_best_parameters+num_new_parameters):
+                filename_memmap = path.join(self._memfolder, f'best_errors_memmap_{i}')
+                self._best_errors_flat.append(np.memmap(filename_memmap, dtype=np.float64,
+                        shape=self._map_dim, mode='w+'))
+            self._best_values_flat = []
+            for i in range(num_best_parameters):
+                filename_memmap = path.join(self._memfolder, f'best_values_memmap_{i}')
+                self._best_values_flat.append(np.memmap(filename_memmap, dtype=np.float64,
+                        shape=self._map_dim, mode='w+'))
+                if self._result is not None:
+                    self._best_values_flat[i][:] = self._best_values[i][:]
+            for i in range(num_new_parameters):
+                filename_memmap = path.join(self._memfolder,
+                        f'best_values_memmap_{i+num_best_parameters}')
+                self._best_values_flat.append(np.memmap(filename_memmap, dtype=np.float64,
+                        shape=self._map_dim, mode='w+'))
+
+        # Update the best parameter list
+        if num_new_parameters:
+            self._best_parameters += self._new_parameters
+
+        # Perform the first fit to get model component info and initial parameters
+        current_best_values = {}
+#        print(f'0 before:\n{current_best_values}')
+#        t1 = time()
+        self._result = self._fit(0, current_best_values, return_result=True, **kwargs)
+#        t2 = time()
+#        print(f'0 after:\n{current_best_values}')
+#        print('\nAfter the first fit:')
+#        self._parameters.pretty_print()
+#        print(self._result.fit_report(show_correl=False))
+
+        # Remove all irrelevant content from self._result
+        for attr in ('_abort', 'aborted', 'aic', 'best_fit', 'best_values', 'bic',
+                'calc_covar', 'call_kws', 'chisqr', 'ci_out', 'col_deriv', 'covar', 'data',
+                'errorbars', 'flatchain', 'ier', 'init_vals', 'init_fit', 'iter_cb', 'jacfcn',
+                'kws', 'last_internal_values', 'lmdif_message', 'message', 'method',
+                'nan_policy', 'ndata', 'nfev', 'nfree', 'params', 'redchi', 'reduce_fcn',
+                'residual', 'result', 'scale_covar', 'show_candidates', 'success', 'userargs',
+                'userfcn', 'userkws', 'values', 'var_names', 'weights', 'user_options'):
+            try:
+                delattr(self._result, attr)
+            except AttributeError:
+#                logging.warning(f'Unknown attribute {attr} in fit.FitMap._cleanup_result')
+                pass
+
+#        t3 = time()
+        if num_proc == 1:
+            # Perform the remaining fits serially
+            for n in range(1, self._map_dim):
+#                print(f'{n} before:\n{current_best_values}')
+                self._fit(n, current_best_values, **kwargs)
+#                print(f'{n} after:\n{current_best_values}')
+        else:
+            # Perform the remaining fits in parallel (see the joblib memmap sketch at the
+            # end of this section)
+            num_fit = self._map_dim-1
+#            print(f'num_fit = {num_fit}')
+            if num_proc > num_fit:
+                logging.warning(f'The requested number of processors ({num_proc}) exceeds the '+
+                        f'number of fits, num_proc reduced to ({num_fit})')
+                num_proc = num_fit
+                num_fit_per_proc = 1
+            else:
+                num_fit_per_proc = round(num_fit/num_proc)
+                if num_proc*num_fit_per_proc < num_fit:
+                    num_fit_per_proc += 1
+#            print(f'num_fit_per_proc = {num_fit_per_proc}')
+            num_fit_batch = min(num_fit_per_proc, 40)
+#            print(f'num_fit_batch = {num_fit_batch}')
+            with Parallel(n_jobs=num_proc) as parallel:
+                parallel(delayed(self._fit_parallel)(current_best_values, num_fit_batch,
+                        n_start, 
**kwargs) for n_start in range(1, self._map_dim, num_fit_batch)) +# t4 = time() + + # Renormalize the initial parameters for external use + if self._norm is not None and self._normalized: + init_values = {} + for name, value in self._result.init_values.items(): + if name not in self._parameter_norms or self._parameters[name].expr is not None: + init_values[name] = value + elif self._parameter_norms[name]: + init_values[name] = value*self._norm[1] + self._result.init_values = init_values + for name, par in self._result.init_params.items(): + if par.expr is None and self._parameter_norms.get(name, False): + _min = par.min + _max = par.max + value = par.value*self._norm[1] + if not np.isinf(_min) and abs(_min) != float_min: + _min *= self._norm[1] + if not np.isinf(_max) and abs(_max) != float_min: + _max *= self._norm[1] + par.set(value=value, min=_min, max=_max) + par.init_value = par.value + + # Remap the best results +# t5 = time() + self._out_of_bounds = np.copy(np.reshape(self._out_of_bounds_flat, self._map_shape)) + self._max_nfev = np.copy(np.reshape(self._max_nfev_flat, self._map_shape)) + self._redchi = np.copy(np.reshape(self._redchi_flat, self._map_shape)) + self._success = np.copy(np.reshape(self._success_flat, self._map_shape)) + self._best_fit = np.copy(np.reshape(self._best_fit_flat, + list(self._map_shape)+[x_size])) + self._best_values = np.asarray([np.reshape(par, list(self._map_shape)) + for par in self._best_values_flat]) + self._best_errors = np.asarray([np.reshape(par, list(self._map_shape)) + for par in self._best_errors_flat]) + if self._inv_transpose is not None: + self._out_of_bounds = np.transpose(self._out_of_bounds, self._inv_transpose) + self._max_nfev = np.transpose(self._max_nfev, self._inv_transpose) + self._redchi = np.transpose(self._redchi, self._inv_transpose) + self._success = np.transpose(self._success, self._inv_transpose) + self._best_fit = np.transpose(self._best_fit, + list(self._inv_transpose)+[len(self._inv_transpose)]) + self._best_values = np.transpose(self._best_values, + [0]+[i+1 for i in self._inv_transpose]) + self._best_errors = np.transpose(self._best_errors, + [0]+[i+1 for i in self._inv_transpose]) + del self._out_of_bounds_flat + del self._max_nfev_flat + del self._redchi_flat + del self._success_flat + del self._best_fit_flat + del self._best_values_flat + del self._best_errors_flat +# t6 = time() + + # Restore parameter bounds and renormalize the parameters + for name, par in self._parameter_bounds.items(): + self._parameters[name].set(min=par['min'], max=par['max']) + self._normalized = False + if self._norm is not None: + for name, norm in self._parameter_norms.items(): + par = self._parameters[name] + if par.expr is None and norm: + value = par.value*self._norm[1] + _min = par.min + _max = par.max + if not np.isinf(_min) and abs(_min) != float_min: + _min *= self._norm[1] + if not np.isinf(_max) and abs(_max) != float_min: + _max *= self._norm[1] + par.set(value=value, min=_min, max=_max) +# t7 = time() +# print(f'total run time in fit: {t7-t0:.2f} seconds') +# print(f'run time first fit: {t2-t1:.2f} seconds') +# print(f'run time remaining fits: {t4-t3:.2f} seconds') +# print(f'run time remapping results: {t6-t5:.2f} seconds') + +# print('\n\nAt end fit:') +# self._parameters.pretty_print() +# print(f'self._best_values:\n{self._best_values}\n\n') + + # Free the shared memory + self.freemem() + + def _fit_parallel(self, current_best_values, num, n_start, **kwargs): + num = min(num, self._map_dim-n_start) + for n in range(num): +# 
print(f'{n_start+n} before:\n{current_best_values}') + self._fit(n_start+n, current_best_values, **kwargs) +# print(f'{n_start+n} after:\n{current_best_values}') + + def _fit(self, n, current_best_values, return_result=False, **kwargs): +#RV print(f'\n\nstart FitMap._fit {n}\n') +#RV print(f'current_best_values = {current_best_values}') +#RV print(f'self._best_parameters = {self._best_parameters}') +#RV print(f'self._new_parameters = {self._new_parameters}\n\n') +# self._parameters.pretty_print() + # Set parameters to current best values, but prevent them from sitting at boundaries + if self._new_parameters is None: + # Initial fit + for name, value in current_best_values.items(): + par = self._parameters[name] + par.set(value=self._reset_par_at_boundary(par, value)) + else: + # Refit + for i, name in enumerate(self._best_parameters): + par = self._parameters[name] + if name in self._new_parameters: + if name in current_best_values: + par.set(value=self._reset_par_at_boundary(par, current_best_values[name])) + elif par.expr is None: + par.set(value=self._best_values[i][n]) +#RV print(f'\nbefore fit {n}') +#RV self._parameters.pretty_print() + if self._mask is None: + result = self._model.fit(self._ymap_norm[n], self._parameters, x=self._x, **kwargs) + else: + result = self._model.fit(self._ymap_norm[n][~self._mask], self._parameters, + x=self._x[~self._mask], **kwargs) +# print(f'\nafter fit {n}') +# self._parameters.pretty_print() +# print(result.fit_report(show_correl=False)) + out_of_bounds = False + for name, par in self._parameter_bounds.items(): + value = result.params[name].value + if not np.isinf(par['min']) and value < par['min']: + out_of_bounds = True + break + if not np.isinf(par['max']) and value > par['max']: + out_of_bounds = True + break + self._out_of_bounds_flat[n] = out_of_bounds + if self._try_no_bounds and out_of_bounds: + # Rerun fit with parameter bounds in place + for name, par in self._parameter_bounds.items(): + self._parameters[name].set(min=par['min'], max=par['max']) + # Set parameters to current best values, but prevent them from sitting at boundaries + if self._new_parameters is None: + # Initial fit + for name, value in current_best_values.items(): + par = self._parameters[name] + par.set(value=self._reset_par_at_boundary(par, value)) + else: + # Refit + for i, name in enumerate(self._best_parameters): + par = self._parameters[name] + if name in self._new_parameters: + if name in current_best_values: + par.set(value=self._reset_par_at_boundary(par, + current_best_values[name])) + elif par.expr is None: + par.set(value=self._best_values[i][n]) +# print('\nbefore fit') +# self._parameters.pretty_print() +# print(result.fit_report(show_correl=False)) + if self._mask is None: + result = self._model.fit(self._ymap_norm[n], self._parameters, x=self._x, **kwargs) + else: + result = self._model.fit(self._ymap_norm[n][~self._mask], self._parameters, + x=self._x[~self._mask], **kwargs) +# print(f'\nafter fit {n}') +# self._parameters.pretty_print() +# print(result.fit_report(show_correl=False)) + out_of_bounds = False + for name, par in self._parameter_bounds.items(): + value = result.params[name].value + if not np.isinf(par['min']) and value < par['min']: + out_of_bounds = True + break + if not np.isinf(par['max']) and value > par['max']: + out_of_bounds = True + break +# print(f'{n} redchi < redchi_cutoff = {result.redchi < self._redchi_cutoff} success = {result.success} out_of_bounds = {out_of_bounds}') + # Reset parameters back to unbound + for name in 
self._parameter_bounds.keys():
+                self._parameters[name].set(min=-np.inf, max=np.inf)
+            assert(not out_of_bounds)
+        if result.redchi >= self._redchi_cutoff:
+            result.success = False
+        if result.nfev == result.max_nfev:
+#            print(f'Maximum number of function evaluations reached for n = {n}')
+#            logging.warning(f'Maximum number of function evaluations reached for n = {n}')
+            if result.redchi < self._redchi_cutoff:
+                result.success = True
+            self._max_nfev_flat[n] = True
+        if result.success:
+            assert(all(par in result.params for par in current_best_values))
+            for par in result.params.values():
+                if par.vary:
+                    current_best_values[par.name] = par.value
+        else:
+            logging.warning(f'Fit for n = {n} failed: {result.lmdif_message}')
+        # Renormalize the data and results
+        self._renormalize(n, result)
+        if self._print_report:
+            print(result.fit_report(show_correl=False))
+        if self._plot:
+            dims = np.unravel_index(n, self._map_shape)
+            if self._inv_transpose is not None:
+                dims = tuple(dims[self._inv_transpose[i]] for i in range(len(dims)))
+            super().plot(result=result, y=np.asarray(self._ymap[dims]), plot_comp_legends=True,
+                    skip_init=self._skip_init, title=str(dims))
+#RV     print(f'\n\nend FitMap._fit {n}\n')
+#RV     print(f'current_best_values = {current_best_values}')
+#        self._parameters.pretty_print()
+#        print(result.fit_report(show_correl=False))
+#RV     print(f'\nself._best_values_flat:\n{self._best_values_flat}\n\n')
+        if return_result:
+            return(result)
+
+    def _renormalize(self, n, result):
+        self._redchi_flat[n] = np.float64(result.redchi)
+        self._success_flat[n] = result.success
+        if self._norm is None or not self._normalized:
+            self._best_fit_flat[n] = result.best_fit
+            for i, name in enumerate(self._best_parameters):
+                self._best_values_flat[i][n] = np.float64(result.params[name].value)
+                self._best_errors_flat[i][n] = np.float64(result.params[name].stderr)
+        else:
+            pars = set(self._parameter_norms) & set(self._best_parameters)
+            for name, par in result.params.items():
+                if name in pars and self._parameter_norms[name]:
+                    if par.stderr is not None:
+                        par.stderr *= self._norm[1]
+                    if par.expr is None:
+                        par.value *= self._norm[1]
+                        if self._print_report:
+                            if par.init_value is not None:
+                                par.init_value *= self._norm[1]
+                            if not np.isinf(par.min) and abs(par.min) != float_min:
+                                par.min *= self._norm[1]
+                            if not np.isinf(par.max) and abs(par.max) != float_min:
+                                par.max *= self._norm[1]
+            self._best_fit_flat[n] = result.best_fit*self._norm[1]+self._norm[0]
+            for i, name in enumerate(self._best_parameters):
+                self._best_values_flat[i][n] = np.float64(result.params[name].value)
+                self._best_errors_flat[i][n] = np.float64(result.params[name].stderr)
+        if self._plot:
+            if not self._skip_init:
+                result.init_fit = result.init_fit*self._norm[1]+self._norm[0]
+            result.best_fit = np.copy(self._best_fit_flat[n])
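
For reference, a self-contained sketch of the joblib pattern used in FitMap.fit above: the workers write their slice of the results directly into shared memory-mapped arrays, so nothing large has to be pickled back to the parent process (the folder name and sizes are hypothetical):

import numpy as np
from joblib import Parallel, delayed
from os import mkdir, path
from shutil import rmtree

memfolder = './joblib_memmap_demo'               # hypothetical scratch folder
try:
    mkdir(memfolder)
except FileExistsError:
    pass
out = np.memmap(path.join(memfolder, 'out'), dtype=np.float64, shape=(100,), mode='w+')

def work(out, n_start, num):
    # Each worker fills its own slice of the shared memory-mapped array
    for n in range(n_start, min(n_start+num, out.size)):
        out[n] = float(n)**0.5

Parallel(n_jobs=4)(delayed(work)(out, n_start, 25) for n_start in range(0, 100, 25))
print(out[:5])
rmtree(memfolder)                                # cf. FitMap.freemem()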