Structure Streaming[Official Document]

1. Overview:

Structured Streaming是基于Spark SQL引擎的可擴(kuò)展娜搂、具有容錯(cuò)性的流處理引擎校摩。系統(tǒng)通過checkpointing和寫Ahead Logs的方式保證端到端的只執(zhí)行一次的容錯(cuò)保證圣猎。Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing。

2. Word Count

下面實(shí)現(xiàn)一個(gè)簡(jiǎn)單的基于Structured Streaming的WordCount程序。程序監(jiān)聽服務(wù)器的TCP Socket营袜,對(duì)監(jiān)聽的內(nèi)容做word count操作。為了測(cè)試丑罪,在服務(wù)器端開啟命令行來(lái)發(fā)送數(shù)據(jù):

$nc -lk 9999

1.首先荚板,為了使用Structured Streaming的API凤壁,需要先引入SparkSQL和Spark Structured Streaming的maven依賴:

    <!-- Apache Spark Structured Streaming -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
    <!-- Apache Spark Structured SQL -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>

2.代碼實(shí)現(xiàn)(scala):
lines(是一個(gè)DataFrame)代表一個(gè)包含流文本數(shù)據(jù)的無(wú)邊界的表,表只包含一個(gè)字段"value",流數(shù)據(jù)中的每一行就是該表的一條記錄跪另。
scala的實(shí)現(xiàn)方法:

object StructureStreamingWordCount {
  def main(args: Array[String]) {
    //create a local SparkSession
    val spark = SparkSession
      .builder
      .appName("SparkWordCountNetWork")
      .getOrCreate()
    //listen to 10.198.193.189:9999拧抖,獲得來(lái)自該端口的流數(shù)據(jù)。
    import spark.implicits._
    val lines = spark.readStream
      .format("socket")
      .option("host","10.198.193.189")
      .option("port",9999)
      .load()
    //需要使用as方法將DataFrame轉(zhuǎn)換成DataSet免绿,才能使用flatMap轉(zhuǎn)換操作唧席。
    val words = lines.as[String].flatMap(_.split(" "))
    val wordCounts = words.groupBy("value").count()

    val query = wordCounts.writeStream
      //print the complete set of counts
      .outputMode("complete")
      .format("console")
      .start()
    query.awaitTermination()
  }
}
  1. 打包代碼成jar包,提交這個(gè)jar包:
./bin/spark-submit /home/natty/spark/log-analyzer.jar \
com.pmpa.bigdata.spark.app.core.StructureStreamingWordCount

4.通過nc命令發(fā)送句子嘲驾,測(cè)試結(jié)果淌哟。

+------+-----+
| value|count|
+------+-----+
|apache|    1|
|hadoop|    3|
+------+-----+

3. Programming Model

Structured Streaming的核心思想是把實(shí)時(shí)數(shù)據(jù)當(dāng)做一個(gè)持續(xù)寫入的表,這樣一來(lái)辽故,流處理模型和批處理模型就很相似了徒仓。
1.基本概念:
將輸入數(shù)據(jù)流理解為“Input Table”。數(shù)據(jù)流中每一條記錄是新插入input table的一條記錄榕暇。


將數(shù)據(jù)流理解為無(wú)邊界表

針對(duì)輸入的查詢會(huì)生成“Result Table”蓬衡。每個(gè)觸發(fā)間隔(下圖中是1s),新的記錄追加到input table彤枢,并最終更新Result Table狰晚。每當(dāng)Result Table被更新,我們都會(huì)將變化的記錄行寫入外部sink缴啡。


Programming Model

“Output”是指需要寫入外部存儲(chǔ)的內(nèi)容壁晒。output可以定義為3種模式:
2.WordCount程序分析:
對(duì)于上邊的Word Count程序,DataFrame “l(fā)ines”是Input Table业栅,DataFrame “wordCounts”是result table秒咐。
model of wordcount

Spark Structured Streaming模型與其他的流處理引擎有著很大的區(qū)別。很多流系統(tǒng)要求用戶自行執(zhí)行聚合計(jì)算碘裕,所以需要自行保證容錯(cuò)携取、數(shù)據(jù)一致性(至少計(jì)算一次、至多計(jì)算一次帮孔、只計(jì)算一次)雷滋。但是在Spark Structured Streaming中,在有新的數(shù)據(jù)更新時(shí)文兢,Spark來(lái)負(fù)責(zé)更新Result Table晤斩。

4. spark streaming:

Spark
SparkCore
RDD
SparkContext
SparkSQL
DataFrame
SQLContext
SparkStreaming
DStream
將流式數(shù)據(jù)分成很多RDD,按照時(shí)間間隔1s/5s batch
StreamingContext
streaminContext = new StreamingContext(sc, Seconds(5))
HADOOP:
MapReduce
Hive/HBase
Storm

實(shí)時(shí)流式計(jì)算框架
Storm
storm.apache.org
JStorm
捐獻(xiàn)Apache
互聯(lián)網(wǎng)電商企業(yè)姆坚,如果大數(shù)據(jù)的話澳泵,思想:BAT公司都使用的東西,小的公司當(dāng)然也可以解決數(shù)據(jù)分析處理兼呵。

用戶推薦
JAVA/C
Mahout
0.9.x以后兔辅,就不支持MapReduce腊敲,遷移到Spark,2014年初宣布

SparkStreaming
-1,實(shí)時(shí)統(tǒng)計(jì)幢妄,累加
kafka + sparkstreaming(updateStatByKey)
-2,實(shí)時(shí)統(tǒng)計(jì)兔仰,最近一段時(shí)間指標(biāo)
實(shí)時(shí)查看最近一個(gè)小時(shí)之內(nèi)的用戶點(diǎn)擊量,各省或者重點(diǎn)城市
window

拋出問題:
SparkStreaming
-1,數(shù)據(jù)來(lái)源
Socket
Flume
Kafka ---- 最多
val kafkaDStream = streamingContext.kafka()
-2,數(shù)據(jù)處理
DStream#flatMap(),map(),filter(),reduceByKey,...
-3,數(shù)據(jù)輸出(Output)
內(nèi)存數(shù)據(jù)庫(kù)
Redis
HBase
JDBC

        dstream#foreachRDD(rdd => {
          rdd.toDF.write.jdbc
        })

官網(wǎng)案例:
SparkStreamin從Socket接收數(shù)據(jù)蕉鸳,WordCount統(tǒng)計(jì)乎赴,結(jié)果輸出控制臺(tái)

對(duì)于SparkStreaming開發(fā)測(cè)試來(lái)說(shuō),尤其是local
至少是
--master local[2]
-#,2:代表的是啟動(dòng)2個(gè)Thread
-#,需要一個(gè)Thread進(jìn)行接收數(shù)據(jù)
Recevier
-#,另外一個(gè)Thread用于運(yùn)行Task
進(jìn)行數(shù)據(jù)的處理

