從Room源碼看抽象與封裝——數(shù)據(jù)流

目錄

源碼解析目錄
從Room源碼看抽象與封裝——SQLite的抽象
從Room源碼看抽象與封裝——數(shù)據(jù)庫的創(chuàng)建
從Room源碼看抽象與封裝——數(shù)據(jù)庫的升降級
從Room源碼看抽象與封裝——Dao
從Room源碼看抽象與封裝——數(shù)據(jù)流

前言

之前關于Room源碼分析的四篇文章基本上涵蓋了Room使用的全流程肥荔,從抽象基礎到數(shù)據(jù)庫創(chuàng)建再到增刪改查的實現(xiàn)活烙,可謂很全面了纳鼎。你以為這就完了嗎隔缀?Too young瓷蛙!如果Room只是這些內(nèi)容的話歌径,那只能說它是個還不錯的ORM框架亿絮,對SQLite進行了良好的抽象鼠锈,并且效率也很高郭膛。Room最大的不同在于它是Jetpack包的一部分晨抡,它與Jetpack包中其它部分有著非常好的配合,尤其是LiveData则剃。以之前定義的Dao為例:

@Dao
interface UserDao {
    @Query("SELECT * FROM user WHERE uid = :userId")
    fun getUserById(userId: Int): User?

    @Query("SELECT * FROM user WHERE uid = :userId")
    fun getUserLiveDataById(userId: Int): LiveData<User>
    
    @Query("SELECT * FROM user WHERE uid = :userId")
    fun getUserObservableById(userId: Int): Observable<User>
    
    @Query("SELECT * FROM user WHERE uid = :userId")
    fun getUserFlowableById(userId: Int): Flowable<User>
}

如上所示耘柱,Dao中的查詢不僅可以返回Entity對象User,還可以以數(shù)據(jù)流的形式返回LiveData或者是RxJava中的Observable棍现,F(xiàn)lowable调煎。當我們“增刪改”User表時,如果在此之前以如上形式獲取到了User數(shù)據(jù)流己肮,那么該User數(shù)據(jù)流就會“更新”士袄,即相應查詢會重新執(zhí)行一遍,以把最新的User數(shù)據(jù)傳遞出去谎僻,至于User是否真的有變化窖剑,這需要我們自行判斷。
這篇文章會介紹Room是如何實現(xiàn)數(shù)據(jù)流的戈稿。

1. 概述

如果說之前對Room方方面面的解析只是“繁”的話西土,那么Room對于數(shù)據(jù)流的實現(xiàn)真的是有點“難”了。整個流程比較復雜鞍盗,為了防止你看的云里霧里需了,我先大致描述一下整個流程跳昼,然后再對各個部分進行詳細講解。
Room實現(xiàn)數(shù)據(jù)流大致包含如下幾個步驟:

  1. 創(chuàng)建一個數(shù)據(jù)庫臨時表(表名為room_table_modification_log肋乍,保存在內(nèi)存中鹅颊,非本地存儲),表中包含兩列table_idinvalidated墓造,這個表記錄了當前數(shù)據(jù)庫哪個Table被修改了(“增刪改”都屬于修改)堪伍。
  2. 如果Dao中的查詢涉及到了數(shù)據(jù)流返回,那么生成代碼就會幫我們向數(shù)據(jù)庫中添加一個關于當前被查詢表(對于上面的例子來說就是User這個表觅闽,可能會涉及到多個表)的TRIGGER(觸發(fā)器)帝雇,這個觸發(fā)器的主要任務是當有操作修改了被查詢表中的數(shù)據(jù)時,就把room_table_modification_log表中對應的table_idinvalidated置為1蛉拙,表示“失效”了尸闸。
  3. Room中所有“增刪改”都是放在Transaction中的,在endTransaction時孕锄,Room都會查看room_table_modification_log表中是否有invalidated為1的吮廉,如果有,再次執(zhí)行相應查詢(invalidated會被重置為0)畸肆,并把查詢的數(shù)據(jù)通過相應數(shù)據(jù)流傳遞出去宦芦。
  4. 在數(shù)據(jù)流不再使用時(例如Observable被dispose),相應觸發(fā)器也會被丟棄轴脐,這樣就不會再對這個表進行“追蹤”了踪旷。

2. 創(chuàng)建臨時表

