Source code for eurybia.core.smartdrift

"""
- SmartDrift module
"""
import copy
import datetime
import io
import logging
import pickle
import shutil
import tempfile
from pathlib import Path
from typing import Dict, Text

import catboost
import pandas as pd
from pandas.api.types import is_datetime64_any_dtype as is_datetime
from shapash.explainer.smart_explainer import SmartExplainer
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split

from eurybia.core.smartplotter import SmartPlotter
from eurybia.style.style_utils import colors_loading, select_palette
from eurybia.utils.io import load_pickle, save_pickle
from eurybia.utils.model_drift import catboost_hyperparameter_init, catboost_hyperparameter_type
from eurybia.utils.statistical_tests import chisq_test, compute_js_divergence, ksmirnov_test
from eurybia.utils.utils import base_100, convert_date_col_into_multiple_col

logging.getLogger("papermill").setLevel(logging.WARNING)
logging.getLogger("blib2to3").setLevel(logging.WARNING)


[docs]class SmartDrift: """ The SmartDrift class is the main object to compute drift in the Eurybia library It allows to calculate data drift between 2 datasets using a data drift classification model Attributes ---------- df_current: pandas.DataFrame current (or production) dataset which is compared to df_baseline df_baseline: pandas.DataFrame baseline (or learning) dataset which is compared to df_current datadrift_classifier: model object model used for binary classification of data drift xpl: Shapash object object used to compute explainability on datadrift_classifier df_predict: pandas.DataFrame computed score on both datasets if a deployed_model is specified feature_importance: pandas.DataFrame feature importance of datadrift_classifier and feature importance of production model if exist pb_cols: dict Dictionnary that references columns differences between df_current and df_baseline err_mods: dict Dictionnary that references modalities differences in columns between df_current and df_baseline auc: int Value auc of model drift historical_auc: pandas.DataFrame Dataframe that contains auc history of datadrift_classifier over time data_modeldrift: pandas.DataFrame Dataframe that contains performance history of deployed_model ignore_cols: list list of feature to ignore in compute dataset_names : dict, (Optional) Dictionnary used to specify dataset names to display in report. _df_concat : pandas.DataFrame Dataframe that's composed of both df_baseline and df_current concatenated plot : eurybia.core.smartplotter.SmartPlotter Instance of an Eurybia SmartPlotter class. It's used for graph displaying purpose. deployed_model: model object, optional model in production used to put in perspective drift and to predict encoding: preprocessing object, optional (default: None) Preprocessing used before the training step datadrift_stat_test : dict Datadrift statistical tests for each feature. Each test identifies whether the feature has drifted. There are 2 types of test implemented depending on the type of feature: - Chi-square for discrete variables - ref: (https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.chi2_contingency.html) - Kolmogorov-Smirnov for continuous variables - ref: (https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.kstest.html) This datadrift_stat_test attribute specifies for each feature the test performed, the statistic the test and the p value palette_name : str (default: 'eurybia') Name of the palette used for the colors of the report (refer to style folder). colors_dict: dict Dict of the colors used in the different plots datadrift_file : str, optional Name of the csv file that contains the performance history of data drift If no datadrift file is given, the drift will not be logged js_divergence : float Jensen-Shannon divergence of probability distributions - ref: (https://en.wikipedia.org/wiki/Jensen%E2%80%93Shannon_divergence) How to declare a new SmartDrift object? Example -------- >>> SD = Smartdrift(df_current=df_production, df_baseline=df_learning) """ @classmethod def load(cls, path): """ The load() class method allows Eurybia users to use a pickled SmartDrift. Parameters ---------- path : str File path of the pickle file. Returns ---------- SmartDrift SmartDrift instance loaded with the pickle given as "path" Example -------- >>> from eurybia import SmartDrift >>> SmartDrift.load('path_to_pkl/smardrift.pkl') """ dict_to_load = load_pickle(path) sd = cls() if isinstance(dict_to_load, dict): for attr, val in dict_to_load.items(): if isinstance(val, io.BytesIO): setattr(sd, attr, pickle.load(val.seek(0))) elif attr == "xpl": xpl = SmartExplainer(model=val["model"]) xpl.__dict__.update(val) setattr(sd, attr, xpl) else: setattr(sd, attr, val) else: raise ValueError("pickle file must contain dictionary") return sd def __init__( self, df_current=None, df_baseline=None, dataset_names={"df_current": "Current dataset", "df_baseline": "Baseline dataset"}, deployed_model=None, encoding=None, palette_name="eurybia", colors_dict=None, ): """ Parameters ---------- df_current: pandas.DataFrame current (or production) dataset which is compared to df_baseline df_baseline: pandas.DataFrame baseline (or learning) dataset which is compared to df_current dataset_names : dict, (Optional) Dictionnary used to specify dataset names to display in report. deployed_model: model object, optional model in production used to put in perspective drift and to predict encoding: preprocessing object, optional (default: None) Preprocessing used before the training step palette_name : str (default: 'eurybia') Name of the palette used for the colors of the report (refer to style folder). colors_dict: dict Dict of the colors used in the different plots How to declare a new SmartDrift object ? Example -------- >>> SD = Smartdrift(df_current=df_production, df_baseline=df_learning) """ self.df_current = df_current self.df_baseline = df_baseline self.xpl = None self.df_predict = None self.feature_importance = None self.pb_cols, self.err_mods = None, None self.auc = None self.js_divergence = None self.historical_auc = None self.data_modeldrift = None self.ignore_cols = list() self.datadrift_stat_test = None if "df_current" not in dataset_names.keys() or "df_baseline" not in dataset_names.keys(): raise ValueError("dataset_names must be a dictionnary with keys 'df_current' and 'df_baseline'") self.dataset_names = pd.DataFrame(dataset_names, index=[0]) self._df_concat = None self._datadrift_target = "target" self.plot = SmartPlotter(self) self.deployed_model = deployed_model self.encoding = encoding self.palette_name = palette_name self.colors_dict = copy.deepcopy(select_palette(colors_loading(), self.palette_name)) if colors_dict is not None: self.colors_dict.update(colors_dict) self.plot.define_style_attributes(colors_dict=self.colors_dict) self.datadrift_file = None
[docs] def compile( self, full_validation=False, ignore_cols: list = list(), sampling=True, sample_size=100000, datadrift_file=None, date_compile_auc=None, hyperparameter: Dict = catboost_hyperparameter_init.copy(), attr_importance="feature_importances_", ): r""" The compile method is the first step to compute data drift. It allows to calculate data drift between 2 datasets using a data drift classification model. Most of the parameters are optional but helps to adapt the data drift calculation if necessary. This step can last a few moments with large datasets. Parameters ---------- full_validation: bool, optional (default: False) If True, analyze consistency on modalities between columns ignore_cols: list, optional list of feature to ignore in compute sampling: bool, optional If True, applies the sampling sample_size: int, optional the size of the sample to build date_compile_auc: str (optional) format dd/mm/yyyy use for specify date of compute drift, useful when compute few time drift for different time at the same moment hyperparameter: dict, optional if user want to modify catboost hyperparameter attr_importance: string, optional (default: "feature_importances\_") Attribute "feature_importance" of the deployed_model datadrift_file : str, optional Name of the csv file that contains the performance history of data drift. If no datadrift file is given, the drift will not be logged Examples -------- >>> SD.compile() """ if datadrift_file is not None: self.datadrift_file = datadrift_file if hyperparameter is not None: for key, value in catboost_hyperparameter_init.items(): catboost_hyperparameter_init[key] = ( hyperparameter[key] if key in hyperparameter and str(type(hyperparameter[key])) in catboost_hyperparameter_type[key] else value ) hyperparameter = catboost_hyperparameter_init.copy() if sample_size is not None: self.df_baseline = self._sampling(sampling, sample_size, self.df_baseline) self.df_current = self._sampling(sampling, sample_size, self.df_current) # Checking datasets self._check_dataset(ignore_cols) # Consistency analysis pb_cols, err_mods = self._analyze_consistency(full_validation, ignore_cols) # Adding results to ignored columns ignore_cols = list(set(ignore_cols + [item for sublist in [pb_cols[sl] for sl in pb_cols] for item in sublist])) if len(ignore_cols) != 0: self.df_baseline = self.df_baseline[[c for c in self.df_baseline.columns if c not in ignore_cols]] self.df_current = self.df_current[[c for c in self.df_current.columns if c not in ignore_cols]] df_concat = ( pd.concat([self.df_current, self.df_baseline], keys=[1, 0]) .reset_index() .rename(columns={"level_0": self._datadrift_target}) ) df_concat.drop(df_concat.columns[1], axis=1, inplace=True) varz = [c for c in df_concat.columns if c not in [self._datadrift_target] and c not in ignore_cols] dtypes = df_concat[varz].dtypes.map(str) cat_features = list(dtypes[dtypes.isin(["object"])].index) df_concat[cat_features] = df_concat[cat_features].fillna("NA") self._df_concat = df_concat train, test = train_test_split(df_concat[varz + [self._datadrift_target]], test_size=0.25, random_state=42) i = 0 indice_cat = [] for var_x in df_concat[varz]: if var_x in cat_features: indice_cat.append(i) i = i + 1 train_pool_cat = catboost.Pool(data=train[varz], label=train["target"].astype(int), cat_features=indice_cat) test_pool_cat = catboost.Pool(data=test[varz], label=test["target"].astype(int), cat_features=indice_cat) datadrift_classifier = catboost.CatBoostClassifier( max_depth=hyperparameter["max_depth"], l2_leaf_reg=hyperparameter["l2_leaf_reg"], learning_rate=hyperparameter["learning_rate"], iterations=hyperparameter["iterations"], use_best_model=hyperparameter["use_best_model"], custom_loss=hyperparameter["custom_loss"], loss_function=hyperparameter["loss_function"], eval_metric=hyperparameter["eval_metric"], task_type="CPU", allow_writing_files=False, ) datadrift_classifier = datadrift_classifier.fit(train_pool_cat, eval_set=test_pool_cat, silent=True) xpl = SmartExplainer( label_dict={0: self.dataset_names["df_baseline"].values[0], 1: self.dataset_names["df_current"].values[0]}, model=datadrift_classifier, ) x_test = test[varz] y_test = test[self._datadrift_target] xpl.compile(x=x_test) xpl.compute_features_import(force=True) self.xpl = xpl self.xpl.define_style(colors_dict=self.colors_dict) self.datadrift_classifier = datadrift_classifier self.df_predict = self._predict(deployed_model=self.deployed_model, encoding=self.encoding) self.auc = roc_auc_score(y_test, datadrift_classifier.predict(x_test)) self.feature_importance = self._feature_importance( deployed_model=self.deployed_model, attr_importance=attr_importance ) self.plot.feature_importance = self.feature_importance self.pb_cols, self.err_mods = pb_cols, err_mods if self.deployed_model is not None: self.js_divergence = compute_js_divergence( self.df_predict.loc[lambda df: df["dataset"] == self.dataset_names["df_baseline"].values[0], :][ "Score" ].values, self.df_predict.loc[lambda df: df["dataset"] == self.dataset_names["df_current"].values[0], :][ "Score" ].values, n_bins=20, ) self.historical_auc = self._histo_datadrift_metric( datadrift_file=self.datadrift_file, date_compile_auc=date_compile_auc ) self.data_modeldrift = None self.ignore_cols = ignore_cols if self.deployed_model is not None: self.datadrift_stat_test = self._compute_datadrift_stat_test()
[docs] def generate_report( self, output_file, project_info_file=None, title_story="Drift Report", title_description="", working_dir=None ): """ This method will generate an HTML report containing different information about the project. It allows the information compiled to be rendered. It can be associated with a project info yml file on which can figure different information about the project. Parameters ---------- output_file : str Path to the HTML file to write project_info_file : str Path to the file used to display some information about the project in the report title_story : str, optional Report title title_description : str, optional Report title description (as written just below the title) working_dir : str, optional Working directory in which will be generated the notebook used to create the report and where the objects used to execute it will be saved. This parameter can be usefull if one wants to create its own custom report and debug the notebook used to generate the html report. If None, a temporary directory will be used Examples -------- >>> SD.generate_report( output_file='report.html', project_info_file='project_info.yml', title_story="Drift project report", title_description="This document is a drift report of the score in production" ) """ from eurybia.report.generation import execute_report rm_working_dir = False if not working_dir: working_dir = tempfile.mkdtemp() rm_working_dir = True try: execute_report( project_info_file=project_info_file, explainer=self.xpl, smartdrift=self, config_report=dict(title_story=title_story, title_description=title_description), output_file=output_file, ) finally: if rm_working_dir: shutil.rmtree(working_dir)
def _check_dataset(self, ignore_cols: list = list()): """ Method to check if datasets are correct before to be analysed and if it's not, try to modify them and informs the user. In worse case raise an error. Parameters ---------- full_validation : bool, optional (default: False) If True, analyze consistency on modalities between columns ignore_cols: list, optional list of feature to ignore in compute """ if len([column for column in self.df_current.columns if is_datetime(self.df_current[column])]) > 0: if self.deployed_model is None: for col in [column for column in self.df_current.columns if is_datetime(self.df_current[column])]: print( f"""Column {col} will be dropped and transformed in df_current by : {col}_year, {col}_month, {col}_day""" ) self.df_current = convert_date_col_into_multiple_col(self.df_current) else: raise TypeError("df_current have datetime column. You should drop it") if len([column for column in self.df_baseline.columns if is_datetime(self.df_baseline[column])]) > 0: if self.deployed_model is None: for col in [column for column in self.df_baseline.columns if is_datetime(self.df_baseline[column])]: print( f"""Column {col} will be dropped and transformed in df_baseline by : {col}_year, {col}_month, {col}_day""" ) self.df_baseline = convert_date_col_into_multiple_col(self.df_baseline) else: raise TypeError("df_baseline have datetime column. You should drop it") def _analyze_consistency(self, full_validation=False, ignore_cols: list = list()): """ method to analyse consistency between the 2 datasets, in terms of columns and modalities Parameters ---------- full_validation : bool, optional (default: False) If True, analyze consistency on modalities between columns ignore_cols: list, optional list of feature to ignore in compute """ logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(module)s: %(message)s", datefmt="%y/%m/%d %H:%M:%S" ) if not isinstance(self.df_current, pd.DataFrame) or not isinstance(self.df_baseline, pd.DataFrame): raise TypeError("df_current and df_baseline should be Pandas dataframes.") if len(ignore_cols) > 0: print(f"""The following variables are manually set to be ignored in the analysis: \n {ignore_cols}""") # Features new_cols = [c for c in self.df_baseline.columns if c not in self.df_current.columns] removed_cols = [c for c in self.df_current.columns if c not in self.df_baseline.columns] if len(new_cols) > 0: print( f"""The following variables are no longer available in the current dataset and will not be analyzed: \n {new_cols}""" ) if len(removed_cols) > 0: print( f"""The following variables are only available in the current dataset and will not be analyzed: \n {removed_cols}""" ) common_cols = [c for c in self.df_current.columns if c in self.df_baseline.columns] # dtypes err_dtypes = [ c for c in common_cols if self.df_baseline.dtypes.map(str)[c] != self.df_current.dtypes.map(str)[c] ] if len(err_dtypes) > 0: print( f"""The following variables have mismatching dtypes and will not be analyzed: \n {err_dtypes}""" ) # Feature values err_mods: Dict[Text, Dict] = {} if full_validation is True: invalid_cols = ignore_cols + new_cols + removed_cols + err_dtypes for column in self.df_baseline.columns: if column not in invalid_cols and self.df_baseline.dtypes.map(str)[column] == "object": uniques_histo = pd.unique(self.df_baseline[column]) uniques_current = pd.unique(self.df_current[column]) new_mods = [mod for mod in uniques_current if mod not in uniques_histo] removed_mods = [mod for mod in uniques_histo if mod not in uniques_current] if len(new_mods) > 0 or len(removed_mods) > 0: err_mods[column] = {} err_mods[column]["New distinct values"] = new_mods err_mods[column]["Removed distinct values"] = removed_mods print( f"""The variable {column} has mismatching unique values: {new_mods} | {removed_mods}\n""" ) return ({"New columns": new_cols, "Removed columns": removed_cols, "Type errors": err_dtypes}, err_mods) def _predict(self, deployed_model=None, encoding=None): """ Create an attributes df_predict with the computed score on both datasets Parameters ---------- deployed_model : model object, optional (default: None) model in production used to put in perspective drift and to predict encoding : preprocessing object, optional (default: None) Preprocessing used before the training step Returns ------- pandas.DataFrame, None DataFrame with predicted score for both datasets """ if deployed_model is None: return None if not hasattr(deployed_model, "predict_proba") and not hasattr(deployed_model, "predict"): raise Exception("deployed_model need to have predict or predict_proba method") df_baseline = self.df_baseline df_current = self.df_current if encoding is not None: try: df_baseline = encoding.transform(df_baseline) df_current = encoding.transform(df_current) except BaseException as error: raise Exception( """ Encoding specified can't be applied directly on df_current/df_baseline - Error : """ + str(error) ) if hasattr(deployed_model, "predict_proba"): try: df_baseline_pred = pd.DataFrame(deployed_model.predict_proba(df_baseline)[:, 1], columns=["Score"]) df_current_pred = pd.DataFrame(deployed_model.predict_proba(df_current)[:, 1], columns=["Score"]) except BaseException as error: raise Exception( """ Encoding specified or deployed_model used can't be applied directly on df_current/df_baseline - Error : """ + str(error) ) else: try: df_baseline_pred = pd.DataFrame(deployed_model.predict(df_baseline), columns=["Score"]) df_current_pred = pd.DataFrame(deployed_model.predict(df_current), columns=["Score"]) except BaseException as error: raise Exception( """ Encoding specified or deployed_model used can't be applied directly on df_current/df_baseline - Error : """ + str(error) ) return pd.concat( [ df_baseline_pred.assign(dataset=self.dataset_names["df_baseline"].values[0]), df_current_pred.assign(dataset=self.dataset_names["df_current"].values[0]), ] ).reset_index(drop=True) def _feature_importance(self, deployed_model=None, attr_importance="feature_importances_"): """ Create an attributes feature_importance with the computed score on both datasets Parameters ---------- deployed_model : model object, optional (default: None) model in production used to put in perspective drift and to predict attr_importance : string, optional (default: "feature_importances_") Attribute "feature_importance" of the deployed_model Returns ------- pandas.DataFrame, None DataFrame with feature importance from production model and drift model. """ if deployed_model is None: return None try: array_importance = getattr(deployed_model, attr_importance) except BaseException as error: raise Exception( """ deployed_model used can't allow to get features importance on df_baseline - Error : """ + str(error) ) feature_importance_drift = pd.DataFrame( self.xpl.features_imp[0].values, index=self.xpl.features_imp[0].index, columns=["datadrift_classifier"] ) var_baseline = [c for c in self.df_baseline.columns if c not in ["target"]] if len(array_importance) != len(var_baseline): raise ValueError( """ Number of features in df_baseline doesn't match feature importance's shape returned by deployed model. """ ) feature_importance_model_prod = pd.DataFrame( array_importance, index=self.df_baseline[var_baseline].columns, columns=["deployed_model"] ) feature_importance = feature_importance_model_prod.merge( feature_importance_drift, how="left", left_index=True, right_index=True ).reset_index() feature_importance = feature_importance.rename(columns={"index": "feature"}) feature_importance["deployed_model"] = base_100(feature_importance["deployed_model"]) return feature_importance def _sampling(self, sampling, sample_size, dataset): """ Return a sampling from the original dataframe Parameters ---------- sampling : bool If True, applies the sampling sample_size : int the size of the sample to build df : pd.DataFrame The Dataframe to apply sampling Returns ------- pandas.DataFrame a sample of the original DataFrame or the original DataFrame """ if sampling: if dataset.shape[0] > sample_size: return dataset.sample(sample_size) else: return dataset else: return dataset def _histo_datadrift_metric(self, datadrift_file=None, date_compile_auc=None): """ Method which computes datadrift metrics (AUC, and Jensen Shannon prediction divergence if the deployed_model is filled in) and append it into a dataframe that will be exported during the generate_report method Parameters ---------- datadrift_file : str, (optional) date_compile_auc: str (optional) format dd/mm/yyyy use for specify date of compute drift, useful when compute few time drift for different time at the same moment Returns ------- pandas.DataFrame or None Dataframe with dates, AUC and Jensen Shannon prediction divergence computed at this date """ logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(module)s: %(message)s", datefmt="%y/%m/%d %H:%M:%S" ) if self.datadrift_file is None and date_compile_auc is None: return None elif self.datadrift_file is None and date_compile_auc is not None: self.datadrift_file = "historical_AUC.csv" return None else: if date_compile_auc: try: datetime.datetime.strptime(date_compile_auc, "%d/%m/%Y") # FIXME find exact exception type and use it instead of "Exception" except Exception: raise Exception("The argument date must have the format '%d/%m/%Y'") else: date_compile_auc = (datetime.datetime.today().date()).strftime("%d/%m/%Y") print(f"The computed AUC on the X_test used to build datadrift_classifier is equal to: {self.auc}") if self.deployed_model is not None: df_auc = pd.DataFrame( {"date": [date_compile_auc], "auc": [self.auc], "JS_predict": [self.js_divergence]} ) else: df_auc = pd.DataFrame({"date": [date_compile_auc], "auc": [self.auc]}) if self.datadrift_file is not None: if Path(self.datadrift_file).is_file() and self.datadrift_file.endswith(".csv"): histo_auc = pd.read_csv(self.datadrift_file).reset_index(drop=True) if self.deployed_model is not None: if not ( any(histo_auc.columns.isin(["date"])) and any(histo_auc.columns.isin(["auc"])) and any(histo_auc.columns.isin(["JS_predict"])) ): raise Exception("The csv data must have columns 'date', 'auc' and 'JS_predict'") df_auc = pd.concat([histo_auc[["date", "auc", "JS_predict"]], df_auc]).reset_index(drop=True) else: if not (any(histo_auc.columns.isin(["date"])) and any(histo_auc.columns.isin(["auc"]))): raise Exception("The csv data must have columns 'date' and 'auc'") df_auc = pd.concat([histo_auc[["date", "auc"]], df_auc]).reset_index(drop=True) else: print(f"{self.datadrift_file} did not exist and was created. ") try: df_auc.to_csv(self.datadrift_file) except OSError as error: raise OSError("Can't save to csv the AUC metrics, error : " + str(error)) return df_auc
[docs] def add_data_modeldrift( self, dataset, metric="performance", reference_columns=[], year_col="annee", month_col="mois" ): """ When method drift is specified, It will display in the report the several plots from a dataframe to analyse drift model from the deployed model. Each plot will represent one possible computed metric according to its groups. (grouped by date(year-month), reference_columns). Parameters ---------- df : pd.DataFrame The Dataframe with all the computed metrics. metric: str, (default: 'performance') The column name of the metric computed reference_columns: list, (default: []) the column names to use for aggregation with the Date computed year_col: str, (default: 'annee') The column name of the year where the metric has been computed month_col: str, (default: 'mois') The column name of the month where the metric has been computed """ try: df_modeldrift = dataset.copy() df_modeldrift[month_col] = df_modeldrift[month_col].apply(lambda row: str(row).split(".")[0]) df_modeldrift["Date"] = ( "01/" + df_modeldrift[month_col] + "/" + df_modeldrift[year_col].astype("int64").astype(str) ) df_modeldrift["Date"] = pd.to_datetime(df_modeldrift["Date"], format="%d/%m/%Y") df_aggregate = pd.DataFrame( df_modeldrift.groupby(["Date"] + reference_columns)[metric].mean() ).reset_index() df_aggregate["Date"] = pd.to_datetime(df_aggregate["Date"]).dt.strftime("%d/%m/%Y") self.data_modeldrift = df_aggregate except BaseException as error: raise Exception( """ The df specified in the method doesn't allow us to aggregate it for the report. - Error - """ + str(error) )
def _compute_datadrift_stat_test(self, max_size=50000, categ_max=20): """ calculates all statistical tests to analyze the drift of each feature Parameters ---------- max_size : int Sets the maximum number of rows. If the datasets are larger there is sampling categ_max: int Maximum number of values ​​per feature to apply the chi square test Returns : ------- dict : keys - features values - dict containing testname, statistic, pvalue """ # sampling baseline = self.df_baseline.sample(n=max_size) if self.df_baseline.shape[0] > max_size else self.df_baseline current = self.df_current.sample(n=max_size) if self.df_current.shape[0] > max_size else self.df_current test_results = {} # compute test for each feature for features, count in self.xpl.features_desc.items(): try: if current[features].dtypes.kind == "O" and count <= categ_max: test = chisq_test(current[features].to_numpy(), baseline[features].to_numpy()) else: test = ksmirnov_test(current[features].to_numpy(), baseline[features].to_numpy()) except BaseException as e: raise Exception( """ There is a problem with the format of {} column between the two datasets. Error: """.format( str(features) ) + str(e) ) test_results[features] = test return pd.DataFrame.from_dict(test_results, orient="index") def define_style(self, palette_name=None, colors_dict=None): """ the define_style function is a function that uses a palette or a dict to define the different styles used in the different outputs of eurybia Parameters ---------- palette_name : str (default: 'eurybia') Name of the palette used for the colors of the report (refer to style folder). colors_dict: dict Dict of the colors used in the different plots """ if palette_name is None and colors_dict is None: raise ValueError("At least one of palette_name or colors_dict parameters must be defined") new_palette_name = palette_name or self.palette_name new_colors_dict = copy.deepcopy(select_palette(colors_loading(), new_palette_name)) if colors_dict is not None: new_colors_dict.update(colors_dict) self.colors_dict.update(new_colors_dict) self.plot.define_style_attributes(colors_dict=self.colors_dict) self.xpl.define_style(colors_dict=self.colors_dict) def save(self, path): """ Save method allows user to save SmartDrift object on disk using a pickle file. Save method can be useful: you don't have to recompile to display results later Parameters ---------- path : str File path to store the pickle file Example -------- >>> smartdrift.save('path_to_pkl/smartdrift.pkl') """ dict_to_save = {} for att in self.__dict__.keys(): if isinstance(getattr(self, att), (list, dict, pd.DataFrame, pd.Series, type(None), bool, float)): dict_to_save.update({att: getattr(self, att)}) elif isinstance(getattr(self, att), SmartExplainer): smartexplainer_dict = {} for att_xpl in self.xpl.__dict__.keys(): if isinstance( getattr(self.xpl, att_xpl), (list, dict, pd.DataFrame, pd.Series, type(None), bool) ) or att_xpl in ["model", "preprocessing", "postprocessing"]: smartexplainer_dict.update({att_xpl: getattr(self.xpl, att_xpl)}) dict_to_save.update({att: smartexplainer_dict}) save_pickle(dict_to_save, path)