【機(jī)器學(xué)習(xí)實(shí)踐】隱馬爾可夫模型(一)

隱馬爾可夫模型

隱馬爾可夫鏈模型 Hidden Markov Model

首先應(yīng)當(dāng)了解馬爾科夫鏈模型,隱馬爾可夫鏈模型即可以解決馬爾可夫鏈模型狀態(tài)量非常多的情況下(如 連續(xù)值)轉(zhuǎn)移矩陣過大的問題众羡,并由它轉(zhuǎn)化而來留攒。解決原理是假定當(dāng)前狀態(tài)(稱為顯狀態(tài))僅由隱狀態(tài)決定仍稀,由于隱狀態(tài)可以比當(dāng)前狀態(tài)少很多,因此轉(zhuǎn)移矩陣參數(shù)更少鳄乏。
隱馬爾可夫模型的分類:離散、連續(xù)。

隱馬爾可夫模型的典型功能

  1. 根據(jù)一個(gè)或多個(gè)已知的可見層和隱藏層訓(xùn)練模型饭弓,獲得釋放矩陣和狀態(tài)轉(zhuǎn)移矩陣,得到模型參數(shù)媒抠。
  2. 使用前向算法弟断、后向算法或者暴力算法,根據(jù)已知模型的參數(shù)趴生,求出某一已知可見層序列的發(fā)生概率阀趴。
  3. 使用維特比(Viterbi)算法根據(jù)一個(gè)可見層序列推導(dǎo)出最有可能的隱藏層序列。

python實(shí)現(xiàn)

此處利用python對(duì)離散的隱馬爾可夫模型進(jìn)行建模苍匆,滿足其訓(xùn)練刘急、計(jì)算和推導(dǎo)的功能。
由于篇幅限制浸踩,本篇僅實(shí)現(xiàn)其前向算法叔汁、后向算法、遍歷(暴力)算法的實(shí)現(xiàn)检碗,和相關(guān)準(zhǔn)備工作

下面是hmm類本體get_prob實(shí)現(xiàn)典型功能2的功能据块,而fit在本章中使用具有隱藏層和可見層的數(shù)據(jù)進(jìn)行計(jì)算,而Baum-Welch算法和Viterbi算法在后續(xù)篇幅進(jìn)行介紹折剃,本類對(duì)相關(guān)算法部分留下了篇幅

#hmm.py

from operator import truediv
import numpy as np

