Package flarefly
Sub-modules
flarefly.components: Components module …
flarefly.custom_pdfs: Module containing custom pdfs
flarefly.data_handler: Simple module with a class to manage the data used in the analysis
flarefly.fitter: Module containing the class used to perform mass fits
flarefly.pdf_builder: PDFBuilder class to create signal and background PDFs using zfit
flarefly.pdf_configs: Helper functions to get the PDF configurations for signal and background PDFs
flarefly.utils: Simple module with a class to manage the data used in the analysis
Classes
class DataHandler(data=None, var_name='xaxis', limits=None, use_zfit=True, **kwargs)
Expand source code
class DataHandler: """ Class for storing and managing the data of (ROOT tree, TH1, numpy array, etc.) """ def __init__(self, data=None, var_name='xaxis', limits=None, use_zfit=True, **kwargs): """ Initialize the DataHandler class Parameters ------------------------------------------------ data: numpy.array / pandas.DataFrame / uproot.behaviors.TH1.Histogram / ROOT.TH1 / string / zfit.data.Data / zfit.data.BinnedData Data or path to data to be used in the fit var_name: str Name of the variable used in the fit limits: list of floats Limits of the x axis used in the fit use_zfit: bool If True, zfit package is used to fit the data **kwargs: dict Additional optional arguments: - nbins: int Number of bins chosen by user to bin data in case of unbinned data - rebin: int Rebin factor in case of binned data - histoname: str Name of the histogram to be used in the fit in case of ROOT file - treename: str Name of the tree to be used in the fit in case of ROOT file """ # Default keyword arguments nbins = kwargs.get('nbins', 100) rebin = kwargs.get('rebin', 1) self._input_ = data self._var_name_ = var_name self._limits_ = [None, None] self._use_zfit_ = use_zfit self._obs_ = None self._data_ = None self._binned_data_ = None self._nbins_ = nbins self._isbinned_ = None self._norm_ = 1.0 self._rebin_ = rebin self.__format__ = None if use_zfit: data = self.__load_data(data, limits, **kwargs) # Update normalization: binned data sums over bin values, unbinned counts entries. if self._isbinned_: self._binned_data_ = data self._norm_ = float(sum(self._binned_data_.values())) else: self._data_ = data self._norm_ = float(len(self._data_.to_pandas())) else: Logger('Non-zfit data not available', 'FATAL') def __check_set_format(self, format_name): """ Checks and sets the data format for the handler. If the format is already set and does not match the provided format, logs a fatal error. If the format is not set, assigns the provided format. Parameters ------------------------------------------------ format_name: str The data format to check and set. """ if self.__format__ is not None and self.__format__ != format_name: Logger(f'Data format set to {self.__format__}, cannot use {format_name}', 'FATAL') elif self.__format__ is None: self.__format__ = format_name def __check_set_limits_unbinned_obs_(self, data, limits): """ Check and set the limits for unbinned observations. This method updates the limits and observation space for unbinned data if the limits are not already set. It sets the lower limit to the minimum value in the data and the upper limit to the maximum value in the data. The observation space is then updated with these new limits. Parameters ------------------------------------------------ data: iterable The unbinned data to determine the limits from. limits: list The limits provided by the user. """ if None in self._limits_ and limits is not None: self._limits_[0] = limits[0] self._limits_[1] = limits[1] elif None in self._limits_: if isinstance(data, pd.DataFrame): self._limits_[0] = min(data[self._var_name_]) self._limits_[1] = max(data[self._var_name_]) elif isinstance(data, zfit.core.data.Data): array = data.to_numpy() self._limits_[0] = min(array) self._limits_[1] = max(array) else: self._limits_[0] = min(data) self._limits_[1] = max(data) if self._obs_ is None: self._obs_ = zfit.Space(self._var_name_, lower=self._limits_[0], upper=self._limits_[1]) def __check_binned_unbinned_(self, isbinned): """ Checks and sets the binning status of the data. 
This method ensures that the binning status of the data is consistent. If the binning status has not been set, it sets it to the provided value. If the binning status has already been set and the provided value is different, it logs a fatal error indicating a data format mismatch. Parameters ------------------------------------------------ isbinned: bool The binning status to check against the current status. """ if self._isbinned_ is None: self._isbinned_ = isbinned elif self._isbinned_ is not None and self._isbinned_ != isbinned: Logger('Data format mismatch', 'FATAL') def __check_set_limits_binned_obs_(self, data, limits): """ Check and set the limits and binning for binned observations. This method checks if the limits for the binned observations are set. If not, it sets the number of bins, the lower limit, and the upper limit based on the provided data. It then creates a regular binning and observation space using zfit. If the limits are already set, it verifies that the bin edges match the provided data. If the bin edges do not match, it logs a fatal error. Parameters ------------------------------------------------ data: tuple A tuple where the second element is an array-like structure containing the bin edges. limits: list The limits provided by the user. """ if None in self._limits_ and limits is not None: idx_min = np.argmin(np.abs(data[1] - limits[0])) idx_max = np.argmin(np.abs(data[1] - limits[1])) self._limits_[0] = data[1][idx_min] self._limits_[1] = data[1][idx_max] elif None in self._limits_: self._limits_[0] = data[1][0] self._limits_[1] = data[1][-1] if self._obs_ is None: idx_min = np.argmin(np.abs(data[1] - self._limits_[0])) idx_max = np.argmin(np.abs(data[1] - self._limits_[1])) self._nbins_ = idx_max - idx_min binning = zfit.binned.RegularBinning( self._nbins_, self._limits_[0], self._limits_[1], name=self._var_name_ ) self._obs_ = zfit.Space(self._var_name_, binning=binning) else: idx_min = np.argmin(np.abs(data[1] - self._limits_[0])) idx_max = np.argmin(np.abs(data[1] - self._limits_[1])) binning = zfit.binned.RegularBinning( self._nbins_, self._limits_[0], self._limits_[1], name=self._var_name_ ) bin_edges = [] for i in range(self._nbins_): bin_edges.append(binning.bin(i)[0]) if not np.allclose(bin_edges, data[1][idx_min:idx_max]): Logger('Bin edges do not match', 'FATAL') def __load_data(self, data, limits, **kwargs): """ Load data from various formats. Parameters ------------------------------------------------ data: str, np.ndarray, pd.DataFrame, uproot.behaviors.TH1.Histogram, or ROOT.TH1 The data to be loaded. It can be a file path (str), a NumPy array, a Pandas DataFrame, or an uproot Histogram. limits: list The limits provided by the user. **kwargs: Additional keyword arguments to be passed to the specific data loading functions. Returns ------------------------------------------------- data: zfit.core.data.Data or zfit.data.BinnedData: The loaded data in the appropriate format. 
""" if isinstance(data, str): data = self.__load_from_file(data, limits, **kwargs) elif isinstance(data, np.ndarray): self.__check_set_format('numpy') data = self.__load_from_numpy(data, limits) elif isinstance(data, pd.DataFrame): self.__check_set_format('pandas') data = self.__load_from_pandas(data, limits) elif isinstance(data, zfit.data.Data): self.__check_set_format('zfit_data') data = self.__load_from_zfit_data(data, limits) elif isinstance(data, zfit.data.BinnedData): self.__check_set_format('zfit_data_binned') data = self.__load_from_zfit_data_binned(data, limits) elif isinstance(data, uproot.behaviors.TH1.Histogram): self.__check_set_format('uproot') data = self.__load_from_histogram(data, limits) # Care the position: "TH1" is also in uproot type elif "TH1" in str(type(data)): self.__check_set_format('root_hist') data = self.__load_from_histogram(data, limits) else: Logger('Data format not supported', 'FATAL') return data def __load_from_file(self, filename, limits, **kwargs): """ Load data from file-based sources (ROOT or parquet). Parameters ------------------------------------------------ filename: str The path to the file to be loaded. limits: list The limits provided by the user. **kwargs: Additional keyword arguments to be passed to the specific data loading functions. """ if filename.endswith('.root'): if self.__format__ is None: self.__check_set_format('root') if 'histoname' in kwargs: with uproot.open(filename, encoding="utf-8") as file: hist = file[kwargs['histoname']] return self.__load_from_histogram(hist, limits) if 'treename' in kwargs: with uproot.open(filename, encoding="utf-8") as file: df = file[kwargs['treename']].arrays(library='pd') return self.__load_from_pandas(df, limits) Logger('"histoname" not specified. Please specify the name of the histogram to be used', 'FATAL') return None if filename.endswith('.parquet') or filename.endswith('.parquet.gzip'): self.__check_set_format('parquet') df = pd.read_parquet(filename) return self.__load_from_pandas(df, limits) Logger('Data format not supported yet. Please use .root or .parquet', 'FATAL') return None def __load_from_numpy(self, data, limits): """Load a numpy array as unbinned data.""" self.__check_binned_unbinned_(False) self.__check_set_limits_unbinned_obs_(data, limits) return zfit.data.Data.from_numpy(obs=self._obs_, array=data) def __load_from_pandas(self, df, limits): """Load a pandas DataFrame as unbinned data.""" self.__check_binned_unbinned_(False) self.__check_set_limits_unbinned_obs_(df, limits) return zfit.data.Data.from_pandas(obs=self._obs_, df=df) def __load_from_zfit_data(self, data, limits): """Load a zfit Data object as unbinned data.""" self.__check_binned_unbinned_(False) self.__check_set_limits_unbinned_obs_(data, limits) return data def __load_from_zfit_data_binned(self, data, limits): """Load a zfit DataBinned object as binned data.""" self.__load_from_histogram(data, limits) return data def __load_from_histogram(self, hist_obj, limits): """ Load an uproot histogram object as binned data. """ self.__check_binned_unbinned_(True) if self.__format__ == 'root_hist': nbins = hist_obj.GetNbinsX() xmin = hist_obj.GetXaxis().GetXmin() xmax = hist_obj.GetXaxis().GetXmax() hist = Hist(Regular(nbins, xmin, xmax, name="x")) contents = np.array([hist_obj.GetBinContent(i+1) for i in range(nbins)]) errors2 = np.array([hist_obj.GetBinError(i+1)**2 for i in range(nbins)]) hist.view(flow=False)[...] = contents hist.variances(flow=False)[...] 
= errors2 else: hist = hist_obj.to_hist() hist = eval(f"hist[::{self._rebin_}j]") # pylint: disable=eval-used hist_array = hist.to_numpy() self.__check_set_limits_binned_obs_(hist_array, limits) idx_min = np.argmin(np.abs(hist_array[1] - self._limits_[0])) idx_max = np.argmin(np.abs(hist_array[1] - self._limits_[1])) data = zfit.data.BinnedData.from_tensor( self._obs_, hist.values()[idx_min:idx_max], hist.variances()[idx_min:idx_max] ) return data def add_data(self, data, **kwargs): """ Add data to the existing dataset. Parameters ------------------------------------------------ data: str, np.ndarray, pd.DataFrame, or uproot.behaviors.TH1.Histogram The data to be added. **kwargs: Additional keyword arguments to be passed to the specific data loading functions. """ if "limits" in kwargs: Logger('Limits not needed for adding data', 'FATAL') data = self.__load_data(data, limits=None, **kwargs) if self._isbinned_: self._binned_data_ = zfit.data.concat( [self._binned_data_.to_unbinned(), data.to_unbinned()] ).to_binned(self._obs_) self._norm_ = float(sum(self._binned_data_.values())) else: self._data_ = zfit.data.concat([self._data_, data]) self._norm_ = float(len(self._data_.to_pandas())) def get_data(self, input_data=False): """ Get the data Parameters ------------------------------------------------ input_data: bool If True, the input data is returned Returns ------------------------------------------------- data: zfit.core.data.Data The data instance """ if not input_data: return self._data_ return self._input_ def get_var_name(self): """ Get the variable name Returns ------------------------------------------------- var_name: str The variable name """ return self._var_name_ def get_limits(self): """ Get the limits of the x axis Returns ------------------------------------------------- limits: list The range limits of the x axis """ return self._limits_ def get_use_zfit(self): """ Get the use_zfit flag Returns ------------------------------------------------- limits: list The range limits of the x axis """ return self._use_zfit_ def get_obs(self): """ Get the observation space Returns ------------------------------------------------- obs: zfit.core.space.Space The observation space """ if self._use_zfit_: return self._obs_ Logger('Observable not available for non-zfit data', 'ERROR') return None def get_binned_obs_from_unbinned_data(self): """ Get the binned obs from unbinned obs Returns ------------------------------------------------- binned_obs: zfit.core.space.Space The observation space for unbinned data converted to binned data """ bins = self.get_nbins() limits = self.get_limits() binning = zfit.binned.RegularBinning(bins, limits[0], limits[1], name=self._var_name_) obs = zfit.Space(self._var_name_, binning=binning) return obs def get_unbinned_obs_from_binned_data(self): """ Get the unbinned obs from binned obs Returns ------------------------------------------------- unbinned_obs: zfit.core.space.Space The observation space for binned data converted to unbinned data """ limits = self.get_limits() obs = zfit.Space(self._var_name_, lower=limits[0], upper=limits[1]) return obs def get_norm(self): """ Get the integral of the data Returns ------------------------------------------------- norm: float The normalisation value """ return self._norm_ def get_bin_center(self): """ Get the center of the bins Returns ------------------------------------------------- binning: array The bin center """ if self.get_is_binned(): binning = self.get_obs().binning[0] else: binning = 
self.get_binned_obs_from_unbinned_data().binning[0] bin_center = [] for bin_ in binning: bin_center.append((bin_[0] + bin_[1])/2) return bin_center def get_bin_edges(self): """ Get the edges of the bins Returns ------------------------------------------------- bin_edges: list The bin edges """ if self.get_is_binned(): binning = self.get_obs().binning[0] else: binning = self.get_binned_obs_from_unbinned_data().binning[0] bin_edges = [] for bin_ in binning: bin_edges.append(bin_[0]) bin_edges.append(binning[-1][1]) return bin_edges def get_nbins(self): """ Get the number of bins Returns ------------------------------------------------- nbins: int The number of bins """ return self._nbins_ def get_is_binned(self): """ Get the data type (binned or not) Returns ------------------------------------------------- isbinnned: bool A flag that indicates if the data is binned """ return self._isbinned_ def get_binned_data(self): """ Get the binned data Returns ------------------------------------------------- binned_data: zfit.data.BinnedData The binned data """ return self._binned_data_ def get_binned_data_from_unbinned_data(self): """ Get the binned data from unbinned data Returns ------------------------------------------------- binned_data: float array The binned data obtained from unbinned data """ limits = self.get_limits() data_np = zfit.run(self.get_data()[self._var_name_]) data_values, _ = np.histogram(data_np, self.get_nbins(), range=(limits[0], limits[1])) return data_values def get_binned_data_handler_from_unbinned_data(self): """ Get the binned obs from unbinned obs Returns ------------------------------------------------- binned_obs: zfit.core.space.Space The observation space for unbinned data converted to binned data """ return DataHandler( self._data_.to_binned(self.get_binned_obs_from_unbinned_data()), var_name=self.get_var_name(), limits=self.get_limits(), rebin=1 ) def to_pandas(self): """ returns data in pandas df Returns ------------------------------------------------- data: pandas.DataFrame The data in a pandas DataFrame """ if self.__format__ == 'pandas': Logger('Data already in pandas format.', 'WARNING') return self._input_ if self.__format__ in ['numpy', 'parquet', 'root', 'zfit_data'] and not self._isbinned_: return self._data_.to_pandas() Logger('Data format not supported yet for pandas conversion.', 'ERROR') return None def to_hist(self, **kwargs): """ returns data in NamedHist Parameters ------------------------------------------------ **kwargs: dict Additional optional arguments: - lower_edge: float lower edge (only used in case of originally unbinned data) - upper_edge: float upper edge (only used in case of originally unbinned data) - nbins: int number of bins (only used in case of originally unbinned data) - axis_title: str label of x-axis (only used in case of originally unbinned data) - varname: str name of variable (needed in case of originally unbinned data) Returns ------------------------------------------------- hist: Hist The data in a hist.Hist """ if self._isbinned_: return self._binned_data_.to_hist() if 'varname' not in kwargs: Logger('Name of variable needed in case of unbinned data.', 'FATAL') varname = kwargs['varname'] df_unbinned = self._data_.to_pandas() data = df_unbinned[varname].to_numpy() nbins = kwargs.get('nbins', 100) lower_edge = kwargs.get('lower_edge', min(data)) upper_edge = kwargs.get('upper_edge', max(data)) axis_title = kwargs.get('axis_title', varname) hist = Hist.new.Reg(nbins, lower_edge, upper_edge, name="x", 
label=axis_title).Double() hist.fill(x=data) return hist

Class for storing and managing the data (ROOT tree, TH1, numpy array, etc.)
Initialize the DataHandler class
Parameters
data (numpy.array / pandas.DataFrame / uproot.behaviors.TH1.Histogram / ROOT.TH1 / str / zfit.data.Data / zfit.data.BinnedData): Data or path to the data to be used in the fit
var_name (str): Name of the variable used in the fit
limits (list of floats): Limits of the x axis used in the fit
use_zfit (bool): If True, the zfit package is used to fit the data
**kwargs (dict): Additional optional arguments:
- nbins (int): Number of bins chosen by the user to bin the data in case of unbinned data
- rebin (int): Rebin factor in case of binned data
- histoname (str): Name of the histogram to be used in the fit in case of a ROOT file
- treename (str): Name of the tree to be used in the fit in case of a ROOT file
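A minimal construction sketch based on the arguments above; the generated sample, the file name 'analysis_results.root', and the histogram name 'hMass' are placeholders:

    import numpy as np
    from flarefly.data_handler import DataHandler

    # Unbinned data from a numpy array, fitted between 1.75 and 2.0
    sample = np.random.normal(1.87, 0.01, size=10000)
    dh_unbinned = DataHandler(sample, var_name='inv_mass', limits=[1.75, 2.0], nbins=100)

    # Binned data from a histogram stored in a ROOT file (path and name are placeholders)
    dh_binned = DataHandler('analysis_results.root', var_name='inv_mass',
                            limits=[1.75, 2.0], histoname='hMass', rebin=2)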
Methods
def add_data(self, data, **kwargs)
Add data to the existing dataset.
Parameters
data (str, np.ndarray, pd.DataFrame, or uproot.behaviors.TH1.Histogram): The data to be added.
**kwargs: Additional keyword arguments to be passed to the specific data loading functions.
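Continuing the sketch above, a second sample can be appended to the same handler; the limits are inherited from the existing handler, so they must not be passed again:

    # 'extra' is a placeholder second sample of the same variable
    extra = np.random.normal(1.87, 0.01, size=5000)
    dh_unbinned.add_data(extra)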
def get_bin_center(self)
Get the center of the bins.
Returns
bin_center (list): The bin centers.
def get_bin_edges(self)
Get the edges of the bins.
Returns
bin_edges (list): The bin edges.
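The bin layout returned by these two methods is convenient for custom plotting; for example, continuing the binned handler sketched above:

    centers = dh_binned.get_bin_center()   # list of bin centers
    edges = dh_binned.get_bin_edges()      # list with nbins + 1 entries
    widths = [hi - lo for lo, hi in zip(edges[:-1], edges[1:])]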
def get_binned_data(self)
Get the binned data.
Returns
binned_data (zfit.data.BinnedData): The binned data.
def get_binned_data_from_unbinned_data(self)
Get the binned data from the unbinned data.
Returns
binned_data (array of floats): The binned data obtained from the unbinned data.
def get_binned_data_handler_from_unbinned_data(self)
Get a binned DataHandler built from the unbinned data.
Returns
binned_data_handler (DataHandler): A new DataHandler holding the unbinned data converted to binned data.
def get_binned_obs_from_unbinned_data(self)
Get the binned obs from the unbinned obs.
Returns
binned_obs (zfit.core.space.Space): The observation space for the unbinned data converted to binned data.
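A short sketch of how an unbinned handler can be re-used in binned form, continuing the example above:

    # Binned view of the unbinned sample, using the handler's nbins and limits
    binned_obs = dh_unbinned.get_binned_obs_from_unbinned_data()
    dh_from_unbinned = dh_unbinned.get_binned_data_handler_from_unbinned_data()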
def get_data(self, input_data=False)
Get the data.
Parameters
input_data (bool): If True, the input data is returned.
Returns
data (zfit.core.data.Data): The data instance.
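For example, to compare what the fit actually uses with what was originally passed in (continuing the sketch above):

    fit_data = dh_unbinned.get_data()                        # zfit.core.data.Data used in the fit
    original_input = dh_unbinned.get_data(input_data=True)   # the object passed to the constructor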
def get_is_binned(self)
Get the data type (binned or not).
Returns
isbinned (bool): A flag that indicates if the data is binned.
def get_limits(self)
Get the limits of the x axis.
Returns
limits (list): The range limits of the x axis.
def get_nbins(self)
Get the number of bins.
Returns
nbins (int): The number of bins.
def get_norm(self)
Get the integral of the data.
Returns
norm (float): The normalisation value.
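A quick way to inspect a handler with the simple getters above, continuing the earlier sketch:

    print(dh_binned.get_is_binned())   # True for histogram input
    print(dh_binned.get_limits())      # [lower, upper] limits of the x axis
    print(dh_binned.get_nbins())       # number of bins (after rebinning)
    print(dh_binned.get_norm())        # integral of the data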
def get_obs(self)
Get the observation space.
Returns
obs (zfit.core.space.Space): The observation space.
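The observable space can also be used directly to build zfit PDFs outside of flarefly; a minimal sketch in which the parameter values are placeholders:

    import zfit

    obs = dh_unbinned.get_obs()
    mu = zfit.Parameter('mu', 1.87, 1.80, 1.95)
    sigma = zfit.Parameter('sigma', 0.01, 0.001, 0.05)
    gauss = zfit.pdf.Gauss(mu=mu, sigma=sigma, obs=obs)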
def get_unbinned_obs_from_binned_data(self)
Get the unbinned obs from the binned obs.
Returns
unbinned_obs (zfit.core.space.Space): The observation space for the binned data converted to unbinned data.
def get_use_zfit(self)
Get the use_zfit flag.
Returns
use_zfit (bool): True if the zfit package is used to handle the data.
def get_var_name(self)
Get the variable name.
Returns
var_name (str): The variable name.
def to_hist(self, **kwargs)
Returns the data as a hist.Hist.
Parameters
**kwargs (dict): Additional optional arguments:
- lower_edge (float): lower edge (only used in case of originally unbinned data)
- upper_edge (float): upper edge (only used in case of originally unbinned data)
- nbins (int): number of bins (only used in case of originally unbinned data)
- axis_title (str): label of the x axis (only used in case of originally unbinned data)
- varname (str): name of the variable (needed in case of originally unbinned data)
Returns
hist (Hist): The data in a hist.Hist.
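For originally unbinned data, a usage sketch continuing the earlier example; 'inv_mass' must match the variable name of the handler, and the axis title is a placeholder:

    h = dh_unbinned.to_hist(varname='inv_mass', nbins=80,
                            lower_edge=1.75, upper_edge=2.0,
                            axis_title=r'$M$ (GeV/$c^2$)')
    h.plot()  # hist.Hist objects can be drawn directly when matplotlib is installed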
def to_pandas(self)
Returns the data in a pandas DataFrame.
Returns
data (pandas.DataFrame): The data in a pandas DataFrame.
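Putting the pieces together with the F2MassFitter class documented below, a minimal end-to-end sketch; the PDF choices, the fitter name, and the axis title are illustrative placeholders:

    from flarefly.fitter import F2MassFitter

    fitter = F2MassFitter(dh_unbinned,
                          name_signal_pdf=['gaussian'],
                          name_background_pdf=['expo'],
                          name='example_fit')   # a distinct name avoids clashes between fitters
    fit_result = fitter.mass_zfit()
    rawyield, rawyield_err = fitter.get_raw_yield(idx=0)
    fig, axs = fitter.plot_mass_fit(axis_title=r'$M$ (GeV/$c^2$)')
    fig.savefig('mass_fit.png')   # output file name is a placeholder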
class F2MassFitter(data_handler, name_signal_pdf, name_background_pdf, **kwargs)
Expand source code
class F2MassFitter: """ Class used to perform mass fits with the zfit library https://github.com/zfit/zfit """ # pylint: disable=too-many-statements, too-many-branches def __init__(self, data_handler, name_signal_pdf, name_background_pdf, **kwargs): """ Initialize the F2MassFitter class Parameters ------------------------------------------------- data_handler: flarefly.DataHandler The data handler containing the data to fit name_signal_pdf: list The list of names for the signal pdfs. The possible options are: - 'nosignal' - 'gaussian' - 'doublegaus' - 'gausexptail' - 'genergausexptail' - 'genergausexptailsymm' - 'bifurgaus' - 'crystalball' - 'doublecb' - 'doublecbsymm' - 'genercrystalball' - 'cauchy' - 'voigtian' - 'kde_exact' (requires to set the datasample and options) - 'kde_grid' (requires to set the datasample and options) - 'kde_fft' (requires to set the datasample and options) - 'kde_isj' (requires to set the datasample and options) - 'hist' (only for binned fits, requires to set the datasample) name_background_pdf: list The list of names of the background pdfs. The possible options are: - 'nobkg' - 'expo' - 'powlaw' - 'expopow' - 'expopowext' - 'chebpolN' (N is the order of the polynomial) - 'kde_exact' (requires to set the datasample and options) - 'kde_grid' (requires to set the datasample and options) - 'kde_fft' (requires to set the datasample and options) - 'kde_isj' (requires to set the datasample and options) - 'hist' (only for binned fits, requires to set the datasample) **kwargs: dict Additional optional arguments: - name_refl_pdf: list The list of names of the signal pdfs. It must have the same length as the signal list. The possible options are: - 'kde_exact' (requires to set the datasample and options) - 'kde_grid' (requires to set the datasample and options) - 'kde_fft' (requires to set the datasample and options) - 'kde_isj' (requires to set the datasample and options) - 'hist' (only for binned fits, requires to set the datasample) - name: str Optional name for the fitter, needed in case of multiple fitters defined in the same script - chi2_loss: bool chi2 minimization if True, nll minmization else, default value to False - extended: bool If True, the pdf is considered extended, i.e. the yield is a parameter to fit. default value to False - minuit_mode: A number used by minuit to define the internal minimization strategy, either 0, 1 or 2. 0 is the fastest, 2 is the slowest (see more details in https://zfit.readthedocs.io/en/latest/user_api/minimize/_generated/minimizers/zfit.minimize.Minuit.html#zfit.minimize.Minuit) Default value to 0 - limits: list list of fit limits to include in the fit - tol: float Termination value for the convergence/stopping criterion of the algorithm in order to determine if the minimum has been found. Default value to 0.001 - verbosity: int verbosity level (from 0 to 10) Default value to 0 - signal_at_threshold: list list of booleans which indicate whether the signal PDFs are at threshold or not. 
Each element corresponds to a signal pdf - label_signal_pdf: list list of labels for signal pdfs - label_bkg_pdf: list list of labels for background pdfs """ self._data_handler_ = data_handler signal_labels = kwargs.get( 'label_signal_pdf', [f'signal {idx}' for idx in range(len(name_signal_pdf))] ) background_labels = kwargs.get( 'label_bkg_pdf', [f'background {idx}' for idx in range(len(name_background_pdf))] ) signal_at_threshold = kwargs.get('signal_at_threshold', [False for _ in name_signal_pdf]) self._signal_pdfs_ = [F2PDFBase( name, label, "signal", at_threshold=at_threshold ) for name, label, at_threshold in zip(name_signal_pdf, signal_labels, signal_at_threshold)] self._background_pdfs_ = [F2PDFBase( name, label, "background" ) for name, label in zip(name_background_pdf, background_labels)] self.no_signal = any(pdf.kind == PDFType.NO_SIGNAL for pdf in self._signal_pdfs_) self.no_background = any(pdf.kind == PDFType.NO_BKG for pdf in self._background_pdfs_) refl_names = kwargs.get('name_refl_pdf', ["none" for _ in name_signal_pdf]) self._refl_pdfs_ = [F2PDFBase( name, f'reflection {idx}', "reflection" ) for idx, name in enumerate(refl_names)] if len(self._refl_pdfs_) != len(self._signal_pdfs_): Logger('List of pdfs for signals and reflections different! Exit', 'FATAL') if self.no_signal: # Remove all other signal pdfs self._signal_pdfs_ = [] if self.no_background: # Remove all other background pdfs self._background_pdfs_ = [] # workaround, add reflections to signal pdfs, to be removed in future versions self._n_refl_ = 0 self._refl_idx_ = [None] * len(self._refl_pdfs_) if not all(pdf.kind == PDFType.NONE for pdf in self._refl_pdfs_): Logger( 'Reflection pdfs will be deprecated in future versions, ' \ 'please use background pdfs instead and fix the normalisation ' \ 'with fix_bkg_frac_to_signal_pdf', 'WARNING' ) n_signal = len(self._signal_pdfs_) for ipdf, pdf in enumerate(self._refl_pdfs_): if pdf.kind != PDFType.NONE: self._signal_pdfs_.append(pdf) self._refl_idx_[ipdf] = n_signal + self._n_refl_ self._n_refl_ += 1 self._is_pdf_built_ = False self._total_pdf_ = None self._total_pdf_binned_ = None self._fit_result_ = None self._fix_fracs_to_pdfs_ = [] if not self.no_signal and self.no_background: if self._refl_pdfs_[0].kind != PDFType.NONE: Logger('Not possible to use reflections without background pdf', 'FATAL') self._fracs_ = [None for _ in range(len(self._signal_pdfs_) - 1)] elif self.no_signal and not self.no_background: self._fracs_ = [None for _ in range(len(self._background_pdfs_) - 1)] elif not self.no_signal and not self.no_background: self._fracs_ = [None for _ in range(len(self._signal_pdfs_) + len(self._background_pdfs_) - 1)] else: Logger('No signal nor background pdf defined', 'FATAL') self._total_yield_ = None self._yields_ = [None for _ in range(len(self._fracs_) + 1)] self._total_pdf_norm_ = None self._ratio_truncated_ = None self._rawyield_ = [0. for _ in name_signal_pdf] self._rawyield_err_ = [0. 
for _ in name_signal_pdf] self._minimizer_ = zfit.minimize.Minuit( verbosity=kwargs.get('verbosity', 7), mode=kwargs.get('minuit_mode', 0), tol=kwargs.get('tol', 0.001) ) if 'limits' in kwargs and self._data_handler_.get_is_binned(): Logger('Restriction of fit limits is not yet implemented in binned fits!', 'FATAL') if 'limits' in kwargs and not self.no_signal: Logger('Using signal PDFs while restricting fit limits!', 'WARNING') self._limits_ = kwargs.get( 'limits', self._data_handler_.get_limits()) if not isinstance(self._limits_[0], list): self._limits_ = [self._limits_] self._is_truncated_ = not np.allclose(self._limits_, self._data_handler_.get_limits()) self._name_ = kwargs.get('name', 'fitter') self._ndf_ = None self._chi2_loss_ = kwargs.get('chi2_loss', False) self._extended_ = kwargs.get('extended', False) if self._extended_ and self._data_handler_.get_is_binned(): Logger('Binned fit with extended pdf not yet supported!', 'FATAL') self._base_sgn_cmap_ = plt.colormaps.get_cmap('viridis') self._sgn_cmap_ = ListedColormap(self._base_sgn_cmap_(np.linspace(0.4, 0.65, len(self._signal_pdfs_)))) n_bkg_colors = len(self._background_pdfs_) if len(self._background_pdfs_) == 0: n_bkg_colors = 1 # to avoid crash self._base_bkg_cmap_ = plt.colormaps.get_cmap('Reds') self._bkg_cmap_ = ListedColormap(self._base_bkg_cmap_(np.linspace(0.8, 0.2, n_bkg_colors))) self._base_refl_cmap_ = plt.colormaps.get_cmap('summer') self._refl_cmap_ = ListedColormap(self._base_refl_cmap_(np.linspace(0., 0.6, len(self._refl_pdfs_)))) self._raw_residuals_ = [] self._raw_residual_variances_ = [] self._std_residuals_ = [] zfit.settings.advanced_warnings.all = False zfit.settings.changed_warnings.all = False self.pdg_api = pdg.connect() def cleanup(self): """ Cleanup function to clear the graph cache of zfit """ zfit.run.clear_graph_cache() def __del__(self): try: self.cleanup() except Exception: # pylint: disable=broad-exception-caught pass # pylint: disable=too-many-branches, too-many-statements def __build_signal_pdfs(self, obs): """ Helper function to compose the signal pdfs """ if self.no_signal: Logger('Performing fit with no signal pdf', 'WARNING') return for ipdf, pdf in enumerate(self._signal_pdfs_): PDFBuilder.build_signal_pdf( pdf, obs, self._name_, ipdf ) def __build_background_pdfs(self, obs): """ Helper function to compose the background pdfs """ if self.no_background: Logger('Performing fit with no background pdf', 'WARNING') return for ipdf, pdf in enumerate(self._background_pdfs_): PDFBuilder.build_bkg_pdf( pdf, obs, self._name_, ipdf ) if str(pdf.kind) in ['powlaw', 'expopow', 'expopowext'] and\ self._data_handler_.get_limits()[0] < pdf.get_init_par("mass"): Logger( 'The mass parameter in powlaw cannot be smaller than the lower fit limit, ' 'please fix it.', 'FATAL' ) def __get_constrained_frac_par(self, frac_par, factor_par, refl=False): """ Helper function to create a parameter constrained to a value """ def par_func(par, factor): return par * factor if refl: name_composed_param = f'{self._name_}_{factor_par.name.replace("factor", "frac_refl")}' else: name_composed_param = f'{self._name_}_{factor_par.name.replace("factor", "frac")}' frac_par_constrained = zfit.ComposedParameter(name_composed_param, par_func, params=[frac_par, factor_par], unpack_params=True ) return frac_par_constrained def __set_frac_constraints(self): for frac_fix_info in self._fix_fracs_to_pdfs_: if frac_fix_info['fixed_pdf_type'] == 'signal': if frac_fix_info['target_pdf_type'] == 'signal': 
self._fracs_[frac_fix_info['fixed_pdf_idx']] = self.__get_constrained_frac_par( self._fracs_[frac_fix_info['target_pdf_idx']], frac_fix_info['factor'] ) else: # fix to background PDF self._fracs_[frac_fix_info['fixed_pdf_idx']] = self.__get_constrained_frac_par( self._fracs_[frac_fix_info['target_pdf_idx'] + len(self._signal_pdfs_)], frac_fix_info['factor'] ) else: # fix background fraction if frac_fix_info['target_pdf_type'] == 'signal': self._fracs_[frac_fix_info['fixed_pdf_idx'] + len(self._signal_pdfs_)] = \ self.__get_constrained_frac_par( self._fracs_[frac_fix_info['target_pdf_idx']], frac_fix_info['factor'] ) else: # fix to background PDF self._fracs_[frac_fix_info['fixed_pdf_idx'] + len(self._signal_pdfs_)] = \ self.__get_constrained_frac_par( self._fracs_[frac_fix_info['target_pdf_idx'] + len(self._signal_pdfs_)], frac_fix_info['factor'] ) def __build_total_pdf(self): """ Helper function to compose the total pdf """ if self._data_handler_.get_is_binned(): # we need unbinned pdfs to sum them obs = self._data_handler_.get_unbinned_obs_from_binned_data() else: obs = self._data_handler_.get_obs() # order of the pdfs is signal, background self.__build_signal_pdfs(obs) self.__build_background_pdfs(obs) self.__get_total_pdf_norm() if self.no_signal and len(self._background_pdfs_) == 1: self._total_pdf_ = self._background_pdfs_[0].pdf.copy() if self._extended_: self._total_yield_ = zfit.Parameter( f'{self._name_}_yield', self._data_handler_.get_norm(), 0, floating=True ) self._total_pdf_.set_yield(self._total_yield_) if self._is_truncated_: self._total_pdf_ = self._total_pdf_.to_truncated(limits=self._limits_, obs=obs) return if self.no_background and len(self._signal_pdfs_) == 1: self._total_pdf_ = self._signal_pdfs_[0].pdf.copy() if self._extended_: self._total_yield_ = zfit.Parameter( f'{self._name_}_yield', self._data_handler_.get_norm(), 0, floating=True ) self._total_pdf_.set_yield(self._total_yield_) if self._is_truncated_: self._total_pdf_ = self._total_pdf_.to_truncated(limits=self._limits_, obs=obs) return for ipdf, pdf in enumerate(self._signal_pdfs_): pdf.set_default_par('frac', init=0.1, fix=False, limits=[0, 1.]) if self.no_background and ipdf == len(self._signal_pdfs_) - 1: continue self._fracs_[ipdf] = zfit.Parameter(f'{self._name_}_frac_signal{ipdf}', pdf.get_init_par('frac'), pdf.get_limits_par('frac')[0], pdf.get_limits_par('frac')[1], floating=not pdf.get_fix_par('frac')) if len(self._background_pdfs_) > 1: for ipdf, pdf in enumerate(self._background_pdfs_[:-1]): pdf.set_default_par('frac', init=0.1, fix=False, limits=[0, 1.]) self._fracs_[ipdf + len(self._signal_pdfs_)] = zfit.Parameter( f'{self._name_}_frac_bkg{ipdf}', pdf.get_init_par('frac'), pdf.get_limits_par('frac')[0], pdf.get_limits_par('frac')[1], floating=not pdf.get_fix_par('frac')) self.__set_frac_constraints() if self._extended_: self._total_yield_ = zfit.Parameter( f'{self._name_}_yield', self._data_handler_.get_norm(), 0, floating=True ) def par_func(par, factor): return par * factor for i_frac, frac in enumerate(self._fracs_): self._yields_[i_frac] = zfit.ComposedParameter( frac.name.replace('frac', 'yield'), par_func, params=[frac, self._total_yield_], unpack_params=True ) def frac_last_pdf(pars): return 1 - sum(par.value() for par in pars) frac_last = zfit.ComposedParameter( f'{self._name_}_frac_bkg_{len(self._background_pdfs_)-1}', frac_last_pdf, params=self._fracs_, unpack_params=False ) self._yields_[-1] = zfit.ComposedParameter( frac_last.name.replace('frac', 'yield'), par_func, 
params=[frac_last, self._total_yield_], unpack_params=True ) pdfs_sum = [pdf.pdf.copy() for pdf in self._signal_pdfs_ + self._background_pdfs_] for pdf, y in zip(pdfs_sum, self._yields_): pdf.set_yield(y) self._total_pdf_ = zfit.pdf.SumPDF(pdfs_sum) else: self._total_pdf_ = zfit.pdf.SumPDF([pdf.pdf for pdf in self._signal_pdfs_+self._background_pdfs_], self._fracs_) if not self._data_handler_.get_is_binned() and self._is_truncated_: self._total_pdf_ = self._total_pdf_.to_truncated(limits=self._limits_, obs=obs, norm=obs) def __build_total_pdf_binned(self): """ Helper function to compose the total pdf binned from unbinned """ # for binned data, obs already contains the wanted binning if self._data_handler_.get_is_binned(): obs = self._data_handler_.get_obs() # for unbinned data, one needs to impose a binning else: obs = self._data_handler_.get_binned_obs_from_unbinned_data() if self.no_signal and len(self._background_pdfs_) == 1: self._total_pdf_binned_ = zfit.pdf.BinnedFromUnbinnedPDF(self._background_pdfs_[0].pdf, obs) return if self.no_background and len(self._signal_pdfs_) == 1: self._total_pdf_binned_ = zfit.pdf.BinnedFromUnbinnedPDF(self._signal_pdfs_[0].pdf, obs) return self._total_pdf_binned_ = zfit.pdf.BinnedFromUnbinnedPDF(zfit.pdf.SumPDF( [pdf.pdf for pdf in self._signal_pdfs_+self._background_pdfs_], self._fracs_), obs) def __get_frac_and_err(self, frac_par, frac_type): par_name = frac_par.name if 'constrained' in par_name: split_par_name = par_name.split(sep='_constrained_to_') target_frac_name = split_par_name[0].replace(split_par_name[0].split('_')[-1], split_par_name[1]) # find the parameter containing target_frac_name target_par_name, target_par = [ (par.name, par) for par in self._fracs_ if target_frac_name in par.name ][0] if 'constrained' in target_par_name: if f'{self._name_}_frac_signal' in target_par_name: frac_type = 'signal' else: frac_type = 'bkg' frac_temp, err_temp, isgn = self.__get_frac_and_err(target_par, frac_type) return ( frac_temp * float(frac_par.params['param_1'].value()), err_temp * float(frac_par.params['param_1'].value()), isgn ) frac = self._fit_result_.params[target_frac_name]['value'] frac_err = self._fit_result_.params[target_frac_name]['hesse']['error'] return ( frac * float(frac_par.params['param_1'].value()), frac_err * float(frac_par.params['param_1'].value()), int(split_par_name[0][-1]) ) return ( self._fit_result_.params[par_name]['value'], self._fit_result_.params[par_name]['hesse']['error'], int(par_name.split(sep=f'{self._name_}_frac_{frac_type}')[-1]) ) def __get_frac_cov(self, frac_par, frac_type, other_par): par_name = frac_par.name other_par_name = other_par.name if 'constrained' in par_name: split_par_name = par_name.split(sep='_constrained_to_') target_frac_name = split_par_name[0].replace(split_par_name[0].split('_')[-1], split_par_name[1]) # find the parameter containing target_frac_name target_par_name, target_par = [ (par.name, par) for par in self._fracs_ if target_frac_name in par.name ][0] if 'constrained' in target_par_name: if f'{self._name_}_frac_signal' in target_par_name: frac_type = 'signal' else: frac_type = 'bkg' cov_temp = self.__get_frac_cov(target_par, frac_type, other_par) return cov_temp * float(frac_par.params['param_1'].value()) if 'constrained' in other_par_name: split_par_name = other_par_name.split(sep='_constrained_to_') target_frac_name = split_par_name[0].replace(split_par_name[0].split('_')[-1], split_par_name[1]) # find the parameter containing target_frac_name target_par_name, target_par = [ (par.name, 
par) for par in self._fracs_ if target_frac_name in par.name ][0] if 'constrained' in target_par_name: cov_temp = self.__get_frac_cov(frac_par, frac_type, target_par) return cov_temp * float(frac_par.params['param_1'].value()) return self._fit_result_.covariance(params=[frac_par, other_par], method='hesse_np')[0, 1] def __get_all_fracs(self): """ Helper function to get all fractions Returns ------------------------------------------------- signal_fracs: list fractions of the signal pdfs background_fracs: list fractions of the background pdfs refl_fracs: list fractions of the reflected signal pdfs signal_err_fracs: list errors of fractions of the signal pdfs bkg_err_fracs: list errors of fractions of the background pdfs refl_err_fracs: list errors of fractions of the reflected signal pdfs """ signal_fracs, bkg_fracs, refl_fracs, signal_err_fracs, bkg_err_fracs, refl_err_fracs = ([] for _ in range(6)) for frac_par in self._fracs_: if frac_par is None: continue par_name = frac_par.name if f'{self._name_}_frac_signal' in par_name: signal_frac, signal_err, _ = self.__get_frac_and_err(frac_par, 'signal') signal_fracs.append(signal_frac) signal_err_fracs.append(signal_err) elif f'{self._name_}_frac_bkg' in par_name: bkg_frac, bkg_err, _ = self.__get_frac_and_err(frac_par, 'bkg') bkg_fracs.append(bkg_frac) bkg_err_fracs.append(bkg_err) if len(signal_fracs) == len(bkg_fracs) == len(refl_fracs) == 0: if self.no_background: signal_fracs.append(1.) signal_err_fracs.append(0.) refl_fracs.append(0.) refl_err_fracs.append(0.) elif self.no_signal: bkg_fracs.append(1.) bkg_err_fracs.append(0.) else: if self.no_background: signal_fracs.append(1 - sum(signal_fracs) - sum(refl_fracs) - sum(bkg_fracs)) signal_err_fracs.append(np.sqrt(sum(list(err**2 for err in signal_err_fracs + bkg_err_fracs)))) else: bkg_fracs.append(1 - sum(signal_fracs) - sum(refl_fracs) - sum(bkg_fracs)) bkg_err_fracs.append(np.sqrt(sum(list(err**2 for err in signal_err_fracs + bkg_err_fracs)))) for refl_idx in self._refl_idx_: if refl_idx is None: continue refl_frac = signal_fracs.pop(refl_idx) refl_err_frac = signal_err_fracs.pop(refl_idx) refl_fracs.append(refl_frac) refl_err_fracs.append(refl_err_frac) return signal_fracs, bkg_fracs, refl_fracs, signal_err_fracs, bkg_err_fracs, refl_err_fracs def __get_total_pdf_norm(self): """ Get the normalization of the total pdf """ if self._data_handler_.get_is_binned(): self._total_pdf_norm_ = float(self._data_handler_.get_norm()) return norm_total_pdf = 0. for lim in self._limits_: norm_obs = self._data_handler_.get_obs().with_limits(lim) norm_total_pdf += float(self._data_handler_.get_data().with_obs(norm_obs).n_events) self._total_pdf_norm_ = norm_total_pdf def __get_ratio_truncated(self): """ Get the ratio of the sidebands integral over the total integral """ if not self._is_truncated_: self._ratio_truncated_ = 1. return signal_fracs, bkg_fracs, refl_fracs, _, _, _ = self.__get_all_fracs() fracs = signal_fracs + refl_fracs + bkg_fracs limits = self._data_handler_.get_limits() sidebands_integral, total_integral = 0., 0. 
for i_pdf, pdf in enumerate(self._signal_pdfs_ + self._background_pdfs_): sidebands_integral += float(sum(pdf.pdf.integrate(lim) * fracs[i_pdf] for lim in self._limits_)) total_integral += float(pdf.pdf.integrate(limits) * fracs[i_pdf]) self._ratio_truncated_ = sidebands_integral / total_integral def __get_raw_residuals(self): """ Get the raw residuals (data_value - bkg_model_value) for all bins """ bins = self._data_handler_.get_nbins() norm = self._total_pdf_norm_ self._raw_residuals_ = [None]*bins background_pdf_binned_ = [None for _ in enumerate(self._background_pdfs_)] model_bkg_values = [None for _ in enumerate(self._background_pdfs_)] # access normalized data values and errors for all bins if self._data_handler_.get_is_binned(): binned_data = self._data_handler_.get_binned_data() data_values = binned_data.values() self._raw_residual_variances_ = binned_data.variances() obs = self._data_handler_.get_obs() else: data_values = self._data_handler_.get_binned_data_from_unbinned_data() self._raw_residual_variances_ = data_values # poissonian errors obs = self._data_handler_.get_binned_obs_from_unbinned_data() # get background fractions if len(self._background_pdfs_) > 0: if len(self._background_pdfs_) == 1: _, bkg_fracs, _, _, _, _ = self.__get_all_fracs() else: _, bkg_fracs, _, _, _, _ = self.__get_all_fracs() # access model predicted values for background for ipdf, pdf in enumerate(self._background_pdfs_): background_pdf_binned_[ipdf] = zfit.pdf.BinnedFromUnbinnedPDF( self._background_pdfs_[ipdf].pdf, obs ) norm_bkg = norm if pdf.is_hist(): norm_bkg /= float(sum(background_pdf_binned_[ipdf].values())) model_bkg_values[ipdf] = background_pdf_binned_[ipdf].values() * bkg_fracs[ipdf] model_bkg_values[ipdf] *= norm_bkg / self._ratio_truncated_ # compute residuals for ibin, data in enumerate(data_values): self._raw_residuals_[ibin] = float(data) for ipdf, _ in enumerate(self._background_pdfs_): self._raw_residuals_[ibin] -= model_bkg_values[ipdf][ibin] else: for ibin, data in enumerate(data_values): self._raw_residuals_[ibin] = float(data) def __get_std_residuals(self): """ Get the standardized residuals (data_value - bkg_model_value)/ sigma_data for all bins """ bins = self._data_handler_.get_nbins() norm = self._total_pdf_norm_ self._std_residuals_ = [None]*bins # access normalized data values and errors for all bins if self._data_handler_.get_is_binned(): binned_data = self._data_handler_.get_binned_data() data_values = binned_data.values() variances = binned_data.variances() else: if self._limits_ != [self._data_handler_.get_limits()]: # restricted fit limits Logger('Standard residuals not yet implemented with restricted fit limits', 'FATAL') data_values = self._data_handler_.get_binned_data_from_unbinned_data() variances = data_values # poissonian errors # access model predicted values for background self.__build_total_pdf_binned() model_values = self._total_pdf_binned_.values()*norm/self._ratio_truncated_ for ibin, (data, model, variance) in enumerate(zip(data_values, model_values, variances)): if variance == 0: Logger('Null variance. Consider enlarging the bins.', 'FATAL') self._std_residuals_[ibin] = float((data - model)/np.sqrt(variance)) def __prefit(self, excluded_regions): """ Perform a prefit to the background distributions with the zfit library. The expected signal regions must be provided. 
Parameters ------------------------------------------------- excluded_regions: list List or list of lists with limits for the excluded regions """ if self.no_background: Logger('Prefit cannot be performed in case of no background, skip', 'WARNING') return if self._data_handler_.get_is_binned(): Logger('Prefit not yet implemented for binned fits, skip', 'WARNING') return if len(self._limits_) > 1: Logger('Prefit not supported for fits on disjoint intervals, skip', 'WARNING') return obs = self._data_handler_.get_obs() # we build the limits if not isinstance(excluded_regions[0], list): excluded_regions = [excluded_regions] # check if excluded regions overlap limits_sb = [] if excluded_regions[0][0] > self._limits_[0][0]: limits_sb.append([self._limits_[0][0], excluded_regions[0][0]]) for iregion, region in enumerate(excluded_regions[:-1]): if region[1] > self._limits_[0][0] and \ excluded_regions[iregion+1][0] < self._limits_[-1][-1] and \ region[1] < excluded_regions[iregion+1][0]: limits_sb.append([region[1], excluded_regions[iregion+1][0]]) if excluded_regions[-1][1] < self._limits_[-1][-1]: limits_sb.append([excluded_regions[-1][1], self._limits_[-1][-1]]) if len(limits_sb) == 0: Logger( 'Cannot perform prefit with the excluded regions set since no sidebands left, skip', 'WARNING' ) return fracs_bkg_prefit = [] for ipdf, _ in enumerate(self._background_pdfs_): fracs_bkg_prefit.append(zfit.Parameter(f'{self._name_}_frac_bkg{ipdf}_prefit', 0.1, 0., 1.)) prefit_background_pdf = None if len(self._background_pdfs_) > 1: prefit_background_pdf = zfit.pdf.SumPDF( [pdf.pdf for pdf in self._background_pdfs_], fracs_bkg_prefit ).to_truncated(limits=limits_sb, obs=obs, norm=obs) else: prefit_background_pdf = self._background_pdfs_[0].pdf.to_truncated( limits=limits_sb, obs=obs, norm=obs) prefit_loss = zfit.loss.UnbinnedNLL(model=prefit_background_pdf, data=self._data_handler_.get_data()) Logger("Performing background only PREFIT", "INFO") res_prefit = self._minimizer_.minimize(loss=prefit_loss) Logger(res_prefit, 'RESULT') # re-initialise the parameters from those obtained in the prefit for par in res_prefit.params: which_pdf = int(par.name[-1]) self._background_pdfs_[which_pdf].parameters[par.name].set_value(res_prefit.params[par.name]['value']) # pylint: disable=too-many-nested-blocks def mass_zfit(self, do_prefit=False, **kwargs): """ Perform a mass fit with the zfit library Parameters ------------------------------------------------- do_prefit: bool Flag to enable prefit to background only (supported for unbinned fits only) The expected signal regions must be provided. 
**kwargs: dict Additional optional arguments: - prefit_excluded_regions: list List or list of lists with limits for the excluded regions - prefit_exclude_nsigma: Alternative argument to prefit_excluded_regions that excludes the regions around the signals, using the initialised mean and sigma parameters Returns ------------------------------------------------- fit_result: zfit.minimizers.fitresult.FitResult The fit result """ if self._data_handler_ is None: Logger('Data handler not specified', 'FATAL') self._raw_residuals_ = [] self._raw_residual_variances_ = [] self._std_residuals_ = [] self.__build_total_pdf() self.__build_total_pdf_binned() self._is_pdf_built_ = True # do prefit if do_prefit: skip_prefit = False excluded_regions = [] exclude_signals_nsigma = kwargs.get('prefit_exclude_nsigma', None) if exclude_signals_nsigma is not None: for pdf in self._signal_pdfs_: if not pdf.kind.has_sigma(): Logger(f"Sigma parameter not defined for {str(pdf.kind)}, " "cannot use nsigma for excluded regions in prefit", "Error") skip_prefit = True else: mass_name = 'm' if pdf.kind.uses_m_not_mu() else 'mu' mass = pdf.get_init_par(mass_name) sigma = pdf.get_init_par('sigma') excluded_regions.append( [mass - exclude_signals_nsigma * sigma, mass + exclude_signals_nsigma * sigma]) else: excluded_regions = kwargs.get('prefit_excluded_regions', None) if excluded_regions is None: Logger("To perform the prefit, you should set the excluded regions, skip", "Error") skip_prefit = True if not skip_prefit: self.__prefit(excluded_regions) if self._data_handler_.get_is_binned(): # chi2 loss if self._chi2_loss_: loss = zfit.loss.BinnedChi2(self._total_pdf_binned_, self._data_handler_.get_binned_data()) # nll loss else: loss = zfit.loss.BinnedNLL(self._total_pdf_binned_, self._data_handler_.get_binned_data()) else: if self._extended_: loss = zfit.loss.ExtendedUnbinnedNLL(model=self._total_pdf_, data=self._data_handler_.get_data()) else: loss = zfit.loss.UnbinnedNLL(model=self._total_pdf_, data=self._data_handler_.get_data()) Logger("Performing FIT", "INFO") self._fit_result_ = self._minimizer_.minimize(loss=loss) Logger(self._fit_result_, 'RESULT') if self._fit_result_.hesse() == {}: if self._fit_result_.hesse(method='hesse_np') == {}: Logger('Impossible to compute hesse error', 'FATAL') self.__get_ratio_truncated() signal_fracs, _, _, signal_frac_errs, _, _ = self.__get_all_fracs() norm = self._total_pdf_norm_ if len(self._fracs_) == 0: self._rawyield_[0] = self._data_handler_.get_norm() self._rawyield_err_[0] = np.sqrt(self._rawyield_[0]) else: if self._extended_: for i_pdf, (_, frac, frac_err) in enumerate(zip(self._signal_pdfs_, signal_fracs, signal_frac_errs)): self._rawyield_[i_pdf] = self._total_pdf_.models[i_pdf].get_yield().value() # no background case: last signal fraction is 1 - sum(others) if not self.no_signal and\ self.no_background and\ i_pdf == len(self._signal_pdfs_) - 1: self._rawyield_err_[i_pdf] = np.sqrt( np.sum(np.square(signal_frac_errs)) + 2 * np.sum([ self.__get_frac_cov(self._fracs_[i], 'signal', self._fracs_[j]) for i in range(len(self._fracs_)-1) for j in range(i+1, len(self._fracs_)-1) ]) ) else: self._rawyield_err_[i_pdf] = self._rawyield_[i_pdf] * np.sqrt( (frac_err / frac)**2 + (self._fit_result_.hesse( params=self._total_yield_, method='hesse_np' )[self._total_yield_]['error'] / self._total_pdf_.get_yield().value())**2 + 2 * self.__get_frac_cov(self._fracs_[i_pdf], 'signal', self._total_yield_) / self._total_pdf_.get_yield().value() / frac ) else: for i_pdf, _ in 
enumerate(self._signal_pdfs_): if i_pdf in self._refl_idx_: continue self._rawyield_[i_pdf] = signal_fracs[i_pdf] * norm / self._ratio_truncated_ self._rawyield_err_[i_pdf] = signal_frac_errs[i_pdf] * norm / self._ratio_truncated_ return self._fit_result_ # pylint: disable=too-many-statements, too-many-locals def plot_mass_fit(self, **kwargs): """ Plot the mass fit Parameters ------------------------------------------------- **kwargs: dict Additional optional arguments: - style: str style to be used (see https://github.com/scikit-hep/mplhep for more details) - logy: bool log scale in y axis - figsize: tuple size of the figure - axis_title: str x-axis title - show_extra_info: bool show mu, sigma, chi2/ndf, signal, bkg, signal/bkg, significance - extra_info_massnsigma: float number of sigmas for extra info - extra_info_massnhwhm: float number of hwhms for extra info (alternative to extra_info_massnsigma) - extra_info_massrange: list mass range limits for extra info (alternative to extra_info_massnsigma) - extra_info_loc: list location of extra info (one for chi2 and one for other info) - legend_loc: str location of the legend - num: int number of bins to plot pdfs converted into histograms Returns ------------------------------------------------- fig: matplotlib.figure.Figure figure containing the mass fit plot """ style = kwargs.get('style', 'LHCb2') logy = kwargs.get('logy', False) figsize = kwargs.get('figsize', (7, 7)) bins = self._data_handler_.get_nbins() axis_title = kwargs.get('axis_title', self._data_handler_.get_var_name()) show_extra_info = kwargs.get('show_extra_info', False) num = kwargs.get('num', 10000) mass_range = kwargs.get('extra_info_massrange', None) nhwhm = kwargs.get('extra_info_massnhwhm', None) nsigma = kwargs.get('extra_info_massnsigma', 3) info_loc = kwargs.get('extra_info_loc', ['lower right', 'upper left']) legend_loc = kwargs.get('legend_loc', 'best') mplhep.style.use(style) obs = self._data_handler_.get_obs() limits = self._data_handler_.get_limits() fig, axs = plt.subplots(figsize=figsize) hdata = self._data_handler_.to_hist(lower_edge=limits[0], upper_edge=limits[1], nbins=bins, varname=self._data_handler_.get_var_name()) hdata.plot(yerr=True, color='black', histtype='errorbar', label='data') bin_sigma = (limits[1] - limits[0]) / bins norm_total_pdf = self._total_pdf_norm_ * bin_sigma x_plot = np.linspace(limits[0], limits[1], num=num) total_func = zfit.run(self._total_pdf_.pdf(x_plot, norm_range=obs)) signal_funcs, bkg_funcs = ([] for _ in range(2)) if not self.no_signal: for signal_pdf in self._signal_pdfs_: signal_funcs.append(zfit.run(signal_pdf.pdf.pdf(x_plot, norm_range=obs))) if not self.no_background: for bkg_pdf in self._background_pdfs_: bkg_funcs.append(zfit.run(bkg_pdf.pdf.pdf(x_plot, norm_range=obs))) signal_fracs, bkg_fracs, refl_fracs, _, _, _ = self.__get_all_fracs() # first draw backgrounds for ibkg, (bkg_func, bkg_frac) in enumerate(zip(bkg_funcs, bkg_fracs)): plt.plot(x_plot, bkg_func * norm_total_pdf * bkg_frac / self._ratio_truncated_, color=self._bkg_cmap_(ibkg), ls='--', label=self._background_pdfs_[ibkg].label) # then draw signals for isgn, (signal_func, frac) in enumerate(zip(signal_funcs, signal_fracs+refl_fracs)): plt.plot(x_plot, signal_func * norm_total_pdf * frac / self._ratio_truncated_, color=self._sgn_cmap_(isgn)) plt.fill_between(x_plot, signal_func * norm_total_pdf * frac / self._ratio_truncated_, color=self._sgn_cmap_(isgn), alpha=0.5, label=self._signal_pdfs_[isgn].label) plt.plot(x_plot, total_func * norm_total_pdf, 
color='xkcd:blue', label='total fit') plt.xlim(limits[0], limits[1]) plt.xlabel(axis_title) plt.ylabel(rf'counts / {(limits[1]-limits[0])/bins*1000:0.1f} MeV/$c^2$') if logy: plt.yscale('log') plt.ylim(min(total_func) * norm_total_pdf / 5, max(total_func) * norm_total_pdf * 5) else: plt.ylim(0., max(total_func) * norm_total_pdf * 1.5) if show_extra_info: # signal and background info for all signals text = [] for idx, signal_pdf in enumerate(self._signal_pdfs_): mass, mass_unc = self.get_mass(idx) sigma, sigma_unc = None, None gamma, gamma_unc = None, None rawyield, rawyield_err = self.get_raw_yield(idx=idx) if signal_pdf.has_sigma(): sigma, sigma_unc = self.get_sigma(idx) if signal_pdf.kind in (PDFType.CAUCHY, PDFType.VOIGTIAN): gamma, gamma_unc = self.get_signal_parameter(idx, 'gamma') extra_info = fr'{signal_pdf.label}''\n' \ + fr' $\mu = {mass*1000:.1f}\pm{mass_unc*1000:.1f}$ MeV$/c^2$''\n' if sigma is not None: extra_info += fr' $\sigma = {sigma*1000:.1f}\pm{sigma_unc*1000:.1f}$ MeV$/c^2$''\n' if gamma is not None: extra_info += fr' $\Gamma/2 = {gamma*1000:.1f}\pm{gamma_unc*1000:.1f}$ MeV$/c^2$''\n' extra_info += fr' $S={rawyield:.0f} \pm {rawyield_err:.0f}$''\n' if not self.no_background: if mass_range is not None: interval = f'[{mass_range[0]:.3f}, {mass_range[1]:.3f}]' bkg, bkg_err = self.get_background(idx=idx, min=mass_range[0], max=mass_range[1]) s_over_b, s_over_b_err = self.get_signal_over_background(idx=idx, min=mass_range[0], max=mass_range[1]) signif, signif_err = self.get_significance(idx=idx, min=mass_range[0], max=mass_range[1]) extra_info += fr' $B({interval})={bkg:.0f} \pm {bkg_err:.0f}$''\n' extra_info += fr' $S/B({interval})={s_over_b:.2f} \pm {s_over_b_err:.2f}$''\n' extra_info += fr' Signif.$({interval})={signif:.1f} \pm {signif_err:.1f}$' elif nhwhm is not None: bkg, bkg_err = self.get_background(idx=idx, nhwhm=nhwhm) s_over_b, s_over_b_err = self.get_signal_over_background(idx=idx, nhwhm=nhwhm) signif, signif_err = self.get_significance(idx=idx, nhwhm=nhwhm) extra_info += fr' $B({nhwhm}~\mathrm{{HWHM}})=${bkg:.0f} $\pm$ {bkg_err:.0f}''\n' extra_info += fr' $S/B({nhwhm}~\mathrm{{HWHM}})=${s_over_b:.2f} $\pm$ {s_over_b_err:.2f}''\n' extra_info += fr' Signif.$({nhwhm}~\mathrm{{HWHM}})=${signif:.1f} $\pm$ {signif_err:.1f}' else: bkg, bkg_err = self.get_background(idx=idx, nsigma=nsigma) s_over_b, s_over_b_err = self.get_signal_over_background(idx=idx, nsigma=nsigma) signif, signif_err = self.get_significance(idx=idx, nsigma=nsigma) extra_info += fr' $B({nsigma}\sigma)=${bkg:.0f} $\pm$ {bkg_err:.0f}''\n' extra_info += fr' $S/B({nsigma}\sigma)=${s_over_b:.2f} $\pm$ {s_over_b_err:.2f}''\n' extra_info += fr' Signif.$({nsigma}\sigma)=${signif:.1f} $\pm$ {signif_err:.1f}' text.append(extra_info) concatenated_text = '\n'.join(text) anchored_text_signal = AnchoredText(concatenated_text, loc=info_loc[1], frameon=False) if not self._is_truncated_: # chi2 not yet available for truncated fits # info on chi2/ndf chi2 = self.get_chi2() ndf = self.get_ndf() anchored_text_chi2 = AnchoredText(fr'$\chi^2 / \mathrm{{ndf}} =${chi2:.2f} / {ndf}', loc=info_loc[0], frameon=False) axs.add_artist(anchored_text_chi2) axs.add_artist(anchored_text_signal) plt.legend(loc=legend_loc) Logger('plot_mass_fit now returns a tuple (fig, axs) !', 'WARNING') return fig, axs # pylint: disable=too-many-statements, too-many-locals def dump_to_root(self, filename, **kwargs): """ Plot the mass fit Parameters ------------------------------------------------- filename: str Name of output ROOT file **kwargs: dict 
Additional optional arguments: - axis_title: str x-axis title - num: int number of bins to plot pdfs converted into histograms - option: str option (recreate or update) - suffix: str suffix to append to objects - folder: str folder in the ROOT file to store the objects """ num = kwargs.get('num', 10000) suffix = kwargs.get('suffix', '') option = kwargs.get('option', 'recreate') folder = kwargs.get('folder', '') bins = self._data_handler_.get_nbins() obs = self._data_handler_.get_obs() limits = self._data_handler_.get_limits() hdata = self._data_handler_.to_hist(lower_edge=limits[0], upper_edge=limits[1], nbins=bins, varname=self._data_handler_.get_var_name()) # write data self.__write_data(hdata, f'hdata{suffix}', folder, filename, option) bin_sigma = (limits[1] - limits[0]) / bins norm = self._total_pdf_norm_ * bin_sigma x_plot = np.linspace(limits[0], limits[1], num=num) total_func = zfit.run(self._total_pdf_.pdf(x_plot, norm_range=obs)) # write total_func self.__write_pdf(histname=f'total_func{suffix}', weight=total_func * norm, num=num, folder=folder, filename=filename, option='update') signal_funcs, bkg_funcs, refl_funcs = ([] for _ in range(3)) for signal_pdf in self._signal_pdfs_: signal_funcs.append(zfit.run(signal_pdf.pdf.pdf(x_plot, norm_range=obs))) for bkg_pdf in self._background_pdfs_: bkg_funcs.append(zfit.run(bkg_pdf.pdf.pdf(x_plot, norm_range=obs))) for refl_idx in self._refl_idx_: if refl_idx is None: continue refl_funcs.append(signal_funcs.pop(refl_idx)) signal_fracs, bkg_fracs, refl_fracs, _, _, _ = self.__get_all_fracs() # first write backgrounds for ibkg, (bkg_func, bkg_frac) in enumerate(zip(bkg_funcs, bkg_fracs)): self.__write_pdf(histname=f'bkg_{ibkg}{suffix}', weight=bkg_func * norm * bkg_frac / self._ratio_truncated_, num=num, folder=folder, filename=filename, option='update') # then write signals for isgn, (frac, signal_func) in enumerate(zip(signal_funcs, signal_fracs)): self.__write_pdf(histname=f'signal_{isgn}{suffix}', weight=signal_func * norm * frac / self._ratio_truncated_, num=num, folder=folder, filename=filename, option='update') # finally write reflected signals for irefl, (frac, refl_func) in enumerate(zip(refl_funcs, refl_fracs)): if self._signal_pdfs_[self._refl_idx_[irefl]] is None: continue self.__write_pdf(histname=f'refl_{irefl}{suffix}', weight=refl_func * norm * frac / self._ratio_truncated_, num=num, folder=folder, filename=filename, option='update') @property def get_fit_result(self): """ Get the fit result Returns ------------------------------------------------- fit_result: zfit.minimizers.fitresult.FitResult The fit result """ return self._fit_result_ def get_ndf(self): """ Get the number of degrees of freedom for chi2 fit ndf = nbins - nfreeparams - 1 -1 because the data sample size is fixed Returns ------------------------------------------------- ndf: int The number of degrees of freedom """ nbins = self._data_handler_.get_nbins() nfreeparams = len(self._fit_result_.params) self._ndf_ = nbins - nfreeparams - 1 return self._ndf_ def get_chi2(self): """ Get chi2 for binned data Returns ------------------------------------------------- chi2: float chi2 """ chi2 = 0 norm = self._data_handler_.get_norm() if self._data_handler_.get_is_binned(): # for chi2 loss, just retrieve loss value in fit result if self._chi2_loss_: return float(self._fit_result_.loss.value()) # for nll loss, compute chi2 "by hand" # access normalized data values and errors for all bins binned_data = self._data_handler_.get_binned_data() data_values = 
binned_data.values() data_variances = binned_data.variances() # access model predicted values model_values = self._total_pdf_binned_.values() * norm # compute chi2 for (data, model, data_variance) in zip(data_values, model_values, data_variances): chi2 += (data - model)**2/data_variance return chi2 if self._limits_ != [self._data_handler_.get_limits()]: # restricted fit limits Logger('chi2 not yet implemented with restricted fit limits', 'FATAL') # for unbinned data data_values = self._data_handler_.get_binned_data_from_unbinned_data() # access model predicted values model_values = self._total_pdf_binned_.values() * norm # compute chi2 for (data, model) in zip(data_values, model_values): if data == 0: continue chi2 += (data - model)**2/data return float(chi2) def get_chi2_ndf(self): """ Get the reduced chi2 (chi2 divided by number of degrees of freedom) for binned data Returns ------------------------------------------------- chi2_ndf: float The reduced chi2 """ return self.get_chi2()/self.get_ndf() def plot_raw_residuals(self, **kwargs): """ Plot the raw residuals Parameters ------------------------------------------------- **kwargs: dict Additional optional arguments: - style: str style to be used (see https://github.com/scikit-hep/mplhep for more details) - figsize: tuple size of the figure - axis_title: str x-axis title Returns ------------------------------------------------- fig: matplotlib.figure.Figure figure containing the raw residuals plot """ style = kwargs.get('style', 'LHCb2') figsize = kwargs.get('figsize', (7, 7)) axis_title = kwargs.get('axis_title', self._data_handler_.get_var_name()) mplhep.style.use(style) obs = self._data_handler_.get_obs() limits = self._data_handler_.get_limits() fig = plt.figure(figsize=figsize) if len(self._raw_residuals_) == 0: self.__get_raw_residuals() # draw residuals plt.errorbar( self._data_handler_.get_bin_center(), self._raw_residuals_, xerr=None, yerr=np.sqrt(self._raw_residual_variances_), linestyle="None", elinewidth=1, capsize=0, color="black", marker="o", markersize=5, label="residuals" ) bins = self._data_handler_.get_nbins() bin_sigma = (limits[1] - limits[0]) / bins norm = self._total_pdf_norm_ * bin_sigma x_plot = np.linspace(limits[0], limits[1], num=1000) signal_funcs, refl_funcs = ([] for _ in range(2)) for signal_pdf in self._signal_pdfs_: signal_funcs.append(zfit.run(signal_pdf.pdf.pdf(x_plot, norm_range=obs))) signal_fracs, _, refl_fracs, _, _, _ = self.__get_all_fracs() # draw signals for isgn, (signal_func, frac) in enumerate(zip(signal_funcs, signal_fracs)): plt.plot(x_plot, signal_func * norm * frac / self._ratio_truncated_, color=self._sgn_cmap_(isgn)) plt.fill_between(x_plot, signal_func * norm * frac / self._ratio_truncated_, color=self._sgn_cmap_(isgn), alpha=0.5, label=self._signal_pdfs_[isgn].label) # finally draw reflected signals (if any) is_there_refl = False for irefl, (refl_func, frac) in enumerate(zip(refl_funcs, refl_fracs)): if self._refl_pdfs_[irefl] is None: continue is_there_refl = True plt.plot(x_plot, refl_func * norm * frac / self._ratio_truncated_, color=self._refl_cmap_(irefl)) plt.fill_between(x_plot, refl_func * norm * frac / self._ratio_truncated_, color=self._refl_cmap_(irefl), alpha=0.5, label=f'reflected signal {irefl}') # draw signal + reflected signals (if any) if is_there_refl: for isgn, (signal_func, refl_func, frac_sgn, frac_refl) in enumerate( zip(signal_funcs, refl_funcs, signal_fracs, refl_fracs)): plt.plot(x_plot, (signal_func * frac_sgn + frac_refl * refl_func) * norm / 
self._ratio_truncated_, color='xkcd:blue', label='total - bkg') plt.xlim(limits[0], limits[1]) plt.xlabel(axis_title) plt.ylabel(rf'(data - fitted bkg) / {(limits[1]-limits[0])/bins*1000:0.1f} MeV/$c^2$') plt.legend(loc='best') return fig def plot_std_residuals(self, **kwargs): """ Plot the raw residuals Parameters ------------------------------------------------- **kwargs: dict Additional optional arguments: - style: str style to be used (see https://github.com/scikit-hep/mplhep for more details) - figsize: tuple size of the figure - axis_title: str x-axis title Returns ------------------------------------------------- fig: matplotlib.figure.Figure figure containing the raw residuals plot """ style = kwargs.get('style', 'LHCb2') figsize = kwargs.get('figsize', (7, 7)) axis_title = kwargs.get('axis_title', self._data_handler_.get_var_name()) mplhep.style.use(style) limits = self._data_handler_.get_limits() bins = self._data_handler_.get_nbins() bin_center = self._data_handler_.get_bin_center() xerr_lower = [bin_center[i] - bin_center[i-1] for i in range(1, len(bin_center))] xerr_lower.insert(0, bin_center[1] - bin_center[0]) xerr_upper = [bin_center[i+1] - bin_center[i] for i in range(len(bin_center)-1)] xerr_upper.append(bin_center[-1] - bin_center[-2]) fig = plt.figure(figsize=figsize) if len(self._std_residuals_) == 0: self.__get_std_residuals() # draw residuals plt.errorbar(bin_center, self._std_residuals_, xerr=(xerr_lower, xerr_upper), yerr=None, linestyle="None", elinewidth=1, capsize=0, color="black", marker="o", markersize=5, label=None) # line at 0 plt.plot([bin_center[0], bin_center[-1]], [0., 0.], lw=2, color='xkcd:blue') # line at -3 sigma plt.plot([bin_center[0], bin_center[-1]], [-3., -3.], lw=2, color='xkcd:red') # line at 3 sigma plt.plot([bin_center[0], bin_center[-1]], [3., 3.], lw=2, color='xkcd:red') plt.xlim(limits[0], limits[1]) plt.xlabel(axis_title) plt.ylabel(fr"$\dfrac{{ \mathrm{{data}} - \mathrm{{total \ fit}} }}{{ \sigma_{{ \mathrm{{data}} }} }}$" fr"/ {(limits[1]-limits[0])/bins*1000:0.1f} MeV/$c^2$") return fig def get_raw_yield(self, idx=0): """ Get the raw yield and its error Parameters ------------------------------------------------- idx: int Index of the raw yield to be returned (default: 0) Returns ------------------------------------------------- raw_yield: float The raw yield obtained from the fit raw_yield_err: float The raw yield error obtained from the fit """ return self._rawyield_[idx], self._rawyield_err_[idx] def get_raw_yield_bincounting(self, idx=0, **kwargs): """ Get the raw yield and its error via the bin-counting method Parameters ------------------------------------------------- idx: int Index of the raw yield to be returned (default: 0) **kwargs: dict Additional optional arguments: - nsigma: float nsigma invariant-mass window around mean for signal counting - nhwhm: float number of hwhm invariant-mass window around mean for signal counting (alternative to nsigma) - min: float minimum value of invariant-mass for signal counting (alternative to nsigma) - max: float maximum value of invariant-mass for signal counting (alternative to nsigma) Returns ------------------------------------------------- raw_yield: float The raw yield obtained from the bin counting raw_yield_err: float The raw yield error obtained from the bin counting """ nsigma = kwargs.get('nsigma', 3.) 
nhwhm = kwargs.get('nhwhm', None) min_value = kwargs.get('min', None) max_value = kwargs.get('max', None) use_nsigma = True if nhwhm is not None and (min_value is not None or max_value is not None): Logger('I cannot compute the signal within a fixed mass interval and a number of HWFM', 'ERROR') return 0., 0. if min_value is not None and max_value is not None: use_nsigma = False if nhwhm is not None: use_nsigma = False if not self._signal_pdfs_[idx].has_hwhm(): Logger('HWHM not defined, I cannot compute the signal for this pdf', 'ERROR') return 0., 0. mass, _ = self.get_mass(idx) hwhm, _ = self.get_hwhm(idx) min_value = mass - nhwhm * hwhm max_value = mass + nhwhm * hwhm if use_nsigma: if not self._signal_pdfs_[idx].has_sigma(): Logger('Sigma not defined, I cannot compute the signal for this pdf', 'ERROR') return 0., 0. mass, _ = self.get_mass(idx) sigma, _ = self.get_sigma(idx) min_value = mass - nsigma * sigma max_value = mass + nsigma * sigma if len(self._raw_residuals_) == 0: self.__get_raw_residuals() bin_centers = self._data_handler_.get_bin_center() bin_width = (bin_centers[1] - bin_centers[0]) / 2 raw_yield, raw_yield_err = 0., 0. for residual, variance, bin_center in zip(self._raw_residuals_, self._raw_residual_variances_, bin_centers): if bin_center - bin_width >= min_value and bin_center + bin_width <= max_value: raw_yield += residual raw_yield_err += variance raw_yield = float(raw_yield) raw_yield_err = np.sqrt(float(raw_yield_err)) return raw_yield, raw_yield_err def get_mass(self, idx=0): """ Get the mass and its error Parameters ------------------------------------------------- idx: int Index of the mass to be returned (default: 0) Returns ------------------------------------------------- mass: float The mass value obtained from the fit mass_err: float The mass error obtained from the fit """ if self._signal_pdfs_[idx].is_hist(): hist = self._signal_pdfs_[idx].to_hist() bin_limits = hist.to_numpy()[1] centres = [0.5 * (minn + maxx) for minn, maxx in zip(bin_limits[1:], bin_limits[:-1])] counts = hist.values() mass = np.average(centres, weights=counts) mass_err = 0. else: mass_name = 'm' if self._signal_pdfs_[idx].uses_m_not_mu() else 'mu' if self._signal_pdfs_[idx].get_fix_par(mass_name): mass = self._signal_pdfs_[idx].get_init_par(mass_name) mass_err = 0. else: mass = self._fit_result_.params[f'{self._name_}_{mass_name}_signal{idx}']['value'] mass_err = self._fit_result_.params[f'{self._name_}_{mass_name}_signal{idx}']['hesse']['error'] return mass, mass_err def get_sigma(self, idx=0): """ Get the sigma and its error Parameters ------------------------------------------------- idx: int Index of the sigma to be returned (default: 0) Returns ------------------------------------------------- sigma: float The sigma value obtained from the fit sigma_err: float The sigma error obtained from the fit """ if not self._signal_pdfs_[idx].has_sigma(): Logger(f'Sigma parameter not defined for {self._signal_pdfs_[idx].kind} pdf!', 'ERROR') return 0., 0. # if histogram, the rms is used as proxy if self._signal_pdfs_[idx].is_hist(): Logger(f'RMS used as proxy for sigma parameter of {self._signal_pdfs_[idx].kind} pdf!', 'WARNING') mean = self.get_mass(idx)[0] hist = self._signal_pdfs_[idx].to_hist() bin_limits = hist.to_numpy()[1] centres = [0.5 * (minn + maxx) for minn, maxx in zip(bin_limits[1:], bin_limits[:-1])] counts = hist.values() sigma = np.sqrt(np.average((centres - mean)**2, weights=counts)) sigma_err = 0. 
else: if self._signal_pdfs_[idx].get_fix_par('sigma'): sigma = self._signal_pdfs_[idx].get_init_par('sigma') sigma_err = 0. else: sigma = self._fit_result_.params[f'{self._name_}_sigma_signal{idx}']['value'] sigma_err = self._fit_result_.params[f'{self._name_}_sigma_signal{idx}']['hesse']['error'] return sigma, sigma_err def get_hwhm(self, idx=0): """ Get the half width half maximum and its error Parameters ------------------------------------------------- idx: int Index of the sigma to be returned (default: 0) Returns ------------------------------------------------- hwhm: float The sigma value obtained from the fit hwhm_err: float The sigma error obtained from the fit """ if not self._signal_pdfs_[idx].has_hwhm(): Logger(f'HFWM parameter not defined for {self._signal_pdfs_[idx].kind} pdf!', 'ERROR') return 0., 0. if self._signal_pdfs_[idx].kind == PDFKind.GAUSSIAN: mult_fact = np.sqrt(2 * np.log(2)) hwhm, hwhm_err = self.get_sigma(idx) hwhm *= mult_fact hwhm_err *= mult_fact elif self._signal_pdfs_[idx].kind == PDFKind.CAUCHY: hwhm, hwhm_err = self.get_signal_parameter(idx, 'gamma') elif self._signal_pdfs_[idx].kind == PDFKind.VOIGTIAN: mult_fact = np.sqrt(2 * np.log(2)) sigma, sigma_err = self.get_sigma(idx) sigma *= mult_fact sigma_err *= mult_fact gamma, gamma_err = self.get_signal_parameter(idx, 'gamma') hwhm = 0.5346 * gamma + np.sqrt(0.2166 * gamma**2 + sigma**2) # we neglect the correlation between sigma and gamma der_sigma = sigma / np.sqrt(0.0721663 + sigma**2) der_gamma = 0.5346 + (0.2166 * gamma) / np.sqrt(0.2166 * gamma**2 + sigma**2) hwhm_err = np.sqrt(der_sigma**2 * sigma_err**2 + der_gamma**2 * gamma_err**2) return hwhm, hwhm_err def get_signal_parameter(self, idx, par_name): """ Get a signal parameter and its error Parameters ------------------------------------------------- idx: int Index of the parameter to be returned (default: 0) par_name: str parameter to return Returns ------------------------------------------------- parameter: float The parameter value obtained from the fit parameter_err: float The parameter error obtained from the fit """ if par_name == 'gamma': Logger('The gamma parameter that you are getting is half of the width of a resonance,' ' for more info check the Cauchy pdf defined here ' 'https://zfit.readthedocs.io/en/latest/user_api/pdf/_generated/basic/zfit.pdf.Cauchy.html', 'WARNING') if self._signal_pdfs_[idx].get_fix_par(par_name): parameter = self._signal_pdfs_[idx].get_init_par(par_name) parameter_err = 0. else: parameter = self._fit_result_.params[f'{self._name_}_{par_name}_signal{idx}']['value'] parameter_err = self._fit_result_.params[f'{self._name_}_{par_name}_signal{idx}']['hesse']['error'] return parameter, parameter_err def get_background_parameter(self, idx, par_name): """ Get a background parameter and its error Parameters ------------------------------------------------- idx: int Index of the parameter to be returned (default: 0) par_name: str parameter to return Returns ------------------------------------------------- parameter: float The parameter value obtained from the fit parameter_err: float The parameter error obtained from the fit """ if self._background_pdfs_[idx].get_fix_par(par_name): parameter = self._background_pdfs_[idx].get_init_par(par_name) parameter_err = 0. 
else: parameter = self._fit_result_.params[f'{self._name_}_{par_name}_bkg{idx}']['value'] parameter_err = self._fit_result_.params[f'{self._name_}_{par_name}_bkg{idx}']['hesse']['error'] return parameter, parameter_err def get_signal(self, idx=0, **kwargs): """ Get the signal and its error in a given invariant-mass region Parameters ------------------------------------------------- idx: int Index of the signal to be returned **kwargs: dict Additional optional arguments: - nsigma: float nsigma invariant-mass window around mean for signal computation - nhwhm: float number of hwhm invariant-mass window around mean for signal and background computation (alternative to nsigma) - min: float minimum value of invariant-mass for signal computation (alternative to nsigma) - max: float maximum value of invariant-mass for signal computation (alternative to nsigma) Returns ------------------------------------------------- signal: float The signal value obtained from the fit signal_err: float The signal error obtained from the fit """ nsigma = kwargs.get('nsigma', 3.) nhwhm = kwargs.get('nhwhm', None) min_value = kwargs.get('min', None) max_value = kwargs.get('max', None) use_nsigma = True if nhwhm is not None and (min_value is not None or max_value is not None): Logger('I cannot compute the signal within a fixed mass interval and a number of HWFM', 'ERROR') return 0., 0. if min_value is not None and max_value is not None: use_nsigma = False if nhwhm is not None: use_nsigma = False if not self._signal_pdfs_[idx].has_hwhm(): Logger('HWHM not defined, I cannot compute the signal for this pdf', 'ERROR') return 0., 0. mass, _ = self.get_mass(idx) hwhm, _ = self.get_hwhm(idx) min_value = mass - nhwhm * hwhm max_value = mass + nhwhm * hwhm if use_nsigma: if not self._signal_pdfs_[idx].has_sigma(): Logger('Sigma not defined, I cannot compute the signal for this pdf', 'ERROR') return 0., 0. mass, _ = self.get_mass(idx) sigma, _ = self.get_sigma(idx) min_value = mass - nsigma * sigma max_value = mass + nsigma * sigma # pylint: disable=missing-kwoa signal = self._signal_pdfs_[idx].pdf.integrate((min_value, max_value)) signal_fracs, _, refl_fracs, signal_err_fracs, _, _ = self.__get_all_fracs() if len(self._background_pdfs_) > 0: frac = signal_fracs[idx] frac_err = signal_err_fracs[idx] else: if len(self._signal_pdfs_) == 1: frac = 1. frac_err = 0. if idx < len(signal_fracs): frac = signal_fracs[idx] frac_err = signal_err_fracs[idx] else: frac = 1. 
- sum(signal_fracs) - sum(refl_fracs) frac_err = np.sqrt(sum(list(err**2 for err in signal_err_fracs))) norm = self._data_handler_.get_norm() norm_err = norm * frac_err norm *= frac return float(signal * norm), float(signal * norm_err) def get_background(self, idx=0, **kwargs): """ Get the background and its error in a given invariant-mass region Parameters ------------------------------------------------- idx: int Index of the signal to be used to compute nsigma window **kwargs: dict Additional optional arguments: - nsigma: float nsigma invariant-mass window around mean for background computation - nhwhm: float number of hwhm invariant-mass window around mean for signal and background computation (alternative to nsigma) - min: float minimum value of invariant-mass for background computation (alternative to nsigma) - max: float maximum value of invariant-mass for background computation (alternative to nsigma) Returns ------------------------------------------------- background: float The background value obtained from the fit background_err: float The background error obtained from the fit """ if not self._background_pdfs_: Logger('Background not fitted', 'ERROR') return 0., 0. nsigma = kwargs.get('nsigma', 3.) nhwhm = kwargs.get('nhwhm', None) min_value = kwargs.get('min', None) max_value = kwargs.get('max', None) use_nsigma = True if nhwhm is not None and (min_value is not None or max_value is not None): Logger('I cannot compute the signal within a fixed mass interval and a number of HWFM', 'ERROR') return 0., 0. if min_value is not None and max_value is not None: use_nsigma = False if nhwhm is not None: use_nsigma = False if not self._signal_pdfs_[idx].has_hwhm(): Logger('HWHM not defined, I cannot compute the signal for this pdf', 'ERROR') return 0., 0. mass, _ = self.get_mass(idx) hwhm, _ = self.get_hwhm(idx) min_value = mass - nhwhm * hwhm max_value = mass + nhwhm * hwhm if use_nsigma: if not self._signal_pdfs_[idx].has_sigma(): Logger('Sigma not defined, I cannot compute the signal for this pdf', 'ERROR') return 0., 0. mass, _ = self.get_mass(idx) sigma, _ = self.get_sigma(idx) min_value = mass - nsigma * sigma max_value = mass + nsigma * sigma _, bkg_fracs, _, _, bkg_err_fracs, _ = self.__get_all_fracs() limits = self._data_handler_.get_limits() min_value = max(min_value, limits[0]) max_value = min(max_value, limits[1]) # pylint: disable=missing-kwoa background, background_err = 0., 0. 
for idx2, bkg in enumerate(self._background_pdfs_): norm = self._total_pdf_norm_ * bkg_fracs[idx2] norm_err = norm * bkg_err_fracs[idx2] bkg_int = float(bkg.pdf.integrate((min_value, max_value))) background += bkg_int * norm background_err += (bkg_int * norm_err)**2 background /= self._ratio_truncated_ background_err = np.sqrt(background_err) / self._ratio_truncated_ return float(background), float(background_err) def get_signal_over_background(self, idx=0, **kwargs): """ Get the S/B ratio and its error in a given invariant-mass region Parameters ------------------------------------------------- idx: int Index of the signal to be used to compute nsigma window **kwargs: dict Additional optional arguments: - nsigma: float nsigma invariant-mass window around mean for signal and background computation - nhwhm: float number of hwhm invariant-mass window around mean for signal and background computation (alternative to nsigma) - min: float minimum value of invariant-mass for signal and background computation (alternative to nsigma) - max: float maximum value of invariant-mass for signal and background computation (alternative to nsigma) Returns ------------------------------------------------- signal_over_background: float The S/B value obtained from the fit signal_over_background_err: float The S/B error obtained from the fit """ signal = self.get_signal(idx, **kwargs) bkg = self.get_background(idx, **kwargs) signal_over_background = signal[0]/bkg[0] signal_over_background_err = np.sqrt(signal[1]**2/signal[0]**2 + bkg[1]**2/bkg[0]**2) signal_over_background_err *= signal_over_background return signal_over_background, signal_over_background_err def get_significance(self, idx=0, **kwargs): """ Get the significance and its error in a given invariant-mass region Parameters ------------------------------------------------- idx: int Index of the signal to be used to compute nsigma window **kwargs: dict Additional optional arguments: - nsigma: float nsigma invariant-mass window around mean for signal and background computation - nhwhm: float number of hwhm invariant-mass window around mean for signal and background computation (alternative to nsigma) - min: float minimum value of invariant-mass for signal and background computation (alternative to nsigma) - max: float maximum value of invariant-mass for signal and background computation (alternative to nsigma) Returns ------------------------------------------------- significance: float The significance value obtained from the fit significance_err: float The significance error obtained from the fit """ signal = self.get_signal(idx, **kwargs) bkg = self.get_background(idx, **kwargs) significance = signal[0]/np.sqrt(signal[0]+bkg[0]) sig_plus_bkg = signal[0] + bkg[0] significance_err = significance*np.sqrt( (signal[1]**2 + bkg[1]**2) / (4. * sig_plus_bkg**2) + ( bkg[0]/sig_plus_bkg) * signal[1]**2 / signal[0]**2) return significance, significance_err def get_sweights(self): """ Calculate sWeights for signal and background components. Returns: dict: A dictionary containing sWeights for 'signal' and 'bkg' components. 
""" if self._is_truncated_: Logger('sWeights not available for truncated fit', 'ERROR') return {'signal': None, 'bkg': None} if self._extended_: model = self._total_pdf_.get_models()[0] # Get the SumPDF object (not the truncated one) sweights_model = [] for i_pdf, pdf in enumerate(self._signal_pdfs_): sweights_model.append(i_pdf) for i_pdf, pdf in enumerate(self._background_pdfs_): sweights_model.append(i_pdf + len(self._signal_pdfs_)) model = zfit.pdf.SumPDF( [model.get_models()[i] for i in sweights_model], extended=self._data_handler_.get_norm(), obs=self._data_handler_.get_obs(), name=self._name_ + '_sweights' ) sweights = compute_sweights(model, self._data_handler_.get_data()) names = self._signal_pdfs_+self._background_pdfs_ names = [names[i].kind.value for i in sweights_model] for new_name, old_name in zip( self._signal_pdfs_+self._background_pdfs_, list(sweights.keys()) ): sweights[new_name.kind.value] = sweights.pop(old_name) return sweights # Not extended case signal_pdf_extended = [] refl_pdf_extended = [] bkg_pdf_extended = [] norm = self._data_handler_.get_norm() signal_fracs, bkg_fracs, refl_fracs, _, _, _ = self.__get_all_fracs() bkg_fracs.append(1-sum(bkg_fracs)-sum(signal_fracs)-sum(refl_fracs)) total_signal_yields = 0. total_bkg_yields = 0. for pdf, frac in zip(self._signal_pdfs_, signal_fracs+refl_fracs): signal_pdf_extended.append(pdf.create_extended(frac * norm)) total_signal_yields += frac * norm for pdf, frac in zip(self._background_pdfs_, bkg_fracs): bkg_pdf_extended.append(pdf.create_extended(frac * norm)) total_bkg_yields += frac * norm total_signal_yields_par = zfit.Parameter('signal_yield', total_signal_yields) total_bkg_yields_par = zfit.Parameter('bkg_yield', total_bkg_yields) if len(signal_pdf_extended + refl_pdf_extended) > 1: signal_pdf_for_sweights = zfit.pdf.SumPDF( signal_pdf_extended, extended=total_signal_yields_par ) else: signal_pdf_for_sweights = signal_pdf_extended[0] if len(bkg_pdf_extended) > 1: bkg_pdf_for_sweights = zfit.pdf.SumPDF( bkg_pdf_extended + refl_pdf_extended, extended=total_bkg_yields_par ) else: bkg_pdf_for_sweights = bkg_pdf_extended[0] total_pdf_for_sweights = zfit.pdf.SumPDF([signal_pdf_for_sweights, bkg_pdf_for_sweights]) sweights = compute_sweights(total_pdf_for_sweights, self._data_handler_.get_data()) sweights_labels = list(sweights.keys()) return {'signal': sweights[sweights_labels[0]], 'bkg': sweights[sweights_labels[1]]} def set_particle_mass(self, idx, **kwargs): """ Set the particle mass Parameters ------------------------------------------------- idx: int Index of the signal **kwargs: dict Additional optional arguments: - mass: float The mass of the particle - pdg_id: int PDG ID of the particle (alternative to mass) - pdg_name: str Name of the particle (alternative to mass) - limits: list minimum and maximum limits for the mass parameter - fix: bool fix the mass parameter """ mass_name = 'm' if self._signal_pdfs_[idx].uses_m_not_mu() else 'mu' mass = 0. 
if 'mass' in kwargs: mass = kwargs['mass'] elif 'pdg_id' in kwargs: mass = self.pdg_api.get_particle_by_mcid(kwargs['pdg_id']).mass elif 'pdg_name' in kwargs: mass = self.pdg_api.get_particle_by_name(kwargs['pdg_name']).mass else: Logger(f'"mass", "pdg_id", and "pdg_name" not provided, mass value for signal {idx} will not be set', 'ERROR') self._signal_pdfs_[idx].set_init_par(mass_name, mass) if 'limits' in kwargs: self._signal_pdfs_[idx].set_limits_par(mass_name, kwargs['limits']) if 'fix' in kwargs: self._signal_pdfs_[idx].set_fix_par(mass_name, kwargs['fix']) def set_signal_initpar(self, idx, par_name, init_value, **kwargs): """ Set a signal parameter Parameters ------------------------------------------------- idx: int Index of the signal par_name: str The name of the parameter to be set init_value: float The value of parameter to be set **kwargs: dict Additional optional arguments: - limits: list minimum and maximum limits for the parameter - fix: bool fix the parameter to init_value """ if par_name == 'gamma': Logger('The gamma parameter that you are setting is half of the width of a resonance,' ' for more info check the Cauchy pdf defined here ' 'https://zfit.readthedocs.io/en/latest/user_api/pdf/_generated/basic/zfit.pdf.Cauchy.html', 'WARNING') self._signal_pdfs_[idx].set_init_par(par_name, init_value) if 'limits' in kwargs: self._signal_pdfs_[idx].set_limits_par(par_name, kwargs['limits']) if 'fix' in kwargs: self._signal_pdfs_[idx].set_fix_par(par_name, kwargs['fix']) def set_background_initpar(self, idx, par_name, init_value, **kwargs): """ Set a background parameter Parameters ------------------------------------------------- idx: int Index of the background par_name: str The name of the parameter to be set init_value: float The value of parameter to be set **kwargs: dict Additional optional arguments: - limits: list minimum and maximum limits for the parameter - fix: bool fix the mass parameter """ self._background_pdfs_[idx].set_init_par(par_name, init_value) if 'limits' in kwargs: self._background_pdfs_[idx].set_limits_par(par_name, kwargs['limits']) if 'fix' in kwargs: self._background_pdfs_[idx].set_fix_par(par_name, kwargs['fix']) def __get_parameter_fix_frac(self, factor, name): """ Get the parameter fix for the fraction Parameters ------------------------------------------------- idx_pdf: int Index of the pdf target_pdf: int Index of the target pdf factor: float Factor to multiply the fraction parameter """ return zfit.Parameter(name, factor, floating=False) def __check_consistency_fix_frac(self, idx_pdf, target_pdf, idx_pdf_type, target_pdf_type): """ Checks the consistency of fixing the fraction between PDFs. Parameters ------------------------------------------------- idx_pdf: int The index of the PDF to check. target_pdf: int The target PDF to compare against. idx_pdf_type: 'signal' or 'bkg' The type of the PDF to check. target_pdf_type: 'signal' or 'bkg' The type of the target PDF to compare against. 
""" if idx_pdf_type == target_pdf_type and idx_pdf == target_pdf: Logger( f'Index {idx_pdf} is the same as {target_pdf},' 'cannot constrain the fraction to itself', 'FATAL' ) if target_pdf_type == 'signal' and target_pdf > len(self._signal_pdfs_): Logger( f'Target signal index {target_pdf} is out of range', 'FATAL' ) if target_pdf_type == 'bkg' and target_pdf > len(self._background_pdfs_): Logger( f'Target background index {target_pdf} is out of range', 'FATAL' ) def fix_signal_frac_to_signal_pdf(self, idx_pdf, target_pdf, factor=1): """ Fix the frac parameter of a signal to the frac parameter of another signal Parameters ------------------------------------------------- idx_pdf: int Index of the signal fraction to be fixed target_pdf: int Index of the signal fraction to be used as reference factor: float Factor to multiply the frac parameter of the target signal """ self.__check_consistency_fix_frac(idx_pdf, target_pdf, 'signal', 'signal') factor_par = self.__get_parameter_fix_frac( factor, f'factor_signal{idx_pdf}_constrained_to_signal{target_pdf}' ) self._fix_fracs_to_pdfs_.append({ 'fixed_pdf_idx': idx_pdf, 'target_pdf_idx': target_pdf, 'factor': factor_par, 'fixed_pdf_type': 'signal', 'target_pdf_type': 'signal' }) def fix_signal_frac_to_bkg_pdf(self, idx_pdf, target_pdf, factor=1): """ Fix the frac parameter of a signal to the frac parameter of a background Parameters ------------------------------------------------- idx_pdf: int Index of the signal fraction to be fixed target_pdf: int Index of the background fraction to be used as reference factor: float Factor to multiply the frac parameter of the target background """ self.__check_consistency_fix_frac(idx_pdf, target_pdf, 'signal', 'bkg') factor_par = self.__get_parameter_fix_frac( factor, f'factor_signal{idx_pdf}_constrained_to_bkg{target_pdf}' ) self._fix_fracs_to_pdfs_.append({ 'fixed_pdf_idx': idx_pdf, 'target_pdf_idx': target_pdf, 'factor': factor_par, 'fixed_pdf_type': 'signal', 'target_pdf_type': 'bkg' }) def fix_bkg_frac_to_signal_pdf(self, idx_pdf, target_pdf, factor=1): """ Fix the frac parameter of a background to the frac parameter of a signal Parameters ------------------------------------------------- idx_pdf: int Index of the background fraction to be fixed target_pdf: int Index of the signal fraction to be used as reference factor: float Factor to multiply the frac parameter of the target signal """ self.__check_consistency_fix_frac(idx_pdf, target_pdf, 'bkg', 'signal') factor_par = self.__get_parameter_fix_frac( factor, f'factor_bkg{idx_pdf}_constrained_to_signal{target_pdf}' ) self._fix_fracs_to_pdfs_.append({ 'fixed_pdf_idx': idx_pdf, 'target_pdf_idx': target_pdf, 'factor': factor_par, 'fixed_pdf_type': 'bkg', 'target_pdf_type': 'signal' }) def fix_bkg_frac_to_bkg_pdf(self, idx_pdf, target_pdf, factor=1): """ Fix the frac parameter of a background to the frac parameter of another background Parameters ------------------------------------------------- idx_pdf: int Index of the background fraction to be fixed target_pdf: int Index of the background fraction to be used as reference factor: float Factor to multiply the frac parameter of the target background """ self.__check_consistency_fix_frac(idx_pdf, target_pdf, 'bkg', 'bkg') factor_par = self.__get_parameter_fix_frac( factor, f'factor_bkg{idx_pdf}_constrained_to_bkg{target_pdf}' ) self._fix_fracs_to_pdfs_.append({ 'fixed_pdf_idx': idx_pdf, 'target_pdf_idx': target_pdf, 'factor': factor_par, 'fixed_pdf_type': 'bkg', 'target_pdf_type': 'bkg' }) # pylint: 
disable=line-too-long def set_signal_template(self, idx, sample): """ Set sample and options for signal template Parameters ------------------------------------------------- idx: int Index of the signal sample: flarefly.DataHandler Data sample for histogram template """ edges_sgn = sample.get_bin_edges() edges_data = self._data_handler_.get_bin_edges() if len(edges_sgn) != len(edges_data): Logger(f'The data and the signal template {idx} have different number of bins:' f' \n -> signal template: {len(edges_sgn)-1}, data -> {len(edges_data)-1}', 'FATAL') if not np.allclose(edges_sgn, edges_data): Logger(f'The data and the signal template {idx} have different bin edges:' f' \n -> signal template: {edges_sgn}, data -> {edges_data}', 'FATAL') self._signal_pdfs_[idx].hist_sample = sample # pylint: disable=line-too-long def set_signal_kde(self, idx, sample, **kwargs): """ Set sample and options for signal kde Parameters ------------------------------------------------- idx: int Index of the signal sample: flarefly.DataHandler Data sample for Kernel Density Estimation **kwargs: dict Arguments for kde options. See https://zfit.readthedocs.io/en/latest/user_api/pdf/_generated/kde_api/zfit.pdf.KDE1DimGrid.html#zfit.pdf.KDE1DimGrid for more details """ self._signal_pdfs_[idx].kde_sample = sample self._signal_pdfs_[idx].kde_option = kwargs # pylint: disable=line-too-long def set_reflection_template(self, idx, sample, r_over_s): """ Set sample and options for reflected signal template Parameters ------------------------------------------------- idx: int Index of the reflected signal sample: flarefly.DataHandler Data sample for histogram template r_over_s: float R/S ratio """ edges_refl = sample.get_bin_edges() edges_data = self._data_handler_.get_bin_edges() if len(edges_refl) != len(edges_data): Logger(f'The data and the reflection template {idx} have different number of bins:' f' \n -> reflection template: {len(edges_refl)-1}, data -> {len(edges_data)-1}', 'FATAL') if not np.allclose(edges_refl, edges_data): Logger(f'The data and the reflection template {idx} have different bin edges:' f' \n -> reflection template: {edges_refl}, data -> {edges_data}', 'FATAL') self._signal_pdfs_[self._refl_idx_[idx]].hist_sample = sample self.fix_signal_frac_to_signal_pdf(self._refl_idx_[idx], idx, factor=r_over_s) # pylint: disable=line-too-long def set_reflection_kde(self, idx, sample, r_over_s, **kwargs): """ Set sample and options for reflected signal kde Parameters ------------------------------------------------- idx: int Index of the signal sample: flarefly.DataHandler Data sample for Kernel Density Estimation r_over_s: float R/S ratio **kwargs: dict Arguments for kde options. 
See https://zfit.readthedocs.io/en/latest/user_api/pdf/_generated/kde_api/zfit.pdf.KDE1DimGrid.html#zfit.pdf.KDE1DimGrid for more details """ self._signal_pdfs_[self._refl_idx_[idx]].kde_sample = sample self._signal_pdfs_[self._refl_idx_[idx]].kde_option = kwargs self.fix_signal_frac_to_signal_pdf(idx, self._refl_idx_[idx], factor=r_over_s) # pylint: disable=line-too-long def set_background_template(self, idx, sample): """ Set sample and options for background template histogram Parameters ------------------------------------------------- idx: int Index of the background sample: flarefly.DataHandler Data sample for template histogram """ edges_bkg = sample.get_bin_edges() edges_data = self._data_handler_.get_bin_edges() if len(edges_bkg) != len(edges_data): Logger(f'The data and the background template {idx} have different number of bins:' f' \n -> background template: {len(edges_bkg)-1}, data -> {len(edges_data)-1}', 'FATAL') if not np.allclose(edges_bkg, edges_data): Logger(f'The data and the background template {idx} have different bin edges:' f' \n -> background template: {edges_bkg}, data -> {edges_data}', 'FATAL') self._background_pdfs_[idx].hist_sample = sample # pylint: disable=line-too-long def set_background_kde(self, idx, sample, **kwargs): """ Set sample and options for background kde Parameters ------------------------------------------------- idx: int Index of the background sample: flarefly.DataHandler Data sample for Kernel Density Estimation **kwargs: dict Arguments for kde options. See https://zfit.readthedocs.io/en/latest/user_api/pdf/_generated/kde_api/zfit.pdf.KDE1DimGrid.html#zfit.pdf.KDE1DimGrid for more details """ self._background_pdfs_[idx].kde_sample = sample self._background_pdfs_[idx].kde_option = kwargs def __write_data(self, hdata, histname='hdata', folder='', filename='output.root', option='recreate'): """ Helper method to save a data histogram in a .root file (TH1D format) Parameters ------------------------------------------------- hdata: hist Histogram containing the data histname: str Name of the histogram folder: str Folder in the ROOT file filename: str Name of the ROOT file option: str Option (recreate or update) """ if option not in ['recreate', 'update']: Logger('Illegal option to save outputs in ROOT file!', 'FATAL') data_path = f"{folder}/{histname}" if folder != "" else histname if option == 'recreate': with uproot.recreate(filename) as ofile: ofile[data_path] = hdata else: with uproot.update(filename) as ofile: ofile[data_path] = hdata def __write_pdf(self, histname, weight, num, folder="", filename='output.root', option='recreate'): """ Helper method to save a pdf histogram in a .root file (TH1D format) Parameters ------------------------------------------------- histname: str Name of the histogram weight: array[float] Array of weights for histogram bins num: int Number of bins to plot pdfs converted into histograms filename: str ROOT file name option: str Option (recreate or update) """ if option not in ['recreate', 'update']: Logger('Illegal option to save outputs in ROOT file!', 'FATAL') limits = self._data_handler_.get_limits() x_plot = np.linspace(limits[0], limits[1], num=num) histo = Hist.new.Reg(num, limits[0], limits[1], name=self._data_handler_.get_var_name()).Double() histo.fill(x_plot, weight=weight) pdf_path = f"{folder}/{histname}" if folder != "" else histname if option == 'recreate': with uproot.recreate(filename) as ofile: ofile[pdf_path] = histo else: with uproot.update(filename) as ofile: ofile[pdf_path] = histo def 
get_name_signal_pdf(self): """ Return the signal pdf names """ signal_slice = self._signal_pdfs_[:-self._n_refl_] if self._n_refl_ > 0 else self._signal_pdfs_ return [str(pdf.kind) for pdf in signal_slice] def get_name_background_pdf(self): """ Return the background pdf names """ return [str(pdf.kind) for pdf in self._background_pdfs_] def get_name_refl_pdf(self): """ Return the reflection pdf names """ return [str(pdf.kind) for pdf in self._signal_pdfs_[-self._n_refl_:]] def get_signal_pars(self): """ Return the signal parameters """ fracs = self.__get_all_fracs() signal_pars = [] signal_slice = self._signal_pdfs_[:-self._n_refl_] if self._n_refl_ > 0 else self._signal_pdfs_ for i_sgn, pdf in enumerate(signal_slice): signal_pars.append({}) for key, value in pdf.parameters.items(): par_name = key.split(f'{self._name_}_')[-1] par_name = par_name.split(f'_signal{i_sgn}')[0] signal_pars[-1][par_name] = value.numpy() signal_pars[-1]['frac'] = fracs[0][i_sgn] return signal_pars def get_signal_pars_uncs(self): """ Return the signal parameters uncertainties """ fracs = self.__get_all_fracs() signal_pars_uncs = [] signal_slice = self._signal_pdfs_[:-self._n_refl_] if self._n_refl_ > 0 else self._signal_pdfs_ for i_sgn, pdf in enumerate(signal_slice): signal_pars_uncs.append({}) for key in pdf.parameters: par_name = key.split(f'{self._name_}_')[-1] par_name = par_name.split(f'_signal{i_sgn}')[0] try: par_unc = self._fit_result_.params[key]['hesse']['error'] except KeyError: # fixed parameter par_unc = 0. signal_pars_uncs[-1][par_name] = par_unc signal_pars_uncs[-1]['frac'] = fracs[3][i_sgn] return signal_pars_uncs def get_bkg_pars(self): """ Return the background parameters """ fracs = self.__get_all_fracs() bkg_pars = [] for i_bkg, pdf in enumerate(self._background_pdfs_): bkg_pars.append({}) for key, value in pdf.parameters.items(): par_name = key.split(f'{self._name_}_')[-1] par_name = par_name.split(f'_bkg{i_bkg}')[0] bkg_pars[-1][par_name] = value.numpy() bkg_pars[-1]['frac'] = fracs[1][i_bkg] return bkg_pars def get_bkg_pars_uncs(self): """ Return the background parameters uncertainties """ fracs = self.__get_all_fracs() bkg_pars_uncs = [] for i_bkg, pdf in enumerate(self._background_pdfs_): bkg_pars_uncs.append({}) for key in pdf.parameters: par_name = key.split(f'{self._name_}_')[-1] par_name = par_name.split(f'_bkg{i_bkg}')[0] try: par_unc = self._fit_result_.params[key]['hesse']['error'] except KeyError: # fixed parameter par_unc = 0. bkg_pars_uncs[-1][par_name] = par_unc bkg_pars_uncs[-1]['frac'] = fracs[4][i_bkg] return bkg_pars_uncs def sample_pdf(self, num=1000): """ Sample the total pdf Parameters ------------------------------------------------- num: int Number of points to sample Returns ------------------------------------------------- samples: numpy array Array of sampled points """ if not self._is_pdf_built_: self.__build_total_pdf() self.__build_total_pdf_binned() self._is_pdf_built_ = True sample = self._total_pdf_.sample(num).to_numpy() return sample.flatten()Class used to perform mass fits with the zfit library https://github.com/zfit/zfit
Initialize the F2MassFitter class

Parameters
data_handler:DataHandler- The data handler containing the data to fit
name_signal_pdf:list-
The list of names for the signal pdfs. The possible options are:
- 'nosignal'
- 'gaussian'
- 'doublegaus'
- 'gausexptail'
- 'genergausexptail'
- 'genergausexptailsymm'
- 'bifurgaus'
- 'crystalball'
- 'doublecb'
- 'doublecbsymm'
- 'genercrystalball'
- 'cauchy'
- 'voigtian'
- 'kde_exact' (requires to set the datasample and options)
- 'kde_grid' (requires to set the datasample and options)
- 'kde_fft' (requires to set the datasample and options)
- 'kde_isj' (requires to set the datasample and options)
- 'hist' (only for binned fits, requires to set the datasample)
name_background_pdf:list-
The list of names of the background pdfs. The possible options are:
- 'nobkg'
- 'expo'
- 'powlaw'
- 'expopow'
- 'expopowext'
- 'chebpolN' (N is the order of the polynomial)
- 'kde_exact' (requires to set the datasample and options)
- 'kde_grid' (requires to set the datasample and options)
- 'kde_fft' (requires to set the datasample and options)
- 'kde_isj' (requires to set the datasample and options)
- 'hist' (only for binned fits, requires to set the datasample)
**kwargs:dict-
Additional optional arguments:
- name_refl_pdf: list The list of names of the reflection pdfs. It must have the same length as the signal list. The possible options are:
  - 'kde_exact' (requires to set the datasample and options)
  - 'kde_grid' (requires to set the datasample and options)
  - 'kde_fft' (requires to set the datasample and options)
  - 'kde_isj' (requires to set the datasample and options)
  - 'hist' (only for binned fits, requires to set the datasample)
- name: str Optional name for the fitter, needed in case of multiple fitters defined in the same script
- chi2_loss: bool chi2 minimization if True, nll minimization otherwise. Default value: False
- extended: bool If True, the pdf is considered extended, i.e. the yield is a parameter of the fit. Default value: False
- minuit_mode: A number used by minuit to define the internal minimization strategy, either 0, 1 or 2; 0 is the fastest, 2 is the slowest (see more details in https://zfit.readthedocs.io/en/latest/user_api/minimize/_generated/minimizers/zfit.minimize.Minuit.html#zfit.minimize.Minuit). Default value: 0
- limits: list List of fit limits to include in the fit
- tol: float Termination value for the convergence/stopping criterion of the algorithm, used to determine whether the minimum has been found. Default value: 0.001
- verbosity: int Verbosity level (from 0 to 10). Default value: 0
- signal_at_threshold: list List of booleans indicating whether each signal PDF is at threshold. Each element corresponds to a signal pdf
- label_signal_pdf: list List of labels for the signal pdfs
- label_bkg_pdf: list List of labels for the background pdfs
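A minimal usage sketch is given below; all numerical values (the toy sample, the particle mass, limits, and initial parameters) are illustrative placeholders, and the pdf names are taken from the lists above.

import numpy as np
from flarefly.data_handler import DataHandler
from flarefly.fitter import F2MassFitter

# toy unbinned invariant-mass sample (illustrative numbers only)
mass_values = np.concatenate([
    np.random.normal(1.865, 0.010, size=5000),    # mock signal peak
    np.random.uniform(1.75, 2.00, size=20000)     # mock background
])
data_hdl = DataHandler(data=mass_values, var_name='inv_mass', limits=[1.75, 2.00])

fitter = F2MassFitter(data_hdl,
                      name_signal_pdf=['gaussian'],
                      name_background_pdf=['expo'],
                      name='example_fit')
fitter.set_particle_mass(0, mass=1.865, limits=[1.84, 1.89])
fitter.set_signal_initpar(0, 'sigma', 0.010, limits=[0.005, 0.030])
fit_result = fitter.mass_zfit()
raw_yield, raw_yield_err = fitter.get_raw_yield(0)
fig, axs = fitter.plot_mass_fit(axis_title=r'$M$ (GeV/$c^2$)')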
Instance variables
prop get_fit_result-
@property
def get_fit_result(self):
    """
    Get the fit result

    Returns
    -------------------------------------------------
    fit_result: zfit.minimizers.fitresult.FitResult
        The fit result
    """
    return self._fit_result_

Get the fit result
Returns
fit_result:zfit.minimizers.fitresult.FitResult- The fit result
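For illustration, assuming fitter is an F2MassFitter on which mass_zfit has already been run, the returned FitResult can be inspected with the standard zfit accessors (mirroring what the source above does internally):

res = fitter.get_fit_result   # property: accessed without parentheses
print(res.converged)          # convergence status reported by zfit
for par in res.params:        # keys are the zfit parameters
    print(par.name, res.params[par.name]['value'])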
Methods
def cleanup(self)-
def cleanup(self):
    """
    Cleanup function to clear the graph cache of zfit
    """
    zfit.run.clear_graph_cache()

Cleanup function to clear the graph cache of zfit
def dump_to_root(self, filename, **kwargs)-
def dump_to_root(self, filename, **kwargs): """ Plot the mass fit Parameters ------------------------------------------------- filename: str Name of output ROOT file **kwargs: dict Additional optional arguments: - axis_title: str x-axis title - num: int number of bins to plot pdfs converted into histograms - option: str option (recreate or update) - suffix: str suffix to append to objects - folder: str folder in the ROOT file to store the objects """ num = kwargs.get('num', 10000) suffix = kwargs.get('suffix', '') option = kwargs.get('option', 'recreate') folder = kwargs.get('folder', '') bins = self._data_handler_.get_nbins() obs = self._data_handler_.get_obs() limits = self._data_handler_.get_limits() hdata = self._data_handler_.to_hist(lower_edge=limits[0], upper_edge=limits[1], nbins=bins, varname=self._data_handler_.get_var_name()) # write data self.__write_data(hdata, f'hdata{suffix}', folder, filename, option) bin_sigma = (limits[1] - limits[0]) / bins norm = self._total_pdf_norm_ * bin_sigma x_plot = np.linspace(limits[0], limits[1], num=num) total_func = zfit.run(self._total_pdf_.pdf(x_plot, norm_range=obs)) # write total_func self.__write_pdf(histname=f'total_func{suffix}', weight=total_func * norm, num=num, folder=folder, filename=filename, option='update') signal_funcs, bkg_funcs, refl_funcs = ([] for _ in range(3)) for signal_pdf in self._signal_pdfs_: signal_funcs.append(zfit.run(signal_pdf.pdf.pdf(x_plot, norm_range=obs))) for bkg_pdf in self._background_pdfs_: bkg_funcs.append(zfit.run(bkg_pdf.pdf.pdf(x_plot, norm_range=obs))) for refl_idx in self._refl_idx_: if refl_idx is None: continue refl_funcs.append(signal_funcs.pop(refl_idx)) signal_fracs, bkg_fracs, refl_fracs, _, _, _ = self.__get_all_fracs() # first write backgrounds for ibkg, (bkg_func, bkg_frac) in enumerate(zip(bkg_funcs, bkg_fracs)): self.__write_pdf(histname=f'bkg_{ibkg}{suffix}', weight=bkg_func * norm * bkg_frac / self._ratio_truncated_, num=num, folder=folder, filename=filename, option='update') # then write signals for isgn, (frac, signal_func) in enumerate(zip(signal_funcs, signal_fracs)): self.__write_pdf(histname=f'signal_{isgn}{suffix}', weight=signal_func * norm * frac / self._ratio_truncated_, num=num, folder=folder, filename=filename, option='update') # finally write reflected signals for irefl, (frac, refl_func) in enumerate(zip(refl_funcs, refl_fracs)): if self._signal_pdfs_[self._refl_idx_[irefl]] is None: continue self.__write_pdf(histname=f'refl_{irefl}{suffix}', weight=refl_func * norm * frac / self._ratio_truncated_, num=num, folder=folder, filename=filename, option='update')Plot the mass fit
Parameters
filename:str- Name of output ROOT file
**kwargs:dict-
Additional optional arguments:
-
axis_title: str x-axis title
-
num: int number of bins to plot pdfs converted into histograms
-
option: str ROOT file opening option (recreate or update)
-
suffix: str suffix to append to objects
-
folder: str folder in the ROOT file to store the objects
-
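A minimal usage sketch for dump_to_root, assuming `fitter` is an already-fitted instance of this class (see the end-to-end example after mass_zfit below); the file, folder and suffix names are placeholders.
# write the data histogram, the total fit and the per-component pdfs to a ROOT file
fitter.dump_to_root(
    'mass_fit.root',    # output file name (placeholder)
    option='recreate',  # 'recreate' or 'update'
    suffix='_pt2_4',    # appended to every object name (placeholder)
    folder='fits',      # folder inside the ROOT file (placeholder)
    num=5000            # number of points used to histogram the pdfs
)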
def fix_bkg_frac_to_bkg_pdf(self, idx_pdf, target_pdf, factor=1)-
def fix_bkg_frac_to_bkg_pdf(self, idx_pdf, target_pdf, factor=1): """ Fix the frac parameter of a background to the frac parameter of another background Parameters ------------------------------------------------- idx_pdf: int Index of the background fraction to be fixed target_pdf: int Index of the background fraction to be used as reference factor: float Factor to multiply the frac parameter of the target background """ self.__check_consistency_fix_frac(idx_pdf, target_pdf, 'bkg', 'bkg') factor_par = self.__get_parameter_fix_frac( factor, f'factor_bkg{idx_pdf}_constrained_to_bkg{target_pdf}' ) self._fix_fracs_to_pdfs_.append({ 'fixed_pdf_idx': idx_pdf, 'target_pdf_idx': target_pdf, 'factor': factor_par, 'fixed_pdf_type': 'bkg', 'target_pdf_type': 'bkg' })Fix the frac parameter of a background to the frac parameter of another background
Parameters
idx_pdf:int- Index of the background fraction to be fixed
target_pdf:int- Index of the background fraction to be used as reference
factor:float- Factor to multiply the frac parameter of the target background
def fix_bkg_frac_to_signal_pdf(self, idx_pdf, target_pdf, factor=1)-
def fix_bkg_frac_to_signal_pdf(self, idx_pdf, target_pdf, factor=1): """ Fix the frac parameter of a background to the frac parameter of a signal Parameters ------------------------------------------------- idx_pdf: int Index of the background fraction to be fixed target_pdf: int Index of the signal fraction to be used as reference factor: float Factor to multiply the frac parameter of the target signal """ self.__check_consistency_fix_frac(idx_pdf, target_pdf, 'bkg', 'signal') factor_par = self.__get_parameter_fix_frac( factor, f'factor_bkg{idx_pdf}_constrained_to_signal{target_pdf}' ) self._fix_fracs_to_pdfs_.append({ 'fixed_pdf_idx': idx_pdf, 'target_pdf_idx': target_pdf, 'factor': factor_par, 'fixed_pdf_type': 'bkg', 'target_pdf_type': 'signal' })Fix the frac parameter of a background to the frac parameter of a signal
Parameters
idx_pdf:int- Index of the background fraction to be fixed
target_pdf:int- Index of the signal fraction to be used as reference
factor:float- Factor to multiply the frac parameter of the target signal
def fix_signal_frac_to_bkg_pdf(self, idx_pdf, target_pdf, factor=1)-
def fix_signal_frac_to_bkg_pdf(self, idx_pdf, target_pdf, factor=1): """ Fix the frac parameter of a signal to the frac parameter of a background Parameters ------------------------------------------------- idx_pdf: int Index of the signal fraction to be fixed target_pdf: int Index of the background fraction to be used as reference factor: float Factor to multiply the frac parameter of the target background """ self.__check_consistency_fix_frac(idx_pdf, target_pdf, 'signal', 'bkg') factor_par = self.__get_parameter_fix_frac( factor, f'factor_signal{idx_pdf}_constrained_to_bkg{target_pdf}' ) self._fix_fracs_to_pdfs_.append({ 'fixed_pdf_idx': idx_pdf, 'target_pdf_idx': target_pdf, 'factor': factor_par, 'fixed_pdf_type': 'signal', 'target_pdf_type': 'bkg' })Fix the frac parameter of a signal to the frac parameter of a background
Parameters
idx_pdf:int- Index of the signal fraction to be fixed
target_pdf:int- Index of the background fraction to be used as reference
factor:float- Factor to multiply the frac parameter of the target background
def fix_signal_frac_to_signal_pdf(self, idx_pdf, target_pdf, factor=1)-
def fix_signal_frac_to_signal_pdf(self, idx_pdf, target_pdf, factor=1): """ Fix the frac parameter of a signal to the frac parameter of another signal Parameters ------------------------------------------------- idx_pdf: int Index of the signal fraction to be fixed target_pdf: int Index of the signal fraction to be used as reference factor: float Factor to multiply the frac parameter of the target signal """ self.__check_consistency_fix_frac(idx_pdf, target_pdf, 'signal', 'signal') factor_par = self.__get_parameter_fix_frac( factor, f'factor_signal{idx_pdf}_constrained_to_signal{target_pdf}' ) self._fix_fracs_to_pdfs_.append({ 'fixed_pdf_idx': idx_pdf, 'target_pdf_idx': target_pdf, 'factor': factor_par, 'fixed_pdf_type': 'signal', 'target_pdf_type': 'signal' })Fix the frac parameter of a signal to the frac parameter of another signal
Parameters
idx_pdf:int- Index of the signal fraction to be fixed
target_pdf:int- Index of the signal fraction to be used as reference
factor:float- Factor to multiply the frac parameter of the target signal
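The four fix_*_frac_to_*_pdf methods all follow the same pattern; a short sketch, assuming `fitter` is configured with at least two background pdfs and two signal pdfs (indices and factors are illustrative):
# constrain the fraction of background 1 to half the fraction of background 0
fitter.fix_bkg_frac_to_bkg_pdf(idx_pdf=1, target_pdf=0, factor=0.5)
# constrain the fraction of signal 1 to 10% of the fraction of signal 0
# (set_reflection_kde / set_reflection_template use this mechanism internally for the R/S ratio)
fitter.fix_signal_frac_to_signal_pdf(idx_pdf=1, target_pdf=0, factor=0.1)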
def get_background(self, idx=0, **kwargs)-
def get_background(self, idx=0, **kwargs): """ Get the background and its error in a given invariant-mass region Parameters ------------------------------------------------- idx: int Index of the signal to be used to compute nsigma window **kwargs: dict Additional optional arguments: - nsigma: float nsigma invariant-mass window around mean for background computation - nhwhm: float number of hwhm invariant-mass window around mean for signal and background computation (alternative to nsigma) - min: float minimum value of invariant-mass for background computation (alternative to nsigma) - max: float maximum value of invariant-mass for background computation (alternative to nsigma) Returns ------------------------------------------------- background: float The background value obtained from the fit background_err: float The background error obtained from the fit """ if not self._background_pdfs_: Logger('Background not fitted', 'ERROR') return 0., 0. nsigma = kwargs.get('nsigma', 3.) nhwhm = kwargs.get('nhwhm', None) min_value = kwargs.get('min', None) max_value = kwargs.get('max', None) use_nsigma = True if nhwhm is not None and (min_value is not None or max_value is not None): Logger('I cannot compute the signal within a fixed mass interval and a number of HWFM', 'ERROR') return 0., 0. if min_value is not None and max_value is not None: use_nsigma = False if nhwhm is not None: use_nsigma = False if not self._signal_pdfs_[idx].has_hwhm(): Logger('HWHM not defined, I cannot compute the signal for this pdf', 'ERROR') return 0., 0. mass, _ = self.get_mass(idx) hwhm, _ = self.get_hwhm(idx) min_value = mass - nhwhm * hwhm max_value = mass + nhwhm * hwhm if use_nsigma: if not self._signal_pdfs_[idx].has_sigma(): Logger('Sigma not defined, I cannot compute the signal for this pdf', 'ERROR') return 0., 0. mass, _ = self.get_mass(idx) sigma, _ = self.get_sigma(idx) min_value = mass - nsigma * sigma max_value = mass + nsigma * sigma _, bkg_fracs, _, _, bkg_err_fracs, _ = self.__get_all_fracs() limits = self._data_handler_.get_limits() min_value = max(min_value, limits[0]) max_value = min(max_value, limits[1]) # pylint: disable=missing-kwoa background, background_err = 0., 0. for idx2, bkg in enumerate(self._background_pdfs_): norm = self._total_pdf_norm_ * bkg_fracs[idx2] norm_err = norm * bkg_err_fracs[idx2] bkg_int = float(bkg.pdf.integrate((min_value, max_value))) background += bkg_int * norm background_err += (bkg_int * norm_err)**2 background /= self._ratio_truncated_ background_err = np.sqrt(background_err) / self._ratio_truncated_ return float(background), float(background_err)Get the background and its error in a given invariant-mass region
Parameters
idx:int- Index of the signal to be used to compute nsigma window
**kwargs:dict-
Additional optional arguments:
-
nsigma: float nsigma invariant-mass window around mean for background computation
-
nhwhm: float number of HWHMs defining the invariant-mass window around the mean for the background computation (alternative to nsigma)
-
min: float minimum value of invariant-mass for background computation (alternative to nsigma)
-
max: float maximum value of invariant-mass for background computation (alternative to nsigma)
-
Returns
background:float- The background value obtained from the fit
background_err:float- The background error obtained from the fit
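For example, the background under the first signal peak can be queried in a ±3σ window or in an explicit mass interval (a sketch; `fitter` is assumed fitted, numerical limits are placeholders):
bkg, bkg_err = fitter.get_background(idx=0, nsigma=3.)
bkg_win, bkg_win_err = fitter.get_background(idx=0, min=1.75, max=1.98)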
def get_background_parameter(self, idx, par_name)-
def get_background_parameter(self, idx, par_name): """ Get a background parameter and its error Parameters ------------------------------------------------- idx: int Index of the parameter to be returned (default: 0) par_name: str parameter to return Returns ------------------------------------------------- parameter: float The parameter value obtained from the fit parameter_err: float The parameter error obtained from the fit """ if self._background_pdfs_[idx].get_fix_par(par_name): parameter = self._background_pdfs_[idx].get_init_par(par_name) parameter_err = 0. else: parameter = self._fit_result_.params[f'{self._name_}_{par_name}_bkg{idx}']['value'] parameter_err = self._fit_result_.params[f'{self._name_}_{par_name}_bkg{idx}']['hesse']['error'] return parameter, parameter_errGet a background parameter and its error
Parameters
idx:int- Index of the background pdf whose parameter is returned
par_name:str- Name of the parameter to return
Returns
parameter:float- The parameter value obtained from the fit
parameter_err:float- The parameter error obtained from the fit
def get_bkg_pars(self)-
def get_bkg_pars(self): """ Return the background parameters """ fracs = self.__get_all_fracs() bkg_pars = [] for i_bkg, pdf in enumerate(self._background_pdfs_): bkg_pars.append({}) for key, value in pdf.parameters.items(): par_name = key.split(f'{self._name_}_')[-1] par_name = par_name.split(f'_bkg{i_bkg}')[0] bkg_pars[-1][par_name] = value.numpy() bkg_pars[-1]['frac'] = fracs[1][i_bkg] return bkg_parsReturn the background parameters
def get_bkg_pars_uncs(self)-
def get_bkg_pars_uncs(self): """ Return the background parameters uncertainties """ fracs = self.__get_all_fracs() bkg_pars_uncs = [] for i_bkg, pdf in enumerate(self._background_pdfs_): bkg_pars_uncs.append({}) for key in pdf.parameters: par_name = key.split(f'{self._name_}_')[-1] par_name = par_name.split(f'_bkg{i_bkg}')[0] try: par_unc = self._fit_result_.params[key]['hesse']['error'] except KeyError: # fixed parameter par_unc = 0. bkg_pars_uncs[-1][par_name] = par_unc bkg_pars_uncs[-1]['frac'] = fracs[4][i_bkg] return bkg_pars_uncsReturn the background parameters uncertainties
def get_chi2(self)-
def get_chi2(self): """ Get chi2 for binned data Returns ------------------------------------------------- chi2: float chi2 """ chi2 = 0 norm = self._data_handler_.get_norm() if self._data_handler_.get_is_binned(): # for chi2 loss, just retrieve loss value in fit result if self._chi2_loss_: return float(self._fit_result_.loss.value()) # for nll loss, compute chi2 "by hand" # access normalized data values and errors for all bins binned_data = self._data_handler_.get_binned_data() data_values = binned_data.values() data_variances = binned_data.variances() # access model predicted values model_values = self._total_pdf_binned_.values() * norm # compute chi2 for (data, model, data_variance) in zip(data_values, model_values, data_variances): chi2 += (data - model)**2/data_variance return chi2 if self._limits_ != [self._data_handler_.get_limits()]: # restricted fit limits Logger('chi2 not yet implemented with restricted fit limits', 'FATAL') # for unbinned data data_values = self._data_handler_.get_binned_data_from_unbinned_data() # access model predicted values model_values = self._total_pdf_binned_.values() * norm # compute chi2 for (data, model) in zip(data_values, model_values): if data == 0: continue chi2 += (data - model)**2/data return float(chi2)Get chi2 for binned data
Returns
chi2:float- chi2
def get_chi2_ndf(self)-
def get_chi2_ndf(self):
    """
    Get the reduced chi2 (chi2 divided by number of degrees of freedom) for binned data

    Returns
    -------------------------------------------------
    chi2_ndf: float
        The reduced chi2
    """
    return self.get_chi2()/self.get_ndf()
Get the reduced chi2 (chi2 divided by number of degrees of freedom) for binned data
Returns
chi2_ndf:float- The reduced chi2
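A typical fit-quality check after a binned fit, assuming `fitter` is already fitted:
chi2 = fitter.get_chi2()
ndf = fitter.get_ndf()
print(f'chi2 / ndf = {chi2:.2f} / {ndf} = {fitter.get_chi2_ndf():.2f}')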
def get_hwhm(self, idx=0)-
def get_hwhm(self, idx=0): """ Get the half width half maximum and its error Parameters ------------------------------------------------- idx: int Index of the sigma to be returned (default: 0) Returns ------------------------------------------------- hwhm: float The sigma value obtained from the fit hwhm_err: float The sigma error obtained from the fit """ if not self._signal_pdfs_[idx].has_hwhm(): Logger(f'HFWM parameter not defined for {self._signal_pdfs_[idx].kind} pdf!', 'ERROR') return 0., 0. if self._signal_pdfs_[idx].kind == PDFKind.GAUSSIAN: mult_fact = np.sqrt(2 * np.log(2)) hwhm, hwhm_err = self.get_sigma(idx) hwhm *= mult_fact hwhm_err *= mult_fact elif self._signal_pdfs_[idx].kind == PDFKind.CAUCHY: hwhm, hwhm_err = self.get_signal_parameter(idx, 'gamma') elif self._signal_pdfs_[idx].kind == PDFKind.VOIGTIAN: mult_fact = np.sqrt(2 * np.log(2)) sigma, sigma_err = self.get_sigma(idx) sigma *= mult_fact sigma_err *= mult_fact gamma, gamma_err = self.get_signal_parameter(idx, 'gamma') hwhm = 0.5346 * gamma + np.sqrt(0.2166 * gamma**2 + sigma**2) # we neglect the correlation between sigma and gamma der_sigma = sigma / np.sqrt(0.0721663 + sigma**2) der_gamma = 0.5346 + (0.2166 * gamma) / np.sqrt(0.2166 * gamma**2 + sigma**2) hwhm_err = np.sqrt(der_sigma**2 * sigma_err**2 + der_gamma**2 * gamma_err**2) return hwhm, hwhm_errGet the half width half maximum and its error
Parameters
idx:int- Index of the signal pdf whose HWHM is returned (default: 0)
Returns
hwhm:float- The HWHM value obtained from the fit
hwhm_err:float- The HWHM error obtained from the fit
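For the Voigtian case the source above combines the Gaussian HWHM (sigma * sqrt(2 ln 2)) and the Cauchy HWHM (the fitted gamma) through the widely used approximation hwhm_V ≈ 0.5346*hwhm_L + sqrt(0.2166*hwhm_L^2 + hwhm_G^2); a standalone numerical sketch of that formula (placeholder values):
import numpy as np

def voigt_hwhm(sigma, gamma):
    # mirrors the approximation used in get_hwhm above
    hwhm_gauss = sigma * np.sqrt(2 * np.log(2))
    return 0.5346 * gamma + np.sqrt(0.2166 * gamma**2 + hwhm_gauss**2)

print(voigt_hwhm(sigma=0.010, gamma=0.002))  # ~0.0129, same units as the inputs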
def get_mass(self, idx=0)-
def get_mass(self, idx=0): """ Get the mass and its error Parameters ------------------------------------------------- idx: int Index of the mass to be returned (default: 0) Returns ------------------------------------------------- mass: float The mass value obtained from the fit mass_err: float The mass error obtained from the fit """ if self._signal_pdfs_[idx].is_hist(): hist = self._signal_pdfs_[idx].to_hist() bin_limits = hist.to_numpy()[1] centres = [0.5 * (minn + maxx) for minn, maxx in zip(bin_limits[1:], bin_limits[:-1])] counts = hist.values() mass = np.average(centres, weights=counts) mass_err = 0. else: mass_name = 'm' if self._signal_pdfs_[idx].uses_m_not_mu() else 'mu' if self._signal_pdfs_[idx].get_fix_par(mass_name): mass = self._signal_pdfs_[idx].get_init_par(mass_name) mass_err = 0. else: mass = self._fit_result_.params[f'{self._name_}_{mass_name}_signal{idx}']['value'] mass_err = self._fit_result_.params[f'{self._name_}_{mass_name}_signal{idx}']['hesse']['error'] return mass, mass_errGet the mass and its error
Parameters
idx:int- Index of the mass to be returned (default: 0)
Returns
mass:float- The mass value obtained from the fit
mass_err:float- The mass error obtained from the fit
def get_name_background_pdf(self)-
def get_name_background_pdf(self):
    """
    Return the background pdf names
    """
    return [str(pdf.kind) for pdf in self._background_pdfs_]
Return the background pdf names
def get_name_refl_pdf(self)-
def get_name_refl_pdf(self):
    """
    Return the reflection pdf names
    """
    return [str(pdf.kind) for pdf in self._signal_pdfs_[-self._n_refl_:]]
Return the reflection pdf names
def get_name_signal_pdf(self)-
def get_name_signal_pdf(self):
    """
    Return the signal pdf names
    """
    signal_slice = self._signal_pdfs_[:-self._n_refl_] if self._n_refl_ > 0 else self._signal_pdfs_
    return [str(pdf.kind) for pdf in signal_slice]
Return the signal pdf names
def get_ndf(self)-
def get_ndf(self): """ Get the number of degrees of freedom for chi2 fit ndf = nbins - nfreeparams - 1 -1 because the data sample size is fixed Returns ------------------------------------------------- ndf: int The number of degrees of freedom """ nbins = self._data_handler_.get_nbins() nfreeparams = len(self._fit_result_.params) self._ndf_ = nbins - nfreeparams - 1 return self._ndf_Get the number of degrees of freedom for chi2 fit ndf = nbins - nfreeparams - 1 -1 because the data sample size is fixed
Returns
ndf:int- The number of degrees of freedom
def get_raw_yield(self, idx=0)-
def get_raw_yield(self, idx=0): """ Get the raw yield and its error Parameters ------------------------------------------------- idx: int Index of the raw yield to be returned (default: 0) Returns ------------------------------------------------- raw_yield: float The raw yield obtained from the fit raw_yield_err: float The raw yield error obtained from the fit """ return self._rawyield_[idx], self._rawyield_err_[idx]Get the raw yield and its error
Parameters
idx:int- Index of the raw yield to be returned (default: 0)
Returns
raw_yield:float- The raw yield obtained from the fit
raw_yield_err:float- The raw yield error obtained from the fit
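A sketch of how the main fit outputs are typically collected after mass_zfit (`fitter` assumed fitted; idx=0 refers to the first signal pdf, units follow the input data):
rawy, rawy_err = fitter.get_raw_yield(idx=0)
mass, mass_err = fitter.get_mass(idx=0)
sigma, sigma_err = fitter.get_sigma(idx=0)
print(f'raw yield = {rawy:.0f} +/- {rawy_err:.0f}')
print(f'mass = {mass:.4f} +/- {mass_err:.4f}, sigma = {sigma:.4f} +/- {sigma_err:.4f}')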
def get_raw_yield_bincounting(self, idx=0, **kwargs)-
def get_raw_yield_bincounting(self, idx=0, **kwargs): """ Get the raw yield and its error via the bin-counting method Parameters ------------------------------------------------- idx: int Index of the raw yield to be returned (default: 0) **kwargs: dict Additional optional arguments: - nsigma: float nsigma invariant-mass window around mean for signal counting - nhwhm: float number of hwhm invariant-mass window around mean for signal counting (alternative to nsigma) - min: float minimum value of invariant-mass for signal counting (alternative to nsigma) - max: float maximum value of invariant-mass for signal counting (alternative to nsigma) Returns ------------------------------------------------- raw_yield: float The raw yield obtained from the bin counting raw_yield_err: float The raw yield error obtained from the bin counting """ nsigma = kwargs.get('nsigma', 3.) nhwhm = kwargs.get('nhwhm', None) min_value = kwargs.get('min', None) max_value = kwargs.get('max', None) use_nsigma = True if nhwhm is not None and (min_value is not None or max_value is not None): Logger('I cannot compute the signal within a fixed mass interval and a number of HWFM', 'ERROR') return 0., 0. if min_value is not None and max_value is not None: use_nsigma = False if nhwhm is not None: use_nsigma = False if not self._signal_pdfs_[idx].has_hwhm(): Logger('HWHM not defined, I cannot compute the signal for this pdf', 'ERROR') return 0., 0. mass, _ = self.get_mass(idx) hwhm, _ = self.get_hwhm(idx) min_value = mass - nhwhm * hwhm max_value = mass + nhwhm * hwhm if use_nsigma: if not self._signal_pdfs_[idx].has_sigma(): Logger('Sigma not defined, I cannot compute the signal for this pdf', 'ERROR') return 0., 0. mass, _ = self.get_mass(idx) sigma, _ = self.get_sigma(idx) min_value = mass - nsigma * sigma max_value = mass + nsigma * sigma if len(self._raw_residuals_) == 0: self.__get_raw_residuals() bin_centers = self._data_handler_.get_bin_center() bin_width = (bin_centers[1] - bin_centers[0]) / 2 raw_yield, raw_yield_err = 0., 0. for residual, variance, bin_center in zip(self._raw_residuals_, self._raw_residual_variances_, bin_centers): if bin_center - bin_width >= min_value and bin_center + bin_width <= max_value: raw_yield += residual raw_yield_err += variance raw_yield = float(raw_yield) raw_yield_err = np.sqrt(float(raw_yield_err)) return raw_yield, raw_yield_errGet the raw yield and its error via the bin-counting method
Parameters
idx:int- Index of the raw yield to be returned (default: 0)
**kwargs:dict-
Additional optional arguments:
-
nsigma: float nsigma invariant-mass window around mean for signal counting
-
nhwhm: float number of HWHMs defining the invariant-mass window around the mean for signal counting (alternative to nsigma)
-
min: float minimum value of invariant-mass for signal counting (alternative to nsigma)
-
max: float maximum value of invariant-mass for signal counting (alternative to nsigma)
-
Returns
raw_yield:float- The raw yield obtained from the bin counting
raw_yield_err:float- The raw yield error obtained from the bin counting
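Bin counting is commonly used as a cross-check of the fit-based raw yield; a sketch (`fitter` assumed fitted):
ry_fit, ry_fit_err = fitter.get_raw_yield(idx=0)
ry_bc, ry_bc_err = fitter.get_raw_yield_bincounting(idx=0, nsigma=5.)
print(f'fit: {ry_fit:.0f} +/- {ry_fit_err:.0f}, bin counting: {ry_bc:.0f} +/- {ry_bc_err:.0f}')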
def get_sigma(self, idx=0)-
def get_sigma(self, idx=0): """ Get the sigma and its error Parameters ------------------------------------------------- idx: int Index of the sigma to be returned (default: 0) Returns ------------------------------------------------- sigma: float The sigma value obtained from the fit sigma_err: float The sigma error obtained from the fit """ if not self._signal_pdfs_[idx].has_sigma(): Logger(f'Sigma parameter not defined for {self._signal_pdfs_[idx].kind} pdf!', 'ERROR') return 0., 0. # if histogram, the rms is used as proxy if self._signal_pdfs_[idx].is_hist(): Logger(f'RMS used as proxy for sigma parameter of {self._signal_pdfs_[idx].kind} pdf!', 'WARNING') mean = self.get_mass(idx)[0] hist = self._signal_pdfs_[idx].to_hist() bin_limits = hist.to_numpy()[1] centres = [0.5 * (minn + maxx) for minn, maxx in zip(bin_limits[1:], bin_limits[:-1])] counts = hist.values() sigma = np.sqrt(np.average((centres - mean)**2, weights=counts)) sigma_err = 0. else: if self._signal_pdfs_[idx].get_fix_par('sigma'): sigma = self._signal_pdfs_[idx].get_init_par('sigma') sigma_err = 0. else: sigma = self._fit_result_.params[f'{self._name_}_sigma_signal{idx}']['value'] sigma_err = self._fit_result_.params[f'{self._name_}_sigma_signal{idx}']['hesse']['error'] return sigma, sigma_errGet the sigma and its error
Parameters
idx:int- Index of the sigma to be returned (default: 0)
Returns
sigma:float- The sigma value obtained from the fit
sigma_err:float- The sigma error obtained from the fit
def get_signal(self, idx=0, **kwargs)-
def get_signal(self, idx=0, **kwargs): """ Get the signal and its error in a given invariant-mass region Parameters ------------------------------------------------- idx: int Index of the signal to be returned **kwargs: dict Additional optional arguments: - nsigma: float nsigma invariant-mass window around mean for signal computation - nhwhm: float number of hwhm invariant-mass window around mean for signal and background computation (alternative to nsigma) - min: float minimum value of invariant-mass for signal computation (alternative to nsigma) - max: float maximum value of invariant-mass for signal computation (alternative to nsigma) Returns ------------------------------------------------- signal: float The signal value obtained from the fit signal_err: float The signal error obtained from the fit """ nsigma = kwargs.get('nsigma', 3.) nhwhm = kwargs.get('nhwhm', None) min_value = kwargs.get('min', None) max_value = kwargs.get('max', None) use_nsigma = True if nhwhm is not None and (min_value is not None or max_value is not None): Logger('I cannot compute the signal within a fixed mass interval and a number of HWFM', 'ERROR') return 0., 0. if min_value is not None and max_value is not None: use_nsigma = False if nhwhm is not None: use_nsigma = False if not self._signal_pdfs_[idx].has_hwhm(): Logger('HWHM not defined, I cannot compute the signal for this pdf', 'ERROR') return 0., 0. mass, _ = self.get_mass(idx) hwhm, _ = self.get_hwhm(idx) min_value = mass - nhwhm * hwhm max_value = mass + nhwhm * hwhm if use_nsigma: if not self._signal_pdfs_[idx].has_sigma(): Logger('Sigma not defined, I cannot compute the signal for this pdf', 'ERROR') return 0., 0. mass, _ = self.get_mass(idx) sigma, _ = self.get_sigma(idx) min_value = mass - nsigma * sigma max_value = mass + nsigma * sigma # pylint: disable=missing-kwoa signal = self._signal_pdfs_[idx].pdf.integrate((min_value, max_value)) signal_fracs, _, refl_fracs, signal_err_fracs, _, _ = self.__get_all_fracs() if len(self._background_pdfs_) > 0: frac = signal_fracs[idx] frac_err = signal_err_fracs[idx] else: if len(self._signal_pdfs_) == 1: frac = 1. frac_err = 0. if idx < len(signal_fracs): frac = signal_fracs[idx] frac_err = signal_err_fracs[idx] else: frac = 1. - sum(signal_fracs) - sum(refl_fracs) frac_err = np.sqrt(sum(list(err**2 for err in signal_err_fracs))) norm = self._data_handler_.get_norm() norm_err = norm * frac_err norm *= frac return float(signal * norm), float(signal * norm_err)Get the signal and its error in a given invariant-mass region
Parameters
idx:int- Index of the signal to be returned
**kwargs:dict-
Additional optional arguments:
-
nsigma: float nsigma invariant-mass window around mean for signal computation
-
nhwhm: float number of HWHMs defining the invariant-mass window around the mean for the signal computation (alternative to nsigma)
-
min: float minimum value of invariant-mass for signal computation (alternative to nsigma)
-
max: float maximum value of invariant-mass for signal computation (alternative to nsigma)
-
Returns
signal:float- The signal value obtained from the fit
signal_err:float- The signal error obtained from the fit
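The integration window can be specified in three mutually exclusive ways; a sketch (`fitter` assumed fitted, numerical limits are placeholders):
sig_ns, sig_ns_err = fitter.get_signal(idx=0, nsigma=3.)             # +- 3 sigma around the fitted mean
sig_hw, sig_hw_err = fitter.get_signal(idx=0, nhwhm=2.)              # +- 2 HWHM around the fitted mean
sig_rng, sig_rng_err = fitter.get_signal(idx=0, min=1.75, max=1.98)  # explicit invariant-mass range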
def get_signal_over_background(self, idx=0, **kwargs)-
def get_signal_over_background(self, idx=0, **kwargs): """ Get the S/B ratio and its error in a given invariant-mass region Parameters ------------------------------------------------- idx: int Index of the signal to be used to compute nsigma window **kwargs: dict Additional optional arguments: - nsigma: float nsigma invariant-mass window around mean for signal and background computation - nhwhm: float number of hwhm invariant-mass window around mean for signal and background computation (alternative to nsigma) - min: float minimum value of invariant-mass for signal and background computation (alternative to nsigma) - max: float maximum value of invariant-mass for signal and background computation (alternative to nsigma) Returns ------------------------------------------------- signal_over_background: float The S/B value obtained from the fit signal_over_background_err: float The S/B error obtained from the fit """ signal = self.get_signal(idx, **kwargs) bkg = self.get_background(idx, **kwargs) signal_over_background = signal[0]/bkg[0] signal_over_background_err = np.sqrt(signal[1]**2/signal[0]**2 + bkg[1]**2/bkg[0]**2) signal_over_background_err *= signal_over_background return signal_over_background, signal_over_background_errGet the S/B ratio and its error in a given invariant-mass region
Parameters
idx:int- Index of the signal to be used to compute nsigma window
**kwargs:dict-
Additional optional arguments:
-
nsigma: float nsigma invariant-mass window around mean for signal and background computation
-
nhwhm: float number of HWHMs defining the invariant-mass window around the mean for the signal and background computation (alternative to nsigma)
-
min: float minimum value of invariant-mass for signal and background computation (alternative to nsigma)
-
max: float maximum value of invariant-mass for signal and background computation (alternative to nsigma)
-
Returns
signal_over_background:float- The S/B value obtained from the fit
signal_over_background_err:float- The S/B error obtained from the fit
def get_signal_parameter(self, idx, par_name)-
def get_signal_parameter(self, idx, par_name): """ Get a signal parameter and its error Parameters ------------------------------------------------- idx: int Index of the parameter to be returned (default: 0) par_name: str parameter to return Returns ------------------------------------------------- parameter: float The parameter value obtained from the fit parameter_err: float The parameter error obtained from the fit """ if par_name == 'gamma': Logger('The gamma parameter that you are getting is half of the width of a resonance,' ' for more info check the Cauchy pdf defined here ' 'https://zfit.readthedocs.io/en/latest/user_api/pdf/_generated/basic/zfit.pdf.Cauchy.html', 'WARNING') if self._signal_pdfs_[idx].get_fix_par(par_name): parameter = self._signal_pdfs_[idx].get_init_par(par_name) parameter_err = 0. else: parameter = self._fit_result_.params[f'{self._name_}_{par_name}_signal{idx}']['value'] parameter_err = self._fit_result_.params[f'{self._name_}_{par_name}_signal{idx}']['hesse']['error'] return parameter, parameter_errGet a signal parameter and its error
Parameters
idx:int- Index of the signal pdf whose parameter is returned
par_name:str- Name of the parameter to return
Returns
parameter:float- The parameter value obtained from the fit
parameter_err:float- The parameter error obtained from the fit
def get_signal_pars(self)-
def get_signal_pars(self): """ Return the signal parameters """ fracs = self.__get_all_fracs() signal_pars = [] signal_slice = self._signal_pdfs_[:-self._n_refl_] if self._n_refl_ > 0 else self._signal_pdfs_ for i_sgn, pdf in enumerate(signal_slice): signal_pars.append({}) for key, value in pdf.parameters.items(): par_name = key.split(f'{self._name_}_')[-1] par_name = par_name.split(f'_signal{i_sgn}')[0] signal_pars[-1][par_name] = value.numpy() signal_pars[-1]['frac'] = fracs[0][i_sgn] return signal_parsReturn the signal parameters
def get_signal_pars_uncs(self)-
def get_signal_pars_uncs(self): """ Return the signal parameters uncertainties """ fracs = self.__get_all_fracs() signal_pars_uncs = [] signal_slice = self._signal_pdfs_[:-self._n_refl_] if self._n_refl_ > 0 else self._signal_pdfs_ for i_sgn, pdf in enumerate(signal_slice): signal_pars_uncs.append({}) for key in pdf.parameters: par_name = key.split(f'{self._name_}_')[-1] par_name = par_name.split(f'_signal{i_sgn}')[0] try: par_unc = self._fit_result_.params[key]['hesse']['error'] except KeyError: # fixed parameter par_unc = 0. signal_pars_uncs[-1][par_name] = par_unc signal_pars_uncs[-1]['frac'] = fracs[3][i_sgn] return signal_pars_uncsReturn the signal parameters uncertainties
def get_significance(self, idx=0, **kwargs)-
def get_significance(self, idx=0, **kwargs): """ Get the significance and its error in a given invariant-mass region Parameters ------------------------------------------------- idx: int Index of the signal to be used to compute nsigma window **kwargs: dict Additional optional arguments: - nsigma: float nsigma invariant-mass window around mean for signal and background computation - nhwhm: float number of hwhm invariant-mass window around mean for signal and background computation (alternative to nsigma) - min: float minimum value of invariant-mass for signal and background computation (alternative to nsigma) - max: float maximum value of invariant-mass for signal and background computation (alternative to nsigma) Returns ------------------------------------------------- significance: float The significance value obtained from the fit significance_err: float The significance error obtained from the fit """ signal = self.get_signal(idx, **kwargs) bkg = self.get_background(idx, **kwargs) significance = signal[0]/np.sqrt(signal[0]+bkg[0]) sig_plus_bkg = signal[0] + bkg[0] significance_err = significance*np.sqrt( (signal[1]**2 + bkg[1]**2) / (4. * sig_plus_bkg**2) + ( bkg[0]/sig_plus_bkg) * signal[1]**2 / signal[0]**2) return significance, significance_errGet the significance and its error in a given invariant-mass region
Parameters
idx:int- Index of the signal to be used to compute nsigma window
**kwargs:dict-
Additional optional arguments:
-
nsigma: float nsigma invariant-mass window around mean for signal and background computation
-
nhwhm: float number of HWHMs defining the invariant-mass window around the mean for the signal and background computation (alternative to nsigma)
-
min: float minimum value of invariant-mass for signal and background computation (alternative to nsigma)
-
max: float maximum value of invariant-mass for signal and background computation (alternative to nsigma)
-
Returns
significance:float- The significance value obtained from the fit
significance_err:float- The significance error obtained from the fit
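Signal, background, S/B and significance are usually quoted together for the same window; a sketch (`fitter` assumed fitted):
signal, signal_err = fitter.get_signal(idx=0, nsigma=3.)
bkg, bkg_err = fitter.get_background(idx=0, nsigma=3.)
sob, sob_err = fitter.get_signal_over_background(idx=0, nsigma=3.)
signif, signif_err = fitter.get_significance(idx=0, nsigma=3.)
print(f'S = {signal:.0f}, B = {bkg:.0f}, S/B = {sob:.2f}, significance = {signif:.1f}')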
def get_sweights(self)-
def get_sweights(self): """ Calculate sWeights for signal and background components. Returns: dict: A dictionary containing sWeights for 'signal' and 'bkg' components. """ if self._is_truncated_: Logger('sWeights not available for truncated fit', 'ERROR') return {'signal': None, 'bkg': None} if self._extended_: model = self._total_pdf_.get_models()[0] # Get the SumPDF object (not the truncated one) sweights_model = [] for i_pdf, pdf in enumerate(self._signal_pdfs_): sweights_model.append(i_pdf) for i_pdf, pdf in enumerate(self._background_pdfs_): sweights_model.append(i_pdf + len(self._signal_pdfs_)) model = zfit.pdf.SumPDF( [model.get_models()[i] for i in sweights_model], extended=self._data_handler_.get_norm(), obs=self._data_handler_.get_obs(), name=self._name_ + '_sweights' ) sweights = compute_sweights(model, self._data_handler_.get_data()) names = self._signal_pdfs_+self._background_pdfs_ names = [names[i].kind.value for i in sweights_model] for new_name, old_name in zip( self._signal_pdfs_+self._background_pdfs_, list(sweights.keys()) ): sweights[new_name.kind.value] = sweights.pop(old_name) return sweights # Not extended case signal_pdf_extended = [] refl_pdf_extended = [] bkg_pdf_extended = [] norm = self._data_handler_.get_norm() signal_fracs, bkg_fracs, refl_fracs, _, _, _ = self.__get_all_fracs() bkg_fracs.append(1-sum(bkg_fracs)-sum(signal_fracs)-sum(refl_fracs)) total_signal_yields = 0. total_bkg_yields = 0. for pdf, frac in zip(self._signal_pdfs_, signal_fracs+refl_fracs): signal_pdf_extended.append(pdf.create_extended(frac * norm)) total_signal_yields += frac * norm for pdf, frac in zip(self._background_pdfs_, bkg_fracs): bkg_pdf_extended.append(pdf.create_extended(frac * norm)) total_bkg_yields += frac * norm total_signal_yields_par = zfit.Parameter('signal_yield', total_signal_yields) total_bkg_yields_par = zfit.Parameter('bkg_yield', total_bkg_yields) if len(signal_pdf_extended + refl_pdf_extended) > 1: signal_pdf_for_sweights = zfit.pdf.SumPDF( signal_pdf_extended, extended=total_signal_yields_par ) else: signal_pdf_for_sweights = signal_pdf_extended[0] if len(bkg_pdf_extended) > 1: bkg_pdf_for_sweights = zfit.pdf.SumPDF( bkg_pdf_extended + refl_pdf_extended, extended=total_bkg_yields_par ) else: bkg_pdf_for_sweights = bkg_pdf_extended[0] total_pdf_for_sweights = zfit.pdf.SumPDF([signal_pdf_for_sweights, bkg_pdf_for_sweights]) sweights = compute_sweights(total_pdf_for_sweights, self._data_handler_.get_data()) sweights_labels = list(sweights.keys()) return {'signal': sweights[sweights_labels[0]], 'bkg': sweights[sweights_labels[1]]}Calculate sWeights for signal and background components.
Returns
dict- A dictionary containing sWeights for 'signal' and 'bkg' components.
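A sketch of applying the sWeights to a candidate-level dataframe (`fitter` assumed fitted on unbinned, non-truncated data; `df` and its columns are hypothetical and must be aligned with the fitted candidates):
sweights = fitter.get_sweights()
# in the non-extended case the returned dict has 'signal' and 'bkg' keys
df['w_signal'] = sweights['signal']  # df: hypothetical pandas.DataFrame, one row per fitted candidate
df['w_bkg'] = sweights['bkg']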
def mass_zfit(self, do_prefit=False, **kwargs)-
def mass_zfit(self, do_prefit=False, **kwargs): """ Perform a mass fit with the zfit library Parameters ------------------------------------------------- do_prefit: bool Flag to enable prefit to background only (supported for unbinned fits only) The expected signal regions must be provided. **kwargs: dict Additional optional arguments: - prefit_excluded_regions: list List or list of lists with limits for the excluded regions - prefit_exclude_nsigma: Alternative argument to prefit_excluded_regions that excludes the regions around the signals, using the initialised mean and sigma parameters Returns ------------------------------------------------- fit_result: zfit.minimizers.fitresult.FitResult The fit result """ if self._data_handler_ is None: Logger('Data handler not specified', 'FATAL') self._raw_residuals_ = [] self._raw_residual_variances_ = [] self._std_residuals_ = [] self.__build_total_pdf() self.__build_total_pdf_binned() self._is_pdf_built_ = True # do prefit if do_prefit: skip_prefit = False excluded_regions = [] exclude_signals_nsigma = kwargs.get('prefit_exclude_nsigma', None) if exclude_signals_nsigma is not None: for pdf in self._signal_pdfs_: if not pdf.kind.has_sigma(): Logger(f"Sigma parameter not defined for {str(pdf.kind)}, " "cannot use nsigma for excluded regions in prefit", "Error") skip_prefit = True else: mass_name = 'm' if pdf.kind.uses_m_not_mu() else 'mu' mass = pdf.get_init_par(mass_name) sigma = pdf.get_init_par('sigma') excluded_regions.append( [mass - exclude_signals_nsigma * sigma, mass + exclude_signals_nsigma * sigma]) else: excluded_regions = kwargs.get('prefit_excluded_regions', None) if excluded_regions is None: Logger("To perform the prefit, you should set the excluded regions, skip", "Error") skip_prefit = True if not skip_prefit: self.__prefit(excluded_regions) if self._data_handler_.get_is_binned(): # chi2 loss if self._chi2_loss_: loss = zfit.loss.BinnedChi2(self._total_pdf_binned_, self._data_handler_.get_binned_data()) # nll loss else: loss = zfit.loss.BinnedNLL(self._total_pdf_binned_, self._data_handler_.get_binned_data()) else: if self._extended_: loss = zfit.loss.ExtendedUnbinnedNLL(model=self._total_pdf_, data=self._data_handler_.get_data()) else: loss = zfit.loss.UnbinnedNLL(model=self._total_pdf_, data=self._data_handler_.get_data()) Logger("Performing FIT", "INFO") self._fit_result_ = self._minimizer_.minimize(loss=loss) Logger(self._fit_result_, 'RESULT') if self._fit_result_.hesse() == {}: if self._fit_result_.hesse(method='hesse_np') == {}: Logger('Impossible to compute hesse error', 'FATAL') self.__get_ratio_truncated() signal_fracs, _, _, signal_frac_errs, _, _ = self.__get_all_fracs() norm = self._total_pdf_norm_ if len(self._fracs_) == 0: self._rawyield_[0] = self._data_handler_.get_norm() self._rawyield_err_[0] = np.sqrt(self._rawyield_[0]) else: if self._extended_: for i_pdf, (_, frac, frac_err) in enumerate(zip(self._signal_pdfs_, signal_fracs, signal_frac_errs)): self._rawyield_[i_pdf] = self._total_pdf_.models[i_pdf].get_yield().value() # no background case: last signal fraction is 1 - sum(others) if not self.no_signal and\ self.no_background and\ i_pdf == len(self._signal_pdfs_) - 1: self._rawyield_err_[i_pdf] = np.sqrt( np.sum(np.square(signal_frac_errs)) + 2 * np.sum([ self.__get_frac_cov(self._fracs_[i], 'signal', self._fracs_[j]) for i in range(len(self._fracs_)-1) for j in range(i+1, len(self._fracs_)-1) ]) ) else: self._rawyield_err_[i_pdf] = self._rawyield_[i_pdf] * np.sqrt( (frac_err / frac)**2 + 
(self._fit_result_.hesse( params=self._total_yield_, method='hesse_np' )[self._total_yield_]['error'] / self._total_pdf_.get_yield().value())**2 + 2 * self.__get_frac_cov(self._fracs_[i_pdf], 'signal', self._total_yield_) / self._total_pdf_.get_yield().value() / frac ) else: for i_pdf, _ in enumerate(self._signal_pdfs_): if i_pdf in self._refl_idx_: continue self._rawyield_[i_pdf] = signal_fracs[i_pdf] * norm / self._ratio_truncated_ self._rawyield_err_[i_pdf] = signal_frac_errs[i_pdf] * norm / self._ratio_truncated_ return self._fit_result_Perform a mass fit with the zfit library
Parameters
do_prefit:bool- Flag to enable a prefit to the background only (supported for unbinned fits only). The expected signal regions must be provided.
**kwargs:dict-
Additional optional arguments:
-
prefit_excluded_regions: list List or list of lists with limits for the excluded regions
-
prefit_exclude_nsigma: Alternative argument to prefit_excluded_regions that excludes the regions around the signals, using the initialised mean and sigma parameters
-
Returns
fit_result:zfit.minimizers.fitresult.FitResult- The fit result
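An end-to-end sketch, assuming the fitter class exported by flarefly.fitter is called F2MassFitter and takes a DataHandler plus lists of signal/background pdf names (the class name, its keyword arguments and the pdf name strings are assumptions and may differ from the actual API; all numbers are placeholders):
import numpy as np
from flarefly.data_handler import DataHandler
from flarefly.fitter import F2MassFitter  # assumed class name

# toy unbinned sample: Gaussian peak on an exponential background
rng = np.random.default_rng(42)
data = np.concatenate([rng.normal(1.865, 0.010, 5000),
                       1.7 + rng.exponential(0.3, 20000)])
data_hdl = DataHandler(data=data, var_name='inv_mass', limits=[1.72, 2.05])

fitter = F2MassFitter(data_hdl, name_signal_pdf=['gaussian'],
                      name_background_pdf=['expo'])  # argument names assumed
fitter.set_particle_mass(0, mass=1.865, limits=[1.84, 1.89])
fitter.set_signal_initpar(0, 'sigma', 0.010, limits=[0.005, 0.030])

fit_result = fitter.mass_zfit()
# optionally, prefit the background excluding +-3 sigma around the initialised signal:
# fit_result = fitter.mass_zfit(do_prefit=True, prefit_exclude_nsigma=3.)
rawy, rawy_err = fitter.get_raw_yield(idx=0)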
def plot_mass_fit(self, **kwargs)-
def plot_mass_fit(self, **kwargs): """ Plot the mass fit Parameters ------------------------------------------------- **kwargs: dict Additional optional arguments: - style: str style to be used (see https://github.com/scikit-hep/mplhep for more details) - logy: bool log scale in y axis - figsize: tuple size of the figure - axis_title: str x-axis title - show_extra_info: bool show mu, sigma, chi2/ndf, signal, bkg, signal/bkg, significance - extra_info_massnsigma: float number of sigmas for extra info - extra_info_massnhwhm: float number of hwhms for extra info (alternative to extra_info_massnsigma) - extra_info_massrange: list mass range limits for extra info (alternative to extra_info_massnsigma) - extra_info_loc: list location of extra info (one for chi2 and one for other info) - legend_loc: str location of the legend - num: int number of bins to plot pdfs converted into histograms Returns ------------------------------------------------- fig: matplotlib.figure.Figure figure containing the mass fit plot """ style = kwargs.get('style', 'LHCb2') logy = kwargs.get('logy', False) figsize = kwargs.get('figsize', (7, 7)) bins = self._data_handler_.get_nbins() axis_title = kwargs.get('axis_title', self._data_handler_.get_var_name()) show_extra_info = kwargs.get('show_extra_info', False) num = kwargs.get('num', 10000) mass_range = kwargs.get('extra_info_massrange', None) nhwhm = kwargs.get('extra_info_massnhwhm', None) nsigma = kwargs.get('extra_info_massnsigma', 3) info_loc = kwargs.get('extra_info_loc', ['lower right', 'upper left']) legend_loc = kwargs.get('legend_loc', 'best') mplhep.style.use(style) obs = self._data_handler_.get_obs() limits = self._data_handler_.get_limits() fig, axs = plt.subplots(figsize=figsize) hdata = self._data_handler_.to_hist(lower_edge=limits[0], upper_edge=limits[1], nbins=bins, varname=self._data_handler_.get_var_name()) hdata.plot(yerr=True, color='black', histtype='errorbar', label='data') bin_sigma = (limits[1] - limits[0]) / bins norm_total_pdf = self._total_pdf_norm_ * bin_sigma x_plot = np.linspace(limits[0], limits[1], num=num) total_func = zfit.run(self._total_pdf_.pdf(x_plot, norm_range=obs)) signal_funcs, bkg_funcs = ([] for _ in range(2)) if not self.no_signal: for signal_pdf in self._signal_pdfs_: signal_funcs.append(zfit.run(signal_pdf.pdf.pdf(x_plot, norm_range=obs))) if not self.no_background: for bkg_pdf in self._background_pdfs_: bkg_funcs.append(zfit.run(bkg_pdf.pdf.pdf(x_plot, norm_range=obs))) signal_fracs, bkg_fracs, refl_fracs, _, _, _ = self.__get_all_fracs() # first draw backgrounds for ibkg, (bkg_func, bkg_frac) in enumerate(zip(bkg_funcs, bkg_fracs)): plt.plot(x_plot, bkg_func * norm_total_pdf * bkg_frac / self._ratio_truncated_, color=self._bkg_cmap_(ibkg), ls='--', label=self._background_pdfs_[ibkg].label) # then draw signals for isgn, (signal_func, frac) in enumerate(zip(signal_funcs, signal_fracs+refl_fracs)): plt.plot(x_plot, signal_func * norm_total_pdf * frac / self._ratio_truncated_, color=self._sgn_cmap_(isgn)) plt.fill_between(x_plot, signal_func * norm_total_pdf * frac / self._ratio_truncated_, color=self._sgn_cmap_(isgn), alpha=0.5, label=self._signal_pdfs_[isgn].label) plt.plot(x_plot, total_func * norm_total_pdf, color='xkcd:blue', label='total fit') plt.xlim(limits[0], limits[1]) plt.xlabel(axis_title) plt.ylabel(rf'counts / {(limits[1]-limits[0])/bins*1000:0.1f} MeV/$c^2$') if logy: plt.yscale('log') plt.ylim(min(total_func) * norm_total_pdf / 5, max(total_func) * norm_total_pdf * 5) else: plt.ylim(0., max(total_func) * 
norm_total_pdf * 1.5) if show_extra_info: # signal and background info for all signals text = [] for idx, signal_pdf in enumerate(self._signal_pdfs_): mass, mass_unc = self.get_mass(idx) sigma, sigma_unc = None, None gamma, gamma_unc = None, None rawyield, rawyield_err = self.get_raw_yield(idx=idx) if signal_pdf.has_sigma(): sigma, sigma_unc = self.get_sigma(idx) if signal_pdf.kind in (PDFType.CAUCHY, PDFType.VOIGTIAN): gamma, gamma_unc = self.get_signal_parameter(idx, 'gamma') extra_info = fr'{signal_pdf.label}''\n' \ + fr' $\mu = {mass*1000:.1f}\pm{mass_unc*1000:.1f}$ MeV$/c^2$''\n' if sigma is not None: extra_info += fr' $\sigma = {sigma*1000:.1f}\pm{sigma_unc*1000:.1f}$ MeV$/c^2$''\n' if gamma is not None: extra_info += fr' $\Gamma/2 = {gamma*1000:.1f}\pm{gamma_unc*1000:.1f}$ MeV$/c^2$''\n' extra_info += fr' $S={rawyield:.0f} \pm {rawyield_err:.0f}$''\n' if not self.no_background: if mass_range is not None: interval = f'[{mass_range[0]:.3f}, {mass_range[1]:.3f}]' bkg, bkg_err = self.get_background(idx=idx, min=mass_range[0], max=mass_range[1]) s_over_b, s_over_b_err = self.get_signal_over_background(idx=idx, min=mass_range[0], max=mass_range[1]) signif, signif_err = self.get_significance(idx=idx, min=mass_range[0], max=mass_range[1]) extra_info += fr' $B({interval})={bkg:.0f} \pm {bkg_err:.0f}$''\n' extra_info += fr' $S/B({interval})={s_over_b:.2f} \pm {s_over_b_err:.2f}$''\n' extra_info += fr' Signif.$({interval})={signif:.1f} \pm {signif_err:.1f}$' elif nhwhm is not None: bkg, bkg_err = self.get_background(idx=idx, nhwhm=nhwhm) s_over_b, s_over_b_err = self.get_signal_over_background(idx=idx, nhwhm=nhwhm) signif, signif_err = self.get_significance(idx=idx, nhwhm=nhwhm) extra_info += fr' $B({nhwhm}~\mathrm{{HWHM}})=${bkg:.0f} $\pm$ {bkg_err:.0f}''\n' extra_info += fr' $S/B({nhwhm}~\mathrm{{HWHM}})=${s_over_b:.2f} $\pm$ {s_over_b_err:.2f}''\n' extra_info += fr' Signif.$({nhwhm}~\mathrm{{HWHM}})=${signif:.1f} $\pm$ {signif_err:.1f}' else: bkg, bkg_err = self.get_background(idx=idx, nsigma=nsigma) s_over_b, s_over_b_err = self.get_signal_over_background(idx=idx, nsigma=nsigma) signif, signif_err = self.get_significance(idx=idx, nsigma=nsigma) extra_info += fr' $B({nsigma}\sigma)=${bkg:.0f} $\pm$ {bkg_err:.0f}''\n' extra_info += fr' $S/B({nsigma}\sigma)=${s_over_b:.2f} $\pm$ {s_over_b_err:.2f}''\n' extra_info += fr' Signif.$({nsigma}\sigma)=${signif:.1f} $\pm$ {signif_err:.1f}' text.append(extra_info) concatenated_text = '\n'.join(text) anchored_text_signal = AnchoredText(concatenated_text, loc=info_loc[1], frameon=False) if not self._is_truncated_: # chi2 not yet available for truncated fits # info on chi2/ndf chi2 = self.get_chi2() ndf = self.get_ndf() anchored_text_chi2 = AnchoredText(fr'$\chi^2 / \mathrm{{ndf}} =${chi2:.2f} / {ndf}', loc=info_loc[0], frameon=False) axs.add_artist(anchored_text_chi2) axs.add_artist(anchored_text_signal) plt.legend(loc=legend_loc) Logger('plot_mass_fit now returns a tuple (fig, axs) !', 'WARNING') return fig, axsPlot the mass fit
Parameters
**kwargs:dict-
Additional optional arguments:
-
style: str style to be used (see https://github.com/scikit-hep/mplhep for more details)
-
logy: bool log scale on the y axis
-
figsize: tuple size of the figure
-
axis_title: str x-axis title
-
show_extra_info: bool show mu, sigma, chi2/ndf, signal, bkg, signal/bkg, significance
-
extra_info_massnsigma: float number of sigmas for extra info
-
extra_info_massnhwhm: float number of hwhms for extra info (alternative to extra_info_massnsigma)
-
extra_info_massrange: list mass range limits for extra info (alternative to extra_info_massnsigma)
-
extra_info_loc: list location of extra info (one for chi2 and one for other info)
-
legend_loc: str location of the legend
-
num: int number of bins to plot pdfs converted into histograms
-
Returns
fig:matplotlib.figure.Figure- figure containing the mass fit plot (returned as a (fig, axs) tuple together with the matplotlib axes)
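Note that, as the warning in the source above states, plot_mass_fit returns a (fig, axs) tuple; a plotting sketch (`fitter` assumed fitted; the axis title and output file name are placeholders):
fig, axs = fitter.plot_mass_fit(style='LHCb2',
                                show_extra_info=True,
                                extra_info_loc=['lower right', 'upper left'],
                                axis_title=r'$M(\mathrm{K}\pi)$ (GeV/$c^2$)')
fig.savefig('mass_fit.png')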
def plot_raw_residuals(self, **kwargs)-
def plot_raw_residuals(self, **kwargs): """ Plot the raw residuals Parameters ------------------------------------------------- **kwargs: dict Additional optional arguments: - style: str style to be used (see https://github.com/scikit-hep/mplhep for more details) - figsize: tuple size of the figure - axis_title: str x-axis title Returns ------------------------------------------------- fig: matplotlib.figure.Figure figure containing the raw residuals plot """ style = kwargs.get('style', 'LHCb2') figsize = kwargs.get('figsize', (7, 7)) axis_title = kwargs.get('axis_title', self._data_handler_.get_var_name()) mplhep.style.use(style) obs = self._data_handler_.get_obs() limits = self._data_handler_.get_limits() fig = plt.figure(figsize=figsize) if len(self._raw_residuals_) == 0: self.__get_raw_residuals() # draw residuals plt.errorbar( self._data_handler_.get_bin_center(), self._raw_residuals_, xerr=None, yerr=np.sqrt(self._raw_residual_variances_), linestyle="None", elinewidth=1, capsize=0, color="black", marker="o", markersize=5, label="residuals" ) bins = self._data_handler_.get_nbins() bin_sigma = (limits[1] - limits[0]) / bins norm = self._total_pdf_norm_ * bin_sigma x_plot = np.linspace(limits[0], limits[1], num=1000) signal_funcs, refl_funcs = ([] for _ in range(2)) for signal_pdf in self._signal_pdfs_: signal_funcs.append(zfit.run(signal_pdf.pdf.pdf(x_plot, norm_range=obs))) signal_fracs, _, refl_fracs, _, _, _ = self.__get_all_fracs() # draw signals for isgn, (signal_func, frac) in enumerate(zip(signal_funcs, signal_fracs)): plt.plot(x_plot, signal_func * norm * frac / self._ratio_truncated_, color=self._sgn_cmap_(isgn)) plt.fill_between(x_plot, signal_func * norm * frac / self._ratio_truncated_, color=self._sgn_cmap_(isgn), alpha=0.5, label=self._signal_pdfs_[isgn].label) # finally draw reflected signals (if any) is_there_refl = False for irefl, (refl_func, frac) in enumerate(zip(refl_funcs, refl_fracs)): if self._refl_pdfs_[irefl] is None: continue is_there_refl = True plt.plot(x_plot, refl_func * norm * frac / self._ratio_truncated_, color=self._refl_cmap_(irefl)) plt.fill_between(x_plot, refl_func * norm * frac / self._ratio_truncated_, color=self._refl_cmap_(irefl), alpha=0.5, label=f'reflected signal {irefl}') # draw signal + reflected signals (if any) if is_there_refl: for isgn, (signal_func, refl_func, frac_sgn, frac_refl) in enumerate( zip(signal_funcs, refl_funcs, signal_fracs, refl_fracs)): plt.plot(x_plot, (signal_func * frac_sgn + frac_refl * refl_func) * norm / self._ratio_truncated_, color='xkcd:blue', label='total - bkg') plt.xlim(limits[0], limits[1]) plt.xlabel(axis_title) plt.ylabel(rf'(data - fitted bkg) / {(limits[1]-limits[0])/bins*1000:0.1f} MeV/$c^2$') plt.legend(loc='best') return figPlot the raw residuals
Parameters
**kwargs:dict-
Additional optional arguments:
-
style: str style to be used (see https://github.com/scikit-hep/mplhep for more details)
-
figsize: tuple size of the figure
-
axis_title: str x-axis title
-
Returns
fig:matplotlib.figure.Figure- figure containing the raw residuals plot
def plot_std_residuals(self, **kwargs)-
def plot_std_residuals(self, **kwargs): """ Plot the raw residuals Parameters ------------------------------------------------- **kwargs: dict Additional optional arguments: - style: str style to be used (see https://github.com/scikit-hep/mplhep for more details) - figsize: tuple size of the figure - axis_title: str x-axis title Returns ------------------------------------------------- fig: matplotlib.figure.Figure figure containing the raw residuals plot """ style = kwargs.get('style', 'LHCb2') figsize = kwargs.get('figsize', (7, 7)) axis_title = kwargs.get('axis_title', self._data_handler_.get_var_name()) mplhep.style.use(style) limits = self._data_handler_.get_limits() bins = self._data_handler_.get_nbins() bin_center = self._data_handler_.get_bin_center() xerr_lower = [bin_center[i] - bin_center[i-1] for i in range(1, len(bin_center))] xerr_lower.insert(0, bin_center[1] - bin_center[0]) xerr_upper = [bin_center[i+1] - bin_center[i] for i in range(len(bin_center)-1)] xerr_upper.append(bin_center[-1] - bin_center[-2]) fig = plt.figure(figsize=figsize) if len(self._std_residuals_) == 0: self.__get_std_residuals() # draw residuals plt.errorbar(bin_center, self._std_residuals_, xerr=(xerr_lower, xerr_upper), yerr=None, linestyle="None", elinewidth=1, capsize=0, color="black", marker="o", markersize=5, label=None) # line at 0 plt.plot([bin_center[0], bin_center[-1]], [0., 0.], lw=2, color='xkcd:blue') # line at -3 sigma plt.plot([bin_center[0], bin_center[-1]], [-3., -3.], lw=2, color='xkcd:red') # line at 3 sigma plt.plot([bin_center[0], bin_center[-1]], [3., 3.], lw=2, color='xkcd:red') plt.xlim(limits[0], limits[1]) plt.xlabel(axis_title) plt.ylabel(fr"$\dfrac{{ \mathrm{{data}} - \mathrm{{total \ fit}} }}{{ \sigma_{{ \mathrm{{data}} }} }}$" fr"/ {(limits[1]-limits[0])/bins*1000:0.1f} MeV/$c^2$") return figPlot the raw residuals
Parameters
**kwargs:dict-
Additional optional arguments:
-
style: str style to be used (see https://github.com/scikit-hep/mplhep for more details)
-
figsize: tuple size of the figure
-
axis_title: str x-axis title
-
Returns
fig:matplotlib.figure.Figure- figure containing the raw residuals plot
def sample_pdf(self, num=1000)-
def sample_pdf(self, num=1000): """ Sample the total pdf Parameters ------------------------------------------------- num: int Number of points to sample Returns ------------------------------------------------- samples: numpy array Array of sampled points """ if not self._is_pdf_built_: self.__build_total_pdf() self.__build_total_pdf_binned() self._is_pdf_built_ = True sample = self._total_pdf_.sample(num).to_numpy() return sample.flatten()Sample the total pdf
Parameters
num:int- Number of points to sample
Returns
samples:numpy array- Array of sampled points
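A sketch of sampling pseudo-data from the configured total pdf, e.g. for toy studies (`fitter` assumed configured; the total pdf is built on demand if it does not exist yet, and the limits below are placeholders):
toy_sample = fitter.sample_pdf(num=50000)  # flattened numpy array of sampled values
# the sample can be fed back into a DataHandler for a toy fit
toy_hdl = DataHandler(data=toy_sample, var_name='inv_mass', limits=[1.72, 2.05])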
def set_background_initpar(self, idx, par_name, init_value, **kwargs)-
def set_background_initpar(self, idx, par_name, init_value, **kwargs): """ Set a background parameter Parameters ------------------------------------------------- idx: int Index of the background par_name: str The name of the parameter to be set init_value: float The value of parameter to be set **kwargs: dict Additional optional arguments: - limits: list minimum and maximum limits for the parameter - fix: bool fix the mass parameter """ self._background_pdfs_[idx].set_init_par(par_name, init_value) if 'limits' in kwargs: self._background_pdfs_[idx].set_limits_par(par_name, kwargs['limits']) if 'fix' in kwargs: self._background_pdfs_[idx].set_fix_par(par_name, kwargs['fix'])Set a background parameter
Parameters
idx:int- Index of the background
par_name:str- The name of the parameter to be set
init_value:float- The value of parameter to be set
**kwargs:dict-
Additional optional arguments:
-
limits: list minimum and maximum limits for the parameter
-
fix: bool fix the parameter to init_value
-
def set_background_kde(self, idx, sample, **kwargs)-
def set_background_kde(self, idx, sample, **kwargs): """ Set sample and options for background kde Parameters ------------------------------------------------- idx: int Index of the background sample: flarefly.DataHandler Data sample for Kernel Density Estimation **kwargs: dict Arguments for kde options. See https://zfit.readthedocs.io/en/latest/user_api/pdf/_generated/kde_api/zfit.pdf.KDE1DimGrid.html#zfit.pdf.KDE1DimGrid for more details """ self._background_pdfs_[idx].kde_sample = sample self._background_pdfs_[idx].kde_option = kwargsSet sample and options for background kde
Parameters
idx:int- Index of the background
sample:DataHandler- Data sample for Kernel Density Estimation
**kwargs:dict- Arguments for kde options. See https://zfit.readthedocs.io/en/latest/user_api/pdf/_generated/kde_api/zfit.pdf.KDE1DimGrid.html#zfit.pdf.KDE1DimGrid for more details
def set_background_template(self, idx, sample)-
def set_background_template(self, idx, sample): """ Set sample and options for background template histogram Parameters ------------------------------------------------- idx: int Index of the background sample: flarefly.DataHandler Data sample for template histogram """ edges_bkg = sample.get_bin_edges() edges_data = self._data_handler_.get_bin_edges() if len(edges_bkg) != len(edges_data): Logger(f'The data and the background template {idx} have different number of bins:' f' \n -> background template: {len(edges_bkg)-1}, data -> {len(edges_data)-1}', 'FATAL') if not np.allclose(edges_bkg, edges_data): Logger(f'The data and the background template {idx} have different bin edges:' f' \n -> background template: {edges_bkg}, data -> {edges_data}', 'FATAL') self._background_pdfs_[idx].hist_sample = sampleSet sample and options for background template histogram
Parameters
idx:int- Index of the background
sample:DataHandler- Data sample for template histogram
def set_particle_mass(self, idx, **kwargs)-
def set_particle_mass(self, idx, **kwargs): """ Set the particle mass Parameters ------------------------------------------------- idx: int Index of the signal **kwargs: dict Additional optional arguments: - mass: float The mass of the particle - pdg_id: int PDG ID of the particle (alternative to mass) - pdg_name: str Name of the particle (alternative to mass) - limits: list minimum and maximum limits for the mass parameter - fix: bool fix the mass parameter """ mass_name = 'm' if self._signal_pdfs_[idx].uses_m_not_mu() else 'mu' mass = 0. if 'mass' in kwargs: mass = kwargs['mass'] elif 'pdg_id' in kwargs: mass = self.pdg_api.get_particle_by_mcid(kwargs['pdg_id']).mass elif 'pdg_name' in kwargs: mass = self.pdg_api.get_particle_by_name(kwargs['pdg_name']).mass else: Logger(f'"mass", "pdg_id", and "pdg_name" not provided, mass value for signal {idx} will not be set', 'ERROR') self._signal_pdfs_[idx].set_init_par(mass_name, mass) if 'limits' in kwargs: self._signal_pdfs_[idx].set_limits_par(mass_name, kwargs['limits']) if 'fix' in kwargs: self._signal_pdfs_[idx].set_fix_par(mass_name, kwargs['fix'])Set the particle mass
Parameters
idx: int - Index of the signal
**kwargs: dict - Additional optional arguments:
- mass: float - The mass of the particle
- pdg_id: int - PDG ID of the particle (alternative to mass)
- pdg_name: str - Name of the particle (alternative to mass)
- limits: list - minimum and maximum limits for the mass parameter
- fix: bool - fix the mass parameter
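Illustrative usage (a sketch: the numerical mass, the limits, and the particle name 'D0' are placeholders, and fitter is an assumed mass-fitter instance with a signal at index 0):

# Provide the mass directly and let it float within limits
fitter.set_particle_mass(0, mass=1.865, limits=[1.85, 1.88])
# Or look it up from the PDG by particle name and keep it fixed
fitter.set_particle_mass(0, pdg_name='D0', fix=True)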
def set_reflection_kde(self, idx, sample, r_over_s, **kwargs)-
Expand source code
def set_reflection_kde(self, idx, sample, r_over_s, **kwargs):
    """
    Set sample and options for reflected signal kde

    Parameters
    -------------------------------------------------
    idx: int
        Index of the signal
    sample: flarefly.DataHandler
        Data sample for Kernel Density Estimation
    r_over_s: float
        R/S ratio
    **kwargs: dict
        Arguments for kde options. See
        https://zfit.readthedocs.io/en/latest/user_api/pdf/_generated/kde_api/zfit.pdf.KDE1DimGrid.html#zfit.pdf.KDE1DimGrid
        for more details
    """
    self._signal_pdfs_[self._refl_idx_[idx]].kde_sample = sample
    self._signal_pdfs_[self._refl_idx_[idx]].kde_option = kwargs
    self.fix_signal_frac_to_signal_pdf(idx, self._refl_idx_[idx], factor=r_over_s)

Set sample and options for reflected signal kde
Parameters
idx: int - Index of the signal
sample: DataHandler - Data sample for Kernel Density Estimation
r_over_s: float - R/S ratio
**kwargs: dict - Arguments for kde options. See https://zfit.readthedocs.io/en/latest/user_api/pdf/_generated/kde_api/zfit.pdf.KDE1DimGrid.html#zfit.pdf.KDE1DimGrid for more details
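Illustrative usage (a sketch: refl_sample is assumed to be a DataHandler of reflected-signal candidates, the signal at index 0 is assumed to have an associated reflection, and 0.5 is a placeholder R/S ratio):

# The R/S ratio ties the reflection yield to the signal yield
fitter.set_reflection_kde(0, refl_sample, 0.5, bandwidth=0.01)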
def set_reflection_template(self, idx, sample, r_over_s)-
Expand source code
def set_reflection_template(self, idx, sample, r_over_s):
    """
    Set sample and options for reflected signal template

    Parameters
    -------------------------------------------------
    idx: int
        Index of the reflected signal
    sample: flarefly.DataHandler
        Data sample for histogram template
    r_over_s: float
        R/S ratio
    """
    edges_refl = sample.get_bin_edges()
    edges_data = self._data_handler_.get_bin_edges()
    if len(edges_refl) != len(edges_data):
        Logger(f'The data and the reflection template {idx} have different number of bins:'
               f' \n -> reflection template: {len(edges_refl)-1}, data -> {len(edges_data)-1}', 'FATAL')
    if not np.allclose(edges_refl, edges_data):
        Logger(f'The data and the reflection template {idx} have different bin edges:'
               f' \n -> reflection template: {edges_refl}, data -> {edges_data}', 'FATAL')
    self._signal_pdfs_[self._refl_idx_[idx]].hist_sample = sample
    self.fix_signal_frac_to_signal_pdf(self._refl_idx_[idx], idx, factor=r_over_s)

Set sample and options for reflected signal template
Parameters
idx: int - Index of the reflected signal
sample: DataHandler - Data sample for histogram template
r_over_s: float - R/S ratio
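Illustrative usage (a sketch with hypothetical file and histogram names; as for the background templates, the reflection template must share the bin edges of the fitted data):

from flarefly.data_handler import DataHandler

refl_templ = DataHandler('templates.root', histoname='hRefl', var_name='inv_mass')
fitter.set_reflection_template(0, refl_templ, 0.5)  # 0.5 is a placeholder R/S ratio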
def set_signal_initpar(self, idx, par_name, init_value, **kwargs)-
Expand source code
def set_signal_initpar(self, idx, par_name, init_value, **kwargs):
    """
    Set a signal parameter

    Parameters
    -------------------------------------------------
    idx: int
        Index of the signal
    par_name: str
        The name of the parameter to be set
    init_value: float
        The value of parameter to be set
    **kwargs: dict
        Additional optional arguments:
        - limits: list
            minimum and maximum limits for the parameter
        - fix: bool
            fix the parameter to init_value
    """
    if par_name == 'gamma':
        Logger('The gamma parameter that you are setting is half of the width of a resonance,'
               ' for more info check the Cauchy pdf defined here '
               'https://zfit.readthedocs.io/en/latest/user_api/pdf/_generated/basic/zfit.pdf.Cauchy.html',
               'WARNING')
    self._signal_pdfs_[idx].set_init_par(par_name, init_value)
    if 'limits' in kwargs:
        self._signal_pdfs_[idx].set_limits_par(par_name, kwargs['limits'])
    if 'fix' in kwargs:
        self._signal_pdfs_[idx].set_fix_par(par_name, kwargs['fix'])

Set a signal parameter
Parameters
idx: int - Index of the signal
par_name: str - The name of the parameter to be set
init_value: float - The value of the parameter to be set
**kwargs: dict - Additional optional arguments:
- limits: list - minimum and maximum limits for the parameter
- fix: bool - fix the parameter to init_value
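Illustrative usage (a sketch: 'sigma' is an assumed parameter name whose validity depends on the signal pdf chosen for index 0):

# Starting value with limits for a Gaussian-like width
fitter.set_signal_initpar(0, 'sigma', 0.010, limits=[0.005, 0.030])
# Fix the parameter to its initial value (e.g. a value taken from simulation)
fitter.set_signal_initpar(0, 'sigma', 0.012, fix=True)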
def set_signal_kde(self, idx, sample, **kwargs)-
Expand source code
def set_signal_kde(self, idx, sample, **kwargs):
    """
    Set sample and options for signal kde

    Parameters
    -------------------------------------------------
    idx: int
        Index of the signal
    sample: flarefly.DataHandler
        Data sample for Kernel Density Estimation
    **kwargs: dict
        Arguments for kde options. See
        https://zfit.readthedocs.io/en/latest/user_api/pdf/_generated/kde_api/zfit.pdf.KDE1DimGrid.html#zfit.pdf.KDE1DimGrid
        for more details
    """
    self._signal_pdfs_[idx].kde_sample = sample
    self._signal_pdfs_[idx].kde_option = kwargs

Set sample and options for signal kde
Parameters
idx: int - Index of the signal
sample: DataHandler - Data sample for Kernel Density Estimation
**kwargs: dict - Arguments for kde options. See https://zfit.readthedocs.io/en/latest/user_api/pdf/_generated/kde_api/zfit.pdf.KDE1DimGrid.html#zfit.pdf.KDE1DimGrid for more details
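Illustrative usage (a sketch: sgn_mc is assumed to be a DataHandler built from simulated signal candidates, and the bandwidth value is a placeholder forwarded as a zfit KDE option):

fitter.set_signal_kde(0, sgn_mc, bandwidth=0.002)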
def set_signal_template(self, idx, sample)-
Expand source code
def set_signal_template(self, idx, sample):
    """
    Set sample and options for signal template

    Parameters
    -------------------------------------------------
    idx: int
        Index of the signal
    sample: flarefly.DataHandler
        Data sample for histogram template
    """
    edges_sgn = sample.get_bin_edges()
    edges_data = self._data_handler_.get_bin_edges()
    if len(edges_sgn) != len(edges_data):
        Logger(f'The data and the signal template {idx} have different number of bins:'
               f' \n -> signal template: {len(edges_sgn)-1}, data -> {len(edges_data)-1}', 'FATAL')
    if not np.allclose(edges_sgn, edges_data):
        Logger(f'The data and the signal template {idx} have different bin edges:'
               f' \n -> signal template: {edges_sgn}, data -> {edges_data}', 'FATAL')
    self._signal_pdfs_[idx].hist_sample = sample

Set sample and options for signal template
Parameters
idx: int - Index of the signal
sample: DataHandler - Data sample for histogram template
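Illustrative usage (a sketch with hypothetical file and histogram names; as for the background case, the signal template must match the binning of the fitted data):

from flarefly.data_handler import DataHandler

sgn_templ = DataHandler('templates.root', histoname='hSignalMC', var_name='inv_mass')
fitter.set_signal_template(0, sgn_templ)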