macOS Sierra 10.12.4
Spark 1.6.2
Python 2.7
轉(zhuǎn)載請(qǐng)注明出處:http://blog.csdn.net/MrLevo520/article/details/76087612
前言
既然做了Hive的整理,那就把spark的也整理下吧薯演,當(dāng)做入門指南和自己的筆記吧~與君共勉
Spark基礎(chǔ)
Spark是什么翠勉?
? Spark是UC Berkeley AMP lab所開源的類Hadoop MapReduce的通用分布式并行計(jì)算框架朦蕴。Spark擁有hadoop MapReduce所具有的優(yōu)點(diǎn)疏尿,但和MapReduce 的最大不同之處在于Spark是基于內(nèi)存的迭代式計(jì)算——Spark的Job處理的中間輸出結(jié)果可以保存在內(nèi)存中榆鼠,從而不再需要讀寫HDFS震鹉,除此之外俱笛,一個(gè)MapReduce 在計(jì)算過(guò)程中只有map 和reduce 兩個(gè)階段,處理之后就結(jié)束了传趾,而在Spark的計(jì)算模型中迎膜,可以分為n階段,因?yàn)樗鼉?nèi)存迭代式的浆兰,我們?cè)谔幚硗暌粋€(gè)階段以后磕仅,可以繼續(xù)往下處理很多個(gè)階段,而不只是兩個(gè)階段簸呈。
因此Spark能更好地適用于數(shù)據(jù)挖掘與機(jī)器學(xué)習(xí)等需要迭代的MapReduce的算法榕订。其不僅實(shí)現(xiàn)了MapReduce的算子map 函數(shù)和reduce函數(shù)及計(jì)算模型,還提供更為豐富的算子蜕便,如filter劫恒、join、groupByKey等轿腺。是一個(gè)用來(lái)實(shí)現(xiàn)快速而同用的集群計(jì)算的平臺(tái)两嘴。— From Spark 工作原理及核心RDD 詳解
Spark原理過(guò)程
-
使用spark-submit提交一個(gè)Spark作業(yè)之后族壳,這個(gè)作業(yè)就會(huì)啟動(dòng)一個(gè)對(duì)應(yīng)的Driver進(jìn)程溶诞。
- Driver:運(yùn)行Application的main()函數(shù)并且創(chuàng)建SparkContext
-
Driver根據(jù)我們?cè)O(shè)置的參數(shù)(比如說(shuō)設(shè)定任務(wù)隊(duì)列,設(shè)定最大內(nèi)存等)Cluster Manager 申請(qǐng)運(yùn)行Spark作業(yè)需要使用的資源决侈,這里的資源指的就是Executor進(jìn)程螺垢,YARN集群管理器會(huì)根據(jù)我們?yōu)镾park作業(yè)設(shè)置的資源參數(shù)喧务,在各個(gè)工作節(jié)點(diǎn)上,啟動(dòng)一定數(shù)量的Executor進(jìn)程枉圃,每個(gè)Executor進(jìn)程都占有一定數(shù)量的內(nèi)存和CPU core功茴。
- Executor:是為某Application運(yùn)行在Worker Node上的一個(gè)進(jìn)程,該進(jìn)程負(fù)責(zé)運(yùn)行Task孽亲,并且負(fù)責(zé)將數(shù)據(jù)存在內(nèi)存或者磁盤上坎穿,每個(gè)Application都有各自獨(dú)立的Executors
- Cluster Manager:集群管理器,在集群上獲取資源的外部服務(wù)(例如:Local返劲、Standalone玲昧、Mesos或Yarn等集群管理系統(tǒng))
-
申請(qǐng)到了作業(yè)執(zhí)行所需的資源之后,river進(jìn)程會(huì)將我們編寫的Spark作業(yè)代碼分拆為多個(gè)stage篮绿,每個(gè)stage執(zhí)行一部分代碼片段孵延,并為每個(gè)stage創(chuàng)建一批task,然后將這些task分配到各個(gè)Executor進(jìn)程中執(zhí)行亲配,一個(gè)stage的所有task都執(zhí)行完畢之后尘应,會(huì)在各個(gè)節(jié)點(diǎn)本地的磁盤文件中寫入計(jì)算中間結(jié)果,然后Driver就會(huì)調(diào)度運(yùn)行下一個(gè)stage吼虎。下一個(gè)stage的task的輸入數(shù)據(jù)就是上一個(gè)stage輸出的中間結(jié)果犬钢。如此循環(huán)往復(fù),直到將我們自己編寫的代碼邏輯全部執(zhí)行完
-
task:最小的計(jì)算單元思灰,負(fù)責(zé)執(zhí)行一模一樣的計(jì)算邏輯(也就是我們自己編寫的某個(gè)代碼片段)
?
-
什么是RDD?
全稱為彈性分布式數(shù)據(jù)集;本質(zhì)上玷犹,RDD是種編程抽象,代表可以跨機(jī)器進(jìn)行分割的只讀對(duì)象集合洒疚。RDD可以從一個(gè)繼承結(jié)構(gòu)(lineage)重建(因此可以容錯(cuò))歹颓,通過(guò)并行操作訪問(wèn),可以讀寫HDFS或S3這樣的分布式存儲(chǔ)拳亿,更重要的是晴股,可以緩存到worker節(jié)點(diǎn)的內(nèi)存中進(jìn)行立即重用。由于RDD可以被緩存在內(nèi)存中肺魁,Spark對(duì)迭代應(yīng)用特別有效电湘,因?yàn)檫@些應(yīng)用中,數(shù)據(jù)是在整個(gè)算法運(yùn)算過(guò)程中都可以被重用鹅经。大多數(shù)機(jī)器學(xué)習(xí)和最優(yōu)化算法都是迭代的寂呛,使得Spark對(duì)數(shù)據(jù)科學(xué)來(lái)說(shuō)是個(gè)非常有效的工具。另外瘾晃,由于Spark非炒荆快,可以通過(guò)類似Python REPL的命令行提示符交互式訪問(wèn)蹦误。
RDD特點(diǎn)
- RDD在抽象上來(lái)說(shuō)是一種元素集合劫拢,包含了數(shù)據(jù)肉津。它是被分區(qū)的,分為多個(gè)分區(qū)舱沧,每個(gè)分區(qū)分布在集群中的不同節(jié)點(diǎn)上妹沙,從而讓RDD中的數(shù)據(jù)可以被并行操作。(分布式數(shù)據(jù)集)
- RDD的數(shù)據(jù)默認(rèn)情況下存放在內(nèi)存中的熟吏,但是在內(nèi)存資源不足時(shí)距糖,Spark會(huì)自動(dòng)將RDD數(shù)據(jù)寫入磁盤。比如每個(gè)節(jié)點(diǎn)最多放5萬(wàn)數(shù)據(jù)牵寺,結(jié)果你每個(gè)partition是10萬(wàn)數(shù)據(jù)悍引。那么就會(huì)把partition中的部分?jǐn)?shù)據(jù)寫入磁盤上,進(jìn)行保存帽氓。(彈性)
- RDD將操作分為兩類:transformation與action趣斤。無(wú)論執(zhí)行了多少次transformation操作,RDD都不會(huì)真正執(zhí)行運(yùn)算杏节,只有當(dāng)action操作被執(zhí)行時(shí)唬渗,運(yùn)算才會(huì)觸發(fā)典阵。而在RDD的內(nèi)部實(shí)現(xiàn)機(jī)制中奋渔,底層接口則是基于迭代器的,從而使得數(shù)據(jù)訪問(wèn)變得更高效壮啊,也避免了大量中間結(jié)果對(duì)內(nèi)存的消耗嫉鲸。
- RDD最重要的特性就是,提供了容錯(cuò)性歹啼,可以自動(dòng)從節(jié)點(diǎn)失敗中恢復(fù)過(guò)來(lái)玄渗。即如果某個(gè)節(jié)點(diǎn)上的RDD partition,因?yàn)楣?jié)點(diǎn)故障狸眼,導(dǎo)致數(shù)據(jù)丟了藤树,那么RDD會(huì)自動(dòng)通過(guò)自己的數(shù)據(jù)來(lái)源重新計(jì)算該partition。這一切對(duì)使用者是透明的拓萌。
RDD在Spark中的地位及作用
這需要從四個(gè)方面闡述
-
為什么會(huì)有Spark岁钓?
因?yàn)閭鹘y(tǒng)的并行計(jì)算模型無(wú)法有效的解決迭代計(jì)算(iterative)和交互式計(jì)算(interactive);而Spark的使命便是解決這兩個(gè)問(wèn)題微王,這也是他存在的價(jià)值和理由屡限。
-
Spark如何解決迭代計(jì)算?
其主要實(shí)現(xiàn)思想就是RDD炕倘,把所有計(jì)算的數(shù)據(jù)保存在分布式的內(nèi)存中钧大。迭代計(jì)算通常情況下都是對(duì)同一個(gè)數(shù)據(jù)集做反復(fù)的迭代計(jì)算,數(shù)據(jù)在內(nèi)存中將大大提升IO操作罩旋。這也是Spark涉及的核心:內(nèi)存計(jì)算啊央。
-
Spark如何實(shí)現(xiàn)交互式計(jì)算眶诈?
因?yàn)镾park是用scala語(yǔ)言實(shí)現(xiàn)的,Spark和scala能夠緊密的集成瓜饥,所以Spark可以完美的運(yùn)用scala的解釋器册养,使得其中的scala可以向操作本地集合對(duì)象一樣輕松操作分布式數(shù)據(jù)集。當(dāng)然你也可以使用python压固,java球拦,R等接口,spark也提供了相應(yīng)的操作方式
-
Spark和RDD的關(guān)系帐我?
可以理解為:RDD是一種具有容錯(cuò)性基于內(nèi)存的集群計(jì)算抽象方法坎炼,Spark則是這個(gè)抽象方法的實(shí)現(xiàn)。
如何操作RDD拦键?
Step1-獲取RDD
- 自己創(chuàng)建個(gè)RDD谣光,如以下語(yǔ)句:
rdd = sc.parallelize(['1,2,3,4','5,6,6','9,10,11'])
- 從共享的文件系統(tǒng)獲取,(如:HDFS)
- 通過(guò)已存在的RDD轉(zhuǎn)換
- 將已存在scala集合(只要是Seq對(duì)象)并行化,通過(guò)調(diào)用SparkContext的parallelize方法實(shí)現(xiàn)
- 改變現(xiàn)有RDD的之久性;RDD是懶散,短暫的.(RDD的固化:cache緩存至內(nèi)錯(cuò);save保存到分布式文件系統(tǒng))
Step2-操作RDD
Transformation:根據(jù)數(shù)據(jù)集創(chuàng)建一個(gè)新的數(shù)據(jù)集,計(jì)算后返回一個(gè)新RDD芬为;例如:Map將數(shù)據(jù)的每個(gè)元素經(jīng)過(guò)某個(gè)函數(shù)計(jì)算后萄金,返回一個(gè)新的分布式數(shù)據(jù)集即RDD。
值得注意的是媚朦,RDD的轉(zhuǎn)化操作都是惰性求值得氧敢,也就意味著在被調(diào)用行動(dòng)操作之前Spark不會(huì)開始計(jì)算,相反询张,Spark會(huì)在內(nèi)部記錄下所要求執(zhí)行的操作的相關(guān)信息孙乖,因此在調(diào)用sc.textFile()時(shí)候,數(shù)據(jù)并沒(méi)有讀取進(jìn)來(lái)份氧,而是在必要的時(shí)候才會(huì)進(jìn)行讀取唯袄。所以也就導(dǎo)致了導(dǎo)入文件的時(shí)候感覺(jué)很快的錯(cuò)覺(jué)
- Transformation的一些例子
def func(a):
line_split = a.split(",")
return sum(map(int,line_split))
data = sc.parallelize(['1,2,3,4','5,6,6','9,10,11']) # 生成rdd
t_rdd= data.map(func) # rdd的Transformation過(guò)程
a_rdd = t_rdd.collect() # action過(guò)程 [10, 17, 30]
Actions:對(duì)數(shù)據(jù)集計(jì)算后返回一個(gè)數(shù)值value給驅(qū)動(dòng)程序;例如:Reduce將數(shù)據(jù)集的所有元素用某個(gè)函數(shù)聚合后蜗帜,將最終結(jié)果返回給程序恋拷。返回的是一個(gè)新的數(shù)據(jù)類型,這里注意的是厅缺,返回的并不是新的RDD蔬顾,只有Transformation之后是新的RDD
- Actions具體內(nèi)容
spark執(zhí)行步驟
- 定義一個(gè)或多個(gè)RDD,可以通過(guò)獲取存儲(chǔ)在磁盤上的數(shù)據(jù)(HDFS店归,Cassandra阎抒,HBase,Local Disk)消痛,并行化內(nèi)存中的某些集合且叁,轉(zhuǎn)換(transform)一個(gè)已存在的RDD,或者秩伞,緩存或保存逞带。
- 通過(guò)傳遞一個(gè)閉包(函數(shù))給RDD上的每個(gè)元素來(lái)調(diào)用RDD上的操作欺矫。Spark提供了除了Map和Reduce的80多種高級(jí)操作。
- 使用結(jié)果RDD的動(dòng)作(action)(如count展氓、collect穆趴、save等)。動(dòng)作將會(huì)啟動(dòng)集群上的worker機(jī)器進(jìn)行計(jì)算遇汞。
當(dāng)Spark在一個(gè)worker上運(yùn)行閉包時(shí)未妹,閉包中用到的所有變量都會(huì)被拷貝到節(jié)點(diǎn)上,但是由閉包的局部作用域來(lái)維護(hù)空入。Spark提供了兩種類型的共享變量络它,這些變量可以按照限定的方式被所有worker訪問(wèn)。廣播變量會(huì)被分發(fā)給所有worker歪赢,但是是只讀的化戳。累加器這種變量,worker可以使用關(guān)聯(lián)操作來(lái)“加”埋凯,通常用作計(jì)數(shù)器点楼。
Spark實(shí)際操作
那么, sc的是什么鬼?
你可以把他理解成由
SparkContext
構(gòu)造出來(lái)的實(shí)例白对,通過(guò)這個(gè)實(shí)例我們可以構(gòu)造自己的RDD
# -*- coding:utf-8 -*-
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
import math
appName ="hellospark" #你的應(yīng)用程序名稱
master= "local"#設(shè)置單機(jī)
conf = SparkConf().setAppName(appName).setMaster(master)#配置SparkContext
sc = SparkContext(conf=conf)
# 一個(gè)簡(jiǎn)單的wordcount測(cè)試
str_ = '''this is a word count test only test show twice'''
data = sc.parallelize(str_.split(" "))
data.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).collect()
# spark.akka.frameSize: 控制Spark中通信消息的最大容量 (如 task 的輸出結(jié)果)掠廓,默認(rèn)為10M。當(dāng)處理大數(shù)據(jù)時(shí)躏结,task 的輸出可能會(huì)大于這個(gè)值却盘,需要根據(jù)實(shí)際數(shù)據(jù)設(shè)置一個(gè)更高的值狰域。
# SparkConf為Spark配置類媳拴,配置已鍵值對(duì)形式存儲(chǔ),封裝了一個(gè)ConcurrentHashMap類實(shí)例settings用于存儲(chǔ)Spark的配置信息兆览;配置項(xiàng)包括:master屈溉、appName、Jars抬探、ExecutorEnv等等
# SparkContext用于連接Spark集群子巾、創(chuàng)建RDD、累加器(accumlator)小压、廣播變量(broadcast variables)线梗,所以說(shuō)SparkContext為Spark程序的根
注意sc的構(gòu)造是怎么來(lái)的
方法一:在jupyter中操作(推薦)
當(dāng)然,你得把pyspark的kernel配到j(luò)upyter中怠益,可參考解決:win遠(yuǎn)程連接ubuntu服務(wù)器安裝jupyter仪搔,啟動(dòng)pyspark
方法二:使用spark-submit pythonfile.py
來(lái)實(shí)現(xiàn)提交python腳本操作
# 前提是在一個(gè)文件夾中,不然要定位文件位置
$ spark-submit --driver-memory 6G --queue 如果有隊(duì)列填上隊(duì)列名字 testpy.py 可帶參數(shù)
# pyspark test
# 中文測(cè)試
方法三:使用ipython在pyspark的shell中操作
# 啟動(dòng)local spark:pyspark --master local[2]
# local[2]是開雙核的意思蜻牢,[4]即是開4核
xiaoju@map-traffic-spd131.gz01:~$ pyspark --master local[2]
In [8]: line = sc.textFile("file:/home/xiaoju/user/xukai/test.tx #創(chuàng)建RDD載入的路徑這里是機(jī)器路徑
...: t")
In [9]: pythonlines = line.filter(lambda line:"test" in line) # 轉(zhuǎn)化操作
In [10]: pythonlines.first() # 行動(dòng)操作
Out[10]: u'test;'
# 當(dāng)一個(gè)文本讀取為RDD時(shí)烤咧,輸入的每一行都會(huì)成為RDD的一個(gè)元素
In [21]: line.first()
Out[21]: u'this is a test txtfile!'
In [24]: print line.first().split(" ")[0] # 這樣就可以流暢使用python進(jìn)行操作了偏陪,只是導(dǎo)入的時(shí)候用的是RDD存儲(chǔ)
this
In [26]: stringlist = line.first().split(" ")
In [27]: nums = sc.parallelize(stringlist) # 用sparkContext的parallelize制作RDD的,是ParallelCollectionRDD煮嫌,創(chuàng)建一個(gè)并行集合笛谦。
In [28]: squared = nums.map(lambda x:x=="this").collect()
In [29]: for num in squared:
...: print num
...:
True
False
False
False
False
In [34]: words = lines.flatMap(lambda line:line.split(" ")).collec
...: t() # 使用collece()才能進(jìn)行for輸出,flatmap文件中的所有行數(shù)據(jù)僅返回了一個(gè)數(shù)組對(duì)象
In [35]: for i in words:
...: print i
...:
this
is
a
test
# 產(chǎn)生新的鍵值對(duì)pair類型RDD
In [56]: rdd = sc.parallelize([1,2,3,3])
# 操作過(guò)程中昌阿,轉(zhuǎn)化并不會(huì)被執(zhí)行饥脑,需要有個(gè)動(dòng)作操作才被執(zhí)行,比如collect()
In [57]: rdd.collect()
Out[57]: [1, 2, 3, 3]
In [59]: rdd1 = rdd.map(lambda x:(x,x+1))
In [60]: rdd1.collect()
Out[60]: [(1, 2), (2, 3), (3, 4), (3, 4)]
In [64]: rdd2 = rdd1.filter(lambda x: x[0]>2)
In [65]: rdd2.collect()
Out[65]: [(3, 4), (3, 4)]
In [67]: rdd1.sortByKey().collect()
Out[67]: [(1, 2), (2, 3), (3, 4), (3, 4)]
一些Demo
# -*- coding:utf-8 -*-
# 如果使用jupyter的話懦冰,sc已經(jīng)構(gòu)造好了好啰,不需要再倒入包
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
import math
appName ="hellospark" #你的應(yīng)用程序名稱
master= "local"#設(shè)置單機(jī)
conf = SparkConf().setAppName(appName).setMaster(master)#配置SparkContext
sc = SparkContext(conf=conf)
# parallelize:并行化數(shù)據(jù),轉(zhuǎn)化為RDD
# 并行集合的一個(gè)重要參數(shù)是slices儿奶,表示數(shù)據(jù)集切分的份數(shù)框往。Spark將會(huì)在集群上為每一份數(shù)據(jù)起一個(gè)任務(wù)。
# 典型地闯捎,你可以在集群的每個(gè)CPU上分布2-4個(gè)slices. 一般來(lái)說(shuō)椰弊,Spark會(huì)嘗試根據(jù)集群的狀況,
# 來(lái)自動(dòng)設(shè)定slices的數(shù)目瓤鼻。
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data, numSlices=10) # numSlices為分塊數(shù)目秉版,根據(jù)集群數(shù)進(jìn)行分塊
#--------------------------------------------------
# textFile讀取外部數(shù)據(jù)
rdd = sc.textFile("file:/data/map_da/xukai/sparkstreaming/test/test.txt") # 以行為單位讀取外部文件,并轉(zhuǎn)化為RDD
print rdd.collect()
# 打印出的結(jié)果是 [u'lslsllslsiiiiiiiiiii']
#--------------------------------------------------
# map:迭代茬祷,對(duì)數(shù)據(jù)集中數(shù)據(jù)進(jìn)行單獨(dú)操作
def my_add(l):
return (l,l)
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data) # 并行化數(shù)據(jù)集
result = distData.map(my_add)
print (result.collect()) # 返回一個(gè)分布數(shù)據(jù)集
# 打印出的結(jié)果 [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)]
#--------------------------------------------------
# filter:過(guò)濾數(shù)據(jù)
def my_add(l):
result = False
if l > 2:
result = True
return result
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)#并行化數(shù)據(jù)集清焕,分片
result = distData.filter(my_add)
print (result.collect())#返回一個(gè)分布數(shù)據(jù)集
# [3, 4, 5]
# zip:將兩個(gè)RDD對(duì)應(yīng)元素組合為元組
#--------------------------------------------------
x = sc.parallelize(range(0,5))
y = sc.parallelize(range(1000, 1005))
print x.zip(y).collect()
# [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
#union 組合兩個(gè)RDD
print x.union(x).collect()
# [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
# Aciton操作
#--------------------------------------------------
# collect:返回RDD中的數(shù)據(jù)
rdd = sc.parallelize(range(1, 10))
print rdd
print rdd.collect()
# ParallelCollectionRDD[11] at parallelize at PythonRDD.scala:423
# [1, 2, 3, 4, 5, 6, 7, 8, 9]
#--------------------------------------------------
# collectAsMap:以rdd元素為元組,以元組中一個(gè)元素作為索引返回RDD中的數(shù)據(jù)
m = sc.parallelize([('a', 2), (3, 4)]).collectAsMap()
print m['a']
print m[3]
#2
#4
#--------------------------------------------------
# groupby函數(shù):根據(jù)提供的方法為RDD分組:
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
def fun(i):
return i % 2
result = rdd.groupBy(fun).collect()
print [(x, sorted(y)) for (x, y) in result]
# [(0, [2, 8]), (1, [1, 1, 3, 5])]
#--------------------------------------------------
# reduce:對(duì)數(shù)據(jù)集進(jìn)行運(yùn)算
rdd = sc.parallelize(range(1, 10))
result = rdd.reduce(lambda a, b: a + b)
print result
# 45
#--------------------------------------------------
a = sc.parallelize([i for i in range(9)], 3)
print a.collect()
#[0, 1, 2, 3, 4, 5, 6, 7, 8]
y = a.map(lambda a:(a,a*2)) # 需要的表現(xiàn)形式為(a,a*2)的形式祭犯,而a是傳遞的參數(shù)
print y.collect()
#[(0, 0), (1, 2), (2, 4), (3, 6), (4, 8), (5, 10), (6, 12), (7, 14), (8, 16)]
z = a.map(lambda a:a*2)
print print z.collect()
#[0, 2, 4, 6, 8, 10, 12, 14, 16]
y = a.flatMap(lambda a:(a*2,a*3))
print y.collect();
#[0, 0, 2, 3, 4, 6, 6, 9, 8, 12, 10, 15, 12, 18, 14, 21, 16, 24]
#--------------------------------------------------
# union有點(diǎn)像append
x = sc.parallelize(['A','A','B'])
y = sc.parallelize(['D','C','A'])
z = x.union(y)
z2 = x.intersection(y)
print(x.collect())
#['A', 'A', 'B']
print(y.collect())
#['D', 'C', 'A']
print(z.collect())
#['A', 'A', 'B', 'D', 'C', 'A']
print(z2.collect())
# ['A']
#--------------------------------------------------
x = sc.parallelize([1,2,3])
y = x.groupBy(lambda x: 'A' if (x%2 == 1) else 'B' )
print(x.collect())
#[1, 2, 3]
print([(j[0],[i for i in j[1]]) for j in y.collect()])
#[('A', [1, 3]), ('B', [2])]
#--------------------------------------------------
x=sc.parallelize([1,3,1,2,3])
y=x.countByValue()
print y[1]
#2
#--------------------------------------------------
# 按升序排秸妥,取前n個(gè)
x = sc.parallelize([1,3,1,2,3,4,1,6])
y=x.takeOrdered(5)
# [1, 1, 1, 2, 3]
小結(jié)
- 總的來(lái)說(shuō),RDD之所以被描述為"彈性"沃粗,是因?yàn)樵谌魏螘r(shí)候都能進(jìn)行重算粥惧,因?yàn)楸4鍾DD數(shù)據(jù)的一臺(tái)機(jī)器失敗時(shí),Spark可以使用這種特性來(lái)重算出丟棄的部分分區(qū)最盅。
- 轉(zhuǎn)化RDD的時(shí)候突雪,是返回新的RDD而不是對(duì)現(xiàn)有的RDD進(jìn)行操作,只有在執(zhí)行動(dòng)作的時(shí)候返回的是其他數(shù)據(jù)類型 涡贱。
Spark進(jìn)階
這里會(huì)總結(jié)下我以前實(shí)習(xí)時(shí)候用到過(guò)的一些處理方法
使用MySqldb+Pyspark操作Mysql
- 首先得知道咏删,這個(gè)數(shù)據(jù)庫(kù)在哪,也就是數(shù)據(jù)庫(kù)所在服務(wù)器的ip地址问词,才能進(jìn)行連接
# 使用ping命令進(jìn)行所需要連接的數(shù)據(jù)庫(kù)的ip地址獲取
$ ping 服務(wù)器
PING xxxxxx bytes of data.
64 bytes from xxxxxx: icmp_seq=1 ttl=64 time=0.020 ms
- 使用Mysqldb包進(jìn)行數(shù)據(jù)庫(kù)的連接操作
In [22]: import MySQLdb
In [23]: conn = MySQLdb.connect(host=ip地址,user=用戶名
...: ,passwd=密碼,db='test',charset='utf8')
# 這邊連接的時(shí)候最好制定數(shù)據(jù)庫(kù)督函,即添加 db="test",charset="utf8",如不制定,則在sql語(yǔ)句中選擇上指定的數(shù)據(jù)庫(kù)名字
In [24]: cursor = conn.cursor()
In [25]: count = cursor.execute("select count(*) from test_uk ")
In [26]: print cursor.fetchall()
((8L,),)
# 打開服務(wù)器上的Mysql查看一下,ok侨核,沒(méi)問(wèn)題草穆,獲取行數(shù)正確
mysql> select * from test_uk;
+----+----+-------+
| id | tp | value |
+----+----+-------+
| 3 | 1 | 0.75 |
| 4 | 5 | 0 |
| 5 | 6 | 2 |
| 6 | 4 | 0 |
| 9 | 7 | 3 |
| 11 | 9 | 2 |
| 12 | 10 | 11 |
| 14 | 13 | 13 |
+----+----+-------+
8 rows in set (0.00 sec)
# 寫入test數(shù)據(jù)庫(kù)中的tbl_realtime_statis表,記得需要提交
mysql> desc tbl_realtime_statis
+-------+---------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+---------+------+-----+---------+-------+
| id | int(11) | YES | | NULL | |
| value | double | YES | | NULL | |
+-------+---------+------+-----+---------+-------+
2 rows in set (0.00 sec)
# 開始執(zhí)行insert動(dòng)作
In [38]: cursor.execute('INSERT INTO tbl_realtime_statis (id,value) VALUE
...: S (1,11)')
Out[38]: 1L
In [39]: cursor.connection.commit()
mysql> select * from tbl_realtime_statis;
+------+-------+
| id | value |
+------+-------+
| 1 | 11 |
+------+-------+
1 row in set (0.00 sec)
更多語(yǔ)句參考:python使用mysqldb連接數(shù)據(jù)庫(kù)操作方法示例詳解
使用spark-submit pythonfile來(lái)執(zhí)行本地txt寫入指定數(shù)據(jù)庫(kù)的操作
在文件同目錄下創(chuàng)建名為txttosql.py的python腳本搓译,填寫如下
conn = MySQLdb.connect(host='',user='',passwd='',db='test',charset='utf8')
cursor = conn.cursor()
datapath = "test.txt"
with open(datapath) as f:
for line in f.readlines():
linesplit = line.strip().split(",")
key = int(linesplit[0])
value = int(linesplit[1])
sqlstring = 'INSERT INTO tbl_realtime_statis (id,value) VALUES (%d,%d)'%(key,value)
cursor.execute(sqlstring)
cursor.connection.commit()
cursor.close()
conn.close()
之后執(zhí)行spark-submit txttosql.py即可悲柱,注意數(shù)據(jù)庫(kù)如果已有數(shù)據(jù),將不會(huì)被覆蓋些己,而是之后插入操作
使用JDBC+Pypark進(jìn)行MySql操作
建一個(gè)parallelize的RDD
In [12]: rdd1 = sc.parallelize([(1,'id1',100000,12,1.2),(2,'id2',2000000,13,1.22)])
轉(zhuǎn)化成為DataFrame的RDD
In [13]: rdd2 = rdd1.toDF()
# 會(huì)耗費(fèi)比較長(zhǎng)的時(shí)間
In [14]: rdd2.collect()
Out[14]:
[Row(_1=1, _2=u'id1', _3=100000, _4=12, _5=1.2),
Row(_1=2, _2=u'id2', _3=2000000, _4=13, _5=1.22)]
In [15]: rdd2.show()
+---+---+-------+---+----+
| _1| _2| _3| _4| _5|
+---+---+-------+---+----+
| 1|id1| 100000| 12| 1.2|
| 2|id2|2000000| 13|1.22|
+---+---+-------+---+----+
In [18]: rdd2.filter("_3 > 100000").show()
+---+---+-------+---+----+
| _1| _2| _3| _4| _5|
+---+---+-------+---+----+
| 2|id2|2000000| 13|1.22|
+---+---+-------+---+----+
# 可以修改別名豌鸡,貌似只有一次改的?
In [34]: rdd2.withColumnRenamed("_2","name_string").withColumnRenamed("_3","money_bigint")
Out[34]: DataFrame[_1: bigint, name_string: string, money_bigint: bigint, _4: bigint, _5: double]
# 嘗試在toDF的時(shí)候就寫好名字
In [41]: rdd3 = rdd1.toDF(["id_int","name_string","money_bigint","age_double","tall_float"])
In [42]: rdd3.write.jdbc("jdbc:mysql://xxxx/test", "testalltype", "overwrite", {"
...: user":"", "password":""})
# 查看
mysql> select * from testalltype;
+--------+-------------+--------------+------------+------------+
| id_int | name_string | money_bigint | age_double | tall_float |
+--------+-------------+--------------+------------+------------+
| 2 | id2 | 2000000 | 13 | 1.22 |
| 1 | id1 | 100000 | 12 | 1.2 |
+--------+-------------+--------------+------------+------------+
2 rows in set (0.00 sec)
# 插入語(yǔ)句可以用append段标,使用另一種方法創(chuàng)建dataframe
In [48]: newline = [(932,'Alice', 1929291,2,22.92)]
In [51]: rdd4 = sqlContext.createDataFrame(newline,['id_int','name
...: _string','money_bigint','age_double','tall_float'])
In [52]: rdd4.show()
+------+-----------+------------+----------+-----------+
|id_int|name_string|money_bigint|age_double|tall_float|
+------+-----------+------------+----------+-----------+
| 932| Alice| 1929291| 2| 22.92|
+------+-----------+------------+----------+-----------+
In [55]: rdd4.write.jdbc("jdbc:mysql://xxxx/test","t
...: estalltype","append",{"user":"","password":"
...: "})
# 查看
mysql> select * from testalltype;
+--------+-------------+--------------+------------+------------+
| id_int | name_string | money_bigint | age_double | tall_float |
+--------+-------------+--------------+------------+------------+
| 2 | id2 | 2000000 | 13 | 1.22 |
| 1 | id1 | 100000 | 12 | 1.2 |
| 932 | Alice | 1929291 | 2 | 22.92 |
+--------+-------------+--------------+------------+-------------+
##############使用pyspark+jdbc將本地csv存儲(chǔ)到mysql###########
In [1]: datapath = "dataform.csv"
In [2]: with open(datapath) as f:
...: k = 1
...: parallelizelist = []
...: for line in f.readlines():
...: linesplit = line.strip().split("|")
...: tuple_data = tuple(linesplit)
...: if k == 1:
...: tuple_title = linesplit
...: else:
...: parallelizelist.append(tuple_data)
...:
...: k +=1
# 方法1:sqlContext.createDataFrame
In [3]: rdd3 = sqlContext.createDataFrame(parallelizelist,tuple_title)
In [4]: rdd3.write.jdbc("jdbc:mysql://xxxx/test","testallty
...: pe","overwrite",{"user":"","password":"
...: "})
# 方法2:toDF
In [8]: rdd4 = sc.parallelize(parallelizelist)
In [9]: rdd5 = rdd4.toDF(tuple_title)
In [10]: rdd5.write.jdbc("jdbc:mysql://xxxx/test","testallt
...: ype","append",{"user":"","password":""
...: })
Spark對(duì)Hive表操作
首先理解下什么是SparkContext, SQLContext 和HiveContext涯冠,原文可參考@pig2--讓你真正理解什么是SparkContext, SQLContext 和HiveContext這位版主很厲害!這里簡(jiǎn)單總結(jié)下
- SparkContext:用于連接Spark集群逼庞、創(chuàng)建RDD蛇更、累加器(accumlator)、廣播變量(broadcast variables)赛糟,所以說(shuō)SparkContext為Spark程序的根派任,你只要知道它能讓一個(gè)普通的列表編程rdd就行了,非常牛逼璧南,就是傳說(shuō)中的sc掌逛!
- SparkSQL:是spark的一個(gè)模塊,是spark的一個(gè)模塊司倚,SparkSQL 用來(lái)處理結(jié)構(gòu)化數(shù)據(jù)豆混,所以SparkSQL你的data必須定義schema.在spark1.3.1,sparksql繼承dataframes 和SQL 查詢引擎
- SQLContext:spark處理結(jié)構(gòu)化數(shù)據(jù)的入口动知。允許創(chuàng)建DataFrame以及sql查詢
- HiveContext:spark sql執(zhí)行引擎皿伺,集成hive數(shù)據(jù)
In [68]: from pyspark.sql import HiveContext,Row
In [69]: hiveCtx = HiveContext(sc)
In [70]: rows = hiveCtx.sql("SELECT * FROM test.table1 limit
...: 5")
In [71]: firstRow = rows.first()
[Stage 32:=====>
[Stage 32:========>
[Stage 32:==========>
[Stage 32:=============>
[Stage 32:================>
[Stage 32:==================>
[Stage 32:=====================>
[Stage 32:======================>
In [72]: print firstRow.business_id
257
In [73]: print firstRow.order_id
3057564118
In [74]: hiveowntest = HiveContext(sc)
In [75]: rows2 = hiveowntest.sql("SELECT * FROM test.owntest")
In [76]: rows2.show()
+--------+---+
| name|age|
+--------+---+
|shangsan| 20|
| lisi| 22|
| zhouwu| 21|
+--------+---+
# 保存入表,其實(shí)就是講hive表讀入RDD拍柒,然后再寫入新的hive表中
In [79]: rows2.saveAsTable("hive_test_spark")
# 然后進(jìn)入hive中進(jìn)行操作,雖然有點(diǎn)錯(cuò)誤心傀,單還是可以執(zhí)行查詢動(dòng)作
hive> select * from hive_test_spark;
OK
hive_test_spark.name hive_test_spark.age
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
shangsan 20
lisi 22
zhouwu 21
hive> select * from hive_test_spark where name="shangsan";
OK
hive_test_spark.name hive_test_spark.age
shangsan 20
Time taken: 0.645 seconds, Fetched: 1 row(s)
Pyspark使用本地文件建立Hive表
1.再另一終端,執(zhí)行如下拆讯,將文件put到集群
# 其中spark_write_hive.txt是本地寫好的文件,之后的操作是put到hdfs上
$ hadoop fs -put spark_write_hive.txt hdfs:/xxxx/xxxx/
# 查看是否put到hdfs上
$ hadoop fs -cat hdfs:/xxxx/xxxx/spark_write_hive.txt
shangsan,20
lisi,30
2.需要測(cè)試的文件已推送到集群上存儲(chǔ)养叛,接下來(lái)是使用spark并將數(shù)據(jù)導(dǎo)入到表中
sqlContext = HiveContext(sc)
********
# 建hive表
sqlContext.sql("CREATE TABLE IF NOT EXISTS hive_test_spark2 (key INT,value STRING)")
# 導(dǎo)入數(shù)據(jù)种呐,注意這里是集群的數(shù)據(jù)
hivenewtable.sql("LOAD DATA INPATH '/xxxx/xxxx/test.txt' INTO TABLE hive_test_spark2")
********
# 還是推薦以下方式建立外表
In [21]: txttohive = HiveContext(sc)
In [22]: txttohive.sql("CREATE EXTERNAL TABLE IF NOT EXISTS hive_t
...: est_spark3 (name string,age string ) ROW FORMAT DELIMITED
...: FIELDS TERMINATED BY ','")
Out[22]: DataFrame[result: string]
In [23]: txttohive.sql("LOAD DATA INPATH '/xxxx/xxxx/
...: spark_write_hive.txt' INTO TABLE hive_test_s
...: park3")
16/12/22 13:49:35 ERROR KeyProviderCache: Could not find uri with key [dfs.encryption.key.provider.uri] to create a keyProvider !!
Out[23]: DataFrame[result: string]
# 查看hive表
hive> select * from hive_test_spark3;
OK
hive_test_spark3.name hive_test_spark3.age
shangsan 20
lisi 30
使用Pyspark 將Hive表轉(zhuǎn)化成rdd操作
from pyspark.sql import HiveContext,Row
hiveCtx = HiveContext(sc)
data_order = hiveCtx.sql("select * from xx.hot_position")
data_order.show(7)
+----+--------+-------+----+-----------+----+-----+---+
|city| badlng| badlat| num|badcaseprec|year|month|day|
+----+--------+-------+----+-----------+----+-----+---+
| xx市|1x6.3213|x9.8959|3597| 0.857|2017| 05| 10|
| xx市| 1x6.379| x9.865|5775| 0.857|2017| 05| 10|
| xx市|1x1.3198|x1.1937|1269| 0.849|2017| 05| 10|
| xx市|1x1.3199|x1.1937|3387| 0.847|2017| 05| 10|
| xx市|1x6.5509|x9.6083|1092| 0.835|2017| 05| 10|
| xx市| 1x1.354|x1.1988|1482| 0.825|2017| 05| 10|
| xx市|1x0.2131|x0.2915|8215| 0.817|2017| 05| 10|
+----+--------+-------+----+-----------+----+-----+---+
only showing top 7 rows
# 將dataframe轉(zhuǎn)化為rdd進(jìn)行計(jì)算
data_order_rdd=data_order.rdd
data_order_rdd.map(lambda x:(x.badlng,x.badlat)).collect()
# output
[(u'1x6.3213', u'x9.8959'),
(u'1x6.379', u'x9.865'),
(u'1x1.3198', u'x1.1937'),
(u'1x1.3199', u'x1.1937'),
(u'1x6.5509', u'x9.6083')]
當(dāng)然你也可以這樣操作
# 進(jìn)行復(fù)合計(jì)算
# -*- coding: utf-8 -*-
from math import*
def Distance2(data):# 第二種計(jì)算方法
lat2=float(data.split("\t")[1])
lng2=float(data.split("\t")[0])
lat1=39.8959
lng1=116.3213
radlat1=radians(lat1)
radlat2=radians(lat2)
a=radlat1-radlat2
b=radians(lng1)-radians(lng2)
s=2*asin(sqrt(pow(sin(a/2),2)+cos(radlat1)*cos(radlat2)*pow(sin(b/2),2)))
earth_radius=6378.137
s=s*earth_radius
if s<0:
return -s
else:
return s
data_order_rdd.filter(lambda x:x.badlat>'31').map(lambda x:x.badlng+'\t'+x.badlat).map(Distance2).collect()
# 輸出
[0.0,
6.010588654903075,
1068.8138888545056,
1068.8177053251293,
1069.6032160827797,
1068.969082793839,
0.07015355066273321]
Spark Streaming
Spark是一個(gè)類似于MapReduce的分布式計(jì)算框架,其核心是彈性分布式數(shù)據(jù)集弃甥,提供了比MapReduce更豐富的模型爽室,可以在快速在內(nèi)存中對(duì)數(shù)據(jù)集進(jìn)行多次迭代,以支持復(fù)雜的數(shù)據(jù)挖掘算法和圖形計(jì)算算法淆攻。Spark Streaming是一種構(gòu)建在Spark上的實(shí)時(shí)計(jì)算框架阔墩,它擴(kuò)展了Spark處理大規(guī)模流式數(shù)據(jù)的能力嘿架。
基于云梯Spark on Yarn的Spark Streaming總體架構(gòu)如圖,Spark on Yarn啟動(dòng)后,由Spark AppMaster把Receiver作為一個(gè)Task提交給某一個(gè)Spark Executor啸箫;Receive啟動(dòng)后輸入數(shù)據(jù)耸彪,生成數(shù)據(jù)塊,然后通知Spark AppMaster忘苛;Spark AppMaster會(huì)根據(jù)數(shù)據(jù)塊生成相應(yīng)的Job蝉娜,并把Job的Task提交給空閑Spark Executor 執(zhí)行。圖中藍(lán)色的粗箭頭顯示被處理的數(shù)據(jù)流扎唾,輸入數(shù)據(jù)流可以是磁盤召川、網(wǎng)絡(luò)和HDFS等,輸出可以是HDFS胸遇,數(shù)據(jù)庫(kù)等荧呐。
Spark Streaming的基本原理
將輸入數(shù)據(jù)流以時(shí)間片(秒級(jí))為單位進(jìn)行拆分,然后以類似批處理的方式處理每個(gè)時(shí)間片數(shù)據(jù)纸镊,其基本原理如圖
首先坛增,Spark Streaming把實(shí)時(shí)輸入數(shù)據(jù)流以時(shí)間片Δt (如1秒)為單位切分成塊。Spark Streaming會(huì)把每塊數(shù)據(jù)作為一個(gè)RDD薄腻,并使用RDD操作處理每一小塊數(shù)據(jù)星瘾。每個(gè)塊都會(huì)生成一個(gè)Spark Job處理皮假,最終結(jié)果也返回多塊。
Spark Streaming的內(nèi)部原理
使用Spark Streaming編寫的程序與編寫Spark程序非常相似,在Spark程序中蜜笤,主要通過(guò)操作RDD(Resilient Distributed Datasets彈性分布式數(shù)據(jù)集)提供的接口,如map畅蹂、reduce乃坤、filter等,實(shí)現(xiàn)數(shù)據(jù)的批處理弄贿。而在Spark Streaming中春锋,則通過(guò)操作DStream(表示數(shù)據(jù)流的RDD序列)提供的接口,這些接口和RDD提供的接口類似差凹。
Spark Streaming把程序中對(duì)DStream的操作轉(zhuǎn)換為DStream Graph
對(duì)于每個(gè)時(shí)間片期奔,DStream Graph都會(huì)產(chǎn)生一個(gè)RDD Graph;針對(duì)每個(gè)輸出操作(如print危尿、foreach等)呐萌,Spark Streaming都會(huì)創(chuàng)建一個(gè)Spark action;對(duì)于每個(gè)Spark action谊娇,Spark Streaming都會(huì)產(chǎn)生一個(gè)相應(yīng)的Spark job肺孤,并交給JobManager。JobManager中維護(hù)著一個(gè)Jobs隊(duì)列, Spark job存儲(chǔ)在這個(gè)隊(duì)列中,JobManager把Spark job提交給Spark Scheduler赠堵,Spark Scheduler負(fù)責(zé)調(diào)度Ta
Spark Streaming優(yōu)缺點(diǎn)
優(yōu)點(diǎn)
- Spark Streaming 內(nèi)部的實(shí)現(xiàn)和調(diào)度方式高度依賴 Spark 的 DAG 調(diào)度器和 RDD小渊,這就決定了 Spark Streaming 的設(shè)計(jì)初衷必須是粗粒度方式的,同時(shí)茫叭,由于 Spark 內(nèi)部調(diào)度器足夠快速和高效酬屉,可以快速地處理小批量數(shù)據(jù),這就獲得準(zhǔn)實(shí)時(shí)的特性杂靶。
- Spark Streaming 的粗粒度執(zhí)行方式使其確卑鸸撸“處理且僅處理一次”的特性,同時(shí)也可以更方便地實(shí)現(xiàn)容錯(cuò)恢復(fù)機(jī)制吗垮。
- 由于 Spark Streaming 的 DStream 本質(zhì)是 RDD 在流式數(shù)據(jù)上的抽象垛吗,因此基于 RDD 的各種操作也有相應(yīng)的基于 DStream 的版本,這樣就大大降低了用戶對(duì)于新框架的學(xué)習(xí)成本烁登,在了解 Spark 的情況下用戶將很容易使用 Spark Streaming怯屉。
- 由于 DStream 是在 RDD 上的抽象,那么也就更容易與 RDD 進(jìn)行交互操作饵沧,在需要將流式數(shù)據(jù)和批處理數(shù)據(jù)結(jié)合進(jìn)行分析的情況下锨络,將會(huì)變得非常方便。
缺點(diǎn)
- Spark Streaming 的粗粒度處理方式也造成了不可避免的延遲狼牺。在細(xì)粒度處理方式下羡儿,理想情況下每一條記錄都會(huì)被實(shí)時(shí)處理,而在 Spark Streaming 中是钥,數(shù)據(jù)需要匯總到一定的量后再一次性處理掠归,這就增加了數(shù)據(jù)處理的延遲,這種延遲是由框架的設(shè)計(jì)引入的悄泥,并不是由網(wǎng)絡(luò)或其他情況造成的虏冻。
- Spark Streaming 當(dāng)前版本穩(wěn)定性不是很好。[spark 1.5]
如何使用Spark Streaming
作為構(gòu)建于Spark之上的應(yīng)用框架弹囚,Spark Streaming承襲了Spark的編程風(fēng)格厨相,對(duì)于已經(jīng)了解Spark的用戶來(lái)說(shuō)能夠快速地上手。接下來(lái)以Spark Streaming官方提供的WordCount代碼為例來(lái)介紹Spark Streaming的使用方式鸥鹉。
// scala語(yǔ)言編寫spark streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
Spark在機(jī)器學(xué)習(xí)中應(yīng)用
- 官方教程:官網(wǎng)地址pyspark svm
- 數(shù)據(jù):sample_libsvm_data.txt
第一步蛮穿,新建python 文件,取名svmwithsgd.py]
# -*- coding:utf-8 -*-
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint
appName ="hellospark" #你的應(yīng)用程序名稱
master= "local"#設(shè)置單機(jī)
conf = SparkConf().setAppName(appName).setMaster(master)#配置SparkContext
sc = SparkContext(conf=conf)
# Load and parse the data
def parsePoint(line):
values = [float(x) for x in line.strip().split(' ')]
return LabeledPoint(values[0], values[1:])
data = sc.textFile("newdata.txt")
parsedData = data.map(parsePoint)
# Build the model
model = SVMWithSGD.train(parsedData, iterations=100)
# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))
# Save and load model
#model.save(sc, "myModelPath")
#sameModel = SVMModel.load(sc, "myModelPath")
# 最后兩段會(huì)加載出錯(cuò)
#SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
#SLF4J: Defaulting to no-operation (NOP) logger implementation
#SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
第二步宋舷,進(jìn)行spark-submit 提交任務(wù)
$ spark-submit svmwithsgd.py
Pyspark在工作流中套路
詳見(jiàn)我的另一篇博客Spark日志清洗一般流程
總結(jié)
在實(shí)習(xí)的過(guò)程中绪撵,對(duì)于不同的任務(wù)啟用不同的工具,處理手段祝蝠,技巧等,而對(duì)編程語(yǔ)言的選擇,其實(shí)并不是那么重要绎狭,這是一種實(shí)現(xiàn)形式罷了细溅,效率的核心還是對(duì)數(shù)據(jù)結(jié)構(gòu)和算法的理解上,大數(shù)據(jù)處理在我現(xiàn)在的認(rèn)知范圍內(nèi)儡嘶,只不過(guò)是一種海量數(shù)據(jù)處理的技術(shù)喇聊,就像一臺(tái)機(jī)器不夠算了,那就搞兩臺(tái)蹦狂,n臺(tái)誓篱,機(jī)械硬盤算起來(lái)不夠快了,那就加載到內(nèi)存中算(spark)凯楔,當(dāng)然機(jī)器之間的通信窜骄,任務(wù)的派發(fā),最后的匯總這些也是非常值得琢磨的摆屯,而設(shè)置分配內(nèi)存邻遏,設(shè)置mapreduce個(gè)數(shù)這類的,很多時(shí)候都是靠經(jīng)驗(yàn)來(lái)總結(jié)虐骑,對(duì)數(shù)據(jù)的把握程度准验,預(yù)估上進(jìn)行判斷,如何更快速的處理數(shù)據(jù)并且開銷更低廷没,這里都是屬于性能調(diào)優(yōu)里面的糊饱。總之颠黎,能處理數(shù)據(jù)另锋,了解t和a用法,并不能說(shuō)掌握spark盏缤,只能說(shuō)會(huì)用這個(gè)工具而已砰蠢,而現(xiàn)實(shí)中大多數(shù)任務(wù)只是挑選工具的過(guò)程而已,我們通常是為了close掉任務(wù)而去學(xué)習(xí)一種新的更快的工具唉铜,我想這對(duì)于工具的理解和學(xué)習(xí)來(lái)說(shuō)是十分不利的台舱,而越來(lái)越多的任務(wù)迫使我們沒(méi)有太多的余力靜下心去去更深入的學(xué)習(xí)和了解,記錄和總結(jié)潭流。而我竞惋,并不想這樣。