筆記:新手的Spark指南

macOS Sierra 10.12.4

Spark 1.6.2

Python 2.7

轉(zhuǎn)載請(qǐng)注明出處:http://blog.csdn.net/MrLevo520/article/details/76087612

前言

既然做了Hive的整理,那就把spark的也整理下吧薯演,當(dāng)做入門指南和自己的筆記吧~與君共勉

Spark基礎(chǔ)

Spark是什么翠勉?

? Spark是UC Berkeley AMP lab所開源的類Hadoop MapReduce的通用分布式并行計(jì)算框架朦蕴。Spark擁有hadoop MapReduce所具有的優(yōu)點(diǎn)疏尿,但和MapReduce 的最大不同之處在于Spark是基于內(nèi)存的迭代式計(jì)算——Spark的Job處理的中間輸出結(jié)果可以保存在內(nèi)存中榆鼠,從而不再需要讀寫HDFS震鹉,除此之外俱笛,一個(gè)MapReduce 在計(jì)算過(guò)程中只有map 和reduce 兩個(gè)階段,處理之后就結(jié)束了传趾,而在Spark的計(jì)算模型中迎膜,可以分為n階段,因?yàn)樗鼉?nèi)存迭代式的浆兰,我們?cè)谔幚硗暌粋€(gè)階段以后磕仅,可以繼續(xù)往下處理很多個(gè)階段,而不只是兩個(gè)階段簸呈。
  因此Spark能更好地適用于數(shù)據(jù)挖掘與機(jī)器學(xué)習(xí)等需要迭代的MapReduce的算法榕订。其不僅實(shí)現(xiàn)了MapReduce的算子map 函數(shù)和reduce函數(shù)及計(jì)算模型,還提供更為豐富的算子蜕便,如filter劫恒、join、groupByKey等轿腺。是一個(gè)用來(lái)實(shí)現(xiàn)快速而同用的集群計(jì)算的平臺(tái)两嘴。— From Spark 工作原理及核心RDD 詳解

Spark原理過(guò)程

  1. 使用spark-submit提交一個(gè)Spark作業(yè)之后族壳,這個(gè)作業(yè)就會(huì)啟動(dòng)一個(gè)對(duì)應(yīng)的Driver進(jìn)程溶诞。

    • Driver:運(yùn)行Application的main()函數(shù)并且創(chuàng)建SparkContext
  2. Driver根據(jù)我們?cè)O(shè)置的參數(shù)(比如說(shuō)設(shè)定任務(wù)隊(duì)列,設(shè)定最大內(nèi)存等)Cluster Manager 申請(qǐng)運(yùn)行Spark作業(yè)需要使用的資源决侈,這里的資源指的就是Executor進(jìn)程螺垢,YARN集群管理器會(huì)根據(jù)我們?yōu)镾park作業(yè)設(shè)置的資源參數(shù)喧务,在各個(gè)工作節(jié)點(diǎn)上,啟動(dòng)一定數(shù)量的Executor進(jìn)程枉圃,每個(gè)Executor進(jìn)程都占有一定數(shù)量的內(nèi)存和CPU core功茴。

    • Executor:是為某Application運(yùn)行在Worker Node上的一個(gè)進(jìn)程,該進(jìn)程負(fù)責(zé)運(yùn)行Task孽亲,并且負(fù)責(zé)將數(shù)據(jù)存在內(nèi)存或者磁盤上坎穿,每個(gè)Application都有各自獨(dú)立的Executors
    • Cluster Manager:集群管理器,在集群上獲取資源的外部服務(wù)(例如:Local返劲、Standalone玲昧、Mesos或Yarn等集群管理系統(tǒng))
  3. 申請(qǐng)到了作業(yè)執(zhí)行所需的資源之后,river進(jìn)程會(huì)將我們編寫的Spark作業(yè)代碼分拆為多個(gè)stage篮绿,每個(gè)stage執(zhí)行一部分代碼片段孵延,并為每個(gè)stage創(chuàng)建一批task,然后將這些task分配到各個(gè)Executor進(jìn)程中執(zhí)行亲配,一個(gè)stage的所有task都執(zhí)行完畢之后尘应,會(huì)在各個(gè)節(jié)點(diǎn)本地的磁盤文件中寫入計(jì)算中間結(jié)果,然后Driver就會(huì)調(diào)度運(yùn)行下一個(gè)stage吼虎。下一個(gè)stage的task的輸入數(shù)據(jù)就是上一個(gè)stage輸出的中間結(jié)果犬钢。如此循環(huán)往復(fù),直到將我們自己編寫的代碼邏輯全部執(zhí)行完

    • task:最小的計(jì)算單元思灰,負(fù)責(zé)執(zhí)行一模一樣的計(jì)算邏輯(也就是我們自己編寫的某個(gè)代碼片段)

      ?

什么是RDD?

全稱為彈性分布式數(shù)據(jù)集;本質(zhì)上玷犹,RDD是種編程抽象,代表可以跨機(jī)器進(jìn)行分割的只讀對(duì)象集合洒疚。RDD可以從一個(gè)繼承結(jié)構(gòu)(lineage)重建(因此可以容錯(cuò))歹颓,通過(guò)并行操作訪問(wèn),可以讀寫HDFS或S3這樣的分布式存儲(chǔ)拳亿,更重要的是晴股,可以緩存到worker節(jié)點(diǎn)的內(nèi)存中進(jìn)行立即重用。由于RDD可以被緩存在內(nèi)存中肺魁,Spark對(duì)迭代應(yīng)用特別有效电湘,因?yàn)檫@些應(yīng)用中,數(shù)據(jù)是在整個(gè)算法運(yùn)算過(guò)程中都可以被重用鹅经。大多數(shù)機(jī)器學(xué)習(xí)和最優(yōu)化算法都是迭代的寂呛,使得Spark對(duì)數(shù)據(jù)科學(xué)來(lái)說(shuō)是個(gè)非常有效的工具。另外瘾晃,由于Spark非炒荆快,可以通過(guò)類似Python REPL的命令行提示符交互式訪問(wèn)蹦误。

