"""SmartDrift module"""
import copy
import io
import pickle
import shutil
import tempfile
from datetime import date
from pathlib import Path
from typing import Any
import catboost
import numpy as np
import pandas as pd
from shapash.explainer.smart_explainer import SmartExplainer
from sklearn.metrics import roc_auc_score
from eurybia.core.dataset_analysis import DatasetAnalysis
from eurybia.core.smartplotter import SmartPlotter
from eurybia.report.generation import execute_report
from eurybia.style.style_utils import colors_loading, select_palette
from eurybia.utils.io import load_pickle, save_pickle
from eurybia.utils.model_drift import catboost_hyperparameter_init, catboost_hyperparameter_type
from eurybia.utils.statistical_tests import chisq_test, compute_js_divergence, ksmirnov_test
from eurybia.utils.utils import base_100, cat_features_indices, train_test_split_concat
[docs]class SmartDrift:
"""The SmartDrift class is the main object to compute drift in the Eurybia library
It allows to calculate data drift between 2 datasets using a data drift classification model
Attributes:
----------
df_current: pandas.DataFrame
current (or production) dataset which is compared to df_baseline
df_baseline: pandas.DataFrame
baseline (or learning) dataset which is compared to df_current
datadrift_classifier: model object
model used for binary classification of data drift
xpl: Shapash object
object used to compute explainability on datadrift_classifier
df_predict: pandas.DataFrame
computed score on both datasets if a deployed_model is specified
feature_importance: pandas.DataFrame
feature importance of datadrift_classifier and feature importance of production model if exist
pb_cols: dict
Dictionnary that references columns differences between df_current and df_baseline
err_mods: dict
Dictionnary that references modalities differences in columns between df_current and df_baseline
auc: int
Value auc of model drift
historical_auc: pandas.DataFrame
Dataframe that contains auc history of datadrift_classifier over time
data_modeldrift: pandas.DataFrame
Dataframe that contains performance history of deployed_model
ignore_cols: list
list of feature to ignore in compute
dataset_names : dict, (Optional)
Dictionnary used to specify dataset names to display in report.
df_concat : pandas.DataFrame
Dataframe that's composed of both df_baseline and df_current concatenated
plot : eurybia.core.smartplotter.SmartPlotter
Instance of an Eurybia SmartPlotter class. It's used for graph displaying purpose.
deployed_model: model object, optional
model in production used to put in perspective drift and to predict
encoding: preprocessing object, optional (default: None)
Preprocessing used before the training step
datadrift_stat_test : dict
Datadrift statistical tests for each feature.
Each test identifies whether the feature has drifted.
There are 2 types of test implemented depending on the type of feature:
- Chi-square for discrete variables - ref:
(https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.chi2_contingency.html)
- Kolmogorov-Smirnov for continuous variables - ref:
(https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.kstest.html)
This datadrift_stat_test attribute specifies for each feature the test performed,
the statistic the test and the p value
palette_name : str (default: 'eurybia')
Name of the palette used for the colors of the report (refer to style folder).
colors_dict: dict
Dict of the colors used in the different plots
js_divergence : float
Jensen-Shannon divergence of probability distributions - ref:
(https://en.wikipedia.org/wiki/Jensen%E2%80%93Shannon_divergence)
How to declare a new SmartDrift object?
Example:
-------
>>> SD = Smartdrift(df_current=df_production, df_baseline=df_learning)
"""
@classmethod
def load(cls, path):
"""The load() class method allows Eurybia users to use a pickled SmartDrift.
Parameters
----------
path : str
File path of the pickle file.
Returns
-------
SmartDrift
SmartDrift instance loaded with the pickle given as "path"
Example
--------
>>> from eurybia import SmartDrift
>>> SmartDrift.load('path_to_pkl/smardrift.pkl')
"""
dict_to_load = load_pickle(path)
if isinstance(dict_to_load, dict):
df_current = dict_to_load["_df_current"]
df_baseline = dict_to_load["_df_baseline"]
sd = cls(df_current, df_baseline)
for attr, val in dict_to_load.items():
if isinstance(val, io.BytesIO):
val.seek(0)
setattr(sd, attr, pickle.load(val))
elif attr == "_xpl":
xpl = SmartExplainer(model=val["model"])
xpl.__dict__.update(val)
setattr(sd, attr, xpl)
elif attr == "_da":
da = DatasetAnalysis(df_current, df_baseline)
da.__dict__.update(val)
setattr(sd, attr, da)
else:
setattr(sd, attr, val)
else:
raise ValueError("pickle file must contain dictionary")
return sd
# FIXME: we should explicitly declare the type of supported deployed_model and encoding
def __init__(
self,
df_current: pd.DataFrame,
df_baseline: pd.DataFrame,
dataset_names: tuple[str, str] = ("Current", "Baseline"),
deployed_model: Any | None = None,
encoding: Any = None,
palette_name: str = "eurybia",
colors_dict: dict | None = None,
):
"""Parameters
----------
df_current: pandas.DataFrame
current (or production) dataset which is compared to df_baseline
df_baseline: pandas.DataFrame
baseline (or learning) dataset which is compared to df_current
dataset_names : tuple, (Optional)
Tuple used to specify dataset names to display in report (df_current_name, df_baseline_name).
deployed_model: model object, optional
model in production used to put in perspective drift and to predict
encoding: preprocessing object, optional (default: None)
Preprocessing used before the training step
palette_name : str (default: 'eurybia')
Name of the palette used for the colors of the report (refer to style folder).
colors_dict: dict
Dict of the colors used in the different plots
How to declare a new SmartDrift object ?
Example:
-------
>>> SD = Smartdrift(df_current=df_production, df_baseline=df_learning)
"""
self._df_current = df_current
self._df_baseline = df_baseline
self._dataset_names = dataset_names
# model drift
self._deployed_model = deployed_model
self._encoding = encoding
# report
self.palette_name = palette_name
self.colors_dict = copy.deepcopy(select_palette(colors_loading(), self.palette_name))
if colors_dict is not None:
self.colors_dict.update(colors_dict)
# data drift
self._da: DatasetAnalysis
self._xpl: SmartExplainer
self._df_predict: pd.DataFrame
self._feature_importance: pd.DataFrame
self._auc: float
self._js_divergence: float
self._historical_auc: pd.DataFrame
self._data_modeldrift: pd.DataFrame
self._datadrift_stat_test: pd.DataFrame # smartplotter
self._datadrift_target: str = "target" # constant
self._plot = SmartPlotter(self)
self._plot.define_style_attributes(colors_dict=self.colors_dict)
self._modalities_analysis: bool = False
[docs] def compile(
self,
full_validation: bool = False,
ignore_cols: list[str] | None = None,
sampling: bool = True,
sample_size: int = 100000,
datadrift_file: str | None = None,
date_compile_auc: date | None = None,
hyperparameter: dict | None = None,
attr_importance: str = "feature_importances_",
):
r"""The compile method is the first step to compute data drift.
It allows to calculate data drift between 2 datasets using a data drift classification model.
Most of the parameters are optional but helps to adapt the data drift calculation if necessary.
This step can last a few moments with large datasets.
Parameters
----------
full_validation: bool, optional (default: False)
If True, analyze consistency on modalities between columns
ignore_cols: list, optional
list of feature to ignore in compute
sampling: bool, optional
If True, applies the sampling
sample_size: int, optional
the size of the sample to build
date_compile_auc: date (optional)
used to specify date of compute drift, useful when compute few time drift
for different time at the same moment
hyperparameter: dict, optional
if user want to modify catboost hyperparameter
attr_importance: string, optional (default: "feature_importances\_")
Attribute "feature_importance" of the deployed_model
datadrift_file : str, optional
Name of the csv file that contains the performance history of data drift. If no datadrift file is given,
the drift will not be logged
Examples
--------
>>> SD.compile()
"""
# Checking datasets
ignored_cols_set = set(ignore_cols) if ignore_cols is not None else set()
baseline_cols = [col for col in self._df_baseline.columns if col not in ignored_cols_set]
current_cols = [col for col in self._df_current.columns if col not in ignored_cols_set]
if self.datadrift_target in baseline_cols or self.datadrift_target in current_cols:
raise ValueError(
f"Your dataframes contain a column named {self.datadrift_target}. Please consider renaming it."
)
self._modalities_analysis = full_validation
if hyperparameter is not None:
for key, value in catboost_hyperparameter_init.items():
catboost_hyperparameter_init[key] = (
hyperparameter[key]
if key in hyperparameter and str(type(hyperparameter[key])) in catboost_hyperparameter_type[key]
else value
)
hyperparameter = catboost_hyperparameter_init.copy()
da_sample_size = sample_size if sampling else None
self.da = DatasetAnalysis(
df_baseline=self._df_baseline,
df_test=self._df_current,
sample_size=da_sample_size,
ignored_cols=ignore_cols,
)
# Checking datasets
if (len(self.da.datetime_cols) > 0) and (self.deployed_model is not None):
raise TypeError("Your datasets have a datetime column. You should drop it")
self.df_current, self.df_baseline = self.da.clean_datasets()
train, test = train_test_split_concat(
self.df_baseline, self.df_current, target_col=self.datadrift_target, test_size=0.25, random_state=42
)
self._df_concat = pd.concat([train, test]).reset_index(drop=True)
feature_columns = [col for col in train.columns if col != self.datadrift_target]
indice_cat = cat_features_indices(train[feature_columns])
train_pool_cat = catboost.Pool(
data=train[feature_columns], label=train[self.datadrift_target].astype(int), cat_features=indice_cat
)
test_pool_cat = catboost.Pool(
data=test[feature_columns], label=test[self.datadrift_target].astype(int), cat_features=indice_cat
)
datadrift_classifier = catboost.CatBoostClassifier(
max_depth=hyperparameter["max_depth"],
l2_leaf_reg=hyperparameter["l2_leaf_reg"],
learning_rate=hyperparameter["learning_rate"],
iterations=hyperparameter["iterations"],
use_best_model=hyperparameter["use_best_model"],
custom_loss=hyperparameter["custom_loss"],
loss_function=hyperparameter["loss_function"],
eval_metric=hyperparameter["eval_metric"],
task_type="CPU",
allow_writing_files=False,
)
datadrift_classifier = datadrift_classifier.fit(
train_pool_cat,
eval_set=test_pool_cat,
silent=True,
early_stopping_rounds=hyperparameter["early_stopping_rounds"],
)
train_logloss = datadrift_classifier.eval_metrics(train_pool_cat, "Logloss")
best_iter_train = np.argmin(train_logloss["Logloss"]) + 1
if best_iter_train < datadrift_classifier.tree_count_:
datadrift_classifier.shrink(ntree_start=0, ntree_end=best_iter_train)
self.xpl = SmartExplainer(
label_dict={0: self.baseline_dataset_name, 1: self.current_dataset_name}, model=datadrift_classifier
)
x_test = test[feature_columns]
y_test = test[self.datadrift_target]
self.xpl.compile(x=x_test, y_target=y_test)
self.xpl.compute_features_import(force=True)
self.xpl.define_style(colors_dict=self.colors_dict)
self.datadrift_classifier = datadrift_classifier
if self.deployed_model:
self.df_predict = self._predict(deployed_model=self.deployed_model, encoding=self.encoding)
self.auc = roc_auc_score(y_test, datadrift_classifier.predict_proba(x_test)[:, 1])
if self.deployed_model:
self.feature_importance = self._compute_feature_importance(
deployed_model=self.deployed_model, attr_importance=attr_importance
)
if self.deployed_model is not None:
self.js_divergence = compute_js_divergence(
self.df_predict.loc[lambda df: df["dataset"] == self.baseline_dataset_name, :]["Score"].values,
self.df_predict.loc[lambda df: df["dataset"] == self.current_dataset_name, :]["Score"].values,
n_bins=20,
)
if datadrift_file is not None:
self.historical_auc = self._histo_datadrift_metric(
datadrift_file=datadrift_file,
date_compile_auc=date_compile_auc,
)
if self.deployed_model is not None:
self.datadrift_stat_test = self._compute_datadrift_stat_test()
[docs] def generate_report(
self,
output_file: str,
project_info_file: str | None = None,
title_story: str = "Drift Report",
title_description: str = "",
working_dir: str | None = None,
):
"""This method will generate an HTML report containing different information about the project.
It allows the information compiled to be rendered.
It can be associated with a project info yml file on which can figure different information about the project.
Parameters
----------
output_file : str
Path to the HTML file to write
project_info_file : str
Path to the file used to display some information about the project in the report
title_story : str, optional
Report title
title_description : str, optional
Report title description (as written just below the title)
working_dir : str, optional
Working directory in which will be generated the notebook used to create the report and
where the objects used to execute it will
be saved. This parameter can be usefull if one wants to create its own custom report and
debug the notebook used to generate the html report. If None, a temporary directory will be used
Examples
--------
>>> SD.generate_report(
output_file='report.html',
project_info_file='project_info.yml',
title_story="Drift project report",
title_description="This document is a drift report of the score in production"
)
"""
rm_working_dir = False
if not working_dir:
working_dir = tempfile.mkdtemp()
rm_working_dir = True
try:
execute_report(
project_info_file=project_info_file,
explainer=self.xpl,
smartdrift=self,
config_report=dict(title_story=title_story, title_description=title_description),
output_file=output_file,
modalities_analysis=self._modalities_analysis,
)
finally:
if rm_working_dir:
shutil.rmtree(working_dir)
def _predict(self, deployed_model: Any, encoding: Any = None) -> pd.DataFrame:
"""Create an attributes df_predict with the computed score on both datasets
Parameters
----------
deployed_model : model object, optional (default: None)
model in production used to put in perspective drift and to predict
encoding : preprocessing object, optional (default: None)
Preprocessing used before the training step
Returns
-------
pandas.DataFrame, None
DataFrame with predicted score for both datasets
"""
if not hasattr(deployed_model, "predict_proba") and not hasattr(deployed_model, "predict"):
raise Exception("deployed_model need to have predict or predict_proba method")
df_baseline = self.df_baseline
df_current = self.df_current
if encoding is not None:
try:
df_baseline = encoding.transform(df_baseline)
df_current = encoding.transform(df_current)
except BaseException as error:
raise Exception(
"""
Encoding specified can't be applied directly on df_current/df_baseline
- Error :
"""
+ str(error)
) from error
if hasattr(deployed_model, "predict_proba"):
try:
df_baseline_pred = pd.DataFrame(deployed_model.predict_proba(df_baseline)[:, 1], columns=["Score"])
df_current_pred = pd.DataFrame(deployed_model.predict_proba(df_current)[:, 1], columns=["Score"])
except BaseException as error:
raise Exception(
"""
Encoding specified or deployed_model used can't be applied directly on df_current/df_baseline
- Error :
"""
+ str(error)
) from error
else:
try:
df_baseline_pred = pd.DataFrame(deployed_model.predict(df_baseline), columns=["Score"])
df_current_pred = pd.DataFrame(deployed_model.predict(df_current), columns=["Score"])
except BaseException as error:
raise Exception(
"""
Encoding specified or deployed_model used can't be applied directly on df_current/df_baseline
- Error :
"""
+ str(error)
) from error
return pd.concat(
[
df_baseline_pred.assign(dataset=self.baseline_dataset_name),
df_current_pred.assign(dataset=self.current_dataset_name),
]
).reset_index(drop=True)
def _compute_feature_importance(
self, deployed_model: Any, attr_importance: str = "feature_importances_"
) -> pd.DataFrame:
"""Create an attributes feature_importance with the computed score on both datasets
Parameters
----------
deployed_model : model object, optional (default: None)
model in production used to put in perspective drift and to predict
attr_importance : string, optional (default: "feature_importances_")
Attribute "feature_importance" of the deployed_model
Returns
-------
pandas.DataFrame, None
DataFrame with feature importance from production model
and drift model.
"""
try:
array_importance = getattr(deployed_model, attr_importance)
except BaseException as error:
raise Exception(
"""
deployed_model used can't allow to get features importance on df_baseline
- Error :
"""
+ str(error)
) from error
feature_importance_drift = pd.DataFrame(
self.xpl.features_imp[0].values, index=self.xpl.features_imp[0].index, columns=["datadrift_classifier"]
)
var_baseline = [c for c in self.df_baseline.columns if c not in ["target"]]
if len(array_importance) != len(var_baseline):
raise ValueError(
"""
Number of features in df_baseline doesn't match feature importance's shape returned by deployed model.
"""
)
feature_importance_model_prod = pd.DataFrame(
array_importance, index=self.df_baseline[var_baseline].columns, columns=["deployed_model"]
)
feature_importance = feature_importance_model_prod.merge(
feature_importance_drift, how="left", left_index=True, right_index=True
).reset_index()
feature_importance = feature_importance.rename(columns={"index": "feature"})
feature_importance["deployed_model"] = base_100(feature_importance["deployed_model"])
return feature_importance
def _sampling(self, sampling: bool, sample_size: int, dataset: pd.DataFrame):
"""Return a sampling from the original dataframe
Parameters
----------
sampling : bool
If True, applies the sampling
sample_size : int
the size of the sample to build
df : pd.DataFrame
The Dataframe to apply sampling
Returns
-------
pandas.DataFrame
a sample of the original DataFrame or the original DataFrame
"""
if sampling:
if dataset.shape[0] > sample_size:
return dataset.sample(sample_size)
else:
return dataset
else:
return dataset
def _histo_datadrift_metric(self, datadrift_file: str, date_compile_auc: date | None = None):
"""Method which computes datadrift metrics (AUC, and Jensen Shannon prediction divergence if the deployed_model
is filled in) and append it into a dataframe that will be exported during the generate_report method
Parameters
----------
datadrift_file : str, (optional)
date_compile_auc: str (optional)
format dd/mm/yyyy use for specify date of compute drift, useful when compute few time drift
for different time at the same moment
Returns
-------
pandas.DataFrame or None
Dataframe with dates, AUC and Jensen Shannon prediction divergence computed at this date
"""
if date_compile_auc is None:
date_compile_auc = date.today()
s_date_compile_auc = date_compile_auc.strftime("%Y-%m-%d")
print(f"The computed AUC on the X_test used to build datadrift_classifier is equal to: {self.auc}")
df_auc = (
pd.DataFrame({"date": [s_date_compile_auc], "auc": [self.auc], "JS_predict": [self.js_divergence]})
if self.deployed_model is not None
else pd.DataFrame({"date": [s_date_compile_auc], "auc": [self.auc]})
)
if datadrift_file is not None:
if Path(datadrift_file).is_file() and datadrift_file.endswith(".csv"):
histo_auc = pd.read_csv(datadrift_file).reset_index(drop=True)
if self.deployed_model is not None:
if not (
any(histo_auc.columns.isin(["date"]))
and any(histo_auc.columns.isin(["auc"]))
and any(histo_auc.columns.isin(["JS_predict"]))
):
raise Exception("The csv data must have columns 'date', 'auc' and 'JS_predict'")
df_auc = pd.concat([histo_auc[["date", "auc", "JS_predict"]], df_auc]).reset_index(drop=True)
else:
if not (any(histo_auc.columns.isin(["date"])) and any(histo_auc.columns.isin(["auc"]))):
raise Exception("The csv data must have columns 'date' and 'auc'")
df_auc = pd.concat([histo_auc[["date", "auc"]], df_auc]).reset_index(drop=True)
else:
print(f"{datadrift_file} did not exist and was created. ")
try:
df_auc.to_csv(datadrift_file)
except OSError as error:
raise OSError("Can't save to csv the AUC metrics, error : " + str(error)) from error
return df_auc
[docs] def add_data_modeldrift(
self,
dataset: pd.DataFrame,
metric: str = "performance",
reference_columns: list[str] | None = None,
year_col: str = "annee",
month_col: str = "mois",
):
"""When method drift is specified, It will display in the report
the several plots from a dataframe to analyse drift model from the deployed model.
Each plot will represent one possible computed metric according
to its groups. (grouped by date(year-month), reference_columns).
Parameters
----------
df : pd.DataFrame
The Dataframe with all the computed metrics.
metric: str, (default: 'performance')
The column name of the metric computed
reference_columns: list, (default: [])
the column names to use for aggregation with the Date computed
year_col: str, (default: 'annee')
The column name of the year where the metric has been computed
month_col: str, (default: 'mois')
The column name of the month where the metric has been computed
"""
if reference_columns is None:
reference_columns = []
try:
df_modeldrift = dataset.copy()
df_modeldrift[month_col] = df_modeldrift[month_col].apply(lambda row: str(row).split(".")[0])
df_modeldrift["Date"] = (
"01/" + df_modeldrift[month_col] + "/" + df_modeldrift[year_col].astype("int64").astype(str)
)
df_modeldrift["Date"] = pd.to_datetime(df_modeldrift["Date"], format="%d/%m/%Y")
df_aggregate = pd.DataFrame(
df_modeldrift.groupby(["Date"] + reference_columns)[metric].mean()
).reset_index()
df_aggregate["Date"] = pd.to_datetime(df_aggregate["Date"]).dt.strftime("%d/%m/%Y")
self.data_modeldrift = df_aggregate
except BaseException as error:
raise Exception(
"""
The df specified in the method doesn't allow us to aggregate it for the report.
- Error -
"""
+ str(error)
) from error
def _compute_datadrift_stat_test(self, max_size: int = 50000, categ_max: int = 20):
"""Calculates all statistical tests to analyze the drift of each feature
Parameters
----------
max_size : int
Sets the maximum number of rows. If the datasets are larger there is sampling
categ_max: int
Maximum number of values per feature to apply the chi square test
Returns :
-------
dict :
keys - features
values - dict containing testname, statistic, pvalue
"""
# sampling
baseline = self.df_baseline.sample(n=max_size) if self.df_baseline.shape[0] > max_size else self.df_baseline
current = self.df_current.sample(n=max_size) if self.df_current.shape[0] > max_size else self.df_current
test_results = {}
# compute test for each feature
for features, count in self.xpl.features_desc.items():
try:
if current[features].dtypes.kind == "O" and count <= categ_max:
test = chisq_test(current[features].to_numpy(), baseline[features].to_numpy())
else:
test = ksmirnov_test(current[features].to_numpy(), baseline[features].to_numpy())
except BaseException as error:
raise Exception(
f"""
There is a problem with the format of {str(features)} column between the two datasets.
Error:
"""
+ str(error)
) from error
test_results[features] = test
return pd.DataFrame.from_dict(test_results, orient="index")
def define_style(self, palette_name: str | None = None, colors_dict: dict | None = None):
"""The define_style function is a function that uses a palette or a dict
to define the different styles used in the different outputs of Eurybia
Parameters
----------
palette_name : str (default: 'eurybia')
Name of the palette used for the colors of the report (refer to style folder).
colors_dict: dict
Dict of the colors used in the different plots
"""
if palette_name is None and colors_dict is None:
raise ValueError("At least one of palette_name or colors_dict parameters must be defined")
new_palette_name = palette_name or self.palette_name
new_colors_dict = copy.deepcopy(select_palette(colors_loading(), new_palette_name))
if colors_dict is not None:
new_colors_dict.update(colors_dict)
self.colors_dict.update(new_colors_dict)
self.plot.define_style_attributes(colors_dict=self.colors_dict)
self.xpl.define_style(colors_dict=self.colors_dict)
def save(self, path: str):
"""Save method allows user to save SmartDrift object on disk
using a pickle file.
Save method can be useful: you don't have to recompile to display
results later
Parameters
----------
path : str
File path to store the pickle file
Example
--------
>>> smartdrift.save('path_to_pkl/smartdrift.pkl')
"""
dict_to_save = {}
for att in self.__dict__.keys():
if isinstance(getattr(self, att), (list, dict, pd.DataFrame, pd.Series, type(None), bool, float)):
dict_to_save.update({att: getattr(self, att)})
elif isinstance(getattr(self, att), SmartExplainer):
smartexplainer_dict = {}
for att_xpl in self.xpl.__dict__.keys():
if isinstance(
getattr(self.xpl, att_xpl), (list, dict, pd.DataFrame, pd.Series, type(None), bool)
) or att_xpl in ["model", "preprocessing", "postprocessing"]:
smartexplainer_dict.update({att_xpl: getattr(self.xpl, att_xpl)})
dict_to_save.update({att: smartexplainer_dict})
elif isinstance(getattr(self, att), DatasetAnalysis):
da_dict = {}
for att_da in self.da.__dict__.keys():
if isinstance(
getattr(self.da, att_da), (list, dict, pd.DataFrame, pd.Series, type(None), bool)
) or att_da in ["model", "preprocessing", "postprocessing"]:
da_dict.update({att_da: getattr(self.da, att_da)})
dict_to_save.update({att: da_dict})
save_pickle(dict_to_save, path)
@property
def df_current(self) -> pd.DataFrame:
"""Getter"""
return self._df_current
@df_current.setter
def df_current(self, val: pd.DataFrame) -> None:
"""Setter"""
if not isinstance(val, pd.DataFrame):
raise ValueError("df_current must be a pandas DataFrame")
self._df_current = val
@property
def df_baseline(self) -> pd.DataFrame:
"""Getter"""
return self._df_baseline
@df_baseline.setter
def df_baseline(self, val: pd.DataFrame) -> None:
"""Setter"""
if not isinstance(val, pd.DataFrame):
raise ValueError("df_baseline must be a pandas DataFrame")
self._df_baseline = val
@property
def xpl(self) -> SmartExplainer:
"""Getter"""
if not hasattr(self, "_xpl"):
raise RuntimeError("SmartExplainer has not been initialized yet.")
return self._xpl
@xpl.setter
def xpl(self, val: SmartExplainer) -> None:
"""Setter"""
if not isinstance(val, SmartExplainer):
raise ValueError("xpl must be a SmartExplainer instance.")
self._xpl = val
@property
def df_predict(self) -> pd.DataFrame:
"""Getter"""
if not hasattr(self, "_df_predict"):
raise RuntimeError("df_predict has not been initialized yet.")
return self._df_predict
@df_predict.setter
def df_predict(self, val: pd.DataFrame) -> None:
"""Setter"""
if not isinstance(val, pd.DataFrame):
raise ValueError("df_predict must be a pandas DataFrame.")
self._df_predict = val
@property
def feature_importance(self) -> pd.DataFrame:
"""Getter"""
if not hasattr(self, "_feature_importance"):
raise RuntimeError("feature_importance has not been initialized yet.")
return self._feature_importance
@feature_importance.setter
def feature_importance(self, val: pd.DataFrame) -> None:
"""Setter"""
if not isinstance(val, pd.DataFrame):
raise ValueError("feature_importance must be a pandas DataFrame.")
self._feature_importance = val
@property
def auc(self) -> float:
"""Getter"""
if not hasattr(self, "_auc"):
raise RuntimeError("auc has not been initialized yet.")
return self._auc
@auc.setter
def auc(self, val: float) -> None:
"""Setter"""
if not isinstance(val, (float, int)):
raise ValueError("auc must be of type float or int.")
self._auc = float(val)
@property
def js_divergence(self) -> float:
"""Getter"""
if not hasattr(self, "_js_divergence"):
raise RuntimeError("js_divergence has not been initialized yet.")
return self._js_divergence
@js_divergence.setter
def js_divergence(self, val: float) -> None:
"""Setter"""
if not isinstance(val, (float, int)):
raise ValueError("js_divergence must be of type float or int.")
self._js_divergence = float(val)
@property
def historical_auc(self) -> pd.DataFrame | None:
"""Getter"""
if not hasattr(self, "_historical_auc"):
return None
return self._historical_auc
@historical_auc.setter
def historical_auc(self, val: pd.DataFrame) -> None:
"""Setter"""
if not isinstance(val, pd.DataFrame):
raise ValueError("historical_auc must be a pandas DataFrame.")
self._historical_auc = val
@property
def data_modeldrift(self) -> pd.DataFrame | None:
"""Getter"""
if not hasattr(self, "_data_modeldrift"):
return None
return self._data_modeldrift
@data_modeldrift.setter
def data_modeldrift(self, val: pd.DataFrame) -> None:
"""Setter"""
if not isinstance(val, pd.DataFrame):
raise ValueError("data_modeldrift must be a pandas DataFrame.")
self._data_modeldrift = val
@property
def current_dataset_name(self) -> str:
"""Returns the display name of df_current"""
return self._dataset_names[0]
@property
def baseline_dataset_name(self) -> str:
"""Returns the display name of df_baseline"""
return self._dataset_names[1]
@property
def encoding(self) -> Any:
"""Getter"""
return self._encoding
@encoding.setter
def encoding(self, val: Any) -> None:
"""Setter"""
self._encoding = val
@property
def deployed_model(self) -> Any:
"""Getter"""
return self._deployed_model
@deployed_model.setter
def deployed_model(self, val: Any) -> None:
"""Setter"""
self._deployed_model = val
@property
def datadrift_stat_test(self) -> pd.DataFrame:
"""Getter"""
if not hasattr(self, "_datadrift_stat_test"):
raise RuntimeError("datadrift_stat_test has not been initialized yet.")
return self._datadrift_stat_test
@datadrift_stat_test.setter
def datadrift_stat_test(self, val: pd.DataFrame) -> None:
"""Setter"""
if not isinstance(val, pd.DataFrame):
raise ValueError("datadrift_stat_test must be a pandas DataFrame.")
self._datadrift_stat_test = val
@property
def df_concat(self) -> pd.DataFrame:
"""Getter"""
if not hasattr(self, "_df_concat"):
raise RuntimeError("df_concat has not been initialized yet.")
return self._df_concat
@df_concat.setter
def df_concat(self, val: pd.DataFrame | None) -> None:
"""Setter"""
if val is not None and not isinstance(val, pd.DataFrame):
raise ValueError("df_concat must be a pandas DataFrame or None.")
self._df_concat = val
@property
def datadrift_target(self) -> str:
"""Getter"""
return self._datadrift_target
@property
def plot(self) -> SmartPlotter:
"""Getter"""
return self._plot
@property
def da(self) -> DatasetAnalysis:
"""Getter"""
if not hasattr(self, "_da"):
raise RuntimeError("da has not been initialized yet.")
return self._da
@da.setter
def da(self, val: DatasetAnalysis) -> None:
"""Setter"""
if not isinstance(val, DatasetAnalysis):
raise ValueError(f"da must be a of type {DatasetAnalysis.__name__}.")
self._da = val