對(duì)于GraphX
一張圖
頂點(diǎn) -> RDD[(Long, T)]
vertexId
vertexAttributes
邊 -> RDD[(Long, Long, T)]
SourceVertexId
DestVertexId
EdgeAttribute

======================================================
-1,Recevier
block interval
默認(rèn)值200ms
接收的數(shù)據(jù)潮尝,分成block榕吼,進(jìn)行存儲(chǔ)在Executors中

-2,StreamingContext
batch interval
處理數(shù)據(jù)的時(shí)間間隔,每次處理多長(zhǎng)時(shí)間范圍內(nèi)的數(shù)據(jù)
ssc = new StreamingContext(sc, Second(1))

t-0 t-200 t-400 t-600 t-800 t-1000 t-1200 t-1400
[blk-01 blk-02 blk-03 blk-04 blk-05 ] blk-06 blk-07
|
RDD
|
Process

RDD
sc.textFile()
DStream
ssc.textFileStream

==
DataFrame
spark.read.json("")
Struct Streaming
spark.read.streaming.json("")

spark-shell
load script
scala> :load /opt/cdh5.3.6/spark-1.6.1-bin-2.5.0-cdh5.3.6/WordCount.scala

作業(yè):
@Experimental
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType] = {
new MapWithStateDStreamImpl[K, V, StateType, MappedType](
self,
spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
)
}

使用新的API進(jìn)行實(shí)時(shí)累加統(tǒng)計(jì)

陳超
七牛技術(shù)總監(jiān)
國(guó)內(nèi)Spark技術(shù)布道者
0.9.0

=======================================================
DStream#transform(func)

======================================================
需求:
詞頻統(tǒng)計(jì)
每次統(tǒng)計(jì)最近10秒的數(shù)據(jù)

======================================================
SparkStreaming與Flume和Kafka的集成
Recevier
接收數(shù)據(jù)
針對(duì)Flume來(lái)說(shuō)
Flume Agent
Source -> Chennal -> Sink
將數(shù)據(jù)寫給SparkStreaming
-1,push
將數(shù)據(jù)推個(gè)Recevier
block interval
200ms
blk-01 blk-02 blk-03 ...
Executors
batch interval

    特殊情況的時(shí)候
        -1,應(yīng)用全部停止
        -2,未處理的block所在所有的Executor停止
        
    僅僅接收的了數(shù)據(jù)勉失,但是不能保證所有的接收數(shù)據(jù)全部被處理羹蚣?    
    
Spark 1.3.x開始,性能考慮乱凿,出現(xiàn)
    
-2,pull
    拉取數(shù)據(jù)
    最好
    offset
        10     50
    batch job
        RDD:
            sink:10 -50 
        SparkJob的時(shí)候顽素,直接到FlumeSink這邊拿去數(shù)據(jù),然后處理徒蟆,結(jié)構(gòu)輸出胁出,更新offset
        
    -1,數(shù)據(jù)不會(huì)丟失
    -2,數(shù)據(jù)不會(huì)被重復(fù)處理

bin/spark-shell
--master local[3]
--jars externallibs/mysql-connector-java-5.1.27-bin.jar,
externallibs/spark-streaming-flume_2.10-1.6.1.jar,
externallibs/flume-avro-source-1.5.0-cdh5.3.6.jar,
externallibs/flume-ng-sdk-1.5.0-cdh5.3.6.jar

bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/spark-push-flume.properties -Dflume.root.logger=DEBUG,console

SparkStreaming與Kafka集成
Direct方式
沒有Recevier

batch interval
    RDD
    
Kafka
    Topic
        part-01     part-02     part-03     part-04
          index
SparkJob
    val rdd = invoke-kafka-consumer-api-topic-offset
    rdd.process

Kafka Producer API
JAVA
SCALA

bin/spark-shell
--master local[3]
--jars externallibs/mysql-connector-java-5.1.27-bin.jar,
externallibs/spark-streaming-kafka_2.10-1.6.1.jar,
externallibs/kafka_2.10-0.8.2.1.jar,
externallibs/kafka-clients-0.8.2.1.jar,\
externallibs/zkclient-0.3.jar,
externallibs/metrics-core-2.2.0.jar


import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(sc, Seconds(5))

// Step 1:Recevier Data From Where
val socketDStream = ssc.socketTextStream("hadoop-senior01.ibeifeng.com", 9999)

// Step 2: Process Data Base DStream
// Split each line into words
val words = socketDStream.flatMap(_.split(" "))

// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Step 3: Output Result
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

sc.stop

/**
 =========================================================================
 */

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(sc, Seconds(5))

val socketDStream = ssc.textFileStream("/user/beifeng/sparkstreaming/hdfsfiles")

val words = socketDStream.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.print()

ssc.start()             
ssc.awaitTermination()  


///
// foreachFunc: (RDD[T], Time) => Unit
dstream.foreachRDD(rdd => {
  // 將分析的數(shù)據(jù)存儲(chǔ)到JDBC中,MySQL數(shù)據(jù)中
  val connection = createJDBCConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.putStateResult(record) // executed at the worker
  }
})


// 建議的模式段审,
/**
    比如講數(shù)據(jù)結(jié)果寫入到MySQL數(shù)據(jù)庫(kù)的某張表中
    -1,首先有一個(gè)工具類全蝶,專門連接數(shù)據(jù)庫(kù)的一個(gè)連接池,池中有一些連接Connection
    -2,RDD操作時(shí)應(yīng)該針對(duì)每個(gè)分區(qū)進(jìn)行操作
        比如每個(gè)分區(qū)獲取一個(gè)數(shù)據(jù)庫(kù)的Connection
    -3,針對(duì)分區(qū)中的每天數(shù)據(jù)寺枉,一個(gè)個(gè)的插入到數(shù)據(jù)庫(kù)的表中 
    -4,最后將數(shù)據(jù)庫(kù)連接放回到連接池中抑淫,以便其他進(jìn)程使用
*/
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}



/**
 =========================================================================
 */

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(sc, Seconds(2))

// Step 1:Recevier Data From Where
val socketDStream = ssc.socketTextStream("hadoop-senior01.ibeifeng.com", 9999)

// Step 2: Process Data Base DStream
// Split each line into words
val words = socketDStream.flatMap(_.split(" "))

// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKeyAndWindow((x: Int, y: Int) => (x + y), Seconds(8), Seconds(4))

// Step 3: Output Result
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

sc.stop



// ===============================================

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._

val ssc = new StreamingContext(sc, Seconds(5))

// Step 1:Recevier Data From Where
// Flume: FlumeUtils, Kafka: KafkaUtils
val flumeDStream = FlumeUtils.createStream(ssc, "hadoop-senior01.ibeifeng.com", 9988).map(event => new String(event.event.getBody.array()))