RDD特點(diǎn)

  • RDD在抽象上來(lái)說(shuō)是一種元素集合劫拢,包含了數(shù)據(jù)肉津。它是被分區(qū)的,分為多個(gè)分區(qū)舱沧,每個(gè)分區(qū)分布在集群中的不同節(jié)點(diǎn)上妹沙,從而讓RDD中的數(shù)據(jù)可以被并行操作。(分布式數(shù)據(jù)集)
  • RDD的數(shù)據(jù)默認(rèn)情況下存放在內(nèi)存中的熟吏,但是在內(nèi)存資源不足時(shí)距糖,Spark會(huì)自動(dòng)將RDD數(shù)據(jù)寫入磁盤。比如每個(gè)節(jié)點(diǎn)最多放5萬(wàn)數(shù)據(jù)牵寺,結(jié)果你每個(gè)partition是10萬(wàn)數(shù)據(jù)悍引。那么就會(huì)把partition中的部分?jǐn)?shù)據(jù)寫入磁盤上,進(jìn)行保存帽氓。(彈性)
  • RDD將操作分為兩類:transformation與action趣斤。無(wú)論執(zhí)行了多少次transformation操作,RDD都不會(huì)真正執(zhí)行運(yùn)算杏节,只有當(dāng)action操作被執(zhí)行時(shí)唬渗,運(yùn)算才會(huì)觸發(fā)典阵。而在RDD的內(nèi)部實(shí)現(xiàn)機(jī)制中奋渔,底層接口則是基于迭代器的,從而使得數(shù)據(jù)訪問(wèn)變得更高效壮啊,也避免了大量中間結(jié)果對(duì)內(nèi)存的消耗嫉鲸。
  • RDD最重要的特性就是,提供了容錯(cuò)性歹啼,可以自動(dòng)從節(jié)點(diǎn)失敗中恢復(fù)過(guò)來(lái)玄渗。即如果某個(gè)節(jié)點(diǎn)上的RDD partition,因?yàn)楣?jié)點(diǎn)故障狸眼,導(dǎo)致數(shù)據(jù)丟了藤树,那么RDD會(huì)自動(dòng)通過(guò)自己的數(shù)據(jù)來(lái)源重新計(jì)算該partition。這一切對(duì)使用者是透明的拓萌。

RDD在Spark中的地位及作用

這需要從四個(gè)方面闡述

  • 為什么會(huì)有Spark岁钓?

    因?yàn)閭鹘y(tǒng)的并行計(jì)算模型無(wú)法有效的解決迭代計(jì)算(iterative)和交互式計(jì)算(interactive);而Spark的使命便是解決這兩個(gè)問(wèn)題微王,這也是他存在的價(jià)值和理由屡限。

  • Spark如何解決迭代計(jì)算?

    其主要實(shí)現(xiàn)思想就是RDD炕倘,把所有計(jì)算的數(shù)據(jù)保存在分布式的內(nèi)存中钧大。迭代計(jì)算通常情況下都是對(duì)同一個(gè)數(shù)據(jù)集做反復(fù)的迭代計(jì)算,數(shù)據(jù)在內(nèi)存中將大大提升IO操作罩旋。這也是Spark涉及的核心:內(nèi)存計(jì)算啊央。

  • Spark如何實(shí)現(xiàn)交互式計(jì)算眶诈?

    因?yàn)镾park是用scala語(yǔ)言實(shí)現(xiàn)的,Spark和scala能夠緊密的集成瓜饥,所以Spark可以完美的運(yùn)用scala的解釋器册养,使得其中的scala可以向操作本地集合對(duì)象一樣輕松操作分布式數(shù)據(jù)集。當(dāng)然你也可以使用python压固,java球拦,R等接口,spark也提供了相應(yīng)的操作方式

  • Spark和RDD的關(guān)系帐我?

    可以理解為:RDD是一種具有容錯(cuò)性基于內(nèi)存的集群計(jì)算抽象方法坎炼,Spark則是這個(gè)抽象方法的實(shí)現(xiàn)。

如何操作RDD拦键?

Step1-獲取RDD

  • 自己創(chuàng)建個(gè)RDD谣光,如以下語(yǔ)句:rdd = sc.parallelize(['1,2,3,4','5,6,6','9,10,11'])
  • 從共享的文件系統(tǒng)獲取,(如:HDFS)
  • 通過(guò)已存在的RDD轉(zhuǎn)換
  • 將已存在scala集合(只要是Seq對(duì)象)并行化,通過(guò)調(diào)用SparkContext的parallelize方法實(shí)現(xiàn)
  • 改變現(xiàn)有RDD的之久性;RDD是懶散,短暫的.(RDD的固化:cache緩存至內(nèi)錯(cuò);save保存到分布式文件系統(tǒng))

Step2-操作RDD

Transformation:根據(jù)數(shù)據(jù)集創(chuàng)建一個(gè)新的數(shù)據(jù)集,計(jì)算后返回一個(gè)新RDD芬为;例如:Map將數(shù)據(jù)的每個(gè)元素經(jīng)過(guò)某個(gè)函數(shù)計(jì)算后萄金,返回一個(gè)新的分布式數(shù)據(jù)集即RDD。

值得注意的是媚朦,RDD的轉(zhuǎn)化操作都是惰性求值得氧敢,也就意味著在被調(diào)用行動(dòng)操作之前Spark不會(huì)開始計(jì)算,相反询张,Spark會(huì)在內(nèi)部記錄下所要求執(zhí)行的操作的相關(guān)信息孙乖,因此在調(diào)用sc.textFile()時(shí)候,數(shù)據(jù)并沒(méi)有讀取進(jìn)來(lái)份氧,而是在必要的時(shí)候才會(huì)進(jìn)行讀取唯袄。所以也就導(dǎo)致了導(dǎo)入文件的時(shí)候感覺(jué)很快的錯(cuò)覺(jué)

  • Transformation的一些例子
image
def func(a):
    line_split = a.split(",")
    return sum(map(int,line_split))

data = sc.parallelize(['1,2,3,4','5,6,6','9,10,11'])  # 生成rdd
t_rdd= data.map(func)  # rdd的Transformation過(guò)程
a_rdd = t_rdd.collect()  # action過(guò)程  [10, 17, 30]

Actions:對(duì)數(shù)據(jù)集計(jì)算后返回一個(gè)數(shù)值value給驅(qū)動(dòng)程序;例如:Reduce將數(shù)據(jù)集的所有元素用某個(gè)函數(shù)聚合后蜗帜,將最終結(jié)果返回給程序恋拷。返回的是一個(gè)新的數(shù)據(jù)類型,這里注意的是厅缺,返回的并不是新的RDD蔬顾,只有Transformation之后是新的RDD

  • Actions具體內(nèi)容
image

spark執(zhí)行步驟

  1. 定義一個(gè)或多個(gè)RDD,可以通過(guò)獲取存儲(chǔ)在磁盤上的數(shù)據(jù)(HDFS店归,Cassandra阎抒,HBase,Local Disk)消痛,并行化內(nèi)存中的某些集合且叁,轉(zhuǎn)換(transform)一個(gè)已存在的RDD,或者秩伞,緩存或保存逞带。
  2. 通過(guò)傳遞一個(gè)閉包(函數(shù))給RDD上的每個(gè)元素來(lái)調(diào)用RDD上的操作欺矫。Spark提供了除了Map和Reduce的80多種高級(jí)操作。
  3. 使用結(jié)果RDD的動(dòng)作(action)(如count展氓、collect穆趴、save等)。動(dòng)作將會(huì)啟動(dòng)集群上的worker機(jī)器進(jìn)行計(jì)算遇汞。

