1. Overview
After adding the Alibaba Druid data source to a project, you can enable its built-in monitoring page, which looks roughly like the figure below. It shows SQL information such as the statements themselves and their execution times.
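For reference, the monitoring page is usually exposed by registering Druid's StatViewServlet in a @Configuration class; a minimal sketch, in which the URL mapping and login credentials are placeholders rather than values from this project:
@Bean
public ServletRegistrationBean<StatViewServlet> druidStatViewServlet() {
    // com.alibaba.druid.support.http.StatViewServlet serves the built-in monitoring page.
    ServletRegistrationBean<StatViewServlet> registration =
            new ServletRegistrationBean<>(new StatViewServlet(), "/druid/*");
    registration.addInitParameter("loginUsername", "admin");   // placeholder credentials
    registration.addInitParameter("loginPassword", "admin");
    return registration;
}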
However, the page has some drawbacks. It collects SQL without any filtering, so every statement is shown; when there are a great many statements, it becomes hard to find the one you need. The statistics are not persisted either, so data is continually lost. In addition, in a big-data environment where SQL is generated very quickly, the page may even freeze.
2. Scope
This article shows how to filter the SQL data collected by Druid so that only slow, query-related SQL is kept, and how to persist it as entries in a log file.
The log file is then fed into an ELK stack so that slow SQL can be browsed continuously, roughly as shown in the figure below.
3. Persisting SQL to a log file, approach one
3.1 Enable the stat filter in the configuration file
spring.datasource.filters: stat,wall,log4j
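Equivalently, the filters can be set programmatically on the DruidDataSource; a minimal sketch (note that setFilters is declared to throw SQLException):
// Programmatic equivalent of the property above, inside the data source bean method.
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setFilters("stat,wall,log4j");   // throws SQLException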
3.2 Create a Slf4jLogFilter bean and set it on the data source
@Bean
public Slf4jLogFilter logFilter() {
    Slf4jLogFilter filter = new Slf4jLogFilter();
    // We only care about statement execution, so switch off the other log categories.
    filter.setResultSetLogEnabled(false);
    filter.setConnectionLogEnabled(false);
    filter.setStatementParameterClearLogEnable(false);
    filter.setStatementCreateAfterLogEnabled(false);
    filter.setStatementCloseAfterLogEnabled(false);
    filter.setStatementParameterSetLogEnabled(false);
    filter.setStatementPrepareAfterLogEnabled(false);
    return filter;
}

List<Filter> list = new ArrayList<>();
list.add(logFilter);
druidDataSource.setProxyFilters(list);
If there are multiple data sources, setProxyFilters must be called on every data source you want to monitor, as sketched below.
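A minimal sketch with two data sources; the bean names and property prefixes are illustrative, not taken from the original project:
// Sketch only: bean names and property prefixes are placeholders.
@Bean
@ConfigurationProperties(prefix = "spring.datasource-primary")
public DataSource primaryDataSource(Slf4jLogFilter logFilter) {
    DruidDataSource ds = new DruidDataSource();
    ds.setProxyFilters(Collections.singletonList(logFilter));
    return ds;
}

@Bean
@ConfigurationProperties(prefix = "spring.datasource-secondary")
public DataSource secondaryDataSource(Slf4jLogFilter logFilter) {
    DruidDataSource ds = new DruidDataSource();
    ds.setProxyFilters(Collections.singletonList(logFilter));
    return ds;
}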
3.3 Configure logback-spring.xml
<appender name="druidlog" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
        <FileNamePattern>druidlog/%d{yyyy-MM-dd}.log</FileNamePattern>
        <MaxHistory>30</MaxHistory>
    </rollingPolicy>
    <encoder>
        <pattern>%msg%n</pattern>
    </encoder>
</appender>
<logger name="druid" level="DEBUG">
    <appender-ref ref="druidlog"/>
</logger>
Run the project and generate some traffic, and a log file like the following appears. (The logger name druid matches the prefix of Druid's internal SLF4J loggers such as druid.sql.Statement and druid.sql.Connection, so their output is routed to this appender.)
{conn-10001} connected
{conn-10001} pool-connect
{conn-10001, pstmt-20000} created. select * from ACT_GE_PROPERTY where NAME_ = ?
{conn-10001, pstmt-20000} Parameters : [schema.version]
{conn-10001, pstmt-20000} Types : [VARCHAR]
{conn-10001, pstmt-20000} executed. 43.613649 millis. select * from ACT_GE_PROPERTY where NAME_ = ?
{conn-10001, pstmt-20000} executed. 43.613649 millis. select * from ACT_GE_PROPERTY where NAME_ = ?
{conn-10001, pstmt-20000, rs-50000} open
{conn-10001, pstmt-20000, rs-50000} Header: [NAME_, VALUE_, REV_]
{conn-10001, pstmt-20000, rs-50000} Result: [schema.version, 5.22.0.0, 1]
{conn-10001, pstmt-20000, rs-50000} closed
{conn-10001, pstmt-20000} clearParameters.
{conn-10001} pool-recycle
{conn-10001} pool-connect
{conn-10001} pool-recycle
{conn-10001} pool-connect
{conn-10001, pstmt-20001} created. select `id`,`cron_expression`,`method_name`,`is_concurrent`,`description`,`update_by`,`bean_class`,`create_date`,`job_status`,`job_group`,`update_date`,`create_by`,`spring_bean`,`job_name` from sys_task
order by id desc
{conn-10001, pstmt-20001} Parameters : []
{conn-10001, pstmt-20001} Types : []
{conn-10001, pstmt-20001} executed. 6.238774 millis. select `id`,`cron_expression`,`method_name`,`is_concurrent`,`description`,`update_by`,`bean_class`,`create_date`,`job_status`,`job_group`,`update_date`,`create_by`,`spring_bean`,`job_name` from sys_task
order by id desc
{conn-10001, pstmt-20001} executed. 6.238774 millis. select `id`,`cron_expression`,`method_name`,`is_concurrent`,`description`,`update_by`,`bean_class`,`create_date`,`job_status`,`job_group`,`update_date`,`create_by`,`spring_bean`,`job_name` from sys_task
order by id desc
{conn-10001, pstmt-20001, rs-50001} open
{conn-10001, pstmt-20001, rs-50001} Header: [id, cron_expression, method_name, is_concurrent, description, update_by, bean_class, create_date, job_status, job_group, update_date, create_by, spring_bean, job_name]
{conn-10001, pstmt-20001, rs-50001} Result: [2, 0/10 * * * * ?, run1, 1, , 4028ea815a3d2a8c015a3d2f8d2a0002, com.bootdo.common.task.WelcomeJob, 2017-05-19 18:30:56.0, 0, group1, 2017-05-19 18:31:07.0, null, , welcomJob]
{conn-10001, pstmt-20001, rs-50001} closed
{conn-10001, pstmt-20001} clearParameters.
{conn-10001} pool-recycle
{conn-10001} pool-connect
{conn-10001, pstmt-20002} created. select `cid`,`title`,`slug`,`created`,`modified`,`type`,`tags`,`categories`,`hits`,`comments_num`,`allow_comment`,`allow_ping`,`allow_feed`,`status`,`author`,`gtm_create`,`gtm_modified` from blog_content
WHERE type = ?
order by cid desc
limit 0, 10
{conn-10001, pstmt-20002} Parameters : [article]
The log file above has its problems, though: the output is cluttered, while we would prefer one SQL statement per line. To get that, we need to capture the SQL ourselves and write it to the log in our own format.
3.4 Capture the SQL and format the output
- First, add the following setting:
datasource.setTimeBetweenLogStatsMillis(3000);
- Then implement a StatLogger and set it on the data source (the wiring is sketched after this list):
@Component
public class MyStatLogger extends DruidDataSourceStatLoggerAdapter implements DruidDataSourceStatLogger {

    private static Logger logger = LoggerFactory.getLogger("druid");

    @Override
    public void log(DruidDataSourceStatValue statValue) {
        if (statValue.getSqlList().size() > 0) {
            for (JdbcSqlStatValue sqlStat : statValue.getSqlList()) {
                Map<String, Object> sqlStatMap = new LinkedHashMap<String, Object>();
                // Flatten the SQL onto a single line.
                String sql = sqlStat.getSql().replace("\t", "").replace("\n", "");
                sqlStatMap.put("sql", sql);
                if (sqlStat.getExecuteCount() > 0) {
                    sqlStatMap.put("executeCount", sqlStat.getExecuteCount());
                    sqlStatMap.put("executeMillisMax", sqlStat.getExecuteMillisMax());
                    sqlStatMap.put("executeMillisTotal", sqlStat.getExecuteMillisTotal());
                    sqlStatMap.put("createtime", LocalDateTime.now());
                    sqlStatMap.put("systemName", "JCPT");
                }
                // Logged at ERROR so it still passes once the logback level is raised.
                logger.error(sqlStatMap.toString());
            }
        }
    }
}
- Raise the log level to ERROR:
<appender name="druidlog" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
        <FileNamePattern>druidlog/%d{yyyy-MM-dd}.log</FileNamePattern>
        <MaxHistory>30</MaxHistory>
    </rollingPolicy>
    <encoder>
        <pattern>%msg%n</pattern>
    </encoder>
</appender>
<logger name="druid" level="ERROR">
    <appender-ref ref="druidlog"/>
</logger>
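As mentioned in the second step, the custom StatLogger still has to be wired into the data source together with the reporting interval; a minimal sketch, placed inside the data source bean method (the field name myStatLogger for the injected component is an assumption):
// Sketch only: myStatLogger is the injected MyStatLogger component from above.
druidDataSource.setTimeBetweenLogStatsMillis(3000);   // report (and reset) statistics every 3 seconds
druidDataSource.setStatLogger(myStatLogger);          // route the reports to our StatLogger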
Run the code again and the log output looks like this:
{sql=select * from ACT_GE_PROPERTY where NAME_ = ?, executeCount=1, executeMillisMax=45, executeMillisTotal=45, createtime=2019-08-02T11:42:44.106, systemName=SMRZ}
{sql=select `id`,`cron_expression`,`method_name`,`is_concurrent`,`description`,`update_by`,`bean_class`,`create_date`,`job_status`,`job_group`,`update_date`,`create_by`,`spring_bean`,`job_name` from sys_task order by id desc, executeCount=1, executeMillisMax=6, executeMillisTotal=6, createtime=2019-08-02T11:42:50.107, systemName=SMRZ}
{sql=select `cid`,`title`,`slug`,`created`,`modified`,`type`,`tags`,`categories`,`hits`,`comments_num`,`allow_comment`,`allow_ping`,`allow_feed`,`status`,`author`,`gtm_create`,`gtm_modified` from blog_content WHERE type = ? order by cid desc limit 0, 10, executeCount=1, executeMillisMax=40, executeMillisTotal=40, createtime=2019-08-02T11:43:05.110, systemName=SMRZ}
{sql=select count(*) from blog_content WHERE type = ?, executeCount=1, executeMillisMax=5, executeMillisTotal=5, createtime=2019-08-02T11:43:05.114, systemName=SMRZ}
{sql=select `user_id`,`username`,`name`,`password`,`dept_id`,`email`,`mobile`,`status`,`user_id_create`,`gmt_create`,`gmt_modified`,`sex`,`birth`,`pic_id`,`live_address`,`hobby`,`province`,`city`,`district` from sys_user WHERE username = ? order by user_id desc, executeCount=1, executeMillisMax=69, executeMillisTotal=69, createtime=2019-08-02T11:43:08.114, systemName=SMRZ}
{sql=insert into sys_log(`user_id`, `username`, `operation`, `time`, `method`, `params`, `ip`, `gmt_create`)values(?, ?, ?, ?, ?, ?, ?, ?), executeCount=1, executeMillisMax=300, executeMillisTotal=300, createtime=2019-08-02T11:43:08.114, systemName=SMRZ}
{sql=select distinctm.menu_id , parent_id, name, url,perms,`type`,icon,order_num,gmt_create, gmt_modifiedfrom sys_menu mleftjoin sys_role_menu rm on m.menu_id = rm.menu_id left joinsys_user_role uron rm.role_id =ur.role_id where ur.user_id = ?andm.type in(0,1)order bym.order_num}
{sql=insert into sys_log(`user_id`, `username`, `operation`, `time`, `method`, `params`, `ip`, `gmt_create`)values(?, ?, ?, ?, ?, ?, ?, ?), executeCount=1, executeMillisMax=42, executeMillisTotal=42, createtime=2019-08-02T11:43:11.115, systemName=SMRZ}
{sql=select distinctm.menu_id , parent_id, name, url,perms,`type`,icon,order_num,gmt_create, gmt_modifiedfrom sys_menu mleftjoin sys_role_menu rm on m.menu_id = rm.menu_id left joinsys_user_role uron rm.role_id =ur.role_id where ur.user_id = ?andm.type in(0,1)order bym.order_num, executeCount=1, executeMillisMax=278, executeMillisTotal=278, createtime=2019-08-02T11:43:11.115, systemName=SMRZ}
{sql=select `id`,`type`,`url`,`create_date` from sys_file where id = ?, executeCount=1, executeMillisMax=28, executeMillisTotal=28, createtime=2019-08-02T11:43:11.115, systemName=SMRZ}
{sql=select DISTINCTn.id ,`type`,`title`,`content`,`files`,r.is_read,`status`,`create_by`,`create_date`,`update_by`,`update_date`,`remarks`,`del_flag`from oa_notify_record r right JOIN oa_notify n on r.notify_id = n.id WHERE r.is_read = ? and r.user_id = ? order by is_read ASC, update_date DESC limit ?, ?, executeCount=1, executeMillisMax=41, executeMillisTotal=41, createtime=2019-08-02T11:43:11.115, systemName=SMRZ}
{sql=select count(*)fromoa_notify_record r right JOIN oa_notify n on r.notify_id= n.id wherer.user_id =? andr.is_read = ?, executeCount=1, executeMillisMax=7, executeMillisTotal=7, createtime=2019-08-02T11:43:11.115, systemName=SMRZ}
{sql=select table_name tableName, engine, table_comment tableComment, create_time createTime from information_schema.tables where table_schema = (select database()), executeCount=1, executeMillisMax=7, executeMillisTotal=7, createtime=2019-08-02T11:43:11.115, systemName=SMRZ}
{sql=select `id`,`cron_expression`,`method_name`,`is_concurrent`,`description`,`update_by`,`bean_class`,`create_date`,`job_status`,`job_group`,`update_date`,`create_by`,`spring_bean`,`job_name` from sys_task order by id desc limit ?, ?, executeCount=1, executeMillisMax=5, executeMillisTotal=5, createtime=2019-08-02T11:43:11.115, systemName=SMRZ}
The idea here is to replace Druid's own log output with our own: raising the log level suppresses Druid's messages, so only the entries we emit ourselves at ERROR remain.
4. Persisting SQL to a log file, approach two
The logs produced by approach one have two problems. First, the logged SQL carries no parameter values: every parameter appears as ?. We would like the output to include the actual parameters. Second, with that approach Druid's own SQL statistics page no longer collects any SQL; we want to write the log file without affecting that page.
4.1 Idea
Druid's Slf4jLogFilter provides a setting that makes it log SQL with the parameter values filled in:
filter.setStatementExecutableSqlLogEnable(true);
However, this setting only affects the SQL printed by the filter itself; it has no effect on our custom StatLogger. So we return to Druid's own SQL logging, but override part of its code to change the output format and to log only slow SQL.
4.2 Steps
- Remove druidDataSource.setStatLogger(myStatLogger) from the data source bean:
@Bean(name = "mysqlDataSource")
@Qualifier("mysqlDataSource")
@ConfigurationProperties(prefix = "spring.datasource-mysql")
@Primary
public DataSource mysqlDataSource() {
    DruidDataSource druidDataSource = new DruidDataSource();
    // connectionProperties and publicKey are fields of the surrounding configuration class.
    druidDataSource.setConnectionProperties(connectionProperties.replace("${publicKey}", publicKey));
    List<Filter> list = new ArrayList<>();
    list.add(logFilter);
    druidDataSource.setProxyFilters(list);
    return druidDataSource;
}
- Remove the setting:
datasource.setTimeBetweenLogStatsMillis(3000);
- Extend Slf4jLogFilter:
package com.base.web.common.adapter;

import cn.hutool.core.date.DateUtil;
import com.alibaba.druid.filter.logging.Slf4jLogFilter;
import com.alibaba.druid.proxy.jdbc.CallableStatementProxy;
import com.alibaba.druid.proxy.jdbc.JdbcParameter;
import com.alibaba.druid.proxy.jdbc.PreparedStatementProxy;
import com.alibaba.druid.proxy.jdbc.StatementProxy;
import com.alibaba.druid.sql.SQLUtils;
import com.base.web.common.domain.DruidSqlLogBean;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * @author zhanglei
 * @email ah.zhanglei4@aisino.com
 * @date 2019/8/8
 */
public class DruidSlf4jLoggerFilterEx extends Slf4jLogFilter {

    private static final Logger LOG = LoggerFactory.getLogger(DruidSlf4jLoggerFilterEx.class);

    // statementLoggerName is inherited from LogFilter (default "druid.sql.Statement").
    private Logger statementLogger = LoggerFactory.getLogger(statementLoggerName);

    private SQLUtils.FormatOption statementSqlFormatOption = new SQLUtils.FormatOption(false, true);

    private static final String JCPT_NAME = "JCPT";
    // Slow-SQL threshold in milliseconds.
    private static final int TIME_LINE = 3000;
    private static final String SELECT_LOWER = "select";
    private static final String SELECT_CAPITAL = "SELECT";

    private void recordSql(String formattedSql, double millis, StatementProxy statement, String message) {
        DruidSqlLogBean druidSqlLogBean = new DruidSqlLogBean();
        druidSqlLogBean.setDate(DateUtil.now());
        druidSqlLogBean.setSql(formattedSql);
        druidSqlLogBean.setMillisecond(millis);
        druidSqlLogBean.setSystem(JCPT_NAME);
        druidSqlLogBean.setMessage(message);
        druidSqlLogBean.setConneid(String.valueOf(statement.getConnectionProxy().getId()));
        druidSqlLogBean.setStmtid(stmtId(statement));
        Gson gson = new GsonBuilder().disableHtmlEscaping().create();
        statementLog(gson.toJson(druidSqlLogBean));
    }

    private String stmtId(StatementProxy statement) {
        StringBuffer buf = new StringBuffer();
        if (statement instanceof CallableStatementProxy) {
            buf.append("cstmt-");
        } else if (statement instanceof PreparedStatementProxy) {
            buf.append("pstmt-");
        } else {
            buf.append("stmt-");
        }
        buf.append(statement.getId());
        return buf.toString();
    }

    @Override
    protected void statementLog(String message) {
        // Only write our own JSON entries; they always contain the system name.
        if (message.contains(JCPT_NAME)) {
            statementLogger.error(message);
        }
    }

    @Override
    protected void statementExecuteAfter(StatementProxy statement, String sql, boolean firstResult) {
        // Only queries are of interest.
        if (!(sql.contains(SELECT_LOWER) || sql.contains(SELECT_CAPITAL))) {
            return;
        }
        statement.setLastExecuteTimeNano();
        double nanos = statement.getLastExecuteTimeNano();
        double millis = nanos / (1000 * 1000);
        // Only SQL slower than the threshold is logged.
        if (millis <= TIME_LINE) {
            return;
        }
        int parametersSize = statement.getParametersSize();
        List<Object> parameters = new ArrayList<Object>(parametersSize);
        for (int i = 0; i < parametersSize; ++i) {
            JdbcParameter jdbcParam = statement.getParameter(i);
            parameters.add(jdbcParam != null ? jdbcParam.getValue() : null);
        }
        String formattedSql = formatparamSql(sql, parametersSize, statement, parameters);
        formattedSql = formattedSql.replace("\t", " ").replace("\n", " ");
        recordSql(formattedSql, millis, statement, "");
    }

    private String formatparamSql(String sql, int parametersSize, StatementProxy statement, List<Object> parameters) {
        String formattedSql = sql;
        if (parametersSize > 0) {
            // Fill the parameter values back into the SQL text.
            String dbType = statement.getConnectionProxy().getDirectDataSource().getDbType();
            formattedSql = SQLUtils.format(sql, dbType, parameters, this.statementSqlFormatOption);
        }
        return formattedSql;
    }
}
The class above overrides two methods: statementLog, which writes the log entry, and statementExecuteAfter, which adds the SQL parameters, filters the SQL, measures the execution time, and formats the statement.
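DruidSqlLogBean is a plain value object. A minimal sketch reconstructed from the setters used above; the field names (and the names Gson actually serializes, which differ slightly in the sample output shown later) are assumptions, so adapt it to the real class in com.base.web.common.domain:
// Sketch only: fields reconstructed from the setter calls above.
public class DruidSqlLogBean {
    private String sql;         // formatted SQL with parameter values filled in
    private String date;        // timestamp of the log entry
    private double millisecond; // execution time in milliseconds
    private String system;      // system identifier, e.g. "JCPT"
    private String message;     // extra information, e.g. error details
    private String conneid;     // connection id
    private String stmtid;      // statement id, e.g. "pstmt-120031"

    public void setSql(String sql) { this.sql = sql; }
    public void setDate(String date) { this.date = date; }
    public void setMillisecond(double millisecond) { this.millisecond = millisecond; }
    public void setSystem(String system) { this.system = system; }
    public void setMessage(String message) { this.message = message; }
    public void setConneid(String conneid) { this.conneid = conneid; }
    public void setStmtid(String stmtid) { this.stmtid = stmtid; }
}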
- Build the filter bean:
@Bean
public DruidSlf4jLoggerFilterEx logFilter() {
    DruidSlf4jLoggerFilterEx filter = new DruidSlf4jLoggerFilterEx();
    // Log executable SQL (parameters filled in) through the statement logger.
    filter.setStatementExecutableSqlLogEnable(true);
    filter.setStatementLogEnabled(true);
    // Switch off everything else; our overrides decide what gets written.
    filter.setResultSetLogEnabled(false);
    filter.setConnectionLogEnabled(false);
    filter.setDataSourceLogEnabled(false);
    filter.setStatementCreateAfterLogEnabled(false);
    filter.setStatementPrepareAfterLogEnabled(false);
    filter.setStatementPrepareCallAfterLogEnabled(false);
    filter.setStatementExecuteAfterLogEnabled(false);
    filter.setStatementExecuteQueryAfterLogEnabled(false);
    filter.setStatementExecuteUpdateAfterLogEnabled(false);
    filter.setStatementExecuteBatchAfterLogEnabled(false);
    filter.setStatementCloseAfterLogEnabled(false);
    filter.setStatementParameterSetLogEnabled(false);
    filter.setStatementParameterClearLogEnable(false);
    return filter;
}
Run the code and you should see formatted log entries that include the parameter values:
{"sql":"select xxx from xxx where xxx = 'xxx' and xxx = xxx order by xxx desc limit 10","date":"2019-08-12 15:41:12","time":"3698.787206","system":"xxx","conneid":"110004","stmtid":"pstmt-120031"}
Druid's built-in statistics page also displays its data normally again.
- Pitfall
If you have completed all the steps above but still see no output, check for the following trap.
Although we set
filter.setStatementExecutableSqlLogEnable(true);
filter.setStatementLogEnabled(true);
these settings did not take effect in my tests.
In that case, override the following methods in DruidSlf4jLoggerFilterEx:
@Override
public boolean isStatementLogEnabled() {
    return true;
}

@Override
public boolean isStatementExecutableSqlLogEnable() {
    return true;
}
4.3 Logging failed SQL
To record failed SQL as well, override one more Druid method in DruidSlf4jLoggerFilterEx:
@Override
protected void statement_executeErrorAfter(StatementProxy statement, String sql, Throwable error) {
    if (!this.isStatementLogErrorEnabled()) {
        return;
    }
    // Only queries are of interest.
    if (!(sql.contains(SELECT_LOWER) || sql.contains(SELECT_CAPITAL))) {
        return;
    }
    int parametersSize = statement.getParametersSize();
    List<Object> parameters = new ArrayList<Object>(parametersSize);
    for (int i = 0; i < parametersSize; ++i) {
        JdbcParameter jdbcParam = statement.getParameter(i);
        parameters.add(jdbcParam != null ? jdbcParam.getValue() : null);
    }
    String formattedSql = formatparamSql(sql, parametersSize, statement, parameters);
    statement.setLastExecuteTimeNano();
    double nanos = statement.getLastExecuteTimeNano();
    double millis = nanos / (1000 * 1000);
    formattedSql = formattedSql.replace("\t", " ").replace("\n", " ");
    String message = error.getLocalizedMessage();
    message = message.replace("\\t", " ").replace("\\n", " ");
    recordSql(formattedSql, millis, statement, "ERROR_MESSAGE--" + message);
}
After adding the corresponding logging configuration, the SQL logging setup is complete.
5. ELK operations
The overall pipeline is as follows: the running project writes the log file; Filebeat reads the file and ships it to Logstash; Logstash stores the events in Kafka; a second Logstash instance reads from Kafka and writes the data to Elasticsearch; finally, Kibana visualizes the data from ES.
Only the Filebeat side is described here.
- Configure the log file path (/etc/filebeat/filebeat.yml)
- Configure the Logstash output address (/etc/filebeat/filebeat.yml); a minimal sketch of both settings follows
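A minimal filebeat.yml sketch; the log directory and the Logstash host are placeholders to adapt to your environment:
# Sketch only: paths and hosts are placeholders.
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /path/to/app/druidlog/*.log   # the directory written by the logback appender

output.logstash:
  hosts: ["logstash-host:5044"]       # the Logstash beats input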
Run Filebeat:
nohup ./filebeat -e -c /etc/filebeat/filebeat.yml -d "publish" &
Check its runtime log at:
/var/log/filebeat/filebeat