1. 減少client模式下的日志輸出
import org.apache.log4j.{ Level, Logger }
Logger.getLogger("org").setLevel(Level.ERROR)//或者WARN
2. 判斷字符串是否為整數或小數
和spark沒關系启绰,是個scala語法問題月洛。
實際項目中涉及過濾數據質量的問題日麸,因此綜合網上看到的例子整理出一個能同時適配整數和小數的方法杭煎。
var pattern = """^[0-9]+([.]{1}[0-9]+){0,1}""".r
def isNumber(s: String) = {
s match {
case null => false
case this.pattern(_*) => true
case _ => false
}
}
3. spark傳遞對象的序列化的問題
原因是寫了一個讀取配置文件的config類,在構造函數中讀取文件背率,然后賦值话瞧。報錯:
object not serializable
原因:spark任務是由driver分配給executor的。在driver中建立的class(基本上就是各種rdd算子之外定義的class)必須通過序列化方式下發(fā)寝姿,否則就要回避這種用法:
//定義
class config extends java.io.Serializable {
def this(path: String) {
this() //調用主構造函數
//自定義構造方法
}
}
//使用
val taskconf = new config(args(0))
其他的方法(不建議用交排,參見后續(xù)分析):
使用object或case object,此時不能定義構造函數会油,但是可以自己寫一個load函數个粱。使用的時候也不用new:
//定義
object config {
def load(path: String) {
}
}
//使用
val taskconf = config
taskconf.load(args(0))
//或者直接用config
相當于實現了一個靜態(tài)類,而且感覺作為一個只讀的配置類翻翩,這樣搞就夠了。
ps. 經過后續(xù)測試稻薇,object對象往executor里面?zhèn)骱孟襁€是有問題嫂冻,這個很奇怪:
我的obj中有個load函數,讀配置文件然后給object中的變量賦值塞椎,此外變量自身有默認值桨仿。在開發(fā)用的虛機上,object應該是正確而傳到executor了案狠,里面的配置值是正確的服傍。但是在實際服務器上钱雷,發(fā)現傳過去的是初始值,load函數中讀取的東西沒有傳過去吹零,目前沒有找出兩個環(huán)境的關鍵差別(差別肯定是有的罩抗,但不知道哪些方面起的作用。)
4. scala的class和object
目前只遇到了case class和class的序列化問題灿椅,其他的沒有深究
- object
- scala沒有靜態(tài)的概念套蒂,object可以模擬靜態(tài)方法,
- object不可用構造函數
- main函數必須在object里面
- 可以通過定義伴生對象茫蛹,可以實現既有實例成員又有靜態(tài)成員的類的功能(可互相訪問private field)
- object not serializable
- case class和case object
- 使用case 關鍵字定義類不需要創(chuàng)建對象直接調用
- case class:樣例類操刀,常被用于模式匹配,比如定義rdd轉df的表結構婴洼,感覺類似結構體(struct)骨坑。
- case class支持序列化,初始化可以不用new柬采,一般是只讀的(val)欢唾,系統(tǒng)自動創(chuàng)建伴生對象
- case class必須有參數列表,否則報錯警没,如果沒有參數匈辱,可以用case object
5. scala讀properties文件
之前沒用過,也不太理解這個Properties杀迹,格式和讀寫都很好用亡脸,但遇到的主要問題是路徑問題,網上很多教程是搞WEB開發(fā)的人寫的树酪,路徑的使用上存在一些差異浅碾。
最終的解決:
import java.util.Properties
import java.io.{ FileInputStream }
//調用
val ins = this.getClass().getResourceAsStream(path);
properties.load(new FileInputStream(path))
this.mathod = properties.getProperty("mathod", "0").trim()
直接用properties.load方法,path中即便有絕對路徑也不認续语,但加入第一句就沒問題了垂谢。如果沒有第一句,則要求properties文件在當前執(zhí)行路徑下(不是jar包位置)疮茄。
6. 累加器
- 累加器在Driver端定義賦初始值滥朱,并只能由Driver端讀取,Excutor端負責持續(xù)寫入力试。注意徙邻,Spark2語法,和1的時候不太一樣畸裳。
- 一般建議用到foreach等action里缰犁,因為只會執(zhí)行一次。transform中使用累加器有坑,例如可以把累加器放到map當中帅容,如果讀早了(在action之前)就沒有值颇象,如果后續(xù)有多個讀取該rdd的操作,則tranform中的累加器會重復累加并徘。此外還要考慮容錯恢復(tranform也可能執(zhí)行多遍)等情況遣钳。
- 在簡單計數時它和count()效果相同,但是count需要自己進行獨立的遍歷饮亏,累加器可以塞到其他的分析過程里(比如map)
- 如果是dataframe計數耍贾,貌似不是很方便了,不如直接用sql路幸。
- 可能累加器在流分析等場景更有用一些荐开。
對比一下:
方法1:map/reduce會遍歷一遍數據,然后的count或foreach會再遍歷一遍數據简肴。
//先建立SparkSession:spark
val accum = spark.sparkContext.longAccumulator("My Accumulator")
rdd1.map().reduce()
rdd1.count())
//或者
rdd1.foreach( x => accum.add(1))
方法2:rdd1只被遍歷的一遍晃听,即完成map/reduce和計數過程。
//先建立SparkSession:spark
val accum = spark.sparkContext.longAccumulator("My Accumulator")
//"My Accumulator"這個名字會在spark的web ui中顯示出來
rdd1.map( x => {accum.add(1);
x //這里指map的真正輸出
}).reduce()
println(accum.value)
方法2中砰识,如果:
rdd2 = rdd1.map( x => {accum.add(1);
x //這里指map的真正輸出
})
rdd2.reduce()
rdd2.reducebykey()
println(accum.value)
rdd2之后經歷了兩個action能扒,則計數器所在的map會被執(zhí)行兩遍。此時累加器的value會翻倍辫狼。
(注意rdd2的定義實際是惰性的初斑,因為沒有action)
數組累加器:
val a1 = spark.sparkContext.collectionAccumulator[String]("a1")
//在算子中使用的時候
a1.add("some string")
//driver端讀取的時候
a1.value.toArray().mkString("\n") //有點土
如果將累加器作為參數傳遞(或者其他場景需要引用頭文件):
import org.apache.spark.util.{ LongAccumulator, CollectionAccumulator }
8. 廣播變量
//先建立SparkSession:spark
spark.sparkContext.broadcast(prop)
廣播變量允許程序員在每臺機器上緩存一個只讀變量(RDD數據也行),而不是將變量與任務一起發(fā)送膨处。
目前看到obj是無法被廣播的见秤,因為無法序列化。
在join真椿、跨stage鹃答、多task等場景可以優(yōu)化。
還有就是官網當中rdd突硝、累加器和共享變量是放在一個標題下的测摔,不知道官方如何定位dataframe中的累加器和共享變量。
補充:后續(xù)廣播了一個dataframe解恰,但想在另一個dataframe之中嵌套使用時出錯锋八,這個問題暫時沒解決。
9. 有關scala中字符串split和array長度
遇到一個小坑护盈,應該是我孤陋寡聞了:
val s = "0,0,0,0,0,,0,,,,,,,,"
val a = s.split(",")
println(a.length)
出來的長度是7查库,也就是最后的空值沒有計算在內,中間的空值是考慮的
但如果:
val s = "0,0,0,0,0,,0,,,,,,,,"
val a = s.split(",",-1)
println(a.length)
則輸出長度是15黄琼,此時舉例來說,a(8)=""(empty),不是null脏款。
需要注意字符串isEmpty和null的區(qū)別
10. 有關history-server和eventlog围苫,以及通過rest接口監(jiān)控
- 其實通過網頁監(jiān)控任務,比通過rest接口要方便撤师。
- 可參考的官網信息剂府,比如rest接口和history-server配置方法等:http://spark.apache.org/docs/latest/monitoring.html
網上有很多教程,但有些不是很全剃盾,我的體會如下:
- spark的web監(jiān)控界面腺占,默認是8080,但這個主要看的是集群痒谴,當然也能看任務衰伯。spark提交的任務詳情,轉到默認4040查看(注意如果有多個任務上下文积蔚,則依次是4041意鲸、4042),但如果是完成的任務就需要通過history-server(默認18080端口查看了)尽爆。注意:4040端口只在程序運行時有響應怎顾,但程序運行完畢之后,4040端口立馬不響應了漱贱。但是18080是可以查看所有完成或未完成任務細節(jié)的(我還沒有對比顯示細節(jié)是否有差異)
- 如果想通過rest接口查看任務狀態(tài)(獲得json數據)槐雾,通過8080是查不到的(我做錯了嗎?)幅狮,只能通過4040+18080查看募强,目前看來18080更方便,但前提是配置并啟動history-server彪笼。
- 提交任務之后钻注,是不知道任務id的,這時候(如果通過rest方式)只能通過 http://master:18080/api/v1/applications?status=[completed|running] 先獲得任務列表配猫,再來找任務幅恋,感覺有點不爽,不知道還有什么更好的辦法沒有泵肄。
- 首先配置history-server:
4.1. 在conf/spark-env.sh中添加
export SPARK_HISTORY_OPTS=" -Dspark.history.fs.logDirectory=hdfs://node1:9000/history/"
spark.history.fs.logDirectory這一項是必須配置的捆交,也可以用“file:///”指定本地目錄。但路徑必須是已經存在的腐巢。
還可以利用-D添加更多的指令品追,在上面官網的“Spark History Server Configuration Options”一節(jié)中記錄了很多可以修改的配置,常見的比如重新指定端口(spark.history.ui.port)冯丙,以及cache的任務數量(spark.history.retainedApplications)肉瓦。
之后啟動服務:start-history-server.sh(sbin下面)
4.2. 如果不做4.1的配置,則需要啟動history-server的時候使用:start-history-server.sh dfs://node1:9000/history/
4.3.在提交的任務中配置:
spark.eventLog.enabled true
spark.eventLog.dir hdfs://node1:9000/history
spark.eventLog.compress true (可選)
注意:spark.eventLog.dir是針對任務的,spark.history.fs.logDirectory是針對history server的泞莉,這兩項很容易迷惑哪雕,它們的配置文件不同,但內容(路徑)必須是一致的鲫趁,且必須提前建立的斯嚎!否則history server找不到日志的。
另外挨厚,理論上任務的配置堡僻,直接在spark-defaults.conf中配就好了,submit的時候疫剃,會自動加載這些默認參數钉疫,但是在我的項目中不生效,不知道什么原因(現在只能理解我的spark-defaults.conf格式有錯)慌申,于是我把上述配置內容寫入代碼陌选,就沒問題了。
:
5