// Step 2: Process Data Base DStream
// DStream[Long] 
val words = flumeDStream.flatMap(_.split(" "))

// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Step 3: Output Result
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
    
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

sc.stop

5. spark sql

思考:
Hive 實(shí)質(zhì)什么?
Hive 如何將SQL轉(zhuǎn)換為MapReduce姥闪?

Oozie:
    MapReduce Action

Dremel
-1,Presto
-2,Impala
游戲公司
--1,yum rpm 安裝
--2,CM
Flume + Kafka + HBase + Impala + JAVA + Python
-3,Drill

研發(fā)性學(xué)習(xí):
http://kylin.apache.org/

====================================================
Spark SQL 前世今生
-1,1.0版本以前
Shark = Hive on Spark
-2,1.0.x版本
Spark SQL
alpha 版本
-3,1.3.x版本
DataFrame
release版本
-4,1.5.x版本
鎢絲計(jì)劃
-5,1.6.x版本
DataSet
-6,2.0版本
始苇。。筐喳。埂蕊。

總結(jié)一點(diǎn):
Spark SQL開始的話,替換Hive底層疏唾,spark SQL與Hive完全兼容,尤其HQL語(yǔ)句函似。

Shark
-1,Spark SQL
alpha 版本
-2,Hive on Spark

Hive
-1,MapReduce
-2,Spark
-3,Tez

====================================================

Spark SQL 處理Hive表中的數(shù)據(jù)
-1,MetaStore
MySQL數(shù)據(jù)庫(kù)
hive-site.xml
-2,數(shù)據(jù)庫(kù)驅(qū)動(dòng)
mysql-*.jar

bin/spark-shell --master local[2] --jars /opt/cdh5.3.6/hive-0.13.1-cdh5.3.6/lib/mysql-connector-java-5.1.27-bin.jar

bin/spark-sql --master local[2] --jars /opt/cdh5.3.6/hive-0.13.1-cdh5.3.6/lib/mysql-connector-java-5.1.27-bin.jar

====================================================

-1,對(duì)于Spark Core來(lái)說(shuō)槐脏,應(yīng)用程序的入口
SparkContext
-2,對(duì)于Spark SQL來(lái)說(shuō),應(yīng)用程序的入口
SQLContext -> HiveContext
SparkContext

--1, SparkCore: RDD
scala> case class People(name: String, age: Int)
defined class People

scala> val rdd = sc.textFile("/user/beifeng/people.txt").map(line => line.split(",")).map(x => People(x(0), x(1).trim.toInt))

rdd: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[3] at map at <console>:29

--2,SparkSQL: DataFrame
scala> val df = sqlContext.read.json("/user/beifeng/people.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> val hive_emp_df = sqlContext.read.table("default.emp")
hive_emp_df: org.apache.spark.sql.DataFrame = [empno: int, ename: string, job: string, mgr: int, hiredate: string, sal: double, comm: double, deptno: int]

scala> hive_emp_df.schema
res2: org.apache.spark.sql.types.StructType = StructType(StructField(empno,IntegerType,true), StructField(ename,StringType,true), StructField(job,StringType,true), StructField(mgr,IntegerType,true), StructField(hiredate,StringType,true), StructField(sal,DoubleType,true), StructField(comm,DoubleType,true), StructField(deptno,IntegerType,true))

==========================================
如何創(chuàng)建DataFrame
-1,外部數(shù)據(jù)源-內(nèi)置
--1,json
--2,hive
--3,jdbc
--4,parquet/orc
--5,text -沒有用處
-2,從RDD轉(zhuǎn)換
--1,方式一:
RDD[CACECLASS]
--2,方式二:
自定義schema
-3,外部數(shù)據(jù)源 - 需要自己依據(jù)接口開發(fā)
--1,ES
檢索
--2,HBase
華為撇寞,開源
--3,solr
...

scala> val hive_dept_df = sqlContext.read.table("default.dept")

def jdbc(url: String, table: String, connectionProperties: Properties)
需求:
將DataFrame數(shù)據(jù)寫到MYSQL數(shù)據(jù)庫(kù)表中

val url = "jdbc:mysql://hadoop-senior01.ibeifeng.com:3306/test?user=root&password=123456"
import java.util.Properties
val props = new Properties()

hive_dept_df.write.jdbc(url, "tb_dept", props)

問題:
scala> hive_dept_df.write.jdbc(url, "tb_dept", props)
java.sql.SQLException: No suitable driver
at java.sql.DriverManager.getDriver(DriverManager.java:278)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:50)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:50)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createConnectionFactory(JdbcUtils.scala:49)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:278)
解決方案:
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/cdh5.3.6/spark-1.6.1-bin-2.5.0-cdh5.3.6/externallibs/mysql-connector-java-5.1.27-bin.jar
bin/spark-shell --master local[2]

案例:
對(duì)不同數(shù)據(jù)源的表進(jìn)行JOIN顿天,一個(gè)是在Hive中堂氯,一個(gè)是在MySQL
-1,emp
hive
-2,dept
mysql
-3,join

val url = "jdbc:mysql://hadoop-senior01.ibeifeng.com:3306/test?user=root&password=123456"
import java.util.Properties
val props = new Properties()

val mysql_dept_df = sqlContext.read.jdbc(url, "tb_dept", props)

val hive_emp_df = sqlContext.read.table("default.emp")

// join
val join_df = hive_emp_df.join(mysql_dept_df, "deptno")

join_df.registerTempTable("join_emp_dept")

sqlContext.sql("select empno, ename, deptno, deptname, sal from join_emp_dept order by empno").show

===========================================================
如何從RDD創(chuàng)建DataFrame
方式一:
The first method uses reflection to infer the schema of an RDD that contains specific types of objects.
簡(jiǎn)單說(shuō)法:
RDD[CASE CLASS]
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

方式二:
The second method for creating DataFrames is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD.
a DataFrame can be created programmatically with three steps:
--1,RDD -> RDD[Row]
import org.apache.spark.sql._
演示:
val rdd = sc.textFile("/user/beifeng/people.txt")
import org.apache.spark.sql._
val rowRdd = rdd.map(line => line.split(", ")).map(x => Row(x(0), x(1).toInt))
--2,Create the schema
import org.apache.spark.sql.types._
val schema = StructType(StructField("name",StringType,true) :: StructField("age",IntegerType,true) :: Nil)
--3,Apply the schema to the RDD of Rows
val people_df = sqlContext.createDataFrame(rowRdd, schema)

=========================================================
Spark 1.6
Dataset
數(shù)據(jù)集

相關(guān)文章:
-1,https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html
-2,https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

