最近線上數(shù)據(jù)庫(kù)遷到haproxy上番枚,突然出現(xiàn)了很多數(shù)據(jù)庫(kù)連接失敗的錯(cuò)誤霎烙,經(jīng)過排查是因?yàn)槲覀兪褂昧薓ysql的ReplicationDriver,數(shù)據(jù)庫(kù)連接池使用的是druid, 而druid針對(duì)Mysql Replication 連接的檢查實(shí)現(xiàn)上有個(gè)bug ,導(dǎo)致的凭峡。當(dāng)我正準(zhǔn)備提issue的時(shí)候搂漠,發(fā)現(xiàn)很多人都遇到了這個(gè)問題耀怜,所以想寫遍文章記錄一下作瞄。話不多說(shuō)茶宵,下面我們來(lái)正式復(fù)盤一下這個(gè)問題和定位解決的過程。
之前公司的DBA新上線了一套HAProxy用來(lái)替代之前的VIP的高可用法案宗挥,所以我們也將從庫(kù)的連接從VIP遷到了HAProxy.但是上線后不久我們發(fā)現(xiàn)線上開始出現(xiàn)數(shù)據(jù)庫(kù)連接錯(cuò)誤乌庶,而且凌晨的時(shí)候最多。
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
相信有一點(diǎn)經(jīng)驗(yàn)的java開發(fā)對(duì)這個(gè)錯(cuò)誤并不陌生契耿,導(dǎo)致這個(gè)問題的原因也非常好定位瞒大,無(wú)非是客戶端與數(shù)據(jù)庫(kù)的連接被服務(wù)端主動(dòng)斷開,而客戶端還傻傻的用這個(gè)已經(jīng)被斷開的連接去請(qǐng)求數(shù)據(jù)庫(kù)搪桂,導(dǎo)致失敗透敌。
我們知道Mysql有一個(gè)wait_timeout的配置會(huì)自動(dòng)將空閑時(shí)間超過這個(gè)值(通常為8小時(shí))的連接斷開,因?yàn)槲锢頂?shù)據(jù)庫(kù)的配置并沒有變動(dòng)踢械,而且代碼中連接池中的配置的最小空閑時(shí)間遠(yuǎn)小于數(shù)據(jù)庫(kù)wait_timeout酗电,所以這個(gè)假設(shè)首先被排除。
那么會(huì)不會(huì)是HAProxy將我們的連接給斷開了呢(很有可能) 内列? 于是我們查看了HAProxy的配置
timeout connect:60s // 定義haproxy將客戶端請(qǐng)求轉(zhuǎn)發(fā)至后端服務(wù)器所等待的超時(shí)時(shí)長(zhǎng)
timeout client: 120s //客戶端非活動(dòng)狀態(tài)的超時(shí)時(shí)長(zhǎng)
timeout server: 120s //客戶端與服務(wù)器端建立連接后撵术,等待服務(wù)器端的超時(shí)時(shí)長(zhǎng)
發(fā)現(xiàn)HAProxy會(huì)主動(dòng)將空閑時(shí)間時(shí)間超過1分鐘的連接斷開,于是我們修改了druid的配置话瞧,將數(shù)據(jù)庫(kù)空閑驗(yàn)證的時(shí)間修改為timeBetweenEvictionRunsMillis修改為50s(原來(lái)是60s,考慮到極限情況如果設(shè)置為60s的話依然會(huì)存在不能蹦塾耄活的情況),但是經(jīng)過測(cè)試后我們發(fā)現(xiàn)問題依然存在移稳。
既然將痹棠桑活時(shí)間設(shè)置到了60s以內(nèi)為什么還會(huì)出現(xiàn)連接被斷開呢?稍微思考一下个粱,可能的原因不外乎只有兩個(gè)
- HAProxy維護(hù)的連接有問題
- 保活策略沒有生效
順著這個(gè)思路我們首先排除了HAProxy的問題(簡(jiǎn)單來(lái)說(shuō)就是HAProxy會(huì)保持客戶端和服務(wù)器的會(huì)話翻翩,保證客戶端到HAProxy的連接和HAProxy到服務(wù)器的連接是一致的都许,他們的空閑時(shí)間始終是一樣的) 那么我們?cè)賮?lái)看看是不是钡巨保活策略沒有生效呢?我們目前用的是druid來(lái)管理我們的數(shù)據(jù)庫(kù)連接池 胶征, 要弄清這個(gè)問題塞椎,我們得先看看druid是進(jìn)行工作及怎么進(jìn)行活性檢測(cè)的。
上面這張圖表示了druid在獲取線程池的大致的邏輯過程:druid在初始化時(shí)會(huì)創(chuàng)建兩個(gè)守護(hù)線程睛低,分別承擔(dān)線程的創(chuàng)建和銷毀任務(wù)案狠,當(dāng)用戶線程出現(xiàn)等待獲取線程的操作時(shí)(且線程池中的線程數(shù)不大于最大活動(dòng)線程數(shù)),創(chuàng)建線程會(huì)自動(dòng)創(chuàng)建新的連接并放到線程池中钱雷,所以當(dāng)用戶線程需要新的連接時(shí)骂铁,只需要直接從線程池獲取即可。用戶線程從線程池中獲取到連接會(huì)根據(jù)用戶的配置決定是否線程進(jìn)行有效性驗(yàn)證罩抗,如果驗(yàn)證線程有效則返回線程拉庵,如果無(wú)效則將該連接關(guān)閉,(DestoryConnectionThread自動(dòng)回收已關(guān)閉的連接)套蒂,然后嘗試重新從連接池中獲取連接钞支,知道獲取到有效連接為止并返回連接。下面我們來(lái)看看代碼的具體實(shí)現(xiàn)操刀。
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
int notFullTimeoutRetryCnt = 0;
for (;;) { // 死循環(huán)烁挟,直到獲取到有效連接為止,依賴CreateConnectionThread 保證 骨坑,連接池中始終有有效連接資源
// handle notFullTimeoutRetry
DruidPooledConnection poolableConnection;
try {
poolableConnection = getConnectionInternal(maxWaitMillis); // 從連接池中獲取連接
} catch (GetConnectionTimeoutException ex) {
if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
notFullTimeoutRetryCnt++;
if (LOG.isWarnEnabled()) {
LOG.warn("not full timeout retry : " + notFullTimeoutRetryCnt);
}
continue;
}
throw ex;
}
if (isTestOnBorrow()) {
// 撼嗓。。卡啰。這里不討論 省略
} else {
Connection realConnection = poolableConnection.getConnection();
if (realConnection.isClosed()) {
discardConnection(null); // 傳入null静稻,避免重復(fù)關(guān)閉
continue; // 如果連接已經(jīng)關(guān)閉,丟棄匈辱,嘗試重新獲取新的連接
}
if (isTestWhileIdle()) { // 驗(yàn)證空閑連接有效性的配置 testWhileIdle = true
final long currentTimeMillis = System.currentTimeMillis();
final long lastActiveTimeMillis = poolableConnection.getConnectionHolder().getLastActiveTimeMillis();
final long idleMillis = currentTimeMillis - lastActiveTimeMillis;
long timeBetweenEvictionRunsMillis = this.getTimeBetweenEvictionRunsMillis();
if (timeBetweenEvictionRunsMillis <= 0) {
timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
}
// 當(dāng)前連接的空閑時(shí)間是否大于timeBetweenEvictionRunsMillis振湾,如果大于才檢測(cè),否則跳過
if (idleMillis >= timeBetweenEvictionRunsMillis) {
boolean validate = testConnectionInternal(poolableConnection.getConnection());
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}
discardConnection(realConnection); //丟棄連接
continue; // 嘗試重新獲取新的連接
}
}
}
}
...
return poolableConnection;
}
}
protected boolean testConnectionInternal(Connection conn) {
...
try {
if (validConnectionChecker != null) {
// 檢查連接有效性
return validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout);
}
if (conn.isClosed()) {
return false;
}
if (null == validationQuery) {
return true;
}
Statement stmt = null;
ResultSet rset = null;
try {
stmt = conn.createStatement();
if (getValidationQueryTimeout() > 0) {
stmt.setQueryTimeout(validationQueryTimeout);
}
rset = stmt.executeQuery(validationQuery);
if (!rset.next()) {
return false;
}
} finally {
JdbcUtils.close(rset);
JdbcUtils.close(stmt);
}
return true;
} catch (Exception ex) {
// skip
return false;
} finally {
if (sqlFile != null) {
JdbcSqlStat.setContextSqlFile(sqlFile);
}
if (sqlName != null) {
JdbcSqlStat.setContextSqlName(sqlName);
}
}
}
public class MySqlValidConnectionChecker extends ValidConnectionCheckerAdapter implements ValidConnectionChecker, Serializable {
public static final int DEFAULT_VALIDATION_QUERY_TIMEOUT = 1000;
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(MySqlValidConnectionChecker.class);
private Class<?> clazz;
private Method ping;
// 是否使用ping 檢測(cè)連接的有效性亡脸,如果為false 則使用select 1 查詢
private boolean usePingMethod = false;
public MySqlValidConnectionChecker(){
try {
clazz = Utils.loadClass("com.mysql.jdbc.MySQLConnection");
if (clazz == null) {
clazz = Utils.loadClass("com.mysql.cj.jdbc.ConnectionImpl");
}
if (clazz != null) {
ping = clazz.getMethod("pingInternal", boolean.class, int.class);
}
if (ping != null) {
usePingMethod = true;
}
} catch (Exception e) {
LOG.warn("Cannot resolve com.mysql.jdbc.Connection.ping method. Will use 'SELECT 1' instead.", e);
}
configFromProperties(System.getProperties());
}
public boolean isValidConnection(Connection conn, String validateQuery, int validationQueryTimeout) throws Exception {
if (conn.isClosed()) {
return false;
}
if (usePingMethod) {
if (conn instanceof DruidPooledConnection) {
conn = ((DruidPooledConnection) conn).getConnection();
}
if (conn instanceof ConnectionProxy) {
conn = ((ConnectionProxy) conn).getRawObject();
}
// 當(dāng)前的conn 是否是 com.mysql.jdbc.MySQLConnection (or com.mysql.cj.jdbc.ConnectionImpl)
if (clazz.isAssignableFrom(conn.getClass())) {
if (validationQueryTimeout < 0) {
validationQueryTimeout = DEFAULT_VALIDATION_QUERY_TIMEOUT;
}
//使用反射調(diào)用MySQLConnection.pingInternal 方法押搪,檢查連接有效性,并且會(huì)刷新連接的空閑時(shí)間
// 如果失敗則會(huì)拋出異常浅碾,上層捕獲
ping.invoke(conn, true, validationQueryTimeout * 1000);
return true;
}
}
// 當(dāng)usePingMethod=false或者 conn 不是 com.mysql.jdbc.MySQLConnection (or com.mysql.cj.jdbc.ConnectionImpl) 會(huì)執(zhí)行一下方法
if (validateQuery == null || validateQuery.length() == 0) {
return true;
}
Statement stmt = null;
ResultSet rs = null;
try {
stmt = conn.createStatement();
if (validationQueryTimeout > 0) {
stmt.setQueryTimeout(validationQueryTimeout);
}
// 執(zhí)行 select 1 大州,并且會(huì)刷新連接的空閑時(shí)間
// 如果失敗則會(huì)拋出異常,上層捕獲
rs = stmt.executeQuery(validateQuery);
return true;
} finally {
JdbcUtils.close(rs);
JdbcUtils.close(stmt);
}
}
...
}
通過調(diào)試垂谢,我發(fā)現(xiàn)clazz.isAssignableFrom(conn.getClass())
為false,也就是說(shuō)這里的conn并不是com.mysql.jdbc.MySQLConnection
厦画,原來(lái)為了DB的讀寫分離項(xiàng)目使用的是數(shù)據(jù)庫(kù)驅(qū)動(dòng)是RepliationDriver而不是默認(rèn)的Driver(jdbc:mysql:replication://
), 因此使用的連接也是com.mysql.jdbc.ReplicationConnection
, 而ReplicationConnection直接繼承自 com.mysql.jdbc.Connection
并沒有繼承com.mysql.jdbc.MySQLConnection
(僅限于mysql-connection-java 1.5.38版本之前,而我們線上使用的是1.5.35)
com.mysql.jdbc.ReplicationConnection
在mysql-connector-java 1.5.38版本之前(ReplicationConnection源碼)的是public class ReplicationConnection implements Connection, PingTarget
5.1.35版本1.5.38([GitHub源碼]開始將
com.mysql.jdbc.ReplicationConnection
抽象成接口(直接繼承自com.mysql.jdbc.MySQLConnection
),并使用他的的子類com.mysql.jdbc.JDBC4ReplicationMySQLConnection
(JDBC4ReplicationMySQLConnection源碼)來(lái)實(shí)現(xiàn)根暑,com.mysql.jdbc.JDBC4ReplicationMySQLConnection
的內(nèi)部功能則是由新增的代理類com.mysql.jdbc.ReplicationConnectionProxy
來(lái)實(shí)現(xiàn)的 (ReplicationConnectionProxy實(shí)現(xiàn)了原本com.mysql.jdbc.ReplicationConnection
類實(shí)現(xiàn)的功能)
所以這里無(wú)論usePingMethod設(shè)置的值是什么力试,MySqlValidConnectionChecker都是執(zhí)行SELECT 1 操作. 下面我們來(lái)具體看一下執(zhí)行過程
先打開Mysql的日志 (當(dāng)然是本地開發(fā)環(huán)境嘍,我這里為了方便排嫌,主從庫(kù)配置的是同一個(gè)地址)
set global general_log = on;
我們會(huì)得到以下日志(這里我只保留了核心日志信息畸裳,并進(jìn)行了一些脫敏處理)
event_time thread_id command_type argument
2019-01-15T07:51:45.814624Z 18 Query SELECT 1 // 執(zhí)行select 1的線程ID是 18
2019-01-15T07:51:45.822731Z 19 Query select * from xxx_table where xxx_column = 'xxx' limit 0,1 // 實(shí)際執(zhí)行語(yǔ)句的線程ID是 19
我們很容易發(fā)現(xiàn)執(zhí)行檢查操作的線程和執(zhí)行業(yè)務(wù)查詢的線程是不一樣的,因此我們可以斷定檢查用的連接和執(zhí)行業(yè)務(wù)的數(shù)據(jù)庫(kù)連接不是同一個(gè)淳地,執(zhí)行業(yè)務(wù)操作的數(shù)據(jù)庫(kù)連接沒有辈篮活,空閑時(shí)間并沒有被刷新颇象,所以該連接一旦長(zhǎng)時(shí)間沒有訪問就會(huì)被斷開伍伤,導(dǎo)致出現(xiàn)連接不可用。
/**
* 夯到!這里省略了非相關(guān)代碼
* 簡(jiǎn)單來(lái)說(shuō)就是維護(hù)了兩個(gè)連接集合嚷缭,一個(gè)是兩個(gè)連接集合,一個(gè)包含主庫(kù)連接耍贾,一個(gè)包含一個(gè)或多個(gè)從庫(kù)連接阅爽, 當(dāng)readonly = true 是使用從庫(kù)連接,否則使用主庫(kù)連接
*
*/
public class ReplicationConnection implements Connection, PingTarget {
protected Connection currentConnection;
protected LoadBalancedConnection masterConnection;
protected LoadBalancedConnection slavesConnection;
private ReplicationConnectionGroup connectionGroup;
private boolean readOnly = false;
/**
* 當(dāng)設(shè)置readOnly時(shí)荐开,切換當(dāng)前連接
*/
public synchronized void setReadOnly(boolean readOnly) throws SQLException {
if (readOnly) {
if (this.currentConnection != this.slavesConnection) {
switchToSlavesConnection();
}
} else {
if (this.currentConnection != this.masterConnection) {
switchToMasterConnection();
}
}
this.readOnly = readOnly;
// allow master connection to be set to/from read-only if
// there are no slaves
if (this.currentConnection == this.masterConnection) {
this.currentConnection.setReadOnly(this.readOnly);
}
}
private synchronized void switchToMasterConnection() throws SQLException {
if (this.masterConnection == null || this.masterConnection.isClosed()) {
this.initializeMasterConnection();
}
swapConnections(this.masterConnection, this.slavesConnection);
this.masterConnection.setReadOnly(false);
}
private synchronized void switchToSlavesConnection() throws SQLException {
if (this.slavesConnection == null || this.slavesConnection.isClosed()) {
this.initializeSlaveConnection();
}
if (this.slavesConnection != null) {
swapConnections(this.slavesConnection, this.masterConnection);
this.slavesConnection.setReadOnly(true);
}
}
}
問題找到了付翁,解決方案也比較簡(jiǎn)單, druid的高版本已經(jīng)支持自定義ValidConnectionChecker
package com.hujiang.coupon.common.druid.check;
/**
* connection checker for Mysql Replication Driver (support Normal Driver as well)
*/
@Slf4j
public class MySqlReplicationValidConnectionChecker extends ValidConnectionCheckerAdapter implements ValidConnectionChecker, Serializable {
public static final int DEFAULT_VALIDATION_QUERY_TIMEOUT = 1;
public static final String DEFAULT_VALIDATION_QUERY = "SELECT 1";
private static final long serialVersionUID = 1L;
private boolean usePingMethod = false;
public MySqlReplicationValidConnectionChecker() {
configFromProperties(System.getProperties());
}
@Override
public void configFromProperties(Properties properties) {
String property = properties.getProperty("druid.mysql.usePingMethod");
if ("true".equals(property)) {
setUsePingMethod(true);
} else if ("false".equals(property)) {
setUsePingMethod(false);
}
}
public boolean isUsePingMethod() {
return usePingMethod;
}
public void setUsePingMethod(boolean usePingMethod) {
this.usePingMethod = usePingMethod;
}
@Override
public boolean isValidConnection(Connection conn, String validateQuery, int validationQueryTimeout) throws Exception {
if (conn.isClosed()) {
return false;
}
if (conn instanceof DruidPooledConnection) {
conn = ((DruidPooledConnection) conn).getConnection();
}
if (conn instanceof ConnectionProxy) {
conn = ((ConnectionProxy) conn).getRawObject();
}
if (usePingMethod) {
if (conn instanceof MySQLConnection) {
// 使用ping 方法替代原來(lái)的pingInternal 方法晃听,ReplicationMySQLConnection內(nèi)部實(shí)現(xiàn)中會(huì)同時(shí)執(zhí)行master和slave節(jié)點(diǎn)的ping操作
((MySQLConnection) conn).ping();
return true;
}
}
String query = validateQuery;
if (validateQuery == null || validateQuery.isEmpty()) {
query = DEFAULT_VALIDATION_QUERY;
}
if(conn instanceof ReplicationConnection){
select_1(((ReplicationConnection) conn).getMasterConnection(),query,validationQueryTimeout);
select_1(((ReplicationConnection) conn).getSlavesConnection(),query,validationQueryTimeout);
}else{
select_1(conn, query, validationQueryTimeout);
}
return true;
}
private void select_1(Connection conn, String validateQuery, int validationQueryTimeout) throws SQLException {
Statement stmt = null;
ResultSet rs = null;
try {
stmt = conn.createStatement();
if (validationQueryTimeout > 0) {
stmt.setQueryTimeout(validationQueryTimeout);
}
rs = stmt.executeQuery(validateQuery);
} finally {
JdbcUtils.close(rs);
JdbcUtils.close(stmt);
}
}
}
// 可以看到com.mysql.jdbc.ReplicationMySQLConnection 的ping()方法的實(shí)現(xiàn)
public class ReplicationMySQLConnection extends MultiHostMySQLConnection implements ReplicationConnection {
@Override
public synchronized void ping() throws SQLException {
Connection conn;
try {
if ((conn = getValidatedMasterConnection()) != null) {
conn.ping();
}
} catch (SQLException e) {
if (isMasterConnection()) {
throw e;
}
}
try {
if ((conn = getValidatedSlavesConnection()) != null) {
conn.ping();
}
} catch (SQLException e) {
if (!isMasterConnection()) {
throw e;
}
}
}
...
}