Spark 學習筆記(一)-RDD編程

創(chuàng)建RDD

把程序中一個已有的集合傳給 SparkContext 的 parallelize() 方法(主要用于測試)

JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));

更常用的方式是從外部讀取數(shù)據(jù)來創(chuàng)建RDD

JavaRDD<String> testFile = js.textFile("G:/sparkRS/readtest.txt");

RDD操作

轉(zhuǎn)化操作:惰性求值嫩痰,返回一 個新的 RDD 的操作丧靡,比如 map() 和 filter(),
行動操作:向驅(qū)動器程序返回結(jié)果或把結(jié)果寫入外部系統(tǒng)的操作,會觸發(fā)實際的計算远豺,比如 count() 和 first()收夸。

  • 轉(zhuǎn)化操作
    只有在行動操作中用到這些 RDD 時才會被計算。許多轉(zhuǎn)化操作都是針對各個元素的验懊,這些轉(zhuǎn)化操作每次只會操作 RDD 中的一個元素。不過并不是所有的轉(zhuǎn)化操作都是這樣的
         //filter
        SparkConf  conf = new SparkConf().setMaster("local").setAppName("My App");
        JavaSparkContext js = new JavaSparkContext(conf);

        JavaRDD<String> lines = js.parallelize(Arrays.asList("coffe","coffe","panda","monkey","tea"));
        long result = lines.filter(x-> x.contains("coffe")).count();
        System.out.println(result); 
        js.close();

        //union將兩個RDD合并
        SparkConf  conf = new SparkConf().setMaster("local").setAppName("My App");
        JavaSparkContext js = new JavaSparkContext(conf);

        JavaRDD<String> lines = js.parallelize(Arrays.asList("coffe","coffe","panda","monkey","tea"));
        JavaRDD<String> lines1 = js.parallelize(Arrays.asList("coffe","coffe","panda","monkey","tea"));
        JavaRDD<String> result = lines.filter(x-> x.contains("coffe"));
        JavaRDD<String> result1 = lines1.filter(x-> x.contains("tea"));
        JavaRDD<String> outcome = result.union(result1);
        System.out.println(outcome.collect());  
        js.close();

通過轉(zhuǎn)化操作尸变,從已有的 RDD 中派生出新的 RDD义图,Spark 會使用譜系圖(lineage graph)來記錄這些不同 RDD 之間的依賴關(guān)系。Spark 需要用這些信息來按需計算每個 RDD召烂,也可以依靠譜系圖在持久化的 RDD 丟失部分數(shù)據(jù)時恢復(fù)所丟失的數(shù)據(jù)

  • 行動操作
    對數(shù)據(jù)進行實際的計算歌溉,行動操作需要生成實際的輸出,它們會強制執(zhí)行那些求值必須用到的RDD轉(zhuǎn)化操作
System.out.println(outcome.collect());  //collect將RDD中的所有數(shù)據(jù)進行收集骑晶,需要大內(nèi)存
System.out.println(outcome.count());
RDD.take(10)

使用 take() 獲取了RDD 中的少量元素集痛垛。然后在本地遍歷這些元素,并在驅(qū)動器端打印出來桶蛔。RDD還有一個 collect() 函數(shù)匙头,可以用來獲取整 個 RDD中的數(shù)據(jù)。只有當你的整個數(shù)據(jù)集能在單臺機器的內(nèi)存中放得下時仔雷,才能使用collect蹂析,因此,collect()不能用在大規(guī)模數(shù)據(jù)集上碟婆。在大多數(shù)情況下电抚,RDD 不能通過 collect() 收集到驅(qū)動器進程中,因為它們一般都很大竖共。每當我們調(diào)用一個新的行動操作時蝙叛,整個 RDD 都會從頭開始計算。要避免這種低效的行為公给,用戶可以將中間結(jié)果持久化

惰性操作

