lightgbm is yet another engineering implementation of the GBDT algorithm. Compared with xgboost it trains noticeably faster while delivering comparable accuracy, but it exposes a large number of hyperparameters, and tuning them by hand is not only tedious but also unlikely to reach the best configuration.
hyperopt is a Python library for "distributed asynchronous algorithm configuration / hyperparameter optimization". Broadly speaking, a model together with its hyperparameters can be viewed as a (generally non-convex) function of those hyperparameters, so hyperopt can reliably produce a more reasonable configuration than manual tuning.
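Before getting into the LightGBM specifics, here is a minimal, self-contained sketch of the hyperopt workflow: fmin searches a parameter space with the TPE algorithm and returns the best point it found. The toy quadratic objective and the variable names are purely illustrative.

from hyperopt import hp, fmin, tpe

def toy_objective(x):
    # fmin minimizes whatever the objective returns
    return (x - 3) ** 2

toy_space = hp.uniform('x', -10, 10)
best = fmin(fn=toy_objective, space=toy_space, algo=tpe.suggest, max_evals=100)
print(best)  # something close to {'x': 3.0}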
0. Importing the required Python libraries
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.preprocessing import OneHotEncoder, RobustScaler
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import train_test_split
from hyperopt import hp, fmin, tpe
from functools import partial
from pyspark.sql import SparkSession
from sklearn import metrics
import lightgbm as lgb
import pandas as pd
import numpy as np
import pickle
import base64
import sys
1. Data preparation
First, define a data-transformation class in the sklearn pipeline style (a short usage sketch follows the class):
class FeatureProcess(BaseEstimator, TransformerMixin):
"""
定義數(shù)據(jù)轉(zhuǎn)換的類,使用方式如下:
pipeline = FeatureProcess(categorical_cols, numerical_cols, label_cols)
pipiline.fit(train_data)
X = pipeline.transform(train_data)
其中 train_data 為 pandas 的 dataframe
"""
def __init__(self, categorical_cols=None, numerical_cols=None, label_cols=None):
"""
:param categorical_cols: 類別特征
:param numerical_cols: 數(shù)值類特征
:param label_cols: 標(biāo)簽類特征 格式為空格分隔的字符串
"""
self.categorical_cols = categorical_cols
self.numerical_cols = numerical_cols
self.label_cols = label_cols
self.onehot = None
self.scaler = None
self.labels_pipe = {}
self.stop_words = {'null', 'nan'}
self.feature_names = None
def _get_onehot_feature_names(self, categorical_cols):
"""
返回 onthot 后的特征名原茅,定義為 原始列名+onthot對(duì)應(yīng)的值
:param categorical_cols:
:return:
"""
feature_names = list()
for col, values in zip(categorical_cols, self.onehot.categories_):
for val in values:
feature_names.append('%s_%s' % (col, val))
return feature_names
def fit(self, df, y=None):
        # Treat empty strings as missing values
df = df.replace({'': np.NAN})
        # Type casting
df[self.categorical_cols + self.label_cols] = df[self.categorical_cols + self.label_cols].astype(str)
df[self.numerical_cols] = df[self.numerical_cols].astype(float).replace(float('inf'), 0)
        # Fit the one-hot encoder on the categorical columns
self.onehot = OneHotEncoder(handle_unknown='ignore', sparse=False).fit(df[self.categorical_cols])
        # Fit a TF-IDF vectorizer for each tag column
label_feature_names = []
for label_col in self.label_cols:
self.labels_pipe[label_col] = TfidfVectorizer(stop_words=self.stop_words, min_df=500).fit(df[label_col].values)
label_feature_names.extend(sorted(self.labels_pipe[label_col].vocabulary_,
key=lambda x: self.labels_pipe[label_col].vocabulary_[x]))
        # Intended to tame extreme values; note that with both with_centering and with_scaling disabled, this scaler's transform is effectively a pass-through
self.scaler = RobustScaler(with_centering=False, with_scaling=False, quantile_range=(1, 99)).fit(
df[self.numerical_cols])
# feature_names
self.feature_names = self._get_onehot_feature_names(self.categorical_cols) + self.numerical_cols + label_feature_names
return self
def transform(self, df, y=None):
        # Treat empty strings as missing values
df = df.replace({'': np.NAN})
        # Type casting
df[self.categorical_cols + self.label_cols] = df[self.categorical_cols + self.label_cols].astype(str)
df[self.numerical_cols] = df[self.numerical_cols].astype(float).replace(float('inf'), 0)
        # Transform the categorical and numerical columns
onehot_data = self.onehot.transform(df[self.categorical_cols])
scaler_data = self.scaler.transform(df[self.numerical_cols])
        # Transform the tag columns with the fitted TF-IDF vectorizers
label_data = np.concatenate([self.labels_pipe[label_col].transform(df[label_col].values).toarray()
for label_col in self.label_cols], axis=1)
data = np.c_[onehot_data, scaler_data, label_data]
return data
def fit_transform(self, df, y=None):
return self.fit(df, y).transform(df, y)
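A brief usage sketch of the class on its own, assuming a pandas DataFrame train_data with a 'target' column has already been loaded; the column names below are hypothetical, and because the TfidfVectorizer uses min_df=500, the tag columns only yield features on a reasonably large dataset.

categorical_cols = ['city', 'device_type']   # hypothetical categorical columns
numerical_cols = ['age', 'order_cnt_30d']    # hypothetical numerical columns
label_cols = ['interest_tags']               # columns holding space-separated tag strings

pipeline = FeatureProcess(categorical_cols, numerical_cols, label_cols)
X = pipeline.fit_transform(train_data.drop('target', axis=1))
print(X.shape, len(pipeline.feature_names))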
2. Defining the parameter space
hp.randint produces integers starting from 0, so the args_tranform function below maps those raw draws onto the actual parameter values (a quick check of the mapping is sketched after the function).
# Custom hyperopt parameter space
space = {"learning_rate": hp.randint("learning_rate", 7),
"max_depth": hp.randint("max_depth", 10),
"num_leaves": hp.randint("num_leaves", 10),
"bagging_fraction": hp.randint("bagging_fraction", 5),
"bagging_freq": hp.randint("bagging_freq", 9),
"feature_fraction": hp.randint("feature_fraction", 5),
"lambda_l1": hp.randint("lambda_l1", 6),
"lambda_l2": hp.randint("lambda_l2", 8),
"cat_smooth": hp.randint("cat_smooth", 20),
"min_data_in_leaf": hp.randint("min_data_in_leaf", 20),
}
def args_tranform(args_dict, is_print=False):
params = dict()
params["learning_rate"] = args_dict["learning_rate"] * 0.02 + 0.01
params["max_depth"] = args_dict["max_depth"] + 3
params["num_leaves"] = args_dict["num_leaves"] * 5 + 5
params["bagging_fraction"] = args_dict["bagging_fraction"] * 0.1 + 0.2
params["bagging_freq"] = args_dict["bagging_freq"] + 1
params["feature_fraction"] = args_dict["feature_fraction"] * 0.1 + 0.5
params["lambda_l1"] = args_dict["lambda_l1"] * 0.1 + 0.1
params["lambda_l2"] = args_dict["lambda_l2"] * 5
params["cat_smooth"] = args_dict["cat_smooth"] + 1
params["min_data_in_leaf"] = args_dict["min_data_in_leaf"] * 20 + 50
params["boosting_type"] = 'gbdt'
params["objective"] = 'binary'
params["metric"] = 'auc'
params["verbosity"] = 0
params["seed"] = 42
params["num_threads"] = 16
if is_print:
print(params)
return params
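One quick way to see what the mapping produces is to draw a raw point from the space with hyperopt's stochastic sampler and run it through args_tranform. A minimal sketch; the values shown in the comments are just one possible draw.

from hyperopt.pyll import stochastic

raw = stochastic.sample(space)
print(raw)                  # e.g. {'learning_rate': 5, 'bagging_fraction': 3, ...}
print(args_tranform(raw))   # e.g. learning_rate -> 0.11, bagging_fraction -> 0.5, ...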
3. Building the model factory and the objective function
The objective function adds a parameter that penalizes overfitting, which keeps the gap between train_auc and val_auc from growing too large (a worked illustration follows the code).
def model_fit(argsDict, rate=0.2):
params = args_tranform(argsDict)
model = lgb.train(params, dtrain, 500, dval, early_stopping_rounds=20, verbose_eval=False)
return get_score(model, rate)
def get_score(model, rate):
"""
:param model:
    :param rate: overfitting-penalty coefficient; the larger the value, the stricter the penalty
:return:
"""
y_val_pred = model.predict(X_val, num_iteration=model.best_iteration)
y_train_pred = model.predict(X_train, num_iteration=model.best_iteration)
train_auc = metrics.roc_auc_score(y_train, y_train_pred)
val_auc = metrics.roc_auc_score(y_val, y_val_pred)
return -val_auc + rate * abs(train_auc - val_auc)
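As a worked illustration with made-up AUC values and rate = 0.2: a model with train_auc = 0.92 and val_auc = 0.80 gets a score of -0.80 + 0.2 * 0.12 = -0.776, while a model with train_auc = 0.83 and the same val_auc gets -0.80 + 0.2 * 0.03 = -0.794. Since fmin minimizes this score, the less overfit model is preferred even though both reach the same validation AUC.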
4. Model training
The training data is read via Spark. At the end, the trained model and the feature pipeline are serialized together and written to a cluster file; the result could also be written to a database instead. A sketch of reading the artifacts back for scoring follows the script.
if __name__ == '__main__':
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    # sql, categorical_cols, numerical_cols, label_cols and model_path are assumed to be defined elsewhere
    df = spark.sql(sql).toPandas()
    # Hold out the test set
train_set, test_set = train_test_split(df, test_size=0.15, shuffle=True, stratify=df['target'],
random_state=123)
pipeline = FeatureProcess(categorical_cols, numerical_cols, label_cols)
X = pipeline.fit_transform(train_set.drop('target', axis=1))
y = train_set['target']
X_test = pipeline.transform(test_set.drop('target', axis=1))
y_test = test_set['target']
feature_names = pipeline.feature_names
    # Split the remaining data into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(X, y.values, test_size=0.15, shuffle=True, stratify=y, random_state=123)
dtrain = lgb.Dataset(data=X_train, label=y_train)
dval = lgb.Dataset(data=X_val, label=y_val, reference=dtrain)
valid_sets = [dtrain, dval]
    # Hyperparameter search and final model training
algo = partial(tpe.suggest, n_startup_jobs=-1)
best = fmin(model_fit, space, algo=algo, max_evals=50, show_progressbar=True)
lgb_params = args_tranform(best, is_print=True)
best_model = lgb.train(lgb_params, dtrain, 1500, valid_sets, ['train', 'eval'], early_stopping_rounds=50,
verbose_eval=200)
    # Evaluation metrics
metric_score = best_model.best_score
metric_score['test'] = {}
y_prob = best_model.predict(X_test, num_iteration=best_model.best_iteration)
metric_score['test']['auc'] = metrics.roc_auc_score(y_test, y_prob)
metric_score['test']['acc'] = metrics.accuracy_score(y_test, (y_prob >= 0.5).astype(int))
    # Serialize the pipeline and the model and save them
res_df = pd.DataFrame()
res_df['model_name'] = ['model_v1.0']
res_df['pipeline'] = base64.b64encode(pickle.dumps(pipeline)).strip().decode('utf8')
res_df['model'] = base64.b64encode(pickle.dumps(best_model)).strip().decode('utf8')
res_df['metric_score'] = str(metric_score)
sparkDF = spark.createDataFrame(res_df)
sparkDF.write.format('csv').save(model_path)
# sparkDF.createOrReplaceTempView("model_result")
# spark.sql(insert_sql)
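To use the saved artifacts later, the base64 strings only need to be decoded and unpickled. A minimal sketch, assuming pipeline_b64 and model_b64 hold the strings read back from the saved file or table and new_df is a DataFrame of raw features to score (all three names are hypothetical):

loaded_pipeline = pickle.loads(base64.b64decode(pipeline_b64))
loaded_model = pickle.loads(base64.b64decode(model_b64))
X_new = loaded_pipeline.transform(new_df)
y_score = loaded_model.predict(X_new, num_iteration=loaded_model.best_iteration)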