Spark Structured Streaming: Writing to Hive, HBase and MySQL

Component versions

  • Spark 2.3.1 (HDP)
  • Hadoop 3.1.1 (HDP)
  • Hive 3.1.2 (HDP)
  • HBase 2.0.0
  • MySQL 5.x

Spark has no built-in sink for writing data read from Kafka with Structured Streaming into Hive, HBase or MySQL. The approaches below were tested and are written up here.

  • pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.insight.spark</groupId>
    <artifactId>SparkDemo</artifactId>
    <version>1.1</version>

    <properties>
        <encoding>UTF-8</encoding>
        <spark.version>2.3.1</spark.version>
    </properties>

    <dependencies>

        <!-- https://mvnrepository.com/artifact/com.sun.jersey/jersey-core -->
        <dependency>
            <groupId>com.sun.jersey</groupId>
            <artifactId>jersey-client</artifactId>
            <version>1.19</version>
        </dependency>

        <!-- Spark core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jersey-client</artifactId>
                    <groupId>org.glassfish.jersey.core</groupId>
                </exclusion>
            </exclusions>
            <!-- <scope>provided</scope>-->
        </dependency>
        <!-- Spark SQL (DataFrame API) -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- HBase dependencies -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.htrace</groupId>
            <artifactId>htrace-core</artifactId>
            <version>3.1.0-incubating</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.9</version>
        </dependency>
        
        <!-- Spark-Hive integration -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>



    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>


            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.10</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <!-- If you hit classpath issues such as NoClassDefFoundError, ... -->
                    <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

        </plugins>
    </build>
</project>

Main code and usage

Usage: StructuredKafkaWordCount <bootstrap-servers> <topics> <number 0/1/2/3> [<checkpoint-location> eg:/tmp/temp-spark-1] ……
The program takes several arguments: the first is the Kafka broker address, the second is the topic to consume, the third selects the output type (one of four, given as 0, 1, 2 or 3), and the fourth is the checkpoint path; any remaining arguments are used for the MySQL connection. The logic itself is simple: consume messages from Kafka, run a word count, and write out the result.
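For reference, a hypothetical argument list for the MySQL case (output type 3) might look like the sketch below; the broker, topic, JDBC URL and credentials are placeholders, not values from the original setup. The full program follows.

// Hypothetical example arguments for the MySQL sink (output type 3).
val exampleArgs = Array(
  "kafka01:9092",                    // <bootstrap-servers> (placeholder host)
  "wordcount",                       // <topics> (placeholder topic)
  "3",                               // output type: 0=console, 1=Hive, 2=HBase, 3=MySQL
  "/tmp/temp-spark-1",               // <checkpoint-location>
  "jdbc:mysql://mysql01:3306/spark", // JDBC URL, read only by the MySQL branch
  "sparkuser",                       // MySQL user (placeholder)
  "sparkpass"                        // MySQL password (placeholder)
)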

package com.insight.spark.streaming

import com.insight.spark.util.ConfigLoader
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

import java.sql
import java.sql.{DriverManager, PreparedStatement}
import java.util.UUID

object StructuredStreamingTest {
  System.setProperty("HADOOP_USER_NAME","hdfs")
  val conf: Configuration = HBaseConfiguration.create()

  def main(args: Array[String]): Unit = {
    SetLogLevel.setStreamingLogLevels()
    if (args.length < 3) {
      System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
        " <topics> <number 0/1/2/3> [<checkpoint-location> eg:/tmp/temp-spark-1]")
      System.exit(1)
    }

    val Array(bootstrapServers, topics, number, _*) = args
    val checkpointLocation =
      if (args.length > 3) args(3) else "/tmp/temp-spark-" + UUID.randomUUID.toString

    val spark = SparkSession
      .builder
      .appName("StructuredKafkaWordCount")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    // Create DataSet representing the stream of input lines from kafka
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topics)
      .option("startingOffsets", "latest")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Generate running word count
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    /**
      * Start the query according to the user's output-type parameter (0/1/2/3):
      * 1: write the result to Hive
      * 2: write the result to HBase
      * 3: write the result to MySQL
      * 0/other: print the result to the console
      */
    val dsw = number match {
      // write to Hive
      case "1" =>
        wordCounts.writeStream
          .outputMode("complete")
          .trigger(Trigger.ProcessingTime("10 seconds")) // batch interval
          .format("com.insight.spark.streaming.HiveSinkProvider") // custom HiveSinkProvider
          .option("checkpointLocation", checkpointLocation)
          .queryName("write hive")

      case "2" =>
        wordCounts.writeStream
          .outputMode("update")
          .foreach(new ForeachWriter[Row] {
            var connection: Connection = _

            def open(partitionId: Long, version: Long): Boolean = {
              conf.set("hbase.zookeeper.quorum", ConfigLoader.getString("hbase.zookeeper.list"))
              conf.set("hbase.zookeeper.property.clientPort", ConfigLoader.getString("hbase.zookeeper.port"))
              conf.set("zookeeper.znode.parent", ConfigLoader.getString("zookeeper.znode.parent"))
              import org.apache.hadoop.hbase.client.ConnectionFactory
              connection = ConnectionFactory.createConnection(conf)
              true
            }

            def process(record: Row): Unit = {
              val tableName = TableName.valueOf(ConfigLoader.getString("hbase.table.name")) // target table name
              val table = connection.getTable(tableName)
              val put = new Put(Bytes.toBytes(record.mkString))
              put.addColumn("info".getBytes(), Bytes.toBytes("word"), Bytes.toBytes(record(0).toString))
              put.addColumn("info".getBytes(), Bytes.toBytes("count"), Bytes.toBytes(record(1).toString))
              table.put(put)
            }

            def close(errorOrNull: Throwable): Unit = {
              connection.close()
            }
          })
          .option("checkpointLocation", checkpointLocation)
          .trigger(Trigger.ProcessingTime("10 seconds"))
          .queryName("write hbase")

      case "3" =>

        /** MySQL DDL -- create the `spark` database first, then:
          * CREATE TABLE `words` (
          * `id` int(11) NOT NULL AUTO_INCREMENT,
          * `word` varchar(255) NOT NULL,
          * `count` int(11) DEFAULT 0,
          * PRIMARY KEY (`id`),
          * UNIQUE KEY `word` (`word`)
          * ) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8;
          */
        val (url, user, pwd) = (args(4), args(5), args(6))
        wordCounts.writeStream
          .outputMode("complete")
          .foreach(new ForeachWriter[Row] {
            var conn: sql.Connection = _
            var p: PreparedStatement = _
            def open(partitionId: Long, version: Long): Boolean = {
              Class.forName("com.mysql.jdbc.Driver")
              conn = DriverManager.getConnection(url, user, pwd)
              p = conn.prepareStatement("replace into spark.words(word,count) values(?,?)")
              true
            }

            def process(record: Row): Unit = {
              p.setString(1, record(0).toString)
              p.setInt(2, record(1).toString.toInt)
              p.execute()
            }

            def close(errorOrNull: Throwable): Unit = {
              p.close()
              conn.close()
            }
          })
          .option("checkpointLocation", checkpointLocation)
          .trigger(Trigger.ProcessingTime("10 seconds"))
          .queryName("write mysql")

      case _ =>
        wordCounts.writeStream
          .outputMode("update")
          .format("console")
          .trigger(Trigger.ProcessingTime("10 seconds"))
          .option("checkpointLocation", checkpointLocation)
          .queryName("print it")

    }

    dsw.start().awaitTermination()

  }
}
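The program references a ConfigLoader utility (com.insight.spark.util.ConfigLoader) that is not shown in this post. A minimal sketch, assuming the HBase settings are read from a config.properties file on the classpath, could look like this:

package com.insight.spark.util

import java.util.Properties

// Minimal sketch of the ConfigLoader used above (assumption: keys such as
// hbase.zookeeper.list live in a config.properties file on the classpath).
object ConfigLoader {
  private val props: Properties = {
    val p = new Properties()
    val in = getClass.getClassLoader.getResourceAsStream("config.properties")
    if (in != null) {
      try p.load(in) finally in.close()
    }
    p
  }

  def getString(key: String): String = props.getProperty(key)
}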

HiveSinkProvider source

The HiveSinkProvider used above is shown below:

package com.insight.spark.streaming

import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.slf4j.LoggerFactory


case class HiveSink(sqlContext: SQLContext,
                    parameters: Map[String, String],
                    partitionColumns: Seq[String],
                    outputMode: OutputMode) extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val logger = LoggerFactory.getLogger(this.getClass)

    val schema = StructType(Array(
      StructField("word", StringType),
      StructField("count", IntegerType)
    ))
    val res = data.queryExecution.toRdd.mapPartitions { rows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      rows.map(converter(_).asInstanceOf[Row])
    }
    // convert the internal rows back to a DataFrame with the target schema
    val df = data.sparkSession.createDataFrame(res, schema)
    df.write.mode(SaveMode.Append).format("hive").saveAsTable("words")

  }
}

class HiveSinkProvider extends StreamSinkProvider with DataSourceRegister {
  override def createSink(sqlContext: SQLContext, parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = {
    HiveSink(sqlContext, parameters, partitionColumns, outputMode)
  }

  override def shortName(): String = "HiveSinkProvider"
}
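Because HiveSinkProvider also implements DataSourceRegister, it can optionally be registered through Java's ServiceLoader so the stream can refer to it by its short name instead of the fully qualified class name used in the main program. The registration is a single resource file whose only content is the provider's class name (the path is dictated by Spark's data-source lookup):

# file: src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
com.insight.spark.streaming.HiveSinkProvider

With that file on the classpath, .format("HiveSinkProvider") resolves to the same sink.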

Package the project and run it with spark-submit --xxx this.jar ... and you're done.

Point: structured streaming, Spark Structured Streaming, Hive, HBase, MySQL
Line: Spark
Plane: in-memory computing
