SparkStreaming之讀取Kafka數(shù)據(jù)

本文主要記錄使用SparkStreaming從Kafka里讀取數(shù)據(jù)目养,并計(jì)算WordCount

主要內(nèi)容:

  • 1.本地模式運(yùn)行SparkStreaming
  • 2.yarn-client模式運(yùn)行

相關(guān)文章:
1.Spark之PI本地
2.Spark之WordCount集群
3.SparkStreaming之讀取Kafka數(shù)據(jù)
4.SparkStreaming之使用redis保存Kafka的Offset
5.SparkStreaming之優(yōu)雅停止
6.SparkStreaming之寫(xiě)數(shù)據(jù)到Kafka
7.Spark計(jì)算《西虹市首富》短評(píng)詞云

1.本地模式運(yùn)行

object ScalaKafkaStreaming {
  def main(args: Array[String]): Unit = {
    // offset保存路徑
    val checkpointPath = "D:\\hadoop\\checkpoint\\kafka-direct"

    val conf = new SparkConf()
      .setAppName("ScalaKafkaStream")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(checkpointPath)

    val bootstrapServers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val groupId = "kafka-test-group"
    val topicName = "Test"
    val maxPoll = 500

    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPoll.toString,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    val kafkaTopicDS = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set(topicName), kafkaParams))

    kafkaTopicDS.map(_.value)
      .flatMap(_.split(" "))
      .map(x => (x, 1L))
      .reduceByKey(_ + _)
      .transform(data => {
        val sortData = data.sortBy(_._2, false)
        sortData
      })
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

本地模式運(yùn)行SparkStreaming每隔5s從Kafka讀取500條數(shù)據(jù)并計(jì)算WorkCount,然后按次數(shù)降序排列毒嫡,并將Offset保存在本地文件夾

創(chuàng)建Topic

kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka --topic Test --partitions 3 --replication-factor 3

查看創(chuàng)建的Topic

kafka-topics.sh --describe --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka

編寫(xiě)Kafka程序并往Topic里寫(xiě)數(shù)據(jù)

public class ProducerTest {
    private static final String[] WORDS = {
            "hello", "hadoop", "java", "kafka", "spark"
    };

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer(props);
        boolean flag = true;
        while (flag) {
            for (int i = 0; i < 500; i++) {
                //3癌蚁、發(fā)送數(shù)據(jù)
                kafkaProducer.send(new ProducerRecord("Test", WORDS[new Random().nextInt(5)]));
            }
            kafkaProducer.flush();
            System.out.println("==========Kafka Flush==========");
            Thread.sleep(5000);
        }

        kafkaProducer.close();
    }
}

每5s寫(xiě)500條數(shù)據(jù)到Topic

運(yùn)行結(jié)果如下:


可以看到我們的程序可以正確運(yùn)行了。

2.yarn-client模式運(yùn)行

修改程序的checkpoint為hdfs上的目錄

object ScalaKafkaStreaming {
  def main(args: Array[String]): Unit = {
    // offset保存路徑
    val checkpointPath = "/data/output/checkpoint/kafka-direct"

    val conf = new SparkConf()
      .setAppName("ScalaKafkaStream")
      //.setMaster("local[2]")

    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc, Seconds(3))
    ssc.checkpoint(checkpointPath)

    val bootstrapServers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val groupId = "kafka-test-group"
    val topicName = "Test"
    val maxPoll = 20000

    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPoll.toString,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    val kafkaTopicDS = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set(topicName), kafkaParams))

    kafkaTopicDS.map(_.value)
      .flatMap(_.split(" "))
      .map(x => (x, 1L))
      .reduceByKey(_ + _)
      .transform(data => {
        val sortData = data.sortBy(_._2, false)
        sortData
      })
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

pom.xml文件

<dependencies>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.0</version>
    <scope>provided</scope>
  </dependency>

  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.0</version>
    <scope>provided</scope>
  </dependency>

  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.0</version>
    <scope>compile</scope>
  </dependency>