惰性求值意味著當我們對 RDD 調(diào)用轉(zhuǎn)化操作借帘,操作不會立即執(zhí)行蜘渣。 Spark 會在內(nèi)部記錄下所要求執(zhí)行的操作的相關(guān)信息。我們不應(yīng)該把 RDD 看作存放著特定數(shù)據(jù)的數(shù)據(jù)集肺然,而最好把每個 RDD 當作我們通過轉(zhuǎn)化操作構(gòu)建出來的蔫缸、記錄如何計算數(shù)據(jù)的指令列表。把數(shù)據(jù)讀取到 RDD 的操作也同樣是惰性的际起。和轉(zhuǎn)化操作一樣的是拾碌, 讀取數(shù)據(jù)的操作也有可能會多次執(zhí)行。雖然轉(zhuǎn)化操作是惰性求值的街望,但還是可以隨時通過運行一個行動操作來強制 Spark 執(zhí)行 RDD 的轉(zhuǎn)化操作倦沧,比如使用 count()。
Spark 使用惰性求值它匕,這樣就可以把一些操作合并到一起來減少計算數(shù)據(jù)的步驟展融。( Hadoop MapReduce 的系統(tǒng)中,開發(fā)者常吃ゼ恚花費大量時間考慮如何把操作組合到一起告希,以減少 MapReduce 的周期數(shù))

傳遞函數(shù)

Spark 的大部分轉(zhuǎn)化操作和一部分行動操作,都需要依賴用戶傳遞的函數(shù)來計算烧给。支持的三種主要語言中都略有不同(函數(shù)接口)

Java

在 Java 中燕偶,函數(shù)需要作為實現(xiàn)了 Spark 的 org.apache.spark.api.java.function 包中的任 一函數(shù)接口的對象來傳遞,不同返回類型有不同接口


image.png
//匿名類進行函數(shù)傳遞
RDD<String> errors = lines.filter(new Function<String, Boolean>() {   
    public Boolean call(String x) { 
        return x.contains("error"); 
        } });

//使用具名類進行函數(shù)傳遞础嫡,繼承xx接口指么,在實例化時就可自動向上轉(zhuǎn)型當做接口類型
class ContainsError implements Function<String, Boolean> {   
    public Boolean call(String x) { 
        return x.contains("error"); 
        } } 

RDD<String> errors = lines.filter(new ContainsError());

常見的轉(zhuǎn)化操作和行動操作

包含特定數(shù)據(jù)類型的 RDD 還支持一些附加操作,例如榴鼎,數(shù)字類型的 RDD 支持統(tǒng)計型函數(shù)操作伯诬,而鍵值對形式的 RDD 則支持諸如根據(jù)鍵聚合數(shù)據(jù)的鍵值對操作

針對各個元素的轉(zhuǎn)化操作

map() 接收一個函數(shù),把這個函數(shù)用于 RDD 中的每個元素巫财,將函數(shù)的返回結(jié)果作為結(jié)果RDD 中對應(yīng)元素的值
filter() 則接收一個函數(shù)盗似,并將 RDD 中滿足該函數(shù)的 元素放入新的 RDD 中返回


image.png

map() 的返回值類型不需要和輸入類型一樣
對每個輸入元素生成多個輸出元素。 flatMap() 返回值序列的迭代器平项。輸出的 RDD 倒不是由迭代器得到的是一個包含各個迭代器可訪問的所有元素的 RDD赫舒。flatMap() 的一個簡 單用途是把輸入的字符串切分為單詞

//數(shù)組中的iterator方法可以將數(shù)組轉(zhuǎn)換為迭代器
JavaRDD<String> words = word.flatMap(x->Arrays.asList(x.split(",")).iterator() );

偽集合操作

image.png

RDD 中最常缺失的集合屬性是元素的唯一性,因為常常有重復(fù)的元素闽瓢。RDD.distinct() 轉(zhuǎn)化操作來生成一個只包含不同元素的新RDD接癌。distinct() 操作的開銷很大,因為它需要將所有數(shù)據(jù)通過網(wǎng)絡(luò)進行混洗(shuf?e)扣讼,以確保每個元素都只有一份

