spark chapter 3 RDD

# 1 什么是RDD

*Resilient Distributed Dataset彈性分布式數(shù)據(jù)集

*Represents an immutable(不可變)partitioned(分區(qū)的)collection of elements that can be operated on in parallel(并行操作).?

* 操作包括: `map`, `filter`, and `persist`(持久化)

*有KV把还,Double够委,Sequence文件類型

什么是不可變?——>對于變量不可修改嘴拢,每次操作之后都會(huì)生成一個(gè)新的變量。

spark編程和scala類似,可以無縫對接

```

abstract class RDD[T: ClassTag](

? ? @transient private var _sc: SparkContext,

? ? @transient private var deps: Seq[Dependency[_]]

? ) extends Serializable with Logging

```

1)抽象類,只有在子類里面實(shí)現(xiàn)了(子類繼承父類)

2)帶泛型的即彪,可以支持多種類型(string,person活尊,user)

單機(jī)存儲/計(jì)算==>分布式存儲/計(jì)算

1數(shù)據(jù)存儲:切割? HDFS BLock

2數(shù)據(jù)計(jì)算:切割(并行計(jì)算)Mapreduce隶校、spark

3存儲和計(jì)算:HDFS/S3 + Mapreduce/Spark

# 2 RDD特性

【面試考點(diǎn)】Internally, each RDD is characterized by five main properties:

- A list of partitions一些列分區(qū)分片

- A function for computing each split可以去計(jì)算每個(gè)分區(qū)分片

rdd.map(_+1)->對每個(gè)分區(qū)做了一個(gè)相同的操作

- A list of dependencies on other RDDs rdd是具有依賴關(guān)系的【重要特點(diǎn)】

rdd1==>rdd2==>rdd3==>=rdd4

rdd1有五個(gè)分區(qū),則rdd2也有五個(gè)分區(qū)蛹锰。如果rdd2第四個(gè)分區(qū)丟失深胳,則數(shù)據(jù)會(huì)從rdd1第四個(gè)分區(qū)再次計(jì)算,而不會(huì)對所有數(shù)據(jù)進(jìn)行計(jì)算铜犬。

- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)舞终。可選屬性癣猾,KV分區(qū)可以按照不同策略去設(shè)置分區(qū)hash or range.etc

?? - Optionally, a list of preferred locations to compute each split on (e.g. block locations for

?? ? an HDFS file)可選屬性敛劝,數(shù)據(jù)在哪兒可以把作業(yè)調(diào)度在數(shù)據(jù)所在的節(jié)點(diǎn)進(jìn)行計(jì)算:數(shù)據(jù)移動(dòng)不如移動(dòng)作業(yè)

用戶可自己創(chuàng)建RDD,從文件中讀取數(shù)據(jù)

* refer課程:hadoop入門實(shí)戰(zhàn)課程

# 3 RDD特性在源碼中的體現(xiàn)

RDD有五個(gè)方法:

## 1 computer-特性2

> DeveloperApi ——開發(fā)API

> Implemented(實(shí)現(xiàn))by subclasses(子類)to compute a given partition.

```

def compute(split: Partition, context: TaskContext): Iterator[T]

```

* parition源碼

```

package org.apache.spark

// An identifier for a partition in an RDD.

trait Partition extends Serializable {

? //Get the partition's index within its parent RDD

? def index: Int

? // A better default implementation of HashCode

? override def hashCode(): Int = index

? override def equals(other: Any): Boolean = super.equals(other)

}

```

面試問題:

1為什么重寫equals方法的時(shí)候需要重寫hashcode

2如何實(shí)現(xiàn)hashset/hashmap

## 2 getPartitions-特性1

```

?protected def getPartitions: Array[Partition]

```

This method will only be called once, so it is safe to implement a time-consuming computation in it.

## 3 getDependencies-特性3

```

?protected def getDependencies: Seq[Dependency[_]] = deps

```

返回一堆依賴關(guān)系

## 4 getPreferredLocations-特性5

```

protected def getPreferredLocations(split: Partition): Seq[String] = Nil

```

## 5 partitioner-特性4

```

val partitioner: Option[Partitioner] = None

```

## + hadoop RDD的實(shí)現(xiàn)

?:: DeveloperApi ::

An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS, sources in HBase, or S3), using the older MapReduce API (`org.apache.hadoop.mapred`).

protected def getInputFormat(conf: JobConf): InputFormat[K, V]? ——new出來輸入對象纷宇,然后匹配是否符合輸入夸盟,然后返回。

override def getPartitions: Array[Partition] ——調(diào)用getinputformat像捶,然后分塊上陕,檢查是否為空桩砰,返回非空。

對于上面講的在開發(fā)API中這些方法的實(shí)現(xiàn)沒有理解

## + JDBC RDD的實(shí)現(xiàn)

也存在 上述的集中RDD支持的操作

---

兩個(gè)實(shí)現(xiàn)的方式是說每個(gè)實(shí)現(xiàn)里會(huì)有這幾種基本方式的實(shí)現(xiàn)释簿,但是具怎么實(shí)現(xiàn)亚隅,實(shí)現(xiàn)原理是依賴于其本身的設(shè)計(jì)還是spark RDD的設(shè)計(jì)還是不理解。

