時間序列預測法及Spark-TimeSerial實現(xiàn)

時間序列預測法及Spark-Timeserial

時間序列預測法

時間序列預測法(Time Series Forecasting Method)

什么是時間序列預測法卵洗?

? 一種歷史資料延伸預測鳄逾,也稱歷史引伸預測法。是以時間數(shù)列所能反映的社會經(jīng)濟現(xiàn)象的發(fā)展過程和規(guī)律性垒棋,進行引伸外推,預測其發(fā)展趨勢的方法荔燎。

時間序列撞芍,也叫時間數(shù)列丢胚、歷史復數(shù)或動態(tài)數(shù)列翩瓜。它是將某種統(tǒng)計指標的數(shù)值,按時間先后順序排到所形成的數(shù)列携龟。時間序列預測法就是通過編制和分析時間序列兔跌,根據(jù)時間序列所反映出來的發(fā)展過程、方向和趨勢峡蟋,進行類推或延伸坟桅,借以預測下一段時間或以后若干年內(nèi)可能達到的水平华望。其內(nèi)容包括:收集與整理某種社會現(xiàn)象的歷史資料;對這些資料進行檢查鑒別仅乓,排成數(shù)列;分析時間數(shù)列建蹄,從中尋找該社會現(xiàn)象隨時間變化而變化的規(guī)律痛单,得出一定的模式;以此模式去預測該社會現(xiàn)象將來的情況旭绒。

時間序列預測法的步驟

? 第一步 收集歷史資料,加以整理重父,編成時間序列,并根據(jù)時間序列繪成統(tǒng)計圖忽匈。時間序列分析通常是把各種可能發(fā)生作用的因素進行分類丹允,傳統(tǒng)的分類方法是按各種因素的特點或影響效果分為四大類:(1)長期趨勢雕蔽;(2)季節(jié)變動;(3)循環(huán)變動扇售;(4)不規(guī)則變動承冰。

第二步 分析時間序列巷懈。時間序列中的每一時期的數(shù)值都是由許許多多不同的因素同時發(fā)生作用后的綜合結果顶燕。

第三步 求時間序列的長期趨勢(T)季節(jié)變動(s)和不規(guī)則變動(I)的值涌攻,并選定近似的數(shù)學模式來代表它們恳谎。對于數(shù)學模式中的諸未知參數(shù)因痛,使用合適的技術方法求出其值鸵膏。

第四步 利用時間序列資料求出長期趨勢谭企、季節(jié)變動和不規(guī)則變動的數(shù)學模型后债查,就可以利用它來預測未來的長期趨勢值T和季節(jié)變動值s盹廷,在可能的情況下預測不規(guī)則變動值I速和。然后用以下模式計算出未來的時間序列的預測值Y:

加法模式T+S+I=Y

乘法模式T×S×I=Y

如果不規(guī)則變動的預測值難以求得暮芭,就只求長期趨勢和季節(jié)變動的預測值瑞筐,以兩者相乘之積或相加之和為時間序列的預測值膘格。如果經(jīng)濟現(xiàn)象本身沒有季節(jié)變動或不需預測分季分月的資料,則長期趨勢的預測值就是時間序列的預測值,即T=Y访锻。但要注意這個預測值只反映現(xiàn)象未來的發(fā)展趨勢,即使很準確的趨勢線在按時間順序的觀察方面所起的作用鲤妥,本質(zhì)上也只是一個平均數(shù)的作用,實際值將圍繞著它上下波動。

時間序列分析基本特征

? 1.時間序列分析法是根據(jù)過去的變化趨勢預測未來的發(fā)展,它的前提是假定事物的過去延續(xù)到未來。

時間序列分析,正是根據(jù)客觀事物發(fā)展的連續(xù)規(guī)律性,運用過去的歷史數(shù)據(jù),通過統(tǒng)計分析,進一步推測未來的發(fā)展趨勢若皱。事物的過去會延續(xù)到未來這個假設前提包含兩層含義:一是不會發(fā)生突然的跳躍變化,是以相對小的步伐前進;二是過去和當前的現(xiàn)象可能表明現(xiàn)在和將來活動的發(fā)展變化趨向泥耀。這就決定了在一般情況下,時間序列分析法對于短迎瞧、近期預測比較顯著,但如延伸到更遠的將來,就會出現(xiàn)很大的局限性,導致預測值偏離實際較大而使決策失誤足绅。

2.時間序列數(shù)據(jù)變動存在著規(guī)律性與不規(guī)律性

時間序列中的每個觀察值大小,是影響變化的各種不同因素在同一時刻發(fā)生作用的綜合結果首量。從這些影響因素發(fā)生作用的大小和方向變化的時間特性來看,這些因素造成的時間序列數(shù)據(jù)的變動分為四種類型拣宏。

(1)趨勢性:某個變量隨著時間進展或自變量變化,呈現(xiàn)一種比較緩慢而長期的持續(xù)上升杨凑、下降、停留的同性質(zhì)變動趨向,但變動幅度可能不相等伪嫁。