集合操作 union(other)缺猛,返回一個包含兩個 RDD 中所有元素的 RDD。Spark 的 union() 操作也會包含這些重復(fù)數(shù)據(jù) (可通過 distinct() 實現(xiàn)相同的效果)。
Spark 還提供了交集 intersection(other) 方法枯夜,與union方法相似弯汰,只返回兩個 RDD 中都有的元素艰山。但是intersection() 的性能卻要差很多湖雹,它需要網(wǎng)絡(luò)混洗數(shù)據(jù)發(fā)現(xiàn)共有數(shù)據(jù)
subtract(other) 函數(shù)接收另一個 RDD 作為參數(shù),返回 一個由只存在于第一個 RDD 中而不存在于第二個 RDD 中的所有元素組成的 RDD曙搬。需要數(shù)據(jù)混洗摔吏。
計算兩個 RDD 的笛卡兒積,cartesian(other) 轉(zhuǎn)化操作會返回所有可能的 (a, b) 對纵装。笛卡兒積在我們希望考慮所有可能的組合的相似度時比較有用(產(chǎn)品的預(yù)期興趣程度)征讲,開銷巨大。

image.png

image.png

行動操作

對RDD數(shù)據(jù)進行實際計算
基本 RDD 上最常見的行動操作 reduce()橡娄。接收一個函數(shù)作為參數(shù)诗箍,這個函數(shù)要操作兩個 RDD 的元素類型的數(shù)據(jù)并返回一個同樣類型的新元素

Integer results =  counts.reduce((x,y)->{ return x+y; });

折疊方法fold() 和 reduce() 類似,接收一個與 reduce() 接收的函數(shù)簽名相同的函數(shù)挽唉,再加上一個 “初始值”來作為每個分區(qū)第一次調(diào)用時的結(jié)果滤祖。使用你的函數(shù)對這個初始值進行多次計算不會改變結(jié)果,通過原地修改并返回兩個參數(shù)中的前一個的值來節(jié)約在 fold() 中創(chuàng)建對象的開銷fold() 和 reduce() 都要求函數(shù)的返回值類型需要和我們所操作的 RDD 中的元素類型相同瓶籽。在計算平均值時匠童,需要記錄遍歷過程中的計數(shù)以及元素的數(shù)量,這就需要我們返回一 個二元組塑顺。對數(shù)據(jù)使用 map() 操作汤求,來把元素轉(zhuǎn)為該元素和 1 的二元組

         //reduce求平均
        JavaPairRDD<String,Integer> counts = words.mapToPair(s -> new Tuple2<String, Integer>(s,1));

        //reduce求總數(shù)和總次數(shù),Tuple2的字段_1和_2是final型不能
        //改變严拒,必須有一個可以操作的變量才能對Tuple2中的數(shù)進行計算
        //所以扬绪,先將第一個RDD的Tuple2賦值給a、b
        //然后和y(第二個數(shù))進行計算裤唠,返回第一次調(diào)用的計算結(jié)果
        //然后第一次的計算結(jié)果再和第三個Tuple2進行計算返回第二次的調(diào)用結(jié)果勒奇。。巧骚。
            Tuple2<Integer, Integer> results1 = counts.reduce((x,y)->{
            Integer a = x._1();
            Integer b = x._2();     
            a+=y._1();
            b+=y._2();
            return new Tuple2(a,b);
        });

        //fold求平均赊颠,過程與上大致一樣
        Integer reduce = line.fold(0, (x,y) -> x+y);

aggregate 函數(shù)則把我們從返回值類型必須與所操作的RDD類型相同的限制中解放出來。使用 aggregate() 時劈彪,需要提供我們期待返回的類型(自定義)的初始值竣蹦。然后通過一個函數(shù)把 RDD 中的元素合并起來放入累加器〔着考慮到每個節(jié)點是在本地進行累加的痘括,最終,還需要提供第二個函數(shù)來將累加器兩兩合并。

//用aggregate()來計算RDD的平均值
public class Operation {