當(dāng)Spark在一個(gè)worker上運(yùn)行閉包時(shí)未妹,閉包中用到的所有變量都會(huì)被拷貝到節(jié)點(diǎn)上,但是由閉包的局部作用域來(lái)維護(hù)空入。Spark提供了兩種類型的共享變量络它,這些變量可以按照限定的方式被所有worker訪問(wèn)。廣播變量會(huì)被分發(fā)給所有worker歪赢,但是是只讀的化戳。累加器這種變量,worker可以使用關(guān)聯(lián)操作來(lái)“加”埋凯,通常用作計(jì)數(shù)器点楼。

Spark實(shí)際操作

那么, sc的是什么鬼?

你可以把他理解成由SparkContext構(gòu)造出來(lái)的實(shí)例白对,通過(guò)這個(gè)實(shí)例我們可以構(gòu)造自己的RDD

# -*- coding:utf-8 -*-
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
import math
appName ="hellospark" #你的應(yīng)用程序名稱
master= "local"#設(shè)置單機(jī)
conf = SparkConf().setAppName(appName).setMaster(master)#配置SparkContext
sc = SparkContext(conf=conf)

# 一個(gè)簡(jiǎn)單的wordcount測(cè)試
str_ = '''this is a word count test only test show twice'''
data = sc.parallelize(str_.split(" "))
data.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).collect()

# spark.akka.frameSize: 控制Spark中通信消息的最大容量 (如 task 的輸出結(jié)果)掠廓,默認(rèn)為10M。當(dāng)處理大數(shù)據(jù)時(shí)躏结,task 的輸出可能會(huì)大于這個(gè)值却盘,需要根據(jù)實(shí)際數(shù)據(jù)設(shè)置一個(gè)更高的值狰域。
# SparkConf為Spark配置類媳拴,配置已鍵值對(duì)形式存儲(chǔ),封裝了一個(gè)ConcurrentHashMap類實(shí)例settings用于存儲(chǔ)Spark的配置信息兆览;配置項(xiàng)包括:master屈溉、appName、Jars抬探、ExecutorEnv等等
# SparkContext用于連接Spark集群子巾、創(chuàng)建RDD、累加器(accumlator)小压、廣播變量(broadcast variables)线梗,所以說(shuō)SparkContext為Spark程序的根

注意sc的構(gòu)造是怎么來(lái)的

方法一:在jupyter中操作(推薦)

當(dāng)然,你得把pyspark的kernel配到j(luò)upyter中怠益,可參考解決:win遠(yuǎn)程連接ubuntu服務(wù)器安裝jupyter仪搔,啟動(dòng)pyspark

這里寫圖片描述

方法二:使用spark-submit pythonfile.py來(lái)實(shí)現(xiàn)提交python腳本操作

# 前提是在一個(gè)文件夾中,不然要定位文件位置
$ spark-submit --driver-memory 6G --queue 如果有隊(duì)列填上隊(duì)列名字 testpy.py 可帶參數(shù)

# pyspark test
# 中文測(cè)試

方法三:使用ipython在pyspark的shell中操作

# 啟動(dòng)local spark:pyspark --master local[2]
# local[2]是開雙核的意思蜻牢,[4]即是開4核

xiaoju@map-traffic-spd131.gz01:~$ pyspark --master local[2]

In [8]: line = sc.textFile("file:/home/xiaoju/user/xukai/test.tx #創(chuàng)建RDD載入的路徑這里是機(jī)器路徑
   ...: t")

In [9]: pythonlines = line.filter(lambda line:"test" in  line) # 轉(zhuǎn)化操作

In [10]: pythonlines.first() # 行動(dòng)操作
Out[10]: u'test;'

# 當(dāng)一個(gè)文本讀取為RDD時(shí)烤咧,輸入的每一行都會(huì)成為RDD的一個(gè)元素
In [21]: line.first()
Out[21]: u'this is a test txtfile!'

In [24]: print line.first().split(" ")[0] # 這樣就可以流暢使用python進(jìn)行操作了偏陪,只是導(dǎo)入的時(shí)候用的是RDD存儲(chǔ)
this

In [26]: stringlist = line.first().split(" ")
In [27]: nums = sc.parallelize(stringlist) # 用sparkContext的parallelize制作RDD的,是ParallelCollectionRDD煮嫌,創(chuàng)建一個(gè)并行集合笛谦。

In [28]: squared = nums.map(lambda x:x=="this").collect()

In [29]: for num in squared:
    ...:     print num
    ...:
True
False
False
False
False

In [34]: words = lines.flatMap(lambda line:line.split(" ")).collec
    ...: t()  # 使用collece()才能進(jìn)行for輸出,flatmap文件中的所有行數(shù)據(jù)僅返回了一個(gè)數(shù)組對(duì)象

In [35]: for i in words:
    ...:     print i
    ...:
this
is
a
test

# 產(chǎn)生新的鍵值對(duì)pair類型RDD
In [56]: rdd = sc.parallelize([1,2,3,3])

# 操作過(guò)程中昌阿,轉(zhuǎn)化并不會(huì)被執(zhí)行饥脑,需要有個(gè)動(dòng)作操作才被執(zhí)行,比如collect()
In [57]: rdd.collect()
Out[57]: [1, 2, 3, 3]
In [59]: rdd1 = rdd.map(lambda x:(x,x+1))

In [60]: rdd1.collect()
Out[60]: [(1, 2), (2, 3), (3, 4), (3, 4)]

In [64]: rdd2 = rdd1.filter(lambda x: x[0]>2)

In [65]: rdd2.collect()
Out[65]: [(3, 4), (3, 4)]

In [67]: rdd1.sortByKey().collect()
Out[67]: [(1, 2), (2, 3), (3, 4), (3, 4)]

一些Demo

# -*- coding:utf-8 -*-
# 如果使用jupyter的話懦冰,sc已經(jīng)構(gòu)造好了好啰,不需要再倒入包
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
import math
appName ="hellospark" #你的應(yīng)用程序名稱
master= "local"#設(shè)置單機(jī)
conf = SparkConf().setAppName(appName).setMaster(master)#配置SparkContext
sc = SparkContext(conf=conf)
 
# parallelize:并行化數(shù)據(jù),轉(zhuǎn)化為RDD
# 并行集合的一個(gè)重要參數(shù)是slices儿奶,表示數(shù)據(jù)集切分的份數(shù)框往。Spark將會(huì)在集群上為每一份數(shù)據(jù)起一個(gè)任務(wù)。
# 典型地闯捎,你可以在集群的每個(gè)CPU上分布2-4個(gè)slices. 一般來(lái)說(shuō)椰弊,Spark會(huì)嘗試根據(jù)集群的狀況,
# 來(lái)自動(dòng)設(shè)定slices的數(shù)目瓤鼻。
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data, numSlices=10)  # numSlices為分塊數(shù)目秉版,根據(jù)集群數(shù)進(jìn)行分塊
 