</dependencies>
<build>
  <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <appendAssemblyId>false</appendAssemblyId>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <!-- 此處指定main方法入口的class -->
            <mainClass></mainClass>
          </manifest>
        </archive>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>assembly</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.scala-tools</groupId>
      <artifactId>maven-scala-plugin</artifactId>
      <version>2.15.2</version>
      <executions>
        <execution>
          <id>scala-compile-first</id>
          <goals>
            <goal>compile</goal>
          </goals>
          <configuration>
            <includes>
              <include>**/*.scala</include>
            </includes>
          </configuration>
        </execution>
        <execution>
          <id>scala-test-compile</id>
          <goals>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

這里將spark-streaming-kafka-0-10_2.11打包進(jìn)jar兜畸,不然運(yùn)行時(shí)會(huì)報(bào)找不到一些類努释,也可以通過(guò)其他方式解決

上傳jar,執(zhí)行

./bin/spark-submit \
--class me.jinkun.scala.kafka.ScalaKafkaStreaming \
--master yarn \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
/opt/soft-install/data/spark-yarn-1.0-SNAPSHOT.jar

運(yùn)行過(guò)程可能會(huì)報(bào)如下錯(cuò)誤:

Current usage: 114.5 MB of 1 GB physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing container.

解決方式:參考https://blog.csdn.net/kaaosidao/article/details/77950125
我這里修改yarn-site.xml咬摇,加入如下配置

<property>
     <name>yarn.nodemanager.vmem-pmem-ratio</name>
     <value>3</value>
</property>

運(yùn)行如下:


說(shuō)明程序已經(jīng)正常啟動(dòng)伐蒂,進(jìn)入Yarn的管理界面可以看到正在執(zhí)行任務(wù)http://hadoop1:8088

Yarn管理界面正在運(yùn)行的作用

通過(guò)ID可以查看運(yùn)行的日志


運(yùn)行的結(jié)果

通過(guò)Tracking UI 可以看到Spark的管理界面


運(yùn)行如下命令停止SparkStreaming程序

yarn application -kill [appid]

3.checkpoint


在我們?cè)O(shè)置的checkpoint文件夾里保存了最近5次的checkpoint,在線上程序一般保存到hdfs里肛鹏。


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末逸邦,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子在扰,更是在濱河造成了極大的恐慌缕减,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,265評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件芒珠,死亡現(xiàn)場(chǎng)離奇詭異桥狡,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)皱卓,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門(mén)裹芝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人娜汁,你說(shuō)我怎么就攤上這事嫂易。” “怎么了存炮?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,852評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵炬搭,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我穆桂,道長(zhǎng)宫盔,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,408評(píng)論 1 283
  • 正文 為了忘掉前任享完,我火速辦了婚禮灼芭,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘般又。我一直安慰自己彼绷,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,445評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布茴迁。 她就那樣靜靜地躺著寄悯,像睡著了一般。 火紅的嫁衣襯著肌膚如雪堕义。 梳的紋絲不亂的頭發(fā)上猜旬,一...
    開(kāi)封第一講書(shū)人閱讀 49,772評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音倦卖,去河邊找鬼洒擦。 笑死,一個(gè)胖子當(dāng)著我的面吹牛怕膛,可吹牛的內(nèi)容都是我干的熟嫩。 我是一名探鬼主播,決...
    沈念sama閱讀 38,921評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼褐捻,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼掸茅!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起柠逞,我...
    開(kāi)封第一講書(shū)人閱讀 37,688評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤倦蚪,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后边苹,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體陵且,經(jīng)...
    沈念sama閱讀 44,130評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,467評(píng)論 2 325
  • 正文 我和宋清朗相戀三年个束,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了慕购。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,617評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡茬底,死狀恐怖沪悲,靈堂內(nèi)的尸體忽然破棺而出巢寡,到底是詐尸還是另有隱情尉辑,我是刑警寧澤,帶...
    沈念sama閱讀 34,276評(píng)論 4 329
  • 正文 年R本政府宣布茴扁,位于F島的核電站,受9級(jí)特大地震影響涉馁,放射性物質(zhì)發(fā)生泄漏门岔。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,882評(píng)論 3 312
  • 文/蒙蒙 一烤送、第九天 我趴在偏房一處隱蔽的房頂上張望寒随。 院中可真熱鬧,春花似錦帮坚、人聲如沸妻往。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,740評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)讯泣。三九已至,卻和暖如春阅悍,著一層夾襖步出監(jiān)牢的瞬間判帮,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,967評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工溉箕, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留晦墙,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,315評(píng)論 2 360
  • 正文 我出身青樓肴茄,卻偏偏與公主長(zhǎng)得像晌畅,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子寡痰,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,486評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容