    public static void main(String[] args) throws InterruptedException {
        // TODO 自動生成的方法存根
        SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
        JavaSparkContext jsc = new JavaSparkContext(conf);


        JavaRDD<Integer> lines = jsc.parallelize(Arrays.asList(1,2,3,4));
        JavaRDD<Integer> line = jsc.parallelize(Arrays.asList(4,5,6));


        AvgCount a =new AvgCount(0,0);
        Function2<AvgCount,Integer,AvgCount> addAndCount = new Function2<AvgCount,Integer,AvgCount>(){
            private static final long serialVersionUID = 1L;            @Override
        public AvgCount call(AvgCount arg0, Integer arg1) throws Exception {
                // TODO 自動生成的方法存根
                arg0.total += arg1;
                arg0.num += 1;              
                return arg0;
            }   
        };

        Function2<AvgCount,AvgCount,AvgCount> conbine = new Function2<AvgCount,AvgCount,AvgCount>(){
            private static final long serialVersionUID = 1L;
            @Override
            public AvgCount call(AvgCount arg0, AvgCount arg1) throws Exception {
                // TODO 自動生成的方法存根
                arg0.total += arg1.total;
                arg0.num += arg1.num;   
                return arg0;
            }   
        };


        line.aggregate(a,(x,y)->{           
                            x.total += y;
                            x.num += 1;             
                            return x;
                            }, 
                         (x,y)->{               
                            x.total +=y.total;
                            x.num +=y.num;  
                            return x;}
                );

        AvgCount sum = line.aggregate(a, addAndCount, conbine);
        System.out.println( sum.total+":"+sum.num+"--------avg:"+(sum.total/sum.num));

        jsc.close();

    }

}
class AvgCount implements Serializable{
    public int total;
    public int num;
    private static final long serialVersionUID = 3325529460700487293L;
    public AvgCount(int total,int num){
        this.total = total;
        this.num = num;
    }
}

RDD 的一些行動操作會以普通集合或者值的形式將 RDD 的部分或全部數(shù)據(jù)返回驅(qū)動器程序中纲菌。
collect() 通常在單元測試中使用挠日,因為此時 RDD 的整個內(nèi)容不會很大,可以放在內(nèi)存中take(n) 返回 RDD 中的 n 個元素集合翰舌,并且嘗試只訪問盡量少的分區(qū)嚣潜,因此該操作會得到一個不均衡的集合。這些操作返回元素的順序與你預(yù)期的可能不一樣椅贱。這些操作對于單元測試和快速調(diào)試都很有用懂算,但是在處理大規(guī)模數(shù)據(jù)時會遇到瓶頸”勇螅可以用 JSON 格式把數(shù)據(jù)發(fā)送到一個網(wǎng)絡(luò)服務(wù)器上计技,或者把數(shù) 據(jù)存到數(shù)據(jù)庫中。都可以使用 foreach() 行動操作來對 RDD 中的每個元 素進行操作山橄,而不需要把 RDD 發(fā)回本地垮媒。


image.png

在不同RDD類型間轉(zhuǎn)換

有些函數(shù)只能用于特定類型的 RDD,比如 mean() 和 variance() 只能用在數(shù)值 RDD 上航棱, 而 join() 只能用在鍵值對 RDD 上

Java

要從 T 類型的 RDD 創(chuàng)建出一個 DoubleRDD睡雇,我們就應(yīng)當在映射操作中使用 DoubleFunction<T> 來替代 Function<T, Double>


image.png

生成JavaDoubleRDD、計算 RDD 中每個元素的平方值丧诺,這樣就可以調(diào)用 DoubleRDD 獨有的函數(shù)了入桂,比如平均是 mean() 和方差 variance()。

JavaDoubleRDD result = rdd.mapToDouble(   
new DoubleFunction<Integer>() {    
     public double call(Integer x) {      
          return (double) x * x; 
    } }); 
System.out.println(result.mean());

持久化(緩存)

