Some applications need to compute statistics over a span of time before and after a given moment.
Spark SQL provides the window function rangeBetween(start, end) for this: it defines a window over a range of records, and the statistics are then computed over that window.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import sqlContext.implicits._
// Sample events: (id, event, metric, time)
val seq = Seq(("001","events1",10,"2016-05-01 10:50:51"),("002","events2",210,"2016-05-01 10:50:56"),("001","events3",100,"2016-05-01 10:58:51"),("001","events4",50,"2016-05-01 10:51:51"))
val rdd = sc.parallelize(seq)
val df = rdd.toDF("id","event","metric","time")
// Event time as epoch seconds
val ts = col("time").cast("timestamp").cast("long")
// Round each timestamp to the nearest 5-minute (300 s) boundary
val interval = (round(ts/300.0)*300).cast("long").cast("timestamp").alias("interval")
df.groupBy($"id",interval).sum("metric").show(false)
// Range frame: rows with the same id whose metric is within 150 of the current row's metric
val w = Window.partitionBy($"id").orderBy("metric").rangeBetween(-150,150)
df.withColumn("tss",ts).withColumn("sum_metric",sum("metric").over(w)).orderBy("id","time").show(false)
The key line in the code above is the following (window functions must run in a HiveContext):
val w = Window.partitionBy($"id").orderBy("metric").rangeBetween(-150,150)
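partitionBy may be omitted, but orderBy("metric").rangeBetween(-150,150) is required. The frame is computed from the value of the ordering column: for each row it covers the rows whose ordering value lies within the given range below and above the current row's value, and the statistic is computed over those rows. If you order by time, the range is measured in time; if you order by another column, the range is measured in that column's values.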
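To make the frame semantics concrete, here is a minimal plain-Scala sketch that emulates sum("metric") over the window above on the same sample rows, without Spark. The names Event and rangeSum are hypothetical and exist only for this illustration, and the narrower range of -50 to 50 is an extra assumption added to show how the frame shrinks. It can be run as a Scala script or pasted into the REPL.

```scala
// Plain-Scala emulation of
//   sum("metric").over(Window.partitionBy("id").orderBy("metric").rangeBetween(start, end))
// Event and rangeSum are hypothetical names, not part of any Spark API.
case class Event(id: String, event: String, metric: Int, time: String)

val events = Seq(
  Event("001", "events1", 10, "2016-05-01 10:50:51"),
  Event("002", "events2", 210, "2016-05-01 10:50:56"),
  Event("001", "events3", 100, "2016-05-01 10:58:51"),
  Event("001", "events4", 50, "2016-05-01 10:51:51")
)

// For each row, sum the metric of every row in the same partition (same id)
// whose ordering value lies in [current + start, current + end].
def rangeSum(rows: Seq[Event], start: Int, end: Int): Map[String, Int] =
  rows.map { cur =>
    val frame = rows.filter { r =>
      r.id == cur.id &&
      r.metric >= cur.metric + start &&
      r.metric <= cur.metric + end
    }
    cur.event -> frame.map(_.metric).sum
  }.toMap

// Same offsets as the window in the text.
val sums = rangeSum(events, -150, 150)
// Narrower, assumed offsets to show the frame changing per row.
val narrow = rangeSum(events, -50, 50)

sums.toSeq.sortBy(_._1).foreach { case (e, s) => println(s"$e -> $s") }
narrow.toSeq.sortBy(_._1).foreach { case (e, s) => println(s"$e -> $s") }
```

With offsets of -150 and 150, the three metrics of id 001 (10, 50, 100) all fall inside each other's range, so every row of that id gets 160. With -50 and 50, the row with metric 10 only sees 10 and 50 (sum 60), while the row with metric 100 only sees 50 and 100 (sum 150).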
If orderBy is omitted, Spark reports: Non-zero range offsets are not supported for windows with multiple order expressions
If more than one ordering column is given, Spark reports: Window specification is not valid because this range window Frame only accepts at most one Order by expression.
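For the time-based case described at the start, the single ordering column can be the timestamp converted to epoch seconds, so the offsets are expressed in seconds. The following is a sketch under assumptions rather than the original setup: it assumes Spark 2.x or later, where SparkSession replaces sqlContext and HiveContext, and a local master. The names withTs and windowSums are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

// Assumption: Spark 2.x or later, running locally.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("range-window-sketch")
  .getOrCreate()
import spark.implicits._

val df = Seq(
  ("001", "events1", 10, "2016-05-01 10:50:51"),
  ("002", "events2", 210, "2016-05-01 10:50:56"),
  ("001", "events3", 100, "2016-05-01 10:58:51"),
  ("001", "events4", 50, "2016-05-01 10:51:51")
).toDF("id", "event", "metric", "time")

// Epoch seconds, so the range offsets below are in seconds.
val withTs = df.withColumn("ts", col("time").cast("timestamp").cast("long"))

// Exactly one ordering expression, as a range frame with non-zero offsets requires:
// each row sees the rows of the same id within 150 seconds before and after it.
val w = Window.partitionBy($"id").orderBy($"ts").rangeBetween(-150, 150)

val out = withTs.withColumn("sum_metric", sum($"metric").over(w))
out.orderBy("id", "time").show(false)

// Collected into a map for inspection; windowSums is a hypothetical name.
val windowSums = out.select("event", "sum_metric")
  .collect()
  .map(r => r.getString(0) -> r.getLong(1))
  .toMap
```

On the sample rows, events1 and events4 are 60 seconds apart and share id 001, so each gets 10 + 50 = 60. events3 is more than 150 seconds from both and keeps its own value of 100, and events2 is alone in its partition with 210.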