分布式機器學(xué)習(xí)原理及實戰(zhàn)(Pyspark)

一、大數(shù)據(jù)框架及Spark介紹

1.1 大數(shù)據(jù)框架

大數(shù)據(jù)(Big Data)是指無法在一定時間內(nèi)用常規(guī)軟件工具對其內(nèi)容進行抓取旁瘫、管理和處理的數(shù)據(jù)集合祖凫。大數(shù)據(jù)技術(shù),是指從各種各樣類型的數(shù)據(jù)中境蜕,快速獲得有價值信息的能力蝙场。



自2003年Google公布了3篇大數(shù)據(jù)奠基性論文,為大數(shù)據(jù)存儲及分布式處理的核心問題提供了思路:非結(jié)構(gòu)化文件分布式存儲(GFS)粱年、分布式計算(MapReduce)及結(jié)構(gòu)化數(shù)據(jù)存儲(BigTable)售滤,并奠定了現(xiàn)代大數(shù)據(jù)技術(shù)的理論基礎(chǔ),而后大數(shù)據(jù)技術(shù)便快速發(fā)展台诗,誕生了很多日新月異的技術(shù)完箩。



歸納現(xiàn)有大數(shù)據(jù)框架解決的核心問題及相關(guān)技術(shù)主要為:
  • 分布式存儲的問題:有GFS,HDFS等拉队,使得大量的數(shù)據(jù)能橫跨成百上千臺機器弊知;
  • 大數(shù)據(jù)計算的問題:有MapReduce、Spark批處理粱快、Flink流處理等秩彤,可以分配計算任務(wù)給各個計算節(jié)點(機器);
  • 結(jié)構(gòu)化數(shù)據(jù)存儲及查詢的問題:有Hbase事哭、Bigtable等漫雷,可以快速獲取/存儲結(jié)構(gòu)化的鍵值數(shù)據(jù);
  • 大數(shù)據(jù)挖掘的問題:有Hadoop的mahout鳍咱,spark的ml等降盹,可以使用分布式機器學(xué)習(xí)算法挖掘信息;

1.2 Spark的介紹

Spark是一個分布式內(nèi)存批計算處理框架谤辜,Spark集群由Driver, Cluster Manager(Standalone,Yarn 或 Mesos)蓄坏,以及Worker Node組成价捧。對于每個Spark應(yīng)用程序,Worker Node上存在一個Executor進程涡戳,Executor進程中包括多個Task線程结蟋。



在執(zhí)行具體的程序時,Spark會將程序拆解成一個任務(wù)DAG(有向無環(huán)圖)妹蔽,再根據(jù)DAG決定程序各步驟執(zhí)行的方法椎眯。該程序先分別從textFile和HadoopFile讀取文件,經(jīng)過一些列操作后再進行join胳岂,最終得到處理結(jié)果编整。



PySpark是Spark的Python API,通過Pyspark可以方便地使用 Python編寫 Spark 應(yīng)用程序乳丰, 其支持 了Spark 的大部分功能掌测,例如 Spark SQL、DataFrame产园、Streaming汞斧、MLLIB(ML)和 Spark Core。

二什燕、PySpark分布式機器學(xué)習(xí)

2.1 PySpark機器學(xué)習(xí)庫

Pyspark中支持兩個機器學(xué)習(xí)庫:mllib及ml粘勒,區(qū)別在于ml主要操作的是DataFrame,而mllib操作的是RDD屎即,即二者面向的數(shù)據(jù)集不一樣庙睡。相比于mllib在RDD提供的基礎(chǔ)操作,ml在DataFrame上的抽象級別更高技俐,數(shù)據(jù)和操作耦合度更低乘陪。

注:mllib在后面的版本中可能被廢棄,本文示例使用的是ml庫雕擂。

pyspark.ml訓(xùn)練機器學(xué)習(xí)庫有三個主要的抽象類:Transformer啡邑、Estimator、Pipeline井赌。

  • Transformer主要對應(yīng)feature子模塊谤逼,實現(xiàn)了算法訓(xùn)練前的一系列的特征預(yù)處理工作,例如MinMaxScaler仇穗、word2vec森缠、onehotencoder等,對應(yīng)操作為transform仪缸;
