#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time

import numpy as np
import pandas as pd
import xgboost as xgb
from pyspark import SparkConf
from pyspark.sql import SparkSession
from sklearn.model_selection import train_test_split
conf1 = SparkConf().setAppName("101") \
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .set("spark.sql.shuffle.partitions", "100") \
    .set("spark.sql.autoBroadcastJoinThreshold", "100485760") \
    .set("spark.sql.inMemoryColumnarStorage.compressed", "true") \
    .set("spark.shuffle.file.buffer", "128k") \
    .set("spark.reducer.maxSizeInFlight", "96m") \
    .set("hive.exec.dynamic.partition.mode", "nonstrict")
spark = SparkSession.builder.config(conf=conf1).enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel('error')
# Read the data from the Hive table, sample it, and then concatenate the pieces
data = spark.sql("select * from tmp.tmp_shop_feature_tag where shop_id <>-1 and label<>-1")
df = data.toPandas()  # convert the Spark DataFrame to a pandas DataFrame
all_feature = list(df.columns.values)  # list of feature column names
# Split train/test sets separately for positive and negative samples to keep the class ratio
print(df[:1])
data_pos = df[df['label'] == 1]
data_neg = df[df['label'] == 0]
print(data_pos[:1])
X_data_pos = data_pos.iloc[:, 2:101].values    # feature columns
y_data_pos = data_pos.iloc[:, 101:102].values  # label column
X_data_neg = data_neg.iloc[:, 2:101].values
y_data_neg = data_neg.iloc[:, 101:102].values
X_train_pos, X_test_pos, y_train_pos, y_test_pos = train_test_split(X_data_pos, y_data_pos, test_size=0.3, random_state=10)
X_train_neg, X_test_neg, y_train_neg, y_test_neg = train_test_split(X_data_neg, y_data_neg, test_size=0.3, random_state=10)
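# Note: the same ratio-preserving split could be done in one call via the
# `stratify` argument of train_test_split; a minimal sketch (same intent,
# not a byte-identical split):
#     X_all = df.iloc[:, 2:101].values
#     y_all = df.iloc[:, 101:102].values
#     X_tr, X_te, y_tr, y_te = train_test_split(
#         X_all, y_all, test_size=0.3, random_state=10, stratify=y_all)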
print("打印X_train_pos的數(shù)據(jù)類(lèi)型--------")
print(X_train_pos.shape)
print("打印X_train_neg的數(shù)據(jù)類(lèi)型--------")
print(X_train_neg.shape)
# Stack the positive and negative splits back together
X_train = np.vstack((X_train_pos, X_train_neg))
X_test = np.vstack((X_test_pos, X_test_neg))
y_train = np.vstack((y_train_pos, y_train_neg))
y_test = np.vstack((y_test_pos, y_test_neg))
y_train = y_train.astype(np.float64)
y_test = y_test.astype(np.float64)
# Print the results
print("Shapes of X_train/X_test and samples of y_train/y_test -------")
print(X_train.shape)
print(X_test.shape)
print(y_train[:3])
print(y_test[:3])
# Build a DMatrix over the full dataset, using the same feature/label columns as
# the train/test split above (the model is trained on 99 features, so dall must match)
X = df.iloc[:, 2:101].values
y = df.iloc[:, 101:102].values.astype(np.float64)
dall = xgb.DMatrix(X, y)
print(dall.num_row(), dall.num_col())
# Parameter tuning: search for the optimal model parameters
# Build the training and test DMatrix from the numeric features
dtrain = xgb.DMatrix(X_train, y_train)
dtest = xgb.DMatrix(X_test, y_test)
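# DMatrix is xgboost's optimized internal data structure; building dtrain/dtest
# once up front avoids re-converting the numpy arrays on every xgb.cv/xgb.train call.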
# Custom parameter-search helper for xgboost
def model_fit(params, dtrain, max_round=500, cv_folds=5, n_stop_round=50):
    """Cross-validate one set of parameters; return the best number of rounds and the best score.
    Args:
        params: dict, xgb model parameters.
            See the xgb_grid_search_cv function.
    Returns:
        n_round: best number of boosting rounds.
        mean_auc: best mean test AUC.
    """
    cv_result = xgb.cv(params, dtrain, max_round, nfold=cv_folds,
                       metrics='auc', early_stopping_rounds=n_stop_round, show_stdv=False)
    n_round = cv_result.shape[0]  # with early stopping, the last row is the best iteration
    mean_auc = cv_result['test-auc-mean'].values[-1]  # best mean AUC
    return n_round, mean_auc
def xgb_grid_search_cv(params, key, search_params, dtrain, max_round=500, cv_folds=5,
                       n_stop_round=10, return_best_model=True, verbose=True):
    """Custom grid-search-with-CV helper for xgboost.
    Args:
        params: dict, xgb model parameters.
        key: name of the parameter to search.
        search_params: list, candidate values for that parameter.
        dtrain: training data.
        max_round: maximum number of boosting rounds.
        cv_folds: number of cross-validation folds.
        n_stop_round: stop if there is no improvement for this many rounds.
        return_best_model: if True, retrain on the full training set with the best parameters.
        verbose: if True, print progress.
    Returns:
        cv_result: dict, cross-validation results for all parameter sets.
            - mean_aucs: score for each parameter set.
            - n_rounds: best number of rounds for each parameter set.
            - list_params: each parameter set searched.
            - best_mean_auc: best score.
            - best_round: best number of rounds.
            - best_params: best parameter set.
        best_model: the xgboost Booster trained with best_params (None if return_best_model is False).
    """
    mean_aucs = list()
    n_rounds = list()
    list_params = list()
    print('Searching parameters: %s %s' % (key, str(search_params)))
    tic = time.time()
    for search_param in search_params:
        params[key] = search_param
        list_params.append(params.copy())
        n_round, mean_auc = model_fit(params, dtrain, max_round, cv_folds, n_stop_round)
        if verbose:
            print('%s=%s: n_round=%d, mean_auc=%g. Time cost %gs' % (key, str(search_param), n_round, mean_auc, time.time() - tic))
        mean_aucs.append(mean_auc)
        n_rounds.append(n_round)
    best_mean_auc = max(mean_aucs)
    best_index = mean_aucs.index(best_mean_auc)  # index of the best parameter set
    best_round = n_rounds[best_index]
    best_params = list_params[best_index]
    cv_result = {'mean_aucs': mean_aucs, 'n_rounds': n_rounds, 'list_params': list_params,
                 'best_mean_auc': best_mean_auc, 'best_round': best_round, 'best_params': best_params}
    if return_best_model:
        best_model = xgb.train(best_params, dtrain, num_boost_round=best_round)
    else:
        best_model = None
    if verbose:
        print('best_mean_auc = %g' % best_mean_auc)
        print('best_round = %d' % best_round)
        print('best_params = %s' % str(best_params))
    return cv_result, best_model
params = {'booster': 'gbtree',
          'objective': 'binary:logistic',
          'subsample': 1,
          # 'colsample_bytree': 1,
          'eta': 0.1,
          'max_depth': 8,
          'min_child_weight': 1,
          'gamma': 0.0,
          'verbosity': 0,  # replaces the deprecated 'silent' parameter
          'seed': 0,
          'eval_metric': 'auc',
          'nthread': 8  # 'njob' is not a valid xgboost parameter name
          }
# First, search for the best learning rate (eta) and the corresponding number of rounds
key = 'eta'
values = [0.1, 0.2, 0.3]
cv_result, best_model = xgb_grid_search_cv(params, key, values, dtrain)
print('%s: %s' % (key, str(values)))
print('n_round = %s' % str(cv_result['n_rounds']))
print('mean_aucs = %s' % str(cv_result['mean_aucs']))
# Holding the best value found for the previous parameter, search the remaining
# parameters one at a time. This is not a full grid search, but the results are
# usually close to it, and it saves a lot of time.
params = cv_result['best_params']
key = 'max_depth'
values = [4, 5, 6, 7, 8]
cv_result, best_model = xgb_grid_search_cv(params, key, values, dtrain)
print('%s: %s' % (key, str(values)))
print('n_round = %s' % str(cv_result['n_rounds']))
# best_model.predict() returns, for each sample, the probability of the positive class (1)
y_pred_prob = best_model.predict(dtest)
print(X_test.shape, y_pred_prob.shape)
print(y_pred_prob[0])
# With pred_leaf=True, predict() returns the leaf index of each sample in every tree
y_pred_leaf = best_model.predict(dtest, pred_leaf=True)
print(X_test.shape, y_pred_leaf.shape)
print(y_pred_leaf[0])
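# The leaf matrix has shape (n_samples, n_trees); each entry is the ID of the
# leaf the sample falls into for that tree. A quick sanity check (assumes
# best_model was trained with cv_result['best_round'] rounds, as above):
assert y_pred_leaf.shape == (X_test.shape[0], cv_result['best_round'])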
將所有的數(shù)值特征轉(zhuǎn)為 one-hot 類(lèi)型,和原始的類(lèi)別型特征進(jìn)行拼接
from sklearn.preprocessing import OneHotEncoder
from sklearn.linear_model import LogisticRegression
# One-hot encoder for the leaf indices
grd_enc = OneHotEncoder()
# LR classifier
grd_lm = LogisticRegression()
# Leaf indices of every sample in every tree (dall was already built above)
all_leafs = best_model.predict(dall, pred_leaf=True)
train_leafs = best_model.predict(dtrain, pred_leaf=True)
test_leafs = best_model.predict(dtest, pred_leaf=True)
# 使用所有特征確定編碼規(guī)則
grd_enc.fit(all_leafs)
# one-hot 編碼
oh_train = grd_enc.transform(train_leafs).toarray()
oh_test = grd_enc.transform(test_leafs).toarray()
# 所有特征
X_train_oh = np.hstack([oh_train, X_train])
X_test_oh = np.hstack([oh_test, X_test])
print('X_train_oh.shape=%s' % str(X_train_oh.shape))
print('X_test_oh.shape=%s' % str(X_test_oh.shape))
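# Note: .toarray() densifies the one-hot matrix, which can exhaust memory on
# large datasets. A sketch of a sparse alternative (assumes scipy is available;
# LogisticRegression accepts sparse input directly):
#     from scipy.sparse import hstack, csr_matrix
#     X_train_oh = hstack([grd_enc.transform(train_leafs), csr_matrix(X_train)]).tocsr()
#     X_test_oh = hstack([grd_enc.transform(test_leafs), csr_matrix(X_test)]).tocsr()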
# Train a logistic regression on the combined features
from sklearn.model_selection import cross_val_score
# Search for the best regularization strength C
C_params = np.linspace(0.001, 0.05, 10)  # 10 evenly spaced values in [0.001, 0.05]
LR_aucs = []  # cross-validation scores
print(C_params)
tic = time.time()
for C_param in C_params:
    # the l1 penalty requires a compatible solver such as liblinear
    model = LogisticRegression(C=C_param, penalty='l1', solver='liblinear', max_iter=300)
    scores = cross_val_score(model, X_train_oh, y_train.ravel(), cv=5, scoring='roc_auc')
    LR_aucs.append(scores.mean())
    print('C=%g, mean_auc = %g. Time passed %gs' % (C_param, scores.mean(), time.time() - tic))
print("mean_aucs,", LR_aucs)
print("參數(shù)怠晴,", params)
best_index = LR_aucs.index(max(LR_aucs))
print("最好的參數(shù):", C_params[best_index])
print("best_auc = %g" % max(LR_aucs))
# 使用最好的參數(shù)訓(xùn)練最后的模型
LR = LogisticRegression(C=C_params[best_index], penalty='l1', max_iter=100)
LR.fit(X_train_oh, y_train)
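# A final evaluation sketch: AUC of the stacked GBDT+LR model on the held-out
# test set, next to plain xgboost for comparison (nothing below is fit on test data).
from sklearn.metrics import roc_auc_score
y_test_prob = LR.predict_proba(X_test_oh)[:, 1]  # P(label == 1)
print('GBDT+LR test AUC = %g' % roc_auc_score(y_test.ravel(), y_test_prob))
print('XGB-only test AUC = %g' % roc_auc_score(y_test.ravel(), best_model.predict(dtest)))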