logging屬于日志接口

def this是附屬構(gòu)造器

#? 4圖解RDD

# 5 Spark Context庶溶、Spark Conf

1創(chuàng)建SparkContext

鏈接到Spark集群煮纵,可以跑在local、standalone渐尿、yarn、mesos

可以通過SC創(chuàng)建RDD矾瑰,或者廣播變量到集群

?在創(chuàng)建sparkcontext之前砖茸,要?jiǎng)?chuàng)建spark conf

不要硬編碼,最好是通過submit傳入進(jìn)來

啟動(dòng)pyshark(在python的bin目錄里殴穴,可以配置環(huán)境變量后凉夯,支持全局可用)

```

./pyshark

```

在創(chuàng)建spark的時(shí)候初始化了sc,sc的master為local[*]

可以spark000:8080查看運(yùn)行狀況

也可以添加其他屬性

```

./bin/pyshark --master local[4] --py-files code.py

```

用ipython啟動(dòng)

```

$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark

```

sc.sparkconf

# 6 pyshark腳本解析

?可以用 查詢相關(guān)參數(shù)屬性

./pyshark --help

傳輸參數(shù)

./pyshark --master local[2]

./pyshark --name? "Pyskark" #定義名字

# 7 RDD 創(chuàng)建方式

Once created,?distFile?can be acted on by dataset operations.

## 方法 1 driver

Parallelized Collections采幌,測試場景下使用較多

```

data =? [1,2,3,4,5]

disData = sc.parallelize(data) #把data轉(zhuǎn)換成RDD劲够,默認(rèn)轉(zhuǎn)換成兩個(gè)部分

disData.collect() #查看數(shù)據(jù)里面具體是什么

disData.reduce( lambda a , b : a+b ) #加和所有元素(?那對部分相加怎么辦休傍?)

```

ps:collect征绎,reduce才會(huì)觸發(fā)jobs

One important parameter for parallel collections is the number of*partitions*to cut the dataset into.——也就是說,我們的數(shù)據(jù)是可以切分的磨取。例子如下:

```

disData = sc.parallelize(data人柿,5) #這里的參數(shù)5是表示把RDD切成5個(gè)部分。

```

多分?jǐn)?shù)據(jù)和業(yè)務(wù)邏輯和處理性能有關(guān)忙厌。

每個(gè)CPU可以設(shè)置2-4個(gè)partition

## 方法 2 外部數(shù)據(jù)導(dǎo)入

外部存儲導(dǎo)入

支持讀取的數(shù)據(jù)源格式:HDFS, HBase,S3,Hive凫岖,etc

支持文件類型:text files,?SequenceFiles or any other Hadoop?InputFormat or any data source offering a Hadoop InputFormat

```

# 先在目錄下創(chuàng)建hello.txt文件,內(nèi)容可自定義逢净,記錄下其村吃地址

disFile = sc.textFile("file://本地文件路徑")

# 本地文件

disFile = sc.textFile("file:///home/hadoop/data/hello.txt")

disFile.collection()

#hadoop文件

disFile = sc.textFile("hdfs://hadoop000:8020/hello.txt")

disFile.collection()

#使用示例,求和文件中字段的長度并加和

disFile.map(lambda s:len(s)).reduce(lambda a,b:a+b)


```


*If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes.

注意點(diǎn):如果你的文件在本地哥放,要確保你的運(yùn)行程序在運(yùn)行的節(jié)點(diǎn)上可以訪問本地。一般建議用網(wǎng)路的共享文件系統(tǒng)(use a network-mounted shared file system爹土,類似于S3)甥雕?這就是說 如果在hdfs 上 也算非本地把?

1)上課環(huán)境是單節(jié)點(diǎn)胀茵,hello.txt本地有就可以讀取

2)如果在standalone:Spark集群上:3個(gè)節(jié)點(diǎn)犀农,local path都是表示節(jié)點(diǎn)本地讀取文件。不建議

3)生產(chǎn)上直接用yarm宰掉,不會(huì)用standalone

* 支持整個(gè)文件夾呵哨,或者壓縮文件赁濒。

```

sc.textFile("/my/directory")

sc.textFile("/my/directory/*.gz")

```

* 可以設(shè)置分區(qū)(partitions)數(shù)量,可以設(shè)置更多孟害,單無法設(shè)置更少拒炎。

## python特有的支持類型

*SparkContext.wholeTextFiles

支持全路徑的文件讀取,key是文件村存儲地址挨务,value是文件內(nèi)容

```

sc.wholeTextFiles("file:///home/hadoop/data/hello.txt").collect()

# 讀取內(nèi)容

[(文件路徑击你,文件內(nèi)容)]

```

* RDD.saveAsPickleFile——可以保存為這種文件類SparkContext.pickleFile

* SequenceFile and Hadoop Input/Output Formats

示例 1 序列