#--------------------------------------------------
# textFile讀取外部數(shù)據(jù)
rdd = sc.textFile("file:/data/map_da/xukai/sparkstreaming/test/test.txt")  # 以行為單位讀取外部文件,并轉(zhuǎn)化為RDD
print rdd.collect()

#  打印出的結(jié)果是  [u'lslsllslsiiiiiiiiiii']

#--------------------------------------------------

# map:迭代茬祷,對(duì)數(shù)據(jù)集中數(shù)據(jù)進(jìn)行單獨(dú)操作
def my_add(l):
    return (l,l)
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)  # 并行化數(shù)據(jù)集
result = distData.map(my_add)
print (result.collect())  # 返回一個(gè)分布數(shù)據(jù)集

# 打印出的結(jié)果  [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)]

#--------------------------------------------------

# filter:過(guò)濾數(shù)據(jù)
def my_add(l):
    result = False
    if l > 2:
        result = True
    return result
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)#并行化數(shù)據(jù)集清焕,分片
result = distData.filter(my_add)
print (result.collect())#返回一個(gè)分布數(shù)據(jù)集
 

# [3, 4, 5]
# zip:將兩個(gè)RDD對(duì)應(yīng)元素組合為元組

#--------------------------------------------------

x = sc.parallelize(range(0,5))
y = sc.parallelize(range(1000, 1005))
print x.zip(y).collect()
 
# [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
 
 
 
#union 組合兩個(gè)RDD
print x.union(x).collect()
# [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
# Aciton操作

#--------------------------------------------------
 
# collect:返回RDD中的數(shù)據(jù)
rdd = sc.parallelize(range(1, 10))
print rdd
print rdd.collect()

# ParallelCollectionRDD[11] at parallelize at PythonRDD.scala:423
# [1, 2, 3, 4, 5, 6, 7, 8, 9]

#--------------------------------------------------

# collectAsMap:以rdd元素為元組,以元組中一個(gè)元素作為索引返回RDD中的數(shù)據(jù)
m = sc.parallelize([('a', 2), (3, 4)]).collectAsMap()
print m['a']
print m[3]

#2
#4


#--------------------------------------------------

# groupby函數(shù):根據(jù)提供的方法為RDD分組:
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
def fun(i):
    return i % 2
result = rdd.groupBy(fun).collect()
print [(x, sorted(y)) for (x, y) in result]
 
# [(0, [2, 8]), (1, [1, 1, 3, 5])]

#--------------------------------------------------

# reduce:對(duì)數(shù)據(jù)集進(jìn)行運(yùn)算
rdd = sc.parallelize(range(1, 10))
result = rdd.reduce(lambda a, b: a + b)
print result
# 45

#--------------------------------------------------

a = sc.parallelize([i for i in range(9)], 3)
print a.collect()
#[0, 1, 2, 3, 4, 5, 6, 7, 8]

y = a.map(lambda a:(a,a*2))  # 需要的表現(xiàn)形式為(a,a*2)的形式祭犯,而a是傳遞的參數(shù)
print y.collect()
#[(0, 0), (1, 2), (2, 4), (3, 6), (4, 8), (5, 10), (6, 12), (7, 14), (8, 16)]

z = a.map(lambda a:a*2)
print print z.collect()
#[0, 2, 4, 6, 8, 10, 12, 14, 16]

y = a.flatMap(lambda a:(a*2,a*3))
print y.collect();
#[0, 0, 2, 3, 4, 6, 6, 9, 8, 12, 10, 15, 12, 18, 14, 21, 16, 24]


#--------------------------------------------------
# union有點(diǎn)像append

x = sc.parallelize(['A','A','B'])
y = sc.parallelize(['D','C','A'])
z = x.union(y)
z2 = x.intersection(y)
print(x.collect())
#['A', 'A', 'B']
print(y.collect())
#['D', 'C', 'A']
print(z.collect())
#['A', 'A', 'B', 'D', 'C', 'A']
print(z2.collect())
# ['A']




#--------------------------------------------------

x = sc.parallelize([1,2,3])
y = x.groupBy(lambda x: 'A' if (x%2 == 1) else 'B' )
print(x.collect())
#[1, 2, 3]
print([(j[0],[i for i in j[1]]) for j in y.collect()])
#[('A', [1, 3]), ('B', [2])]

#--------------------------------------------------

x=sc.parallelize([1,3,1,2,3])
y=x.countByValue()
print y[1]
#2

#--------------------------------------------------

# 按升序排秸妥,取前n個(gè)
x = sc.parallelize([1,3,1,2,3,4,1,6])
y=x.takeOrdered(5)
# [1, 1, 1, 2, 3]

小結(jié)

  1. 總的來(lái)說(shuō),RDD之所以被描述為"彈性"沃粗,是因?yàn)樵谌魏螘r(shí)候都能進(jìn)行重算粥惧,因?yàn)楸4鍾DD數(shù)據(jù)的一臺(tái)機(jī)器失敗時(shí),Spark可以使用這種特性來(lái)重算出丟棄的部分分區(qū)最盅。
  2. 轉(zhuǎn)化RDD的時(shí)候突雪,是返回新的RDD而不是對(duì)現(xiàn)有的RDD進(jìn)行操作,只有在執(zhí)行動(dòng)作的時(shí)候返回的是其他數(shù)據(jù)類型 涡贱。

Spark進(jìn)階

這里會(huì)總結(jié)下我以前實(shí)習(xí)時(shí)候用到過(guò)的一些處理方法

使用MySqldb+Pyspark操作Mysql

  1. 首先得知道咏删,這個(gè)數(shù)據(jù)庫(kù)在哪,也就是數(shù)據(jù)庫(kù)所在服務(wù)器的ip地址问词,才能進(jìn)行連接
# 使用ping命令進(jìn)行所需要連接的數(shù)據(jù)庫(kù)的ip地址獲取

$ ping 服務(wù)器
PING xxxxxx bytes of data.
64 bytes from xxxxxx: icmp_seq=1 ttl=64 time=0.020 ms
  1. 使用Mysqldb包進(jìn)行數(shù)據(jù)庫(kù)的連接操作
In [22]: import MySQLdb
In [23]: conn = MySQLdb.connect(host=ip地址,user=用戶名
    ...: ,passwd=密碼,db='test',charset='utf8')
# 這邊連接的時(shí)候最好制定數(shù)據(jù)庫(kù)督函,即添加 db="test",charset="utf8",如不制定,則在sql語(yǔ)句中選擇上指定的數(shù)據(jù)庫(kù)名字

In [24]: cursor = conn.cursor()

In [25]: count = cursor.execute("select count(*) from test_uk ")

