本系列主要關(guān)注安卓數(shù)據(jù)庫(kù)的線程行為泼返,分為四個(gè)部分:
(1)SQLiteOpenHelper的getReadableDatabase和getWritableDatabase
(2)SQLiteDatabase的實(shí)現(xiàn)以及多線程行為
(3)連接緩存池SQLiteConnectionPool
(4)SQLiteDatabase多線程實(shí)踐
本篇主要關(guān)注SQLiteConnectionPool
(連接池)在并發(fā)下的行為牢贸。
上文提到恭陡,SQLiteDatabase
會(huì)在每個(gè)線程中使用一個(gè)SQLiteSession
变勇,而SQLiteSession
會(huì)共用一個(gè)SQLiteConnectionPool
對(duì)象,并通過SQLiteConnectionPool
的acquireConnection
和releaseConnection
方法來獲取和釋放數(shù)據(jù)庫(kù)連接(一個(gè)SQLiteConnection
對(duì)象)止状。
public SQLiteConnection acquireConnection(String sql, int connectionFlags,
CancellationSignal cancellationSignal) {
return waitForConnection(sql, connectionFlags, cancellationSignal);//看名字是要等待什么鎖了
}
connectionFlags
是用SQLiteDatabase.getThreadDefaultConnectionFlags
的返回值一路傳下來的吱殉,這個(gè)方法在前文討論過這個(gè)方法,會(huì)記錄兩件事:1.數(shù)據(jù)庫(kù)是只讀還是可寫珊拼;2.當(dāng)前是否主線程食呻。
waitForConnection
方法比較長(zhǎng),我們一段一段地看澎现。
1 嘗試立即獲取連接
//是否可寫連接仅胞。可寫的連接同一時(shí)間只能存在一個(gè)剑辫。
final boolean wantPrimaryConnection =
(connectionFlags & CONNECTION_FLAG_PRIMARY_CONNECTION_AFFINITY) != 0;
final ConnectionWaiter waiter;
final int nonce;
synchronized (mLock) {//加鎖干旧。留意這一段代碼中加鎖部分并未結(jié)束。
throwIfClosedLocked();
// Abort if canceled.
if (cancellationSignal != null) {
cancellationSignal.throwIfCanceled();
}
// Try to acquire a connection.
SQLiteConnection connection = null;
if (!wantPrimaryConnection) {
//嘗試獲取只讀連接
connection = tryAcquireNonPrimaryConnectionLocked(
sql, connectionFlags); // might throw
}
if (connection == null) {
//嘗試獲取可寫連接
connection = tryAcquirePrimaryConnectionLocked(connectionFlags); // might throw
}
if (connection != null) {
return connection;
}
到這里是嘗試直接獲取連接妹蔽。嘗試的方法有tryAcquireNonPrimaryConnectionLocked
和tryAcquirePrimaryConnectionLocked
莱革。只讀時(shí)只需要 non primary connection,而需要寫時(shí)要primary connection讹开。
先看tryAcquirePrimaryConnectionLocked
:
// Might throw.
private SQLiteConnection tryAcquirePrimaryConnectionLocked(int connectionFlags) {
// If the primary connection is available, acquire it now.
SQLiteConnection connection = mAvailablePrimaryConnection;//同時(shí)只能存在一個(gè)可寫連接,用一個(gè)成員變量mAvailablePrimaryConnection緩存空閑連接
if (connection != null) {//有緩存返回即可捐名。finishAcquirePrimaryConnection會(huì)把connection放到mAcquiredConnections中旦万。mAcquiredConnections存儲(chǔ)正在使用的連接。
mAvailablePrimaryConnection = null;//不再空閑
finishAcquireConnectionLocked(connection, connectionFlags); // might throw
return connection;
}
// Make sure that the primary connection actually exists and has just been acquired.
//如果上一個(gè)if造成了不再空閑镶蹋,則mAcquiredConnections中就會(huì)有一個(gè)primary connection成艘,這里就會(huì)返回null。上一層的waitForConnection接到null會(huì)進(jìn)入等待狀態(tài)贺归,這個(gè)后面討論淆两。
for (SQLiteConnection acquiredConnection : mAcquiredConnections.keySet()) {
if (acquiredConnection.isPrimaryConnection()) {
return null;
}
}
//如果沒有在上面返回null,那么這一定是第一次請(qǐng)求primary connnection拂酣,或者有一個(gè)連接泄露了(未recycle的情況下finalize)秋冰,這時(shí)候就需要用openConnectionLocked去新開一個(gè)連接。
// Uhoh. No primary connection! Either this is the first time we asked
// for it, or maybe it leaked?
connection = openConnectionLocked(mConfiguration,
true /*primaryConnection*/); // might throw
finishAcquireConnectionLocked(connection, connectionFlags); // might throw
return connection;
}
然后看tryAcquireNonPrimaryConnectionLocked
:
// Might throw.
private SQLiteConnection tryAcquireNonPrimaryConnectionLocked(
String sql, int connectionFlags) {
// Try to acquire the next connection in the queue.
SQLiteConnection connection;
//只讀連接可以有多個(gè)婶熬,用一個(gè)ArrayList緩存了所有空閑連接
final int availableCount = mAvailableNonPrimaryConnections.size();
if (availableCount > 1 && sql != null) {
// If we have a choice, then prefer a connection that has the
// prepared statement in its cache.
// 如上面的英文注釋說的剑勾,如果有不止一個(gè)連接可選埃撵,那么挑選緩存了相同sql語(yǔ)句的那個(gè)∷淞恚可能SQLiteConnection對(duì)此有優(yōu)化暂刘?
for (int i = 0; i < availableCount; i++) {
connection = mAvailableNonPrimaryConnections.get(i);
if (connection.isPreparedStatementInCache(sql)) {//如果有相同sql,返回
mAvailableNonPrimaryConnections.remove(i);
finishAcquireConnectionLocked(connection, connectionFlags); // might throw
return connection;
}
}
}
if (availableCount > 0) {
// Otherwise, just grab the next one.
//沒有挑到捂刺,隨便給一個(gè)
connection = mAvailableNonPrimaryConnections.remove(availableCount - 1);
finishAcquireConnectionLocked(connection, connectionFlags); // might throw
return connection;
}
// 一個(gè)空閑連接都沒有谣拣。
// Expand the pool if needed.
int openConnections = mAcquiredConnections.size();
if (mAvailablePrimaryConnection != null) {
openConnections += 1;
}
// 上面在計(jì)算有多少已打開連接(空閑+使用中)。這里肯定沒有空閑non primary連接了族展,而如果有空閑primary連接森缠,則要 += 1。
if (openConnections >= mMaxConnectionPoolSize) {
// 超過數(shù)據(jù)庫(kù)連接限制苛谷,放棄治療辅鲸。連接限制與數(shù)據(jù)庫(kù)底層實(shí)現(xiàn)有關(guān)。
return null;
}
// 沒超限腹殿,還能再開一個(gè)連接独悴。所以開連接并返回。
connection = openConnectionLocked(mConfiguration,
false /*primaryConnection*/); // might throw
finishAcquireConnectionLocked(connection, connectionFlags); // might throw
return connection;
}
在這一步中锣尉,進(jìn)行了從緩存中取得連接的嘗試刻炒;而如果無法取得連接,也進(jìn)行了打開連接的嘗試自沧。如果再無法打開的話坟奥,就會(huì)拿到一個(gè)null了。后續(xù)就需要進(jìn)行等待拇厢。
2 等待獲取連接
// 留意這里還在上一個(gè)鎖mLock中
// No connections available. Enqueue a waiter in priority order.
final int priority = getPriority(connectionFlags);//主線程中的連接優(yōu)先級(jí)更高爱谁,記得嗎?
final long startTime = SystemClock.uptimeMillis();
// waiter是一個(gè)ConnectionWaiter對(duì)象孝偎。它同時(shí)也是一個(gè)鏈表访敌,有一個(gè)同類的mNext成員變量。
// obtainConnectionWaiterLocked會(huì)去復(fù)用(取鏈表頭)或者新建一個(gè)對(duì)象衣盾。
waiter = obtainConnectionWaiterLocked(Thread.currentThread(), startTime,
priority, wantPrimaryConnection, sql, connectionFlags);
ConnectionWaiter predecessor = null;
// 按照優(yōu)先級(jí)向mConnectionWaiterQueue添加waitor對(duì)象寺旺。mConnectionWaiterQueue不是復(fù)用池,而是有效的等待隊(duì)列(也是鏈表)势决。
ConnectionWaiter successor = mConnectionWaiterQueue;
while (successor != null) {
if (priority > successor.mPriority) {
waiter.mNext = successor;
break;
}
predecessor = successor;
successor = successor.mNext;
}
if (predecessor != null) {
predecessor.mNext = waiter;
} else {
mConnectionWaiterQueue = waiter;
}
nonce = waiter.mNonce;//觀察recycleConnectionWaiterLocked方法阻塑,mNonce在waiter每次被復(fù)用完成回收時(shí)自增1
}//鎖mLock結(jié)束
到這里就是把需要等待的連接信息封裝到ConnectionWaiter
中,并將ConnectionWaiter
對(duì)象放到一個(gè)鏈表里果复。那么什么時(shí)候會(huì)結(jié)束等待并返回呢陈莽?繼續(xù)看代碼:
// Set up the cancellation listener.
if (cancellationSignal != null) {
cancellationSignal.setOnCancelListener(new CancellationSignal.OnCancelListener() {
@Override
public void onCancel() {
synchronized (mLock) {
if (waiter.mNonce == nonce) {//nonce的作用在這里體現(xiàn)。防止waiter對(duì)象復(fù)用造成誤取消。
cancelConnectionWaiterLocked(waiter);
}
}
}
});
}
這一段用于額外處理取消信號(hào)的传透。在等待連接過程中取消耘沼,就可以把這一個(gè)waiter去除了。
接下來:
try {
// Park the thread until a connection is assigned or the pool is closed.
// Rethrow an exception from the wait, if we got one.
long busyTimeoutMillis = CONNECTION_POOL_BUSY_MILLIS;
long nextBusyTimeoutTime = waiter.mStartTime + busyTimeoutMillis;
for (;;) {//循環(huán)開始
// Detect and recover from connection leaks.
// 管理泄露連接的朱盐。如果一個(gè)SQLiteConnection在finalize時(shí)還未關(guān)閉群嗤,則會(huì)置泄露狀態(tài)。
// mConnectionLeaked是一個(gè)AtomicBoolean兵琳。
if (mConnectionLeaked.compareAndSet(true, false)) {
synchronized (mLock) {
wakeConnectionWaitersLocked();//有泄露連接被關(guān)閉的話狂秘,最大連接限制下就可能有位置空出來,這時(shí)候就可以嘗試分配一個(gè)連接
}
}
// Wait to be unparked (may already have happened), a timeout, or interruption.
// 等待躯肌。那么unpark在哪里者春?在wakeConnectionWaitersLocked中。這個(gè)方法在上面泄露測(cè)試時(shí)調(diào)用過清女。
// 還有cancelConnectionWaiterLocked中钱烟,取消等待自然要喚醒線程處理一下。
LockSupport.parkNanos(this, busyTimeoutMillis * 1000000L);
// Clear the interrupted flag, just in case.
Thread.interrupted();
// Check whether we are done waiting yet.
synchronized (mLock) {
throwIfClosedLocked();
//等到了一個(gè)Connection嫡丙。這個(gè)mAssignedConnection是何時(shí)賦值的呢拴袭?
//也是在wakeConnectionWaitersLocked中賦值的。
final SQLiteConnection connection = waiter.mAssignedConnection;
final RuntimeException ex = waiter.mException;
if (connection != null || ex != null) {
recycleConnectionWaiterLocked(waiter);//回收waiter曙博,會(huì)造成mNonce自增1
if (connection != null) {
return connection;
}
throw ex; // rethrow!
}
//沒拿到連接拥刻,繼續(xù)等。
final long now = SystemClock.uptimeMillis();
if (now < nextBusyTimeoutTime) {
busyTimeoutMillis = now - nextBusyTimeoutTime;
} else {
logConnectionPoolBusyLocked(now - waiter.mStartTime, connectionFlags);
busyTimeoutMillis = CONNECTION_POOL_BUSY_MILLIS;
nextBusyTimeoutTime = now + busyTimeoutMillis;
}
}
}//循環(huán)結(jié)束
} finally {
// Remove the cancellation listener.
if (cancellationSignal != null) {
cancellationSignal.setOnCancelListener(null);
}
}
}
在這一步中父泳,用ConnectionWaiter
來封裝等待中的連接信息般哼,并按優(yōu)先級(jí)放入一個(gè)鏈表,隨后進(jìn)入等待狀態(tài)惠窄。獲取到連接后蒸眠,等待狀態(tài)結(jié)束,返回連接杆融。
3 連接的釋放
這里我們可以先預(yù)估以下:釋放連接時(shí)需要把被釋放的連接放回到空閑連接集合黔宛,并進(jìn)行unpark操作,通知正在等待連接的線程擒贸。
代碼如下:
public void releaseConnection(SQLiteConnection connection) {
synchronized (mLock) {
AcquiredConnectionStatus status = mAcquiredConnections.remove(connection);//從活躍連接池中移除
if (status == null) {
throw new IllegalStateException("Cannot perform this operation "
+ "because the specified connection was not acquired "
+ "from this pool or has already been released.");
}
if (!mIsOpen) {
closeConnectionAndLogExceptionsLocked(connection);
} else if (connection.isPrimaryConnection()) {
if (recycleConnectionLocked(connection, status)) {
assert mAvailablePrimaryConnection == null;
mAvailablePrimaryConnection = connection;//放回可寫連接mAvailablePrimaryConnection
}
wakeConnectionWaitersLocked();//通知其它線程
} else if (mAvailableNonPrimaryConnections.size() >= mMaxConnectionPoolSize - 1) {
closeConnectionAndLogExceptionsLocked(connection);
} else {
if (recycleConnectionLocked(connection, status)) {
mAvailableNonPrimaryConnections.add(connection);//放回空閑只讀連接池
}
wakeConnectionWaitersLocked();//通知其它線程
}
}
}
4 總結(jié)
綜上所述,SQLiteConnectionPool
提供數(shù)據(jù)庫(kù)連接的流程如下:
(1)從緩存中獲取一個(gè)空閑的連接觉渴。若有多個(gè)空閑連接介劫,優(yōu)先挑選執(zhí)行過相同SQL的那個(gè)。注意如果是寫操作的話案淋,則會(huì)返回一個(gè)primary connection座韵,并將其它嘗試獲得primary connection的線程阻塞,直到當(dāng)前線程結(jié)束使用連接。而只讀的操作則可以同時(shí)存在多個(gè)誉碴,并可以和寫操作的連接共存宦棺。
(2)如果緩存中沒有連接,檢查底層數(shù)據(jù)庫(kù)是否可以容納更多連接黔帕。如果可以代咸,新建一個(gè)連接并返回。
(3)如果底層數(shù)據(jù)庫(kù)不再允許增加連接成黄,則進(jìn)入等待呐芥。到超時(shí)或者有其它連接被釋放結(jié)束等待。如果此時(shí)可以獲取連接奋岁,則返回連接思瘟。如果不能,進(jìn)入新一輪等待闻伶。
5 多線程下的transaction
了解了以上的特性之后滨攻,transaction的多線程行為就比較好理解了。
以下是SQLiteDatabase
的beginTransaction
方法:
private void beginTransaction(SQLiteTransactionListener transactionListener,
boolean exclusive) {
acquireReference();
try {
getThreadSession().beginTransaction(
exclusive ? SQLiteSession.TRANSACTION_MODE_EXCLUSIVE :
SQLiteSession.TRANSACTION_MODE_IMMEDIATE,
transactionListener,
getThreadDefaultConnectionFlags(false /*readOnly*/), null);
} finally {
releaseReference();
}
}
留意flags參數(shù)傳入的readOnly為false蓝翰,所以SQLiteSession
會(huì)從SQLiteConnectionPool
中獲取一個(gè)獨(dú)占的連接光绕。并且在SQLiteSession
執(zhí)行其它SQL語(yǔ)句的情況下,執(zhí)行完成會(huì)將連接釋放回連接池霎箍,而beginTransaction
操作則不會(huì)奇钞,而是持有這一個(gè)連接直至同一線程內(nèi)調(diào)用endTransaction
。這里再貼一遍SQLiteSession.execute
源碼:
public void execute(String sql, Object[] bindArgs, int connectionFlags,
CancellationSignal cancellationSignal) {
if (sql == null) {
throw new IllegalArgumentException("sql must not be null.");
}
if (executeSpecial(sql, bindArgs, connectionFlags, cancellationSignal)) {
return;
}
acquireConnection(sql, connectionFlags, cancellationSignal); // might throw
try {
mConnection.execute(sql, bindArgs, cancellationSignal); // might throw
} finally {
//這里釋放了連接(其實(shí)是交還給連接池)
releaseConnection(); // might throw
}
}
所以漂坏,當(dāng)有一個(gè)線程在transaction過程中時(shí)景埃,其它線程的寫操作和beginTransaction
操作都會(huì)被阻塞住,直至當(dāng)前線程的transaction完成才會(huì)按照優(yōu)先級(jí)挑選一個(gè)線程繼續(xù)顶别。