1. Overview:
Structured Streaming is a scalable, fault-tolerant stream processing engine built on the Spark SQL engine. Through checkpointing and Write-Ahead Logs, the system provides an end-to-end exactly-once fault-tolerance guarantee. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing.
2. Word Count
The following implements a simple WordCount program based on Structured Streaming. The program listens on a TCP socket on a server and performs a word count on whatever it receives. For testing, open a command line on the server to send data:
$nc -lk 9999
1. First, to use the Structured Streaming API, add the Maven dependencies below (the Structured Streaming API ships with the spark-sql module; the spark-streaming artifact is the separate DStream API):
<!-- Apache Spark Streaming (DStream API) -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<!-- Apache Spark SQL (includes Structured Streaming) -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
2. Code (Scala):
lines (a DataFrame) represents an unbounded table containing the streaming text data. The table has a single column named "value", and each line arriving on the stream becomes one row of that table.
Scala implementation:
import org.apache.spark.sql.SparkSession

object StructureStreamingWordCount {
  def main(args: Array[String]): Unit = {
    // create a local SparkSession
    val spark = SparkSession
      .builder
      .appName("SparkWordCountNetWork")
      .getOrCreate()
    import spark.implicits._
    // listen to 10.198.193.189:9999 and read the stream coming from that port
    val lines = spark.readStream
      .format("socket")
      .option("host", "10.198.193.189")
      .option("port", 9999)
      .load()
    // the DataFrame must be converted to a Dataset with as[String] before flatMap can be used
    val words = lines.as[String].flatMap(_.split(" "))
    val wordCounts = words.groupBy("value").count()
    val query = wordCounts.writeStream
      // print the complete set of counts
      .outputMode("complete")
      .format("console")
      .start()
    query.awaitTermination()
  }
}
3. Package the code into a jar and submit it:
./bin/spark-submit \
  --class com.pmpa.bigdata.spark.app.core.StructureStreamingWordCount \
  /home/natty/spark/log-analyzer.jar
4. Send sentences with the nc command and check the result:
+------+-----+
| value|count|
+------+-----+
|apache| 1|
|hadoop| 3|
+------+-----+
3. Programming Model
The core idea of Structured Streaming is to treat a live data stream as a table that is being continuously appended to, which makes the stream processing model very similar to the batch processing model.
1. Basic concepts:
Treat the input data stream as an "Input Table". Each record arriving on the stream is a new row appended to the input table.
A query on the input generates a "Result Table". At every trigger interval (say, every 1 second), new rows are appended to the input table and the Result Table is eventually updated. Whenever the Result Table is updated, the changed result rows are written to an external sink.
"Output" is what gets written to external storage. The output can be defined in 3 modes: Complete (write the entire updated Result Table), Append (write only the new rows appended since the last trigger), and Update (write only the rows that changed since the last trigger).
2. Analysis of the WordCount program:
In the Word Count program above, the DataFrame "lines" is the Input Table and the DataFrame "wordCounts" is the Result Table.
The Spark Structured Streaming model differs significantly from other stream processing engines. Many streaming systems require the user to perform the aggregation themselves, and therefore to take care of fault tolerance and data consistency (at-least-once, at-most-once, exactly-once) on their own. In Spark Structured Streaming, Spark is responsible for updating the Result Table whenever new data arrives.
4. spark streaming:
Spark
SparkCore
RDD
SparkContext
SparkSQL
DataFrame
SQLContext
SparkStreaming
DStream
splits the streaming data into many RDDs, batched by a time interval such as 1s/5s
StreamingContext
streamingContext = new StreamingContext(sc, Seconds(5))
HADOOP:
MapReduce
Hive/HBase
Storm
real-time stream computing frameworks
Storm
storm.apache.org
JStorm
donated to Apache
For internet e-commerce companies doing big data, the thinking is: whatever the BAT companies use, smaller companies can of course also use to handle their data analysis and processing.
user recommendation
JAVA/C
Mahout
from 0.9.x on it no longer supports MapReduce; the migration to Spark was announced in early 2014
SparkStreaming
-1, real-time statistics with running totals
kafka + sparkstreaming (updateStateByKey) - see the sketch after this list
-2, real-time statistics over a recent time window
e.g. viewing the click counts of the last hour in real time, by province or for key cities
window
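A minimal sketch of the first use case (a running total with updateStateByKey), written against a socket source as a stand-in for the real Kafka input; the host, port and checkpoint path are assumptions:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint("/user/beifeng/sparkstreaming/checkpoint")   // stateful operations require checkpointing (assumed path)
val pairs = ssc.socketTextStream("hadoop-senior01.ibeifeng.com", 9999).flatMap(_.split(" ")).map((_, 1))
// merge each batch's counts for a key into the previously accumulated state
val totals = pairs.updateStateByKey((values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse(0)))
totals.print()
ssc.start()
ssc.awaitTermination()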
Questions raised:
SparkStreaming
-1, data sources
Socket
Flume
Kafka ---- the most common
val kafkaDStream = KafkaUtils.createStream(streamingContext, ...)   // receiver-based API; the direct API is covered later
-2, data processing
DStream#flatMap(),map(),filter(),reduceByKey,...
-3, data output (Output)
in-memory databases
Redis
HBase
JDBC
dstream#foreachRDD(rdd => {
rdd.toDF.write.jdbc
})
Official website example:
SparkStreaming receives data from a Socket, does a WordCount, and prints the result to the console
For SparkStreaming development and testing, especially in local mode,
you need at least
--master local[2]
-#, 2: means starting 2 Threads
-#, one Thread is needed to receive the data
Receiver
-#, the other Thread is used to run the Tasks
that actually process the data
For GraphX
a graph consists of
vertices -> RDD[(Long, T)]
vertexId
vertexAttributes
edges -> RDD[(Long, Long, T)]
SourceVertexId
DestVertexId
EdgeAttribute
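A minimal GraphX sketch matching the shapes above; the vertex IDs and attributes are made-up sample values:
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
// vertices: RDD[(VertexId, vertexAttribute)]
val vertices: RDD[(VertexId, String)] = sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))
// edges: Edge(sourceVertexId, destVertexId, edgeAttribute)
val edges: RDD[Edge[String]] = sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "follows")))
val graph = Graph(vertices, edges)
println(s"${graph.numVertices} vertices, ${graph.numEdges} edges")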
======================================================
-1,Receiver
block interval
default value 200ms
the received data is split into blocks and stored on the Executors
-2,StreamingContext
batch interval
the interval at which data is processed, i.e. how large a time range of data each batch covers
ssc = new StreamingContext(sc, Second(1))
t-0 t-200 t-400 t-600 t-800 t-1000 t-1200 t-1400
[blk-01 blk-02 blk-03 blk-04 blk-05 ] blk-06 blk-07
|
RDD
|
Process
RDD
sc.textFile()
DStream
ssc.textFileStream
==
DataFrame
spark.read.json("")
Structured Streaming
spark.readStream.json("")
spark-shell
load script
scala> :load /opt/cdh5.3.6/spark-1.6.1-bin-2.5.0-cdh5.3.6/WordCount.scala
Exercise:
@Experimental
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType] = {
new MapWithStateDStreamImpl[K, V, StateType, MappedType](
self,
spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
)
}
Use the new mapWithState API for real-time running-total statistics (see the sketch below).
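A hedged sketch of this exercise with the Spark 1.6 mapWithState API, again reading from a socket; the host, port and checkpoint path are assumptions:
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint("/user/beifeng/sparkstreaming/checkpoint")   // assumed path
val pairs = ssc.socketTextStream("hadoop-senior01.ibeifeng.com", 9999).flatMap(_.split(" ")).map((_, 1))
// for each key, add the new batch value to the stored state and emit (word, runningTotal)
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
  val total = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(total)
  (word, total)
}
val runningTotals = pairs.mapWithState(StateSpec.function(mappingFunc))
runningTotals.print()
ssc.start()
ssc.awaitTermination()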
Chen Chao
technical director at Qiniu
a well-known Spark evangelist in China
0.9.0
=======================================================
DStream#transform(func)
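A small sketch of what transform() allows: applying arbitrary RDD-to-RDD operations (here a join against a static RDD) to every batch; blacklistRDD and pairsDStream are hypothetical names:
val blacklistRDD = sc.parallelize(Seq(("spamword", true)))
val cleanedDStream = pairsDStream.transform(rdd =>
  rdd.leftOuterJoin(blacklistRDD)
     .filter { case (_, (_, flagged)) => flagged.isEmpty }   // drop blacklisted keys
     .map { case (word, (count, _)) => (word, count) }
)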
======================================================
Requirement:
word frequency count
counting the data of the last 10 seconds each time
======================================================
Integrating SparkStreaming with Flume and Kafka
Receiver
receives the data
For Flume
Flume Agent
Source -> Channel -> Sink
writes the data to SparkStreaming
-1, push
pushes the data to the Receiver
block interval
200ms
blk-01 blk-02 blk-03 ...
Executors
batch interval
special cases:
-1, the whole application stops
-2, all Executors holding unprocessed blocks stop
then data has only been received, but there is no guarantee that all received data actually gets processed
Starting with Spark 1.3.x, for performance and reliability reasons, a second approach appeared:
-2, pull
pulls the data
preferred
offset
10 50
batch job
RDD:
sink: 10 - 50
when the Spark job runs, it goes directly to the Flume sink to fetch the data, processes it, writes the results out, and then updates the offset
-1, data is not lost
-2, data is not processed twice
bin/spark-shell \
--master local[3] \
--jars externallibs/mysql-connector-java-5.1.27-bin.jar,\
externallibs/spark-streaming-flume_2.10-1.6.1.jar,\
externallibs/flume-avro-source-1.5.0-cdh5.3.6.jar,\
externallibs/flume-ng-sdk-1.5.0-cdh5.3.6.jar
bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/spark-push-flume.properties -Dflume.root.logger=DEBUG,console
Integrating SparkStreaming with Kafka
Direct approach
no Receiver
batch interval
RDD
Kafka
Topic
part-01 part-02 part-03 part-04
index
SparkJob
val rdd = invoke-kafka-consumer-api-topic-offset
rdd.process
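A hedged sketch of the direct approach with KafkaUtils.createDirectStream (spark-streaming-kafka 0.8 API, matching the jars listed below); the broker address and topic name are assumptions:
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(5))
val kafkaParams = Map("metadata.broker.list" -> "hadoop-senior01.ibeifeng.com:9092")   // assumed broker
val topics = Set("testTopic")                                                          // assumed topic
// no Receiver: every batch maps one Kafka offset range per partition onto one RDD partition
val kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
kafkaDStream.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()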
Kafka Producer API
JAVA
SCALA
bin/spark-shell \
--master local[3] \
--jars externallibs/mysql-connector-java-5.1.27-bin.jar,\
externallibs/spark-streaming-kafka_2.10-1.6.1.jar,\
externallibs/kafka_2.10-0.8.2.1.jar,\
externallibs/kafka-clients-0.8.2.1.jar,\
externallibs/zkclient-0.3.jar,\
externallibs/metrics-core-2.2.0.jar
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(5))
// Step 1: Receive Data From Where
val socketDStream = ssc.socketTextStream("hadoop-senior01.ibeifeng.com", 9999)
// Step 2: Process Data Based on DStream
// Split each line into words
val words = socketDStream.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Step 3: Output Result
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
sc.stop
/**
=========================================================================
*/
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(5))
val hdfsDStream = ssc.textFileStream("/user/beifeng/sparkstreaming/hdfsfiles")
val words = hdfsDStream.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
///
// foreachFunc: (RDD[T], Time) => Unit
dstream.foreachRDD(rdd => {
// store the analysis results into a MySQL database via JDBC
val connection = createJDBCConnection() // executed at the driver
rdd.foreach { record =>
connection.putStateResult(record) // executed at the worker
}
})
// 建議的模式段审,
/**
比如講數(shù)據(jù)結(jié)果寫入到MySQL數(shù)據(jù)庫(kù)的某張表中
-1,首先有一個(gè)工具類全蝶,專門連接數(shù)據(jù)庫(kù)的一個(gè)連接池,池中有一些連接Connection
-2,RDD操作時(shí)應(yīng)該針對(duì)每個(gè)分區(qū)進(jìn)行操作
比如每個(gè)分區(qū)獲取一個(gè)數(shù)據(jù)庫(kù)的Connection
-3,針對(duì)分區(qū)中的每天數(shù)據(jù)寺枉,一個(gè)個(gè)的插入到數(shù)據(jù)庫(kù)的表中
-4,最后將數(shù)據(jù)庫(kù)連接放回到連接池中抑淫,以便其他進(jìn)程使用
*/
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
/**
=========================================================================
*/
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(2))
// Step 1: Receive Data From Where
val socketDStream = ssc.socketTextStream("hadoop-senior01.ibeifeng.com", 9999)
// Step 2: Process Data Based on DStream
// Split each line into words
val words = socketDStream.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKeyAndWindow((x: Int, y: Int) => (x + y), Seconds(8), Seconds(4))
// Step 3: Output Result
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
sc.stop
// ===============================================
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._
val ssc = new StreamingContext(sc, Seconds(5))
// Step 1: Receive Data From Where
// Flume: FlumeUtils, Kafka: KafkaUtils
val flumeDStream = FlumeUtils.createStream(ssc, "hadoop-senior01.ibeifeng.com", 9988).map(event => new String(event.event.getBody.array()))
// Step 2: Process Data Based on DStream
// DStream[String]
val words = flumeDStream.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Step 3: Output Result
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
sc.stop
5. spark sql
Things to think about:
What is Hive, essentially?
How does Hive translate SQL into MapReduce?
Oozie:
MapReduce Action
Dremel
-1,Presto
-2,Impala
gaming companies
--1, installed via yum/rpm
--2, or via CM (Cloudera Manager)
Flume + Kafka + HBase + Impala + JAVA + Python
-3,Drill
For further research:
http://kylin.apache.org/
====================================================
The history of Spark SQL
-1, before version 1.0
Shark = Hive on Spark
-2, the 1.0.x versions
Spark SQL
alpha version
-3, the 1.3.x versions
DataFrame
release version
-4, the 1.5.x versions
Project Tungsten
-5, the 1.6.x versions
DataSet
-6, version 2.0
...
In one sentence:
Spark SQL started out by replacing Hive's execution layer; Spark SQL stays compatible with Hive, especially with HQL statements.
Shark
-1,Spark SQL
alpha version
-2,Hive on Spark
Hive
-1,MapReduce
-2,Spark
-3,Tez
====================================================
Spark SQL processing data in Hive tables
-1,MetaStore
the MySQL database
hive-site.xml
-2, the database driver
mysql-*.jar
bin/spark-shell --master local[2] --jars /opt/cdh5.3.6/hive-0.13.1-cdh5.3.6/lib/mysql-connector-java-5.1.27-bin.jar
bin/spark-sql --master local[2] --jars /opt/cdh5.3.6/hive-0.13.1-cdh5.3.6/lib/mysql-connector-java-5.1.27-bin.jar
====================================================
-1, for Spark Core, the application entry point is
SparkContext
-2, for Spark SQL, the application entry point is
SQLContext -> HiveContext
SparkContext
--1, SparkCore: RDD
scala> case class People(name: String, age: Int)
defined class People
scala> val rdd = sc.textFile("/user/beifeng/people.txt").map(line => line.split(",")).map(x => People(x(0), x(1).trim.toInt))
rdd: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[3] at map at <console>:29
--2,SparkSQL: DataFrame
scala> val df = sqlContext.read.json("/user/beifeng/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> val hive_emp_df = sqlContext.read.table("default.emp")
hive_emp_df: org.apache.spark.sql.DataFrame = [empno: int, ename: string, job: string, mgr: int, hiredate: string, sal: double, comm: double, deptno: int]
scala> hive_emp_df.schema
res2: org.apache.spark.sql.types.StructType = StructType(StructField(empno,IntegerType,true), StructField(ename,StringType,true), StructField(job,StringType,true), StructField(mgr,IntegerType,true), StructField(hiredate,StringType,true), StructField(sal,DoubleType,true), StructField(comm,DoubleType,true), StructField(deptno,IntegerType,true))
==========================================
How to create a DataFrame
-1, external data sources - built in
--1,json
--2,hive
--3,jdbc
--4,parquet/orc
--5,text - of little use
-2, conversion from an RDD
--1, way one:
RDD[CASE CLASS]
--2, way two:
a custom schema
-3, external data sources - implemented yourself against the API
--1,ES
search
--2,HBase
open sourced by Huawei
--3,solr
...
scala> val hive_dept_df = sqlContext.read.table("default.dept")
def jdbc(url: String, table: String, connectionProperties: Properties)
Requirement:
write the DataFrame data into a MySQL table
val url = "jdbc:mysql://hadoop-senior01.ibeifeng.com:3306/test?user=root&password=123456"
import java.util.Properties
val props = new Properties()
hive_dept_df.write.jdbc(url, "tb_dept", props)
Problem:
scala> hive_dept_df.write.jdbc(url, "tb_dept", props)
java.sql.SQLException: No suitable driver
at java.sql.DriverManager.getDriver(DriverManager.java:278)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:50)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:50)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createConnectionFactory(JdbcUtils.scala:49)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:278)
Solution:
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/cdh5.3.6/spark-1.6.1-bin-2.5.0-cdh5.3.6/externallibs/mysql-connector-java-5.1.27-bin.jar
bin/spark-shell --master local[2]
Example:
JOIN two tables from different data sources, one in Hive and one in MySQL
-1,emp
in Hive
-2,dept
in MySQL
-3,join
val url = "jdbc:mysql://hadoop-senior01.ibeifeng.com:3306/test?user=root&password=123456"
import java.util.Properties
val props = new Properties()
val mysql_dept_df = sqlContext.read.jdbc(url, "tb_dept", props)
val hive_emp_df = sqlContext.read.table("default.emp")
// join
val join_df = hive_emp_df.join(mysql_dept_df, "deptno")
join_df.registerTempTable("join_emp_dept")
sqlContext.sql("select empno, ename, deptno, deptname, sal from join_emp_dept order by empno").show
===========================================================
How to create a DataFrame from an RDD
Way one:
The first method uses reflection to infer the schema of an RDD that contains specific types of objects.
In short:
RDD[CASE CLASS]
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
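A minimal sketch of way one, reusing the People case class and people.txt from the transcript above:
import sqlContext.implicits._
case class People(name: String, age: Int)
val peopleDF = sc.textFile("/user/beifeng/people.txt")
  .map(_.split(","))
  .map(x => People(x(0), x(1).trim.toInt))
  .toDF()                     // reflection infers the schema from the case class fields
peopleDF.registerTempTable("people")
sqlContext.sql("select name, age from people").show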
Way two:
The second method for creating DataFrames is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD.
a DataFrame can be created programmatically with three steps:
--1,RDD -> RDD[Row]
import org.apache.spark.sql._
Demo:
val rdd = sc.textFile("/user/beifeng/people.txt")
import org.apache.spark.sql._
val rowRdd = rdd.map(line => line.split(", ")).map(x => Row(x(0), x(1).toInt))
--2,Create the schema
import org.apache.spark.sql.types._
val schema = StructType(StructField("name",StringType,true) :: StructField("age",IntegerType,true) :: Nil)
--3,Apply the schema to the RDD of Rows
val people_df = sqlContext.createDataFrame(rowRdd, schema)
=========================================================
Spark 1.6
Dataset
data sets
Related articles:
-1,https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html
-2,https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html
val url = "jdbc:mysql://hadoop-senior01.ibeifeng.com:3306/test"
import java.util.Properties
val props = new Properties()
props.put("user","root")
props.put("password","123456")
val mysql_dept_df = sqlContext.read.jdbc(url, "tb_dept", props)
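A small Dataset sketch on top of the DataFrame just read (Spark 1.6 API); the Dept case class is an assumption about tb_dept's columns and should be adjusted to the real schema:
case class Dept(deptno: Int, dname: String, loc: String)   // assumed column names
import sqlContext.implicits._
val dept_ds = mysql_dept_df.as[Dept]    // DataFrame -> Dataset[Dept], typed field access
dept_ds.filter(_.deptno == 10).show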
Spark SQL & ES
http://blog.csdn.net/stark_summer/article/details/49743687
Review of SparkSQL
-1, 90% of the data processed comes from Hive tables
the Hive data warehouse
-2, the pipeline
HADOOP -> HIVE -> SparkSQL
Recommendation:
when building a project, investigate, run and experiment with SparkSQL before committing to it
-3, the program entry points
SQLContext & HiveContext
DataFrame
DataFrame -> Dataset -> RDD
-a,RDD[ROW] = DataFrame
two ways to create one
-#,CASE CLASS
-#,RDD[ROW],schema(StructType,StructField)
-b, external data sources
sqlContext.read.
.json
.parquet
.jdbc
.table
which makes it easy to JOIN data coming from heterogeneous data sources
-c, integration with Hive
sqlContext.sql("select * from emp").show
-d,ThriftServer
one Spark Application is started and many clients connect to it
beeline
jdbc
-e, storing the analysis results
dataframe.write
.jdbc
.table
.text
.json
========================================================
How does Spark integrate with HBase??
How does Spark interact with HBase?
1: reading data
2: writing data
-1,SparkCore
read:
hbase-table -> RDD
write:
RDD -> Cell
Exercise:
How to save Spark RDD data into an HBase table with saveAsNewAPIHadoopDataset.
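A hedged sketch of this exercise; the table name, column family and ZooKeeper quorum are assumptions:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "hadoop-senior01.ibeifeng.com")   // assumed quorum
conf.set(TableOutputFormat.OUTPUT_TABLE, "wordcount")                // assumed table name
val job = Job.getInstance(conf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])
val wordCountRdd = sc.textFile("/user/beifeng/mapreduce/wordcount/input").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
// turn each (word, count) into a Put keyed by the word, then save through the new Hadoop API
wordCountRdd.map { case (word, count) =>
  val put = new Put(Bytes.toBytes(word))
  put.add(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(count.toString))   // assumed column family "info"
  (new ImmutableBytesWritable, put)
}.saveAsNewAPIHadoopDataset(job.getConfiguration)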
-2,SparkSQL
SparkSQL -> Hive -> HBase
this approach
is currently only suitable for reading; writing this way is not recommended and may have problems.
Research exercise:
test this integration approach
Note:
-#, when integrating Hive with HBase, the HBASE CLIENT jars must be placed into $HIVE_HOME/lib
-#, SPARKSQL nominally reads a Hive table, but the data actually read is the HBase table's data, so the HBASE CLIENT jars must also be on the CLASSPATH
-3,SparkSQL External DataSource API
https://github.com/Huawei-Spark/Spark-SQL-on-HBase
In the MapReduce framework
input -> map -> shuffle -> reduce -> output
1: input
read the data to be processed
// set input path
FileInputFormat.addInputPath(job, new Path("/user/beifeng/mr-wc-in"))
(Key, Value)
Key:
offset
value:
the content of each line
RecordReader
reads one record at a time
TextInputFormat
LineRecordReader
MapReduce API
starting with HADOOP 0.20.0, there is a new API
-#,old
org.apache.hadoop.mapred.*
Mapper\Reducer -> Interface
-#,new
org.apache.hadoop.mapreduce.*
Mapper\Reducer -> class
val rdd = sc.textFile("Readme.md")
When Spark reads file data from HDFS, it reads it exactly the same way MapReduce does
Integrating HBase with MapReduce
-1,Mapper
TableMapper
the Mapper reads data from the HBase table
(ImmutableBytesWritable, Result)
when MapReduce reads from an HBase table, it also reads one record at a time,
TableInputFormat
TableRecordReader
===================================================
SparkSQL
-1, aggregate functions in SparkSQL
the DataFrame class documents the following five groups of functions:
- @groupname basic   Basic DataFrame functions
- @groupname dfops   Language Integrated Queries
- @groupname rdd     RDD Operations
- @groupname output  Output Operations
- @groupname action  Actions
SQL and DSL (Domain-Specific Language)
emp_df.agg("sal" -> "avg", "comm" -> "max").show
emp_df.agg(Map("sal" -> "avg", "comm" -> "max")).show
emp_df.agg(max($"comm"), avg($"sal")).show
emp_df.groupBy($"deptno").agg(max($"comm"), avg($"sal")).show
-2, user-defined functions (UDF)
A quick review:
user-defined functions in Hive:
-1,extends UDF
-2,override evaluate
For Spark:
SCALA, functional programming
an anonymous function
the if function
if(condition, true-value, false-value)
sqlContext.udf.register(
  "trans_comm",            // function name
  (comm: Double) => {      // the function: map an empty commission to 0.0
    if (comm.toString == "") {
      0.0
    } else {
      comm
    }
  }
)
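A usage sketch of the registered function, assuming the emp table from earlier:
sqlContext.sql("select ename, sal, comm, trans_comm(comm) as comm2 from emp").show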
===========================================================
How to write a custom UDAF?
avg:
computes an average
many to one
many input values, one output value
average:
-0,total = 0.0 // 3000 + 2500 + 4200 + 2100 + 5000
-1,count = 0 // 5
buffered intermediate data
Spark/MapReduce
block block block
| | |
part-01 part-02 part-03
| | |
Task Task Task
avg avg avg
sqlContext.udf.register(
"avg_sal",
AvgSalUDAF
)
sqlContext.sql("select deptno, avg(sal) avg_sal, avg_sal(sal) as_self from emp group by deptno").show
==================================================
Analytic (window) functions in Hive
http://lxw1234.com/archives/tag/hive-window-functions
SparkSQL supports them as well
Spark 1.4.0
SPARK-1442: Window functions in Spark SQL and DataFrames
ROW_NUMBER
For the emp table:
get the top-3 earners of each department
-1, group by department
group by deptno
-2, sort each department's employees by salary, descending
order by sal desc
-3, take the first few
TopKey
SELECT
empno, ename, sal, deptno,
ROW_NUMBER() OVER(PARTITION BY deptno ORDER BY sal DESC) rnk
FROM
emp ;
empno ename sal deptno rnk
7839 KING 5000.0 10 1
7782 CLARK 2450.0 10 2
7934 MILLER 1300.0 10 3
7788 SCOTT 3000.0 20 1
7902 FORD 3000.0 20 2
7566 JONES 2975.0 20 3
7876 ADAMS 1100.0 20 4
7369 SMITH 800.0 20 5
7698 BLAKE 2850.0 30 1
7499 ALLEN 1600.0 30 2
7844 TURNER 1500.0 30 3
7521 WARD 1250.0 30 4
7654 MARTIN 1250.0 30 5
7900 JAMES 950.0 30 6
SELECT
empno, ename, sal, deptno
FROM(
SELECT
empno, ename, sal, deptno,
ROW_NUMBER() OVER(PARTITION BY deptno ORDER BY sal DESC) rnk
FROM
emp
) t
WHERE
t.rnk <= 3 ;
empno ename sal deptno
7839 KING 5000.0 10
7782 CLARK 2450.0 10
7934 MILLER 1300.0 10
7788 SCOTT 3000.0 20
7902 FORD 3000.0 20
7566 JONES 2975.0 20
7698 BLAKE 2850.0 30
7499 ALLEN 1600.0 30
7844 TURNER 1500.0 30
A typical data analysis pipeline:
-1, filter and clean
-2, group
-3, aggregate
-4, sort
-5, TopKey
6. spark core
key background knowledge
-1,MapReduce
the programming model
shuffle
-2,Hive
ETL
Spark SQL
its history
data warehousing
organizing data
DataFrame
===================================================================
Spark course schedule:
-1,SCALA
one day
-2,Spark Core
two days
-3,SparkSQL
one day
-4,Spark Streaming
one day
-5, project
three days
-6,Spark MLlib (optional)
one day
===========================================================
Tencent:
around 2008: a few hundred servers producing log data.
Analyzing the log data served two purposes:
-1, detecting server failures - operations
-2, informing product design
product managers
Phase one:
Python + MySQL
implementation:
-1, a MySQL database was installed on every server
and the log data was stored into MySQL tables
--1,Python
scripts
--2, the log data was classified by certain conditions
and different kinds of data were stored into different MySQL tables
-2, analysis was done with SQL
scatter -> store -> analyze -> aggregate -> scatter -> store -> analyze
Phase two:
HADOOP + Hive
implementation:
-1, put the log data on HDFS and use MapReduce to classify, clean and filter it,
then load the data into Hive tables
-2,HiveQL
batch, offline analysis; relatively slow, especially when the data volume is very large.
As business needs grew:
machine learning, data analysis, data mining
---1, iterative computation
loops
MapReduce:
Map Task -> local filesystem -> Reduce Task -> hdfs
the MapReduce-based machine learning framework
Mahout
--0, in the first half of 2014 the project officially announced that its backend would no longer be MapReduce but Spark
Phase three:
HADOOP + SPARK
implementation:
-1, data is still stored in HDFS/Hive
-2, data analysis is done with
Spark
with other frameworks as a complement
Hive + Python + ...
Why Spark?
Data structure:
RDD
-1, a collection (List[Type])
-2, data stored in memory
-3, stored in partitions
like hdfs blocks
-4, computation runs on each partition's data
like mapreduce: input -> block : map task
input -> process -> output
MapReduce:
input -> (map -> shuffle -> reduce) -> output
Spark:
input -> (rdd -> rdd -> rdd -> rdd -> ...) -> output
val list = List(1,2,3,4)
list.map(x => x * 2)
AMPLab
A: Algorithms  M: Machines  P: People
Advice for learning Spark:
-1, grasp the core abstractions
RDD
DataFrame
DStream
-2, think, read the source code, compare
work through examples
=============================================================
Apache Spark™ is a fast and general engine for large-scale data processing.
Key points:
-1, a computing framework for processing data at scale
core: the RDD data structure
-2,fast
it is fast
compared against:
MapReduce
on the basis of:
the same data, the same logic, different code
--1, in memory
100x+
--2, on disk
10x+
-3,general
general purpose
HADOOP ecosystem:
MapReduce Hive Mahout Storm Giraph
Spark stack:
Core SparkSQL MLlib SparkStreaming GraphX
shipped as JAR packages:
libraries
-1, a Spark Application runs everywhere
local, yarn, mesos, standalone, ec2
-2, it processes data
from everywhere (Spark SQL: json\parquet\orc\xml\jdbc\solr\es\tsv...)
hdfs\hive\hbase...
==============================================================
Spark Timeline
-1, before Spark 1.0
0.9
-2,1.0
the first release
-3, Spark's development in 2015
1.2\1.3\1.4\1.5\1.6
five major versions
1.3
Spark SQL: DataFrame
Project Tungsten
performance optimization
-4,Spark 2.0
simpler, faster, smarter
The three main websites for learning Spark:
-1, the official documentation
http://spark.apache.org/
-2, the source code
https://github.com/apache/spark
-3, the official blog
https://databricks.com/blog
Hive execution engines:
-1,MapReduce
-2,Spark
https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started
-3,Tez
Hortonworks
For Apache HADOOP:
./make-distribution.sh --tgz -Phadoop-2.4 -Dhadoop.version=2.5.0 -Phive -Phive-thriftserver -Pyarn
For CDH HADOOP:
./make-distribution.sh --tgz -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.6 -Phive -Phive-thriftserver -Pyarn
========================================================
Local
YARN/Standalone
--Spark Local
-1,JAVA
-2,HDFS
-3,SCALA
-4,Spark
大記状V印:
bin/spark-submit
類似于
bin/yarn
提交Spark Application的腳本就是spark-submit
spark-shell
spark-submit
=====================================================
For every Spark program, the entry point is SparkContext
Analyzing data:
step 1: input
rdd = sc.textFile("/user/beifeng/xx")
step 2: process
rdd.map().filter().reduceByKey()
step 3: output
rdd.saveAsTextFile()
16/06/26 20:01:56 INFO BlockManagerMaster: Registered BlockManager
16/06/26 20:01:56 INFO SparkILoop: Created spark context..
Spark context available as sc.
rdd = sc.textFile("/user/beifeng/xx")
Analogy:
by default, a MapTask reads data line by line as <key,value> pairs
in Spark
data is also read line by line, and each line is a String
WordCount
-1,MapReduce
-2,Spark
-3,Flink
http://flink.apache.org/
input
val rdd = sc.textFile("/user/beifeng/mapreduce/wordcount/input")
process
val wordCountRdd = rdd.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((a, b) => (a + b))
output
wordCountRdd.saveAsTextFile("/user/beifeng/spark/wordcount/output")
=====================
-1, it is not that the MapReduce framework or its model is bad; it is just not suited to certain workloads
(key,value)
-2, in Spark data processing
much like in MapReduce, we usually turn the data into (key,value) pairs for processing
val rdd = sc.textFile("/user/beifeng/mapreduce/wordcount/input")
rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
==================================================================
Summary:
-1, getting to know Spark
RDD
--1, which operations exist?
--2, which ones to use?
Where does a Spark Application run???
--1,YARN
currently the mainstream choice
--2,Standalone/Mesos
comes with its own distributed resource management and task scheduling
hadoop 2.x release 2.2.0 2013/10/15
hadoop 2.0.x - alpha
hadoop 2.1.x - beta
Cloudera
cdh3.x - 0.20.2
cdh4.x - 2.0.0
HDFS -> HA:QJM ; Federation
Cloudera Manager 4.x
cdh5.x
================================================
Spark Standalone Mode
can be thought of as
Spark's own distributed resource management system and task scheduling framework,
similar to a framework like YARN
it is distributed:
master node:
Master - (compare: ResourceManager)
worker nodes:
Workers - (compare: NodeManagers)
start-slaves.sh
starts all the worker nodes, i.e. the Workers
Note:
the machine running this command must have passwordless SSH configured to the other machines; otherwise there will be problems at startup, such as being prompted for passwords.
A Spark Application
consists of two parts:
-1,Driver Program -> 4040 4041 4042
main
SparkContext --- the most important piece
-2,Executor
a JVM (process)
that runs the Tasks of our Jobs
REPL:
the interactive shell
Spark Application
Job-01
count
Job-02
Stage-01
Task-01 (thread) -> compare: Map Task (process)
Task-02 (thread) -> compare: Map Task (process)
all Tasks within a Stage run the same logic, only on different data
Stage-02
Job-03
From the program runs above:
whenever a function called on an RDD returns something that is not an RDD, a JOB is triggered and executed.
Think about it:
what does reduceByKey actually do?
-1, grouping
it puts the values of the same key together
-2, it reduces the values
merging them
By analyzing and comparing with how the MapReduce WordCount program runs, we can infer that the Stages of a Spark JOB are divided according to whether a Shuffle occurs between RDDs.
=============================================================
val rdd = sc.textFile("/user/beifeng/mapreduce/wordcount/input")
val wordCountRdd = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
Requirement:
sort by word frequency in descending order and take the top 3 words (TOP KEY)
wordCountRdd
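A minimal sketch of one way to do it:
// sort by count descending and take the first three
val top3 = wordCountRdd.sortBy(_._2, ascending = false).take(3)
// or, without a full sort, keep only the three largest counts
val top3b = wordCountRdd.top(3)(Ordering.by((t: (String, Int)) => t._2))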
================================
在企業(yè)中朝蜘,如何開發(fā)Spark Application
spark-shell + IDEA
-1,在IDEA中編寫代碼
-2,在spark-shell中執(zhí)行代碼
-3,使用IDEA將代碼打包成JAR包恶迈,使用bin/spark-submit進(jìn)行提交應(yīng)用
================================
Spark HistoryServer
monitors completed Spark Applications.
It involves two parts:
first, configure the Spark Application to record event logs while it runs
second, start the HistoryServer and inspect the logs through its web UI
=================================================
Requirement one:
The average, min, and max content size of responses returned from the server.
ContentSize
Requirement two:
A count of response codes returned.
responseCode
Requirement three:
All IPAddresses that have accessed this server more than N times.
ipAddresses
Requirement four:
The top endpoints requested by count.
endPoint
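A hedged sketch of the four requirements, assuming an RDD[ApacheAccessLog] named accessLogsRdd whose fields (contentSize, responseCode, ipAddress, endpoint) mirror the project's log case class; all of these names are assumptions:
val contentSizes = accessLogsRdd.map(_.contentSize).cache()
println("avg = " + contentSizes.sum / contentSizes.count() + ", min = " + contentSizes.min() + ", max = " + contentSizes.max())
// requirement two: count per response code
val responseCodeCounts = accessLogsRdd.map(log => (log.responseCode, 1)).reduceByKey(_ + _).collect()
// requirement three: IP addresses with more than N (here 10) requests
val frequentIps = accessLogsRdd.map(log => (log.ipAddress, 1)).reduceByKey(_ + _).filter(_._2 > 10).map(_._1).take(100)
// requirement four: top 10 endpoints by request count
val topEndpoints = accessLogsRdd.map(log => (log.endpoint, 1)).reduceByKey(_ + _).sortBy(_._2, ascending = false).take(10)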
mvn archetype:generate -DarchetypeGroupId=org.scala-tools.archetypes -DarchetypeArtifactId=scala-archetype-simple -DremoteRepositories=http://scala-tools.org/repo-releases -DgroupId=com.ibeifeng.bigdata.spark.app -DartifactId=log-analyzer -Dversion=1.0
Hive:
maps the data in files onto a table
Regular expressions:
Option
Some
has a value
None
no value
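A small illustration of Option/Some/None for safe parsing (a hypothetical helper):
def parseAge(s: String): Option[Int] = if (s.matches("\\d+")) Some(s.toInt) else None
parseAge("30")                 // Some(30)
parseAge("abc")                // None
parseAge("abc").getOrElse(0)   // 0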
16/07/16 18:46:51 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package
at java.lang.ClassLoader.checkCerts(ClassLoader.java:952)
at java.lang.ClassLoader.preDefineClass(ClassLoader.java:666)
at java.lang.ClassLoader.defineClass(ClassLoader.java:794)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:136)
at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:129)
at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:98)
at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:126)
at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:113)
at org.apache.spark.ui.WebUI.attachPage(WebUI.scala:78)
at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:62)
at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:62)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.ui.WebUI.attachTab(WebUI.scala:62)
at org.apache.spark.ui.SparkUI.initialize(SparkUI.scala:63)
at org.apache.spark.ui.SparkUI.<init>(SparkUI.scala:76)
at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:195)
at org.apache.spark.ui.SparkUI$.createLiveUI(SparkUI.scala:146)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:473)
at com.ibeifeng.bigdata.spark.app.core.LogAnalyzer$.main(LogAnalyzer.scala:16)
Cause of the above error:
a JAR conflict.
Review:
-1, getting to know Spark
comparison with MapReduce
the "four big advantages":
--1, fast
--2, easy to use
--3, a unified stack
--4, runs everywhere
development and testing
SCALA: REPL / Python
-2,Spark Core
two key abstractions
--1,RDD
a collection storing data of some type - like a List
---1, kept in memory
memory
---2, partitioned
like hdfs blocks
---3, operations run on the data of each partition
function
--2, shared variables
---1, broadcast variables
---2, accumulators
counters
-3, environment and development
--1,Local Mode
spark-shell
--2,Spark Standalone
configuration
startup
monitoring
usage
--3,HistoryServer
-1, whether each application records an eventlog
-2, the HistoryServer displays them
--4, how to develop a Spark Application in an IDE
-1,SCALA PROJECT
how to add the Spark JARs
-2,MAVEN PROJECT
=================================================
Spark development
step 1:
input data -> rdd/dataframe
step 2:
process data -> rdd##xx() / df#xx | "select xx, * from xx ..."
step 3:
output data -> rdd.saveXxxx / df.write.jdbc/json/xxx
================================================
Problem:
16/07/23 09:38:13 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/07/23 09:38:13 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.RuntimeException: Cannot parse log line: mail.geovariances.fr - - [09/Mar/2004:05:02:11 -0800] "GET /twiki/pub/TWiki/TWikiLogos/twikiRobot46x50.gif HTTP/1.1" 304 -
at com.ibeifeng.bigdata.spark.app.core.ApacheAccessLog$.parseLogLine(ApacheAccessLog.scala:38)
at com.ibeifeng.bigdata.spark.app.core.LogAnalyzer$$anonfun$5.apply(LogAnalyzer.scala:25)
at com.ibeifeng.bigdata.spark.app.core.LogAnalyzer$$anonfun$5.apply(LogAnalyzer.scala:25)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
closures:
as in JS/jQuery
Internally, each RDD is characterized by five main properties:
- A list of partitions
protected def getPartitions: Array[Partition]
- A function for computing each split
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
- A list of dependencies on other RDDs
protected def getDependencies: Seq[Dependency[_]] = deps
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
/** Optionally overridden by subclasses to specify how they are partitioned. */
@transient val partitioner: Option[Partitioner] = None
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
RDD
Reviewing WordCount
val rdd = sc.textFile("xxx")
val wordRdd = rdd.flatMap(_.split(" "))
val kvRdd = wordRdd.map((_, 1))
kvRdd.groupByKey().map(tuple => {
(tuple._1, tuple._2.toList.reduce(_ + _))
})
val wordcountRdd = kvRdd.reduceByKey(_ + _)
wordcountRdd.saveAsTextFile("yy")
kvRdd <- wordRdd <- rdd
wordRdd <- rdd
Two ways to create an RDD:
way one:
parallelize a collection
List\Seq\Array
SparkContext:
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T]
way two:
from an external storage system
and also:
RDD Transformations (every transformation produces a new RDD)
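A minimal sketch of both ways:
// way one: parallelize a local collection (the second argument sets the number of partitions)
val listRdd = sc.parallelize(List(1, 2, 3, 4), 2)
// way two: read from an external storage system such as HDFS
val fileRdd = sc.textFile("/user/beifeng/mapreduce/wordcount/input")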
==================================
For a distributed computing framework, the performance bottleneck is
IO
-1, disk IO
-2, network IO
rdd1 -> rdd2
Shuffle
============================================
groupByKey() & reduceByKey()
In real development, if the job can be done with reduceByKey, do not use groupByKey;
reduceByKey aggregates on the map side first, similar to enabling a Combiner in MapReduce.
===============
join()
-1, equi-join
-2, left join
Deduplication use case:
the existing result data:
res-pre.txt - rdd1
the new data to be processed:
web.tsv - 10GB - rdd2
parse the url inside each record;
if res-pre.txt already contains the url, drop the record; otherwise keep it (or give records with unseen urls special treatment)
rdd2.leftOuterJoin(rdd1)
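A hedged sketch of the dedup-by-url idea above; the paths and the tab-separated layout of web.tsv are assumptions:
val seenUrls = sc.textFile("/user/beifeng/res-pre.txt").map(url => (url, true))                  // rdd1
val newRecords = sc.textFile("/user/beifeng/web.tsv").map(line => (line.split("\t")(0), line))   // rdd2, keyed by url
// keep only the records whose url is NOT already in the previous result set
val fresh = newRecords.leftOuterJoin(seenUrls)
  .filter { case (_, (_, seen)) => seen.isEmpty }
  .map { case (_, (line, _)) => line }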
===================================================
Group Top Key
aa 78
bb 98
aa 80
cc 98
aa 69
cc 87
bb 97
cc 86
aa 97
bb 78
bb 34
cc 85
bb 92
cc 72
bb 32
bb 23
rdd.map(line => line.split(" ")).map(arr => (arr(0), arr(1).toInt)).groupByKey().map(tuple => (tuple._1, tuple._2.toList.sorted.takeRight(3).reverse))
=======================================================
SparkContext
-1, requests resources from the Master (the cluster manager's master node) and launches all Executors
-2, the entry point for creating RDDs
sc.textFile("") // create from an external storage system
sc.parxx() // parallelize, create from a collection in the Driver
-3, schedules and manages JOB execution
DAGScheduler, TaskScheduler
--3.1
builds a DAG graph for each Job
--3.2
splits the DAG graph into Stages
according to whether a Shuffle exists between RDDs
working backwards (like a Stack)
--3.3
each Stage holds a TaskSet
the Tasks within a Stage run the same code, only on different data
The WordCount program discussed earlier:
word frequency count
. ? # $ are meaningless and need not be counted
val list = List(".", "?", "#","$")
val rdd = sc.textFile("xxx")
val wordRdd = rdd.flatMap(_.split(" "))
val filterRdd = wordRdd.filter(word => !list.contains(word))
val kvRdd = filterRdd.map((_, 1))
val wordcountRdd = kvRdd.reduceByKey(_ + _)
wordcountRdd.saveAsTextFile("yy")
Using a broadcast variable:
val list = List(".", "?", "!", "#", "$")
val broadcastList = sc.broadcast(list)
val wordRdd = sc.textFile("")
wordRdd.filter(word => {
!broadcastList.value.contains(word)
})
For a Spark Application, one thing that gives many beginners a headache is
external dependency JARs.
There are several ways to handle them:
way one:
--jars JARS
Comma-separated list of local jars to include on the driver and executor classpaths.
the jar paths must be absolute paths.
way two:
--driver-class-path
Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath.
way three:
SPARK_CLASSPATH
set this environment variable
In industry, a Spark Application is submitted with a shell script
spark-app-submit.sh:
#!/bin/sh
# SPARK_HOME
SPARK_HOME=/opt/cdh5.3.6/spark-1.6.1-bin-2.5.0-cdh5.3.6
# SPARK CLASSPATH
SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/jars/sparkexternale/xx.jar:/opt/jars/sparkexternale/yy.jar
${SPARK_HOME}/bin/spark-submit --master spark://hadoop-senior01.ibeifeng.com:7077 --deploy-mode cluster /opt/tools/scalaProject.jar
====================================================
YARN
-1, distributed resource management
master node: ResourceManager
worker nodes: NodeManager -> manages the resources (memory and CPU cores) of each machine
-2, resource scheduling
--1, Containers
AM/Task
--2, every application running on YARN has its own manager, the ApplicationMaster, which handles resource requests and task scheduling
A Spark Application
-1,Driver Program
resource requests and task scheduling
-2,Executors
each Executor is a JVM, i.e. a process
With spark deploy mode: client
the AM
-- everything runs in Containers
the Executors
run in Containers, similar to the Map Tasks and Reduce Tasks of a MapReduce job
Driver -> AM -> RM
============================================
How to tell whether the dependency between RDDs is narrow or wide:
look at how each partition of the parent RDD feeds the partitions of the child RDD
1 -> 1 : narrow dependency
1 -> N : wide dependency, the Shuffle as in MapReduce
val rdd = sc.textFile("/user/beifeng/mapreduce/wordcount/input")
val wordRdd = rdd.flatMap(_.split(" "))
val kvRdd = wordRdd.map((_, 1))
val wordcountRdd = kvRdd.reduceByKey(_ + _)
wordcountRdd.collect
input -> rdd -> wordRdd -> kvRdd : Stage-01 -> ShuffleMapStage -> ShuffleMapTask
->
wordcountRdd -> output :Stage-02 -> ResultStage -> ResultTask
Each Stage can either be a shuffle map stage, in which case its tasks' results are input for other stage(s), or a result stage, in which case its tasks directly compute a Spark action (e.g. count(), save(), etc) by running a function on an RDD. For shuffle map stages, we also track the nodes that each output partition is on.