(2)周期性:某因素由于外部影響隨著自然季節(jié)的交替出現(xiàn)高峰與低谷的規(guī)律。

(3)隨機性:個別為隨機變動,整體呈統(tǒng)計規(guī)律。

(4)綜合性:實際變化情況是幾種變動的疊加或組合提鸟。預測時設法過濾除去不規(guī)則變動,突出反映趨勢性和周期性變動。

時間序列預測法的分類

時間序列預測法可用于短期預測中期預測長期預測盗棵。根據(jù)對資料分析方法的不同,又可分為:簡單序時平均數(shù)法加權序時平均數(shù)法移動平均法加權移動平均法趨勢預測法淌实、指數(shù)平滑法倘感、季節(jié)性趨勢預測法蜡豹、市場壽命周期預測法等娇唯。

簡單序時平均數(shù)法 也稱算術平均法佑淀。即把若干歷史時期的統(tǒng)計數(shù)值作為觀察值景图,求出算術平均數(shù)作為下期預測值。這種方法基于下列假設:“過去這樣笛粘,今后也將這樣”,把近期和遠期數(shù)據(jù)等同化和平均化垛膝,因此只能適用于事物變化不大的趨勢預測桶雀。如果事物呈現(xiàn)某種上升或下降的趨勢,就不宜采用此法。

加權序時平均數(shù)法 就是把各個時期的歷史數(shù)據(jù)按近期和遠期影響程度進行加權,求出平均值加匈,作為下期預測值啥寇。

簡單移動平均法 就是相繼移動計算若干時期的算術平均數(shù)作為下期預測值。

加權移動平均法 即將簡單移動平均數(shù)進行加權計算子檀。在確定權數(shù)時,近期觀察值的權數(shù)應該大些,遠期觀察值的權數(shù)應該小些主籍。

上述幾種方法雖然簡便千元,能迅速求出預測值幸海,但由于沒有考慮整個社會經(jīng)濟發(fā)展的新動向和其他因素的影響袜硫,所以準確性較差父款。應根據(jù)新的情況憨攒,對預測結果作必要的修正衙荐。

指數(shù)平滑法 即根據(jù)歷史資料的上期實際數(shù)和預測值仍劈,用指數(shù)加權的辦法進行預測降狠。此法實質(zhì)是由內(nèi)加權移動平均法演變而來的一種方法,優(yōu)點是只要有上期實際數(shù)和上期預測值铆铆,就可計算下期的預測值巧涧,這樣可以節(jié)省很多數(shù)據(jù)和處理數(shù)據(jù)的時間却紧,減少數(shù)據(jù)的存儲量肿男,方法簡便哟楷。是國外廣泛使用的一種短期預測方法崭别。

季節(jié)趨勢預測法 根據(jù)經(jīng)濟事物每年重復出現(xiàn)的周期性季節(jié)變動指數(shù),預測其季節(jié)性變動趨勢稠炬。推算季節(jié)性指數(shù)可采用不同的方法,常用的方法有季(月)別平均法和移動平均法兩種:a.季(月)別平均法。就是把各年度的數(shù)值分季(或月)加以平均扎附,除以各年季(或月)的總平均數(shù),得出各季(月)指數(shù)。這種方法可以用來分析生產(chǎn)改橘、銷售胖烛、原材料儲備身笤、預計資金周轉(zhuǎn)需要量等方面的經(jīng)濟事物的季節(jié)性變動考抄;b.移動平均法疯兼。即應用移動平均數(shù)計算比例求典型季節(jié)指數(shù)吧彪。

市場壽命周期預測法 就是對產(chǎn)品市場壽命周期的分析研究姨裸。例如對處于成長期的產(chǎn)品預測其銷售量,最常用的一種方法就是根據(jù)統(tǒng)計資料赡艰,按時間序列畫成曲線圖,再將曲線外延料身,即得到未來銷售發(fā)展趨勢惯驼。最簡單的外延方法是直線外延法隙畜,適用于對耐用消費品的預測。這種方法簡單言询、直觀运杭、易于掌握。

時間序列預測法

1.逐步自回歸(StepAR)模型:StepAR模型是有趨勢虱咧、季節(jié)因素數(shù)據(jù)的模型類。

2.Winters Method—Additive模型:它是將時勢和乘法季節(jié)因素相結合绘沉,考慮序列中有規(guī)律節(jié)波動。

3.ARlMA模型:它是處理帶有趨勢帖世、季節(jié)因平穩(wěn)隨機項數(shù)據(jù)的模型類[3]

4.Winters Method—Muhiplicative模型:該方將時同趨勢和乘法季節(jié)因素相結合哪轿,考慮序列規(guī)律的季節(jié)波動杨耙。時間趨勢模型可根據(jù)該序列律的季節(jié)波動對該趨勢進行修正。為了能捕捉到季節(jié)性车柠,趨勢模型包含每個季節(jié)的一個季節(jié)參季節(jié)因子采用乘法季節(jié)因子。隨機時間序列
x_t(t=1,2,\ldots,N,N=n)