class discrete_hidden_Markov_model:
    def __init__(self, debug:bool=False):
        #typical 5 model params of hmm(lambda(Pi, A, B))
        self.vis_kind = None
        self.invis_kind = None
        self.init_state_mat = None
        self.trans_state_mat = None
        self.emitter_mat = None
        #configurations
        self.debug = debug

    #from a visible chain, or a visible chain with an invisible chain, infer the params of the hmm model
    def fit(self, vis_data:np.ndarray, invis_data:np.ndarray=None)->None:

        #if invis_data is None, then use baum-welch algrithm, if invis_data is given, then use ordinary probability calculation
        if invis_data is None:#use bw automatically
            pass 
        else:
            if np.shape(vis_data) != np.shape(invis_data):
                print("[-] length of two chains are not equal");return
            else:
                data_len = np.shape(vis_data)[1]
                data_num = np.shape(vis_data)[0]
            self.vis_kind = vis_data.max() + 1
            self.invis_kind = invis_data.max() + 1
            # to fill the initial state matrix
            self.init_state_mat = np.zeros((self.invis_kind,), dtype=np.float64)
            for i in range(data_num):
                self.init_state_mat[invis_data[i, 0]] += 1#only the head of one chain
            self.init_state_mat /= np.linalg.norm(self.init_state_mat, ord=1)
            if self.debug : print(self.init_state_mat)
            # to fill the state transfer matrix
            self.trans_state_mat = np.zeros((self.invis_kind, self.invis_kind), dtype=np.float64)
            for k in range(data_num):
                for i in range(data_len - 1):
                    self.trans_state_mat[invis_data[k, i], invis_data[k, i + 1]] += 1
            for i in range(self.invis_kind):
                self.trans_state_mat[i] /= np.linalg.norm(self.trans_state_mat[i], ord=1)
            if self.debug:print(self.trans_state_mat)
            # to fill the emitter matrix
            self.emitter_mat = np.zeros((self.invis_kind, self.vis_kind), dtype=np.float64)
            for k in range(data_num):
                for i in range(data_len):
                    self.emitter_mat[invis_data[k, i],vis_data[k, i]] += 1
            for i in range(self.invis_kind):
                self.emitter_mat[i] /= np.linalg.norm(self.emitter_mat[i], ord=1)
            if self.debug:print(self.emitter_mat)

    #get the specific probability from a known visible chain, with the params of the model known
    def get_prob(self, vis_data:np.ndarray, algorithm:str="forward")->float:
        '''algorithm : 'traversal' / 'forward' / 'backward' 
        '''
        vis_data = vis_data.ravel()
        if self.emitter_mat is None or self.trans_state_mat is None:
            print("[-] params have not been infered yet"); return
        probability = 0.
        if algorithm == 'forward':
            data_infer_len = vis_data.shape[0]
            prob_data_arr = np.zeros((self.invis_kind, data_infer_len), dtype=np.float64)
            #first col of prob
            for j in range(self.invis_kind):
                prob_data_arr[j, 0] = self.init_state_mat[j] * self.emitter_mat[j, vis_data[0]]
            #total time complexity : O(T*N^2)
            for i in range(1, data_infer_len):
                #calculate this matrix of probability forward
                #time complexity : O(N^2)
                for j in range(self.invis_kind):# indicator for forward invis nodes
                    prob_back_node = 0.
                    for k in range(self.invis_kind):# indicator for last(back) invis nodes
                        prob_back_node += prob_data_arr[k, i-1] * self.trans_state_mat[k, j] * self.emitter_mat[j, vis_data[i]]
                    prob_data_arr[j, i] = prob_back_node
            if self.debug:print(prob_data_arr)
            probability = prob_data_arr[:, -1].sum()
            if self.debug:print(probability)
        elif algorithm == 'backward':
            data_infer_len = vis_data.shape[0]
            prob_data_arr = np.zeros((self.invis_kind, data_infer_len+1), dtype=np.float64)
            #last col of the prob
            for j in range(self.vis_kind):
                prob_data_arr[j, -1] = 1
            #total time complexity : O(T*N^2)
            for i in range(data_infer_len - 1, -1, -1):
                #calculate this matrix of probability backward
                #time complexity : O(N^2)
                for j in range(self.invis_kind):#indicator for backward invis nodes
                    prob_front_node = 0.
                    for k in range(self.invis_kind):#indicator for last(front) invis nodes
                        prob_front_node += prob_data_arr[k, i+1] * self.trans_state_mat[j, k] * self.emitter_mat[j, vis_data[i]];
                    prob_data_arr[j, i] = prob_front_node
            for j in range (self.vis_kind):
                probability += prob_data_arr[j, 0] * self.init_state_mat[j]
            if self.debug:print(prob_data_arr)
            
        elif algorithm == 'traversal':
            invis_data = np.zeros(vis_data.shape, dtype=np.int)
            #generate all possible invisible sequences, with traversing method
            data_infer_len = vis_data.shape[0]
            for i in range(self.invis_kind**data_infer_len-1):
                probability_this_chain = 1.
                #calculate the probability of this very sequence and add it to total probability
                for k in range(data_infer_len):
                    # P(vis_data[k]|invis_data[k])
                    # not bayes law type, reason invis -> result vis : regard invis as reason
                    probability_this_chain *= self.emitter_mat[invis_data[k], vis_data[k]]
                for k in range(0, data_infer_len - 1):
                    # P(invis_data[k]->invis_data[k+1])
                    probability_this_chain *= self.trans_state_mat[invis_data[k], invis_data[k+1]]
                probability_this_chain *= self.init_state_mat[invis_data[0]]
                # if self.debug : print(probability_this_chain)
                probability += probability_this_chain
                #update the chain
                invis_data[0] += 1
                for j in range(data_infer_len - 1):
                    if invis_data[j] == self.invis_kind:
                        invis_data[j] = 0
                        invis_data[j+1] += 1
            if self.debug : print(probability)
        else:
            print("[-] should choose the right algorithm");return
        return probability

    def get_hidden(self, vis_data:np.ndarray, algorithm:str="Viterbi")->np.ndarray:
        '''algorithm : 'Viterbi"
        '''
        if algorithm == "Viterbi":
            pass
        else:
            print("[-] should choose the right algorithm");return

