目錄
源碼解析目錄
從Room源碼看抽象與封裝——SQLite的抽象
從Room源碼看抽象與封裝——數(shù)據(jù)庫的創(chuàng)建
從Room源碼看抽象與封裝——數(shù)據(jù)庫的升降級
從Room源碼看抽象與封裝——Dao
從Room源碼看抽象與封裝——數(shù)據(jù)流
前言
之前關于Room源碼分析的四篇文章基本上涵蓋了Room使用的全流程肥荔,從抽象基礎到數(shù)據(jù)庫創(chuàng)建再到增刪改查的實現(xiàn)活烙,可謂很全面了纳鼎。你以為這就完了嗎隔缀?Too young瓷蛙!如果Room只是這些內(nèi)容的話歌径,那只能說它是個還不錯的ORM框架亿絮,對SQLite進行了良好的抽象鼠锈,并且效率也很高郭膛。Room最大的不同在于它是Jetpack包的一部分晨抡,它與Jetpack包中其它部分有著非常好的配合,尤其是LiveData则剃。以之前定義的Dao為例:
@Dao
interface UserDao {
@Query("SELECT * FROM user WHERE uid = :userId")
fun getUserById(userId: Int): User?
@Query("SELECT * FROM user WHERE uid = :userId")
fun getUserLiveDataById(userId: Int): LiveData<User>
@Query("SELECT * FROM user WHERE uid = :userId")
fun getUserObservableById(userId: Int): Observable<User>
@Query("SELECT * FROM user WHERE uid = :userId")
fun getUserFlowableById(userId: Int): Flowable<User>
}
如上所示耘柱,Dao中的查詢不僅可以返回Entity對象User,還可以以數(shù)據(jù)流的形式返回LiveData或者是RxJava中的Observable棍现,F(xiàn)lowable调煎。當我們“增刪改”User表時,如果在此之前以如上形式獲取到了User數(shù)據(jù)流己肮,那么該User數(shù)據(jù)流就會“更新”士袄,即相應查詢會重新執(zhí)行一遍,以把最新的User數(shù)據(jù)傳遞出去谎僻,至于User是否真的有變化窖剑,這需要我們自行判斷。
這篇文章會介紹Room是如何實現(xiàn)數(shù)據(jù)流的戈稿。
1. 概述
如果說之前對Room方方面面的解析只是“繁”的話西土,那么Room對于數(shù)據(jù)流的實現(xiàn)真的是有點“難”了。整個流程比較復雜鞍盗,為了防止你看的云里霧里需了,我先大致描述一下整個流程跳昼,然后再對各個部分進行詳細講解。
Room實現(xiàn)數(shù)據(jù)流大致包含如下幾個步驟:
- 創(chuàng)建一個數(shù)據(jù)庫臨時表(表名為
room_table_modification_log
肋乍,保存在內(nèi)存中鹅颊,非本地存儲),表中包含兩列table_id
和invalidated
墓造,這個表記錄了當前數(shù)據(jù)庫哪個Table被修改了(“增刪改”都屬于修改)堪伍。 - 如果Dao中的查詢涉及到了數(shù)據(jù)流返回,那么生成代碼就會幫我們向數(shù)據(jù)庫中添加一個關于當前被查詢表(對于上面的例子來說就是User這個表觅闽,可能會涉及到多個表)的TRIGGER(觸發(fā)器)帝雇,這個觸發(fā)器的主要任務是當有操作修改了被查詢表中的數(shù)據(jù)時,就把
room_table_modification_log
表中對應的table_id
的invalidated
置為1蛉拙,表示“失效”了尸闸。 - Room中所有“增刪改”都是放在Transaction中的,在
endTransaction
時孕锄,Room都會查看room_table_modification_log
表中是否有invalidated
為1的吮廉,如果有,再次執(zhí)行相應查詢(invalidated
會被重置為0)畸肆,并把查詢的數(shù)據(jù)通過相應數(shù)據(jù)流傳遞出去宦芦。 - 在數(shù)據(jù)流不再使用時(例如Observable被dispose),相應觸發(fā)器也會被丟棄轴脐,這樣就不會再對這個表進行“追蹤”了踪旷。
2. 創(chuàng)建臨時表
Room中關于數(shù)據(jù)流的實現(xiàn)幾乎全部包含在InvalidationTracker
類中,這個類是在RoomDatabase
中被創(chuàng)建的豁辉,只是在之前的分析中被刻意忽略了。
public abstract class RoomDatabase {
private final InvalidationTracker mInvalidationTracker;
public RoomDatabase() {
mInvalidationTracker = createInvalidationTracker();
}
/**
* 由生成代碼實現(xiàn)
*/
@NonNull
protected abstract InvalidationTracker createInvalidationTracker();
/**
* 由生成代碼調(diào)用舀患,初始化 InvalidationTracker
*/
protected void internalInitInvalidationTracker(@NonNull SupportSQLiteDatabase db) {
mInvalidationTracker.internalInit(db);
}
@NonNull
public InvalidationTracker getInvalidationTracker() {
return mInvalidationTracker;
}
}
public final class AppDatabase_Impl extends AppDatabase {
@Override
protected SupportSQLiteOpenHelper createOpenHelper(DatabaseConfiguration configuration) {
final SupportSQLiteOpenHelper.Callback _openCallback = new RoomOpenHelper(configuration, new RoomOpenHelper.Delegate(1) {
@Override
public void onOpen(SupportSQLiteDatabase _db) {
mDatabase = _db;
_db.execSQL("PRAGMA foreign_keys = ON");
//數(shù)據(jù)庫打開時初始化 InvalidationTracker
internalInitInvalidationTracker(_db);
//...
}
//...
});
//...
}
@Override
protected InvalidationTracker createInvalidationTracker() {
//...
return new InvalidationTracker(this, _shadowTablesMap, _viewTables, "User"); //User表示要觀察的表名徽级,可能包含很多個
}
}
注解處理器生成了AppDatabase_Impl
,其中不僅實現(xiàn)了createOpenHelper
聊浅,還實現(xiàn)了createInvalidationTracker
餐抢,并且在數(shù)據(jù)庫打開時(onOpen
回調(diào)被調(diào)用),InvalidationTracker
被初始化低匙,也是在初始化時旷痕,臨時表被創(chuàng)建,踏出萬里長征第一步顽冶。
public class InvalidationTracker {
private static final String UPDATE_TABLE_NAME = "room_table_modification_log";
private static final String TABLE_ID_COLUMN_NAME = "table_id";
private static final String INVALIDATED_COLUMN_NAME = "invalidated";
//建表
private static final String CREATE_TRACKING_TABLE_SQL = "CREATE TEMP TABLE " + UPDATE_TABLE_NAME
+ "(" + TABLE_ID_COLUMN_NAME + " INTEGER PRIMARY KEY, "
+ INVALIDATED_COLUMN_NAME + " INTEGER NOT NULL DEFAULT 0)";
//重置
static final String RESET_UPDATED_TABLES_SQL = "UPDATE " + UPDATE_TABLE_NAME
+ " SET " + INVALIDATED_COLUMN_NAME + " = 0 WHERE " + INVALIDATED_COLUMN_NAME + " = 1 ";
//找出哪個表改變了
static final String SELECT_UPDATED_TABLES_SQL = "SELECT * FROM " + UPDATE_TABLE_NAME
+ " WHERE " + INVALIDATED_COLUMN_NAME + " = 1;";
//根據(jù)表名找表名對應的ID
final ArrayMap<String, Integer> mTableIdLookup;
//哪些表需要被觀察
final String[] mTableNames;
//tableNames是可變參數(shù)欺抗,表示可能有多個要被“追蹤”的表
public InvalidationTracker(RoomDatabase database, Map<String, String> shadowTablesMap,
Map<String, Set<String>> viewTables, String... tableNames) {
//shadowTablesMap, viewTables可以忽略,一般都為空
final int size = tableNames.length;
mTableNames = new String[size];
for (int id = 0; id < size; id++) {
final String tableName = tableNames[id].toLowerCase(Locale.US);
//表對應的ID其實就是它的位置强重,因為是臨時表存儲在內(nèi)存中绞呈,所以這個ID只需要能分別誰是誰就可以了
mTableIdLookup.put(tableName, id);
mTableNames[id] = tableName;
}
//...
}
/**
* 初始化臨時表
*/
void internalInit(SupportSQLiteDatabase database) {
synchronized (this) {
database.execSQL("PRAGMA temp_store = MEMORY;"); //臨時表保存在內(nèi)存中
database.execSQL("PRAGMA recursive_triggers='ON';");
//創(chuàng)建臨時表
database.execSQL(CREATE_TRACKING_TABLE_SQL);
syncTriggers(database);
//重置臨時表是需要經(jīng)常執(zhí)行的贸人,把它編譯成SQLiteStatement方便執(zhí)行
mCleanupStatement = database.compileStatement(RESET_UPDATED_TABLES_SQL);
}
}
}
臨時表的表名是room_table_modification_log
,創(chuàng)建它的目的是追蹤我們需要觀察的那些表中(例如User表)數(shù)據(jù)是否發(fā)生了變化佃声。把建表的SQL翻譯以下就是
CREATE TEMP TABLE room_table_modification_log
(table_id INTEGER PRIMARY KEY, invalidated INTEGER NOT NULL DEFAULT 0)
就像下面這樣
table_id | invalidated |
---|---|
0 | 0 |
創(chuàng)建表之后不會有數(shù)據(jù)艺智,數(shù)據(jù)(0,0)只是方便你理解。
因為這個表是臨時表圾亏,并且在創(chuàng)建它之前設置了PRAGMA temp_store = MEMORY
十拣,因此room_table_modification_log
表被放在了內(nèi)存中,并不會被保存在本地文件中(當然也沒有必要)志鹃。因為要放在內(nèi)存中夭问,所以說room_table_modification_log
選擇使用把要追蹤的表名轉成ID進行存儲,這樣節(jié)省內(nèi)存弄跌,而這個ID并沒有什么特殊含義甲喝,只需要跟要追蹤的表名一一對應就可以,所以使用0,1,2這樣的位置作為ID就可以铛只。
需要明確一點埠胖,room_table_modification_log
創(chuàng)建后,其中是沒有數(shù)據(jù)的淳玩。創(chuàng)建InvalidationTracker
時傳入的tableNames
參數(shù)只是表明直撤,返回數(shù)據(jù)流的查詢涉及到了這些表,但是在數(shù)據(jù)庫使用過程中蜕着,可能根本就沒有執(zhí)行相關查詢谋竖,所以也就不需要追蹤。也就是說room_table_modification_log
的追蹤是“懶”的承匣,直到返回數(shù)據(jù)流的查詢方法被調(diào)用時蓖乘,相應數(shù)據(jù)才會被插入到room_table_modification_log
表中,開啟追蹤韧骗。至于數(shù)據(jù)是如何被插入的嘉抒,invalidated
又是如何在01之間翻轉的,下面會介紹袍暴。
3. 添加/移除觸發(fā)器
觸發(fā)器是數(shù)據(jù)庫中的一個概念些侍,你可以把它簡單理解為在特定條件下“觸發(fā)”的一系列行為,這種特定條件在我們這里就是UPDATE, DELETE, INSERT這三種操作政模,而“一系列行為”就是把room_table_modification_log
表中的invalidated
置為1岗宣。看來觸發(fā)器的確非常適合于追蹤表中數(shù)據(jù)被修改這種情形淋样。
那么觸發(fā)器是由誰添加的呢耗式?源頭就是返回數(shù)據(jù)流的查詢方法,當這些方法被調(diào)用時,觸發(fā)器就會被添加纽什。然后措嵌,從Dao中查詢方法被調(diào)用,到觸發(fā)器被添加芦缰,這中間又經(jīng)歷了很多轉換企巢,我們暫且把這流程省略直接看觸發(fā)器相關的內(nèi)容。
public class InvalidationTracker {
/**
* An observer that can listen for changes in the database.
*/
public abstract static class Observer {
final String[] mTables;
public Observer(@NonNull String[] tables) {
mTables = Arrays.copyOf(tables, tables.length);
}
public abstract void onInvalidated(@NonNull Set<String> tables);
}
}
別管我們查詢是以怎樣的數(shù)據(jù)流返回让蕾,最終都是向數(shù)據(jù)庫添加了一個觀察者浪规,這個觀察者的onInvalidated
會在我們觀察的Table“失效”時被回調(diào),因此探孝,我們可以再次進行查詢笋婿,進而把最新的數(shù)據(jù)傳遞出去,形成數(shù)據(jù)流顿颅。而onInvalidated
之所以被回調(diào)缸濒,依賴的就是room_table_modification_log
表。
當向數(shù)據(jù)庫添加一個觀察者時(其實是向InvalidationTracker
添加)粱腻,就會看要觀察者提供的mTables
中是否還存在沒有被“追蹤”的Table庇配,如果有則會向room_table_modification_log
表中插入一條數(shù)據(jù),以開啟對該Table的“追蹤”绍些,另外還會添加一個觸發(fā)器捞慌,在UPDATE, DELETE, INSERT時設置對應table_id
的invalidated
為1。
public class InvalidationTracker {
//主要職責是記錄哪些 Table 已經(jīng)被“追蹤”了
private ObservedTableTracker mObservedTableTracker;
final SafeIterableMap<Observer, ObserverWrapper> mObserverMap = new SafeIterableMap<>();
//添加觀察者
@WorkerThread
public void addObserver(@NonNull Observer observer) {
//一般情況下柬批,與 mTables一致
final String[] tableNames = resolveViews(observer.mTables);
int[] tableIds = new int[tableNames.length];
final int size = tableNames.length;
for (int i = 0; i < size; i++) {
//查找 Table對應的ID
Integer tableId = mTableIdLookup.get(tableNames[i].toLowerCase(Locale.US));
if (tableId == null) {
throw new IllegalArgumentException("There is no table with name " + tableNames[i]);
}
tableIds[i] = tableId;
}
//會進行一下包裝
ObserverWrapper wrapper = new ObserverWrapper(observer, tableIds, tableNames);
ObserverWrapper currentObserver;
synchronized (mObserverMap) {
//保存下來啸澡,如果之前添加過,啥也不干
currentObserver = mObserverMap.putIfAbsent(observer, wrapper);
}
//如果之前沒有添加過該觀察者氮帐,并且 tableId之前沒有添加過嗅虏,就會添加觸發(fā)器
if (currentObserver == null && mObservedTableTracker.onAdded(tableIds)) {
//需要“同步”
syncTriggers();
}
}
//移除觀察者,在Observable dispose上沐,LiveData被回收時調(diào)用
@WorkerThread
public void removeObserver(@NonNull final Observer observer) {
ObserverWrapper wrapper;
synchronized (mObserverMap) {
wrapper = mObserverMap.remove(observer);
}
if (wrapper != null && mObservedTableTracker.onRemoved(wrapper.mTableIds)) {
//需要“同步”
syncTriggers();
}
}
}
向InvalidationTracker
添加觀察者的主要流程是皮服,根據(jù)Observer
要觀察的mTables
,轉換成對應的tableIds
奄容,之后把Observer
包裝成ObserverWrapper
,ObserverWrapper
的主要作用是記錄下被觀察表的信息产徊,諸如tableIds, tableNames昂勒。最后,把觀察者保存下來舟铜,并且在必要時syncTriggers
戈盈,顧名思義就是同步觸發(fā)器的意思(可能添加也可能移除觸發(fā)器)。
之前說過,只有在被觀察的表還未被“追蹤”時塘娶,才會添加觸發(fā)器归斤,如果某個表在之前就已經(jīng)被“追蹤”了,那自然不需要重復添加觸發(fā)器刁岸。而哪些表已經(jīng)被“追蹤”脏里,是否需要停止追蹤,這些信息保存在ObservedTableTracker
中虹曙,這里就不展開其代碼了迫横。
移除觀察者就更簡單了,一目了然酝碳,就不廢話了矾踱。
我接著來看主線流程syncTriggers
:
public class InvalidationTracker {
void syncTriggers() {
if (!mDatabase.isOpen()) {
return;
}
syncTriggers(mDatabase.getOpenHelper().getWritableDatabase());
}
void syncTriggers(SupportSQLiteDatabase database) {
if (database.inTransaction()) {
return;
}
try {
// This method runs in a while loop because while changes are synced to db, another
// runnable may be skipped. If we cause it to skip, we need to do its work.
while (true) {
Lock closeLock = mDatabase.getCloseLock();
closeLock.lock();
try {
//上文提到的 ObservedTableTracker
//返回為null,證明沒有要同步的疏哗,否則就是“有活要干”
final int[] tablesToSync = mObservedTableTracker.getTablesToSync();
if (tablesToSync == null) {
return;
}
final int limit = tablesToSync.length;
database.beginTransaction();
try {
for (int tableId = 0; tableId < limit; tableId++) {
switch (tablesToSync[tableId]) {
//活來了呛讲,可能是開始追蹤表
case ObservedTableTracker.ADD:
startTrackingTable(database, tableId);
break;
//也可能是停止追蹤表
case ObservedTableTracker.REMOVE:
stopTrackingTable(database, tableId);
break;
}
}
database.setTransactionSuccessful();
} finally {
database.endTransaction();
}
mObservedTableTracker.onSyncCompleted();
} finally {
closeLock.unlock();
}
}
} catch (IllegalStateException | SQLiteException exception) {
Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?",
exception);
}
}
}
其實Room實現(xiàn)數(shù)據(jù)流的整體流程并不復雜,關鍵就是其中需要處理非常多的線程安全的問題返奉,導致源碼很復雜贝搁。syncTriggers
就是這樣的,其主干就是startTrackingTable
或者stopTrackingTable
或者啥也不干衡瓶,但是考慮到線程安全的問題徘公,就需要加鎖,循環(huán)查詢等等哮针」孛妫總之,開啟追蹤是在startTrackingTable
十厢,停止追蹤是在stopTrackingTable
等太。
public class InvalidationTracker {
private static final String[] TRIGGERS = new String[]{"UPDATE", "DELETE", "INSERT"};
//開啟追蹤,向room_table_modification_log插入數(shù)據(jù)蛮放,并添加觸發(fā)器
private void startTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
writableDb.execSQL(
"INSERT OR IGNORE INTO " + UPDATE_TABLE_NAME + " VALUES(" + tableId + ", 0)");
final String tableName = mTableNames[tableId];
StringBuilder stringBuilder = new StringBuilder();
for (String trigger : TRIGGERS) {
stringBuilder.setLength(0);
stringBuilder.append("CREATE TEMP TRIGGER IF NOT EXISTS ");
appendTriggerName(stringBuilder, tableName, trigger);
stringBuilder.append(" AFTER ")
.append(trigger)
.append(" ON `")
.append(tableName)
.append("` BEGIN UPDATE ")
.append(UPDATE_TABLE_NAME)
.append(" SET ").append(INVALIDATED_COLUMN_NAME).append(" = 1")
.append(" WHERE ").append(TABLE_ID_COLUMN_NAME).append(" = ").append(tableId)
.append(" AND ").append(INVALIDATED_COLUMN_NAME).append(" = 0")
.append("; END");
writableDb.execSQL(stringBuilder.toString());
}
}
//停止追蹤缩抡,只是丟棄觸發(fā)器,并不會刪除room_table_modification_log中的數(shù)據(jù)
private void stopTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
final String tableName = mTableNames[tableId];
StringBuilder stringBuilder = new StringBuilder();
for (String trigger : TRIGGERS) {
stringBuilder.setLength(0);
stringBuilder.append("DROP TRIGGER IF EXISTS ");
appendTriggerName(stringBuilder, tableName, trigger);
writableDb.execSQL(stringBuilder.toString());
}
}
//觸發(fā)器的名稱
private static void appendTriggerName(StringBuilder builder, String tableName,
String triggerType) {
builder.append("`")
.append("room_table_modification_trigger_")
.append(tableName)
.append("_")
.append(triggerType)
.append("`");
}
}
以User表為例(其table_id
為0)包颁,那么開啟“追蹤”:
INSERT OR IGNORE INTO room_table_modification_log VALUES(0, 0)
CREATE TEMP TRIGGER IF NOT EXISTS `room_table_modification_trigger_user_UPDATE`
AFTER UPDATE ON `user`
BEGIN
UPDATE room_table_modification_log
SET invalidated = 1 WHERE table_id = 0 AND invalidated = 0;
END
會為UPDATE, DELETE, INSERT分別創(chuàng)建TRIGGER瞻想,TIRGGER中的內(nèi)容都是一樣的。
停止“追蹤”:
DROP TRIGGER IF EXISTS `room_table_modification_trigger_user_UPDATE`
DROP TRIGGER IF EXISTS `room_table_modification_trigger_user_DELETE`
DROP TRIGGER IF EXISTS `room_table_modification_trigger_user_INSERT`
并不會把room_table_modification_log
中相應的數(shù)據(jù)也刪了娩嚼,說不定下次還能用到蘑险,只是不再更新invalidated
。
4. 數(shù)據(jù)流
有了上面這些基礎岳悟,在看數(shù)據(jù)流的實現(xiàn)就比較簡單了佃迄。下面會以RxJava Observable數(shù)據(jù)流為例泼差,看看數(shù)據(jù)流究竟是如何實現(xiàn)的。
@Dao
interface UserDao {
@Query("SELECT * FROM user WHERE uid = :userId")
fun getUserObservableById(userId: Int): Observable<User>
}
會生成如下代碼:
public final class UserDao_Impl implements UserDao {
@Override
public Observable<User> getUserObservableById(final int userId) {
final String _sql = "SELECT * FROM user WHERE uid = ?";
final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 1);
int _argIndex = 1;
_statement.bindLong(_argIndex, userId);
//這里是關鍵
return RxRoom.createObservable(__db, false, new String[]{"user"}, new Callable<User>() {
@Override
public User call() throws Exception {
//以下是正常的數(shù)據(jù)庫查詢流程
final Cursor _cursor = DBUtil.query(__db, _statement, false);
try {
final int _cursorIndexOfUid = CursorUtil.getColumnIndexOrThrow(_cursor, "uid");
final int _cursorIndexOfFirstName = CursorUtil.getColumnIndexOrThrow(_cursor, "first_name");
final int _cursorIndexOfLastName = CursorUtil.getColumnIndexOrThrow(_cursor, "last_name");
final User _result;
if(_cursor.moveToFirst()) {
final int _tmpUid;
_tmpUid = _cursor.getInt(_cursorIndexOfUid);
final String _tmpFirstName;
_tmpFirstName = _cursor.getString(_cursorIndexOfFirstName);
final String _tmpLastName;
_tmpLastName = _cursor.getString(_cursorIndexOfLastName);
_result = new User(_tmpUid,_tmpFirstName,_tmpLastName);
} else {
_result = null;
}
return _result;
} finally {
_cursor.close();
}
}
@Override
protected void finalize() {
_statement.release();
}
});
}
}
跟普通查詢最大的不同就是以RxRoom.createObservable
創(chuàng)建了Observable
呵俏。來看一下RxRoom
:
public class RxRoom {
//其中參數(shù)callable就是要進行的查詢
public static <T> Observable<T> createObservable(final RoomDatabase database,
final boolean inTransaction, final String[] tableNames, final Callable<T> callable) {
//不用太在意這個scheduler堆缘,就是一個線程池,不同情況下用不同的線程池
Scheduler scheduler = Schedulers.from(getExecutor(database, inTransaction));
//fromCallable會天然地阻止 null普碎,如果callable返回null會被過濾掉
final Maybe<T> maybe = Maybe.fromCallable(callable);
return createObservable(database, tableNames)
.subscribeOn(scheduler)
.unsubscribeOn(scheduler)
.observeOn(scheduler)
.flatMapMaybe(new Function<Object, MaybeSource<T>>() {
@Override
public MaybeSource<T> apply(Object o) throws Exception {
return maybe;
}
});
}
public static Observable<Object> createObservable(final RoomDatabase database,
final String... tableNames) {
return Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(final ObservableEmitter<Object> emitter) throws Exception {
//新建一個InvalidationTracker.Observer
final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
tableNames) {
//在觀察的表“失效”時回調(diào)
@Override
public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) {
//只是發(fā)射一個“信號”吼肥,表明需要重新查詢
emitter.onNext(NOTHING);
}
};
//向InvalidationTracker添加觀察者
database.getInvalidationTracker().addObserver(observer);
emitter.setDisposable(Disposables.fromAction(new Action() {
@Override
public void run() throws Exception {
//dispose時移除觀察者
database.getInvalidationTracker().removeObserver(observer);
}
}));
// emit once to avoid missing any data and also easy chaining
emitter.onNext(NOTHING);
}
});
}
}
很簡單,就是向InvalidationTracker
添加觀察者随常,在Observable
被dispose
的時候移除觀察者潜沦。因為RxJava 2不喜歡null,所以RxRoom
的做法是僅僅發(fā)送數(shù)據(jù)“失效”的信號绪氛,然后使用flatMapMaybe
操作符結合Maybe.fromCallable
天然地過濾掉了null唆鸡,很巧妙(巧妙也不是一天達成的,在之前的版本中沒有使用這個操作符枣察,還是使用我們總是用的那種“笨拙”的方法過濾null)争占。
還差最后一個環(huán)節(jié)就能形成閉環(huán)了,那就是啥時候去查詢room_table_modification_log
表序目,看看哪個被“追蹤”的表“失效”了臂痕。最合適的時機就是“增刪改”結束的時候:
public abstract class RoomDatabase {
public void beginTransaction() {
assertNotMainThread();
SupportSQLiteDatabase database = mOpenHelper.getWritableDatabase();
//Transaction開始時“同步”Trigger
mInvalidationTracker.syncTriggers(database);
database.beginTransaction();
}
//Transaction結束時refreshVersionsAsync
public void endTransaction() {
mOpenHelper.getWritableDatabase().endTransaction();
if (!inTransaction()) {
// enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last
// endTransaction call to do it.
mInvalidationTracker.refreshVersionsAsync();
}
}
}
public class InvalidationTracker {
public void refreshVersionsAsync() {
if (mPendingRefresh.compareAndSet(false, true)) {
mDatabase.getQueryExecutor().execute(mRefreshRunnable);
}
}
Runnable mRefreshRunnable = new Runnable() {
@Override
public void run() {
final Lock closeLock = mDatabase.getCloseLock();
Set<Integer> invalidatedTableIds = null;
try {
closeLock.lock();
if (!ensureInitialization()) {
return;
}
if (!mPendingRefresh.compareAndSet(true, false)) {
// no pending refresh
return;
}
if (mDatabase.inTransaction()) {
// current thread is in a transaction. when it ends, it will invoke
// refreshRunnable again. mPendingRefresh is left as false on purpose
// so that the last transaction can flip it on again.
return;
}
//查詢room_table_modification_log,獲取“失效”的表
invalidatedTableIds = checkUpdatedTable();
} catch (IllegalStateException | SQLiteException exception) {
// may happen if db is closed. just log.
Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?",
exception);
} finally {
closeLock.unlock();
}
//如果有“失效”的表
if (invalidatedTableIds != null && !invalidatedTableIds.isEmpty()) {
synchronized (mObserverMap) {
for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
//通知觀察者猿涨,如果是觀察者“關心”的表握童,就會回調(diào) onInvalidated
entry.getValue().notifyByTableInvalidStatus(invalidatedTableIds);
}
}
}
}
//查詢出room_table_modification_log中invalidated為1的,然后把它重置為0叛赚;返回的是 table_id的集合
private Set<Integer> checkUpdatedTable() {
//主要就是執(zhí)行下面SQL
//SELECT * FROM room_table_modification_log WHERE invalidated = 1
//UPDATE room_table_modification_log SET invalidated = 0 WHERE invalidated = 1
}
};
}
“增刪改”都是放在Transaction中的澡绩,因此endTransaction
是個查詢room_table_modification_log
的好時機。通過room_table_modification_log
獲取“失效”的表俺附,如果InvalidationTracker
的觀察者觀察的是“失效”表中的某一個或幾個肥卡,就會回調(diào)觀察者的onInvalidated
方法。onInvalidated
方法正如前面展示的那樣事镣,會發(fā)送一個信號步鉴,然后相應查詢就會被再次執(zhí)行,最新的數(shù)據(jù)就會被傳遞出去璃哟。至此氛琢,整個流程完整地閉合。
某個數(shù)據(jù)表“失效”僅僅是說這個表中的數(shù)據(jù)被修改了随闪,但是阳似,是不是修改的我們查詢關心的部分,其實并不知道蕴掏,有可能重新查詢的結果跟之前是一樣的障般。如果不想接收到重復的數(shù)據(jù),對于RxJava而言可以使用
distinctUntilChanged
來進行過濾盛杰,對于LiveData而言可以使用Transformations.distinctUntilChanged
來過濾挽荡。
以上僅以RxJava Observable為例展示了數(shù)據(jù)流是如何實現(xiàn)的,對于Flowable而言是類似的即供。但是對于LiveData而言定拟,跟Observable就不太一樣了,中間多了很多處理(主要是防止在我們沒有保存LiveData的情況下逗嫡,LiveData被意外回收)青自,整個流程下來比較繁瑣,但是思路是一樣的驱证,這里就省略了延窜。
5. 總結
圖中有一些不合適的地方,removeObserver
并不是有Dao調(diào)用的抹锄,而是查詢返回的RaJava Observable或者LiveData“不再用”了的時候被各自調(diào)用的逆瑞。其中,“重新查詢”的意思就是數(shù)據(jù)流被更新伙单,數(shù)據(jù)流相關的RaJava Observable或者LiveData都沒有在包含在圖中获高。
能從Room分析的第一篇文章看到這里的,應該頒發(fā)一個最佳堅持獎吻育,請留言或者點贊讓我看見你們的雙手念秧。這是這一系列文章的最后一篇,文章題目叫“從Room源碼看抽象與封裝”布疼,主要是因為寫第一篇文章的時候摊趾,想從“抽象與封裝”作為切入點去分析Room的源碼,沒有想到會寫這么長缎除。前面幾篇文章還是比較符合題目的带欢,后面的文章基本上就剩源碼解析了,不是說這里面不包含抽象封裝的內(nèi)容衙傀,而是源碼已經(jīng)比較復雜了抵窒,再去談抽象封裝只會更加混亂『浞唬總之铸董,希望你有所收獲,歡迎留言與我交流粟害。