整理匯總歷史上各類保險的數(shù)據(jù)得到逐月的數(shù)據(jù),Winters Method-Multiplicative模型表示為

xt = (a + bt)s(t) + εt  (1)

其中a和b為趨勢參數(shù)猿妈,s(t)為對應于時刻t的這個季節(jié)選擇的季節(jié)參數(shù)鳍刷,修正方程為输瓜。

a_t=\omega_1\frac{x_t}{s{t-1}(t)}+(1-\omega_1)(a_{t-1}+b_{t-1})

bt = ω2(at ? at ? 1) + (1 ? ω2)bt ? 1  (2)

其中:xt,at,bt,分別為序列在時刻t的實測值负芋、平滑值和平滑趨勢s{t-1}(t)選擇在季節(jié)因子被修正之前對應于時刻t的季節(jié)因子的過去值。

在該修正系統(tǒng)中蠕嫁,趨勢多項式在當前周期中總是被中心化锨天,以便在t以后的時間里預報值的趨勢多項式的截距參數(shù)總是修正后的截距參數(shù)at。向前τ個周期的預報值是剃毒。

xt + τ = (at + btτ)st(t + τ)(3)

當季節(jié)在數(shù)據(jù)中改變時季節(jié)參數(shù)被修正病袄,它使用季節(jié)實測值與預報值比率的平均值搂赋。

5.GARCH(ARCH)模型

帶自相關擾動的回歸模型為益缠。

xt = ξtβ + vt脑奠,

v_t=\epsilon_t-\varphi_1v_{t-1}-\ldots-\varphi_n v_{t-n}

,

εt = N(0,σ2)  (4)

其中:xt為因變量;ξt為回歸因子構成的列向量左刽;\beta為結構參數(shù)構成的列向量捺信;εt為均值是0、方差是一的獨立同分布正態(tài)隨機變量欠痴。

服從GARCH過程的序列xt迄靠,對于t時刻X的條件分布記為

xt | φt ? 1?N(0,ht)  (5)

其中\(zhòng)phi_{t-1}表示時間t-1前的所有可用信息,條件方差喇辽。

h_t=\tilde{\omega}+\sum_{i=1}^p a_ix^2_{t-i}+\sum_{j=1}\gamma_j h_{t-j}

(6)掌挚。

其中p≥0,q>0,
\tilde{\omega}\ge0

,
\gamma_j\ge0

當p=0時菩咨,GARCH(p,q)模型退化為ARCH(p)模型吠式,ARCH參數(shù)至少要有一個不為0。

GARCH回歸模型可寫成

x_t=\xi^\prime_t\beta+\epsilon_t,\epsilon_t=\sqrt{h_t e_t}

,

h_t=\tilde{\omega}+\sum_{a_i}\epsilon^2_{t-i}+\sum\gamma_j h_{t-j}

抽米,

et? N(0,1)  (7)

也可以考慮服從自回歸過程的擾動或帶有GARCH誤差的模型特占,即AR(n)-GARCH(p,q)。

x_t==\xi^\prime_t\beta+v_t

,

v_t=\epsilon_t-\varphi_1v_{t-1}-\ldots-\varphi_n v_{t-n}

,

\epsilon_t=\sqrt{h_t e_t}
h_t=\xi^\prime_t+\sum_{i=1}^q a_i\epsilon^2_{t-i}+\sum\gamma_j h_{t-j}

  (8)

其中三次平滑指數(shù)(HoltWinters)http://www.dataguru.cn/article-3235-1.html

該部分詳細介紹見

http://wiki.mbalib.com/wiki/%E6%97%B6%E9%97%B4%E5%BA%8F%E5%88%97%E9%A2%84%E6%B5%8B%E6%B3%95

Spark-TimeSerial

spark里面的庫是沒有時間序列算法的云茸,但是國外有人已經(jīng)寫好了相應的算法是目。其github網(wǎng)址是:https://github.com/sryza/spark-timeseries

spark-timeserial介紹:https://yq.aliyun.com/articles/70292

實例:http://blog.csdn.net/qq_30232405/article/details/70622400

實際應用。

數(shù)據(jù)格式(處理過后的):每5分鐘一個值标捺;

預測代碼:

/**
  * Created by ${lyp} on 2017/6/21.
  */


case class PV(time:String,key:String,ct: Double );

object RunModel {
  val starttime="2017-01-01 00:00:00"
  val endtime= "2017-05-31 23:55:00"
  val predictedN=288
  val outputTableName=""
  val modelName="holtwinters"
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val hiveColumnName=List("time","key","ct")