類似的數(shù)據(jù)集很少瑰钮,因此本人自制了一個(gè)生成隱馬爾可夫鏈的程序。還可以根據(jù)生成時(shí)的原始參數(shù)檢驗(yàn)各種算法的正確性微驶。

#data_gen.py

#this file is used to generate original data for hmm training
import numpy as np
#suppose 3 invis state, 3 vis state

chain_len = 100
chain_num = 500

#initial invisible state probability
#   prob    0    1    2
#           p    p    p
init_state_prob = np.array([0.3, 0.2, 0.5])
#here we assume this to be not equal

#invisible state transfer matrix
#    past\cur    0    1    2
#     0          p    p    p
#     1          p    p    p
#     2          p    p    p

trans_state_mat = np.array([
    [0.2, 0.4, 0.4],
    [0.4, 0.4, 0.2],
    [0.3, 0.3, 0.4]
])

#invisible state -> visible state matrix
#    invis\vis    0    1    2
#      0          p    p    p
#      1          p    p    p
#      2          p    p    p

emitter_mat = np.array([
    [0.3, 0.4, 0.3],
    [0.5, 0.3, 0.2],
    [0.2, 0.5, 0.3]
])

def get_chain()->tuple([np.ndarray, np.ndarray]):
    vis_chains = np.ndarray((chain_num, chain_len), dtype=np.int)
    invis_chains = np.ndarray((chain_num, chain_len), dtype=np.int)
    for i in range(chain_num):
        
        #generate invisible chain
        start_code = np.random.random()
        invis_chain = list()
        if start_code < init_state_prob[0]:
            start_invis_state = np.array([1., 0., 0.])
            invis_chain.append(0)
        elif start_code < init_state_prob[1] + init_state_prob[0]:
            start_invis_state = np.array([0., 1., 0.])
            invis_chain.append(1)
        else:
            start_invis_state = np.array([0., 0., 1.])
            invis_chain.append(2)
        invis_state = start_invis_state
        while len(invis_chain) < chain_len:
            next_state_prob = (invis_state @ trans_state_mat)
            next_code = np.random.random()
            if next_code < next_state_prob[0]:
                invis_state = np.array([1., 0., 0.])
                invis_chain.append(0)
            elif next_code < next_state_prob[0] + next_state_prob[1]:
                invis_state = np.array([0., 1., 0.])
                invis_chain.append(1)
            else:
                invis_state = np.array([0., 0., 1.])
                invis_chain.append(2)

        #generate visible chain
        vis_chain = list()
        for invis_state_code in invis_chain:
            if invis_state_code == 0:
                invis_state = np.array([1., 0., 0.])
            elif invis_state_code == 1:
                invis_state = np.array([0., 1., 0.])
            else:
                invis_state = np.array([0., 0., 1.])
            vis_state_prob = invis_state @ emitter_mat
            vis_code = np.random.random()
            if vis_code < vis_state_prob[0]:
                vis_chain.append(0)
            elif vis_code < vis_state_prob[0] + vis_state_prob[1]:
                vis_chain.append(1)
            else:
                vis_chain.append(2)

        vis_chains[i, :] = vis_chain
        invis_chains[i, :] = invis_chain
    return vis_chains, invis_chains

編寫代碼產(chǎn)生對(duì)象進(jìn)行測(cè)試

#main.py

import numpy as np
from data_gen import get_chain
import data_gen

from hmm import discrete_hidden_Markov_model

#initialize
yyz_discrete_HMM_machine = discrete_hidden_Markov_model(debug=False)

# strict test
# yyz_discrete_HMM_machine.emitter_mat = data_gen.emitter_mat
# yyz_discrete_HMM_machine.trans_state_mat = data_gen.trans_state_mat
# yyz_discrete_HMM_machine.init_state_mat = data_gen.init_state_prob
# yyz_discrete_HMM_machine.invis_kind = 3
# yyz_discrete_HMM_machine.vis_kind = 3

#fit test
vis, invis = get_chain()
yyz_discrete_HMM_machine.fit(vis, invis)

#print params
print(yyz_discrete_HMM_machine.emitter_mat, yyz_discrete_HMM_machine.trans_state_mat, yyz_discrete_HMM_machine.init_state_mat)