# 舉例:特征加工
from pyspark.ml.feature import VectorAssembler
featuresCreator = VectorAssembler(
    inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],
    outputCol='features'
)

  • Estimator對應(yīng)各種機器學(xué)習(xí)算法,主要為分類列肢、回歸恰画、聚類和推薦算法4大類宾茂,具體可選算法大多在sklearn中均有對應(yīng),對應(yīng)操作為fit拴还;
# 舉例:分類模型
from pyspark.ml.classification import LogisticRegression

logistic = LogisticRegression(featuresCol=featuresCreator.getOutputCol(),
                                labelCol='INFANT_ALIVE_AT_REPORT')
  • Pipeline可將一些列轉(zhuǎn)換和訓(xùn)練過程串聯(lián)形成流水線跨晴。
# 舉例:創(chuàng)建流水線
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[encoder, featuresCreator, logistic]) # 特征編碼,特征加工片林,載入LR模型
# 擬合模型
train, test = data.randomSplit([0.7,0.3],seed=123)
model = pipeline.fit(train)

2.2 PySpark分布式機器學(xué)習(xí)原理

在分布式訓(xùn)練中端盆,用于訓(xùn)練模型的工作負(fù)載會在多個微型處理器之間進行拆分和共享,這些處理器稱為工作器節(jié)點费封,通過這些工作器節(jié)點并行工作以加速模型訓(xùn)練焕妙。 分布式訓(xùn)練可用于傳統(tǒng)的 ML 模型,但更適用于計算和時間密集型任務(wù)弓摘,如用于訓(xùn)練深度神經(jīng)網(wǎng)絡(luò)焚鹊。分布式訓(xùn)練有兩種主要類型:數(shù)據(jù)并行及模型并行,主要代表有Spark ML韧献,Parameter Server和TensorFlow末患。


spark的分布式訓(xùn)練的實現(xiàn)為數(shù)據(jù)并行:按行對數(shù)據(jù)進行分區(qū),從而可以對數(shù)百萬甚至數(shù)十億個實例進行分布式訓(xùn)練锤窑。 以其核心的梯度下降算法為例:
1璧针、首先對數(shù)據(jù)劃分至各計算節(jié)點;
2渊啰、把當(dāng)前的模型參數(shù)廣播到各個計算節(jié)點(當(dāng)模型參數(shù)量較大時會比較耗帶寬資源)探橱;
3、各計算節(jié)點進行數(shù)據(jù)抽樣得到mini batch的數(shù)據(jù)虽抄,分別計算梯度走搁,再通過treeAggregate操作匯總梯度,得到最終梯度gradientSum迈窟;
4私植、利用gradientSum更新模型權(quán)重(這里采用的阻斷式的梯度下降方式,當(dāng)各節(jié)點有數(shù)據(jù)傾斜時车酣,每輪的時間起決于最慢的節(jié)點曲稼。這是Spark并行訓(xùn)練效率較低的主要原因)。


PySpark項目實戰(zhàn)

注:單純拿Pyspark練練手湖员,可無需配置Pyspark集群贫悄,直接本地配置下單機Pyspark,也可以使用線上spark集群(如: community.cloud.databricks.com)娘摔。

本項目通過PySpark實現(xiàn)機器學(xué)習(xí)建模全流程:數(shù)據(jù)的載入窄坦,數(shù)據(jù)分析,特征加工,二分類模型訓(xùn)練及評估鸭津。

#!/usr/bin/env python
# coding: utf-8


#  初始化SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Python Spark RF example").config("spark.some.config.option", "some-value").getOrCreate()

# 加載數(shù)據(jù)
df = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("./data.csv",header=True)

from pyspark.sql.functions import *

# 數(shù)據(jù)基本信息分析
df.dtypes # Return df column names and data types
df.show()  #Display the content of df
df.head()  #Return first n rows
df.first()  #Return first row 
df.take(2)  #Return the first n rows
df.schema   # Return the schema of df
df.columns # Return the columns of df
df.count()  #Count the number of rows in df
df.distinct().count()  #Count the number of distinct rows in df
df.printSchema()  #Print the schema of df
df.explain()  #Print the (logical and physical)  plans
df.describe().show()  #Compute summary statistics 
df.groupBy('Survived').agg(avg("Age"),avg("Fare")).show()  # 聚合分析
df.select(df.Sex, df.Survived==1).show()  # 帶條件查詢 
df.sort("Age", ascending=False).collect() # 排序