  def main(args: Array[String]): Unit = {


    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf= new SparkConf().setAppName("timeserial").setMaster("local")
    val sc= new SparkContext(conf)
    val sqlContext=new SQLContext(sc)

    import sqlContext.implicits._

    //create dataframe
    val trainData=getData(sc,sqlContext,"src/main/resource/data.csv")
    //val vertifyData=getData(sc,sqlContext,"src/main/resource/data2.csv")
    trainData.show()
    //vertifyData.show()


    //create DateTimeIndex
    val zone = ZoneId.systemDefault()
    var dtIndex:UniformDateTimeIndex=DateTimeIndex.uniformFromInterval(
      ZonedDateTime.of(2017, 1, 1, 0, 0, 0, 0, zone),
      ZonedDateTime.of(2017, 5, 31, 23, 55, 0, 0, zone),
      new MinuteFrequency(5)
    )

    //create TimeSeriesRDD
    val trainTsrdd=TimeSeriesRDD.timeSeriesRDDFromObservations(dtIndex,trainData,hiveColumnName(0),hiveColumnName(1), hiveColumnName(2))
    trainTsrdd.foreach(println(_))

    //cache
    trainTsrdd.cache()

    //add absent value "linear", "nearest", "next", or "previous"
    val filledTrainTsrdd=trainTsrdd.fill("linear")

    //create model
    val timeSeriesModel= new TimeSeriesModel(predictedN)

    //train model
    val forcast=modelName match {
      case "arima"=>{
        println("begin train")
        val (forecast,coefficients)=timeSeriesModel.arimaModelTrain(filledTrainTsrdd)
        forecast

      }
      case "holtwinters"=>{
        //季節(jié)性參數(shù)(12或者4)
        val period=288*30
        //holtWinters選擇模型:additive(加法模型)懊纳、Multiplicative(乘法模型)
        val holtWintersModelType="Multiplicative"
        val (forecast,sse)=timeSeriesModel.holtWintersModelTrain(filledTrainTsrdd,period,holtWintersModelType)
        forecast
      }
      case _=>throw new UnsupportedOperationException("Currently only supports 'ariam' and 'holtwinters")
    }

    val time=timeSeriesModel.productStartDatePredictDate(predictedN,endtime,endtime)

    forcast.map{
      row=>
        val key=row._1
        val values=row._2.toArray.mkString(",")
        (key,values)
    }.flatMap(row=>row._2.split(","))saveAsTextFile("src/main/resource/30Multiplicative")
  }


  def getTrainData(sqlContext:SQLContext):DataFrame={
    val data=sqlContext.sql(
      s"""
         |select time, 'key' as  key, ct from tmp_music.tmp_lyp_nginx_result_ct2 where time between ${starttime} and ${endtime}
       """.stripMargin)
    data
  }

  def getData(sparkContext: SparkContext,sqlContext:SQLContext,path:String):DataFrame={
    val data=sparkContext.textFile(path).map(line=>line.split(",")).map{
      line=>
        val time =sdf.parse(line(0))
        val timestamp= new Timestamp(time.getTime)
        Row(timestamp,line(1),line(2).toDouble)
    }

    val field=Seq(
      StructField(hiveColumnName(0), TimestampType, true),
      StructField(hiveColumnName(1), StringType, true),
      StructField(hiveColumnName(2), DoubleType, true)
    )
    val schema=StructType(field)
    val zonedDateDataDf=sqlContext.createDataFrame(data,schema)
    zonedDateDataDf
  }
}
/**
  * 時間序列模型
  * Created by Administrator on 2017/4/19.
  */
class TimeSeriesModel extends Serializable{

  //預測后面N個值
  private var predictedN=1
  //存放的表名字
  private var outputTableName="timeseries_output"

  def this(predictedN:Int){
    this()
    this.predictedN=predictedN
  }

  case class Coefficient(coefficients: String,p: String,d: String,q:String,logLikelihoodCSS:String,arc:String);
  /**
    * Arima模型:
    * 輸出其p,d亡容,q參數(shù)
    * 輸出其預測的predictedN個值
    * @param trainTsrdd
    */
  def arimaModelTrain(trainTsrdd:TimeSeriesRDD[String]): (RDD[(String,Vector)],RDD[(String,Coefficient)])={
    val predictedN=this.predictedN

    //train model
    val arimaAndVectorRdd=trainTsrdd.map{line=>
      line match {
        case (key,denseVector)=>
          (key,ARIMA.autoFit(denseVector),denseVector)
      }
    }

    /**參數(shù)輸出:p,d,q的實際值和其系數(shù)值嗤疯、最大似然估計值、aic值**/
    val coefficients=arimaAndVectorRdd.map{line=>
      line match{
        case (key,arimaModel,denseVector)=>{
          val coefficients=arimaModel.coefficients.mkString(",")
          val p=arimaModel.p.toString
          val d=arimaModel.d.toString
          val q=arimaModel.q.toString
          val logLikelihoodCSS=arimaModel.logLikelihoodCSS(denseVector).toString
          val arc=arimaModel.approxAIC(denseVector).toString
          (key,Coefficient(coefficients,p,d,q,logLikelihoodCSS,arc))
        }
      }
    }

    //print coefficients
    coefficients.collect().map{f=>
      val key=f._1
      val coefficients=f._2
      println(key+" coefficients:"+coefficients.coefficients+"=>"+"(p="+coefficients.p+",d="+coefficients.d+",q="+coefficients.q+")"
        +"logLikelihoodCSS:"+coefficients.logLikelihoodCSS+" arc:"+coefficients.arc)
    }

    //predict
    val forecast= arimaAndVectorRdd.map{
      row=>
        val key=row._1
        val model=row._2
        val denseVector=row._3
        (key,model.forecast(denseVector,predictedN))
    }

    //print predict
    val forecastValue=forecast.map{
      _ match{
        case (key,value)=>{
          val partArray=value.toArray.mkString(",").split(",")
          var forecastArrayBuffer=new ArrayBuffer[Double]()
          var i=partArray.length-predictedN
          while(i<partArray.length){
            forecastArrayBuffer+=partArray(i).toDouble
            i=i+1
          }
          (key,Vectors.dense(forecastArrayBuffer.toArray))
        }
      }
    }
    println("Arima forecast of next "+predictedN+" observations:")
    forecastValue.foreach(println)

    //return forecastValue & coefficients
    (forecastValue,coefficients)
  }