Room中關于數(shù)據(jù)流的實現(xiàn)幾乎全部包含在InvalidationTracker類中,這個類是在RoomDatabase中被創(chuàng)建的豁辉,只是在之前的分析中被刻意忽略了。

public abstract class RoomDatabase {
    private final InvalidationTracker mInvalidationTracker;
    
    public RoomDatabase() {
        mInvalidationTracker = createInvalidationTracker();
    }
    
    /**
     * 由生成代碼實現(xiàn)
     */
    @NonNull
    protected abstract InvalidationTracker createInvalidationTracker();
    
    /**
     * 由生成代碼調(diào)用舀患,初始化 InvalidationTracker
     */
    protected void internalInitInvalidationTracker(@NonNull SupportSQLiteDatabase db) {
        mInvalidationTracker.internalInit(db);
    }

    @NonNull
    public InvalidationTracker getInvalidationTracker() {
        return mInvalidationTracker;
    }
}

public final class AppDatabase_Impl extends AppDatabase {
    @Override
    protected SupportSQLiteOpenHelper createOpenHelper(DatabaseConfiguration configuration) {
        final SupportSQLiteOpenHelper.Callback _openCallback = new RoomOpenHelper(configuration, new RoomOpenHelper.Delegate(1) {
            @Override
            public void onOpen(SupportSQLiteDatabase _db) {
                mDatabase = _db;
                _db.execSQL("PRAGMA foreign_keys = ON");
                //數(shù)據(jù)庫打開時初始化 InvalidationTracker
                internalInitInvalidationTracker(_db);
                //...
            }
            //...
        });
        //...
    }
      
    @Override
    protected InvalidationTracker createInvalidationTracker() {
        //...
        return new InvalidationTracker(this, _shadowTablesMap, _viewTables, "User"); //User表示要觀察的表名徽级,可能包含很多個
    }
}

注解處理器生成了AppDatabase_Impl,其中不僅實現(xiàn)了createOpenHelper聊浅,還實現(xiàn)了createInvalidationTracker餐抢,并且在數(shù)據(jù)庫打開時(onOpen回調(diào)被調(diào)用),InvalidationTracker被初始化低匙,也是在初始化時旷痕,臨時表被創(chuàng)建,踏出萬里長征第一步顽冶。

public class InvalidationTracker {

    private static final String UPDATE_TABLE_NAME = "room_table_modification_log";

    private static final String TABLE_ID_COLUMN_NAME = "table_id";

    private static final String INVALIDATED_COLUMN_NAME = "invalidated";

    //建表
    private static final String CREATE_TRACKING_TABLE_SQL = "CREATE TEMP TABLE " + UPDATE_TABLE_NAME
            + "(" + TABLE_ID_COLUMN_NAME + " INTEGER PRIMARY KEY, "
            + INVALIDATED_COLUMN_NAME + " INTEGER NOT NULL DEFAULT 0)";

    //重置
    static final String RESET_UPDATED_TABLES_SQL = "UPDATE " + UPDATE_TABLE_NAME
            + " SET " + INVALIDATED_COLUMN_NAME + " = 0 WHERE " + INVALIDATED_COLUMN_NAME + " = 1 ";

    //找出哪個表改變了
    static final String SELECT_UPDATED_TABLES_SQL = "SELECT * FROM " + UPDATE_TABLE_NAME
            + " WHERE " + INVALIDATED_COLUMN_NAME + " = 1;";
            
    //根據(jù)表名找表名對應的ID
    final ArrayMap<String, Integer> mTableIdLookup;
    //哪些表需要被觀察
    final String[] mTableNames;
    
    //tableNames是可變參數(shù)欺抗,表示可能有多個要被“追蹤”的表
    public InvalidationTracker(RoomDatabase database, Map<String, String> shadowTablesMap,
            Map<String, Set<String>> viewTables, String... tableNames) {
        //shadowTablesMap, viewTables可以忽略,一般都為空
        final int size = tableNames.length;
        mTableNames = new String[size];
        for (int id = 0; id < size; id++) {
            final String tableName = tableNames[id].toLowerCase(Locale.US);
            //表對應的ID其實就是它的位置强重,因為是臨時表存儲在內(nèi)存中绞呈,所以這個ID只需要能分別誰是誰就可以了
            mTableIdLookup.put(tableName, id);
            mTableNames[id] = tableName;
        }
        //...
    }
    