#特征加工
df = df.dropDuplicates()   # 刪除重復(fù)值
df = df.na.fill(value=0)  # 缺失填充值
df = df.na.drop()        # 或者刪除缺失值
df = df.withColumn('isMale', when(df['Sex']=='male',1).otherwise(0)) # 新增列:性別0 1
df = df.drop('_c0','Name','Sex') # 刪除姓名彤侍、性別、索引列

# 設(shè)定特征/標(biāo)簽列
from pyspark.ml.feature import VectorAssembler
ignore=['Survived']
vectorAssembler = VectorAssembler(inputCols=[x for x in df.columns  
                  if x not in ignore], outputCol = 'features')
new_df = vectorAssembler.transform(df)
new_df = new_df.select(['features', 'Survived'])

# 劃分測試集訓(xùn)練集
train, test = new_df.randomSplit([0.75, 0.25], seed = 12345)

# 模型訓(xùn)練
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol = 'features', 
                         labelCol='Survived')
lr_model = lr.fit(test)

# 模型評估
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions = lr_model.transform(test)
auc = BinaryClassificationEvaluator().setLabelCol('Survived')
print('AUC of the model:' + str(auc.evaluate(predictions)))
print('features weights', lr_model.coefficientMatrix)

文章首發(fā)于算法進階逆趋,公眾號閱讀原文可訪問GitHub項目源碼

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末盏阶,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子闻书,更是在濱河造成了極大的恐慌名斟,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,042評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件魄眉,死亡現(xiàn)場離奇詭異砰盐,居然都是意外死亡,警方通過查閱死者的電腦和手機杆融,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評論 2 384
  • 文/潘曉璐 我一進店門楞卡,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人脾歇,你說我怎么就攤上這事蒋腮。” “怎么了藕各?”我有些...
    開封第一講書人閱讀 156,674評論 0 345
  • 文/不壞的土叔 我叫張陵池摧,是天一觀的道長。 經(jīng)常有香客問我激况,道長作彤,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,340評論 1 283
  • 正文 為了忘掉前任乌逐,我火速辦了婚禮竭讳,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘浙踢。我一直安慰自己绢慢,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,404評論 5 384
  • 文/花漫 我一把揭開白布洛波。 她就那樣靜靜地躺著胰舆,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蹬挤。 梳的紋絲不亂的頭發(fā)上缚窿,一...
    開封第一講書人閱讀 49,749評論 1 289
  • 那天,我揣著相機與錄音焰扳,去河邊找鬼倦零。 笑死误续,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的扫茅。 我是一名探鬼主播女嘲,決...
    沈念sama閱讀 38,902評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼诞帐!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起爆雹,我...
    開封第一講書人閱讀 37,662評論 0 266
  • 序言:老撾萬榮一對情侶失蹤停蕉,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后钙态,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體慧起,經(jīng)...
    沈念sama閱讀 44,110評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年册倒,在試婚紗的時候發(fā)現(xiàn)自己被綠了蚓挤。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,577評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡驻子,死狀恐怖灿意,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情崇呵,我是刑警寧澤缤剧,帶...
    沈念sama閱讀 34,258評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站域慷,受9級特大地震影響荒辕,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜犹褒,卻給世界環(huán)境...
    茶點故事閱讀 39,848評論 3 312
  • 文/蒙蒙 一抵窒、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧叠骑,春花似錦李皇、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,726評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至朦拖,卻和暖如春圃阳,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背璧帝。 一陣腳步聲響...
    開封第一講書人閱讀 31,952評論 1 264
  • 我被黑心中介騙來泰國打工捍岳, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個月前我還...
    沈念sama閱讀 46,271評論 2 360
  • 正文 我出身青樓锣夹,卻偏偏與公主長得像页徐,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子银萍,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,452評論 2 348

推薦閱讀更多精彩內(nèi)容