本文以命名實(shí)體識(shí)別NER數(shù)據(jù)預(yù)處理為例
將訓(xùn)練集中每句話變成4個(gè)list:
- 第一個(gè)list是字,如[今驾孔,天芍秆,去,北翠勉,京]
- 第二個(gè)list是char_to_id [3,5,6,8,9]
- 第三個(gè)list是通過jieba得到的分詞妖啥,如[1,3,0,1,3] (1,詞的開始 2,詞的中間 3对碌,詞的結(jié)尾荆虱,0,單個(gè)詞)
- 第四個(gè)list是target 如[0,0,0,2,3] (非0元素對(duì)應(yīng)著tag_to_id中的數(shù)值)
Batch:
- 將訓(xùn)練集劃分成若干個(gè)batch
- 每個(gè)batch有20個(gè)句子
- 劃分時(shí)朽们,按句子長度從大到小排序
構(gòu)建模型:
- Input: 輸入兩個(gè)特征怀读,char_to_id的List及通過jieba分詞得到的特征list
- Embedding: 預(yù)先訓(xùn)練好的100維詞向量模型,通過查詢得到每個(gè)字的100維向量骑脱,加上分詞特征向量菜枷,輸出到dropout(0.5)
Bi-LSTM
- Project_layer: 兩層wx+b 邏輯回歸
- Loss_layer:內(nèi)嵌CRF
1. 加載數(shù)據(jù)集
ef load_sentences(path):
"""
加載數(shù)據(jù)集,每一行至少包含一個(gè)漢字和一個(gè)標(biāo)記
句子和句子之間是以空格進(jìn)行分割
最后返回句子集合
:param path:
:return:
"""
# 存放數(shù)據(jù)集
sentences = []
# 臨時(shí)存放每一個(gè)句子
sentence = []
for line in codecs.open(path, 'r', encoding='utf-8'):
# 去掉兩邊空格
line = line.strip()
# 首先判斷是不是空惜姐,如果是則表示句子和句子之間的分割點(diǎn)
if not line:
if len(sentence) > 0:
sentences.append(sentence)
# 清空sentence表示一句話完結(jié)
sentence = []
else:
if line[0] == " ":
continue
else:
word = line.split()
assert len(word) >= 2
sentence.append(word)
# 循環(huán)走完犁跪,要判斷一下椿息,防止最后一個(gè)句子沒有進(jìn)入到句子集合中
if len(sentence) > 0:
sentences.append(sentence)
return sentences
2. 更新數(shù)據(jù)集編碼
def update_tag_scheme(sentences, tag_scheme):
"""
更新為指定編碼
:param sentences:
:param tag_scheme:
:return:
"""
for i, s in enumerate(sentences):
tags = [w[-1] for w in s]
if not data_utils.check_bio(tags):
s_str = "\n".join(" ".join(w) for w in s)
raise Exception("輸入的句子應(yīng)為BIO編碼歹袁,請(qǐng)檢查輸入句子%i:\n%s" % (i, s_str))
if tag_scheme == "BIO":
for word, new_tag in zip(s, tags):
word[-1] = new_tag
if tag_scheme == "BIOES":
new_tags = data_utils.bio_to_bioes(tags)
for word, new_tag in zip(s, new_tags):
word[-1] = new_tag
else:
raise Exception("非法目標(biāo)編碼")
data_utils中的check_bio(tags)坷衍,,首先確定是不是合法的BIO
def check_bio(tags):
"""
檢測(cè)輸入的tags是否是bio編碼
如果不是bio編碼
那么錯(cuò)誤的類型
(1)編碼不在BIO中
(2)第一個(gè)編碼是I
(3)當(dāng)前編碼不是B,前一個(gè)編碼不是O
:param tags:
:return:
"""
for i, tag in enumerate(tags):
if tag == 'O':
continue
tag_list = tag.split("-")
if len(tag_list) != 2 or tag_list[0] not in set(['B', 'I']):
# 非法編碼
return False
if tag_list[0] == 'B':
continue
elif i == 0 or tags[i - 1] == 'O':
# 如果第一個(gè)位置不是B或者當(dāng)前編碼不是B并且前一個(gè)編碼0条舔,則全部轉(zhuǎn)換成B
tags[i] = 'B' + tag[1:]
elif tags[i - 1][1:] == tag[1:]:
# 如果當(dāng)前編碼的后面類型編碼與tags中的前一個(gè)編碼中后面類型編碼相同則跳過
continue
else:
# 如果編碼類型不一致枫耳,則重新從B開始編碼
tags[i] = 'B' + tag[1:]
return True
把BIO轉(zhuǎn)換成BIOES
def bio_to_bioes(tags):
"""
把bio編碼轉(zhuǎn)換成bioes編碼
返回新的tags
:param tags:
:return:
"""
new_tags = []
for i, tag in enumerate(tags):
if tag == 'O':
# 直接保留,不變化
new_tags.append(tag)
elif tag.split('-')[0] == 'B':
# 如果tag是以B開頭孟抗,那么我們就要做下面的判斷
# 首先迁杨,如果當(dāng)前tag不是最后一個(gè),并且緊跟著的后一個(gè)是I
if (i + 1) < len(tags) and tags[i + 1].split('-')[0] == 'I':
# 直接保留
new_tags.append(tag)
else:
# 如果是最后一個(gè)或者緊跟著的后一個(gè)不是I凄硼,那么表示單字铅协,需要把B換成S表示單字
new_tags.append(tag.replace('B-', 'S-'))
elif tag.split('-')[0] == 'I':
# 如果tag是以I開頭,那么我們需要進(jìn)行下面的判斷
# 首先摊沉,如果當(dāng)前tag不是最后一個(gè)狐史,并且緊跟著的一個(gè)是I
if (i + 1) < len(tags) and tags[i + 1].split('-')[0] == 'I':
# 直接保留
new_tags.append(tag)
else:
# 如果是最后一個(gè),或者后一個(gè)不是I開頭的说墨,那么就表示一個(gè)詞的結(jié)尾骏全,就把I換成E表示一個(gè)詞結(jié)尾
new_tags.append(tag.replace('I-', 'E-'))
else:
raise Exception('非法編碼')
return new_tags
3. 構(gòu)建字典映射
def word_mapping(sentences):
"""
構(gòu)建字典
:param sentences:
:return:
"""
word_list = [[x[0] for x in s] for s in sentences] # 得到所有的字
dico = data_utils.create_dico(word_list)
dico['<PAD>'] = 10000001
dico['<UNK>'] = 10000000
word_to_id, id_to_word = data_utils.create_mapping(dico)
return dico, word_to_id, id_to_word
Create_dico用來統(tǒng)計(jì)詞頻,這里也可以引入collections.Counter()來計(jì)算詞頻
def create_dico(item_list):
"""
對(duì)于item_list中的每一個(gè)items尼斧,統(tǒng)計(jì)items中item在item_list中的次數(shù)
item:出現(xiàn)的次數(shù)
:param item_list:
:return:
"""
assert type(item_list) is list
dico = {}
for items in item_list:
for item in items:
if item not in dico: #第一次出現(xiàn)姜贡,標(biāo)記為1
dico[item] = 1
else:
dico[item] += 1
return dico
根據(jù)詞頻來創(chuàng)建映射的方法create_mapping()
def create_mapping(dico):
"""
創(chuàng)建item to id, id_to_item
item的排序按詞典中出現(xiàn)的次數(shù)
:param dico:
:return:
"""
sorted_items = sorted(dico.items(), key=lambda x: (-x[1], x[0]))
id_to_item = {i: v[0] for i, v in enumerate(sorted_items)}
item_to_id = {v: k for k, v in id_to_item.items()}
return item_to_id, id_to_item
4. 構(gòu)造tag映射
def tag_mapping(sentences):
"""
構(gòu)建標(biāo)簽字典
:param sentences:
:return:
"""
tag_list = [[x[1] for x in s] for s in sentences]
dico = data_utils.create_dico(tag_list)
tag_to_id, id_to_tag = data_utils.create_mapping(dico)
return dico, tag_to_id, id_to_tag
5.prepare dataset
def prepare_dataset(sentences, word_to_id, tag_to_id, train=True):
"""
數(shù)據(jù)預(yù)處理,返回list其實(shí)包含
-word_list
-word_id_list
-word char indexs
-tag_id_list
:param sentences:
:param word_to_id:
:param tag_to_id:
:param train:
:return:
"""
none_index = tag_to_id['O']
data = []
for s in sentences:
word_list = [w[0] for w in s] # 集中所有的字
word_id_list = [word_to_id[w if w in word_to_id else '<UNK>'] for w in word_list] # 得到所有的字對(duì)應(yīng)的id
segs = data_utils.get_seg_features("".join(word_list))
if train:
tag_id_list = [tag_to_id[w[-1]] for w in s]
else:
tag_id_list = [none_index for w in s]
# 此時(shí)對(duì)于一個(gè)句子則得到4個(gè)特征列表
data.append([word_list, word_id_list, segs, tag_id_list])
return data
6. jieba來分詞棺棵,或者用nltk.word_tokenize(sentence)方法來分詞
def get_seg_features(words):
"""
利用jieba分詞
采用類似bioes的編碼楼咳,0表示單個(gè)字成詞, 1表示一個(gè)詞的開始, 2表示一個(gè)詞的中間烛恤,3表示一個(gè)詞的結(jié)尾
:param words:
:return:
"""
seg_features = []
word_list = list(jieba.cut(words))
for word in word_list:
if len(word) == 1:
seg_features.append(0)
else:
temp = [2] * len(word)
temp[0] = 1
temp[-1] = 3
seg_features.extend(temp)
return seg_features
7. 批量輸入batchManager
class BatchManager(object):
def __init__(self, data, batch_size):
self.batch_data = self.sort_and_pad(data, batch_size)
self.len_data = len(self.batch_data)
def sort_and_pad(self, data, batch_size):
num_batch = int(math.ceil(len(data) / batch_size))
sorted_data = sorted(data, key=lambda x: len(x[0])) # 按照長度對(duì)數(shù)據(jù)進(jìn)行排序操作
batch_data = list()
for i in range(num_batch):
# 按照批次進(jìn)行填充爬橡,所以每個(gè)批次的數(shù)據(jù)長度是一樣的
batch_data.append(self.pad_data(sorted_data[i * batch_size: (i + 1) * batch_size]))
return batch_data
填充數(shù)據(jù)pad_data
def pad_data(data):
word_list = []
word_id_list = []
seg_list = []
tag_id_list = []
max_length = max([len(sentence[0]) for sentence in data])
for line in data:
words, word_ids, segs, tag_ids = line
padding = [0] * (max_length - len(words))
word_list.append(words + padding)
word_id_list.append(word_ids + padding)
seg_list.append(segs + padding)
tag_id_list.append(tag_ids + padding)
return [word_list, word_id_list, seg_list, tag_id_list]
8. shuffle隨機(jī)打亂數(shù)據(jù)
# 隨機(jī)得到一個(gè)批次的數(shù)據(jù)
def iter_batch(self, shuffle=False):
if shuffle:
random.shuffle(self.batch_data)
for idx in range(self.len_data):
yield self.batch_data[idx]
到這里就算最基本的數(shù)據(jù)預(yù)處理完成,想了解更細(xì)節(jié)棒动,也可以通過matplot來看看數(shù)據(jù)集樣本分布糙申,比如train
import matplotlib.pyplot as plt
import collections
train_cnt = collections.Counter(map(len,train_data))
plt.bar(*zip(*train_cnt.items(),color='r'))
plt.xlabel('Sentence Length for Training Data')
plt.ylabel('Samples')
plt.show()
看一下各個(gè)不同命名實(shí)體識(shí)別的數(shù)量情況
def stat_entities(data):
cnt = collections.defaultdict(int)
for sentence in data:
for char, tag in sentence:
if tag.statswith("B-"):
cnt[tag] += 1
cnt["samples"] = len(data)
return cnt
print(stat_entities(train_data))
同理,也可查看dev_data和 test_data
除了以上最基本的預(yù)處理船惨,還有一些常用的柜裸,如去停用詞,我們創(chuàng)建一個(gè)stopwords.ttxt,這里面可以放一些日常場(chǎng)景需要除去的詞粱锐,如冠詞疙挺,人稱,數(shù)字等特定詞怜浅,用pd.read_csv打開即可铐然,因?yàn)閟topword.txt很小蔬崩,數(shù)據(jù)集如果較大用IO更快更節(jié)省內(nèi)存。
# 加載停用詞
stopwords = pd.read_csv('stopwords.txt', index_col=False, quoting=3, sep="\t", names=['stopword'], encoding='utf-8')
stopwords = stopwords['stopword'].values
分詞和停用詞相結(jié)合
# 定義分詞和打標(biāo)簽函數(shù)preprocess_text
# 參數(shù)content_lines即為上面轉(zhuǎn)換的list
# 參數(shù)sentences是定義的空list搀暑,用來儲(chǔ)存打標(biāo)簽之后的數(shù)據(jù)
# 參數(shù)category 是類型標(biāo)簽
def preprocess_text(content_lines, sentences, category):
for line in content_lines:
try:
segs = jieba.lcut(line)
segs = [v for v in segs if not str(v).isdigit()] # 去數(shù)字
segs = list(filter(lambda x: x.strip(), segs)) # 去左右空格
segs = list(filter(lambda x: len(x) > 1, segs)) # 長度為1的字符
segs = list(filter(lambda x: x not in stopwords, segs)) # 去掉停用詞
sentences.append((" ".join(segs), category)) # 打標(biāo)簽
except Exception:
print(line)
continue
另外沥阳,還有轉(zhuǎn)換大小寫等等具體會(huì)根據(jù)不同環(huán)境來處理數(shù)據(jù)。
總結(jié)一下數(shù)據(jù)預(yù)處理:
語料清洗:把不相關(guān)的自点,視為噪點(diǎn)的內(nèi)容刪除桐罕。人工去重,對(duì)齊桂敛,刪除和標(biāo)注等功炮。
分詞(以jieba為例):
- 精確分詞 jieba.cut(content, cut_all=False)
- 全模式 jieba.cut(content, cut_all=True) 所有的詞都掃描出來
- 搜索引擎模式 jieba.cut_for_search(content) 在精確分詞基礎(chǔ)上再切分,提高召回率
- lcut代替cut jieba.lcut(content) 這樣做為了返回List
- 獲取詞性:import jieba.posseg as psg psg.lcut(content)
詞性標(biāo)注:就是給每次詞語打標(biāo)簽术唬,這樣可以讓文本在后面處理時(shí)融入更多的有用的語言信息薪伏。分為基于規(guī)則和基于統(tǒng)計(jì)兩種,基于統(tǒng)計(jì)如最大熵粗仓,HMM,CRF
去停用詞:這個(gè)比較靈活嫁怀,一般情況下標(biāo)點(diǎn)符號(hào),語氣詞潦牛,人稱等都可以去掉眶掌,把常用停用詞放進(jìn)一個(gè)文檔,用時(shí)候直接調(diào)用巴碗。但是做情感分類時(shí)候朴爬,語氣詞,感嘆詞也是應(yīng)該保留的橡淆,因?yàn)檫@些詞對(duì)語氣程度召噩,感情色彩有一定的貢獻(xiàn)和意義。
放進(jìn)字典逸爵,詞和tag都轉(zhuǎn)換成id具滴,zip成新的數(shù)組。大篇幅就是處理這個(gè)玩意师倔,這里就不啰嗦了构韵。