  /**
    *實現(xiàn)HoltWinters模型
    * @param trainTsrdd
    */
  def holtWintersModelTrain(trainTsrdd:TimeSeriesRDD[String],period:Int,holtWintersModelType:String): (RDD[(String,Vector)],RDD[(String,Double)]) ={

    //set parms
    val predictedN=this.predictedN

    //create model
    val holtWintersAndVectorRdd=trainTsrdd.map{
      row=>
        val key=row._1
        val denseVector=row._2
        //ts: Vector, period: Int, modelType: String = "additive", method: String = "BOBYQA"
        val model=HoltWinters.fitModel(denseVector,period,holtWintersModelType)
        (key,model,denseVector)
    }

    //create dist vector
    val predictedArrayBuffer=new ArrayBuffer[Double]()
    var i=0
    while(i<predictedN){
      predictedArrayBuffer+=i
      i=i+1
    }
    val predictedVectors=Vectors.dense(predictedArrayBuffer.toArray)

    //predict
    val forecast=holtWintersAndVectorRdd.map{
      row=>
        val key=row._1
        val model=row._2
        val denseVector=row._3
        val forcaset=model.forecast(denseVector,predictedVectors)
        (key,forcaset)
    }

    //print predict
    println("HoltWinters forecast of next "+predictedN+" observations:")
    forecast.foreach(println)

    //sse- to get sum of squared errors
    val sse=holtWintersAndVectorRdd.map{
      row=>
        val key=row._1
        val model=row._2
        val vector=row._3
        (key,model.sse(vector))
    }
    return (forecast,sse)
  }



  /**
    * 批量生成日期(具體到分鐘秒)闺兢,用來保存
    * 格式為:yyyy-MM-dd HH:mm:ss
    * @param predictedN
    * @param startTime
    * @param endTime
    */
  def productStartDatePredictDate(predictedN:Int,startTime:String,endTime:String): ArrayBuffer[String] ={
    //形成開始start到預測predicted的日期
    var dateArrayBuffer=new ArrayBuffer[String]()
    val dateFormat= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    val cal1 = Calendar.getInstance()
    val cal2 = Calendar.getInstance()
    val st=dateFormat.parse(startTime)
    val et=dateFormat.parse(endTime)
    //設置訓練數(shù)據(jù)中開始和結束日期
    cal1.set(st.getYear,st.getMonth,st.getDay,st.getHours,st.getMinutes,st.getSeconds)
    cal2.set(et.getYear,et.getMonth,et.getDay,et.getHours,et.getMinutes,et.getSeconds)
    //間隔差
    val minuteDiff=(cal2.getTimeInMillis-cal1.getTimeInMillis)/ (1000 * 60 * 5)+predictedN

    var iMinuteDiff = 0;
    while(iMinuteDiff<=minuteDiff){
      cal1.add(Calendar.MINUTE,5)
      //保存日期
      dateArrayBuffer+=dateFormat.format(cal1.getTime)
      iMinuteDiff=iMinuteDiff+1;

    }
    dateArrayBuffer
  }
}

Holtwinters實現(xiàn)

/**
 * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
 *
 * Cloudera, Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"). You may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied. See the License for
 * the specific language governing permissions and limitations under the
 * License.
 */

package com.cloudera.sparkts.models

import org.apache.commons.math3.analysis.MultivariateFunction
import org.apache.spark.mllib.linalg._
import org.apache.commons.math3.optim.MaxIter
import org.apache.commons.math3.optim.nonlinear.scalar.ObjectiveFunction
import org.apache.commons.math3.optim.MaxEval
import org.apache.commons.math3.optim.SimpleBounds
import org.apache.commons.math3.optim.nonlinear.scalar.noderiv.BOBYQAOptimizer
import org.apache.commons.math3.optim.InitialGuess
import org.apache.commons.math3.optim.nonlinear.scalar.GoalType

