from itertools import productfrom category_encoders import TargetEncoderfrom sklearn.model_selection import StratifiedKFold, KFold
class MeanEncoder: def __init__(self, categorical_features, n_splits=5, target_type='classification', min_samples_leaf=2, smoothing=1, hierarchy=None, verbose=0, shuffle=False, random_state=None): """ Parameters ---------- categorical_features: list of str the name of the categorical columns to encode. n_splits: int the number of splits used in mean encoding. target_type: str, 'regression' or 'classification'. min_samples_leaf: int For regularization the weighted average between category mean and global mean is taken. The weight is an S-shaped curve between 0 and 1 with the number of samples for a category on the x-axis. The curve reaches 0.5 at min_samples_leaf. (parameter k in the original paper) smoothing: float smoothing effect to balance categorical average vs prior. Higher value means stronger regularization. The value must be strictly bigger than 0. Higher values mean a flatter S-curve (see min_samples_leaf). hierarchy: dict or dataframe A dictionary or a dataframe to define the hierarchy for mapping. If a dictionary, this contains a dict of columns to map into hierarchies. Dictionary key(s) should be the column name from X which requires mapping. For multiple hierarchical maps, this should be a dictionary of dictionaries.
If dataframe: a dataframe defining columns to be used for the hierarchies. Column names must take the form: HIER_colA_1, ... HIER_colA_N, HIER_colB_1, ... HIER_colB_M, ... where [colA, colB, ...] are given columns in cols list. 1:N and 1:M define the hierarchy for each column where 1 is the highest hierarchy (top of the tree). A single column or multiple can be used, as relevant. verbose: int integer indicating verbosity of the output. 0 for none. shuffle : bool, default=False random_state : int or RandomState instance, default=None When `shuffle` is True, `random_state` affects the ordering of the indices, which controls the randomness of each fold for each class. Otherwise, leave `random_state` as `None`. Pass an int for reproducible output across multiple function calls. """
self.categorical_features = categorical_features self.n_splits = n_splits self.learned_stats = {} self.min_samples_leaf = min_samples_leaf self.smoothing = smoothing self.hierarchy = hierarchy self.verbose = verbose self.shuffle = shuffle self.random_state = random_state
if target_type == 'classification': self.target_type = target_type self.target_values = [] else: self.target_type = 'regression' self.target_values = None
def mean_encode_subroutine(self, X_train, y_train, X_test, variable, target): X_train = X_train[[variable]].copy() X_test = X_test[[variable]].copy()
if target is not None: nf_name = '{}_pred_{}'.format(variable, target) X_train['pred_temp'] = (y_train == target).astype(int) # classification else: nf_name = '{}_pred'.format(variable) X_train['pred_temp'] = y_train # regression prior = X_train['pred_temp'].mean() te = TargetEncoder(verbose=self.verbose, hierarchy=self.hierarchy, cols=[variable], smoothing=self.smoothing, min_samples_leaf=self.min_samples_leaf) te.fit(X_train[[variable]], X_train['pred_temp']) tmp_l = te.ordinal_encoder.mapping[0]["mapping"].reset_index() tmp_l.rename(columns={"index":variable, 0:"encode"}, inplace=True) tmp_l.dropna(inplace=True) tmp_r = te.mapping[variable].reset_index() if self.hierarchy is None: tmp_r.rename(columns={variable: "encode", 0:nf_name}, inplace=True) else: tmp_r.rename(columns={"index": "encode", 0:nf_name}, inplace=True) col_avg_y = pd.merge(tmp_l, tmp_r, how="left",on=["encode"]) col_avg_y.drop(columns=["encode"], inplace=True) col_avg_y.set_index(variable, inplace=True) nf_train = X_train.join(col_avg_y, on=variable)[nf_name].values nf_test = X_test.join(col_avg_y, on=variable).fillna(prior, inplace=False)[nf_name].values
return nf_train, nf_test, prior, col_avg_y
def fit(self, X, y): """ :param X: pandas DataFrame, n_samples * n_features :param y: pandas Series or numpy array, n_samples :return X_new: the transformed pandas DataFrame containing mean-encoded categorical features """ X_new = X.copy() if self.target_type == 'classification': skf = StratifiedKFold(self.n_splits, shuffle=self.shuffle, random_state=self.random_state) else: skf = KFold(self.n_splits, shuffle=self.shuffle, random_state=self.random_state)
if self.target_type == 'classification': self.target_values = sorted(set(y)) self.learned_stats = {'{}_pred_{}'.format(variable, target): [] for variable, target in product(self.categorical_features, self.target_values)} for variable, target in product(self.categorical_features, self.target_values): nf_name = '{}_pred_{}'.format(variable, target) X_new.loc[:, nf_name] = np.nan for large_ind, small_ind in skf.split(y, y): nf_large, nf_small, prior, col_avg_y = self.mean_encode_subroutine( X_new.iloc[large_ind], y.iloc[large_ind], X_new.iloc[small_ind], variable, target) X_new.iloc[small_ind, -1] = nf_small self.learned_stats[nf_name].append((prior, col_avg_y)) else: self.learned_stats = {'{}_pred'.format(variable): [] for variable in self.categorical_features} for variable in self.categorical_features: nf_name = '{}_pred'.format(variable) X_new.loc[:, nf_name] = np.nan for large_ind, small_ind in skf.split(y, y): nf_large, nf_small, prior, col_avg_y = self.mean_encode_subroutine( X_new.iloc[large_ind], y.iloc[large_ind], X_new.iloc[small_ind], variable, None) X_new.iloc[small_ind, -1] = nf_small self.learned_stats[nf_name].append((prior, col_avg_y)) return X_new
def transform(self, X): """ :param X: pandas DataFrame, n_samples * n_features :return X_new: the transformed pandas DataFrame containing mean-encoded categorical features """ X_new = X.copy()
if self.target_type == 'classification': for variable, target in product(self.categorical_features, self.target_values): nf_name = '{}_pred_{}'.format(variable, target) X_new[nf_name] = 0 for prior, col_avg_y in self.learned_stats[nf_name]: X_new[nf_name] += X_new[[variable]].join(col_avg_y, on=variable).fillna(prior, inplace=False)[ nf_name] X_new[nf_name] /= self.n_splits else: for variable in self.categorical_features: nf_name = '{}_pred'.format(variable) X_new[nf_name] = 0 for prior, col_avg_y in self.learned_stats[nf_name]: X_new[nf_name] += X_new[[variable]].join(col_avg_y, on=variable).fillna(prior, inplace=False)[ nf_name] X_new[nf_name] /= self.n_splits
return X_new
评论