val url = "jdbc:mysql://hadoop-senior01.ibeifeng.com:3306/test"
import java.util.Properties
val props = new Properties()
props.put("user","root")
props.put("password","123456")

val mysql_dept_df = sqlContext.read.jdbc(url, "tb_dept", props)

Spark SQL & ES
http://blog.csdn.net/stark_summer/article/details/49743687

回顧SparkSQL
-1,處理的數(shù)據(jù)90%來(lái)源Hive表
Hive數(shù)據(jù)倉(cāng)庫(kù)
-2,過程
HADOOP -> HIVE -> SparkSQL
建議:
編寫項(xiàng)目時(shí),最后寫上使用SparkSQL在調(diào)研牌废,在運(yùn)行咽白,嘗試

-3,程序入口
    SQLContext & HiveContext
    DataFrame
        DataFrame -> Dataset -> RDD
    -a,RDD[ROW] = DataFrame
        創(chuàng)建兩種方式
        -#,CASE CLASS
        -#,RDD[ROW],schema(StructType,StructField)
    -b,外部數(shù)據(jù)源
        sqlContext.read.
            .json
            .parquet
            .jdbc
            .table
        方便了異步數(shù)據(jù)源的數(shù)據(jù)連接處理JOIN
    -c,與Hive的集成
        sqlContext.sql("select * from emp").show
    -d,ThriftServer
        多,啟動(dòng)一個(gè)Spark Application,多個(gè)客戶連接
        beeline
        jdbc
    -e,存儲(chǔ)分析結(jié)果
        dataframe.write
            .jdbc
            .table
            .text
            .json

========================================================
Spark如何與HBase進(jìn)行集成鸟缕?晶框?
Spark 如何與HBase進(jìn)行交互?
1:讀數(shù)據(jù)
2: 寫數(shù)據(jù)

-1,SparkCore
    read:
        hbase-table -> RDD
    write:
        RDD -> Cell
    
    作業(yè):
        如何使用saveAsNewAPIHadoopDataset將Spark RDD數(shù)據(jù)保存到HBase表中懂从。
    
-2,SparkSQL
    SparkSQL -> Hive  -> HBase
    此方式
        目前來(lái)說(shuō)授段,僅限于讀取,不建議寫入番甩,可能有問題侵贵。
    研發(fā)型作業(yè):
        測(cè)試集成方式
        注意:
            -#,Hive與HBase集成時(shí),需要將HBASE CLIENT相關(guān)JAR包放入到$HIVE_HOME/lib
            -#,SPARKSQL名義看是讀取Hive表的數(shù)據(jù),其實(shí)讀取的還是HBase表的數(shù)據(jù)缘薛,需要將HBASE CLIENT相關(guān)JAR包放入CLASSPATH
    
-3,SparkSQL External DataSource API
    https://github.com/Huawei-Spark/Spark-SQL-on-HBase

MapReduce 框架中
input -> map -> shuffle -> reduce -> output

1:intput
    讀取要處理的數(shù)據(jù)
    // set input path
    FileInputFormat.setPath(new Path("/user/beifeng/mr-wc-in"))
    
    (Key, Value)
        Key:
            offset 偏移量
        value:
            每一行的數(shù)據(jù)
            
    RecordReader
        每條記錄讀取

    TextInputFormat
        LineRecordReader

MapReduce API
從HADOOP 0.20.0版本開始窍育,有了新的API
-#,old
org.apache.hadoop.mapred.*
Mapper\Reducer -> Interface
-#,new
org.apache.hadoop.mapreduce.*
Mapper\Reducer -> class

val rdd = sc.textFile("Readme.md")

Spark去讀取HDFS(HFile)上的文件數(shù)據(jù)時(shí),與MapReduce讀取數(shù)據(jù)時(shí)一樣一樣的

HBase與MapReduce集成
-1,Mapper
TableMapper
Mapper從HBase表中讀取數(shù)據(jù)
(ImmutableBytesWritable, Result)
mapreduce如果從HBase表讀取數(shù)據(jù)宴胧,也是一條一條的讀取漱抓,
TableInputFormat
TableRecordReader

===================================================
SparkSQL
-1,SparkSQL中的聚合函數(shù)

在DataFrame類中,有如下五組函數(shù)

  • @groupname basic
    Basic DataFrame functions
  • @groupname dfops
    Language Integrated Queries
  • @groupname rdd
    RDD Operations
  • @groupname output
    Output Operations
  • @groupname action
    Actions

SQL and DSL(Domain )
emp_df.agg("sal" -> "avg", "comm" -> "max").show
emp_df.agg(Map("sal" -> "avg", "comm" -> "max")).show
emp_df.agg(max($"comm"), avg($"sal")).show

emp_df.groupBy($"deptno").agg(max($"comm"), avg($"sal")).show


-2,自定義函數(shù)UDF
    回顧一下:
        Hive中自定義函數(shù)
            -1,extends UDF
            -2,override evaluate
    對(duì)于Spark來(lái)說(shuō)
        SCALA牺汤,函數(shù)式編程
        匿名函數(shù)

if函數(shù)
if(condition, true-value, false-value)

sqlContext.udf.register(
"trans_comm", // 函數(shù)名稱
(comm: Double) => {
if(comm.toString == ){
0.0
}else{
comm
}
}// 函數(shù)
)

===========================================================
如何自定義UDAF辽旋?
avg:
求平均值
多 對(duì) 一
輸入多個(gè)值,輸出一個(gè)值

平均值:
    -0,total = 0.0  // 3000 + 2500 + 4200 + 2100 + 5000
    -1,count = 0 // 5
    
    緩沖數(shù)據(jù)

Spark/MapReduce
block block block
| | |
part-01 part-02 part-03
| | |
Task Task Task
avg avg avg

sqlContext.udf.register(
"avg_sal",
AvgSalUDAF
)

sqlContext.sql("select deptno, avg(sal) avg_sal, avg_sal(sal) as_self from emp group by deptno").show

==================================================
Hive中分析函數(shù)
http://lxw1234.com/archives/tag/hive-window-functions
SparkSQL同樣也支持
Spark 1.4.0
SPARK-1442: Window functions in Spark SQL and DataFrames

ROW_NUMBER

針對(duì)emp表來(lái)說(shuō)
    獲取各部門中工資Top3的人員信息
-1,按照部門進(jìn)行分組
    group by deptno
-2,對(duì)各部門人員工資降序排序
    order by sal desc
-3,獲取前幾個(gè)
    TopKey

SELECT
empno, ename, sal, deptno,
ROW_NUMBER() OVER(PARTITION BY deptno ORDER BY sal DESC) rnk
FROM
emp ;

empno ename sal deptno rnk
7839 KING 5000.0 10 1
7782 CLARK 2450.0 10 2
7934 MILLER 1300.0 10 3