#normalization test : total probability is 1.
data_infer_len = 6
vis_data = np.zeros((data_infer_len,), dtype=np.int)
# generate all possible invisible sequences, with traversing method
prob_for = 0.
prob_tra = 0.
prob_bak = 0.
for i in range(3**data_infer_len):
    prob_tra += yyz_discrete_HMM_machine.get_prob(vis_data, algorithm='traversal')
    prob_bak += yyz_discrete_HMM_machine.get_prob(vis_data, algorithm='backward')
    prob_for += yyz_discrete_HMM_machine.get_prob(vis_data, algorithm='forward')
    vis_data[0] += 1
    for j in range(data_infer_len - 1):
        if vis_data[j] == 3:
            vis_data[j] = 0
            vis_data[j+1] += 1

print(prob_for, prob_tra, prob_bak)

#result
擬合結(jié)果

發(fā)射矩陣
[[0.30223322 0.3989192  0.29884758]
 [0.50214004 0.29603819 0.20182177]
 [0.20046293 0.49771578 0.30182128]]

狀態(tài)轉(zhuǎn)移矩陣
[[0.20398501 0.4028408  0.3931742 ]
 [0.39799201 0.39865764 0.20335034]
 [0.30328927 0.29505072 0.40166001]]

初始概率
[0.298 0.224 0.478]
歸一化結(jié)果
1.0 0.9950028676787088 0.9999999999999998

得到的概率矩陣和生成代碼時(shí)在誤差允許范圍內(nèi)相等浪谴,說明本初級(jí)統(tǒng)計(jì)型擬合程序有效开睡;三種算法都具有歸一化的特性,典型功能2算法測(cè)試成功苟耻。

總結(jié)

  1. 對(duì)隱馬爾可夫模型做了簡要介紹
  2. 由于算法之間的加法篇恒、乘法方式和次數(shù)不同,理論上結(jié)果應(yīng)當(dāng)相同的算法之間也產(chǎn)生了偏差凶杖。這是數(shù)據(jù)結(jié)構(gòu)的精度所導(dǎo)致的胁艰。
  3. 今天是2.11除夕夜,這一篇其實(shí)寫了挺久的智蝠,主要這會(huì)兒看春晚太無聊就寫一下總結(jié)腾么。另外,祝各位網(wǎng)友牛年新年快樂杈湾!

參考資料

前后向算法的理解和公式
知乎資料

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末解虱,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子漆撞,更是在濱河造成了極大的恐慌殴泰,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,265評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件浮驳,死亡現(xiàn)場離奇詭異悍汛,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)至会,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門离咐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人奉件,你說我怎么就攤上這事健霹。” “怎么了瓶蚂?”我有些...
    開封第一講書人閱讀 156,852評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵糖埋,是天一觀的道長。 經(jīng)常有香客問我窃这,道長瞳别,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,408評(píng)論 1 283
  • 正文 為了忘掉前任杭攻,我火速辦了婚禮祟敛,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘兆解。我一直安慰自己馆铁,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,445評(píng)論 5 384
  • 文/花漫 我一把揭開白布锅睛。 她就那樣靜靜地躺著埠巨,像睡著了一般历谍。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上辣垒,一...
    開封第一講書人閱讀 49,772評(píng)論 1 290
  • 那天望侈,我揣著相機(jī)與錄音,去河邊找鬼勋桶。 笑死脱衙,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的例驹。 我是一名探鬼主播捐韩,決...
    沈念sama閱讀 38,921評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼鹃锈!你這毒婦竟也來了荤胁?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,688評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤仪召,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后松蒜,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體扔茅,經(jīng)...
    沈念sama閱讀 44,130評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,467評(píng)論 2 325
  • 正文 我和宋清朗相戀三年秸苗,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了召娜。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,617評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡惊楼,死狀恐怖玖瘸,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情檀咙,我是刑警寧澤雅倒,帶...
    沈念sama閱讀 34,276評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站弧可,受9級(jí)特大地震影響蔑匣,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜棕诵,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,882評(píng)論 3 312
  • 文/蒙蒙 一裁良、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧校套,春花似錦价脾、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽犀变。三九已至,卻和暖如春座硕,著一層夾襖步出監(jiān)牢的瞬間弛作,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評(píng)論 1 265
  • 我被黑心中介騙來泰國打工华匾, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留映琳,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,315評(píng)論 2 360
  • 正文 我出身青樓蜘拉,卻偏偏與公主長得像萨西,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子旭旭,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,486評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容