spark sql 在mysql的應用實踐

前言

目前spark sql 主要應用在structure streaming欢搜、etl 和 machine learning 的場景上, 它能對結構化的數(shù)據(jù)進行存儲和操作,結構化的數(shù)據(jù)可以來自HIve、JSON、Parquet桑驱、JDBC/ODBC等數(shù)據(jù)源。由于部門對數(shù)據(jù)的準確性跛蛋,一致性和維護等等要求等業(yè)務特點熬的,我們選擇mysql使用jdbc的方式作為我們的數(shù)據(jù)源,spark 集群用yarn做為資源管理赊级,本文主要分享我們在使用spark sql 過程中遇到的問題和一些常用的開發(fā)實踐總結押框。

運行環(huán)境:spark :2.1.0,hadoop: hadoop-2.5.0-cdh5.3.2 (yarn 資源管理理逊,hdfs)橡伞,mysql:5.7 盒揉,scala: 2.11, java:1.8


spark on yarn

spark on yarn 運行機制

我們先來了解一下spark on yarn 任務的運行機制兑徘。yarn 的基本思想是將JobTracker的兩個主要功能(資源管理和任務調度/監(jiān)控)分離成單獨的組件:RM 和 AM刚盈;新的資源管理器ResourceManager(RM)實現(xiàn)全局的所有應用的計算資源分配,應用控制器ApplicationMaster(AM)實現(xiàn)應用的調度和資源的協(xié)調挂脑;節(jié)點管理器NodeManager(NM)則是每臺機器的代理藕漱,處理來自AM的命令,實現(xiàn)節(jié)點的監(jiān)控與報告崭闲;容器 Container 封裝了內(nèi)存肋联、CPU、磁盤刁俭、網(wǎng)絡等資源牺蹄,是資源隔離的基礎,當AM向RM申請資源時薄翅,RM為AM返回的資源便是以Container表示,如上圖氓奈,spark master分配的 executor 的執(zhí)行環(huán)境便是containner翘魄。目前我們使用yarn 隊列的方式,可以進一步的對應用執(zhí)行進行管理舀奶,讓我們的應用分組和任務分配更加清晰和方便管理暑竟。

yarn 隊列

開發(fā)實踐

1. 讀取mysql表數(shù)據(jù)

import com.test.spark.db.ConnectionInfos;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;

public class SparkSimple01 {

    public static void main(String[] args) {

        // 創(chuàng)建spark會話,實質上是SQLContext和HiveContext的組合
        SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Java Spark SQL basic example").getOrCreate();

        // 設置日志級別育勺,默認會打印DAG,TASK執(zhí)行日志但荤,設置為WARN之后可以只關注應用相關日志
        sparkSession.sparkContext().setLogLevel("WARN");

        // 分區(qū)方式讀取mysql表數(shù)據(jù)
        Dataset<Row> predicateSet = sparkSession.read().jdbc(ConnectionInfos.TEST_MYSQL_CONNECTION_URL, "people",
                (String[]) Arrays.asList(" name = 'tom'", " name = 'sam' ").toArray(), ConnectionInfos.getTestUserAndPasswordProperties());

        predicateSet.show();

    }
}

為了確認該查詢對mysql發(fā)出的具體sql,我們先查看一下mysql執(zhí)行sql日志涧至,

#mysql 命令窗口執(zhí)行以下命令打開日志記錄
SHOW VARIABLES LIKE "general_log%";
SET GLOBAL general_log = 'ON';
mysql log.png

打開Lenovo.log得到以上代碼在mysql上的執(zhí)行情況:


分區(qū)執(zhí)行sql

通過分區(qū)查詢獲取表數(shù)據(jù)的方式有以下幾個優(yōu)點:

  • 利用表索引查詢提高查詢效率
  • 自定義sql條件使分區(qū)數(shù)據(jù)更加均勻腹躁,方便后面的并行計算
  • 分區(qū)并發(fā)讀取可以通過控制并發(fā)控制對mysql的查詢壓力
  • 可以讀取大數(shù)據(jù)量的mysql表

