前言
目前spark sql 主要應用在structure streaming欢搜、etl 和 machine learning 的場景上, 它能對結構化的數(shù)據(jù)進行存儲和操作,結構化的數(shù)據(jù)可以來自HIve、JSON、Parquet桑驱、JDBC/ODBC等數(shù)據(jù)源。由于部門對數(shù)據(jù)的準確性跛蛋,一致性和維護等等要求等業(yè)務特點熬的,我們選擇mysql使用jdbc的方式作為我們的數(shù)據(jù)源,spark 集群用yarn做為資源管理赊级,本文主要分享我們在使用spark sql 過程中遇到的問題和一些常用的開發(fā)實踐總結押框。
運行環(huán)境:spark :2.1.0,hadoop: hadoop-2.5.0-cdh5.3.2 (yarn 資源管理理逊,hdfs)橡伞,mysql:5.7 盒揉,scala: 2.11, java:1.8
spark on yarn
我們先來了解一下spark on yarn 任務的運行機制兑徘。yarn 的基本思想是將JobTracker的兩個主要功能(資源管理和任務調度/監(jiān)控)分離成單獨的組件:RM 和 AM刚盈;新的資源管理器ResourceManager(RM)實現(xiàn)全局的所有應用的計算資源分配,應用控制器ApplicationMaster(AM)實現(xiàn)應用的調度和資源的協(xié)調挂脑;節(jié)點管理器NodeManager(NM)則是每臺機器的代理藕漱,處理來自AM的命令,實現(xiàn)節(jié)點的監(jiān)控與報告崭闲;容器 Container 封裝了內(nèi)存肋联、CPU、磁盤刁俭、網(wǎng)絡等資源牺蹄,是資源隔離的基礎,當AM向RM申請資源時薄翅,RM為AM返回的資源便是以Container表示,如上圖氓奈,spark master分配的 executor 的執(zhí)行環(huán)境便是containner翘魄。目前我們使用yarn 隊列的方式,可以進一步的對應用執(zhí)行進行管理舀奶,讓我們的應用分組和任務分配更加清晰和方便管理暑竟。
開發(fā)實踐
1. 讀取mysql表數(shù)據(jù)
import com.test.spark.db.ConnectionInfos;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
public class SparkSimple01 {
public static void main(String[] args) {
// 創(chuàng)建spark會話,實質上是SQLContext和HiveContext的組合
SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Java Spark SQL basic example").getOrCreate();
// 設置日志級別育勺,默認會打印DAG,TASK執(zhí)行日志但荤,設置為WARN之后可以只關注應用相關日志
sparkSession.sparkContext().setLogLevel("WARN");
// 分區(qū)方式讀取mysql表數(shù)據(jù)
Dataset<Row> predicateSet = sparkSession.read().jdbc(ConnectionInfos.TEST_MYSQL_CONNECTION_URL, "people",
(String[]) Arrays.asList(" name = 'tom'", " name = 'sam' ").toArray(), ConnectionInfos.getTestUserAndPasswordProperties());
predicateSet.show();
}
}
為了確認該查詢對mysql發(fā)出的具體sql,我們先查看一下mysql執(zhí)行sql日志涧至,
#mysql 命令窗口執(zhí)行以下命令打開日志記錄
SHOW VARIABLES LIKE "general_log%";
SET GLOBAL general_log = 'ON';
打開Lenovo.log得到以上代碼在mysql上的執(zhí)行情況:
通過分區(qū)查詢獲取表數(shù)據(jù)的方式有以下幾個優(yōu)點:
- 利用表索引查詢提高查詢效率
- 自定義sql條件使分區(qū)數(shù)據(jù)更加均勻腹躁,方便后面的并行計算
- 分區(qū)并發(fā)讀取可以通過控制并發(fā)控制對mysql的查詢壓力
- 可以讀取大數(shù)據(jù)量的mysql表
spark jdbc 讀取msyql表還有直接讀取(無法讀取大數(shù)據(jù)量表),指定字段分區(qū)讀取(分區(qū)不夠均勻)等方式南蓬,通過項目實踐總結纺非,以上的分區(qū)讀取方式是我們目前認為對mysql最友好的方式。
分庫分表的系統(tǒng)也可以利用這種方式讀取各個表在內(nèi)存中union所有spark view得到一張統(tǒng)一的內(nèi)存表赘方,在業(yè)務操作中將分庫分表透明化烧颖。如果線上數(shù)據(jù)表數(shù)據(jù)量較大的時候,在union之前就需要將spark view通過指定字段的方式查詢窄陡,避免on line ddl 在做變更時union表報錯炕淮,因為可能存在部分表已經(jīng)添加新字段,部分表還未加上新字段跳夭,而union要求所有表的表結構一致涂圆,導致報錯们镜。
2. Dataset 分區(qū)數(shù)據(jù)查看
我們都知道 Dataset 的分區(qū)是否均勻,對于結果集的并行處理效果有很重要的作用乘综,spark Java版暫時無法查看partition分區(qū)中的數(shù)據(jù)分布憎账,這里用java調用scala 版api方式查看,線上不推薦使用卡辰,因為這里的分區(qū)查看使用foreachPartition胞皱,多了一次action操作,并且打印出全部數(shù)據(jù)九妈。
import org.apache.spark.sql.{Dataset, Row}
/**
* Created by lesly.lai on 2017/12/25.
*/
class SparkRddTaskInfo {
def getTask(dataSet: Dataset[Row]) {
val size = dataSet.rdd.partitions.length
println(s"==> partition size: $size " )
import scala.collection.Iterator
val showElements = (it: Iterator[Row]) => {
val ns = it.toSeq
import org.apache.spark.TaskContext
val pid = TaskContext.get.partitionId
println(s"[partition: $pid][size: ${ns.size}] ${ns.mkString(" ")}")
}
dataSet.foreachPartition(showElements)
}
}
還是用上面讀取mysql數(shù)據(jù)的例子來演示調用反砌,將predicateSet作為參數(shù)傳入
new SparkRddTaskInfo().getTask(predicateSet);
控制臺打印結果
通過分區(qū)數(shù)據(jù),我們可以看到之前的predicate 方式得到的分區(qū)數(shù)就是predicate size 大小萌朱,并且按照我們想要的數(shù)據(jù)分區(qū)方式分布數(shù)據(jù)宴树,這對于業(yè)務數(shù)據(jù)的批處理,executor的local cache晶疼,spark job執(zhí)行參數(shù)調優(yōu)都很有幫助酒贬,例如調整spark.executor.cores,spark.executor.memory翠霍,GC方式等等锭吨。
這里涉及java和Scala容器轉換的問題,Scala和Java容器庫有很多相似點寒匙,例如零如,他們都包含迭代器、可迭代結構锄弱、集合考蕾、 映射和序列。但是他們有一個重要的區(qū)別会宪。Scala的容器庫特別強調不可變性肖卧,因此提供了大量的新方法將一個容器變換成一個新的容器。
在Scala內(nèi)部掸鹅,這些轉換是通過一系列“包裝”對象完成的喜命,這些對象會將相應的方法調用轉發(fā)至底層的容器對象。所以容器不會在Java和Scala之間拷貝來拷貝去河劝。一個值得注意的特性是壁榕,如果你將一個Java容器轉換成其對應的Scala容器,然后再將其轉換回同樣的Java容器赎瞎,最終得到的是一個和一開始完全相同的容器對象(這里的相同意味著這兩個對象實際上是指向同一片內(nèi)存區(qū)域的引用牌里,容器轉換過程中沒有任何的拷貝發(fā)生)。
3. sql 自定義函數(shù)
自定義函數(shù),可以簡單方便的實現(xiàn)業(yè)務邏輯牡辽。
import com.tes.spark.db.ConnectionInfos;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
public class SparkSimple02 {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Java Spark SQL basic example").getOrCreate();
sparkSession.sparkContext().setLogLevel("WARN");
Dataset<Row> originSet = sparkSession.read().jdbc(ConnectionInfos.TEST_MYSQL_CONNECTION_URL, "people", ConnectionInfos.getTestUserAndPasswordProperties());
originSet.cache().createOrReplaceTempView("people");
// action操作 打印原始結果集
originSet.show();
// 注冊自定義函數(shù)
sparkSession.sqlContext().udf().register("genderUdf", gender -> {
if("M".equals(gender)){
return "男";
}else if("F".equals(gender)){
return "女";
}
return "未知";
}, DataTypes.StringType);
// 查詢結果
Dataset<Row> peopleDs = sparkSession.sql("select job_number,name,age, genderUdf(gender) gender, dept_id, salary, create_time from people ");
// action操作 打印函數(shù)處理后結果集
peopleDs.show();
}
}
執(zhí)行結果:
在sql中用使用java代碼實現(xiàn)邏輯操作喳篇,這為sql的處理邏輯能力提升了好幾個層次,將函數(shù)抽取成接口實現(xiàn)類可以方便的管理和維護這類自定義函數(shù)類态辛。此外麸澜,spark也支持自定義內(nèi)聚函數(shù),窗口函數(shù)等等方式奏黑,相比傳統(tǒng)開發(fā)實現(xiàn)的功能方式炊邦,使用spark sql開發(fā)效率可以明顯提高。
4. mysql 查詢連接復用
最近線上任務遇到一個獲取mysql connection blocked的問題熟史,從spark ui的executor thread dump 可以看到blocked的棧信息馁害,如圖:
查看代碼發(fā)現(xiàn)DBConnectionManager 調用了 spark driver注冊mysql driver 使用同步方式的代碼
看到這里我們很容易覺得是注冊driver 導致的blocked,其實再仔細看回報錯棧信息蹂匹,我們會發(fā)現(xiàn)碘菜,這里的getConnection是在dataset 的foreachpartition 中調用,并且是在每次db 操作時獲取一次getConnection 操作限寞,這意味著在該分區(qū)下有多次重復的在同步方法中注冊driver獲取連接的操作忍啸,看到這里線程blocked的原因就很明顯了,這里我們的解決方式是:
a. 在同個partition中的connection 復用進行db操作
b. 為了避免partition數(shù)據(jù)分布不均導致連接active時間過長履植,加上定時釋放連接再從連接池重新獲取連接操作
通過以上的連接處理计雌,解決了blocked問題,tps也達到了4w左右静尼。
5. executor 并發(fā)控制
我們都知道,利用spark 集群分區(qū)并行能力传泊,可以很容易實現(xiàn)較高的并發(fā)處理能力鼠渺,如果是并發(fā)的批處理,那并行處理的能力可以更好眷细,但是拦盹,mysql 在面對這么高的并發(fā)的時候,是有點吃不消的溪椎,因此我們需要適當降低spark 應用的并發(fā)和上下游系統(tǒng)和平相處普舆。控制spark job并發(fā)可以通過很多參數(shù)配置組合校读、集群資源沼侣、yarn隊列限制等方式實現(xiàn),經(jīng)過實踐歉秫,我們選擇以下參數(shù)實現(xiàn):
#需要關閉動態(tài)內(nèi)存分配蛾洛,其他配置才生效
spark.dynamicAllocation.enabled = false
spark.executor.instances = 2
spark.executor.cores = 2
這里發(fā)現(xiàn)除了設置executor配置之外,還需要關閉spark的動態(tài)executor分配機制,spark 的ExecutorAllocationManager 是 一個根據(jù)工作負載動態(tài)分配和刪除 executors 的管家轧膘, ExecutorAllocationManager 維持一個動態(tài)調整的目標executors數(shù)目钞螟, 并且定期同步到資源管理者,也就是 yarn 谎碍,啟動的時候根據(jù)配置設置一個目標executors數(shù)目鳞滨, spark 運行過程中會根據(jù)等待(pending)和正在運行(running)的tasks數(shù)目動態(tài)調整目標executors數(shù)目,因此需要關閉動態(tài)配置資源才能達到控制并發(fā)的效果蟆淀。
除了executor是動態(tài)分配之外拯啦,Spark 1.6 之后引入的統(tǒng)一內(nèi)存管理機制弃甥,與靜態(tài)內(nèi)存管理的區(qū)別在于存儲內(nèi)存和執(zhí)行內(nèi)存共享同一塊空間色乾,可以動態(tài)占用對方的空閑區(qū)域,我們先看看worker中的內(nèi)存規(guī)劃是怎樣的:
worker 可以根據(jù)實例配置主巍,內(nèi)存配置笋敞,cores配置動態(tài)生成executor數(shù)量碱蒙,每一個executor為一個jvm進程,因此executor 的內(nèi)存管理是建立在jvm的內(nèi)存管理之上的夯巷。從本文第一張spark on yarn圖片可以看到赛惩,yarn模式的 executor 是在yarn container 中運行,因此container的內(nèi)存分配大小同樣可以控制executor的數(shù)量趁餐。
RDD 的每個 Partition 經(jīng)過處理后唯一對應一個 Block(BlockId 的格式為 rdd_RDD-ID_PARTITION-ID )喷兼,從上圖可以看出,開發(fā)過程中常用的分區(qū)(partition)數(shù)據(jù)是以block的方式存儲在堆內(nèi)的storage內(nèi)存區(qū)域的后雷,還有為了減少網(wǎng)絡io而做的broadcast數(shù)據(jù)也存儲在storage區(qū)域季惯;堆內(nèi)的另一個區(qū)域內(nèi)存則主要用于緩存rdd shuffle產(chǎn)生的中間數(shù)據(jù);此外臀突,worker 中的多個executor還共享同一個節(jié)點上的堆外內(nèi)存勉抓,這部分內(nèi)存主要存儲經(jīng)序列化后的二進制數(shù)據(jù),使用的是系統(tǒng)的內(nèi)存候学,可以減少不必要的開銷以及頻繁的GC掃描和回收藕筋。
為了更好的理解executor的內(nèi)存分配,我們再來看一下executor各個內(nèi)存塊的參數(shù)設置:
了解spark 內(nèi)存管理的機制后梳码,就可以根據(jù)mysql的處理能力來設置executor的并發(fā)處理能力隐圾,讓我們的spark 應用處理能力收放自如。調整executor數(shù)量還有另外一個好處掰茶,就是集群資源規(guī)劃暇藏,目前我們的集群隊列是yarn fair 模式,
先看看yarn fair模式濒蒋,舉個例子叨咖,假設有兩個用戶A和B,他們分別擁有一個隊列。當A啟動一個job而B沒有任務時甸各,A會獲得全部集群資源垛贤;當B啟動一個job后,A的job會繼續(xù)運行趣倾,當A的job執(zhí)行完釋放資源后聘惦,不過一會兒之后兩個任務會各自獲得一半的集群資源。如果此時B再啟動第二個job并且其它job還在運行儒恋,則它將會和B的第一個job共享B這個隊列的資源善绎,也就是B的兩個job會用于四分之一的集群資源,而A的job仍然用于集群一半的資源诫尽,結果就是資源最終在兩個用戶之間平等的共享禀酱。
在這種情況下,即使有多個隊列執(zhí)行任務牧嫉,fair模式容易在資源空閑時占用其他隊列資源剂跟,一旦占用時間過長,就會導致其他任務都卡住酣藻,這也是我們遇到的實際問題曹洽。如果我們在一開始能評估任務所用的資源,就可以在yarn隊列的基礎上指定應用的資源辽剧,例如executor的內(nèi)存送淆,cpu,實例個數(shù)怕轿,并行task數(shù)量等等參數(shù)來管理集群資源偷崩,這有點類似于yarn Capacity Scheduler 隊列模式,但又比它有優(yōu)勢撞羽,因為spark 應用可以通過spark context的配置來動態(tài)的設置阐斜,不用在配置yarn 隊列后重啟集群,稍微靈活了一點放吩。
除了以上提到的幾點總結智听,我們還遇到很多其他的疑問和實踐羽杰,例如渡紫,什么時候出現(xiàn)shuffle;如何比較好避開或者利用shuffle考赛;Dataset 的cache操作會不會有性能問題惕澎,如何從spark ui中分析定位問題;spark 任務異常處理等等颜骤,暫時到這里唧喉,待續(xù)...
賴澤坤 @vip.fcs
參考資料:
http://www.cnblogs.com/yangsy0915/p/5118100.html
https://mp.weixin.qq.com/s/KhHy1mURJBiPMGqkl4-JEw
https://www.ibm.com/developerworks/cn/analytics/library/ba-cn-apache-spark-memory-management/index.html?ca=drs-&utm_source=tuicool&utm_medium=referral
https://docs.scala-lang.org/zh-cn/overviews/collections/conversions-between-java-and-scala-collections.html
http://www.reibang.com/p/e7db5970e68c