/**
 * Triple exponential smoothing takes into account seasonal changes as well as trends.
 * Seasonality is de?ned to be the tendency of time-series data to exhibit behavior that repeats
 * itself every L periods, much like any harmonic function.
 *
 * The Holt-Winters method is a popular and effective approach to forecasting seasonal time series
 *
 * See https://en.wikipedia.org/wiki/Exponential_smoothing#Triple_exponential_smoothing
 * for more information on Triple Exponential Smoothing
 * See https://www.otexts.org/fpp/7/5 and
 * https://stat.ethz.ch/R-manual/R-devel/library/stats/html/HoltWinters.html
 * for more information on Holt Winter Method.
 */
object HoltWinters {

  /**
   * Fit HoltWinter model to a given time series. Holt Winter Model has three parameters
   * level, trend and season component of time series.
   * We use BOBYQA optimizer which is used to calculate minimum of a function with
   * bounded constraints and without using derivatives.
   * See http://www.damtp.cam.ac.uk/user/na/NA_papers/NA2009_06.pdf for more details.
   *
   * @param ts Time Series for which we want to fit HoltWinter Model
   * @param period Seasonality of data i.e  period of time before behavior begins to repeat itself
   * @param modelType Two variations differ in the nature of the seasonal component.
   *    Additive method is preferred when seasonal variations are roughly constant through the series,
   *    Multiplicative method is preferred when the seasonal variations are changing
   *    proportional to the level of the series.
   * @param method: Currently only BOBYQA is supported.
   */
  def fitModel(ts: Vector, period: Int, modelType: String = "additive", method: String = "BOBYQA")
  : HoltWintersModel = {
    method match {
      case "BOBYQA" => fitModelWithBOBYQA(ts, period, modelType)
      case _ => throw new UnsupportedOperationException("Currently only supports 'BOBYQA'")
    }
  }

  def fitModelWithBOBYQA(ts: Vector, period: Int, modelType:String): HoltWintersModel = {
    val optimizer = new BOBYQAOptimizer(7)
    val objectiveFunction = new ObjectiveFunction(new MultivariateFunction() {
      def value(params: Array[Double]): Double = {
        new HoltWintersModel(modelType, period, params(0), params(1), params(2)).sse(ts)
      }
    })

    // The starting guesses in R's stats:HoltWinters
    val initGuess = new InitialGuess(Array(0.3, 0.1, 0.1))
    val maxIter = new MaxIter(30000)
    val maxEval = new MaxEval(30000)
    val goal = GoalType.MINIMIZE
    val bounds = new SimpleBounds(Array(0.0, 0.0, 0.0), Array(1.0, 1.0, 1.0))
    val optimal = optimizer.optimize(objectiveFunction, goal, bounds,initGuess, maxIter, maxEval)
    val params = optimal.getPoint
    new HoltWintersModel(modelType, period, params(0), params(1), params(2))
  }
}