```

rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))

rdd.saveAsSequenceFile("path/to/file")

sorted(sc.sequenceFile("path/to/file").collect())

[(1, u'a'), (2, u'aa'), (3, u'aaa')]

```

示例2

```

$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar

>>> conf = {"es.resource" : "index/type"}? # assume Elasticsearch is running on localhost defaults

>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",

?? ? ? ? ? ? ? ? ? ? ? ? ? ? "org.apache.hadoop.io.NullWritable",

?? ? ? ? ? ? ? ? ? ? ? ? ? ? "org.elasticsearch.hadoop.mr.LinkedMapWritable",

?? ? ? ? ? ? ? ? ? ? ? ? ? ? conf=conf)

>>> rdd.first()? # the result is a MapWritable that is converted to a Python dict

(u'Elasticsearch ID',

?{u'field1': True,

? u'field2': u'Some Text',

? u'field3': 12345})

```

不知道什么是es

示例3 存儲數(shù)據(jù)

```

data =? [1,2,3,4,5]

disData = sc.parallelize(data)

disData.saveAsTextFile('file:///home/hadoop/data/output/)

```

# 8 Spark 應(yīng)用程序開發(fā)并運(yùn)行

IDE:IDEA pycharm

1 建立一個(gè)新的項(xiàng)目

2 建立一個(gè)py文件

3對py文件的configureations中添加環(huán)境變量

添加spark中pyhton的環(huán)境變量

添加spark本身的環(huán)境變量

4在preference里面的project structure的add content root添加python-lib下的py4j和pyshark兩個(gè)zip包

5使用local進(jìn)行本地測試

6提交pyshark程序

```

?./spark - submit --master local[2]? -name spark -py python文件名

```

# 9 程序運(yùn)行coding小筆記

顯示當(dāng)前全部java進(jìn)程pid的命令jps?

創(chuàng)建hello.txt并對其進(jìn)行編輯vi hello.txt?

添加環(huán)境變量:vi ~/.bashrc

查看一個(gè)文本文件cat hello.txt

hdfs文件傳輸

```

hadoop fs -put /hello.txt

hadoop fs -text /hello.txt

```

?不懂什么時(shí)候需要/ 什么時(shí)候~/ 什么時(shí)候./

refer課程:10個(gè)小時(shí)入門hadoop

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末谎柄,一起剝皮案震驚了整個(gè)濱河市丁侄,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌朝巫,老刑警劉巖鸿摇,帶你破解...
    沈念sama閱讀 216,744評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異劈猿,居然都是意外死亡拙吉,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,505評論 3 392
  • 文/潘曉璐 我一進(jìn)店門揪荣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來筷黔,“玉大人,你說我怎么就攤上這事仗颈》鸩眨” “怎么了?”我有些...
    開封第一講書人閱讀 163,105評論 0 353
  • 文/不壞的土叔 我叫張陵挨决,是天一觀的道長名眉。 經(jīng)常有香客問我,道長凰棉,這世上最難降的妖魔是什么损拢? 我笑而不...
    開封第一講書人閱讀 58,242評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮撒犀,結(jié)果婚禮上福压,老公的妹妹穿的比我還像新娘。我一直安慰自己或舞,他們只是感情好荆姆,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,269評論 6 389
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著映凳,像睡著了一般胆筒。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,215評論 1 299
  • 那天仆救,我揣著相機(jī)與錄音抒和,去河邊找鬼。 笑死彤蔽,一個(gè)胖子當(dāng)著我的面吹牛摧莽,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播顿痪,決...
    沈念sama閱讀 40,096評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼镊辕,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了蚁袭?” 一聲冷哼從身側(cè)響起征懈,我...
    開封第一講書人閱讀 38,939評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎揩悄,沒想到半個(gè)月后卖哎,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,354評論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡虏束,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,573評論 2 333
  • 正文 我和宋清朗相戀三年棉饶,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了厦章。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片镇匀。...
    茶點(diǎn)故事閱讀 39,745評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖袜啃,靈堂內(nèi)的尸體忽然破棺而出汗侵,到底是詐尸還是另有隱情,我是刑警寧澤群发,帶...
    沈念sama閱讀 35,448評論 5 344
  • 正文 年R本政府宣布晰韵,位于F島的核電站,受9級特大地震影響熟妓,放射性物質(zhì)發(fā)生泄漏雪猪。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,048評論 3 327
  • 文/蒙蒙 一起愈、第九天 我趴在偏房一處隱蔽的房頂上張望只恨。 院中可真熱鬧,春花似錦抬虽、人聲如沸官觅。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,683評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽休涤。三九已至,卻和暖如春笛辟,著一層夾襖步出監(jiān)牢的瞬間功氨,已是汗流浹背序苏。 一陣腳步聲響...
    開封第一講書人閱讀 32,838評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留疑故,地道東北人杠览。 一個(gè)月前我還...
    沈念sama閱讀 47,776評論 2 369
  • 正文 我出身青樓,卻偏偏與公主長得像纵势,于是被迫代替她去往敵國和親踱阿。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,652評論 2 354

推薦閱讀更多精彩內(nèi)容