Preparation:
Getting started
1. Start ZooKeeper
Open a terminal and change to the ZooKeeper home directory. In the conf folder, make a copy of zoo_sample.cfg and rename the copy to zoo.cfg.
Then switch to the bin directory one level up and run ./zkServer.sh start
ZooKeeper starts and logs:
Starting zookeeper ... STARTED
Then run ./zkServer.sh status
to check its status. If the output contains the line below, startup succeeded:
Mode: standalone
To stop ZooKeeper, run ./zkServer.sh stop
2. Start Kafka
Open a terminal, change to the Kafka home directory, and run bin/kafka-server-start.sh config/server.properties
Log output similar to the following appears:
[2014-11-12 17:38:13,395] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2014-11-12 17:38:13,420] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
3. Start a Kafka producer
Open a new terminal; call it the producer terminal, for easy reference later. Change to the Kafka home directory and run bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This sends messages to a topic named test; with Kafka's default auto.create.topics.enable=true, the topic is created automatically on first use.
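The console producer is all this walkthrough needs, but the same messages can also be sent from Scala. Below is a minimal sketch using the old Scala producer API that ships with Kafka 0.8 (the version spark-streaming-kafka_2.10 1.1.0 pulls in); the TestProducer object name and the sample message are illustrative, not part of this walkthrough.

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

// Illustrative stand-alone producer; assumes the broker from step 2
// is listening on localhost:9092.
object TestProducer {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092") // broker list, not ZooKeeper
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val producer = new Producer[String, String](new ProducerConfig(props))
    producer.send(new KeyedMessage[String, String]("test", "a a aa sdfsadf"))
    producer.close()
  }
}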
4. Write the Scala application
package test

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
  def main(args: Array[String]) {
    // Hard-coded settings; in a real job these would come from args
    // (zkQuorum, group, topics, numThreads).
    val zkQuorum = "localhost:2181" // ZooKeeper quorum from step 1
    val group = "1"                 // Kafka consumer group id
    val topics = "test"             // comma-separated topic list
    val numThreads = 2              // receiver threads per topic

    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2)) // 2-second batches
    ssc.checkpoint("checkpoint")

    // Map each topic to its receiver thread count, create the Kafka
    // input stream, and keep only the message payload (_._2).
    val topicMap = topics.split(",").map((_, numThreads)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    // Word count over each batch.
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
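The wordCounts above reset with every 2-second batch. For a running count over a sliding window, Spark's own KafkaWordCount example uses reduceByKeyAndWindow; a sketch that could replace the reduceByKey line inside main is shown below. The inverse function _ - _ subtracts counts for data leaving the window and requires checkpointing, which the program already enables via ssc.checkpoint.

// Sliding-window variant: word counts over the last 10 minutes,
// recomputed every 2 seconds, using 2 reduce partitions.
val windowedCounts = words.map(x => (x, 1L))
  .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
windowedCounts.print()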
Add the following dependencies to the build.sbt file:
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.0"
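For reference, a complete minimal build.sbt might look like the following; the project name, version, and Scala patch release are assumptions, but Scala must be 2.10.x to match the _2.10 artifacts. spark-core is pulled in transitively by spark-streaming, so it does not need to be listed.

name := "kafka-word-count" // illustrative project name

version := "0.1"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.1.0"

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.0"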
Start the Scala program, then type some strings into the producer terminal from step 3 above, e.g. sdfsadf a aa a a a a a a a a
Output like the following appears in the IDE console:
14/11/12 16:38:22 INFO scheduler.DAGScheduler: Stage 195 (take at DStream.scala:608) finished in 0.004 s
-------------------------------------------
Time: 1415781502000 ms
-------------------------------------------
(aa,1)
(a,9)
(sdfsadf,1)
This shows the program ran successfully.