    /**
     * 初始化臨時表
     */
    void internalInit(SupportSQLiteDatabase database) {
        synchronized (this) {
            database.execSQL("PRAGMA temp_store = MEMORY;"); //臨時表保存在內(nèi)存中
            database.execSQL("PRAGMA recursive_triggers='ON';");
            //創(chuàng)建臨時表
            database.execSQL(CREATE_TRACKING_TABLE_SQL);
            syncTriggers(database);
            //重置臨時表是需要經(jīng)常執(zhí)行的贸人,把它編譯成SQLiteStatement方便執(zhí)行
            mCleanupStatement = database.compileStatement(RESET_UPDATED_TABLES_SQL);
        }
    }
}

臨時表的表名是room_table_modification_log,創(chuàng)建它的目的是追蹤我們需要觀察的那些表中(例如User表)數(shù)據(jù)是否發(fā)生了變化佃声。把建表的SQL翻譯以下就是

CREATE TEMP TABLE room_table_modification_log
    (table_id INTEGER PRIMARY KEY,  invalidated INTEGER NOT NULL DEFAULT 0)

就像下面這樣

table_id invalidated
0 0

創(chuàng)建表之后不會有數(shù)據(jù)艺智,數(shù)據(jù)(0,0)只是方便你理解。

因為這個表是臨時表圾亏,并且在創(chuàng)建它之前設置了PRAGMA temp_store = MEMORY十拣,因此room_table_modification_log表被放在了內(nèi)存中,并不會被保存在本地文件中(當然也沒有必要)志鹃。因為要放在內(nèi)存中夭问,所以說room_table_modification_log選擇使用把要追蹤的表名轉成ID進行存儲,這樣節(jié)省內(nèi)存弄跌,而這個ID并沒有什么特殊含義甲喝,只需要跟要追蹤的表名一一對應就可以,所以使用0,1,2這樣的位置作為ID就可以铛只。
需要明確一點埠胖,room_table_modification_log創(chuàng)建后,其中是沒有數(shù)據(jù)的淳玩。創(chuàng)建InvalidationTracker時傳入的tableNames參數(shù)只是表明直撤,返回數(shù)據(jù)流的查詢涉及到了這些表,但是在數(shù)據(jù)庫使用過程中蜕着,可能根本就沒有執(zhí)行相關查詢谋竖,所以也就不需要追蹤。也就是說room_table_modification_log的追蹤是“懶”的承匣,直到返回數(shù)據(jù)流的查詢方法被調(diào)用時蓖乘,相應數(shù)據(jù)才會被插入到room_table_modification_log表中,開啟追蹤韧骗。至于數(shù)據(jù)是如何被插入的嘉抒,invalidated又是如何在01之間翻轉的,下面會介紹袍暴。

3. 添加/移除觸發(fā)器

觸發(fā)器是數(shù)據(jù)庫中的一個概念些侍,你可以把它簡單理解為在特定條件下“觸發(fā)”的一系列行為,這種特定條件在我們這里就是UPDATE, DELETE, INSERT這三種操作政模,而“一系列行為”就是把room_table_modification_log表中的invalidated置為1岗宣。看來觸發(fā)器的確非常適合于追蹤表中數(shù)據(jù)被修改這種情形淋样。
那么觸發(fā)器是由誰添加的呢耗式?源頭就是返回數(shù)據(jù)流的查詢方法,當這些方法被調(diào)用時,觸發(fā)器就會被添加纽什。然后措嵌,從Dao中查詢方法被調(diào)用,到觸發(fā)器被添加芦缰,這中間又經(jīng)歷了很多轉換企巢,我們暫且把這流程省略直接看觸發(fā)器相關的內(nèi)容。

public class InvalidationTracker {
    /**
     * An observer that can listen for changes in the database.
     */
    public abstract static class Observer {
        final String[] mTables;

        public Observer(@NonNull String[] tables) {
            mTables = Arrays.copyOf(tables, tables.length);
        }

        public abstract void onInvalidated(@NonNull Set<String> tables);
    }
}