spark jdbc 讀取msyql表還有直接讀取(無法讀取大數(shù)據(jù)量表),指定字段分區(qū)讀取(分區(qū)不夠均勻)等方式南蓬,通過項目實踐總結纺非,以上的分區(qū)讀取方式是我們目前認為對mysql最友好的方式。
分庫分表的系統(tǒng)也可以利用這種方式讀取各個表在內(nèi)存中union所有spark view得到一張統(tǒng)一的內(nèi)存表赘方,在業(yè)務操作中將分庫分表透明化烧颖。如果線上數(shù)據(jù)表數(shù)據(jù)量較大的時候,在union之前就需要將spark view通過指定字段的方式查詢窄陡,避免on line ddl 在做變更時union表報錯炕淮,因為可能存在部分表已經(jīng)添加新字段,部分表還未加上新字段跳夭,而union要求所有表的表結構一致涂圆,導致報錯们镜。

2. Dataset 分區(qū)數(shù)據(jù)查看

我們都知道 Dataset 的分區(qū)是否均勻,對于結果集的并行處理效果有很重要的作用乘综,spark Java版暫時無法查看partition分區(qū)中的數(shù)據(jù)分布憎账,這里用java調用scala 版api方式查看,線上不推薦使用卡辰,因為這里的分區(qū)查看使用foreachPartition胞皱,多了一次action操作,并且打印出全部數(shù)據(jù)九妈。

import org.apache.spark.sql.{Dataset, Row}

/**
  * Created by lesly.lai on 2017/12/25.
  */
class SparkRddTaskInfo {
  def getTask(dataSet: Dataset[Row]) {
    val size = dataSet.rdd.partitions.length
    println(s"==> partition size: $size " )
    import scala.collection.Iterator
    val showElements = (it: Iterator[Row]) => {
      val ns = it.toSeq
      import org.apache.spark.TaskContext
      val pid = TaskContext.get.partitionId
      println(s"[partition: $pid][size: ${ns.size}] ${ns.mkString(" ")}")
    }
    dataSet.foreachPartition(showElements)
  }
}

還是用上面讀取mysql數(shù)據(jù)的例子來演示調用反砌,將predicateSet作為參數(shù)傳入

new SparkRddTaskInfo().getTask(predicateSet);

控制臺打印結果


分區(qū)結果.png

通過分區(qū)數(shù)據(jù),我們可以看到之前的predicate 方式得到的分區(qū)數(shù)就是predicate size 大小萌朱,并且按照我們想要的數(shù)據(jù)分區(qū)方式分布數(shù)據(jù)宴树,這對于業(yè)務數(shù)據(jù)的批處理,executor的local cache晶疼,spark job執(zhí)行參數(shù)調優(yōu)都很有幫助酒贬,例如調整spark.executor.cores,spark.executor.memory翠霍,GC方式等等锭吨。
這里涉及java和Scala容器轉換的問題,Scala和Java容器庫有很多相似點寒匙,例如零如,他們都包含迭代器、可迭代結構锄弱、集合考蕾、 映射和序列。但是他們有一個重要的區(qū)別会宪。Scala的容器庫特別強調不可變性肖卧,因此提供了大量的新方法將一個容器變換成一個新的容器。
在Scala內(nèi)部掸鹅,這些轉換是通過一系列“包裝”對象完成的喜命,這些對象會將相應的方法調用轉發(fā)至底層的容器對象。所以容器不會在Java和Scala之間拷貝來拷貝去河劝。一個值得注意的特性是壁榕,如果你將一個Java容器轉換成其對應的Scala容器,然后再將其轉換回同樣的Java容器赎瞎,最終得到的是一個和一開始完全相同的容器對象(這里的相同意味著這兩個對象實際上是指向同一片內(nèi)存區(qū)域的引用牌里,容器轉換過程中沒有任何的拷貝發(fā)生)。

3. sql 自定義函數(shù)

自定義函數(shù),可以簡單方便的實現(xiàn)業(yè)務邏輯牡辽。

import com.tes.spark.db.ConnectionInfos;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;

