馬爾可夫假設(shè):
有限歷史假設(shè):P(Xt+1=sk|X1,...,Xt) = P(Xt+1=sk|Xt)
時間不變假設(shè):?i∈{1, 2, ..., T}, ?x, y∈S, P(xi=y|xi-1 = x) = P(y|x)
馬爾可夫模型形式化表述:
MM: λ=(S, π, A)
- S: 狀態(tài)集合
- π: 初始狀態(tài)概率
- A: 狀態(tài)間轉(zhuǎn)移概率
隱馬爾可夫模型形式化表述:
HMM: λ=(S, O, π, A, B)
- S: 狀態(tài)集合
- O: 每個狀態(tài)可能的輸出集合
- π: 初始狀態(tài)概率
- A: 狀態(tài)間轉(zhuǎn)移概率
- B: 觀察值概率矩陣
HMM可分為兩部分:
- Markov鏈:由π,A描述茬祷,輸出為狀態(tài)序列
- 隨機(jī)過程:由B描述路星,產(chǎn)生輸出為觀測序列
HMM三個問題:
- 評估。給定模型 λ=(S, O, π, A, B)匹中,如何高效計算某一輸出字符序列的概率P(O|λ)?
答案:向前算法,向后算法
- 解碼豪诲。給定輸出序列O和一個模型λ顶捷,如何確定產(chǎn)生這一序列概率的最大狀態(tài)序列(q1, q2, ..., qT)?
答案:Viterbi算法
- 參數(shù)估計。給定一個觀察序列O屎篱,如何調(diào)整模型λ參數(shù)服赎,使得產(chǎn)生O的概率最大?
答案:Baum-Welch統(tǒng)計方法交播,EM算法(抽空整理一下EM算法)
評估問題和解碼問題都可用動態(tài)規(guī)劃解決重虑,可以參考我博客中的Dynamic Time Warping這篇文章中動態(tài)規(guī)劃算法,找到O的最大概率和找到最佳狀態(tài)轉(zhuǎn)移可以合并為一個問題秦士。通過正向計算最大概率缺厉,然后通過回溯解決最佳狀態(tài)轉(zhuǎn)移。【這種方法只能找到一條最佳的狀態(tài)轉(zhuǎn)移】
但是在實(shí)際情況中,情況比較復(fù)雜芽死,HMM的狀態(tài)轉(zhuǎn)移路徑可能會涉及多條概率相同的路徑乏梁,這樣基于上一篇文章中提到的算法就不太容易處理回溯中遇到多條路徑的問題了。
Viterbi算法
Viterbi算法也是動態(tài)規(guī)劃关贵,在解碼問題中的DP公式為:
DP遞推公式:δt(j) = max[δt-1(i) × aij × bj(ot)]
- δt(j):t 時刻沿一條狀態(tài)路徑q1, q2, ..., qt遇骑,且qt=j,即 t 時刻處于狀態(tài)j揖曾,產(chǎn)生o1, o2, ..., ot的最大概率
- aij:從狀態(tài)i到狀態(tài)j的轉(zhuǎn)移概率
- bj(ot):狀態(tài) j 輸出ot的概率
Viterbi練習(xí)題:杭電OJ:Peter's Hobby
題目描述:根據(jù)輸入的觀測序列落萎,找到最大可能的狀態(tài)序列。這題動態(tài)規(guī)劃和Viterbi都可以做炭剪。
狀態(tài)發(fā)射概率矩陣
狀態(tài)轉(zhuǎn)移概率矩陣
算法思路:
- 找到初始狀態(tài)選擇概率练链;
- 遞歸尋找最優(yōu)子結(jié)構(gòu),從前向后找到最佳的狀態(tài)轉(zhuǎn)移路徑奴拦;每次遞歸僅保留最大概率的路徑媒鼓,同時回溯剪掉小概率路徑;
- 輸出狀態(tài)路徑
Python Viterbi實(shí)現(xiàn):
from state import State
def transition_probability(prb, emission, last_state, current_state, trans_prb_table, emission_prb_table):
"""
當(dāng)前狀態(tài)概率 = 上一次最優(yōu)概率 * 狀態(tài)轉(zhuǎn)移概率 * 序列發(fā)射概率
:param prb: 上一次最優(yōu)概率
:param last_state: 上一次狀態(tài)
:param emission: 當(dāng)前時刻發(fā)射概率
:param current_state: 當(dāng)前狀態(tài)
:param trans_prb_table: 狀態(tài)轉(zhuǎn)移概率表
:param emission_prb_table: 狀態(tài)發(fā)射字符概率
:return:
"""
if last_state is None:
return emission_prb_table[current_state][emission]
return prb * trans_prb_table[last_state][current_state] * emission_prb_table[current_state][emission]
def reverse(state_node):
"""
回溯刪除不可行路徑错妖,將next—state從父節(jié)點(diǎn)的next列表中移除
:return:
"""
previous_node = state_node.previous
if previous_node is None:
# 如果到起始點(diǎn)則返回
return
previous_node.next.remove(state_node)
# 如果該狀態(tài)沒有子狀態(tài)绿鸣,則說明該狀態(tài)可以從路徑中移除,遞歸刪除
if len(previous_node.next) == 0:
reverse(previous_node)
def search_state(last_state, order, transition_prb_table, emission_prb_table, days):
"""
搜索HMM狀態(tài)網(wǎng)格暂氯,由于可能會存在多條概率相同的路徑潮模,因此要用列表存儲
:param last_state: 上一次的狀態(tài)列表
:param order: 時間序列索引
:param transition_prb_table:狀態(tài)轉(zhuǎn)移概率表
:param emission_prb_table: 狀態(tài)發(fā)射字符概率表
:param days: 觀測序列
:return:
"""
if order == len(days):
return last_state
humidity = days[order]
# 記錄當(dāng)前路徑中最大的概率,用于回溯修剪路徑
current_path_maximum_prb = -1
# 從前向后搜索最佳的狀態(tài)結(jié)點(diǎn)
for state in last_state:
maximum_prb = -1
next_state = []
for current_state in transition_prb_table.keys():
# 判斷轉(zhuǎn)移到哪個狀態(tài)輸出目標(biāo)天氣的概率最大
prb = transition_probability(state.prb, humidity, state.state, current_state, transition_prb_table,
emission_prb_table)
if prb > maximum_prb:
maximum_prb = prb
next_state.clear()
next_state.append(State(current_state, maximum_prb, state))
elif prb == maximum_prb:
next_state.append(State(current_state, maximum_prb, state))
state.next += next_state
if maximum_prb > current_path_maximum_prb:
current_path_maximum_prb = maximum_prb
# 記錄當(dāng)前狀態(tài)痴施,用于下一次遞歸
current_states = []
# 保留概率高的子路徑
for state in last_state:
for node in state.next:
if node.prb < current_path_maximum_prb:
state.next.remove(node)
else:
current_states.append(node)
# 如果當(dāng)前狀態(tài)沒有向外繼續(xù)轉(zhuǎn)移擎厢,則遞歸刪除路徑
if len(state.next) == 0:
reverse(state)
# 遞歸尋找轉(zhuǎn)移狀態(tài)
return search_state(current_states, order + 1, transition_prb_table, emission_prb_table, days)
def init_data():
"""
初始化數(shù)據(jù)
:return:
"""
trans_prb_table = {'sunny': {'sunny': 0.5, 'cloudy': 0.375, 'rainy': 0.125},
'rainy': {'sunny': 0.25, 'cloudy': 0.375, 'rainy': 0.375},
'cloudy': {'sunny': 0.25, 'cloudy': 0.125, 'rainy': 0.625}}
emission_prb_table = {'sunny': {'dry': 0.6, 'dryish': 0.2, 'damp': 0.15, 'soggy': 0.05},
'rainy': {'dry': 0.05, 'dryish': 0.1, 'damp': 0.35, 'soggy': 0.5},
'cloudy': {'dry': 0.25, 'dryish': 0.3, 'damp': 0.2, 'soggy': 0.25}}
return trans_prb_table, emission_prb_table
def backwards(state_node):
"""
采用棧模擬,遞歸輸出狀態(tài)序列
:param state_node: 狀態(tài)節(jié)點(diǎn)
:return:
"""
stack = [state_node.state]
# 如果當(dāng)前狀態(tài)節(jié)點(diǎn)的前趨節(jié)點(diǎn)為起始節(jié)點(diǎn)辣吃,則遞歸返回
if state_node.previous.prb == -1:
return [state_node.state]
stack += backwards(state_node.previous)
return stack
if __name__ == '__main__':
transition_prb_table, emission_prb_table = init_data()
days = ['dry', 'damp', 'soggy']
# 記錄路徑起點(diǎn)
start_point = State('start', -1, None)
# 初始化初始狀態(tài)
maximum = -1
states = []
for state in transition_prb_table.keys():
prb = transition_probability(1, days[0], None, state, transition_prb_table, emission_prb_table)
if prb > maximum:
states.clear()
states.append(State(state, prb, start_point))
maximum = prb
elif prb == maximum:
states.append(State(state, prb, start_point))
start_point.next += states
end_states = search_state(states, 1, transition_prb_table, emission_prb_table, days)
# 采用棧模擬动遭,輸出多條概率最大的狀態(tài)路徑
for state in end_states:
state_seq = backwards(state)
# 狀態(tài)出棧
while len(state_seq) != 0:
print(state_seq.pop())
Python代碼是C++代碼轉(zhuǎn)過來的,沒有改題目的輸入要求神得,以上代碼僅僅提供一個思路沽损。以上代碼考慮了存在多于一條路徑的情況,因此比單一路徑的代碼(比如)要復(fù)雜一些循头。由于路徑采用鏈表存儲,同時也多了一步回溯剪枝炎疆。