7788 SCOTT 3000.0 20 1
7902 FORD 3000.0 20 2
7566 JONES 2975.0 20 3
7876 ADAMS 1100.0 20 4
7369 SMITH 800.0 20 5

7698 BLAKE 2850.0 30 1
7499 ALLEN 1600.0 30 2
7844 TURNER 1500.0 30 3
7521 WARD 1250.0 30 4
7654 MARTIN 1250.0 30 5
7900 JAMES 950.0 30 6

SELECT
empno, ename, sal, deptno
FROM(
SELECT
empno, ename, sal, deptno,
ROW_NUMBER() OVER(PARTITION BY deptno ORDER BY sal DESC) rnk
FROM
emp
) t
WHERE
t.rnk <= 3 ;

empno ename sal deptno
7839 KING 5000.0 10
7782 CLARK 2450.0 10
7934 MILLER 1300.0 10

7788 SCOTT 3000.0 20
7902 FORD 3000.0 20
7566 JONES 2975.0 20

7698 BLAKE 2850.0 30
7499 ALLEN 1600.0 30
7844 TURNER 1500.0 30

數(shù)據(jù)統(tǒng)計(jì)分析:
-1,過濾檐迟,清洗
-2,分組
-3,統(tǒng)計(jì)
-4,排序
-5,TopKey

6 spark core

關(guān)鍵知識(shí)
-1,MapReduce
思想
shuffle
-2,Hive
ETL
Spark SQL
前世今生
數(shù)據(jù)倉(cāng)庫(kù)
整理數(shù)據(jù)
DataFrame

===================================================================
Spark 課程安排:
-1,SCALA
一天課程
-2,Spark Core
兩天課程
-3,SparkSQL
一天課程
-4,Spark Streaming
一天課程
-5,項(xiàng)目
三天課程
-6,Spark MLlib(可選)
一天課程

===========================================================

騰訊:
08年的時(shí)候补胚,幾百臺(tái)服務(wù)器,日志數(shù)據(jù)信息追迟。

分析日志數(shù)據(jù):
-1,服務(wù)器是否出現(xiàn)故障溶其,運(yùn)維

-2,為產(chǎn)品的設(shè)計(jì)
    產(chǎn)品經(jīng)理

階段一:
Python + MySQL
實(shí)現(xiàn):
-1,每天服務(wù)器上面,都會(huì)安裝MySQL數(shù)據(jù)庫(kù)
日志數(shù)據(jù)存儲(chǔ)到MySQL數(shù)據(jù)庫(kù)表中
--1,Python
腳本
--2,某個(gè)條件敦间,對(duì)日志數(shù)據(jù)進(jìn)行分類
不同的數(shù)據(jù)存儲(chǔ)到不同的MySQL數(shù)據(jù)庫(kù)表
-2,基于SQL進(jìn)行分析
分散 -> 存儲(chǔ) -> 分析 -> 聚合 -> 分散 -> 存儲(chǔ) -> 分析

階段二:
HADOOP + Hive
實(shí)現(xiàn):
-1,將日志數(shù)據(jù)放到HDFS中瓶逃,使用MapReduce進(jìn)行數(shù)據(jù)分類、清洗廓块、過濾
數(shù)據(jù)加載到Hive表中
-2,HiveQL
批處理厢绝,離線分析妖滔,速度相對(duì)來(lái)說(shuō)力细,較慢认罩,尤其針對(duì)數(shù)據(jù)量非常大的時(shí)候衷佃。

業(yè)務(wù)需求的增加:
    機(jī)器學(xué)習(xí)榆鼠,數(shù)據(jù)分析事期,數(shù)據(jù)挖掘
        ---1,迭代計(jì)算
            循環(huán)

        MapReduce:
            Map Task -> local filesystem -> Reduce Task -> hdfs 

    基于MapReduce的機(jī)器學(xué)習(xí)框架
        Mahout
        --0,在2014年上半年官方就說(shuō)了溃睹,底層不在支持MapReduce丑婿,支持Spark

階段三:
HADOOP + SPARK
實(shí)現(xiàn):
-1,數(shù)據(jù)還是存儲(chǔ)在HDFS/Hive

    -2,數(shù)據(jù)分析
        Spark
        其他框架作為輔助
        Hive + Python + 。娄周。涕侈。。

為什么Spark煤辨?裳涛??掷酗?调违??泻轰?
    數(shù)據(jù)結(jié)構(gòu):
        RDD
            -1,集合(List[Type])
            -2,數(shù)據(jù)存儲(chǔ)在內(nèi)存中
            -3,分區(qū)存儲(chǔ)
                hdfs 分塊block
            -4,對(duì)每個(gè)分區(qū)中的數(shù)據(jù)進(jìn)行運(yùn)算
                mapreduce input -> block : map task

            input  -> process -> output

MapReduce:
input -> (map -> shuffle -> reduce) -> output
Spark:
input -> (rdd -> rdd -> rdd -> rdd -> ...) -> output

val list = List(1,2,3,4)
list.map(x => x * 2)

AMPLab
A:算法 M: 機(jī)器 P:人類

Spark 學(xué)習(xí)建議:
-1,把握核心
RDD
DataFrame
DStream
-2,思考技肩,源碼,對(duì)比
案例

=============================================================

Apache Spark? is a fast and general engine for large-scale data processing.
關(guān)鍵詞:
-1,海量數(shù)據(jù)處理計(jì)算框架
核心:數(shù)據(jù)結(jié)構(gòu)RDD
-2,fast
速度快
比較性:
MapReduce

        比較的數(shù)據(jù):
            相同數(shù)據(jù)浮声,相同邏輯虚婿,不同的編碼
            --1,內(nèi)存
                100+
            --2,磁盤
                10+
-3,general
    通用

HADOOP生態(tài)系統(tǒng):
MapReduce Hive Mahout Storm Graphi

Spark 生態(tài)棧:
Core SparkSQL MLlib SparkStreaming Graphx
JAR包:
庫(kù)

-1,Spark Application運(yùn)行everywhere
local、yarn泳挥、memsos然痊、standalon、ec2
-2,處理數(shù)據(jù)
來(lái)源一切(Spark SQL:json\parquet\orc\xml\jdbc\solr\es\tsv...)
hdfs\hive\hbase...

==============================================================

Spark Timeline
-1,Spark 1.0以前
0.9

-2,1.0
版本發(fā)布

-3屉符,Spark 2015年的發(fā)展
1.2\1.3\1.4\1.5\1.6
五大版本
1.3
Spark SQL:DataFrame
鎢絲計(jì)劃
性能優(yōu)化
-4,Spark 2.0
更加簡(jiǎn)單剧浸、更加快速、更加智能

學(xué)習(xí)Spark三大網(wǎng)站:
-1,官方文檔
http://spark.apache.org/
-2,源碼
https://github.com/apache/spark
-3,官方博客
https://databricks.com/blog