public class SparkSimple02 {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Java Spark SQL basic example").getOrCreate();
        sparkSession.sparkContext().setLogLevel("WARN");
        Dataset<Row> originSet = sparkSession.read().jdbc(ConnectionInfos.TEST_MYSQL_CONNECTION_URL, "people", ConnectionInfos.getTestUserAndPasswordProperties());
        originSet.cache().createOrReplaceTempView("people");

        // action操作 打印原始結果集
        originSet.show();

        // 注冊自定義函數(shù)
        sparkSession.sqlContext().udf().register("genderUdf", gender -> {
            if("M".equals(gender)){
                return  "男";
            }else if("F".equals(gender)){
                return  "女";
            }
            return "未知";
        }, DataTypes.StringType);

        // 查詢結果
        Dataset<Row> peopleDs = sparkSession.sql("select job_number,name,age, genderUdf(gender) gender, dept_id, salary, create_time from people ");

        // action操作 打印函數(shù)處理后結果集
        peopleDs.show();
    }
}

執(zhí)行結果:


image.png

在sql中用使用java代碼實現(xiàn)邏輯操作喳篇,這為sql的處理邏輯能力提升了好幾個層次,將函數(shù)抽取成接口實現(xiàn)類可以方便的管理和維護這類自定義函數(shù)類态辛。此外麸澜,spark也支持自定義內(nèi)聚函數(shù),窗口函數(shù)等等方式奏黑,相比傳統(tǒng)開發(fā)實現(xiàn)的功能方式炊邦,使用spark sql開發(fā)效率可以明顯提高。

4. mysql 查詢連接復用

最近線上任務遇到一個獲取mysql connection blocked的問題熟史,從spark ui的executor thread dump 可以看到blocked的棧信息馁害,如圖:


connection blocked.png

查看代碼發(fā)現(xiàn)DBConnectionManager 調用了 spark driver注冊mysql driver 使用同步方式的代碼


driverRegister.png

看到這里我們很容易覺得是注冊driver 導致的blocked,其實再仔細看回報錯棧信息蹂匹,我們會發(fā)現(xiàn)碘菜,這里的getConnection是在dataset 的foreachpartition 中調用,并且是在每次db 操作時獲取一次getConnection 操作限寞,這意味著在該分區(qū)下有多次重復的在同步方法中注冊driver獲取連接的操作忍啸,看到這里線程blocked的原因就很明顯了,這里我們的解決方式是:
a. 在同個partition中的connection 復用進行db操作
b. 為了避免partition數(shù)據(jù)分布不均導致連接active時間過長履植,加上定時釋放連接再從連接池重新獲取連接操作
通過以上的連接處理计雌,解決了blocked問題,tps也達到了4w左右静尼。

5. executor 并發(fā)控制

我們都知道,利用spark 集群分區(qū)并行能力传泊,可以很容易實現(xiàn)較高的并發(fā)處理能力鼠渺,如果是并發(fā)的批處理,那并行處理的能力可以更好眷细,但是拦盹,mysql 在面對這么高的并發(fā)的時候,是有點吃不消的溪椎,因此我們需要適當降低spark 應用的并發(fā)和上下游系統(tǒng)和平相處普舆。控制spark job并發(fā)可以通過很多參數(shù)配置組合校读、集群資源沼侣、yarn隊列限制等方式實現(xiàn),經(jīng)過實踐歉秫,我們選擇以下參數(shù)實現(xiàn):

#需要關閉動態(tài)內(nèi)存分配蛾洛,其他配置才生效
spark.dynamicAllocation.enabled = false
spark.executor.instances = 2
spark.executor.cores = 2
image.png

這里發(fā)現(xiàn)除了設置executor配置之外,還需要關閉spark的動態(tài)executor分配機制,spark 的ExecutorAllocationManager 是 一個根據(jù)工作負載動態(tài)分配和刪除 executors 的管家轧膘, ExecutorAllocationManager 維持一個動態(tài)調整的目標executors數(shù)目钞螟, 并且定期同步到資源管理者,也就是 yarn 谎碍,啟動的時候根據(jù)配置設置一個目標executors數(shù)目鳞滨, spark 運行過程中會根據(jù)等待(pending)和正在運行(running)的tasks數(shù)目動態(tài)調整目標executors數(shù)目,因此需要關閉動態(tài)配置資源才能達到控制并發(fā)的效果蟆淀。

