大量數(shù)據(jù)導(dǎo)出導(dǎo)致系統(tǒng)內(nèi)存溢出的解決辦法
在web開發(fā)中,我們經(jīng)逞ù担可能會(huì)遇到導(dǎo)出報(bào)表等統(tǒng)計(jì)功能氢惋,常規(guī)做法就是會(huì)從數(shù)據(jù)庫中拉取大量數(shù)據(jù),甚至有可能還會(huì)統(tǒng)計(jì)所有數(shù)據(jù)的一個(gè)總量座舍。當(dāng)我們一次性讀取所有數(shù)據(jù)到內(nèi)存中時(shí)沮翔,就極可能導(dǎo)致系統(tǒng)OOM。因?yàn)槲业暮笈_(tái)系統(tǒng)使用的是NodeJS的Nest框架曲秉,數(shù)據(jù)庫ORM使用的是Sequelize采蚀,下面就這種問題我總結(jié)一下處理的方法。
方法一承二、修改V8內(nèi)存大小
從《深入淺出NodeJS》中我們得知64位系統(tǒng)內(nèi)存限制約為1.4GB榆鼠,所以我們一次性講數(shù)據(jù)加載進(jìn)內(nèi)存中,就有可能超過這個(gè)限制亥鸠,導(dǎo)致OOM妆够。但是NodeJS提供了一個(gè)程序運(yùn)行參數(shù) --max-old-space-size
,可以通過該參數(shù)指定V8所占用的內(nèi)存空間负蚊,這樣可在一定程度上便面程序內(nèi)存溢出
方法二神妹、使用非V8內(nèi)存
這也是在項(xiàng)目中采用的方法。Buffer是一個(gè)NodeJS的擴(kuò)展對象家妆,使用底層的系統(tǒng)內(nèi)存灾螃,不占用V8內(nèi)存空間。與之相關(guān)的文件系統(tǒng)fs和流Stream流操作揩徊,都不會(huì)占用V8內(nèi)存腰鬼。下面我就流操作的解決方法嵌赠,詳細(xì)梳理一遍。
MySQL數(shù)據(jù)庫有流式讀取方式熄赡,不是一次性讀取所有數(shù)據(jù)到內(nèi)存姜挺,而是根據(jù)使用者的需要,部分的讀取數(shù)據(jù)彼硫。但是Sequelize這個(gè)ORM模型又不支持流式讀取炊豪,所以在系統(tǒng)中我們額外引入了支持流式查詢的Knex查詢構(gòu)造器和Mysql2驅(qū)動(dòng)程序。
1拧篮、構(gòu)建數(shù)據(jù)庫服務(wù)
import { Injectable } from "@nestjs/common";
import * as config from "config";
import { getLogger } from "log4js";
import * as Knex from "knex";
const logger = getLogger("Knex");
interface MysqlConfig {
name: string;
read: [
{
host: string,
port: number,
username: string,
password: string
}
];
}
const { name: database, read: [conf] } = config.get<MysqlConfig>("mysql");
const knex = Knex({
client: "mysql2",
pool: {
min: 1,
max: 10
},
connection: {
database,
port: conf.port,
host: conf.host,
user: conf.username,
password: conf.password,
decimalNumbers: true
}
});
@Injectable()
export class ReadService {
async* exec(sql, params, limit = 5000) {
// 讀取limit條數(shù)據(jù)词渤,通過yield返回?cái)?shù)據(jù)
// xxx...
}
}
2、創(chuàng)建Csv相關(guān)服務(wù)
import * as path from "path";
import * as convert from "iconv-lite";
import * as Achiver from "archiver";
import { getLogger } from "log4js";
import { createReadStream, promises as fs, writeFileSync } from "fs";
const logger = getLogger("ExportCsv");
export class Csv {
private readonly name: string;
private readonly path: string;
// 是否壓縮
private readonly compress: boolean;
constructor(name: string, columns: string[], rows: any[] = [], compress: boolean = false) {
this.compress = compress;
this.name = path.basename(name, ".csv");
this.path = path.format({
ext: ".csv",
root: process.cwd(),
name: Date.now() + this.name
});
let txt = columns.join(",");
if (Array.isArray(rows)) {
txt += Csv.parse(rows);
}
// 同步創(chuàng)建一個(gè)只包含字段名的文件
writeFileSync(this.path, convert.encode(txt, "GBK", { addBOM: true }));
}
// 轉(zhuǎn)換數(shù)據(jù)中一些特殊符號
private static parse(rows): string {
return "\r\n" + rows.map(r => {
return r
.map(x => {
if (typeof x === "string") {
x = x.replace(/,/g, "串绩,").replace(/\n/g, "-");
}
return x;
})
.join(",");
}
).join("\r\n");
}
// 在同一個(gè)文件中缺虐,異步新增數(shù)據(jù)
async append(rows: any[]) {
return fs.appendFile(this.path, convert.encode(Csv.parse(rows), "GBK", { addBOM: true }));
}
// 根據(jù)傳入的compress參數(shù)判斷,是否壓縮文件
// res也是一個(gè)流對象礁凡,所有可以進(jìn)行流的相關(guān)操作
async send(res) {
if (this.compress) {
// 返回壓縮后的文件
} else {
// 返回普通文件
}
}
// 監(jiān)控當(dāng)文件下載完畢后高氮,則刪除服務(wù)器生成的文件
private observe(res) {
return new Promise((resolve, reject) => {
res.on("end", () => {
resolve();
fs.unlink(this.path).catch(logger.error.bind(logger));
});
res.on("error", err => reject(err));
});
}
}
3、在控制器函數(shù)調(diào)用服務(wù)
// 用于統(tǒng)計(jì)總量的數(shù)據(jù)
const total = {
money: 0,
deposit: 0
};
// 查詢大量數(shù)據(jù)的SQL
const sql = `xxx`;
// 實(shí)例化Csv對象
const csv = new Csv(`訂單結(jié)算數(shù)據(jù)${start}至${end}.csv`, title, null, true);
// 執(zhí)行SQL顷牌,一次性讀取10000條數(shù)據(jù)剪芍,返回的是一個(gè)可遍歷的生成器
const reader = this.reader.exec(sql, params, 10000);
// 遍歷生成器,不停的向文件中寫數(shù)據(jù)
// 如果我們直接導(dǎo)出數(shù)據(jù)窟蓝,那么在csv.append方法中罪裹,直接返回[[1, 22, 33], [2, 44, 55]]
// 如果我們要根據(jù)所有數(shù)據(jù),進(jìn)行一個(gè)統(tǒng)計(jì)計(jì)算运挫,那么我們可以新寫一個(gè)parseRows方法來進(jìn)行統(tǒng)計(jì)
for await (const x of reader) {
await csv.append(await this.parseRows(x, { ems, sites, total }));
}
// 總量數(shù)據(jù)
const row = [
_.round(total.money, 2),
_.round(total.deposit, 2),
];
// 寫入文件
await csv.append([row]);
// 返回csv對象坊谁,在攔截器中處理
return csv;
4、攔截器中處理返回形式
import { Injectable, NestInterceptor, ExecutionContext, CallHandler } from "@nestjs/common";
import { map } from "rxjs/operators";
import { Csv } from "@shared/commons";
export interface Response<T> {
code: number;
data: T;
}
@Injectable()
export class FormatInterceptor<T> implements NestInterceptor<T, Response<T>> {
intercept(context: ExecutionContext, next: CallHandler) {
return next.handle().pipe(map(data => {
if (data instanceof Csv) {
// 處理數(shù)據(jù)導(dǎo)出數(shù)據(jù)格式
const res = context.switchToHttp().getResponse();
// send方法就是csv服務(wù)中的導(dǎo)出方法
return data.send(res);
}
// 處理常規(guī)API數(shù)據(jù)格式
const result: any = { code: 0, data: "success" };
if (data) {
if (data.rows) {
data.meta = data.rows;
data.total = data.count;
delete data.rows;
delete data.count;
}
result.data = data;
}
return result;
}));
}
}
自此整個(gè)流程:流式讀取數(shù)據(jù)->異步寫入文件->壓縮文件->返回文件前端下載->刪除生成的文件就結(jié)束了滑臊。這樣我們分步驟處理數(shù)據(jù),就不會(huì)因?yàn)閿?shù)據(jù)量過大導(dǎo)致內(nèi)存溢出了箍铲。部分方法中的代碼我沒有寫上來雇卷,如果有需要可以互相探討一下。