In [26]: print cursor.fetchall()
((8L,),)


# 打開服務(wù)器上的Mysql查看一下,ok侨核,沒(méi)問(wèn)題草穆,獲取行數(shù)正確

mysql> select * from test_uk;
+----+----+-------+
| id | tp | value |
+----+----+-------+
|  3 |  1 |  0.75 |
|  4 |  5 |     0 |
|  5 |  6 |     2 |
|  6 |  4 |     0 |
|  9 |  7 |     3 |
| 11 |  9 |     2 |
| 12 | 10 |    11 |
| 14 | 13 |    13 |
+----+----+-------+
8 rows in set (0.00 sec)

# 寫入test數(shù)據(jù)庫(kù)中的tbl_realtime_statis表,記得需要提交

mysql> desc tbl_realtime_statis

+-------+---------+------+-----+---------+-------+
| Field | Type    | Null | Key | Default | Extra |
+-------+---------+------+-----+---------+-------+
| id    | int(11) | YES  |     | NULL    |       |
| value | double  | YES  |     | NULL    |       |
+-------+---------+------+-----+---------+-------+
2 rows in set (0.00 sec)

# 開始執(zhí)行insert動(dòng)作

In [38]: cursor.execute('INSERT INTO tbl_realtime_statis (id,value) VALUE
    ...: S (1,11)')
Out[38]: 1L

In [39]: cursor.connection.commit()

mysql> select * from tbl_realtime_statis;
+------+-------+
| id   | value |
+------+-------+
|    1 |    11 |
+------+-------+
1 row in set (0.00 sec)

更多語(yǔ)句參考:python使用mysqldb連接數(shù)據(jù)庫(kù)操作方法示例詳解

使用spark-submit pythonfile來(lái)執(zhí)行本地txt寫入指定數(shù)據(jù)庫(kù)的操作

在文件同目錄下創(chuàng)建名為txttosql.py的python腳本搓译,填寫如下

conn = MySQLdb.connect(host='',user='',passwd='',db='test',charset='utf8')
cursor = conn.cursor()

datapath = "test.txt"
with open(datapath) as f:
    for line in f.readlines():
        linesplit = line.strip().split(",")
        key = int(linesplit[0])
        value = int(linesplit[1])
        sqlstring = 'INSERT INTO tbl_realtime_statis (id,value) VALUES (%d,%d)'%(key,value)
        cursor.execute(sqlstring)
        cursor.connection.commit()

cursor.close()
conn.close()

之后執(zhí)行spark-submit txttosql.py即可悲柱,注意數(shù)據(jù)庫(kù)如果已有數(shù)據(jù),將不會(huì)被覆蓋些己,而是之后插入操作

使用JDBC+Pypark進(jìn)行MySql操作

建一個(gè)parallelize的RDD

In [12]: rdd1 = sc.parallelize([(1,'id1',100000,12,1.2),(2,'id2',2000000,13,1.22)])

轉(zhuǎn)化成為DataFrame的RDD

In [13]: rdd2 = rdd1.toDF()
# 會(huì)耗費(fèi)比較長(zhǎng)的時(shí)間

In [14]: rdd2.collect()
Out[14]:
[Row(_1=1, _2=u'id1', _3=100000, _4=12, _5=1.2),
 Row(_1=2, _2=u'id2', _3=2000000, _4=13, _5=1.22)]

In [15]: rdd2.show()
+---+---+-------+---+----+
| _1| _2|     _3| _4|  _5|
+---+---+-------+---+----+
|  1|id1| 100000| 12| 1.2|
|  2|id2|2000000| 13|1.22|
+---+---+-------+---+----+

In [18]: rdd2.filter("_3 > 100000").show()
+---+---+-------+---+----+
| _1| _2|     _3| _4|  _5|
+---+---+-------+---+----+
|  2|id2|2000000| 13|1.22|
+---+---+-------+---+----+

# 可以修改別名豌鸡,貌似只有一次改的?
In [34]: rdd2.withColumnRenamed("_2","name_string").withColumnRenamed("_3","money_bigint")
Out[34]: DataFrame[_1: bigint, name_string: string, money_bigint: bigint, _4: bigint, _5: double]


# 嘗試在toDF的時(shí)候就寫好名字

In [41]: rdd3 = rdd1.toDF(["id_int","name_string","money_bigint","age_double","tall_float"])

In [42]: rdd3.write.jdbc("jdbc:mysql://xxxx/test", "testalltype", "overwrite", {"
    ...: user":"", "password":""})
    
# 查看

mysql> select * from testalltype;
+--------+-------------+--------------+------------+------------+
| id_int | name_string | money_bigint | age_double | tall_float |
+--------+-------------+--------------+------------+------------+
|      2 | id2         |      2000000 |         13 |       1.22 |
|      1 | id1         |       100000 |         12 |        1.2 |
+--------+-------------+--------------+------------+------------+
2 rows in set (0.00 sec)


# 插入語(yǔ)句可以用append段标,使用另一種方法創(chuàng)建dataframe

In [48]: newline = [(932,'Alice', 1929291,2,22.92)]
In [51]: rdd4 = sqlContext.createDataFrame(newline,['id_int','name
    ...: _string','money_bigint','age_double','tall_float'])

In [52]: rdd4.show()
+------+-----------+------------+----------+-----------+
|id_int|name_string|money_bigint|age_double|tall_float|
+------+-----------+------------+----------+-----------+
|   932|      Alice|     1929291|         2|      22.92|
+------+-----------+------------+----------+-----------+


In [55]: rdd4.write.jdbc("jdbc:mysql://xxxx/test","t
    ...: estalltype","append",{"user":"","password":"
    ...: "})

# 查看

mysql> select * from testalltype;
+--------+-------------+--------------+------------+------------+
| id_int | name_string | money_bigint | age_double | tall_float |
+--------+-------------+--------------+------------+------------+
|      2 | id2         |      2000000 |         13 |       1.22 |
|      1 | id1         |       100000 |         12 |        1.2 |
|    932 | Alice       |      1929291 |          2 |       22.92 |
+--------+-------------+--------------+------------+-------------+



##############使用pyspark+jdbc將本地csv存儲(chǔ)到mysql###########


In [1]: datapath = "dataform.csv"

In [2]: with open(datapath) as f:
   ...:     k = 1
   ...:     parallelizelist = []
   ...:     for line in f.readlines():
   ...:         linesplit = line.strip().split("|")
   ...:         tuple_data = tuple(linesplit)
   ...:         if k == 1:
   ...:             tuple_title = linesplit
   ...:         else:
   ...:             parallelizelist.append(tuple_data)
   ...:
   ...:         k +=1

# 方法1:sqlContext.createDataFrame

In [3]: rdd3 = sqlContext.createDataFrame(parallelizelist,tuple_title)

In [4]: rdd3.write.jdbc("jdbc:mysql://xxxx/test","testallty
   ...: pe","overwrite",{"user":"","password":"
   ...: "})


# 方法2:toDF
In [8]: rdd4 = sc.parallelize(parallelizelist)

In [9]: rdd5 = rdd4.toDF(tuple_title)

In [10]: rdd5.write.jdbc("jdbc:mysql://xxxx/test","testallt
    ...: ype","append",{"user":"","password":""
    ...: })

Spark對(duì)Hive表操作

首先理解下什么是SparkContext, SQLContext 和HiveContext涯冠,原文可參考@pig2--讓你真正理解什么是SparkContext, SQLContext 和HiveContext這位版主很厲害!這里簡(jiǎn)單總結(jié)下

  • SparkContext:用于連接Spark集群逼庞、創(chuàng)建RDD蛇更、累加器(accumlator)、廣播變量(broadcast variables)赛糟,所以說(shuō)SparkContext為Spark程序的根派任,你只要知道它能讓一個(gè)普通的列表編程rdd就行了,非常牛逼璧南,就是傳說(shuō)中的sc掌逛!
  • SparkSQL:是spark的一個(gè)模塊,是spark的一個(gè)模塊司倚,SparkSQL 用來(lái)處理結(jié)構(gòu)化數(shù)據(jù)豆混,所以SparkSQL你的data必須定義schema.在spark1.3.1,sparksql繼承dataframes 和SQL 查詢引擎
    • SQLContext:spark處理結(jié)構(gòu)化數(shù)據(jù)的入口动知。允許創(chuàng)建DataFrame以及sql查詢
    • HiveContext:spark sql執(zhí)行引擎皿伺,集成hive數(shù)據(jù)
In [68]: from pyspark.sql import HiveContext,Row

In [69]: hiveCtx = HiveContext(sc)

In [70]: rows = hiveCtx.sql("SELECT * FROM test.table1 limit
    ...: 5")

In [71]: firstRow = rows.first()
[Stage 32:=====> 
[Stage 32:========>
[Stage 32:==========>
[Stage 32:=============>
[Stage 32:================>
[Stage 32:==================>
[Stage 32:=====================>
[Stage 32:======================>
In [72]: print firstRow.business_id
257

In [73]: print firstRow.order_id
3057564118


In [74]: hiveowntest = HiveContext(sc)

In [75]: rows2 = hiveowntest.sql("SELECT * FROM test.owntest")

In [76]: rows2.show()
+--------+---+
|    name|age|
+--------+---+
|shangsan| 20|
|    lisi| 22|
|  zhouwu| 21|
+--------+---+

# 保存入表,其實(shí)就是講hive表讀入RDD拍柒,然后再寫入新的hive表中
In [79]: rows2.saveAsTable("hive_test_spark")


# 然后進(jìn)入hive中進(jìn)行操作,雖然有點(diǎn)錯(cuò)誤心傀,單還是可以執(zhí)行查詢動(dòng)作

hive> select * from hive_test_spark;
OK
hive_test_spark.name    hive_test_spark.age
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
shangsan    20
lisi    22
zhouwu  21

hive> select * from hive_test_spark where name="shangsan";
OK
hive_test_spark.name    hive_test_spark.age
shangsan    20
Time taken: 0.645 seconds, Fetched: 1 row(s)

Pyspark使用本地文件建立Hive表

1.再另一終端,執(zhí)行如下拆讯,將文件put到集群

# 其中spark_write_hive.txt是本地寫好的文件,之后的操作是put到hdfs上

$ hadoop fs -put spark_write_hive.txt hdfs:/xxxx/xxxx/

# 查看是否put到hdfs上

$ hadoop fs -cat hdfs:/xxxx/xxxx/spark_write_hive.txt

shangsan,20
lisi,30

2.需要測(cè)試的文件已推送到集群上存儲(chǔ)养叛,接下來(lái)是使用spark并將數(shù)據(jù)導(dǎo)入到表中

sqlContext = HiveContext(sc)

********
# 建hive表
sqlContext.sql("CREATE TABLE IF NOT EXISTS hive_test_spark2 (key INT,value STRING)")

# 導(dǎo)入數(shù)據(jù)种呐,注意這里是集群的數(shù)據(jù)
hivenewtable.sql("LOAD DATA INPATH '/xxxx/xxxx/test.txt' INTO TABLE hive_test_spark2")

********
# 還是推薦以下方式建立外表
In [21]: txttohive = HiveContext(sc)

In [22]: txttohive.sql("CREATE EXTERNAL TABLE IF NOT EXISTS hive_t
    ...: est_spark3 (name string,age string ) ROW FORMAT DELIMITED
    ...:  FIELDS TERMINATED BY ','")
Out[22]: DataFrame[result: string]

In [23]: txttohive.sql("LOAD DATA INPATH '/xxxx/xxxx/
    ...: spark_write_hive.txt' INTO TABLE hive_test_s
    ...: park3")
16/12/22 13:49:35 ERROR KeyProviderCache: Could not find uri with key [dfs.encryption.key.provider.uri] to create a keyProvider !!
Out[23]: DataFrame[result: string]

# 查看hive表
hive> select * from hive_test_spark3;
OK
hive_test_spark3.name hive_test_spark3.age
shangsan    20
lisi    30

使用Pyspark 將Hive表轉(zhuǎn)化成rdd操作

from pyspark.sql import HiveContext,Row
hiveCtx = HiveContext(sc)
data_order = hiveCtx.sql("select * from xx.hot_position")
data_order.show(7)

+----+--------+-------+----+-----------+----+-----+---+
|city|  badlng| badlat| num|badcaseprec|year|month|day|
+----+--------+-------+----+-----------+----+-----+---+
| xx市|1x6.3213|x9.8959|3597|      0.857|2017|   05| 10|
| xx市| 1x6.379| x9.865|5775|      0.857|2017|   05| 10|
| xx市|1x1.3198|x1.1937|1269|      0.849|2017|   05| 10|
| xx市|1x1.3199|x1.1937|3387|      0.847|2017|   05| 10|
| xx市|1x6.5509|x9.6083|1092|      0.835|2017|   05| 10|
| xx市| 1x1.354|x1.1988|1482|      0.825|2017|   05| 10|
| xx市|1x0.2131|x0.2915|8215|      0.817|2017|   05| 10|
+----+--------+-------+----+-----------+----+-----+---+
only showing top 7 rows


# 將dataframe轉(zhuǎn)化為rdd進(jìn)行計(jì)算
data_order_rdd=data_order.rdd
data_order_rdd.map(lambda x:(x.badlng,x.badlat)).collect()

# output
[(u'1x6.3213', u'x9.8959'),
 (u'1x6.379', u'x9.865'),
 (u'1x1.3198', u'x1.1937'),
 (u'1x1.3199', u'x1.1937'),
 (u'1x6.5509', u'x9.6083')]
 

當(dāng)然你也可以這樣操作

# 進(jìn)行復(fù)合計(jì)算
# -*- coding: utf-8 -*- 
from math import*
def Distance2(data):# 第二種計(jì)算方法
    lat2=float(data.split("\t")[1])
    lng2=float(data.split("\t")[0])
    lat1=39.8959
    lng1=116.3213
    radlat1=radians(lat1)  
    radlat2=radians(lat2)  
    a=radlat1-radlat2  
    b=radians(lng1)-radians(lng2)  
    s=2*asin(sqrt(pow(sin(a/2),2)+cos(radlat1)*cos(radlat2)*pow(sin(b/2),2)))  
    earth_radius=6378.137  
    s=s*earth_radius  
    if s<0:  
        return -s  
    else:  
        return s

data_order_rdd.filter(lambda x:x.badlat>'31').map(lambda x:x.badlng+'\t'+x.badlat).map(Distance2).collect()


# 輸出
[0.0,
 6.010588654903075,
 1068.8138888545056,
 1068.8177053251293,
 1069.6032160827797,
 1068.969082793839,
 0.07015355066273321]

Spark Streaming

Spark是一個(gè)類似于MapReduce的分布式計(jì)算框架,其核心是彈性分布式數(shù)據(jù)集弃甥,提供了比MapReduce更豐富的模型爽室,可以在快速在內(nèi)存中對(duì)數(shù)據(jù)集進(jìn)行多次迭代,以支持復(fù)雜的數(shù)據(jù)挖掘算法和圖形計(jì)算算法淆攻。Spark Streaming是一種構(gòu)建在Spark上的實(shí)時(shí)計(jì)算框架阔墩,它擴(kuò)展了Spark處理大規(guī)模流式數(shù)據(jù)的能力嘿架。

image

基于云梯Spark on Yarn的Spark Streaming總體架構(gòu)如圖,Spark on Yarn啟動(dòng)后,由Spark AppMaster把Receiver作為一個(gè)Task提交給某一個(gè)Spark Executor啸箫;Receive啟動(dòng)后輸入數(shù)據(jù)耸彪,生成數(shù)據(jù)塊,然后通知Spark AppMaster忘苛;Spark AppMaster會(huì)根據(jù)數(shù)據(jù)塊生成相應(yīng)的Job蝉娜,并把Job的Task提交給空閑Spark Executor 執(zhí)行。圖中藍(lán)色的粗箭頭顯示被處理的數(shù)據(jù)流扎唾,輸入數(shù)據(jù)流可以是磁盤召川、網(wǎng)絡(luò)和HDFS等,輸出可以是HDFS胸遇,數(shù)據(jù)庫(kù)等荧呐。

Spark Streaming的基本原理

將輸入數(shù)據(jù)流以時(shí)間片(秒級(jí))為單位進(jìn)行拆分,然后以類似批處理的方式處理每個(gè)時(shí)間片數(shù)據(jù)纸镊,其基本原理如圖

image

首先坛增,Spark Streaming把實(shí)時(shí)輸入數(shù)據(jù)流以時(shí)間片Δt (如1秒)為單位切分成塊。Spark Streaming會(huì)把每塊數(shù)據(jù)作為一個(gè)RDD薄腻,并使用RDD操作處理每一小塊數(shù)據(jù)星瘾。每個(gè)塊都會(huì)生成一個(gè)Spark Job處理皮假,最終結(jié)果也返回多塊。

Spark Streaming的內(nèi)部原理

使用Spark Streaming編寫的程序與編寫Spark程序非常相似,在Spark程序中蜜笤,主要通過(guò)操作RDD(Resilient Distributed Datasets彈性分布式數(shù)據(jù)集)提供的接口,如map畅蹂、reduce乃坤、filter等,實(shí)現(xiàn)數(shù)據(jù)的批處理弄贿。而在Spark Streaming中春锋,則通過(guò)操作DStream(表示數(shù)據(jù)流的RDD序列)提供的接口,這些接口和RDD提供的接口類似差凹。

image

Spark Streaming把程序中對(duì)DStream的操作轉(zhuǎn)換為DStream Graph

image

對(duì)于每個(gè)時(shí)間片期奔,DStream Graph都會(huì)產(chǎn)生一個(gè)RDD Graph;針對(duì)每個(gè)輸出操作(如print危尿、foreach等)呐萌,Spark Streaming都會(huì)創(chuàng)建一個(gè)Spark action;對(duì)于每個(gè)Spark action谊娇,Spark Streaming都會(huì)產(chǎn)生一個(gè)相應(yīng)的Spark job肺孤,并交給JobManager。JobManager中維護(hù)著一個(gè)Jobs隊(duì)列, Spark job存儲(chǔ)在這個(gè)隊(duì)列中,JobManager把Spark job提交給Spark Scheduler赠堵,Spark Scheduler負(fù)責(zé)調(diào)度Ta

Spark Streaming優(yōu)缺點(diǎn)

優(yōu)點(diǎn)

  1. Spark Streaming 內(nèi)部的實(shí)現(xiàn)和調(diào)度方式高度依賴 Spark 的 DAG 調(diào)度器和 RDD小渊,這就決定了 Spark Streaming 的設(shè)計(jì)初衷必須是粗粒度方式的,同時(shí)茫叭,由于 Spark 內(nèi)部調(diào)度器足夠快速和高效酬屉,可以快速地處理小批量數(shù)據(jù),這就獲得準(zhǔn)實(shí)時(shí)的特性杂靶。
  2. Spark Streaming 的粗粒度執(zhí)行方式使其確卑鸸撸“處理且僅處理一次”的特性,同時(shí)也可以更方便地實(shí)現(xiàn)容錯(cuò)恢復(fù)機(jī)制吗垮。
  3. 由于 Spark Streaming 的 DStream 本質(zhì)是 RDD 在流式數(shù)據(jù)上的抽象垛吗,因此基于 RDD 的各種操作也有相應(yīng)的基于 DStream 的版本,這樣就大大降低了用戶對(duì)于新框架的學(xué)習(xí)成本烁登,在了解 Spark 的情況下用戶將很容易使用 Spark Streaming怯屉。
  4. 由于 DStream 是在 RDD 上的抽象,那么也就更容易與 RDD 進(jìn)行交互操作饵沧,在需要將流式數(shù)據(jù)和批處理數(shù)據(jù)結(jié)合進(jìn)行分析的情況下锨络,將會(huì)變得非常方便。

缺點(diǎn)

  1. Spark Streaming 的粗粒度處理方式也造成了不可避免的延遲狼牺。在細(xì)粒度處理方式下羡儿,理想情況下每一條記錄都會(huì)被實(shí)時(shí)處理,而在 Spark Streaming 中是钥,數(shù)據(jù)需要匯總到一定的量后再一次性處理掠归,這就增加了數(shù)據(jù)處理的延遲,這種延遲是由框架的設(shè)計(jì)引入的悄泥,并不是由網(wǎng)絡(luò)或其他情況造成的虏冻。
  2. Spark Streaming 當(dāng)前版本穩(wěn)定性不是很好。[spark 1.5]

如何使用Spark Streaming

可參考:Spark Streaming 集成 Kafka 總結(jié)

作為構(gòu)建于Spark之上的應(yīng)用框架弹囚,Spark Streaming承襲了Spark的編程風(fēng)格厨相,對(duì)于已經(jīng)了解Spark的用戶來(lái)說(shuō)能夠快速地上手。接下來(lái)以Spark Streaming官方提供的WordCount代碼為例來(lái)介紹Spark Streaming的使用方式鸥鹉。

// scala語(yǔ)言編寫spark streaming

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
 
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
 
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
 
// Split each line into words
val words = lines.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
 
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()              // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

Spark在機(jī)器學(xué)習(xí)中應(yīng)用

第一步蛮穿,新建python 文件,取名svmwithsgd.py]

# -*- coding:utf-8 -*-
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint
appName ="hellospark" #你的應(yīng)用程序名稱
master= "local"#設(shè)置單機(jī)
conf = SparkConf().setAppName(appName).setMaster(master)#配置SparkContext
sc = SparkContext(conf=conf)

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.strip().split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("newdata.txt")
parsedData = data.map(parsePoint)

# Build the model
model = SVMWithSGD.train(parsedData, iterations=100)

# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

# Save and load model
#model.save(sc, "myModelPath")
#sameModel = SVMModel.load(sc, "myModelPath")
# 最后兩段會(huì)加載出錯(cuò)
#SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
#SLF4J: Defaulting to no-operation (NOP) logger implementation
#SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

第二步宋舷,進(jìn)行spark-submit 提交任務(wù)

$ spark-submit svmwithsgd.py

Pyspark在工作流中套路

詳見(jiàn)我的另一篇博客Spark日志清洗一般流程


總結(jié)

在實(shí)習(xí)的過(guò)程中绪撵,對(duì)于不同的任務(wù)啟用不同的工具,處理手段祝蝠,技巧等,而對(duì)編程語(yǔ)言的選擇,其實(shí)并不是那么重要绎狭,這是一種實(shí)現(xiàn)形式罷了细溅,效率的核心還是對(duì)數(shù)據(jù)結(jié)構(gòu)和算法的理解上,大數(shù)據(jù)處理在我現(xiàn)在的認(rèn)知范圍內(nèi)儡嘶,只不過(guò)是一種海量數(shù)據(jù)處理的技術(shù)喇聊,就像一臺(tái)機(jī)器不夠算了,那就搞兩臺(tái)蹦狂,n臺(tái)誓篱,機(jī)械硬盤算起來(lái)不夠快了,那就加載到內(nèi)存中算(spark)凯楔,當(dāng)然機(jī)器之間的通信窜骄,任務(wù)的派發(fā),最后的匯總這些也是非常值得琢磨的摆屯,而設(shè)置分配內(nèi)存邻遏,設(shè)置mapreduce個(gè)數(shù)這類的,很多時(shí)候都是靠經(jīng)驗(yàn)來(lái)總結(jié)虐骑,對(duì)數(shù)據(jù)的把握程度准验,預(yù)估上進(jìn)行判斷,如何更快速的處理數(shù)據(jù)并且開銷更低廷没,這里都是屬于性能調(diào)優(yōu)里面的糊饱。總之颠黎,能處理數(shù)據(jù)另锋,了解t和a用法,并不能說(shuō)掌握spark盏缤,只能說(shuō)會(huì)用這個(gè)工具而已砰蠢,而現(xiàn)實(shí)中大多數(shù)任務(wù)只是挑選工具的過(guò)程而已,我們通常是為了close掉任務(wù)而去學(xué)習(xí)一種新的更快的工具唉铜,我想這對(duì)于工具的理解和學(xué)習(xí)來(lái)說(shuō)是十分不利的台舱,而越來(lái)越多的任務(wù)迫使我們沒(méi)有太多的余力靜下心去去更深入的學(xué)習(xí)和了解,記錄和總結(jié)潭流。而我竞惋,并不想這樣。


致謝

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末灰嫉,一起剝皮案震驚了整個(gè)濱河市拆宛,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌讼撒,老刑警劉巖浑厚,帶你破解...
    沈念sama閱讀 206,602評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件股耽,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡钳幅,警方通過(guò)查閱死者的電腦和手機(jī)物蝙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)敢艰,“玉大人诬乞,你說(shuō)我怎么就攤上這事∧频迹” “怎么了震嫉?”我有些...
    開封第一講書人閱讀 152,878評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)牡属。 經(jīng)常有香客問(wèn)我票堵,道長(zhǎng),這世上最難降的妖魔是什么湃望? 我笑而不...
    開封第一講書人閱讀 55,306評(píng)論 1 279
  • 正文 為了忘掉前任换衬,我火速辦了婚禮,結(jié)果婚禮上证芭,老公的妹妹穿的比我還像新娘瞳浦。我一直安慰自己,他們只是感情好废士,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,330評(píng)論 5 373
  • 文/花漫 我一把揭開白布叫潦。 她就那樣靜靜地躺著,像睡著了一般官硝。 火紅的嫁衣襯著肌膚如雪矗蕊。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,071評(píng)論 1 285
  • 那天氢架,我揣著相機(jī)與錄音傻咖,去河邊找鬼。 笑死岖研,一個(gè)胖子當(dāng)著我的面吹牛卿操,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播孙援,決...
    沈念sama閱讀 38,382評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼害淤,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了拓售?” 一聲冷哼從身側(cè)響起窥摄,我...
    開封第一講書人閱讀 37,006評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎础淤,沒(méi)想到半個(gè)月后崭放,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體哨苛,經(jīng)...
    沈念sama閱讀 43,512評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,965評(píng)論 2 325
  • 正文 我和宋清朗相戀三年莹菱,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了移国。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片吱瘩。...
    茶點(diǎn)故事閱讀 38,094評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡道伟,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出使碾,到底是詐尸還是另有隱情蜜徽,我是刑警寧澤,帶...
    沈念sama閱讀 33,732評(píng)論 4 323
  • 正文 年R本政府宣布票摇,位于F島的核電站拘鞋,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏矢门。R本人自食惡果不足惜盆色,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,283評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望祟剔。 院中可真熱鬧隔躲,春花似錦、人聲如沸物延。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)叛薯。三九已至浑吟,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間耗溜,已是汗流浹背组力。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留抖拴,地道東北人燎字。 一個(gè)月前我還...
    沈念sama閱讀 45,536評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像城舞,于是被迫代替她去往敵國(guó)和親轩触。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,828評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容