除了executor是動態(tài)分配之外拯啦,Spark 1.6 之后引入的統(tǒng)一內(nèi)存管理機制弃甥,與靜態(tài)內(nèi)存管理的區(qū)別在于存儲內(nèi)存和執(zhí)行內(nèi)存共享同一塊空間色乾,可以動態(tài)占用對方的空閑區(qū)域,我們先看看worker中的內(nèi)存規(guī)劃是怎樣的:


worker memory schedule.png

worker 可以根據(jù)實例配置主巍,內(nèi)存配置笋敞,cores配置動態(tài)生成executor數(shù)量碱蒙,每一個executor為一個jvm進程,因此executor 的內(nèi)存管理是建立在jvm的內(nèi)存管理之上的夯巷。從本文第一張spark on yarn圖片可以看到赛惩,yarn模式的 executor 是在yarn container 中運行,因此container的內(nèi)存分配大小同樣可以控制executor的數(shù)量趁餐。
RDD 的每個 Partition 經(jīng)過處理后唯一對應一個 Block(BlockId 的格式為 rdd_RDD-ID_PARTITION-ID )喷兼,從上圖可以看出,開發(fā)過程中常用的分區(qū)(partition)數(shù)據(jù)是以block的方式存儲在堆內(nèi)的storage內(nèi)存區(qū)域的后雷,還有為了減少網(wǎng)絡io而做的broadcast數(shù)據(jù)也存儲在storage區(qū)域季惯;堆內(nèi)的另一個區(qū)域內(nèi)存則主要用于緩存rdd shuffle產(chǎn)生的中間數(shù)據(jù);此外臀突,worker 中的多個executor還共享同一個節(jié)點上的堆外內(nèi)存勉抓,這部分內(nèi)存主要存儲經(jīng)序列化后的二進制數(shù)據(jù),使用的是系統(tǒng)的內(nèi)存候学,可以減少不必要的開銷以及頻繁的GC掃描和回收藕筋。

為了更好的理解executor的內(nèi)存分配,我們再來看一下executor各個內(nèi)存塊的參數(shù)設置:


executor jvm

off-heap.png

了解spark 內(nèi)存管理的機制后梳码,就可以根據(jù)mysql的處理能力來設置executor的并發(fā)處理能力隐圾,讓我們的spark 應用處理能力收放自如。調整executor數(shù)量還有另外一個好處掰茶,就是集群資源規(guī)劃暇藏,目前我們的集群隊列是yarn fair 模式,


yarn fair 集群模式.png

先看看yarn fair模式濒蒋,舉個例子叨咖,假設有兩個用戶A和B,他們分別擁有一個隊列。當A啟動一個job而B沒有任務時甸各,A會獲得全部集群資源垛贤;當B啟動一個job后,A的job會繼續(xù)運行趣倾,當A的job執(zhí)行完釋放資源后聘惦,不過一會兒之后兩個任務會各自獲得一半的集群資源。如果此時B再啟動第二個job并且其它job還在運行儒恋,則它將會和B的第一個job共享B這個隊列的資源善绎,也就是B的兩個job會用于四分之一的集群資源,而A的job仍然用于集群一半的資源诫尽,結果就是資源最終在兩個用戶之間平等的共享禀酱。

在這種情況下,即使有多個隊列執(zhí)行任務牧嫉,fair模式容易在資源空閑時占用其他隊列資源剂跟,一旦占用時間過長,就會導致其他任務都卡住酣藻,這也是我們遇到的實際問題曹洽。如果我們在一開始能評估任務所用的資源,就可以在yarn隊列的基礎上指定應用的資源辽剧,例如executor的內(nèi)存送淆,cpu,實例個數(shù)怕轿,并行task數(shù)量等等參數(shù)來管理集群資源偷崩,這有點類似于yarn Capacity Scheduler 隊列模式,但又比它有優(yōu)勢撞羽,因為spark 應用可以通過spark context的配置來動態(tài)的設置阐斜,不用在配置yarn 隊列后重啟集群,稍微靈活了一點放吩。