class HoltWintersModel(
    val modelType: String,
    val period: Int,
    val alpha: Double,
    val beta: Double,
    val gamma: Double) extends TimeSeriesModel {

  if (!modelType.equalsIgnoreCase("additive") && !modelType.equalsIgnoreCase("multiplicative")) {
    throw new IllegalArgumentException("Invalid model type: " + modelType)
  }
  val additive = modelType.equalsIgnoreCase("additive")

  /**
   * Calculates sum of squared errors, used to estimate the alpha and beta parameters
   *
   * @param ts A time series for which we want to calculate the SSE, given the current parameters
   * @return SSE
   */
  def sse(ts: Vector): Double = {
    val n = ts.size
    val smoothed = new DenseVector(Array.fill(n)(0.0))
    addTimeDependentEffects(ts, smoothed)

    var error = 0.0
    var sqrErrors = 0.0

    // We predict only from period by using the first period - 1 elements.
    for(i <- period to (n - 1)) {
      error = ts(i) - smoothed(i)
      sqrErrors += error * error
    }

    sqrErrors
  }

  /**
   * {@inheritDoc}
   */
  override def removeTimeDependentEffects(ts: Vector, dest: Vector = null): Vector = {
    throw new UnsupportedOperationException("not yet implemented")
  }

  /**
   * {@inheritDoc}
   */
  override def addTimeDependentEffects(ts: Vector, dest: Vector): Vector = {
    val destArr = dest.toArray
    val fitted = getHoltWintersComponents(ts)._1
    for (i <- 0 to (dest.size - 1)) {
      destArr(i) = fitted(i)
    }
    dest
  }

  /**
   * Final prediction Value is sum of level trend and season
   * But in R's stats:HoltWinters additional weight is given for trend
   *
   * @param ts
   * @param dest
   */
  def forecast(ts: Vector, dest: Vector): Vector = {
    val destArr = dest.toArray
    val (_, level, trend, season) = getHoltWintersComponents(ts)
    val n = ts.size

    val finalLevel = level(n - period)
    val finalTrend = trend(n - period)
    val finalSeason = new Array[Double](period)

    for (i <- 0 until period) {
      finalSeason(i) = season(i + n - period)
    }

    for (i <- 0 until dest.size) {
      destArr(i) = if (additive) {
        (finalLevel + (i + 1) * finalTrend) + finalSeason(i % period)
      } else {
        (finalLevel + (i + 1) * finalTrend) * finalSeason(i % period)
      }
    }
    dest
  }

  /**
   * Start from the intial parameters and then iterate to find the final parameters
   * using the equations of HoltWinter Method.
   * See https://www.otexts.org/fpp/7/5 and
   * https://stat.ethz.ch/R-manual/R-devel/library/stats/html/HoltWinters.html
   * for more information on Holt Winter Method equations.
   *
   * @param ts A time series for which we want the HoltWinter parameters level,trend and season.
   * @return (level trend season). Final vectors of level trend and season are returned.
   */
  def getHoltWintersComponents(ts: Vector): (Vector, Vector, Vector, Vector) = {
    val n = ts.size
    require(n >= 2, "Requires length of at least 2")

    val dest = new Array[Double](n)

    val level = new Array[Double](n)
    val trend = new Array[Double](n)
    val season = new Array[Double](n)

    val (initLevel, initTrend, initSeason) = initHoltWinters(ts)
    level(0) = initLevel
    trend(0) = initTrend
    for (i <- 0 until initSeason.size){
      season(i) = initSeason(i)
    }

    for (i <- 0 to (n - period - 1)) {
      dest(i + period) = level(i) + trend(i)

      // Add the seasonal factor for additive and multiply for multiplicative model.
      if (additive) {
        dest(i + period) += season(i)
      } else {
        dest(i + period) *= season(i)
      }

      val levelWeight = if (additive) {
        ts(i + period) - season(i)
      } else {
        ts(i + period) / season(i)
      }

      level(i + 1) = alpha * levelWeight + (1 - alpha) * (level(i) + trend(i))

      trend(i + 1) = beta * (level(i + 1) - level(i)) + (1 - beta) * trend(i)

      val seasonWeight = if (additive) {
        ts(i + period) - level(i + 1)
      } else {
        ts(i + period) / level(i + 1)
      }
      season(i + period) = gamma * seasonWeight + (1 - gamma) * season(i)
    }

    (Vectors.dense(dest), Vectors.dense(level), Vectors.dense(trend), Vectors.dense(season))
  }

  def getKernel(): (Array[Double]) = {
    if (period % 2 == 0){
      val kernel = Array.fill(period + 1)(1.0 / period)
      kernel(0) = 0.5 / period
      kernel(period) = 0.5 / period
      kernel
    } else {
      Array.fill(period)(1.0 / period)
    }
  }

  /**
   * Function to calculate the Weighted moving average/convolution using above kernel/weights
   * for input data.
   * See http://robjhyndman.com/papers/movingaverage.pdf for more information
   * @param inData Series on which you want to do moving average
   * @param kernel Weight vector for weighted moving average
   */
  def convolve(inData: Array[Double], kernel: Array[Double]): (Array[Double]) = {
    val kernelSize = kernel.size
    val dataSize = inData.size

    val outData = new Array[Double](dataSize - kernelSize + 1)

    var end = 0
    while (end <= (dataSize - kernelSize)) {
      var sum = 0.0
      for (i <- 0 until kernelSize) {
        sum += kernel(i) * inData(end + i)
      }

      outData(end) = sum
      end += 1
    }

    outData
  }

  /**
   * Function to get the initial level, trend and season using method suggested in
   * http://robjhyndman.com/hyndsight/hw-initialization/
   * @param ts
   */
  def initHoltWinters(ts: Vector): (Double, Double, Array[Double]) = {
    val arrTs = ts.toArray

    // Decompose a window of time series into level trend and seasonal using convolution
    val kernel = getKernel()
    val kernelSize = kernel.size
    val trend = convolve(arrTs.take(period * 2), kernel)

    // Remove the trend from time series. Subtract for additive and divide for multiplicative
    val n = (kernelSize -1) / 2
    val removeTrend = arrTs.take(period * 2).zip(
      Array.fill(n)(0.0) ++ trend ++ Array.fill(n)(0.0)).map{
      case (a, t) =>
        if (t != 0){
          if (additive) {
            (a - t)
          } else {
            (a / t)
          }
        }  else{
          0
        }
    }

    // seasonal mean is sum of mean of all season values of that period
    val seasonalMean = removeTrend.splitAt(period).zipped.map { case (prevx, x) =>
      if (prevx == 0 || x == 0) (x + prevx) else (x + prevx) / 2
    }

    val meanOfFigures = seasonalMean.sum / period

    // The seasonal mean is then centered and removed to get season.
    // Subtract for additive and divide for multiplicative.
    val initSeason = if (additive) {
      seasonalMean.map(_ - meanOfFigures )
    } else {
      seasonalMean.map(_ / meanOfFigures )
    }

    // Do Simple Linear Regression to find the initial level and trend
    val indices = 1 to trend.size
    val xbar = (indices.sum: Double) / indices.size
    val ybar = trend.sum / trend.size

    val xxbar = indices.map( x => (x - xbar) * (x - xbar) ).sum
    val xybar = indices.zip(trend).map {
      case (x, y) => (x - xbar) * (y - ybar)
    }.sum

    val initTrend = xybar / xxbar
    val initLevel = ybar - (initTrend * xbar)

    (initLevel, initTrend, initSeason)
  }
}