別管我們查詢是以怎樣的數(shù)據(jù)流返回让蕾,最終都是向數(shù)據(jù)庫添加了一個觀察者浪规,這個觀察者的onInvalidated會在我們觀察的Table“失效”時被回調(diào),因此探孝,我們可以再次進行查詢笋婿,進而把最新的數(shù)據(jù)傳遞出去,形成數(shù)據(jù)流顿颅。而onInvalidated之所以被回調(diào)缸濒,依賴的就是room_table_modification_log表。
當向數(shù)據(jù)庫添加一個觀察者時(其實是向InvalidationTracker添加)粱腻,就會看要觀察者提供的mTables中是否還存在沒有被“追蹤”的Table庇配,如果有則會向room_table_modification_log表中插入一條數(shù)據(jù),以開啟對該Table的“追蹤”绍些,另外還會添加一個觸發(fā)器捞慌,在UPDATE, DELETE, INSERT時設置對應table_idinvalidated為1。

public class InvalidationTracker {
    //主要職責是記錄哪些 Table 已經(jīng)被“追蹤”了
    private ObservedTableTracker mObservedTableTracker;

    final SafeIterableMap<Observer, ObserverWrapper> mObserverMap = new SafeIterableMap<>();

    //添加觀察者
    @WorkerThread
    public void addObserver(@NonNull Observer observer) {
        //一般情況下柬批,與 mTables一致
        final String[] tableNames = resolveViews(observer.mTables);
        int[] tableIds = new int[tableNames.length];
        final int size = tableNames.length;

        for (int i = 0; i < size; i++) {
            //查找 Table對應的ID
            Integer tableId = mTableIdLookup.get(tableNames[i].toLowerCase(Locale.US));
            if (tableId == null) {
                throw new IllegalArgumentException("There is no table with name " + tableNames[i]);
            }
            tableIds[i] = tableId;
        }
        //會進行一下包裝
        ObserverWrapper wrapper = new ObserverWrapper(observer, tableIds, tableNames);
        ObserverWrapper currentObserver;
        synchronized (mObserverMap) {
            //保存下來啸澡,如果之前添加過,啥也不干
            currentObserver = mObserverMap.putIfAbsent(observer, wrapper);
        }
        //如果之前沒有添加過該觀察者氮帐,并且 tableId之前沒有添加過嗅虏,就會添加觸發(fā)器
        if (currentObserver == null && mObservedTableTracker.onAdded(tableIds)) {
            //需要“同步”
            syncTriggers();
        }
    }
    
    //移除觀察者,在Observable dispose上沐,LiveData被回收時調(diào)用
    @WorkerThread
    public void removeObserver(@NonNull final Observer observer) {
        ObserverWrapper wrapper;
        synchronized (mObserverMap) {
            wrapper = mObserverMap.remove(observer);
        }
        if (wrapper != null && mObservedTableTracker.onRemoved(wrapper.mTableIds)) {
            //需要“同步”
            syncTriggers();
        }
    }
}

InvalidationTracker添加觀察者的主要流程是皮服,根據(jù)Observer要觀察的mTables,轉換成對應的tableIds奄容,之后把Observer包裝成ObserverWrapperObserverWrapper的主要作用是記錄下被觀察表的信息产徊,諸如tableIds, tableNames昂勒。最后,把觀察者保存下來舟铜,并且在必要時syncTriggers戈盈,顧名思義就是同步觸發(fā)器的意思(可能添加也可能移除觸發(fā)器)。
之前說過,只有在被觀察的表還未被“追蹤”時塘娶,才會添加觸發(fā)器归斤,如果某個表在之前就已經(jīng)被“追蹤”了,那自然不需要重復添加觸發(fā)器刁岸。而哪些表已經(jīng)被“追蹤”脏里,是否需要停止追蹤,這些信息保存在ObservedTableTracker中虹曙,這里就不展開其代碼了迫横。
移除觀察者就更簡單了,一目了然酝碳,就不廢話了矾踱。
我接著來看主線流程syncTriggers

public class InvalidationTracker {
    void syncTriggers() {
        if (!mDatabase.isOpen()) {
            return;
        }
        syncTriggers(mDatabase.getOpenHelper().getWritableDatabase());
    }
    
