前言
分享一些Mongdb常用的數(shù)據(jù)清洗方式
注:"Exceeded memory limit for $group, but didn't allow external sort. Pass allowDiskUse:true to opt in." 原因與解決:
mongdb_bigdata_error.png
- 原因
大數(shù)據(jù)計(jì)算、數(shù)據(jù)統(tǒng)計(jì)時(shí)穴吹,每個(gè)計(jì)算任務(wù)(job或task)都會(huì)使用獨(dú)立有限的內(nèi)存空間呢铆,mongodb沒有提供復(fù)雜的內(nèi)存分配模型(任務(wù)調(diào)度算法)峡蟋,僅限定每個(gè)stage最多使用100M內(nèi)存诉位,如果超過此值將終止計(jì)算并返回error;為了支持較大數(shù)據(jù)集合的處理逊彭,我們可以指定“allowDiskUse”參數(shù)將“溢出”的數(shù)據(jù)寫入本地的臨時(shí)文件中(臨時(shí)的collection)瓦戚,這個(gè)參數(shù)我們通常需要設(shè)定為true。
- 解決方案:
{allowDiskUse:true}
查詢重復(fù)數(shù)據(jù)
db.feedImg_all.aggregate([
{
$group: { _id: {'mvid': '$mvid','feed_id':'$feed_id'},count: {$sum: 1},dups: {$addToSet: '$_id'}}
},
{
$match: {count: {$gt: 1}}
}
],{allowDiskUse:true})
刪除重復(fù)數(shù)據(jù)
db.xiuxiu_all.aggregate([
{
$group: { _id: {'mvid': '$mvid','feed_id':'$feed_id'},count: {$sum: 1},dups: {$addToSet: '$_id'}}
},
{
$match: {count: {$gt: 1}}
}
],{allowDiskUse:true}).forEach(function(doc){
doc.dups.shift(); // 去除重復(fù)組的第一個(gè)元數(shù)據(jù)_id摸吠,得到除第一個(gè)之外的其他元組
db.xiuxiu_all.remove({_id: {$in: doc.dups}}); // remove()刪除這些重復(fù)的數(shù)據(jù)
})
刪除重復(fù)數(shù)據(jù)(python版本)
# -*- coding:utf-8 -*-
import pymongo
from pymongo import DeleteOne
'''
@author: lcx
@time: 2018/11/15
@desc:
'''
pipeline = [
{
'$group': {
'_id': {'mvid': '$mvid', 'feed_id': '$feed_id'},
'count': {'$sum': 1},
'dups': {
'$addToSet': '$_id'
}
},
},
{
'$match': {
'count': {
'$gt': 1
}
}
}
]
myclient = pymongo.MongoClient(host='m3005.test.com',port=3005,connect=False)
db = myclient.deepnet_test
if __name__ == '__main__':
map_id = map(lambda doc: doc['dups'][1:], db['xiuxiu_all'].aggregate(pipeline=pipeline,allowDiskUse=True))
list_id = [item for sublist in map_id for item in sublist]
print(db['xiuxiu_all']
.bulk_write(list(map(lambda _id: DeleteOne({'_id': _id}), list_id)))
.bulk_api_result)
復(fù)制collection里的數(shù)據(jù)到另一個(gè)collection中
db.xiuxiu_all.find().forEach(function(x){
db.xiuxiu_all_bak.insert(x);
})
過濾重復(fù)字段并統(tǒng)計(jì)總記錄數(shù)
db.feedImg_all.aggregate(
[
{$match:{"createTime": {"$gte": 1541606400, "$lt": 1541692800}}}, // 添加過濾條件
{$project:{"feedId": true}},
{$group:{_id: "$feedId"}},
{$group:{_id: null, count: {$sum:1}}}
], {allowDiskUse: true})