Spark RDD 是惰性求值的驳阎,而有時我們希望能多次使用同一個 RDD抗愁。如果簡單地對 RDD 調(diào)用行動操作,Spark 每次都會重算 RDD 以及它的所有依賴
迭代算法中消耗格外大呵晚,因為迭代算法常常會多次使用同一組數(shù)據(jù)

為了避免多次計算同一個 RDD蜘腌,可以讓 Spark 對數(shù)據(jù)進行持久化。當我們讓 Spark 持久化 存儲一個 RDD 時饵隙,計算出 RDD 的節(jié)點會分別保存它們所求出的分區(qū)數(shù)據(jù)撮珠。如果一個有持久化數(shù)據(jù)的節(jié)點發(fā)生故障,Spark 會在需要用到緩存的數(shù)據(jù)時重算丟失的數(shù)據(jù)分區(qū)金矛。如果希望節(jié)點故障的情況不會拖累我們的執(zhí)行速度芯急,也可以把數(shù)據(jù)備份到多個節(jié)點上。

默認情況下persist會把數(shù)據(jù)以序列化的形式緩存在JVM的堆空間中(實際數(shù)據(jù)區(qū))
Java 中驶俊,默認情況下 persist() 會把數(shù)據(jù)以序列化的形式緩存在 JVM 的堆空間中


image.png
//對result進行緩存
result.persist(StorageLevel.DISK_ONLY) 
result.persist(StorageLevel.DISK_ONLY_2)

persist() 調(diào)用本身不會觸發(fā)強制求值
如果要緩存的數(shù)據(jù)太多娶耍,內(nèi)存中放不下,Spark 會自動利用最近最少使用(LRU)的緩存策略把最老的分區(qū)從內(nèi)存中移除饼酿。對于僅把數(shù)據(jù)存放在內(nèi)存中的緩存級別榕酒,下一次要用到已經(jīng)被移除的分區(qū)時胚膊,這些分區(qū)就需要重新計算。但是對于使用內(nèi)存與磁盤的緩存級別的分區(qū)來說想鹰,被移除的分區(qū)都會寫入磁盤

RDD 還有一個方法叫作 unpersist()紊婉,調(diào)用該方法可以手動把持久化的 RDD 從緩 存中移除

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市辑舷,隨后出現(xiàn)的幾起案子喻犁,更是在濱河造成了極大的恐慌,老刑警劉巖惩妇,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件株汉,死亡現(xiàn)場離奇詭異筐乳,居然都是意外死亡歌殃,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進店門蝙云,熙熙樓的掌柜王于貴愁眉苦臉地迎上來氓皱,“玉大人,你說我怎么就攤上這事勃刨〔ú模” “怎么了?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵身隐,是天一觀的道長廷区。 經(jīng)常有香客問我,道長贾铝,這世上最難降的妖魔是什么隙轻? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮垢揩,結(jié)果婚禮上玖绿,老公的妹妹穿的比我還像新娘。我一直安慰自己叁巨,他們只是感情好斑匪,可當我...
    茶點故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著锋勺,像睡著了一般蚀瘸。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上庶橱,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天贮勃,我揣著相機與錄音,去河邊找鬼悬包。 笑死衙猪,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播垫释,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼丝格,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了棵譬?” 一聲冷哼從身側(cè)響起显蝌,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎订咸,沒想到半個月后曼尊,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡脏嚷,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年骆撇,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片父叙。...
    茶點故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡神郊,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出趾唱,到底是詐尸還是另有隱情涌乳,我是刑警寧澤,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布甜癞,位于F島的核電站夕晓,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏悠咱。R本人自食惡果不足惜蒸辆,卻給世界環(huán)境...
    茶點故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望乔煞。 院中可真熱鬧吁朦,春花似錦、人聲如沸渡贾。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽空骚。三九已至纺讲,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間囤屹,已是汗流浹背熬甚。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留肋坚,地道東北人乡括。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓肃廓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親诲泌。 傳聞我的和親對象是個殘疾皇子盲赊,可洞房花燭夜當晚...
    茶點故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內(nèi)容