- DataFrame column operations
- Print feature weights
- Parse probabilities
- Model tuning
- Initialize Spark
- Common missing-value fills
- StringIndexer over multiple columns
DataFrame column operations
Split a string column into an array (split and col come from pyspark.sql.functions): df = df.withColumn('catenew', split(col('cates'), ','))
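A minimal runnable sketch of the split above (the two sample rows are invented for illustration):
from pyspark.sql.functions import split, col

df = spark.createDataFrame([('a,b,c',), ('d,e',)], ['cates'])
df = df.withColumn('catenew', split(col('cates'), ','))  # catenew becomes array<string>
df.show(truncate=False)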
Print feature weights
from itertools import chain

rfModel = model_pipe.stages[-1]  # last pipeline stage is the fitted RandomForest model
attrs = sorted(
    (attr['idx'], attr['name']) for attr in
    (chain(*df_test_result.schema['features_asb'].metadata['ml_attr']['attrs'].values()))  # features_asb is the VectorAssembler output column
)
feature_weight = [(idx, name, float(rfModel.featureImportances[idx])) for idx, name in attrs]
df_weight = spark.createDataFrame(feature_weight, ['idx', 'feature', 'weight'])
df_weight.orderBy(df_weight.weight.desc()).show(df_weight.count(), truncate=False)  # show all rows, unabbreviated
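For context, a hedged sketch of a pipeline that would produce the features_asb metadata read above (feature_cols, df_train, df_test, and the 'label' column are placeholders; the note does not show the original pipeline):
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

assembler = VectorAssembler(inputCols=feature_cols, outputCol='features_asb')  # feature_cols: list of input column names (placeholder)
rf = RandomForestClassifier(featuresCol='features_asb', labelCol='label')
model_pipe = Pipeline(stages=[assembler, rf]).fit(df_train)
df_test_result = model_pipe.transform(df_test)  # its schema carries the ml_attr metadata used above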
Parse probabilities
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

split_udf = F.udf(lambda value: float(value[1]), DoubleType())  # probability is an ML Vector of numpy.float64; cast element 1 (positive class) to a Python float
df_result = df_result.withColumn('proba', split_udf('probability')).select('member_id', 'prediction', F.round('proba', 3).alias('proba'))
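On Spark 3.0+ the Python UDF can be avoided with pyspark.ml.functions.vector_to_array; a sketch under that version assumption:
from pyspark.ml.functions import vector_to_array

df_result = df_result.withColumn('proba', vector_to_array('probability')[1])  # element 1 = positive-class probability, already a double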
Model tuning
see the Databricks docs
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

pipeline = Pipeline(stages=[assembler, gbdt])
paramGrid = (ParamGridBuilder()
             .addGrid(gbdt.maxDepth, [3, 5, 7])
             .addGrid(gbdt.maxIter, [15, 20, 25])
             .build())  # parameter search grid
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(), numFolds=3)
cvModel = cv.fit(df_train)
df_test_result = cvModel.transform(df_test)
gbdtModel = cvModel.bestModel.stages[-1]  # retrieve the best fitted model
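To inspect the search afterwards, a small sketch (avgMetrics is ordered like paramGrid; the metric is the evaluator's default, AUC for BinaryClassificationEvaluator):
print(cvModel.avgMetrics)  # mean cross-validated metric per grid point
best = cvModel.bestModel.stages[-1]
print(best.getOrDefault('maxDepth'), best.getOrDefault('maxIter'))  # winning hyperparameters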
Initialize Spark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('pspredict').enableHiveSupport().config('spark.driver.memory', '8g').getOrCreate()  # jupyter
spark.sparkContext.setLogLevel('ERROR')
Common missing-value fills (see the sketch after this list)
(1) replace(to_replace, value, subset): general signature
(2) replace('', 'unknown', 'country_nm'): replace empty strings with 'unknown' in country_nm
(3) replace(['a', 'b'], ['c', 'd'], 'country_nm'): in country_nm (subset may be a list), map a->c and b->d; to_replace and value entries must share a type, and the value cannot be None
(4) replace({-1: 14}, subset='stature'): map -1->14 in stature; with a dict, the value argument is ignored, and all dict values must share one type (string and None cannot be mixed)
(5) fillna('haha'): fill null with 'haha'; non-string columns are skipped
(6) fillna('xx', [column_names]): fill na->xx across the listed columns at once
(7) fillna({'f1': 24, 'f2': 'hah'}): fill several columns, each with its own value
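A small sketch exercising cases (2), (4), and (7) above (the three sample rows are invented):
df = spark.createDataFrame([('', -1), ('cn', 170), (None, 165)], ['country_nm', 'stature'])
df = df.replace('', 'unknown', 'country_nm')  # case (2): '' -> 'unknown'
df = df.replace({-1: 14}, subset='stature')   # case (4): -1 -> 14, dict form
df.fillna({'country_nm': 'unknown'}).show()   # case (7): per-column fill for the None row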
StringIndexer over multiple columns
PySpark's StringIndexer takes a single inputCol, so handle many columns with a list comprehension (Spark 3.0+ also supports inputCols/outputCols directly); feature_index is the list of column names to index
from pyspark.ml.feature import StringIndexer

indexer = [StringIndexer(inputCol=x, outputCol='{}_idx'.format(x), handleInvalid='keep') for x in feature_index]
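Usage sketch: splice the indexer list into a Pipeline (df here stands for any DataFrame containing the feature_index columns):
from pyspark.ml import Pipeline

df_indexed = Pipeline(stages=indexer).fit(df).transform(df)  # adds one *_idx column per input column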