除了以上提到的幾點總結智听,我們還遇到很多其他的疑問和實踐羽杰,例如渡紫,什么時候出現(xiàn)shuffle;如何比較好避開或者利用shuffle考赛;Dataset 的cache操作會不會有性能問題惕澎,如何從spark ui中分析定位問題;spark 任務異常處理等等颜骤,暫時到這里唧喉,待續(xù)...

賴澤坤 @vip.fcs

參考資料:
http://www.cnblogs.com/yangsy0915/p/5118100.html
https://mp.weixin.qq.com/s/KhHy1mURJBiPMGqkl4-JEw
https://www.ibm.com/developerworks/cn/analytics/library/ba-cn-apache-spark-memory-management/index.html?ca=drs-&utm_source=tuicool&utm_medium=referral
https://docs.scala-lang.org/zh-cn/overviews/collections/conversions-between-java-and-scala-collections.html
http://www.reibang.com/p/e7db5970e68c

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子八孝,更是在濱河造成了極大的恐慌董朝,老刑警劉巖,帶你破解...
    沈念sama閱讀 207,113評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件干跛,死亡現(xiàn)場離奇詭異子姜,居然都是意外死亡,警方通過查閱死者的電腦和手機楼入,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評論 2 381
  • 文/潘曉璐 我一進店門哥捕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人嘉熊,你說我怎么就攤上這事遥赚。” “怎么了阐肤?”我有些...
    開封第一講書人閱讀 153,340評論 0 344
  • 文/不壞的土叔 我叫張陵凫佛,是天一觀的道長。 經(jīng)常有香客問我泽腮,道長御蒲,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,449評論 1 279
  • 正文 為了忘掉前任诊赊,我火速辦了婚禮厚满,結果婚禮上,老公的妹妹穿的比我還像新娘碧磅。我一直安慰自己碘箍,他們只是感情好,可當我...
    茶點故事閱讀 64,445評論 5 374
  • 文/花漫 我一把揭開白布鲸郊。 她就那樣靜靜地躺著丰榴,像睡著了一般。 火紅的嫁衣襯著肌膚如雪秆撮。 梳的紋絲不亂的頭發(fā)上四濒,一...
    開封第一講書人閱讀 49,166評論 1 284
  • 那天,我揣著相機與錄音职辨,去河邊找鬼盗蟆。 笑死,一個胖子當著我的面吹牛舒裤,可吹牛的內(nèi)容都是我干的喳资。 我是一名探鬼主播,決...
    沈念sama閱讀 38,442評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼腾供,長吁一口氣:“原來是場噩夢啊……” “哼仆邓!你這毒婦竟也來了鲜滩?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,105評論 0 261
  • 序言:老撾萬榮一對情侶失蹤节值,失蹤者是張志新(化名)和其女友劉穎徙硅,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體搞疗,經(jīng)...
    沈念sama閱讀 43,601評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡闷游,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,066評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了贴汪。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片脐往。...
    茶點故事閱讀 38,161評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖扳埂,靈堂內(nèi)的尸體忽然破棺而出业簿,到底是詐尸還是另有隱情,我是刑警寧澤阳懂,帶...
    沈念sama閱讀 33,792評論 4 323
  • 正文 年R本政府宣布梅尤,位于F島的核電站,受9級特大地震影響岩调,放射性物質發(fā)生泄漏巷燥。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,351評論 3 307
  • 文/蒙蒙 一号枕、第九天 我趴在偏房一處隱蔽的房頂上張望缰揪。 院中可真熱鬧,春花似錦葱淳、人聲如沸钝腺。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽艳狐。三九已至,卻和暖如春皿桑,著一層夾襖步出監(jiān)牢的瞬間毫目,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評論 1 261
  • 我被黑心中介騙來泰國打工诲侮, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留镀虐,地道東北人。 一個月前我還...
    沈念sama閱讀 45,618評論 2 355
  • 正文 我出身青樓浆西,卻偏偏與公主長得像粉私,于是被迫代替她去往敵國和親顽腾。 傳聞我的和親對象是個殘疾皇子近零,可洞房花燭夜當晚...
    茶點故事閱讀 42,916評論 2 344