一、大數(shù)據(jù)框架及Spark介紹
1.1 大數(shù)據(jù)框架
大數(shù)據(jù)(Big Data)是指無法在一定時間內(nèi)用常規(guī)軟件工具對其內(nèi)容進行抓取旁瘫、管理和處理的數(shù)據(jù)集合祖凫。大數(shù)據(jù)技術(shù),是指從各種各樣類型的數(shù)據(jù)中境蜕,快速獲得有價值信息的能力蝙场。
自2003年Google公布了3篇大數(shù)據(jù)奠基性論文,為大數(shù)據(jù)存儲及分布式處理的核心問題提供了思路:非結(jié)構(gòu)化文件分布式存儲(GFS)粱年、分布式計算(MapReduce)及結(jié)構(gòu)化數(shù)據(jù)存儲(BigTable)售滤,并奠定了現(xiàn)代大數(shù)據(jù)技術(shù)的理論基礎(chǔ),而后大數(shù)據(jù)技術(shù)便快速發(fā)展台诗,誕生了很多日新月異的技術(shù)完箩。
歸納現(xiàn)有大數(shù)據(jù)框架解決的核心問題及相關(guān)技術(shù)主要為:
- 分布式存儲的問題:有GFS,HDFS等拉队,使得大量的數(shù)據(jù)能橫跨成百上千臺機器弊知;
- 大數(shù)據(jù)計算的問題:有MapReduce、Spark批處理粱快、Flink流處理等秩彤,可以分配計算任務(wù)給各個計算節(jié)點(機器);
- 結(jié)構(gòu)化數(shù)據(jù)存儲及查詢的問題:有Hbase事哭、Bigtable等漫雷,可以快速獲取/存儲結(jié)構(gòu)化的鍵值數(shù)據(jù);
- 大數(shù)據(jù)挖掘的問題:有Hadoop的mahout鳍咱,spark的ml等降盹,可以使用分布式機器學(xué)習(xí)算法挖掘信息;
1.2 Spark的介紹
Spark是一個分布式內(nèi)存批計算處理框架谤辜,Spark集群由Driver, Cluster Manager(Standalone,Yarn 或 Mesos)蓄坏,以及Worker Node組成价捧。對于每個Spark應(yīng)用程序,Worker Node上存在一個Executor進程涡戳,Executor進程中包括多個Task線程结蟋。
在執(zhí)行具體的程序時,Spark會將程序拆解成一個任務(wù)DAG(有向無環(huán)圖)妹蔽,再根據(jù)DAG決定程序各步驟執(zhí)行的方法椎眯。該程序先分別從textFile和HadoopFile讀取文件,經(jīng)過一些列操作后再進行join胳岂,最終得到處理結(jié)果编整。
PySpark是Spark的Python API,通過Pyspark可以方便地使用 Python編寫 Spark 應(yīng)用程序乳丰, 其支持 了Spark 的大部分功能掌测,例如 Spark SQL、DataFrame产园、Streaming汞斧、MLLIB(ML)和 Spark Core。
二什燕、PySpark分布式機器學(xué)習(xí)
2.1 PySpark機器學(xué)習(xí)庫
Pyspark中支持兩個機器學(xué)習(xí)庫:mllib及ml粘勒,區(qū)別在于ml主要操作的是DataFrame,而mllib操作的是RDD屎即,即二者面向的數(shù)據(jù)集不一樣庙睡。相比于mllib在RDD提供的基礎(chǔ)操作,ml在DataFrame上的抽象級別更高技俐,數(shù)據(jù)和操作耦合度更低乘陪。
注:mllib在后面的版本中可能被廢棄,本文示例使用的是ml庫雕擂。
pyspark.ml訓(xùn)練機器學(xué)習(xí)庫有三個主要的抽象類:Transformer啡邑、Estimator、Pipeline井赌。
- Transformer主要對應(yīng)feature子模塊谤逼,實現(xiàn)了算法訓(xùn)練前的一系列的特征預(yù)處理工作,例如MinMaxScaler仇穗、word2vec森缠、onehotencoder等,對應(yīng)操作為transform仪缸;
# 舉例:特征加工
from pyspark.ml.feature import VectorAssembler
featuresCreator = VectorAssembler(
inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],
outputCol='features'
)
- Estimator對應(yīng)各種機器學(xué)習(xí)算法,主要為分類列肢、回歸恰画、聚類和推薦算法4大類宾茂,具體可選算法大多在sklearn中均有對應(yīng),對應(yīng)操作為fit拴还;
# 舉例:分類模型
from pyspark.ml.classification import LogisticRegression
logistic = LogisticRegression(featuresCol=featuresCreator.getOutputCol(),
labelCol='INFANT_ALIVE_AT_REPORT')
- Pipeline可將一些列轉(zhuǎn)換和訓(xùn)練過程串聯(lián)形成流水線跨晴。
# 舉例:創(chuàng)建流水線
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[encoder, featuresCreator, logistic]) # 特征編碼,特征加工片林,載入LR模型
# 擬合模型
train, test = data.randomSplit([0.7,0.3],seed=123)
model = pipeline.fit(train)
2.2 PySpark分布式機器學(xué)習(xí)原理
在分布式訓(xùn)練中端盆,用于訓(xùn)練模型的工作負(fù)載會在多個微型處理器之間進行拆分和共享,這些處理器稱為工作器節(jié)點费封,通過這些工作器節(jié)點并行工作以加速模型訓(xùn)練焕妙。 分布式訓(xùn)練可用于傳統(tǒng)的 ML 模型,但更適用于計算和時間密集型任務(wù)弓摘,如用于訓(xùn)練深度神經(jīng)網(wǎng)絡(luò)焚鹊。分布式訓(xùn)練有兩種主要類型:數(shù)據(jù)并行及模型并行,主要代表有Spark ML韧献,Parameter Server和TensorFlow末患。
spark的分布式訓(xùn)練的實現(xiàn)為數(shù)據(jù)并行:按行對數(shù)據(jù)進行分區(qū),從而可以對數(shù)百萬甚至數(shù)十億個實例進行分布式訓(xùn)練锤窑。 以其核心的梯度下降算法為例:
1璧针、首先對數(shù)據(jù)劃分至各計算節(jié)點;
2渊啰、把當(dāng)前的模型參數(shù)廣播到各個計算節(jié)點(當(dāng)模型參數(shù)量較大時會比較耗帶寬資源)探橱;
3、各計算節(jié)點進行數(shù)據(jù)抽樣得到mini batch的數(shù)據(jù)虽抄,分別計算梯度走搁,再通過treeAggregate操作匯總梯度,得到最終梯度gradientSum迈窟;
4私植、利用gradientSum更新模型權(quán)重(這里采用的阻斷式的梯度下降方式,當(dāng)各節(jié)點有數(shù)據(jù)傾斜時车酣,每輪的時間起決于最慢的節(jié)點曲稼。這是Spark并行訓(xùn)練效率較低的主要原因)。
PySpark項目實戰(zhàn)
注:單純拿Pyspark練練手湖员,可無需配置Pyspark集群贫悄,直接本地配置下單機Pyspark,也可以使用線上spark集群(如: community.cloud.databricks.com)娘摔。
本項目通過PySpark實現(xiàn)機器學(xué)習(xí)建模全流程:數(shù)據(jù)的載入窄坦,數(shù)據(jù)分析,特征加工,二分類模型訓(xùn)練及評估鸭津。
#!/usr/bin/env python
# coding: utf-8
# 初始化SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Spark RF example").config("spark.some.config.option", "some-value").getOrCreate()
# 加載數(shù)據(jù)
df = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("./data.csv",header=True)
from pyspark.sql.functions import *
# 數(shù)據(jù)基本信息分析
df.dtypes # Return df column names and data types
df.show() #Display the content of df
df.head() #Return first n rows
df.first() #Return first row
df.take(2) #Return the first n rows
df.schema # Return the schema of df
df.columns # Return the columns of df
df.count() #Count the number of rows in df
df.distinct().count() #Count the number of distinct rows in df
df.printSchema() #Print the schema of df
df.explain() #Print the (logical and physical) plans
df.describe().show() #Compute summary statistics
df.groupBy('Survived').agg(avg("Age"),avg("Fare")).show() # 聚合分析
df.select(df.Sex, df.Survived==1).show() # 帶條件查詢
df.sort("Age", ascending=False).collect() # 排序
#特征加工
df = df.dropDuplicates() # 刪除重復(fù)值
df = df.na.fill(value=0) # 缺失填充值
df = df.na.drop() # 或者刪除缺失值
df = df.withColumn('isMale', when(df['Sex']=='male',1).otherwise(0)) # 新增列:性別0 1
df = df.drop('_c0','Name','Sex') # 刪除姓名彤侍、性別、索引列
# 設(shè)定特征/標(biāo)簽列
from pyspark.ml.feature import VectorAssembler
ignore=['Survived']
vectorAssembler = VectorAssembler(inputCols=[x for x in df.columns
if x not in ignore], outputCol = 'features')
new_df = vectorAssembler.transform(df)
new_df = new_df.select(['features', 'Survived'])
# 劃分測試集訓(xùn)練集
train, test = new_df.randomSplit([0.75, 0.25], seed = 12345)
# 模型訓(xùn)練
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features',
labelCol='Survived')
lr_model = lr.fit(test)
# 模型評估
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions = lr_model.transform(test)
auc = BinaryClassificationEvaluator().setLabelCol('Survived')
print('AUC of the model:' + str(auc.evaluate(predictions)))
print('features weights', lr_model.coefficientMatrix)
文章首發(fā)于算法進階逆趋,公眾號閱讀原文可訪問GitHub項目源碼