MongoDB聚合操作用于對(duì)數(shù)據(jù)的批量操作膀藐,將集合按條件分組后在進(jìn)行一系列操作屠阻,諸如求和、求均值等额各。聚合操作能對(duì)集合進(jìn)行復(fù)雜的操作国觉,主要用于數(shù)理統(tǒng)計(jì)和數(shù)據(jù)挖掘。MongoDB中聚合操作的輸入是i集合中的文檔虾啦,輸出可以是一個(gè)文檔麻诀,也可是多條文檔痕寓。MongoDB提供非常強(qiáng)大的聚合操作,可分為三種方式
- 聚合管道(Aggregation Pipeline)
- 單目聚合操作(Single Purpose Aggregation Operation)
- MapReduce 編程模型
聚合管道
POSIX多線程使用方式中蝇闭,有種叫做管道(流水線)的方式呻率,其數(shù)據(jù)元素流串行地被一組線程按順序執(zhí)行。
聚合管道由階段(Stage)組成呻引,文檔在一個(gè)階段處理完畢后礼仗,聚合管道會(huì)把處理結(jié)果傳到下一個(gè)階段。聚合管的功能
- 對(duì)文檔進(jìn)行過濾逻悠,查詢出符合條件的文檔元践。
- 對(duì)文檔進(jìn)行變換,改變文檔的輸出形式童谒。
聚合管道的每個(gè)階段使用階段操作符(Stage Operators)定義单旁,在每個(gè)階段操作符中可使用表達(dá)式操作符(Expression Operators)計(jì)算總和、均值饥伊、拼接或分割字符串等操作象浑,直到每個(gè)階段完結(jié)最后返回結(jié)果,返回的結(jié)果可直接輸出琅豆,也可存儲(chǔ)到集合中愉豺。
處理流程
-
db.collection.aggregate()
可同時(shí)使用多個(gè)管道,方便數(shù)據(jù)處理趋距。 -
db.collection.aggregate()
使用 MongoDB 內(nèi)置原生操作,聚合效率高且支持類似SQL中GroupBy的操作越除,不在需要用戶編寫自定義的JS例程节腐。 - 每個(gè)階段管道限制100M的內(nèi)存,若單節(jié)點(diǎn)管道超出極限摘盆,MongoDB產(chǎn)生錯(cuò)誤翼雀。為了能夠處理大型數(shù)據(jù)集,可設(shè)置
allowDiskUse
為true
為聚合管道節(jié)點(diǎn)把數(shù)據(jù)寫入臨時(shí)文件孩擂,以解決100M內(nèi)存限制狼渊。 -
db.collection.aggregate()
可作用于分片集合,但結(jié)果不能輸在分片集合类垦,MapReduce可作用于在分片集合其結(jié)果也可輸在分片集合中狈邑。 -
db.collection.aggregate()
返回一個(gè)指針(cursor),數(shù)據(jù)存放在內(nèi)存中可直接操作蚤认,跟MongoShell一樣米苹。 -
db.collection.aggregate()
輸出的結(jié)果只能保存在文檔中,BSON Document大小限制為16M砰琢。
語法解析
db.collection.aggregate(pipeline, options)
db.collection.aggregate([{<stage>,...}], ...)
pipeline 參數(shù)
-
$project
對(duì)輸入文檔添加新字段或刪除現(xiàn)有字段蘸嘶,可自定義顯示哪些字段良瞧。 -
$match
根據(jù)條件過濾僅輸出符合條件的文檔,若放在pipeline前面训唱,根據(jù)條件過濾數(shù)據(jù)并傳輸?shù)綄懸粋€(gè)階段管道褥蚯,可提高后續(xù)數(shù)據(jù)處理效率。也可放在out之前用以對(duì)結(jié)果再一次過濾况增。 -
$redact
字段所處的document結(jié)構(gòu)的級(jí)別 -
$limit
用來限制MongoDB聚合管道返回的文檔數(shù)量 -
$skip
在聚合管道中跳過指定數(shù)量的文檔并返回剩余的文檔 -
$unwind
將文檔中某個(gè)數(shù)組類型字段拆分成多條赞庶,每條包含數(shù)組中的一個(gè)值。 -
$sample
隨機(jī)選擇從起輸入指定數(shù)量的文檔巡通,若大于或等于5%的collection的文檔尘执,$sample進(jìn)行收集掃描并排序隨后選擇頂部文件。因此$sample在收集階段是受排序的內(nèi)存限制宴凉。 -
$sort
將輸入文檔排序后輸出 -
$geoNear
用于地理位置數(shù)據(jù)分析 -
$out
必須為pipeline最后一個(gè)階段管道誊锭,將最后計(jì)算結(jié)果寫入到指定collection中。 -
$indexStats
返回?cái)?shù)據(jù)集合的每個(gè)索引使用情況 -
$group
將集合中的文檔分組弥锄,可用于統(tǒng)計(jì)結(jié)果丧靡,$group 首先將數(shù)據(jù)根據(jù) key 進(jìn)行分組。
$match
篩選條件籽暇,過濾不滿足條件的文檔温治,可使用常規(guī)查詢操作符。
db.users.aggregate( {$match: {'age':{$gte:18}} } )
$project
- 用于包含戒悠、排除字段熬荆,設(shè)置需查詢或過濾的字段,0為過濾掉字段不顯示绸狐,1為需查詢的字段卤恳。
- 用于對(duì)字段重命名
- 投射中可使用表達(dá)式
db.users.aggregate(
{$match: {age: {$gte: 18}}},
{$project: {_id:0, username:1, created_at:1}}
)
// $project 當(dāng)字段值為0或1時(shí)用于過濾字段,當(dāng)鍵名為一個(gè)自定義的字符串寒矿,鍵值為$緊跟原字段表示要對(duì)該字段進(jìn)行重命名突琳。
db.users.aggregate(
{$match: {age: {$gte: 18}}},
{$project: {_id:0, username:1, nickname:$username }}
)
// 通過修改字段名達(dá)到生成字段副本,以便后續(xù)操作符使用符相。
db.users.aggregate(
{ $match: {age: {$gte: 18 } } },
{ $project: {id:$_id, username:1 } }
)
算術(shù)表達(dá)式可對(duì)數(shù)值運(yùn)算
db.users.aggregate(
// 對(duì) score 字段值加1后作為 scores 的值
{ $project: {scores: { $add : [$score,$score,1] } } },
)
db.users.aggregate(
// $subtract:[exp1, exp2] 數(shù)組中第一個(gè)元素減去第二個(gè)元素
{ $project: {scores: { $subtract: [$score,1] } } },
)
db.users.aggregate(
// $multiply:[exp1, exp2] 數(shù)組中多個(gè)元素相乘
{ $project: {scores: { $multiply: [$score, 2, 5] } } },
)
db.users.aggregate(
// $divide:[exp1, exp2] 數(shù)組中第一個(gè)元素除以第二個(gè)元素
{ $project: {scores: { $divide: [$score, 2, 5] } } },
)
db.users.aggregate(
// $mod:[exp1, exp2] 數(shù)組中第一個(gè)元素除以第二個(gè)元素的余數(shù)
{ $project: {scores: { $mod: [$score, 2] } } },
)
字符串操作
db.users.aggregate(
// $substr:[exp, startOffset, numToReturn] 字符串截取
{ $project: {nickname: { $substr: [$nickname, 2, 6] } } },
)
db.users.aggregate(
// $concat:[exp1, exp2, exp3...] 字符串拼接拆融,將數(shù)組中多個(gè)元素拼接在一起
{ $project: {fullname: { $concat: [$firstname, $lastname] } } },
)
db.users.aggregate(
// $toLower:exp 字符串轉(zhuǎn)為小寫
{ $project: {nickname: { $toLower: $username } } },
)
db.users.aggregate(
// $toUpper:exp 字符串轉(zhuǎn)為大寫
{ $project: {nickname: { $toUpper: $username } } },
)
為所有文檔新增字段
db.users.update({}, {$set, {publish_at:new Date()} }, true, true)
日期表達(dá)式
db.users.aggregate(
{ $project: {
'year': { $year: $created_at },
'month': { $month: $created_at },
'dayOfMonth': { $dayOfMonth: $created_at },
'dayOfWeek': { $dayOfWeek: $created_at },
'dayOfYear': { $dayOfYear: $created_at },
'hour': { $hour: $created_at },
'minute': { $minute: $created_at },
'second': { $second: $created_at },
}
}
)
時(shí)間間隔(秒數(shù))
db.users.aggregate(
{
$project: {
'fasttime': {
$subtract: [ { $second: new Date() }, { $second: $created_at } ]
}
}
}
)
字符串比較
db.users.aggregate(
// $cmp:[exp1, exp2] 字符串比較,相同為0啊终,小于返回負(fù)數(shù)镜豹, 大于返回正數(shù)
{ $project: { result : { $cmp: [ $age, 18 ] } } }
)
db.users.aggregate(
// $strcasecmp:[exp1, exp2] 字符串比較,相同為0蓝牲,小于返回-1逛艰, 大于返回1
{ $project: { result : { $strcasecmp: [ $username, 'junchow' ] } } }
)
邏輯條件
db.users.aggregate(
// $eq 判斷表達(dá)式是否相等
{ $project: { result : {$eq: [$username, 'junchow' ] } } }
)
db.users.aggregate(
// $and [exp1, exp2...expn] 連接多條件,所有條件為真則表達(dá)式為真
{ $project: { result: { $and : [ {$eq : [$username:'junchow']}, {$gt:[$age : 18]} ] } } }
)
db.users.aggregate({
// $not exp 用于取反操作
$project: {result: {$not:{$eq:[$username, 'junchow']}} }
})
db.users.aggregate({
// $cond:[booleanExp, trueExp, falseExp] 三目運(yùn)算符
$project : { result: {$cond: [{$eq:[$username:'junchow']}, true, false] } }
})
db.users.aggregate({
// $ifNull:[exp, replacementExpr] 若條件為null則返回表達(dá)式值搞旭,若字段不存在時(shí)字段值為null
$project: { result: { $ifNull : [ $notExistField, 'not exist is null' ] } }
})
$group
$group分組使用_id指定要分組的鍵名散怖,用來自定義字段統(tǒng)計(jì)菇绵。
db.users.aggregate({
$match : { age: { $gte : 18 } }
},{
$group : { _id:$username, count:{$sum:1} }
});
// 多字段分組
db.users.aggregate({
$match: {age: {$gte:18} }},
$group: {_id:{username:$username, age:$ge}, 'count':{$sum:1} }
})
// $sum:val 對(duì)每個(gè)文檔加val求和
// $avg:val 對(duì)每個(gè)文檔求均值
db.users.aggregate({
$group: { _id:$username, count:{$avg:$age} }
})
db.users.aggregate({
$group: { _id:$username, count:{$max:$age} }
})
db.users.aggregate({
$group: {_id:$username, count:{$min:$age} }
})
// $first:val 獲取分組中首個(gè)
db.users.aggregate({
$group:{_id:$username, count:{$first: $age} }
})
db.users.aggregate({
$group:{_id:$username, count:{$last: $age} }
})
db.users.aggregate({
$group: {_id:$username, count:{$addToSet: $age} }
})
db.users.aggregate({
$group:{_id:$username, count:{$push: $age} }
})
聚合運(yùn)算
group
先選定分組所依據(jù)的鍵,而后將集合依據(jù)選定鍵值的不同分成若干組镇眷。然后可通過聚合每一組內(nèi)的文檔咬最,產(chǎn)生一個(gè)結(jié)果文檔。
group
不支持分片集群欠动,無法進(jìn)行分布式運(yùn)算(shard cluster)永乌。若需要支持分布式需使用aggregate
或mapReduce
。
db.collection.group(document)
{
# 分組字段
key:{key1, key2:1},
# 查詢條件
cond:{},
# 聚合函數(shù)
reduce:function(current, result){},
# 初始化
initial:{},
# 統(tǒng)計(jì)一組后的回調(diào)函數(shù)
finalize:function(){}
}
計(jì)算每個(gè)欄目下商品個(gè)數(shù)
SELECT COUNT(*) FROM goods GROUP BY category_id;
db.goods.group({
key:{category_id:1},
cond:{},//所有
reduce:function(current, result){//current對(duì)應(yīng)當(dāng)前行具伍,result對(duì)應(yīng)分組中的多行
result.total += 1;
},
initial:{total:0}
})
查看每個(gè)欄目下商品價(jià)格大于100的數(shù)量
SELECT category,goods_name FROM goods WHERE 1=1 AND price>100 GROUP BY category_id
db.goods.group({
key:{category_id:1},
cond:{price:{$gt:100}},
reduce:function(current,result){
result.count += 1;
},
initial:{count:0}
})
計(jì)算每個(gè)欄目下商品庫存量
SELECT category_id,SUM(store) FROM goods WHERE 1=1 AND GROUP BY category_id
db.goods.group({
key:{category_id:1},
cond:{},
initial:{sum:0},
reduce:function(current,result){
result += current.store;
}
});
獲取每個(gè)欄目下最貴的商品價(jià)格
SELECT catetory_id,MAX(price) FROM goods GROUP BY category_id
db.goods.group({
key:{category_id:1},
cond:{},
initial:{max:0},
reduce:function(current,result){
if(current.price > result.max){
result.max = current.price;
}
}
});
查詢每個(gè)欄目下商品的平均價(jià)格
SELECT category_id,AVERAGE(price) FROM goods GROUP BY category_id
db.goods.group({
key:{category_id:1},
cond:{},
reduce:function(current,result){
result.total += current.price;
result.count += 1;
},
initial:{total:0, count:0},//進(jìn)組result
finalize:functioin(result){//出組 result
result.average = result.total/result.count;
}
})
aggregate
aggregate聚合框架與sql對(duì)比
- $match WHERE
- $group GROUP BY
- $project SELECT
- $sort ORDER BY
- $limit LIMIT
- $sum SUM()
- $sum COUNT()
查詢每個(gè)欄目下商品數(shù)量
SELECT COUNT(*) FROM goods GROUP BY category_id
db.goods.aggregate([
{$group:{_id:'$category_id', count:{$sum:1}}},
{$project:{_id:0, category_id:'$category_id', count:'$count'}},
{$sort:{count:1}}
])
查詢每個(gè)欄目下價(jià)格大于100的商品個(gè)數(shù)
SELECT COUNT(*) FROM goods WHERE 1=1 AND price>100 GROUP BY category_id
db.goods.aggregate([
{$match:{price:{$gt:100}}},
{$group:{_id:'$category_id'}, count:{$sum:1}},
{$project:{_id:0, category_id:'$_id.category_id', count:'$count'}},
{$sort:{count:1}}
])
查詢每個(gè)欄目下價(jià)格大于100的商品個(gè)數(shù)翅雏,僅顯示個(gè)數(shù)大于3的。
SELECT category_id,COUNT(*) AS count WHERE 1=1 AND price>100 GROUP BY category_id HAVING count>3
db.goods.aggregate([
{$match:{price:{$gt:100}}},
{$group:{_id:'$category_id', count:{$sum:1}}},
{$match:{count:{$gt:3}}},
{$project:{_id:0, category_id:'$_id.category_id', count:'$count'}},
{$sort:{count:1}}
])
查看每個(gè)欄目下商品的庫存量
SELECT category_id,SUM(store) WHERE 1=1 GROUP BY category_id
db.goods.aggregate([
{$group:{_id:'$category_id', count:{$sum:1}, total:{$sum:'$store'}}},
{$project:{_id:0, category_id:'$_id.category_id', count:'$count', total:'$total'}},
{$sort:{total:1}}
])
查詢每個(gè)欄目下商品的平均價(jià)格并升序排序
SELECT category_id,AVG(price) AS avg FROM goods GROUP BY category_id ORDER BY avg ASC
db.goods.aggregate([
{$group:{_id:'$category_id', count:{$sum:1}, average:{$avg:'$price'}}},
{$sort:average:-1}
])
mapReduce
mapReduce是一個(gè)輕松并行化到多臺(tái)服務(wù)器的聚合方法人芽,它會(huì)拆分問題望几,再將各個(gè)部分發(fā)送到不同機(jī)器上,讓每臺(tái)機(jī)器都完成一部分萤厅。當(dāng)所有機(jī)器都完成后橄抹,再將結(jié)果匯集起來形成最終完整的結(jié)果。
mapReduce最開始是映射(map)惕味,將操作映射到集合中的每個(gè)文檔楼誓。這個(gè)操作要么無作為,要么產(chǎn)生一些鍵和x個(gè)值名挥。接著進(jìn)入中間環(huán)節(jié)(洗牌shuffle)疟羹,按照分組并將產(chǎn)生的鍵值組成列表放到對(duì)應(yīng)的鍵中≠骶螅化簡(jiǎn)(reduce)則把列表中的值化簡(jiǎn)成一個(gè)單值榄融。這個(gè)值被返回,然后接著進(jìn)行洗牌蹋艺,直到每個(gè)鍵的列表只有一個(gè)值為止剃袍,這個(gè)值就是最后的結(jié)果黄刚。
mapReduce的代價(jià)是速度捎谨,group不是很快,mapReduce更慢憔维,絕不要用在實(shí)時(shí)環(huán)境中涛救,要作為后臺(tái)任務(wù)運(yùn)行,將創(chuàng)建一個(gè)保存結(jié)果的結(jié)合业扒,可對(duì)這個(gè)集合進(jìn)行實(shí)時(shí)查詢检吆。
mapReduce的工作過程
- map 映射
現(xiàn)將同一個(gè)組的數(shù)據(jù)映射到一個(gè)文檔(數(shù)組)上,在映射環(huán)節(jié)想要得到文檔中每個(gè)鍵程储,map()使用emit()返回要處理的值蹭沛。edit()會(huì)給mapReduce一個(gè)鍵和一個(gè)值臂寝,鍵類似group所使用鍵key。 - reduce 歸約
將數(shù)組(同一組)數(shù)據(jù)進(jìn)行運(yùn)算摊灭,reduce()由兩個(gè)參數(shù)咆贬,一個(gè)是key也就是emit()返回的第一個(gè)值,另一個(gè)是數(shù)組帚呼,由一個(gè)或多個(gè)對(duì)應(yīng)于鍵的文檔構(gòu)成掏缎。reduce()一定要能夠被反復(fù)調(diào)用,不論是映射環(huán)節(jié)還是前一個(gè)簡(jiǎn)化環(huán)節(jié)煤杀。
mapReduce語法
db.runCommand({
mapreduce:字符串,集合名
map:函數(shù),
reduce:函數(shù),
[query:文檔眷蜈,發(fā)往map()前先給過渡的文檔],
[sort:文檔,發(fā)往map()前先給文檔排序],
[limit:整數(shù)沈自,發(fā)往map()的文檔數(shù)量上限],
[out:字符串酌儒,統(tǒng)計(jì)結(jié)果保存的集合],
[keeptemp:布爾值,鏈接關(guān)閉時(shí)臨時(shí)結(jié)果集合是否保存],
[finalize:函數(shù)酥泛,將reduce的結(jié)果發(fā)給此函數(shù)做最后處理],
[scope:文檔今豆,js代碼中要用到的變量],
//jsMode=true時(shí) BSON>JS>map>reduce>BSON
//jsMode=false時(shí) BSON>JS>map>BSON>JS>reduce>BSON,可處理非常大的mapreduce
[jsMode:布爾值柔袁,是否減少執(zhí)行過程中BSON和JS的轉(zhuǎn)換呆躲,默認(rèn)為true],
[verbose:布爾值捶索,是否產(chǎn)生更加詳細(xì)的服務(wù)器日期插掂,默認(rèn)為true]
})
MongoDB沒有模式,所以并不曉得每個(gè)文檔由多少個(gè)鍵腥例,通常找到集合的所有鍵的最好方法就是用MapReduce辅甥。
查詢結(jié)合中所有鍵
var map = function(){
for(var k in this){ //this當(dāng)前映射文檔的引用
emit(k, {count:1});//將文檔某個(gè)鍵的計(jì)數(shù)返回
}
}
var reduce = function(key,emits){
var total = 0;
for(var k in emits){
}
}
計(jì)算每個(gè)欄目下商品庫存總量
SELECT category_id,SUM(store) AS sum FROM goods GROUP BY category_id
var map = function(){
emit(this.category_id, this.store);//獲得欄目下商品的庫存量
};
var reduce = function(key,store){
return Array.sum(store);
}
db.goods.mapReduce(map,reduce,query,{out:'result'});
查詢每個(gè)欄目下商品的平均價(jià)格
var map = function(){
emit(this.category_id, this.price);
}
var reduce = function(category_id,price){
return Array.avg(price);
}
db.goods.mapReduce(map, reduce, {out:'result'})
將MongoDB組成的shard分片集群把地震數(shù)據(jù)分布到各節(jié)點(diǎn)上,將中國區(qū)域按10個(gè)經(jīng)度10個(gè)維度為一組約30塊燎竖,并用mapReduce計(jì)算地震數(shù)據(jù)璃弄,統(tǒng)計(jì)每組上每月的地震次數(shù)及地震級(jí)別。分析出結(jié)果把地震高發(fā)區(qū)用偏紅色標(biāo)注构回,低發(fā)區(qū)偏綠標(biāo)注夏块。