使用Mongo聚合操作來(lái)進(jìn)行重復(fù)的數(shù)據(jù)項(xiàng)清洗敬飒,并使用PyMongo加入到數(shù)據(jù)清洗組件中。
當(dāng)前環(huán)境:PyMongo 3.6.1 / MongoDB 3.4.7 / Python 3.6.4 :: Anaconda, Inc.
在爬蟲中斷續(xù)爬時(shí)會(huì)出現(xiàn)少量數(shù)據(jù)重復(fù)的問(wèn)題判耕,我將數(shù)據(jù)去重放在了數(shù)據(jù)清洗環(huán)節(jié)柠横,清洗的過(guò)程中順帶將重復(fù)的數(shù)據(jù)刪除靶擦。
Mongo老版本的解決方案是建立單一索引托修,Mongo3.+可以使用聚合操作將重復(fù)的數(shù)據(jù)檢索出來(lái)并進(jìn)行刪除。
元數(shù)據(jù)結(jié)構(gòu)如下:
item = {
"_id" : ObjectId("..."),
"title" : "...", # 數(shù)據(jù)標(biāo)題
"date" : "...", # 數(shù)據(jù)日期
"url" : "...", # 數(shù)據(jù)來(lái)源
"content" : "...",
"source" : "..."
"category" : "...",
...
}
需要根據(jù)「相同標(biāo)題+相同日期+相同來(lái)源」判定數(shù)據(jù)重復(fù)饶米,在管道中根據(jù)這三項(xiàng)條件分組($group)后計(jì)數(shù)將數(shù)量>1的匹配($match)出來(lái)桨啃,最后遍歷刪除(db.collections.remove())
聚合操作的過(guò)程
$group: 使用title/date/url作為條件進(jìn)行分組組成新的_id,并計(jì)數(shù)+1檬输,dups中存放元數(shù)據(jù)的_id
$match: 在$group得到的分組基礎(chǔ)上匹配數(shù)量>1的項(xiàng)
Mongo Shell 查詢重復(fù)數(shù)據(jù)的操作如下:
db.test.aggregate([
{
$group: { _id: {'title': '$title','date':'$date','url': '$url'},count: {$sum: 1},dups: {$addToSet: '$_id'}}
},
{
$match: {count: {$gt: 1}}
}
])
Mongo Shell 將查詢到的結(jié)果刪除操作:
db.test.aggregate([
... // 同上聚合操作照瘾,此處略
]).forEach(function(doc){
doc.dups.shift(); // 去除重復(fù)組的第一個(gè)元數(shù)據(jù)_id,得到除第一個(gè)之外的其他元組
db.test.remove({_id: {$in: doc.dups}}); // remove()刪除這些重復(fù)的數(shù)據(jù)
})
PyMongo 操作代碼如下:
使用bulk_write()進(jìn)行批量刪除
pipeline = [
{
'$group': {
'_id': {'title': '$title', 'date': '$date', 'url': '$url'},
'count': {'$sum': 1},
'dups': {
'$addToSet': '$_id'
}
},
},
{
'$match': {
'count': {
'$gt': 1
}
}
}
]
map_id = map(lambda doc: doc['dups'][1:], db['data_value'].aggregate(pipeline=pipeline))
list_id = [item for sublist in map_id for item in sublist]
print(db['data_value'] \
.bulk_write(list(map(lambda _id: DeleteOne({'_id': _id}), list_id))) \
.bulk_api_result)
一行代碼鬼畜版:
print(db['data_value'].bulk_write(list(map(lambda _id: DeleteOne({'_id': _id}), [item for sublist in map(lambda doc: doc['dups'][1:], db['data_value'].aggregate(pipeline=[{'$group': {'_id': {'title': '$title', 'date': '$date', 'url': '$url'},'count': {'$sum': 1},'dups': {'$addToSet': '$_id'}},},{'$match': {'count': {'$gt': 1}}}])) for item in sublist]))).bulk_api_result)