    void syncTriggers(SupportSQLiteDatabase database) {
        if (database.inTransaction()) {
            return;
        }
        try {
            // This method runs in a while loop because while changes are synced to db, another
            // runnable may be skipped. If we cause it to skip, we need to do its work.
            while (true) {
                Lock closeLock = mDatabase.getCloseLock();
                closeLock.lock();
                try {
                    //上文提到的 ObservedTableTracker
                    //返回為null,證明沒有要同步的疏哗,否則就是“有活要干”
                    final int[] tablesToSync = mObservedTableTracker.getTablesToSync();
                    if (tablesToSync == null) {
                        return;
                    }
                    final int limit = tablesToSync.length;
                    database.beginTransaction();
                    try {
                        for (int tableId = 0; tableId < limit; tableId++) {
                            switch (tablesToSync[tableId]) {
                                //活來了呛讲,可能是開始追蹤表
                                case ObservedTableTracker.ADD:
                                    startTrackingTable(database, tableId);
                                    break;
                                //也可能是停止追蹤表
                                case ObservedTableTracker.REMOVE:
                                    stopTrackingTable(database, tableId);
                                    break;
                            }
                        }
                        database.setTransactionSuccessful();
                    } finally {
                        database.endTransaction();
                    }
                    mObservedTableTracker.onSyncCompleted();
                } finally {
                    closeLock.unlock();
                }
            }
        } catch (IllegalStateException | SQLiteException exception) {
            Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?",
                    exception);
        }
    }
}

其實Room實現(xiàn)數(shù)據(jù)流的整體流程并不復雜,關鍵就是其中需要處理非常多的線程安全的問題返奉,導致源碼很復雜贝搁。syncTriggers就是這樣的,其主干就是startTrackingTable或者stopTrackingTable或者啥也不干衡瓶,但是考慮到線程安全的問題徘公,就需要加鎖,循環(huán)查詢等等哮针」孛妫總之,開啟追蹤是在startTrackingTable十厢,停止追蹤是在stopTrackingTable等太。

public class InvalidationTracker {
    private static final String[] TRIGGERS = new String[]{"UPDATE", "DELETE", "INSERT"};
    
    //開啟追蹤,向room_table_modification_log插入數(shù)據(jù)蛮放,并添加觸發(fā)器
    private void startTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
        writableDb.execSQL(
                "INSERT OR IGNORE INTO " + UPDATE_TABLE_NAME + " VALUES(" + tableId + ", 0)");
        final String tableName = mTableNames[tableId];
        StringBuilder stringBuilder = new StringBuilder();
        for (String trigger : TRIGGERS) {
            stringBuilder.setLength(0);
            stringBuilder.append("CREATE TEMP TRIGGER IF NOT EXISTS ");
            appendTriggerName(stringBuilder, tableName, trigger);
            stringBuilder.append(" AFTER ")
                    .append(trigger)
                    .append(" ON `")
                    .append(tableName)
                    .append("` BEGIN UPDATE ")
                    .append(UPDATE_TABLE_NAME)
                    .append(" SET ").append(INVALIDATED_COLUMN_NAME).append(" = 1")
                    .append(" WHERE ").append(TABLE_ID_COLUMN_NAME).append(" = ").append(tableId)
                    .append(" AND ").append(INVALIDATED_COLUMN_NAME).append(" = 0")
                    .append("; END");
            writableDb.execSQL(stringBuilder.toString());
        }
    }
    
    //停止追蹤缩抡,只是丟棄觸發(fā)器,并不會刪除room_table_modification_log中的數(shù)據(jù)
    private void stopTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
        final String tableName = mTableNames[tableId];
        StringBuilder stringBuilder = new StringBuilder();
        for (String trigger : TRIGGERS) {
            stringBuilder.setLength(0);
            stringBuilder.append("DROP TRIGGER IF EXISTS ");
            appendTriggerName(stringBuilder, tableName, trigger);
            writableDb.execSQL(stringBuilder.toString());
        }
    }
    
    //觸發(fā)器的名稱
    private static void appendTriggerName(StringBuilder builder, String tableName,
            String triggerType) {
        builder.append("`")
                .append("room_table_modification_trigger_")
                .append(tableName)
                .append("_")
                .append(triggerType)
                .append("`");
    }
}

以User表為例(其table_id為0)包颁,那么開啟“追蹤”:

INSERT OR IGNORE INTO room_table_modification_log VALUES(0, 0)

CREATE TEMP TRIGGER IF NOT EXISTS `room_table_modification_trigger_user_UPDATE`
AFTER UPDATE ON `user`
BEGIN
UPDATE room_table_modification_log
SET invalidated = 1 WHERE table_id = 0 AND invalidated = 0;
END

會為UPDATE, DELETE, INSERT分別創(chuàng)建TRIGGER瞻想,TIRGGER中的內(nèi)容都是一樣的。

停止“追蹤”:

DROP TRIGGER IF EXISTS `room_table_modification_trigger_user_UPDATE`
DROP TRIGGER IF EXISTS `room_table_modification_trigger_user_DELETE`
DROP TRIGGER IF EXISTS `room_table_modification_trigger_user_INSERT`