Hive 執(zhí)行引擎:
-1,MapReduce
-2,Spark
https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started
-3,Tez
Hortonworks

針對(duì)APACH HADOOP
./make-distribution.sh --tgz -Phadoop-2.4 -Dhadoop.version=2.5.0 -Phive -Phive-thriftserver -Pyarn

針對(duì)CDH HADOOP
./make-distribution.sh --tgz -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.6 -Phive -Phive-thriftserver -Pyarn

========================================================
Local
YARN/Standalone

--Spark Local
-1,JAVA
-2,HDFS
-3,SCALA
-4,Spark

大記状V印:
bin/spark-submit
類似于
bin/yarn
提交Spark Application的腳本就是spark-submit

spark-shell
spark-submit

=====================================================
所有的Spark程序來(lái)說(shuō)唆香,程序的入口是SparkContext

分析數(shù)據(jù):
step 1: input
rdd = sc.textFile("/user/beifeng/xx")
step 2: process
rdd.map().filter().reduceByKey()
step 3: output
rdd.saveAsTextFile()

16/06/26 20:01:56 INFO BlockManagerMaster: Registered BlockManager
16/06/26 20:01:56 INFO SparkILoop: Created spark context..
Spark context available as sc.

rdd = sc.textFile("/user/beifeng/xx")
類比:
默認(rèn)情況下,MapTask如何讀取數(shù)據(jù)吨艇,一行一行的讀取數(shù)據(jù)<key,value>
在Spark中
也是一行一行的讀取數(shù)據(jù)躬它,每行數(shù)據(jù)是String

WortCount
-1,MapReduce
-2,Spark
-3,Flink
http://flink.apache.org/

input

val rdd = sc.textFile("/user/beifeng/mapreduce/wordcount/input")

process

val wordCountRdd = rdd.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((a, b) => (a + b))

output

wordCountRdd.saveAsTextFile("/user/beifeng/spark/wordcount/output")

=====================
-1,不是說(shuō)MapReduce框架,思想不好东涡,而不是不適合做某些事情
(key,value)
-2,Spark 數(shù)據(jù)處理中
我們更多是與MapReduce類似冯吓,將數(shù)據(jù)轉(zhuǎn)換為(key,value)進(jìn)行處理

val rdd = sc.textFile("/user/beifeng/mapreduce/wordcount/input")

rdd.flatMap(.split(" ")).map((,1)).reduceByKey(_ + _)

==================================================================

總結(jié):
-1,認(rèn)識(shí)Spark
RDD
--1,有哪些?
--2,用哪些疮跑?
Spark Applicaiton???
--1,YARN
目前
--2,Standalone/Mesos
自身帶分布式資源管理管理和任務(wù)調(diào)度

hadoop 2.x release 2.2.0 2013/10/15
hadoop 2.0.x - al
hadoop 2.1.x - beta

Cloudera
cdh3.x - 0.20.2
cdh4.x - 2.0.0
HDFS -> HA:QJM ; Federation
Cloudera Manager 4.x
cdh5.x

================================================

Spark Standalon Mode
認(rèn)為
Spark 本身自帶的一個(gè)分布式資源管理系統(tǒng)以及任務(wù)調(diào)度的框架

類似于YARN這樣的框架
分布式
主節(jié)點(diǎn):
Master - ResourceManager
從節(jié)點(diǎn):
Works - NodeManagers

start-slaves.sh
啟動(dòng)所有的從節(jié)點(diǎn)组贺,也就是Work
注意:
使用此命令時(shí),運(yùn)行此命令的機(jī)器祖娘,必須要配置與其他機(jī)器的SSH無(wú)密鑰登錄锣披,否則啟動(dòng)的時(shí)候會(huì)出現(xiàn)一些問題,比如說(shuō)輸入密碼之類的。

對(duì)于Spark Application
兩部分組成:
-1,Driver Program -> 4040 4041 4042
main
SparkContext ---最最重要
-2,Executor
JVM(進(jìn)程)
運(yùn)行我們Job的Task

REPL:
shell交互式命令

Spark Application
Job-01
count
Job-02
Stage-01
Task-01(線程) -> Map Task(進(jìn)程)
Task-02(線程) -> Map Task(進(jìn)程)
每個(gè)Stage中的所有Task雹仿,業(yè)務(wù)都是相同的,處理的數(shù)據(jù)不同
Stage-02
Job-03

從上述運(yùn)行程序案例來(lái)看:
如果RDD調(diào)用的函數(shù)整以,返回值不是RDD的時(shí)候胧辽,就會(huì)觸發(fā)一個(gè)JOB,進(jìn)行執(zhí)行公黑。

思考:
reduceByKey 到底做了什么事情呢邑商?
-1,分組
將相同key的value放在一起
-2,對(duì)value進(jìn)行reduce
進(jìn)行合并

經(jīng)分析,對(duì)比MapReduce中WordCount程序運(yùn)行凡蚜,推斷出Spark JOB中Stage的劃分依據(jù)RDD之間是否產(chǎn)生Shuffle進(jìn)行劃分的人断。

=============================================================

val rdd = sc.textFile("/user/beifeng/mapreduce/wordcount/input")

val wordCountRdd = rdd.flatMap(.split(" ")).map((,1)).reduceByKey(_ + _)

需求:
安裝詞頻進(jìn)行倒排,獲取前3個(gè)單詞(TOP KEY)

wordCountRdd

================================
在企業(yè)中朝蜘,如何開發(fā)Spark Application
spark-shell + IDEA
-1,在IDEA中編寫代碼

-2,在spark-shell中執(zhí)行代碼

-3,使用IDEA將代碼打包成JAR包恶迈,使用bin/spark-submit進(jìn)行提交應(yīng)用

================================
Spark HistoryServer
監(jiān)控運(yùn)行完成的Spark Applicaiton。

分為兩個(gè)部分:
第一谱醇、設(shè)置SparkApplicaiton在運(yùn)行時(shí)暇仲,需要記錄日志信息

第二、啟動(dòng)HistoryServer副渴,通過界面查看

=================================================
需求一:
The average, min, and max content size of responses returned from the server.
ContentSize
需求二:
A count of response code's returned.
responseCode
需求三:
All IPAddresses that have accessed this server more than N times.
ipAddresses
需求四:
The top endpoints requested by count.
endPoint

mvn archetype:generate -DarchetypeGroupId=org.scala-tools.archetypes -DarchetypeArtifactId=scala-archetype-simple -DremoteRepositories=http://scala-tools.org/repo-releases -DgroupId=com.ibeifeng.bigdata.spark.app -DartifactId=log-analyzer -Dversion=1.0

Hive:
將文件中的數(shù)據(jù)映射到一張表中奈附,

