This post documents how to use Spark Streaming to read data from Kafka and compute a WordCount.
Main contents:
- 1. Running Spark Streaming in local mode
- 2. Running in yarn-client mode
Related articles:
1. Spark: Pi in local mode
2. Spark: WordCount on a cluster
3. Spark Streaming: reading data from Kafka
4. Spark Streaming: saving Kafka offsets in Redis
5. Spark Streaming: graceful shutdown
6. Spark Streaming: writing data to Kafka
7. Spark: a word cloud from the short reviews of 《西虹市首富》
1. Running in local mode
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object ScalaKafkaStreaming {
  def main(args: Array[String]): Unit = {
    // path where the checkpoint (including Kafka offsets) is saved
    val checkpointPath = "D:\\hadoop\\checkpoint\\kafka-direct"

    val conf = new SparkConf()
      .setAppName("ScalaKafkaStream")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(checkpointPath)

    val bootstrapServers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val groupId = "kafka-test-group"
    val topicName = "Test"
    val maxPoll = 500

    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPoll.toString,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // Kafka direct stream: one batch every 5 seconds, offsets tracked via the checkpoint
    val kafkaTopicDS = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set(topicName), kafkaParams))

    kafkaTopicDS.map(_.value)
      .flatMap(_.split(" "))
      .map(x => (x, 1L))
      .reduceByKey(_ + _)
      .transform(data => {
        // sort each batch by count, descending
        val sortData = data.sortBy(_._2, false)
        sortData
      })
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
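Note that reduceByKey counts words within each 5-second batch only; every batch starts again from zero. If a running total across batches is wanted instead, one option is updateStateByKey, which uses the checkpoint that is already configured. A minimal sketch of that variant (not part of the original program):

// Sketch: keep a cumulative count per word across batches (relies on ssc.checkpoint above)
val totals = kafkaTopicDS.map(_.value)
  .flatMap(_.split(" "))
  .map(x => (x, 1L))
  .updateStateByKey[Long] { (newCounts: Seq[Long], state: Option[Long]) =>
    Some(newCounts.sum + state.getOrElse(0L))
  }
totals.print()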
In local mode the program reads data from Kafka every 5 seconds (with max.poll.records set to 500), computes the WordCount, sorts the result by count in descending order, and saves the offsets in a local checkpoint folder.
Create the topic:
kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka --topic Test --partitions 3 --replication-factor 3
Inspect the created topic:
kafka-topics.sh --describe --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka
Write a Kafka producer that pushes data into the topic:
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerTest {

    private static final String[] WORDS = {
            "hello", "hadoop", "java", "kafka", "spark"
    };

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        boolean flag = true;
        while (flag) {
            for (int i = 0; i < 500; i++) {
                // send one random word to the Test topic
                kafkaProducer.send(new ProducerRecord<>("Test", WORDS[new Random().nextInt(5)]));
            }
            kafkaProducer.flush();
            System.out.println("==========Kafka Flush==========");
            Thread.sleep(5000);
        }
        kafkaProducer.close();
    }
}
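For a quick manual test without the Java program, the console producer that ships with Kafka can also feed the topic (each line typed becomes one message):
kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic Test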
The producer writes 500 records to the topic every 5 seconds.
Running the streaming job now prints the word counts of each batch, sorted in descending order, which shows the program works correctly.
2. Running in yarn-client mode
Change the program's checkpoint path to a directory on HDFS (the batch interval and max.poll.records are also adjusted here):
object ScalaKafkaStreaming {
  def main(args: Array[String]): Unit = {
    // path where the checkpoint (including Kafka offsets) is saved, now on HDFS
    val checkpointPath = "/data/output/checkpoint/kafka-direct"

    val conf = new SparkConf()
      .setAppName("ScalaKafkaStream")
    //.setMaster("local[2]")

    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc, Seconds(3))
    ssc.checkpoint(checkpointPath)

    val bootstrapServers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val groupId = "kafka-test-group"
    val topicName = "Test"
    val maxPoll = 20000

    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPoll.toString,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    val kafkaTopicDS = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set(topicName), kafkaParams))

    kafkaTopicDS.map(_.value)
      .flatMap(_.split(" "))
      .map(x => (x, 1L))
      .reduceByKey(_ + _)
      .transform(data => {
        // sort each batch by count, descending
        val sortData = data.sortBy(_._2, false)
        sortData
      })
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
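One note on batch sizing: max.poll.records only limits how many records a single Kafka poll() returns, not how many records end up in a 3-second batch. If the batches themselves need an upper bound, Spark's rate limits for the Kafka direct stream can be set on the SparkConf; a small sketch with example values (both settings are optional):

val conf = new SparkConf()
  .setAppName("ScalaKafkaStream")
  // at most 2000 records per partition per second in each batch (example value)
  .set("spark.streaming.kafka.maxRatePerPartition", "2000")
  // let Spark adapt the ingestion rate to the actual processing speed
  .set("spark.streaming.backpressure.enabled", "true")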
The pom.xml file:
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.3.0</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <appendAssemblyId>false</appendAssemblyId>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <!-- specify the class containing the main method here -->
                        <mainClass></mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>assembly</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.2</version>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                    <configuration>
                        <includes>
                            <include>**/*.scala</include>
                        </includes>
                    </configuration>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Here spark-streaming-kafka-0-10_2.11 is packaged into the jar (scope compile plus the jar-with-dependencies assembly); otherwise the job fails at runtime because some of its classes cannot be found. This can also be solved in other ways, for example with spark-submit's --packages option shown after the command below.
Upload the jar and run:
./bin/spark-submit \
--class me.jinkun.scala.kafka.ScalaKafkaStreaming \
--master yarn \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
/opt/soft-install/data/spark-yarn-1.0-SNAPSHOT.jar
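As mentioned above, bundling the Kafka integration into the jar is only one option. Alternatively, the dependency can be resolved at submit time by adding a --packages flag to the same spark-submit command (this assumes the machines can reach a Maven repository):
--packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.0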
While it runs you may hit the following error:
Current usage: 114.5 MB of 1 GB physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing container.
The container is killed because its virtual memory usage (2.2 GB) exceeds what YARN allows for a 1 GB container (the default yarn.nodemanager.vmem-pmem-ratio is 2.1). Solution: see https://blog.csdn.net/kaaosidao/article/details/77950125.
Here I modified yarn-site.xml and added the following configuration:
<property>
    <name>yarn.nodemanager.vmem-pmem-ratio</name>
    <value>3</value>
</property>
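Another common workaround for this error is to disable the virtual-memory check altogether; whether that is acceptable depends on your cluster. Either way, the NodeManagers must be restarted for yarn-site.xml changes to take effect.
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>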
After that the application starts normally, and the running job shows up in the YARN web UI at http://hadoop1:8088.
Clicking the application ID opens the run's logs.
The Tracking UI link opens the Spark web UI.
Run the following command to stop the Spark Streaming application:
yarn application -kill [appid]
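The application ID ([appid]) is shown in the YARN web UI, or can be listed from the command line:
yarn application -list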
3. Checkpoint
The checkpoint directory we configured keeps the last 5 checkpoints; for a job running in production it is normally placed on HDFS.
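To see what is actually kept there, simply list the directory, e.g. for the HDFS path used in the yarn-client run:
hdfs dfs -ls /data/output/checkpoint/kafka-direct
Because the Kafka offsets are part of this checkpoint, a restarted driver can pick up where it left off by rebuilding the StreamingContext from that directory with StreamingContext.getOrCreate. A minimal skeleton of the pattern (not part of the original program; the object name is made up, and the Kafka/WordCount pipeline from above still has to be filled in):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecovery {
  val checkpointPath = "/data/output/checkpoint/kafka-direct"

  // Called only when no usable checkpoint exists; the Kafka direct stream and the
  // WordCount output from ScalaKafkaStreaming above belong in here.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("ScalaKafkaStream")
    val ssc = new StreamingContext(conf, Seconds(3))
    ssc.checkpoint(checkpointPath)
    // ... build the Kafka DStream and register its output operations here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Restores the context (including Kafka offsets) from the checkpoint if one exists,
    // otherwise builds a new one with createContext()
    val ssc = StreamingContext.getOrCreate(checkpointPath, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}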