官方示例說(shuō)明
按照官方文檔的 這個(gè)示例說(shuō)明,可以輕松的在本地的spark-shell環(huán)境中測(cè)試這個(gè)示例泉粉。示例连霉,即為了更好的入門(mén),那么就再說(shuō)明一下嗡靡。
運(yùn)行這個(gè)統(tǒng)計(jì)單詞的方式有三種跺撼,前面兩種是官方文檔上的指引,第三種則是用scala程序運(yùn)行讨彼。
-
第一種方式, run-demo
- 打開(kāi)一個(gè)終端歉井,打開(kāi)一個(gè)終端,輸入 命令
nc -lk 9999
哈误,暫時(shí)叫做 “nc終端” 吧 - 再打開(kāi)終端哩至,切換到Spark HOME目錄, 執(zhí)行命令
bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999
蜜自, 然后每秒會(huì)有類(lèi)似一下日志循環(huán)輸出
-------------------------------------------
Time: 1415701382000 ms
-------------------------------------------
-------------------------------------------
Time: 1415701383000 ms
-------------------------------------------
- 在nc終端隨便輸入一些字符串菩貌,用空格隔開(kāi),回車(chē)重荠,如aa aa bb c箭阶。可以在上面的Spark終端中看到有新內(nèi)容輸出
-------------------------------------------
Time: 1415701670000 ms
-------------------------------------------
(aa,2)
(bb,1)
(c,1)
OK戈鲁,成功仇参!
-
第二種 spark-shell 模式
下面介紹在spark-shell中輸入scala代碼運(yùn)行的方式。
- 同上面第一步婆殿,打開(kāi)一個(gè)終端诈乒,打開(kāi)一個(gè)終端,輸入 命令
nc -lk 9999
婆芦,暫時(shí)叫做 “nc終端” 吧 - 再打開(kāi)一個(gè)終端抓谴, 切換到Spark HOME目錄下暮蹂,輸入
bin/spark-shell
(如果你已經(jīng)安裝好了Spark的話,直接輸入spark-shell
即可)癌压,等待Spark啟動(dòng)成功仰泻,會(huì)打印信息
Spark context available as sc.
scala>
然后輸入以下語(yǔ)句:
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.api.java.function._
import org.apache.spark.streaming._
import org.apache.spark.streaming.api._
// Create a StreamingContext with a local master
val ssc = new StreamingContext(sc, Seconds(1))
// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print a few of the counts to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
會(huì)打印以下信息:
14/11/11 18:07:23 INFO MemoryStore: ensureFreeSpace(2216) called with curMem=100936, maxMem=278019440
......
14/11/11 18:07:23 INFO DAGScheduler: Stage 91 (take at DStream.scala:608) finished in 0.004 s
14/11/11 18:07:23 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.007531701 s
-------------------------------------------
Time: 1415700443000 ms
-------------------------------------------
- 同第一種方式的第3步,隨便輸入一些字符串滩届,用空格隔開(kāi)集侯,回車(chē),如aa aa bb c帜消√耐鳎可以在上面的Spark終端中看到有新內(nèi)容輸出
-------------------------------------------
Time: 1415701670000 ms
-------------------------------------------
(aa,2)
(bb,1)
(c,1)
OK,成功泡挺!
-
第三種 scala-ide編程方式
在用這種方式運(yùn)行這個(gè)demo代碼的時(shí)候辈讶,遇到了不少問(wèn)題,記錄下來(lái)娄猫,供大家參考贱除。這個(gè)例子,請(qǐng)大家先根據(jù)這里記錄的方式進(jìn)行操作媳溺,得到一個(gè)可以運(yùn)行的程序月幌,后面我會(huì)記錄遇到的問(wèn)題。
- 下載scala-ide, 下載鏈接悬蔽,下載 For Scala 2.10.4 下的對(duì)應(yīng)平臺(tái)的ide扯躺,解壓,運(yùn)行蝎困。
- 安裝sbt录语,下載鏈接,
- 安裝sbteclipse, github地址, 編輯
~/.sbt/0.13/plugins/plugins.sbt
文件, 添加以下內(nèi)容addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.5.0")
禾乘,如果沒(méi)有plugins目錄和plugins.sbt钦无,自行創(chuàng)建。 - 用向?qū)?chuàng)建一個(gè)scala項(xiàng)目盖袭,并在項(xiàng)目根目錄下創(chuàng)建一個(gè)build.sbt文件失暂,添加以下內(nèi)容(注意,每行正式語(yǔ)句之后要換行)
name := "spark-test"
version := "1.0"
scalaVersion := "2.10.4"
// set the main class for the main 'run' task
// change Compile to Test to set it for 'test:run'
mainClass in (Compile, run) := Some("test.SparkTest")
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.1.0"
- 創(chuàng)建test.SparkTest.scala文件鳄虱,添加以下代碼
package test
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkContext
import org.apache.spark.api.java.function._
import org.apache.spark.streaming._
import org.apache.spark.streaming.api._
object SparkTest {
def main(args: Array[String]): Unit = {
// Create a StreamingContext with a local master
// Spark Streaming needs at least two working thread
val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(10))
// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey( + _)
wordCounts.print
ssc.start
ssc.awaitTermination
}
}
- 終端中切換目錄到這個(gè)項(xiàng)目根目錄弟塞,輸入命令
sbt
, 命令運(yùn)行成功后拙已,敲入eclipse
生成eclipse項(xiàng)目和項(xiàng)目所需依賴(lài) - 同第一種方式的第1,3步决记,
再打開(kāi)一個(gè)終端,輸入 命令nc -lk 9999
倍踪。
然后運(yùn)行剛才寫(xiě)的main程序系宫,在nc終端中輸入一些字符串索昂,用空格隔開(kāi),回車(chē)扩借,如aa aa bb c椒惨。可以在ide控制臺(tái)中觀察到
-------------------------------------------
Time: 1415701670000 ms
-------------------------------------------
(aa,2)
(bb,1)
(c,1)
OK潮罪,成功康谆!
下面是遇到的問(wèn)題及解決方法:
1. 運(yùn)行程序說(shuō)找不到主類(lèi)
解:沒(méi)有在sbt文件配置主類(lèi)是哪個(gè),在build.sbt
文件中添加以下代碼
mainClass in (Compile, run) := Some("test.SparkTest")
Some中就是主類(lèi)的路徑
2. java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
這個(gè)問(wèn)題困擾了我很長(zhǎng)時(shí)間嫉到,一直沒(méi)找到怎么解決沃暗。后來(lái)看到說(shuō)是scala每次版本升級(jí)不兼容以前的版本編譯的庫(kù),于是換了對(duì)應(yīng)的版本的ide才正常運(yùn)行何恶。
解:scala-ide版本和現(xiàn)在用的spark包依賴(lài)編譯的scala版本不一致孽锥, 請(qǐng)下載上面說(shuō)過(guò)的 scala-ide For Scala 2.10.4
版本。