正則表達(dá)式:

Option
Some
有值
None
無(wú)值
16/07/16 18:46:51 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package
at java.lang.ClassLoader.checkCerts(ClassLoader.java:952)
at java.lang.ClassLoader.preDefineClass(ClassLoader.java:666)
at java.lang.ClassLoader.defineClass(ClassLoader.java:794)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:136)
at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:129)
at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:98)
at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:126)
at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:113)
at org.apache.spark.ui.WebUI.attachPage(WebUI.scala:78)
at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:62)
at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:62)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.ui.WebUI.attachTab(WebUI.scala:62)
at org.apache.spark.ui.SparkUI.initialize(SparkUI.scala:63)
at org.apache.spark.ui.SparkUI.<init>(SparkUI.scala:76)
at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:195)
at org.apache.spark.ui.SparkUI$.createLiveUI(SparkUI.scala:146)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:473)
at com.ibeifeng.bigdata.spark.app.core.LogAnalyzer$.main(LogAnalyzer.scala:16)
上述原因:
由于JAR沖突導(dǎo)致。

回顧:
-1,了解認(rèn)識(shí)Spark
MapReduce比較
“四大優(yōu)勢(shì)”
--1,速度快
--2,使用簡(jiǎn)單
--3,一棧式
--4,無(wú)處不在的運(yùn)行
開發(fā)測(cè)試
SCALA: REPL/Python
-2,Spark Core
兩大抽象概念
--1,RDD
集合煮剧,存儲(chǔ)不同類型的數(shù)據(jù) - List
---1,內(nèi)存
memory
---2,分區(qū)
hdfs: block
---3,對(duì)每個(gè)分區(qū)上數(shù)據(jù)進(jìn)行操作
function
--2,共享變量shared variables
---1,廣播變量

        ---2,累加器
            計(jì)數(shù)器
-3,環(huán)境與開發(fā)
    --1,Local Mode
        spark-shell
    --2,Spark Standalone
        配置
        啟動(dòng)
        監(jiān)控
        使用
    --3,HistoryServer
        -1,針對(duì)每個(gè)應(yīng)用是否記錄eventlog
        -2,HistoryServer進(jìn)行展示
    --4,如何使用IDE開發(fā)Spark Application
        -1,SCALA PROJECt
            如何添加Spark JAR包
        -2,MAVEN PROJECT

=================================================
Spark 開發(fā)
step 1:
input data -> rdd/dataframe
step 2:
process data -> rdd##xx() / df#xx | "select xx, * from xx ..."
step 3:
output data -> rdd.saveXxxx / df.write.jdbc/json/xxx

================================================
問題:
16/07/23 09:38:13 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/07/23 09:38:13 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.RuntimeException: Cannot parse log line: mail.geovariances.fr - - [09/Mar/2004:05:02:11 -0800] "GET /twiki/pub/TWiki/TWikiLogos/twikiRobot46x50.gif HTTP/1.1" 304 -
at com.ibeifeng.bigdata.spark.app.core.ApacheAccessLog$.parseLogLine(ApacheAccessLog.scala:38)
at com.ibeifeng.bigdata.spark.app.core.LogAnalyzer$$anonfun$5.apply(LogAnalyzer.scala:25)
at com.ibeifeng.bigdata.spark.app.core.LogAnalyzer$$anonfun$5.apply(LogAnalyzer.scala:25)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

閉包c(diǎn)losures:
JS/Jquery

  • Internally, each RDD is characterized by five main properties:

    • A list of partitions
      protected def getPartitions: Array[Partition]
    • A function for computing each split
      @DeveloperApi
      def compute(split: Partition, context: TaskContext): Iterator[T]
    • A list of dependencies on other RDDs
      protected def getDependencies: Seq[Dependency[_]] = deps
    • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
      /** Optionally overridden by subclasses to specify how they are partitioned. */
      @transient val partitioner: Option[Partitioner] = None
    • Optionally, a list of preferred locations to compute each split on (e.g. block locations for
  • an HDFS file)
    protected def getPreferredLocations(split: Partition): Seq[String] = Nil

RDD

回顧WordCount

val rdd = sc.textFile("xxx")

val wordRdd = rdd.flatMap(.split(" "))
val kvRdd = wordRdd.map((
, 1))

kvRdd.groupByKey().map(tuple => {
(tuple._1, tuple.2.list.reduce( + _))
})

val wordcountRdd = kvRdd.reduceByKey(_ + _)

wordcountRdd.saveAsTextFile("yy")

kvRdd <- wordRdd <- rdd

wordRdd <- rdd

創(chuàng)建RDD的兩種方式:
方式一:
并行化集合
List\Seq\Array
SparkContext:
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T]
方式二:
外部存儲(chǔ)系統(tǒng)

方式:
RDD Transformation

==================================
對(duì)于分布式計(jì)算框架來(lái)說(shuō)斥滤,性能瓶頸
IO
-1,磁盤IO
-2,網(wǎng)絡(luò)IO

rdd1 -> rdd2
    Shuffle

============================================
groupByKey() & reduceByKey()

在實(shí)際開發(fā)中,如果可以使用reduceByKey實(shí)現(xiàn)的功能勉盅,就不要使用groupBykey
使用reduceByKey有聚合功能佑颇,類似MapReduce中啟用了Combiner

===============
join()
-1,等值鏈接

-2,左連接

數(shù)據(jù)去重
結(jié)果數(shù)據(jù)
res-pre.txt - rdd1
新數(shù)據(jù)進(jìn)行處理
web.tsv - 10GB - rdd2
解析里面的url,
如果res-pre.txt中包含菇篡,就不放入漩符,不包含就加入或者不包含url進(jìn)行特殊處理

rdd2.leftJoin(rdd1)

===================================================

Group Top Key

aa 78
bb 98
aa 80
cc 98
aa 69
cc 87
bb 97
cc 86
aa 97
bb 78
bb 34
cc 85
bb 92
cc 72
bb 32
bb 23

rdd.map(line => line.split(" ")).map(arr => (arr(0), arr(1).toInt)).groupByKey().map(tuple => (tuple._1, tuple._2.toList.sorted.takeRight(3).reverse))

=======================================================
SparkContext
-1,向Master(主節(jié)點(diǎn),集群管理的主節(jié)點(diǎn))申請(qǐng)資源驱还,運(yùn)行所有Executor
-2,創(chuàng)建RDD的入口
sc.textFile("") // 從外部存儲(chǔ)系統(tǒng)創(chuàng)建
sc.parxx() // 并行化嗜暴,從Driver 中的集合創(chuàng)建
-3,調(diào)度管理JOB運(yùn)行
DAGScheduler 、 TaskScheduler
--3.1
為每個(gè)Job構(gòu)建DAG圖
--3.2
DAG圖劃分為Stage
按照RDD之間是否存在Shuffle
倒推(Stack)
--3.3
每個(gè)Stage中TaskSet
每個(gè)階段中Task代碼相同议蟆,僅僅處理數(shù)據(jù)不同