預測結果:

折線圖:

actural:6/1當天的實際值

Hotwind7*MUTI :預測值(周期為一周茂缚,乘法)

Hotwind30*MUTI :預測值(周期為一個月,乘法)

其余:預測值與實際值的差值

結論:

按照5分鐘為時間間隔列敲。最小的周期性為天阱佛。即period=288。這個周期誤差較其余兩者稍微大一些戴而。選擇一周288 * 7 或者一個月288 * 30 凑术,效果如圖。還可以所意。但其實這個值怎么選淮逊。有點存疑催首,即使根據(jù)sum of squared errors來看。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末泄鹏,一起剝皮案震驚了整個濱河市郎任,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌备籽,老刑警劉巖舶治,帶你破解...
    沈念sama閱讀 211,290評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異车猬,居然都是意外死亡霉猛,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評論 2 385
  • 文/潘曉璐 我一進店門珠闰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來惜浅,“玉大人,你說我怎么就攤上這事伏嗜√诚ぃ” “怎么了?”我有些...
    開封第一講書人閱讀 156,872評論 0 347
  • 文/不壞的土叔 我叫張陵承绸,是天一觀的道長裸影。 經(jīng)常有香客問我,道長军熏,這世上最難降的妖魔是什么空民? 我笑而不...
    開封第一講書人閱讀 56,415評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮羞迷,結果婚禮上,老公的妹妹穿的比我還像新娘画饥。我一直安慰自己衔瓮,他們只是感情好,可當我...
    茶點故事閱讀 65,453評論 6 385
  • 文/花漫 我一把揭開白布抖甘。 她就那樣靜靜地躺著热鞍,像睡著了一般。 火紅的嫁衣襯著肌膚如雪衔彻。 梳的紋絲不亂的頭發(fā)上薇宠,一...
    開封第一講書人閱讀 49,784評論 1 290
  • 那天,我揣著相機與錄音艰额,去河邊找鬼澄港。 笑死,一個胖子當著我的面吹牛柄沮,可吹牛的內(nèi)容都是我干的回梧。 我是一名探鬼主播废岂,決...
    沈念sama閱讀 38,927評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼狱意!你這毒婦竟也來了湖苞?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,691評論 0 266
  • 序言:老撾萬榮一對情侶失蹤详囤,失蹤者是張志新(化名)和其女友劉穎财骨,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體藏姐,經(jīng)...
    沈念sama閱讀 44,137評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡隆箩,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,472評論 2 326
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了包各。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片摘仅。...
    茶點故事閱讀 38,622評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖问畅,靈堂內(nèi)的尸體忽然破棺而出娃属,到底是詐尸還是另有隱情,我是刑警寧澤护姆,帶...
    沈念sama閱讀 34,289評論 4 329
  • 正文 年R本政府宣布矾端,位于F島的核電站,受9級特大地震影響卵皂,放射性物質(zhì)發(fā)生泄漏秩铆。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,887評論 3 312
  • 文/蒙蒙 一灯变、第九天 我趴在偏房一處隱蔽的房頂上張望殴玛。 院中可真熱鬧,春花似錦添祸、人聲如沸滚粟。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽凡壤。三九已至,卻和暖如春耙替,著一層夾襖步出監(jiān)牢的瞬間亚侠,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工俗扇, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留硝烂,地道東北人。 一個月前我還...
    沈念sama閱讀 46,316評論 2 360
  • 正文 我出身青樓铜幽,卻偏偏與公主長得像钢坦,于是被迫代替她去往敵國和親究孕。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,490評論 2 348

推薦閱讀更多精彩內(nèi)容

  • 本系列第三篇爹凹,承接前面的《淺談機器學習基礎》和《淺談深度學習基礎》厨诸。 自然語言處理緒論 什么是自然語言處理? 自然...
    我偏笑_NSNirvana閱讀 17,545評論 2 68
  • 常用的預測方法(如回歸分析)禾酱,需要較大的樣本微酬,如果樣本較小,常造成較大的誤差颤陶,使預測目標失效颗管。灰度預測模型(Gra...
    風逝流沙閱讀 13,336評論 2 3
  • 今天想要來寫上一篇和平日里的文章風格差距較大的文章滓走,可能會讓看到這篇文章的老朋友感到有些不適應垦江,只是,我認為還是因...
    一方筮白閱讀 579評論 0 4
  • 問題:android 掃描SDcard目錄搅方,F(xiàn)ile.listfiles()總是返回null 解決方案:修改tar...
    樹綠29閱讀 1,473評論 0 0
  • 第五期好報30天寫作群還有兩天就要解散了比吭,群里很多的朋友在聊自己通過每天500字寫作有所改變,或是堅持寫作得到...
    薇薇薛閱讀 597評論 1 2