相比于傳統(tǒng)代碼,Spark是比較難調試的奸鬓,所以對其進行單元測試是非常必要的焙畔。
RDD測試
RDD在集群中運行,每次修改bug后串远,都要上傳到集群進行測試宏多,代價非常大。
所以優(yōu)先在本地進行單元測試澡罚,可以減少小模塊的邏輯錯誤伸但。
例如,要測試一個WordCount程序:
//定義一個簡單的wordcount
object WordCount extends Serializable{
def count(lines:RDD[String]): RDD[(String,Int)]={
val rdd=lines.flatMap(line=>line.split("\\s")).map(word=>(word,1)).reduceByKey(_ + _)
rdd
}
}
可以通過sc.parallelize()
和 sc.textFile()
模擬創(chuàng)建RDD留搔,對返回的RDD進行遍歷校驗:
//引入scalatest建立一個單元測試類更胖,混入特質BeforeAndAfter,在before和after中分別初始化sc和停止sc,
//初始化SparkContext時只需將Master設置為local(local[N],N表示線程)即可却妨,無需本地配置或搭建集群饵逐,
class WordCountTests extends FlatSpec with BeforeAndAfter{
val master="local" // sparkcontext的運行master
var sc:SparkContext=_
it should("test success") in{
//其中參數(shù)為rdd或者dataframe可以通過通過簡單的手動構造即可
val seq=Seq("the test test1","the test","the")
val rdd=sc.parallelize(seq)
val wordCounts=WordCount.count(rdd)
wordCounts.map(p=>{
p._1 match {
case "the"=>
assert(p._2==3)
case "test"=>
assert(p._2==2)
case "test1"=>
assert(p._2==1)
case _=>
None
}
}).foreach(_=>())
}
//這里before和after中分別進行sparkcontext的初始化和結束,如果是SQLContext也可以在這里面初始化
before{
val conf=new SparkConf()
.setAppName("test").setMaster(master)
sc=new SparkContext(conf)
}
after{
if(sc!=null){
sc.stop()
}
}
}
在將 master URL 設置為
local
來測試時會簡單的創(chuàng)建一個SparkContext
彪标,運行您的操作倍权,然后調用SparkContext.stop()
將該作業(yè)停止
無返回值方法測試
很多情況下,方法并沒有返回值捞烟,而是把結果輸出到文件或其他介質薄声。這種情況下,需要check輸出文件進行測試坷襟,比較困難奸柬。
例如:
trait WriterHandle{
def writer(records:Seq[GenericRecord]):Unit={
val parquetWriter=...
records.foreach(parquetWriter.writer(..))
}
}
//一個類處混入這個特質,經過一定轉換后將結果數(shù)據寫入parquet中
class ProcessHandle(objects:Iterator[T]) extends Serializable with WriterHandle{
def process():Unit={
val records:Seq[GenericRecord]=build(objects)={
...
}
//這里調用了特質writer中的writer方法生年,實際單元測試運行到這里可能寫入的時候會出錯婴程,不能正常測試通過
writer(records)
}
}
這種情況,可以把寫文件邏輯抽象成trait
抱婉,測試時進行替換:
class Writertests extends FlatSpec {
it should("write success") in{
val objects=Seq(object1,object2..).toIterator
//在new處理類档叔,混入原先特質的一個子特質
val process=new ProcessHandle(objects) with Writerhandletest
}
}
//可以自定義一個trait繼承自原先的特質,通過將原先的方法覆蓋蒸绩,然后在重寫后的方法里面的根據傳入值定義所需要斷言即可
trait Writerhandletest extends WriterHandle{
override def writer(records:Seq[GenericRecord]):Unit={
assert(records.length==N)
assert(records(0).XX=="xxx")
}
}
私有方法測試
如果有必要衙四,可以也可對私有方法進行測試。例如:
class MyTest(s:String){
//此公有方法可能不方便測試
def ():Unit={
...
doSth(s)
}
//這里私有方法患亿,可能是邏輯關鍵所在传蹈,有必要測試
private def doSth(s:String):String={
...
}
}
這種情況需要借助于PrivateMethodTester,進行私有方法的調用:
//要混入PrivateMethodTester特質
class MytestTests extends FlatSpec with PrivateMethodTester{
it should("write success") in{
//首先new一個要測試的類
val myTest=new MyTest("string")
//其中通過PrivateMethod修飾步藕,[]中為返回值惦界, ('method)單引號后跟私有方法名
val dosth=PrivateMethod[String]("doSth")
//通過invokePrivate 委托調用私有方法,注意參數(shù)要對,貌似傳null的話會找不到對應的方法
val str=myTest invokePrivate dosth("string")
//最后斷言相應的至即可
asset(str=="string")
}
}