通過詞頻統(tǒng)計(jì)功能學(xué)習(xí)Spark-submit的使用:
先打開一個(gè)命令窗口輸入nc -lk 9999
然后在另一個(gè)窗口呵哨,spark的bin文件夾下輸入
./spark-submit --master local[2] \
--class org.apache.spark.examples.streaming.NetworkWordCount \
--name NetworkWordCount \
/home/hadoop/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.2.0.jar hadoop000 9999
在netcat窗口輸入a a a a b b之后再spark窗口的流式輸出會(huì)見到詞頻統(tǒng)計(jì)的結(jié)果吏祸。
sparkStreaming工作原理(粗粒度)
Spark Streaming接收到實(shí)時(shí)數(shù)據(jù)流褐墅,把數(shù)據(jù)按照指定的時(shí)間段切成一片片小的數(shù)據(jù)塊循签,然后把小的數(shù)據(jù)塊傳給Spark Engine處理厢钧。
sparkStreaming工作原理(細(xì)粒度)
首先皿哨,spark應(yīng)用程序運(yùn)行在driver端长酗,driver需要在Executor(電腦)中啟動(dòng)Receiver接收器,接收數(shù)據(jù)流智玻,并且分模塊接收遂唧,可能還會(huì)以副本的方式存儲(chǔ),接收了一個(gè)周期之后吊奢,Executor會(huì)向spark應(yīng)用程序返回接收情況(分塊數(shù)量盖彭,副本數(shù)量等等)應(yīng)用程序會(huì)將任務(wù)分發(fā)到Executor中。
DStream概念:對(duì)DStream進(jìn)行操作,比如map/flatMap,其實(shí)底層會(huì)被翻譯為對(duì)DStream中的每個(gè)RDD都做相同的操作召边,因?yàn)橐粋€(gè)DStream是由不同批次的RDD所構(gòu)成的铺呵。
每一個(gè)輸入流Input DStreamings 都要對(duì)應(yīng)一個(gè)receivers來接收它,Input DStreamings的種類:文件系統(tǒng)隧熙,socket傳輸片挂,Kafka,F(xiàn)lume贞盯。
Output Operation 的種類:print()宴卖,saveAsTextFiles保存到文件系統(tǒng),saveAsHadoopFiles等邻悬。
實(shí)戰(zhàn):spark streaming 處理socket數(shù)據(jù)
object NetworkWorldCount {
? def main(args: Array[String]): Unit = {
? ? val sparkConf = new SparkConf().setMaster("local").setAppName("NetworkWorldCount")
? ? val ssc = new StreamingContext(sparkConf, Seconds(5))
? ? val lines = ssc.socketTextStream("localhost",6789)
? ? val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
? ? result.print()
? ? ssc.start()
? ? ssc.awaitTermination()
? }
}
在另外一個(gè)控制臺(tái)里輸入
nc -lk 6789
a a a a c c c d d d?
結(jié)果:
實(shí)戰(zhàn):spark streaming 處理socket數(shù)據(jù)并寫入mysql數(shù)據(jù)庫(kù)
object ForeachRDDApp {
? def main(args: Array[String]): Unit = {
? ? val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
? ? val ssc = new StreamingContext(sparkConf, Seconds(5))
? ? val lines = ssc.socketTextStream("localhost", 6789)
? ? val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
//前幾行不變。随闽。
? ? result.foreachRDD(rdd => {? ? ? ?//循環(huán)每一個(gè)Rdd
? ? ? rdd.foreachPartition(partitionOfRecords => {? //在一個(gè)rdd里循環(huán)每一個(gè)partition
? ? ? ? val connection = createCOnnection()? ? ?//獲取mysql連接
? ? ? ? partitionOfRecords.foreach(record => {? ? ? ? //在每一個(gè)partition里獲取一條記錄
? ? ? ? ? val sql = "insert into wordcount(word, wordcount) values('" + record._1+ "'," + record._2+")"
? ? ? ? ? connection.createStatement().execute(sql)
? ? ? ? })
? ? ? ? connection.close()
? ? ? })
? ? })
? ? ssc.start()
? ? ssc.awaitTermination()
? }
? def createCOnnection() = {
? ? Class.forName("com.mysql.jdbc.Driver")
? ? DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_spark","root","root")
? }
結(jié)果:
spark streaming從socket接收數(shù)據(jù)后根據(jù)標(biāo)準(zhǔn)過濾數(shù)據(jù)實(shí)戰(zhàn)(黑名單例子)
//構(gòu)建黑名單
object TransformApp {
? def main(args: Array[String]): Unit = {
? ? val sparkConf = new SparkConf().setAppName("TransformApp").setMaster("local[2]")
? ? val ssc = new StreamingContext(sparkConf, Seconds(5))
//跟前面一樣
? ? val blacks = List("zs","ls")? ? //構(gòu)建黑名單List
? ? val blackRDD = ssc.sparkContext.parallelize(blacks).map(x=>(x,true))? ? ? ? //將List轉(zhuǎn)成(zs,true)的這種RDD類型
? ? val lines = ssc.socketTextStream("localhost", 6789)? ? ? ? //lines是DSTream類型
? ? val clicklog = lines.map(x => (x.split(",")(1),x)).transform(rdd => {? ? ? ?
//lines是這種類型的數(shù)據(jù)(20160410,zs) 根據(jù)逗號(hào)分隔后重整為(zs:20160410,zs),即為(x.split(",")(1),x))的結(jié)果父丰,得到的結(jié)果仍然是RDD類型,transform函數(shù)是將每個(gè)Rdd拿出來操作掘宪。
? ? ? rdd.leftOuterJoin(blackRDD)? ?//每個(gè)rdd都跟blackRDD進(jìn)行l(wèi)eftOuterJoin蛾扇,得到(zs:[<20160410,zs>,<true>])這種類型的數(shù)據(jù)
? ? ? ? .filter(x=> x._2._2.getOrElse(false) != true)? ? ? ? ?//過濾,將參數(shù)的第二個(gè)中的第二個(gè)為true的過濾掉魏滚。
? ? ? ? .map(x =>x._2._1)? ? ? //重整,將結(jié)構(gòu)變?yōu)閞dd中第二個(gè)的第一個(gè)镀首,即為<20160410,zs>
? ? })
? ? clicklog.print()
? ? ssc.start()
? ? ssc.awaitTermination()
? }
}
在nc -lk 6789中輸入
20160410,zs
20160410,ls
20160410,ww
20160410,zs
20160410,ls
20160410,ww
20160410,zs
20160410,ls
20160410,ww
控制臺(tái)輸出