Hidden Markov Model
Hidden Markov Model (HMM)
First, one should understand the plain Markov chain model. The hidden Markov model derives from it and addresses the case where a Markov chain has a very large number of states (e.g. continuous values), so that the transition matrix becomes too large. The idea is to assume that the current observation (the visible state) is determined solely by a hidden state; since there can be far fewer hidden states than visible ones, the transition matrix needs far fewer parameters.
Hidden Markov models come in two kinds: discrete and continuous.
Typical capabilities of a hidden Markov model
- Train the model from one or more known visible (and possibly hidden) sequences, obtaining the emission matrix and the state-transition matrix, i.e. the model parameters.
- Given the model parameters, compute the probability of a known visible sequence using the forward algorithm, the backward algorithm, or brute-force enumeration.
- Use the Viterbi algorithm to infer the most likely hidden sequence from a visible sequence.
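In standard notation (a sketch added here for reference, not from the original post), the model is λ = (π, A, B), where π is the initial hidden-state distribution, A = (a_jk) the hidden-state transition matrix, and B with entries b_j(o) the emission matrix. Capability 2's forward and backward recursions are then:

```latex
% forward variable
\alpha_1(j) = \pi_j\, b_j(o_1), \qquad
\alpha_t(j) = \Big[\sum_{k} \alpha_{t-1}(k)\, a_{kj}\Big] b_j(o_t), \qquad
P(O \mid \lambda) = \sum_{j} \alpha_T(j)

% backward variable
\beta_T(j) = 1, \qquad
\beta_t(j) = \sum_{k} a_{jk}\, b_k(o_{t+1})\, \beta_{t+1}(k), \qquad
P(O \mid \lambda) = \sum_{j} \pi_j\, b_j(o_1)\, \beta_1(j)
```

The code below uses an equivalent backward variable that folds the factor b_j(o_t) into β_t(j); the final probability comes out the same.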
Python implementation
Here we model a discrete hidden Markov model in Python, supporting its training, evaluation, and decoding capabilities.
Due to space constraints, this post implements only the forward algorithm, the backward algorithm, and the traversal (brute-force) algorithm, plus the related groundwork.
Below is the hmm class itself. get_prob implements typical capability 2, while fit in this post estimates parameters from data containing both hidden and visible layers; the Baum-Welch and Viterbi algorithms will be introduced in later posts, and the class leaves room for them.
#hmm.py
import numpy as np


class discrete_hidden_Markov_model:
    def __init__(self, debug: bool = False):
        # the model params of an hmm: lambda = (Pi, A, B), plus the two state-space sizes
        self.vis_kind = None
        self.invis_kind = None
        self.init_state_mat = None
        self.trans_state_mat = None
        self.emitter_mat = None
        # configuration
        self.debug = debug
    # from a visible chain, or a visible chain with an invisible chain, infer the params of the hmm model
    def fit(self, vis_data: np.ndarray, invis_data: np.ndarray = None) -> None:
        # if invis_data is None, use the Baum-Welch algorithm; if invis_data is given,
        # use ordinary (frequency-counting) probability estimation
        if invis_data is None:  # use Baum-Welch automatically
            pass
        else:
            if np.shape(vis_data) != np.shape(invis_data):
                print("[-] lengths of the two chains are not equal")
                return
            data_len = np.shape(vis_data)[1]
            data_num = np.shape(vis_data)[0]
            self.vis_kind = vis_data.max() + 1
            self.invis_kind = invis_data.max() + 1
            # fill the initial state matrix
            self.init_state_mat = np.zeros((self.invis_kind,), dtype=np.float64)
            for i in range(data_num):
                self.init_state_mat[invis_data[i, 0]] += 1  # only the head of each chain
            self.init_state_mat /= np.linalg.norm(self.init_state_mat, ord=1)
            if self.debug:
                print(self.init_state_mat)
            # fill the state transfer matrix
            self.trans_state_mat = np.zeros((self.invis_kind, self.invis_kind), dtype=np.float64)
            for k in range(data_num):
                for i in range(data_len - 1):
                    self.trans_state_mat[invis_data[k, i], invis_data[k, i + 1]] += 1
            for i in range(self.invis_kind):
                self.trans_state_mat[i] /= np.linalg.norm(self.trans_state_mat[i], ord=1)
            if self.debug:
                print(self.trans_state_mat)
            # fill the emitter matrix
            self.emitter_mat = np.zeros((self.invis_kind, self.vis_kind), dtype=np.float64)
            for k in range(data_num):
                for i in range(data_len):
                    self.emitter_mat[invis_data[k, i], vis_data[k, i]] += 1
            for i in range(self.invis_kind):
                self.emitter_mat[i] /= np.linalg.norm(self.emitter_mat[i], ord=1)
            if self.debug:
                print(self.emitter_mat)
    # get the probability of a known visible chain, with the params of the model known
    def get_prob(self, vis_data: np.ndarray, algorithm: str = "forward") -> float:
        '''algorithm : 'traversal' / 'forward' / 'backward'
        '''
        vis_data = vis_data.ravel()
        if self.emitter_mat is None or self.trans_state_mat is None:
            print("[-] params have not been inferred yet")
            return
        probability = 0.
        if algorithm == 'forward':
            data_infer_len = vis_data.shape[0]
            prob_data_arr = np.zeros((self.invis_kind, data_infer_len), dtype=np.float64)
            # first column of the probability array
            for j in range(self.invis_kind):
                prob_data_arr[j, 0] = self.init_state_mat[j] * self.emitter_mat[j, vis_data[0]]
            # total time complexity: O(T*N^2)
            for i in range(1, data_infer_len):
                # fill this column of probabilities forward; time complexity O(N^2)
                for j in range(self.invis_kind):  # index of the current invis node
                    prob_back_node = 0.
                    for k in range(self.invis_kind):  # index of the previous invis node
                        prob_back_node += prob_data_arr[k, i - 1] * self.trans_state_mat[k, j] * self.emitter_mat[j, vis_data[i]]
                    prob_data_arr[j, i] = prob_back_node
            if self.debug:
                print(prob_data_arr)
            probability = prob_data_arr[:, -1].sum()
            if self.debug:
                print(probability)
        elif algorithm == 'backward':
            data_infer_len = vis_data.shape[0]
            prob_data_arr = np.zeros((self.invis_kind, data_infer_len + 1), dtype=np.float64)
            # last column of the probability array
            for j in range(self.invis_kind):
                prob_data_arr[j, -1] = 1
            # total time complexity: O(T*N^2)
            for i in range(data_infer_len - 1, -1, -1):
                # fill this column of probabilities backward; time complexity O(N^2)
                for j in range(self.invis_kind):  # index of the current invis node
                    prob_front_node = 0.
                    for k in range(self.invis_kind):  # index of the next invis node
                        prob_front_node += prob_data_arr[k, i + 1] * self.trans_state_mat[j, k] * self.emitter_mat[j, vis_data[i]]
                    prob_data_arr[j, i] = prob_front_node
            for j in range(self.invis_kind):
                probability += prob_data_arr[j, 0] * self.init_state_mat[j]
            if self.debug:
                print(prob_data_arr)
        elif algorithm == 'traversal':
            invis_data = np.zeros(vis_data.shape, dtype=int)
            data_infer_len = vis_data.shape[0]
            # enumerate every possible invisible sequence: there are invis_kind**T of them
            for i in range(self.invis_kind ** data_infer_len):
                probability_this_chain = 1.
                # probability of this particular sequence, added to the total
                for k in range(data_infer_len):
                    # P(vis_data[k] | invis_data[k]) : the invisible state is the cause, the visible one the result
                    probability_this_chain *= self.emitter_mat[invis_data[k], vis_data[k]]
                for k in range(data_infer_len - 1):
                    # P(invis_data[k] -> invis_data[k+1])
                    probability_this_chain *= self.trans_state_mat[invis_data[k], invis_data[k + 1]]
                probability_this_chain *= self.init_state_mat[invis_data[0]]
                probability += probability_this_chain
                # advance to the next invisible sequence (base-invis_kind counter)
                invis_data[0] += 1
                for j in range(data_infer_len - 1):
                    if invis_data[j] == self.invis_kind:
                        invis_data[j] = 0
                        invis_data[j + 1] += 1
            if self.debug:
                print(probability)
        else:
            print("[-] should choose a valid algorithm")
            return
        return probability
    # infer the most likely hidden chain from a visible chain
    def get_hidden(self, vis_data: np.ndarray, algorithm: str = "Viterbi") -> np.ndarray:
        '''algorithm : 'Viterbi'
        '''
        if algorithm == "Viterbi":
            pass
        else:
            print("[-] should choose a valid algorithm")
            return
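As a cross-check (not part of the original class), the forward recursion can be collapsed into NumPy matrix-vector products, replacing the explicit triple loop. This is a minimal standalone sketch; the 2-state parameters `pi`, `A`, `B` below are made up for illustration only:

```python
import numpy as np

pi = np.array([0.6, 0.4])            # initial hidden-state probabilities (illustrative)
A = np.array([[0.7, 0.3],            # hidden-state transition matrix
              [0.4, 0.6]])
B = np.array([[0.5, 0.5],            # emission matrix (hidden state -> visible symbol)
              [0.1, 0.9]])

def forward_prob(pi, A, B, obs):
    """P(obs | pi, A, B) via the vectorized recursion alpha_t = (alpha_{t-1} @ A) * B[:, o_t]."""
    alpha = pi * B[:, obs[0]]
    for o in obs[1:]:
        alpha = (alpha @ A) * B[:, o]
    return alpha.sum()

# sanity check: probabilities over all visible sequences of length 3 sum to 1
total = sum(forward_prob(pi, A, B, (a, b, c))
            for a in range(2) for b in range(2) for c in range(2))
print(total)
```

Because every row of `A` and `B` and the vector `pi` are normalized, the total over all observation sequences of a fixed length must be 1 up to floating-point rounding, which is exactly the normalization test used later in main.py.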
Ready-made datasets of this kind are rare, so I wrote a small program that generates hidden Markov chains. Its original parameters can also be used to verify the correctness of the algorithms.
#data_gen.py
# this file is used to generate original data for hmm training
from typing import Tuple
import numpy as np

# suppose 3 invisible states and 3 visible states
chain_len = 100
chain_num = 500
# initial invisible-state probabilities
#   state  0    1    2
#   prob   p    p    p
init_state_prob = np.array([0.3, 0.2, 0.5])
# the three probabilities are deliberately unequal
# invisible state transfer matrix
#   past\cur  0  1  2
#   0         p  p  p
#   1         p  p  p
#   2         p  p  p
trans_state_mat = np.array([
    [0.2, 0.4, 0.4],
    [0.4, 0.4, 0.2],
    [0.3, 0.3, 0.4]
])
# invisible state -> visible state (emission) matrix
#   invis\vis 0  1  2
#   0         p  p  p
#   1         p  p  p
#   2         p  p  p
emitter_mat = np.array([
    [0.3, 0.4, 0.3],
    [0.5, 0.3, 0.2],
    [0.2, 0.5, 0.3]
])


def get_chain() -> Tuple[np.ndarray, np.ndarray]:
    vis_chains = np.ndarray((chain_num, chain_len), dtype=int)
    invis_chains = np.ndarray((chain_num, chain_len), dtype=int)
    for i in range(chain_num):
        # generate the invisible chain
        start_code = np.random.random()
        invis_chain = list()
        if start_code < init_state_prob[0]:
            start_invis_state = np.array([1., 0., 0.])
            invis_chain.append(0)
        elif start_code < init_state_prob[1] + init_state_prob[0]:
            start_invis_state = np.array([0., 1., 0.])
            invis_chain.append(1)
        else:
            start_invis_state = np.array([0., 0., 1.])
            invis_chain.append(2)
        invis_state = start_invis_state
        while len(invis_chain) < chain_len:
            next_state_prob = invis_state @ trans_state_mat
            next_code = np.random.random()
            if next_code < next_state_prob[0]:
                invis_state = np.array([1., 0., 0.])
                invis_chain.append(0)
            elif next_code < next_state_prob[0] + next_state_prob[1]:
                invis_state = np.array([0., 1., 0.])
                invis_chain.append(1)
            else:
                invis_state = np.array([0., 0., 1.])
                invis_chain.append(2)
        # generate the visible chain
        vis_chain = list()
        for invis_state_code in invis_chain:
            if invis_state_code == 0:
                invis_state = np.array([1., 0., 0.])
            elif invis_state_code == 1:
                invis_state = np.array([0., 1., 0.])
            else:
                invis_state = np.array([0., 0., 1.])
            vis_state_prob = invis_state @ emitter_mat
            vis_code = np.random.random()
            if vis_code < vis_state_prob[0]:
                vis_chain.append(0)
            elif vis_code < vis_state_prob[0] + vis_state_prob[1]:
                vis_chain.append(1)
            else:
                vis_chain.append(2)
        vis_chains[i, :] = vis_chain
        invis_chains[i, :] = invis_chain
    return vis_chains, invis_chains
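The one-hot bookkeeping above can be avoided entirely: NumPy's `Generator.choice` accepts a probability vector directly, so each state can be drawn in one call. A compact alternative sampler (the function name `sample_chain` is my own, not from the original code) might look like:

```python
import numpy as np

rng = np.random.default_rng(0)  # seeded for reproducibility

def sample_chain(init_p, trans, emit, length):
    """Sample one (visible, hidden) pair of chains from HMM params."""
    n_hidden = len(init_p)
    n_vis = emit.shape[1]
    hidden = np.empty(length, dtype=int)
    hidden[0] = rng.choice(n_hidden, p=init_p)
    for t in range(1, length):
        # next hidden state drawn from the row of the transition matrix
        hidden[t] = rng.choice(n_hidden, p=trans[hidden[t - 1]])
    # each visible symbol drawn from the emission row of its hidden state
    visible = np.array([rng.choice(n_vis, p=emit[h]) for h in hidden])
    return visible, hidden
```

Called with the `init_state_prob`, `trans_state_mat`, and `emitter_mat` defined in data_gen.py, this produces the same kind of chains as get_chain, one pair at a time.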
Finally, create an object and test it
#main.py
import numpy as np
from data_gen import get_chain
import data_gen
from hmm import discrete_hidden_Markov_model

# initialize
yyz_discrete_HMM_machine = discrete_hidden_Markov_model(debug=False)
# strict test: load the generator's ground-truth params directly
# yyz_discrete_HMM_machine.emitter_mat = data_gen.emitter_mat
# yyz_discrete_HMM_machine.trans_state_mat = data_gen.trans_state_mat
# yyz_discrete_HMM_machine.init_state_mat = data_gen.init_state_prob
# yyz_discrete_HMM_machine.invis_kind = 3
# yyz_discrete_HMM_machine.vis_kind = 3
# fit test
vis, invis = get_chain()
yyz_discrete_HMM_machine.fit(vis, invis)
# print params
print(yyz_discrete_HMM_machine.emitter_mat, yyz_discrete_HMM_machine.trans_state_mat, yyz_discrete_HMM_machine.init_state_mat)
# normalization test: the total probability over all visible sequences should be 1
data_infer_len = 6
vis_data = np.zeros((data_infer_len,), dtype=int)
# enumerate all possible visible sequences (base-3 counter)
prob_for = 0.
prob_tra = 0.
prob_bak = 0.
for i in range(3 ** data_infer_len):
    prob_tra += yyz_discrete_HMM_machine.get_prob(vis_data, algorithm='traversal')
    prob_bak += yyz_discrete_HMM_machine.get_prob(vis_data, algorithm='backward')
    prob_for += yyz_discrete_HMM_machine.get_prob(vis_data, algorithm='forward')
    vis_data[0] += 1
    for j in range(data_infer_len - 1):
        if vis_data[j] == 3:
            vis_data[j] = 0
            vis_data[j + 1] += 1
print(prob_for, prob_tra, prob_bak)
#result
Fitting results
Emission matrix
[[0.30223322 0.3989192 0.29884758]
[0.50214004 0.29603819 0.20182177]
[0.20046293 0.49771578 0.30182128]]
State transfer matrix
[[0.20398501 0.4028408 0.3931742 ]
[0.39799201 0.39865764 0.20335034]
[0.30328927 0.29505072 0.40166001]]
Initial probabilities
[0.298 0.224 0.478]
Normalization results
1.0 0.9950028676787088 0.9999999999999998
The fitted probability matrices agree with the generation-time parameters within sampling error, so this basic frequency-counting fit works; the totals also come out (approximately) normalized, so the typical-capability-2 algorithms pass the test.
Summary
- Gave a brief introduction to the hidden Markov model.
- The forward and backward totals deviate from 1 only in the last decimal places; since the algorithms perform their additions and multiplications in different orders and quantities, that part is ordinary floating-point rounding. The traversal total of 0.995, however, came from an enumeration loop that stopped one hidden sequence short of all 3^T of them, which is why it falls visibly below 1.
- Today is February 11, Chinese New Year's Eve. This post actually took quite a while to write; the Spring Festival Gala is a bit boring at the moment, so I'm finishing the summary now. Happy Year of the Ox, everyone!