并不會把room_table_modification_log中相應的數(shù)據(jù)也刪了娩嚼,說不定下次還能用到蘑险,只是不再更新invalidated

4. 數(shù)據(jù)流

有了上面這些基礎岳悟,在看數(shù)據(jù)流的實現(xiàn)就比較簡單了佃迄。下面會以RxJava Observable數(shù)據(jù)流為例泼差,看看數(shù)據(jù)流究竟是如何實現(xiàn)的。

@Dao
interface UserDao {
    @Query("SELECT * FROM user WHERE uid = :userId")
    fun getUserObservableById(userId: Int): Observable<User>
}

會生成如下代碼:

public final class UserDao_Impl implements UserDao {
  @Override
  public Observable<User> getUserObservableById(final int userId) {
    final String _sql = "SELECT * FROM user WHERE uid = ?";
    final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 1);
    int _argIndex = 1;
    _statement.bindLong(_argIndex, userId);
    //這里是關鍵
    return RxRoom.createObservable(__db, false, new String[]{"user"}, new Callable<User>() {
      @Override
      public User call() throws Exception {
        //以下是正常的數(shù)據(jù)庫查詢流程
        final Cursor _cursor = DBUtil.query(__db, _statement, false);
        try {
          final int _cursorIndexOfUid = CursorUtil.getColumnIndexOrThrow(_cursor, "uid");
          final int _cursorIndexOfFirstName = CursorUtil.getColumnIndexOrThrow(_cursor, "first_name");
          final int _cursorIndexOfLastName = CursorUtil.getColumnIndexOrThrow(_cursor, "last_name");
          final User _result;
          if(_cursor.moveToFirst()) {
            final int _tmpUid;
            _tmpUid = _cursor.getInt(_cursorIndexOfUid);
            final String _tmpFirstName;
            _tmpFirstName = _cursor.getString(_cursorIndexOfFirstName);
            final String _tmpLastName;
            _tmpLastName = _cursor.getString(_cursorIndexOfLastName);
            _result = new User(_tmpUid,_tmpFirstName,_tmpLastName);
          } else {
            _result = null;
          }
          return _result;
        } finally {
          _cursor.close();
        }
      }

      @Override
      protected void finalize() {
        _statement.release();
      }
    });
  }
}

跟普通查詢最大的不同就是以RxRoom.createObservable創(chuàng)建了Observable呵俏。來看一下RxRoom

public class RxRoom {
    //其中參數(shù)callable就是要進行的查詢
    public static <T> Observable<T> createObservable(final RoomDatabase database,
            final boolean inTransaction, final String[] tableNames, final Callable<T> callable) {
        //不用太在意這個scheduler堆缘,就是一個線程池,不同情況下用不同的線程池
        Scheduler scheduler = Schedulers.from(getExecutor(database, inTransaction));
        //fromCallable會天然地阻止 null普碎,如果callable返回null會被過濾掉
        final Maybe<T> maybe = Maybe.fromCallable(callable);
        return createObservable(database, tableNames)
                .subscribeOn(scheduler)
                .unsubscribeOn(scheduler)
                .observeOn(scheduler)
                .flatMapMaybe(new Function<Object, MaybeSource<T>>() {
                    @Override
                    public MaybeSource<T> apply(Object o) throws Exception {
                        return maybe;
                    }
                });
    }
    
    public static Observable<Object> createObservable(final RoomDatabase database,
            final String... tableNames) {
        return Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(final ObservableEmitter<Object> emitter) throws Exception {
                //新建一個InvalidationTracker.Observer
                final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
                        tableNames) {
                    //在觀察的表“失效”時回調(diào)
                    @Override
                    public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) {
                        //只是發(fā)射一個“信號”吼肥,表明需要重新查詢
                        emitter.onNext(NOTHING);
                    }
                };
                //向InvalidationTracker添加觀察者
                database.getInvalidationTracker().addObserver(observer);
                emitter.setDisposable(Disposables.fromAction(new Action() {
                    @Override
                    public void run() throws Exception {
                        //dispose時移除觀察者
                        database.getInvalidationTracker().removeObserver(observer);
                    }
                }));

                // emit once to avoid missing any data and also easy chaining
                emitter.onNext(NOTHING);
            }
        });
    }
}

