# 1 什么是RDD
*Resilient Distributed Dataset彈性分布式數(shù)據(jù)集
*Represents an immutable(不可變)partitioned(分區(qū)的)collection of elements that can be operated on in parallel(并行操作).?
* 操作包括: `map`, `filter`, and `persist`(持久化)
*有KV把还,Double够委,Sequence文件類型
什么是不可變?——>對于變量不可修改嘴拢,每次操作之后都會(huì)生成一個(gè)新的變量。
spark編程和scala類似,可以無縫對接
```
abstract class RDD[T: ClassTag](
? ? @transient private var _sc: SparkContext,
? ? @transient private var deps: Seq[Dependency[_]]
? ) extends Serializable with Logging
```
1)抽象類,只有在子類里面實(shí)現(xiàn)了(子類繼承父類)
2)帶泛型的即彪,可以支持多種類型(string,person活尊,user)
單機(jī)存儲/計(jì)算==>分布式存儲/計(jì)算
1數(shù)據(jù)存儲:切割? HDFS BLock
2數(shù)據(jù)計(jì)算:切割(并行計(jì)算)Mapreduce隶校、spark
3存儲和計(jì)算:HDFS/S3 + Mapreduce/Spark
# 2 RDD特性
【面試考點(diǎn)】Internally, each RDD is characterized by five main properties:
- A list of partitions一些列分區(qū)分片
- A function for computing each split可以去計(jì)算每個(gè)分區(qū)分片
rdd.map(_+1)->對每個(gè)分區(qū)做了一個(gè)相同的操作
- A list of dependencies on other RDDs rdd是具有依賴關(guān)系的【重要特點(diǎn)】
rdd1==>rdd2==>rdd3==>=rdd4
rdd1有五個(gè)分區(qū),則rdd2也有五個(gè)分區(qū)蛹锰。如果rdd2第四個(gè)分區(qū)丟失深胳,則數(shù)據(jù)會(huì)從rdd1第四個(gè)分區(qū)再次計(jì)算,而不會(huì)對所有數(shù)據(jù)進(jìn)行計(jì)算铜犬。
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)舞终。可選屬性癣猾,KV分區(qū)可以按照不同策略去設(shè)置分區(qū)hash or range.etc
?? - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
?? ? an HDFS file)可選屬性敛劝,數(shù)據(jù)在哪兒可以把作業(yè)調(diào)度在數(shù)據(jù)所在的節(jié)點(diǎn)進(jìn)行計(jì)算:數(shù)據(jù)移動(dòng)不如移動(dòng)作業(yè)
用戶可自己創(chuàng)建RDD,從文件中讀取數(shù)據(jù)
* refer課程:hadoop入門實(shí)戰(zhàn)課程
# 3 RDD特性在源碼中的體現(xiàn)
RDD有五個(gè)方法:
## 1 computer-特性2
> DeveloperApi ——開發(fā)API
> Implemented(實(shí)現(xiàn))by subclasses(子類)to compute a given partition.
```
def compute(split: Partition, context: TaskContext): Iterator[T]
```
* parition源碼
```
package org.apache.spark
// An identifier for a partition in an RDD.
trait Partition extends Serializable {
? //Get the partition's index within its parent RDD
? def index: Int
? // A better default implementation of HashCode
? override def hashCode(): Int = index
? override def equals(other: Any): Boolean = super.equals(other)
}
```
面試問題:
1為什么重寫equals方法的時(shí)候需要重寫hashcode
2如何實(shí)現(xiàn)hashset/hashmap
## 2 getPartitions-特性1
```
?protected def getPartitions: Array[Partition]
```
This method will only be called once, so it is safe to implement a time-consuming computation in it.
## 3 getDependencies-特性3
```
?protected def getDependencies: Seq[Dependency[_]] = deps
```
返回一堆依賴關(guān)系
## 4 getPreferredLocations-特性5
```
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
```
## 5 partitioner-特性4
```
val partitioner: Option[Partitioner] = None
```
## + hadoop RDD的實(shí)現(xiàn)
?:: DeveloperApi ::
An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS, sources in HBase, or S3), using the older MapReduce API (`org.apache.hadoop.mapred`).
protected def getInputFormat(conf: JobConf): InputFormat[K, V]? ——new出來輸入對象纷宇,然后匹配是否符合輸入夸盟,然后返回。
override def getPartitions: Array[Partition] ——調(diào)用getinputformat像捶,然后分塊上陕,檢查是否為空桩砰,返回非空。
對于上面講的在開發(fā)API中這些方法的實(shí)現(xiàn)沒有理解
## + JDBC RDD的實(shí)現(xiàn)
也存在 上述的集中RDD支持的操作
---
兩個(gè)實(shí)現(xiàn)的方式是說每個(gè)實(shí)現(xiàn)里會(huì)有這幾種基本方式的實(shí)現(xiàn)释簿,但是具怎么實(shí)現(xiàn)亚隅,實(shí)現(xiàn)原理是依賴于其本身的設(shè)計(jì)還是spark RDD的設(shè)計(jì)還是不理解。
logging屬于日志接口
def this是附屬構(gòu)造器
#? 4圖解RDD
# 5 Spark Context庶溶、Spark Conf
1創(chuàng)建SparkContext
鏈接到Spark集群煮纵,可以跑在local、standalone渐尿、yarn、mesos
可以通過SC創(chuàng)建RDD矾瑰,或者廣播變量到集群
?在創(chuàng)建sparkcontext之前砖茸,要?jiǎng)?chuàng)建spark conf
不要硬編碼,最好是通過submit傳入進(jìn)來
啟動(dòng)pyshark(在python的bin目錄里殴穴,可以配置環(huán)境變量后凉夯,支持全局可用)
```
./pyshark
```
在創(chuàng)建spark的時(shí)候初始化了sc,sc的master為local[*]
可以spark000:8080查看運(yùn)行狀況
也可以添加其他屬性
```
./bin/pyshark --master local[4] --py-files code.py
```
用ipython啟動(dòng)
```
$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark
```
sc.sparkconf
# 6 pyshark腳本解析
?可以用 查詢相關(guān)參數(shù)屬性
./pyshark --help
傳輸參數(shù)
./pyshark --master local[2]
./pyshark --name? "Pyskark" #定義名字
# 7 RDD 創(chuàng)建方式
Once created,?distFile?can be acted on by dataset operations.
## 方法 1 driver
Parallelized Collections采幌,測試場景下使用較多
```
data =? [1,2,3,4,5]
disData = sc.parallelize(data) #把data轉(zhuǎn)換成RDD劲够,默認(rèn)轉(zhuǎn)換成兩個(gè)部分
disData.collect() #查看數(shù)據(jù)里面具體是什么
disData.reduce( lambda a , b : a+b ) #加和所有元素(?那對部分相加怎么辦休傍?)
```
ps:collect征绎,reduce才會(huì)觸發(fā)jobs
One important parameter for parallel collections is the number of*partitions*to cut the dataset into.——也就是說,我們的數(shù)據(jù)是可以切分的磨取。例子如下:
```
disData = sc.parallelize(data人柿,5) #這里的參數(shù)5是表示把RDD切成5個(gè)部分。
```
多分?jǐn)?shù)據(jù)和業(yè)務(wù)邏輯和處理性能有關(guān)忙厌。
每個(gè)CPU可以設(shè)置2-4個(gè)partition
## 方法 2 外部數(shù)據(jù)導(dǎo)入
外部存儲導(dǎo)入
支持讀取的數(shù)據(jù)源格式:HDFS, HBase,S3,Hive凫岖,etc
支持文件類型:text files,?SequenceFiles or any other Hadoop?InputFormat or any data source offering a Hadoop InputFormat
```
# 先在目錄下創(chuàng)建hello.txt文件,內(nèi)容可自定義逢净,記錄下其村吃地址
disFile = sc.textFile("file://本地文件路徑")
# 本地文件
disFile = sc.textFile("file:///home/hadoop/data/hello.txt")
disFile.collection()
#hadoop文件
disFile = sc.textFile("hdfs://hadoop000:8020/hello.txt")
disFile.collection()
#使用示例,求和文件中字段的長度并加和
disFile.map(lambda s:len(s)).reduce(lambda a,b:a+b)
```
*If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes.
注意點(diǎn):如果你的文件在本地哥放,要確保你的運(yùn)行程序在運(yùn)行的節(jié)點(diǎn)上可以訪問本地。一般建議用網(wǎng)路的共享文件系統(tǒng)(use a network-mounted shared file system爹土,類似于S3)甥雕?這就是說 如果在hdfs 上 也算非本地把?
1)上課環(huán)境是單節(jié)點(diǎn)胀茵,hello.txt本地有就可以讀取
2)如果在standalone:Spark集群上:3個(gè)節(jié)點(diǎn)犀农,local path都是表示節(jié)點(diǎn)本地讀取文件。不建議
3)生產(chǎn)上直接用yarm宰掉,不會(huì)用standalone
* 支持整個(gè)文件夾呵哨,或者壓縮文件赁濒。
```
sc.textFile("/my/directory")
sc.textFile("/my/directory/*.gz")
```
* 可以設(shè)置分區(qū)(partitions)數(shù)量,可以設(shè)置更多孟害,單無法設(shè)置更少拒炎。
## python特有的支持類型
*SparkContext.wholeTextFiles
支持全路徑的文件讀取,key是文件村存儲地址挨务,value是文件內(nèi)容
```
sc.wholeTextFiles("file:///home/hadoop/data/hello.txt").collect()
# 讀取內(nèi)容
[(文件路徑击你,文件內(nèi)容)]
```
* RDD.saveAsPickleFile——可以保存為這種文件類SparkContext.pickleFile
* SequenceFile and Hadoop Input/Output Formats
示例 1 序列
```
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
rdd.saveAsSequenceFile("path/to/file")
sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
```
示例2
```
$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"}? # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
?? ? ? ? ? ? ? ? ? ? ? ? ? ? "org.apache.hadoop.io.NullWritable",
?? ? ? ? ? ? ? ? ? ? ? ? ? ? "org.elasticsearch.hadoop.mr.LinkedMapWritable",
?? ? ? ? ? ? ? ? ? ? ? ? ? ? conf=conf)
>>> rdd.first()? # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
?{u'field1': True,
? u'field2': u'Some Text',
? u'field3': 12345})
```
不知道什么是es
示例3 存儲數(shù)據(jù)
```
data =? [1,2,3,4,5]
disData = sc.parallelize(data)
disData.saveAsTextFile('file:///home/hadoop/data/output/)
```
# 8 Spark 應(yīng)用程序開發(fā)并運(yùn)行
IDE:IDEA pycharm
1 建立一個(gè)新的項(xiàng)目
2 建立一個(gè)py文件
3對py文件的configureations中添加環(huán)境變量
添加spark中pyhton的環(huán)境變量
添加spark本身的環(huán)境變量
4在preference里面的project structure的add content root添加python-lib下的py4j和pyshark兩個(gè)zip包
5使用local進(jìn)行本地測試
6提交pyshark程序
```
?./spark - submit --master local[2]? -name spark -py python文件名
```
# 9 程序運(yùn)行coding小筆記
顯示當(dāng)前全部java進(jìn)程pid的命令jps?
創(chuàng)建hello.txt并對其進(jìn)行編輯vi hello.txt?
添加環(huán)境變量:vi ~/.bashrc
查看一個(gè)文本文件cat hello.txt
hdfs文件傳輸
```
hadoop fs -put /hello.txt
hadoop fs -text /hello.txt
```
?不懂什么時(shí)候需要/ 什么時(shí)候~/ 什么時(shí)候./
refer課程:10個(gè)小時(shí)入門hadoop