前面講解WordCount程序
詞頻統(tǒng)計(jì)

. ? # $ 毫無(wú)意義闷沥,無(wú)需統(tǒng)計(jì)

val list = List(".", "?", "#","$")

val rdd = sc.textFile("xxx")

val wordRdd = rdd.flatMap(_.split(" "))

val filterRdd = wordRdd.filter(word => !list.contains(word))

val kvRdd = filterRdd.map((, 1))
val wordcountRdd = kvRdd.reduceByKey(
+ _)

wordcountRdd.saveAsTextFile("yy")

使用廣播變量:
val list = List(".", "?", "!", "#", "$")
val braodCastList = sc.broadcast(list)
val wordRdd = sc.textFile("")
wordRdd.filter(word => {
braodCastList.value.contains(word)
})

對(duì)于Spark Applicaiton來(lái)說(shuō),很多初學(xué)者咐容,頭痛的一個(gè)問題就是
外部依賴JAR包
如下幾種方式:
方式一:
--jars JARS
Comma-separated list of local jars to include on the driver and executor classpaths.
jar包的位置一定要寫決定路徑舆逃。

方式二:
--driver-class-path
Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath.

方式三:
SPARK_CLASSPATH
配置此環(huán)境變量

企業(yè)中Spark Application提交,shell 腳本

spark-app-submit.sh:

!/bin/sh

SPARK_HOME

SPARK_HOME=/opt/cdh5.3.6/spark-1.6.1-bin-2.5.0-cdh5.3.6

SPARK CLASSPATH

SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/jars/sparkexternale/xx.jar:/opt/jars/sparkexternale/yy.jar

${SPARK_HOME}/bin/spark-submit --master spark://hadoop-senior01.ibeifeng.com:7077 --deploy-mode cluster /opt/tools/scalaProject.jar

====================================================
YARN
-1,分布式資源管理
主節(jié)點(diǎn):ResouceManager
從節(jié)點(diǎn):NodeManager -> 負(fù)責(zé)管理每臺(tái)機(jī)器上的資源(內(nèi)存和CPU Core)
-2,資源調(diào)度
--1,容器Container
AM/Task
--2,對(duì)于運(yùn)行在YARN上的每個(gè)應(yīng)用,一個(gè)應(yīng)用的管理者ApplicaitonMaster 資源申請(qǐng)和任務(wù)調(diào)度

Spark Application
-1,Driver Program
資源申請(qǐng)和任務(wù)調(diào)度
-2,Executors
每一個(gè)Executor其實(shí)就是一個(gè)JVM路狮,就是一個(gè)進(jìn)程

以spark deploy mode : client
AM
-- 全部都允許在Container中
Executor s
運(yùn)行在Container中虫啥,類似于MapReduce任務(wù)中Map Task和Reduce Task一樣

Driver -> AM -> RM

============================================
如何判斷RDD之間是窄依賴還是寬依賴:
父RDD的每個(gè)分區(qū)數(shù)據(jù) 給 子RDD的每個(gè)分區(qū)數(shù)據(jù)

    1    ->     1

    1    ->     N    :  MapReduce 中 Shuffle

val rdd = sc.textFile("/user/beifeng/mapreduce/wordcount/input")

val wordRdd = rdd.flatMap(.split(" "))
val kvRdd = wordRdd.map((
, 1))
val wordcountRdd = kvRdd.reduceByKey(_ + _)

wordcountRdd.collect

input -> rdd  -> wordRdd -> kvRdd : Stage-01 -> ShuffleMapStage -> SMT

->

wordcountRdd -> output            :Stage-02 -> ResultStage -> ResultTask
  • Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
  • other stage(s), or a result stage, in which case its tasks directly compute a Spark action
  • (e.g. count(), save(), etc) by running a function on an RDD. For shuffle map stages, we also
  • track the nodes that each output partition is on.
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市奄妨,隨后出現(xiàn)的幾起案子涂籽,更是在濱河造成了極大的恐慌,老刑警劉巖砸抛,帶你破解...
    沈念sama閱讀 207,248評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件评雌,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡直焙,警方通過查閱死者的電腦和手機(jī)景东,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評(píng)論 2 381
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)奔誓,“玉大人斤吐,你說(shuō)我怎么就攤上這事∷坷铮” “怎么了曲初?”我有些...
    開封第一講書人閱讀 153,443評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)杯聚。 經(jīng)常有香客問我臼婆,道長(zhǎng),這世上最難降的妖魔是什么幌绍? 我笑而不...
    開封第一講書人閱讀 55,475評(píng)論 1 279
  • 正文 為了忘掉前任颁褂,我火速辦了婚禮,結(jié)果婚禮上傀广,老公的妹妹穿的比我還像新娘颁独。我一直安慰自己,他們只是感情好伪冰,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,458評(píng)論 5 374
  • 文/花漫 我一把揭開白布誓酒。 她就那樣靜靜地躺著,像睡著了一般贮聂。 火紅的嫁衣襯著肌膚如雪靠柑。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,185評(píng)論 1 284
  • 那天吓懈,我揣著相機(jī)與錄音歼冰,去河邊找鬼。 笑死耻警,一個(gè)胖子當(dāng)著我的面吹牛隔嫡,可吹牛的內(nèi)容都是我干的甸怕。 我是一名探鬼主播,決...
    沈念sama閱讀 38,451評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼腮恩,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼梢杭!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起秸滴,我...
    開封第一講書人閱讀 37,112評(píng)論 0 261
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤式曲,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后缸榛,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,609評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡兰伤,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,083評(píng)論 2 325
  • 正文 我和宋清朗相戀三年内颗,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片敦腔。...
    茶點(diǎn)故事閱讀 38,163評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡均澳,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出符衔,到底是詐尸還是另有隱情找前,我是刑警寧澤,帶...
    沈念sama閱讀 33,803評(píng)論 4 323
  • 正文 年R本政府宣布判族,位于F島的核電站躺盛,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏形帮。R本人自食惡果不足惜槽惫,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,357評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望辩撑。 院中可真熱鬧界斜,春花似錦、人聲如沸合冀。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)君躺。三九已至峭判,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間晰洒,已是汗流浹背朝抖。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評(píng)論 1 261
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留谍珊,地道東北人治宣。 一個(gè)月前我還...
    沈念sama閱讀 45,636評(píng)論 2 355
  • 正文 我出身青樓急侥,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親侮邀。 傳聞我的和親對(duì)象是個(gè)殘疾皇子坏怪,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,925評(píng)論 2 344

推薦閱讀更多精彩內(nèi)容