很簡單,就是向InvalidationTracker添加觀察者随常,在Observabledispose的時候移除觀察者潜沦。因為RxJava 2不喜歡null,所以RxRoom的做法是僅僅發(fā)送數(shù)據(jù)“失效”的信號绪氛,然后使用flatMapMaybe操作符結合Maybe.fromCallable天然地過濾掉了null唆鸡,很巧妙(巧妙也不是一天達成的,在之前的版本中沒有使用這個操作符枣察,還是使用我們總是用的那種“笨拙”的方法過濾null)争占。


還差最后一個環(huán)節(jié)就能形成閉環(huán)了,那就是啥時候去查詢room_table_modification_log表序目,看看哪個被“追蹤”的表“失效”了臂痕。最合適的時機就是“增刪改”結束的時候:

public abstract class RoomDatabase {
    public void beginTransaction() {
        assertNotMainThread();
        SupportSQLiteDatabase database = mOpenHelper.getWritableDatabase();
        //Transaction開始時“同步”Trigger
        mInvalidationTracker.syncTriggers(database);
        database.beginTransaction();
    }

    //Transaction結束時refreshVersionsAsync
    public void endTransaction() {
        mOpenHelper.getWritableDatabase().endTransaction();
        if (!inTransaction()) {
            // enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last
            // endTransaction call to do it.
            mInvalidationTracker.refreshVersionsAsync();
        }
    }
}


public class InvalidationTracker {
    public void refreshVersionsAsync() {
        if (mPendingRefresh.compareAndSet(false, true)) {
            mDatabase.getQueryExecutor().execute(mRefreshRunnable);
        }
    }
    
    Runnable mRefreshRunnable = new Runnable() {
        @Override
        public void run() {
            final Lock closeLock = mDatabase.getCloseLock();
            Set<Integer> invalidatedTableIds = null;
            try {
                closeLock.lock();

                if (!ensureInitialization()) {
                    return;
                }

                if (!mPendingRefresh.compareAndSet(true, false)) {
                    // no pending refresh
                    return;
                }

                if (mDatabase.inTransaction()) {
                    // current thread is in a transaction. when it ends, it will invoke
                    // refreshRunnable again. mPendingRefresh is left as false on purpose
                    // so that the last transaction can flip it on again.
                    return;
                }

                //查詢room_table_modification_log,獲取“失效”的表
                invalidatedTableIds = checkUpdatedTable();
            } catch (IllegalStateException | SQLiteException exception) {
                // may happen if db is closed. just log.
                Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?",
                        exception);
            } finally {
                closeLock.unlock();
            }
            //如果有“失效”的表
            if (invalidatedTableIds != null && !invalidatedTableIds.isEmpty()) {
                synchronized (mObserverMap) {
                    for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
                        //通知觀察者猿涨,如果是觀察者“關心”的表握童,就會回調(diào) onInvalidated
                        entry.getValue().notifyByTableInvalidStatus(invalidatedTableIds);
                    }
                }
            }
        }

        //查詢出room_table_modification_log中invalidated為1的,然后把它重置為0叛赚;返回的是 table_id的集合
        private Set<Integer> checkUpdatedTable() {
            //主要就是執(zhí)行下面SQL
            //SELECT * FROM room_table_modification_log WHERE invalidated = 1
            //UPDATE room_table_modification_log SET invalidated = 0 WHERE invalidated = 1
        }
    };

}

“增刪改”都是放在Transaction中的澡绩,因此endTransaction是個查詢room_table_modification_log的好時機。通過room_table_modification_log獲取“失效”的表俺附,如果InvalidationTracker的觀察者觀察的是“失效”表中的某一個或幾個肥卡,就會回調(diào)觀察者的onInvalidated方法。onInvalidated方法正如前面展示的那樣事镣,會發(fā)送一個信號步鉴,然后相應查詢就會被再次執(zhí)行,最新的數(shù)據(jù)就會被傳遞出去璃哟。至此氛琢,整個流程完整地閉合。

某個數(shù)據(jù)表“失效”僅僅是說這個表中的數(shù)據(jù)被修改了随闪,但是阳似,是不是修改的我們查詢關心的部分,其實并不知道蕴掏,有可能重新查詢的結果跟之前是一樣的障般。如果不想接收到重復的數(shù)據(jù),對于RxJava而言可以使用distinctUntilChanged來進行過濾盛杰,對于LiveData而言可以使用Transformations.distinctUntilChanged來過濾挽荡。

