spark(1)介紹
1. 快速且通用的集群計(jì)算平臺(tái)
- 擴(kuò)充了流行的MapReduce計(jì)算模型
- 基于內(nèi)存(發(fā)現(xiàn)hadoop在迭代式計(jì)算和交互式上的低效)
- 融合優(yōu)點(diǎn):批處理(hadoop)删铃、迭代式運(yùn)算(機(jī)器學(xué)習(xí)系統(tǒng))哎迄、交互式查詢(Hive)、流處理(Storm)粟按,降低了成本
- 和其他大數(shù)據(jù)工具整合很好抹凳,如hadoop袍啡、kafka
2. spark組件
3. Hadoop Spark 差異
- Hadoop離線處理,對(duì)時(shí)效性要求不高
- Spark用于時(shí)效性要求高及機(jī)器學(xué)習(xí)等領(lǐng)域
- Spark不具備HDFS的存儲(chǔ)能力却桶,要借助HDFS等持久化數(shù)據(jù)
spark(2)下載安裝境输、shell
1. 虛擬機(jī)聯(lián)網(wǎng)
2. 下載
3. 解壓
- tar -zxvf spark-1.6.3-bin-hadoop2.6.tgz
4. 目錄
- bin/spark shell(交互性、實(shí)時(shí)性)pyspark和spark-shell
- core颖系、streaming嗅剖、python主要是組件源代碼
- example包含單機(jī)Spark job
5. scala shell
- 讀取文件
val line = sc.textFile("../../testfile/helloSpark") //加載文本文件,返回RDD
line.count() //計(jì)算行數(shù)
line.first() //第一行
6. 修改日志級(jí)別
- 日志輸出太多嘁扼,改為顯示W(wǎng)ARN日志
conf/log4j
cp log4j.properties.template log4j.properties
vi log4j.properties
log4j.rootCategory=WARN,console(wq)
Spark(3)開發(fā)環(huán)境
1. Scala安裝
- Spark 2.0 —— Scala 2.11
2. IDEA安裝
3. IDEA插件
- Scala
- SBT
4. 新建項(xiàng)目
- Scala項(xiàng)目信粮,用SBT打包
Scala 2.10.5 Spark 1.6.2 SBT 0.13.8 JDK 1.8
5. ssh無密碼登錄
- ssh localhost發(fā)現(xiàn)要輸密碼
ssh-keygen
touch authorized_keys(.ssh下)
cat id_rsa.pub > authorized_keys
chmod 600 authorized_keys
- ssh localhost試驗(yàn)是否登錄成功
6. 第一個(gè)程序WordCount
- 創(chuàng)建一個(gè)Spark Context
- 加載數(shù)據(jù)
- 把每一行分割成單詞
- 轉(zhuǎn)換成pairs并且計(jì)數(shù)
- 新建Scala類(object類型)
- build,打成jar包
7. 啟動(dòng)集群
- (spark文件夾下)啟動(dòng)master ./sbin/start-master.sh
- 啟動(dòng)worker ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost.localdomain:7077
worker上打開localhost:8080趁啸,得到 ://localhost.localdomain:7077 - 提交作業(yè) ./bin/spark-submit --master spark://localhost.localdomain:7077 --class WordCount /home/maixia/soft/imoocpro.jar
Spark(4)RDDS
1.Driver program
- 包含程序的main()方法强缘,RDDs的定義和操作
- 管理很多結(jié)點(diǎn)(executors)
2. SparkContext
- Driver programs 通過SparkContext對(duì)象訪問Spark
- SparkContext對(duì)象代表和一個(gè)集群的連接
- 在Shell中SparkContext自動(dòng)創(chuàng)建好了就是sc
3. RDDs基礎(chǔ)
- Resilient distributed datasets(彈性分布式數(shù)據(jù)集)
- 并行分布在整個(gè)集群中,是Spark分發(fā)數(shù)據(jù)和計(jì)算的基礎(chǔ)抽象類
- 代表一個(gè)不可改變的分布式集合對(duì)象,Spart中所有計(jì)算都通過RDDs的創(chuàng)建不傅、轉(zhuǎn)換旅掂、操作完成
- 一個(gè)RDD內(nèi)部有很多partitions(分片)組成,每個(gè)分片包括一部分?jǐn)?shù)據(jù)访娶,partitions可在集群不同節(jié)點(diǎn)上計(jì)算商虐。
- 分片是Spark并行處理的單元,Spark順序地、并行地處理分片
4. RDDs創(chuàng)建
- val rdd = sc.parallelize(Array(1,2,2,4),4) 測(cè)試用
rdd.foreach(println) - val rddText = sc.textFile("helloSpark.txt") 加載外部數(shù)據(jù)集
5. Scala匿名函數(shù) 類型推斷
lines.filter(line => line.contains("world"))
- 定義一個(gè)匿名函數(shù)秘车,接受一個(gè)參數(shù)line
- 使用line這個(gè)String類型變量上的contains方法典勇,并且返回結(jié)果
- line的類型不需要指定,能夠推斷出來
6. Transformation
- 從一個(gè)RDD構(gòu)建一個(gè)新的RDD叮趴,如map()和filter()
- 逐元素的Transformation
- map()割笙,接收函數(shù),把函數(shù)應(yīng)用到RDD的每一個(gè)元素眯亦,返回新RDD
- filter()伤溉,接收函數(shù),返回只包含滿足filter()函數(shù)的元素的新RDD
- flatMap()搔驼,對(duì)每個(gè)輸入元素谈火,輸出多個(gè)輸出元素侈询,將RDD中元素壓扁后返回一個(gè)新的RDD
- 集合運(yùn)算
- 并集:rdd1.union(rdd2)
- 交集:rdd1.intersection(rdd2)
- 差集:rdd1.subtract(rdd2)
- 去重:rdd.distinct()
7. Action
- 在RDD上計(jì)算出一個(gè)結(jié)果舌涨,把結(jié)果返回給driver program或保存在文件系統(tǒng)
- reduce(),接受一個(gè)函數(shù)扔字,作用在RDD兩個(gè)類型相同的元素上囊嘉,返回新元素,可以實(shí)現(xiàn)RDD中元素累加革为、計(jì)數(shù)等聚集操作
- collect()扭粱,遍歷整個(gè)RDD,向driver program返回RDD的內(nèi)容(需要單機(jī)內(nèi)存能夠容納下震檩,因?yàn)閿?shù)據(jù)要拷貝給driver琢蛤,測(cè)試使用,大數(shù)據(jù)時(shí)使用saveAsTextFile())
- take(n)抛虏,返回RDD的n個(gè)元素博其,同時(shí)嘗試訪問最少的partitions(返回結(jié)果無序,測(cè)試使用)
- top()迂猴,排序(根據(jù)RDD中數(shù)據(jù)的比較器)
- foreach()慕淡,計(jì)算RDD中每個(gè)元素,但不返回到本地(一般配合println打印數(shù)據(jù)沸毁,方便測(cè)試)
8. keyValue對(duì)RDDs
- 使用map()函數(shù)峰髓,返回key/value對(duì)(包含整行數(shù)據(jù)的RDD,把每行數(shù)據(jù)的第一個(gè)單詞作為key)
val rdd2 = rdd1.map(line=>(line.split(" ")(0),line))
9. combineByKey()
- 最常用的基于key的聚合函數(shù)息尺,返回類型可以與輸入類型不一樣
- 遍歷partition中的元素携兵,對(duì)于沒見過的key使用createCombiner()函數(shù),對(duì)于見過的使用mergeValue()函數(shù)搂誉,合計(jì)每個(gè)partition的結(jié)果時(shí)使用mergeCombiners()函數(shù)
//把成績(jī)相加再求平均值
val score2=score.combineByKey(score=>(1,score),(c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Double))=>(c1._1+c2._1,c1._2+c2._2))
val average=scores2.map{case(name,(num,score))=>(name,score/num)}
10. Spark特性
- 血統(tǒng)關(guān)系
- 延遲計(jì)算(Lazy Evaluation)
- action時(shí)才會(huì)真正計(jì)算眉孩,減少數(shù)據(jù)的傳輸
- 內(nèi)部記錄metadata表明transformations操作已被響應(yīng)
- 加載數(shù)據(jù)也是延遲計(jì)算,數(shù)據(jù)只有在必要時(shí)候才會(huì)加載進(jìn)去
- RDD.persist(),默認(rèn)每次在RDD上進(jìn)行action操作時(shí)浪汪,Spark都重新計(jì)算一遍巴柿,如需可重用使用RDD.persist(),unpersist()從緩存中移除
- SER表示序列號(hào)死遭,對(duì)CPU占用高