以上僅以RxJava Observable為例展示了數(shù)據(jù)流是如何實現(xiàn)的,對于Flowable而言是類似的即供。但是對于LiveData而言定拟,跟Observable就不太一樣了,中間多了很多處理(主要是防止在我們沒有保存LiveData的情況下逗嫡,LiveData被意外回收)青自,整個流程下來比較繁瑣,但是思路是一樣的驱证,這里就省略了延窜。

5. 總結

Room實現(xiàn)數(shù)據(jù)流的流程

圖中有一些不合適的地方,removeObserver并不是有Dao調(diào)用的抹锄,而是查詢返回的RaJava Observable或者LiveData“不再用”了的時候被各自調(diào)用的逆瑞。其中,“重新查詢”的意思就是數(shù)據(jù)流被更新伙单,數(shù)據(jù)流相關的RaJava Observable或者LiveData都沒有在包含在圖中获高。
能從Room分析的第一篇文章看到這里的,應該頒發(fā)一個最佳堅持獎吻育,請留言或者點贊讓我看見你們的雙手念秧。這是這一系列文章的最后一篇,文章題目叫“從Room源碼看抽象與封裝”布疼,主要是因為寫第一篇文章的時候摊趾,想從“抽象與封裝”作為切入點去分析Room的源碼,沒有想到會寫這么長缎除。前面幾篇文章還是比較符合題目的带欢,后面的文章基本上就剩源碼解析了,不是說這里面不包含抽象封裝的內(nèi)容衙傀,而是源碼已經(jīng)比較復雜了抵窒,再去談抽象封裝只會更加混亂『浞唬總之铸董,希望你有所收獲,歡迎留言與我交流粟害。

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市颤芬,隨后出現(xiàn)的幾起案子悲幅,更是在濱河造成了極大的恐慌,老刑警劉巖吟孙,帶你破解...
    沈念sama閱讀 218,386評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異巷挥,居然都是意外死亡,警方通過查閱死者的電腦和手機凿宾,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評論 3 394
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人楞件,你說我怎么就攤上這事。” “怎么了还最?”我有些...
    開封第一講書人閱讀 164,704評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長辜梳。 經(jīng)常有香客問我作瞄,道長契耿,這世上最難降的妖魔是什么盯滚? 我笑而不...
    開封第一講書人閱讀 58,702評論 1 294
  • 正文 為了忘掉前任背率,我火速辦了婚禮会油,結果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好罩抗,可當我...
    茶點故事閱讀 67,716評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上亡脸,一...
    開封第一講書人閱讀 51,573評論 1 305
  • 那天,我揣著相機與錄音根暑,去河邊找鬼怖糊。 笑死,一個胖子當著我的面吹牛耍贾,可吹牛的內(nèi)容都是我干的晃听。 我是一名探鬼主播辫狼,決...
    沈念sama閱讀 40,314評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼真椿,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了查库?” 一聲冷哼從身側響起樊销,我...
    開封第一講書人閱讀 39,230評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎拧揽,沒想到半個月后腺占,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體衰伯,經(jīng)...
    沈念sama閱讀 45,680評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡怎顾,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,873評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了寞钥。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片疙描。...
    茶點故事閱讀 39,991評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖整份,靈堂內(nèi)的尸體忽然破棺而出待错,到底是詐尸還是另有隱情,我是刑警寧澤烈评,帶...
    沈念sama閱讀 35,706評論 5 346
  • 正文 年R本政府宣布火俄,位于F島的核電站,受9級特大地震影響讲冠,放射性物質發(fā)生泄漏瓜客。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,329評論 3 330
  • 文/蒙蒙 一竿开、第九天 我趴在偏房一處隱蔽的房頂上張望谱仪。 院中可真熱鬧,春花似錦否彩、人聲如沸疯攒。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,910評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽敬尺。三九已至,卻和暖如春贴浙,著一層夾襖步出監(jiān)牢的瞬間砂吞,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,038評論 1 270
  • 我被黑心中介騙來泰國打工崎溃, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蜻直,地道東北人。 一個月前我還...
    沈念sama閱讀 48,158評論 3 370
  • 正文 我出身青樓袁串,卻偏偏與公主長得像概而,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子囱修,可洞房花燭夜當晚...
    茶點故事閱讀 44,941評論 2 355

推薦閱讀更多精彩內(nèi)容