前言
因?yàn)槲覀兂S玫腞xjava澳腹,所以這里會(huì)結(jié)合RxRoom做分析,所以需要你有Rxjava相關(guān)的知識(shí)儲(chǔ)備耿焊。
完整源碼參見(jiàn)googlesamples,可以自己跑一下,體驗(yàn)一下储笑。
簡(jiǎn)單用法
我在閱讀一份代碼時(shí)最喜歡的入手點(diǎn)就是先看怎么用,從用法去往前推導(dǎo)圆恤。
看下面的簡(jiǎn)單代碼,也就是所謂的三大組件突倍。
//首先是實(shí)體類對(duì)應(yīng)到數(shù)據(jù)庫(kù)也就是一個(gè)表
@Entity(tableName = "users")
public class User {
@PrimaryKey
@ColumnInfo(name = "userid")
private String mId;
@ColumnInfo(name = "username")
private String mUserName;
...
}
//數(shù)據(jù)操作類
@Dao
public interface UserDao {
@Query("SELECT * FROM users LIMIT 1")
Flowable<User> getUser();
@Insert(onConflict = OnConflictStrategy.REPLACE)
void insertUser(User user);
...
}
//Database
@Database(
entities = {User.class,UserGroup.class},
version = 1)
public abstract class UsersDatabase extends RoomDatabase {
private static volatile UsersDatabase INSTANCE;
public abstract UserDao userDao();
public static UsersDatabase getInstance(Context context) {
if (INSTANCE == null) {
synchronized (UsersDatabase.class) {
if (INSTANCE == null) {
INSTANCE =
Room.databaseBuilder(
context.getApplicationContext(), UsersDatabase.class, "Sample.db")
.build();
}
}
}
return INSTANCE;
}
}
定義好上面的3個(gè)東西,我們就能直接用來(lái)增刪改查
了盆昙。
val database = UsersDatabase.getInstance(this)
//插入一條數(shù)據(jù)
val user = User("kingty")
database.userDao().insertUser(user)
//監(jiān)聽(tīng)一個(gè)查詢的實(shí)時(shí)變化
database.userDao().getUser().subscribe {
Log.d("TAG", "table changed => " + it.userName + " => " + it.id)
}
用法非常簡(jiǎn)單羽历,做一些注解,它就自動(dòng)幫你完成了DAO
中的所有操作淡喜,并且還可以監(jiān)聽(tīng)數(shù)據(jù)庫(kù)的變化實(shí)時(shí)更新數(shù)據(jù)秕磷。這一切看起來(lái)比較夢(mèng)幻。那么問(wèn)題來(lái)了炼团,它是怎么做到這一切的澎嚣?
怎么通過(guò)注解就可以操作數(shù)據(jù)庫(kù)了疏尿?
其實(shí)ORM
的本質(zhì)就是用一些手段把你的數(shù)據(jù)類
和操作
變成SQL語(yǔ)句
而已。而這一切無(wú)非就是兩種手段易桃,編譯時(shí)做(apt)或者運(yùn)行時(shí)做(reflect)褥琐。出于效率考慮現(xiàn)在大部分都是編譯時(shí)做。編譯以下晤郑,所以我們來(lái)翻一翻build
目錄踩衩,看一下Room給我們生成了一些什么東西。
- build
- generate
- source
- apt
- com.kingty.roomtest
- UserDao_Impl.java
- UsersDatabase_Impl.java
- com.kingty.roomtest
- apt
- source
- generate
就發(fā)現(xiàn)在上面這個(gè)目錄下生成了兩個(gè)類贩汉,這里我們不深究這里是怎么生成這兩個(gè)類的驱富,說(shuō)起來(lái)可以說(shuō)兩天兩夜。其實(shí)是我也不懂匹舞。有興趣的同學(xué)應(yīng)該早就知道了褐鸥,沒(méi)興趣的我說(shuō)不說(shuō)也無(wú)所謂了〈突總之就是叫榕,編譯的時(shí)候通過(guò)我們剛才那些個(gè)注解給我們生成了真正的實(shí)現(xiàn)類,幫助我們完成了我們想要的操作姊舵。
來(lái)讓我們看一下里面都生成了一些什么晰绎?
先看一下UserDao_Impl.java
這個(gè)類做了什么?
x@SuppressWarnings("unchecked")
public final class UserDao_Impl implements UserDao {
private final RoomDatabase __db;
private final EntityInsertionAdapter __insertionAdapterOfUser;
private final SharedSQLiteStatement __preparedStmtOfDeleteAllUsers;
public UserDao_Impl(RoomDatabase __db) {
this.__db = __db;
this.__insertionAdapterOfUser = new EntityInsertionAdapter<User>(__db) {
@Override
public String createQuery() {
return "INSERT OR REPLACE INTO `users`(`userid`,`username`) VALUES (?,?)";
}
@Override
public void bind(SupportSQLiteStatement stmt, User value) {
if (value.getId() == null) {
stmt.bindNull(1);
} else {
stmt.bindString(1, value.getId());
}
if (value.getUserName() == null) {
stmt.bindNull(2);
} else {
stmt.bindString(2, value.getUserName());
}
}
};
this.__preparedStmtOfDeleteAllUsers = new SharedSQLiteStatement(__db) {
@Override
public String createQuery() {
final String _query = "DELETE FROM Users";
return _query;
}
};
}
@Override
public void insertUser(User user) {
__db.beginTransaction();
try {
__insertionAdapterOfUser.insert(user);
__db.setTransactionSuccessful();
} finally {
__db.endTransaction();
}
}
@Override
public void deleteAllUsers() {
final SupportSQLiteStatement _stmt = __preparedStmtOfDeleteAllUsers.acquire();
__db.beginTransaction();
try {
_stmt.executeUpdateDelete();
__db.setTransactionSuccessful();
} finally {
__db.endTransaction();
__preparedStmtOfDeleteAllUsers.release(_stmt);
}
}
@Override
public Flowable<User> getUser() {
final String _sql = "SELECT * FROM users LIMIT 1";
final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 0);
return RxRoom.createFlowable(__db, new String[]{"users"}, new Callable<User>() {
@Override
public User call() throws Exception {
final Cursor _cursor = DBUtil.query(__db, _statement, false);
try {
final int _cursorIndexOfMId = _cursor.getColumnIndexOrThrow("userid");
final int _cursorIndexOfMUserName = _cursor.getColumnIndexOrThrow("username");
final User _result;
if(_cursor.moveToFirst()) {
final String _tmpMId;
_tmpMId = _cursor.getString(_cursorIndexOfMId);
final String _tmpMUserName;
_tmpMUserName = _cursor.getString(_cursorIndexOfMUserName);
_result = new User(_tmpMId,_tmpMUserName);
} else {
_result = null;
}
return _result;
} finally {
_cursor.close();
}
}
@Override
protected void finalize() {
_statement.release();
}
});
}
}
首先括丁,在初始化UserDao_Impl
的時(shí)候通過(guò)EntityInsertionAdapter
幫你創(chuàng)建了插入數(shù)據(jù)的adapter
荞下,里面有插入數(shù)據(jù)的sql模板,和綁定數(shù)據(jù)的方法史飞。通俗一點(diǎn)說(shuō)就是在這里幫你拼好了插入數(shù)據(jù)的SQL語(yǔ)句尖昏。接下來(lái)就是幫你實(shí)現(xiàn)了你在UserDao
中定義的接口。
增刪改
的套路大概類似构资,都是__db.beginTransaction();
然后執(zhí)行拼好的SQL語(yǔ)句抽诉,然后__db.endTransaction();
注意此DB非彼DB,這個(gè)DB是封裝過(guò)的RoomDatabase吐绵。后面我們會(huì)著重講一下這個(gè)beginTransaction
和endTransaction
,他們還是比較重要的一個(gè)環(huán)節(jié)迹淌。 重要環(huán)節(jié)一
。
查
的套路就不一樣了,為什么它不一樣己单,public Flowable<User> getUser()
這個(gè)方法明顯看起來(lái)大坨一些唉窃,這就是它為什么不一樣。簡(jiǎn)單閱讀一下荷鼠,它其實(shí)利用RxRoom創(chuàng)建了一個(gè)Flowable
,其中有3個(gè)參數(shù)我們注意一下句携,第一個(gè)是__db
也就是database,第二個(gè)是new String[]{"users"}
,是一個(gè)表名的數(shù)組,第3個(gè)就是查詢完成之后組裝成User
的回調(diào)。特別注意的是第二個(gè)參數(shù)允乐,這個(gè)表名的數(shù)組的作用矮嫉。這是重要環(huán)節(jié)二
。
再看看UsersDatabase_Impl.java
這個(gè)類的生成了什么牍疏?
@SuppressWarnings("unchecked")
public final class UsersDatabase_Impl extends UsersDatabase {
private volatile UserDao _userDao;
@Override
protected SupportSQLiteOpenHelper createOpenHelper(DatabaseConfiguration configuration) {
final SupportSQLiteOpenHelper.Callback _openCallback = new RoomOpenHelper(configuration, new RoomOpenHelper.Delegate(1) {
@Override
public void createAllTables(SupportSQLiteDatabase _db) {
_db.execSQL("CREATE TABLE IF NOT EXISTS `users` (`userid` TEXT NOT NULL, `username` TEXT, PRIMARY KEY(`userid`))");
_db.execSQL("CREATE TABLE IF NOT EXISTS `usergroups` (`userGroupId` TEXT NOT NULL, `groupName` TEXT, PRIMARY KEY(`userGroupId`))");
_db.execSQL("CREATE TABLE IF NOT EXISTS room_master_table (id INTEGER PRIMARY KEY,identity_hash TEXT)");
_db.execSQL("INSERT OR REPLACE INTO room_master_table (id,identity_hash) VALUES(42, \"8890a9730e4846f27da03382221fc877\")");
}
@Override
public void dropAllTables(SupportSQLiteDatabase _db) {
_db.execSQL("DROP TABLE IF EXISTS `users`");
_db.execSQL("DROP TABLE IF EXISTS `usergroups`");
}
@Override
protected void onCreate(SupportSQLiteDatabase _db) {
if (mCallbacks != null) {
for (int _i = 0, _size = mCallbacks.size(); _i < _size; _i++) {
mCallbacks.get(_i).onCreate(_db);
}
}
}
@Override
public void onOpen(SupportSQLiteDatabase _db) {
mDatabase = _db;
internalInitInvalidationTracker(_db);
if (mCallbacks != null) {
for (int _i = 0, _size = mCallbacks.size(); _i < _size; _i++) {
mCallbacks.get(_i).onOpen(_db);
}
}
}
@Override
public void onPreMigrate(SupportSQLiteDatabase _db) {
DBUtil.dropFtsSyncTriggers(_db);
}
@Override
public void onPostMigrate(SupportSQLiteDatabase _db) {
}
@Override
protected void validateMigration(SupportSQLiteDatabase _db) {
final HashMap<String, TableInfo.Column> _columnsUsers = new HashMap<String, TableInfo.Column>(2);
_columnsUsers.put("userid", new TableInfo.Column("userid", "TEXT", true, 1));
_columnsUsers.put("username", new TableInfo.Column("username", "TEXT", false, 0));
final HashSet<TableInfo.ForeignKey> _foreignKeysUsers = new HashSet<TableInfo.ForeignKey>(0);
final HashSet<TableInfo.Index> _indicesUsers = new HashSet<TableInfo.Index>(0);
final TableInfo _infoUsers = new TableInfo("users", _columnsUsers, _foreignKeysUsers, _indicesUsers);
final TableInfo _existingUsers = TableInfo.read(_db, "users");
if (! _infoUsers.equals(_existingUsers)) {
throw new IllegalStateException("Migration didn't properly handle users(com.kingty.roomtest.User).\n"
+ " Expected:\n" + _infoUsers + "\n"
+ " Found:\n" + _existingUsers);
}
final HashMap<String, TableInfo.Column> _columnsUsergroups = new HashMap<String, TableInfo.Column>(2);
_columnsUsergroups.put("userGroupId", new TableInfo.Column("userGroupId", "TEXT", true, 1));
_columnsUsergroups.put("groupName", new TableInfo.Column("groupName", "TEXT", false, 0));
final HashSet<TableInfo.ForeignKey> _foreignKeysUsergroups = new HashSet<TableInfo.ForeignKey>(0);
final HashSet<TableInfo.Index> _indicesUsergroups = new HashSet<TableInfo.Index>(0);
final TableInfo _infoUsergroups = new TableInfo("usergroups", _columnsUsergroups, _foreignKeysUsergroups, _indicesUsergroups);
final TableInfo _existingUsergroups = TableInfo.read(_db, "usergroups");
if (! _infoUsergroups.equals(_existingUsergroups)) {
throw new IllegalStateException("Migration didn't properly handle usergroups(com.kingty.roomtest.UserGroup).\n"
+ " Expected:\n" + _infoUsergroups + "\n"
+ " Found:\n" + _existingUsergroups);
}
}
}, "8890a9730e4846f27da03382221fc877", "1fdb937160bfb054175cfe5daf922b3b");
final SupportSQLiteOpenHelper.Configuration _sqliteConfig = SupportSQLiteOpenHelper.Configuration.builder(configuration.context)
.name(configuration.name)
.callback(_openCallback)
.build();
final SupportSQLiteOpenHelper _helper = configuration.sqliteOpenHelperFactory.create(_sqliteConfig);
return _helper;
}
@Override
protected InvalidationTracker createInvalidationTracker() {
final HashMap<String, String> _shadowTablesMap = new HashMap<String, String>(0);
HashMap<String, Set<String>> _viewTables = new HashMap<String, Set<String>>(0);
return new InvalidationTracker(this, _shadowTablesMap, _viewTables, "users","usergroups");
}
@Override
public void clearAllTables() {
super.assertNotMainThread();
final SupportSQLiteDatabase _db = super.getOpenHelper().getWritableDatabase();
try {
super.beginTransaction();
_db.execSQL("DELETE FROM `users`");
_db.execSQL("DELETE FROM `usergroups`");
super.setTransactionSuccessful();
} finally {
super.endTransaction();
_db.query("PRAGMA wal_checkpoint(FULL)").close();
if (!_db.inTransaction()) {
_db.execSQL("VACUUM");
}
}
}
@Override
public UserDao userDao() {
if (_userDao != null) {
return _userDao;
} else {
synchronized(this) {
if(_userDao == null) {
_userDao = new UserDao_Impl(this);
}
return _userDao;
}
}
}
}
這個(gè)類中邏輯比較清晰蠢笋。首先是創(chuàng)建了一個(gè)SupportSQLiteOpenHelper
來(lái)幫你拼了一些必要的SQL語(yǔ)句,比如create table,drop table,open和migrate遷移數(shù)據(jù)等等鳞陨。后面還有一個(gè)初始化真正的DAO實(shí)現(xiàn)類UserDao_Impl
和刪除相關(guān)的表數(shù)據(jù)的方法clearAllTables
這些都是一些比較好理解的昨寞。然后我們會(huì)看到一個(gè)我們不好理解的方法createInvalidationTracker ()
,這個(gè)是用來(lái)做什么的?我們先把這個(gè)疑問(wèn)留下厦滤,叫做 重要環(huán)節(jié)三
正式初略閱讀Room的源碼
下面我們提出幾個(gè)問(wèn)題:
- 在上面生成的代碼中我們看到操作的執(zhí)行都是通過(guò)一個(gè)叫
RoomDatabase
的_db
來(lái)做的援岩,那RoomDatabase
是什么? - 重要環(huán)節(jié)一掏导,在執(zhí)行前后
beginTransaction
和endTransaction
做了什么? - 重要環(huán)節(jié)二享怀,創(chuàng)建
Flowable
的時(shí)候做了什么,為什么需要table names
? - 重要環(huán)節(jié)三趟咆,
createInvalidationTracker()
是做什么用的添瓷? - 最后,串聯(lián)起上面的問(wèn)題值纱,當(dāng)一個(gè)表發(fā)生更改鳞贷,監(jiān)聽(tīng)一個(gè)查詢的實(shí)時(shí)變化是怎么做到的?
帶著這幾個(gè)問(wèn)題虐唠,我們大體的去閱讀一下源碼搀愧。閱讀過(guò)程中我們有一個(gè)原則,就是先不要特別在意細(xì)節(jié)疆偿,先捋通大概的邏輯流程妈橄。如果你對(duì)細(xì)節(jié)感興趣,再去扣細(xì)節(jié)翁脆。
在項(xiàng)目中加以下引用
implementation 'androidx.room:room-runtime:2.1.0-alpha01'
annotationProcessor 'androidx.room:room-compiler:2.1.0-alpha01'
implementation 'androidx.room:room-rxjava2:2.1.0-alpha01'
編譯之后我們可以在External Libraries
目錄下看到以下幾個(gè)包:
- androidx.room:room-commom
- androidx.room:room-runtime
- androidx.room:room-rxjava
- androidx.sqlite:sqlite
- androidx.sqlite:sqlite-framework
我先簡(jiǎn)單的介紹下這幾個(gè)包大概是做什么的眷蚓。
androidx.sqlite:sqlite
這個(gè)包主要是重新定義了一層SQLite的Support接口。
androidx.sqlite:sqlite-framework
這個(gè)包主要是利用原有的android的Sqlite
相關(guān)的API實(shí)現(xiàn)了上面定義的接口反番。
這兩個(gè)包主要是對(duì)原有的API做了一層代理封裝沙热,我的理解是便于擴(kuò)展。因此我們?cè)诳?code>Room代碼的時(shí)候這部分代碼大概瀏覽一下就OK罢缸,不必深究篙贸。
androidx.room:room-commom
包中定義了一些公共的屬性,和我們用到的所有的注解枫疆。
androidx.room:room-runtime
是我們需要主要閱讀的邏輯所在的包爵川,Room的核心邏輯都在這個(gè)包中。
androidx.room:room-rxjava
當(dāng)我們需要返回一個(gè)Rx包裝過(guò)的結(jié)果的時(shí)候息楔,需要這個(gè)包寝贡。里面就是一個(gè)重要類RxRoom.java
用來(lái)把Query包裝成一個(gè)可觀察的對(duì)象扒披。
下面我們帶著上面的問(wèn)題來(lái)看一下代碼。
RoomDatabase
是做什么的圃泡?
代碼太長(zhǎng)就不全貼碟案,我們看一下它持有的成員變量
protected volatile SupportSQLiteDatabase mDatabase;
private Executor mQueryExecutor;
private SupportSQLiteOpenHelper mOpenHelper;
private final InvalidationTracker mInvalidationTracker;
private boolean mAllowMainThreadQueries;
boolean mWriteAheadLoggingEnabled;
它其實(shí)是對(duì)數(shù)據(jù)庫(kù)的進(jìn)一步封裝,利用真正的SupportSQLiteDatabase
和你UsersDatabase_Impl
自動(dòng)生成的createOpenHelper()
提供的SupportSQLiteOpenHelper
來(lái)操作數(shù)據(jù)庫(kù)颇蜡。進(jìn)一步封裝了Transcation
并封裝了一些其他的邏輯价说。
beginTransaction
和endTransaction
做了什么?
實(shí)際上這也是上一個(gè)問(wèn)題的一部分,先看看beginTransaction
的代碼
/**
* Wrapper for {@link SupportSQLiteDatabase#beginTransaction()}.
*/
public void beginTransaction() {
assertNotMainThread();//禁止主線程執(zhí)行
SupportSQLiteDatabase database = mOpenHelper.getWritableDatabase();//拿到真正的數(shù)據(jù)庫(kù)對(duì)象
mInvalidationTracker.syncTriggers(database);//??
database.beginTransaction();//開(kāi)啟事務(wù)
}
從上面的代碼來(lái)看其他3句都非常好理解风秤,正常的數(shù)據(jù)庫(kù)事務(wù)流程鳖目,但是在開(kāi)啟事務(wù)之前做了一個(gè)操作mInvalidationTracker.syncTriggers(database);
我們先不忙解釋這個(gè)是什么意思。我們?cè)诳纯?code>endTransaction
/**
* Wrapper for {@link SupportSQLiteDatabase#endTransaction()}.
*/
public void endTransaction() {
mOpenHelper.getWritableDatabase().endTransaction();//結(jié)束事務(wù)
if (!inTransaction()) {
// enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last
// endTransaction call to do it.
mInvalidationTracker.refreshVersionsAsync();
}
}
第一句是正常的結(jié)束事務(wù)的語(yǔ)句缤弦,但是結(jié)束之后等待最后一個(gè)事務(wù)結(jié)束领迈,會(huì)做一個(gè)操作mInvalidationTracker.refreshVersionsAsync();
也就是說(shuō)在開(kāi)啟事務(wù)之前,結(jié)束事務(wù)之后都調(diào)用了InvalidationTracker做了一些邏輯甸鸟,再結(jié)合上面的第四個(gè)問(wèn)題惦费,重要環(huán)節(jié)三createInvalidationTracker()是做什么用的?
,一切問(wèn)題都指向了InvalidationTracker
抢韭。
InvalidationTracker
這個(gè)類是什么作用薪贫,我們先從上面的第四個(gè)問(wèn)題看起。
createInvalidationTracker()
是做什么用的刻恭?
下面是在自動(dòng)生成的UsersDatabase_Impl.java
類中的方法
@Override
protected InvalidationTracker createInvalidationTracker() {
final HashMap<String, String> _shadowTablesMap = new HashMap<String, String>(0);
HashMap<String, Set<String>> _viewTables = new HashMap<String, Set<String>>(0);
return new InvalidationTracker(this, _shadowTablesMap, _viewTables, "users","usergroups");
}
在RoomDatabase在被初始化的時(shí)候調(diào)用這個(gè)方法賦值給成員變量
/**
* Creates a RoomDatabase.
* <p>
* You cannot create an instance of a database, instead, you should acquire it via
* {@link Room#databaseBuilder(Context, Class, String)} or
* {@link Room#inMemoryDatabaseBuilder(Context, Class)}.
*/
public RoomDatabase() {
mInvalidationTracker = createInvalidationTracker();
}
因此回答上面的問(wèn)題就是createInvalidationTracker()
給RoomDatabase提供了一個(gè)mInvalidationTracker實(shí)例瞧省。
mInvalidationTracker起的作用是什么?
我們來(lái)看一下RoomDatabase.java
中的調(diào)用流程鳍贾。
1.初始化
public RoomDatabase() {
mInvalidationTracker = createInvalidationTracker();
}
2.init中調(diào)用mInvalidationTracker.startMultiInstanceInvalidation(configuration.context,configuration.name);
@CallSuper
public void init(@NonNull DatabaseConfiguration configuration) {
mOpenHelper = createOpenHelper(configuration);
boolean wal = false;
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.JELLY_BEAN) {
wal = configuration.journalMode == JournalMode.WRITE_AHEAD_LOGGING;
mOpenHelper.setWriteAheadLoggingEnabled(wal);
}
mCallbacks = configuration.callbacks;
mQueryExecutor = configuration.queryExecutor;
mAllowMainThreadQueries = configuration.allowMainThreadQueries;
mWriteAheadLoggingEnabled = wal;
if (configuration.multiInstanceInvalidation) {
mInvalidationTracker.startMultiInstanceInvalidation(configuration.context,
configuration.name);
}
}
3.每次Transaction 開(kāi)始之前調(diào)用mInvalidationTracker.syncTriggers
/**
* Wrapper for {@link SupportSQLiteDatabase#beginTransaction()}.
*/
public void beginTransaction() {
assertNotMainThread();
SupportSQLiteDatabase database = mOpenHelper.getWritableDatabase();
mInvalidationTracker.syncTriggers(database);
database.beginTransaction();
}
4. 最后一個(gè)Transaction 結(jié)束之后調(diào)用mInvalidationTracker.refreshVersionsAsync
/**
* Wrapper for {@link SupportSQLiteDatabase#endTransaction()}.
*/
public void endTransaction() {
mOpenHelper.getWritableDatabase().endTransaction();
if (!inTransaction()) {
// enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last
// endTransaction call to do it.
mInvalidationTracker.refreshVersionsAsync();
}
}
5鞍匾,close數(shù)據(jù)庫(kù)的時(shí)候mInvalidationTracker.stopMultiInstanceInvalidation();與第2步對(duì)應(yīng)。
/**
* Closes the database if it is already open.
*/
public void close() {
if (isOpen()) {
try {
mCloseLock.lock();
mInvalidationTracker.stopMultiInstanceInvalidation();
mOpenHelper.close();
} finally {
mCloseLock.unlock();
}
}
}
上面就是InvalidationTracker在RoomDatabase中的整個(gè)生命周期中的調(diào)用情況骑科。從代碼上來(lái)看它其實(shí)是在track整個(gè)數(shù)據(jù)的更改情況橡淑,因?yàn)樗诿總€(gè)transcation前后做了一些調(diào)用。結(jié)合上面最后的一個(gè)問(wèn)題當(dāng)一個(gè)表發(fā)生更改咆爽,監(jiān)聽(tīng)一個(gè)查詢的實(shí)時(shí)變化是怎么做到的
梁棠。大概可以猜測(cè)出來(lái)這個(gè)類的主要作用是來(lái)保證數(shù)據(jù)發(fā)生更改的時(shí)候,保證可以通知到這個(gè)表上其他的Query斗埂。
怎么樣實(shí)現(xiàn)的監(jiān)聽(tīng)符糊?
我們發(fā)現(xiàn)上面還有一個(gè)問(wèn)題我們還沒(méi)有提到重要環(huán)節(jié)二,創(chuàng)建Flowable的時(shí)候做了什么呛凶,為什么需要table names?
我們從這里入手講起男娄。先看下面RxRoom
中的代碼
public static Flowable<Object> createFlowable(final RoomDatabase database,
final String... tableNames) {
return Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(final FlowableEmitter<Object> emitter) throws Exception {
final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
tableNames) {
@Override
public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) {
if (!emitter.isCancelled()) {
emitter.onNext(NOTHING);
}
}
};
if (!emitter.isCancelled()) {
database.getInvalidationTracker().addObserver(observer);
emitter.setDisposable(Disposables.fromAction(new Action() {
@Override
public void run() throws Exception {
database.getInvalidationTracker().removeObserver(observer);
}
}));
}
// emit once to avoid missing any data and also easy chaining
if (!emitter.isCancelled()) {
emitter.onNext(NOTHING);
}
}
}, BackpressureStrategy.LATEST);
}
/**
* Helper method used by generated code to bind a Callable such that it will be run in
* our disk io thread and will automatically block null values since RxJava2 does not like null.
*
* @hide
*/
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public static <T> Flowable<T> createFlowable(final RoomDatabase database,
final String[] tableNames, final Callable<T> callable) {
Scheduler scheduler = Schedulers.from(database.getQueryExecutor());
final Maybe<T> maybe = Maybe.fromCallable(callable);
return createFlowable(database, tableNames)
.observeOn(scheduler)
.flatMapMaybe(new Function<Object, MaybeSource<T>>() {
@Override
public MaybeSource<T> apply(Object o) throws Exception {
return maybe;
}
});
}
在上面生成的UserDao_Impl.java
類中getUser()
這個(gè)方法中調(diào)用的createFlowable
這個(gè)方法,也就是上面的第二個(gè)方法,它實(shí)際上調(diào)用的上面的第一個(gè)方法flatmap到這個(gè)本次的查詢模闲。也就是說(shuō)只要第一個(gè)方法中的Flowable發(fā)射一次數(shù)據(jù)建瘫,那么這個(gè)查詢就會(huì)執(zhí)行一次,并返回結(jié)果(也就是執(zhí)行這個(gè)callable)围橡。這里應(yīng)該就能看出一點(diǎn)端倪暖混,其實(shí)第一個(gè)方法就是創(chuàng)建出來(lái)一個(gè)觀察這個(gè)表變化的觀察者InvalidationTracker.Observer
并把它添加到InvalidationTracker的觀察者列表中去缕贡,因?yàn)橐粋€(gè)表肯定不止一個(gè)觀察者翁授,所有的Query應(yīng)該都需要觀察表的更改。也就是上面的這行代碼database.getInvalidationTracker().addObserver(observer);
到這里RxRoom
這個(gè)類的使命就完成了晾咪,他就是這樣一個(gè)簡(jiǎn)單的功能收擦,后面你也不需要再關(guān)心它。
InvalidationTracker.Observer
是一個(gè)靜態(tài)類谍倦,就注意一下其中的一個(gè)方法
/**
* Called when one of the observed tables is invalidated in the database.
*
* @param tables A set of invalidated tables. This is useful when the observer targets
* multiple tables and you want to know which table is invalidated. This will
* be names of underlying tables when you are observing views.
public abstract void onInvalidated(@NonNull Set<String> tables);
從備注上已經(jīng)寫的很清楚了塞赂,就是表發(fā)生更改狀態(tài)的時(shí)候會(huì)調(diào)用這個(gè)方法,emitter
就會(huì)發(fā)射數(shù)據(jù)昼蛀,通知Query
去requery.
我們著重看一下addObserver
這個(gè)方法干了什么宴猾?
@WorkerThread
public void addObserver(@NonNull Observer observer) {
final String[] tableNames = resolveViews(observer.mTables);
int[] tableIds = new int[tableNames.length];
final int size = tableNames.length;
long[] versions = new long[tableNames.length];
// TODO sync versions ?
for (int i = 0; i < size; i++) {
Integer tableId = mTableIdLookup.get(tableNames[i].toLowerCase(Locale.US));
if (tableId == null) {
throw new IllegalArgumentException("There is no table with name " + tableNames[i]);
}
tableIds[i] = tableId;
versions[i] = mMaxVersion;
}
ObserverWrapper wrapper = new ObserverWrapper(observer, tableIds, tableNames, versions);
ObserverWrapper currentObserver;
synchronized (mObserverMap) {
currentObserver = mObserverMap.putIfAbsent(observer, wrapper);
}
if (currentObserver == null && mObservedTableTracker.onAdded(tableIds)) {
syncTriggers();
}
}
首先對(duì)Observer
做了一層包裝,主要就是包裝了當(dāng)表發(fā)生變化的時(shí)候通過(guò)各種方式去通知也就是執(zhí)行mObserver.onInvalidated(invalidatedTables);
,接下來(lái)叼旋,把包裝后的wrapper放進(jìn)map里仇哆。然后在滿足特定條件下會(huì)執(zhí)行syncTriggers();
這個(gè)似曾相識(shí),在上面RoomDatabase
開(kāi)始一個(gè)事務(wù)之前也執(zhí)行這個(gè)方法夫植。我們來(lái)仔細(xì)看看這個(gè)方法做了什么讹剔。
void syncTriggers(SupportSQLiteDatabase database) {
if (database.inTransaction()) {
// we won't run this inside another transaction.
return;
}
try {
// This method runs in a while loop because while changes are synced to db, another
// runnable may be skipped. If we cause it to skip, we need to do its work.
while (true) {
Lock closeLock = mDatabase.getCloseLock();
closeLock.lock();
try {
// there is a potential race condition where another mSyncTriggers runnable
// can start running right after we get the tables list to sync.
final int[] tablesToSync = mObservedTableTracker.getTablesToSync();
if (tablesToSync == null) {
return;
}
final int limit = tablesToSync.length;
try {
database.beginTransaction();
for (int tableId = 0; tableId < limit; tableId++) {
switch (tablesToSync[tableId]) {
case ObservedTableTracker.ADD:
startTrackingTable(database, tableId);
break;
case ObservedTableTracker.REMOVE:
stopTrackingTable(database, tableId);
break;
}
}
database.setTransactionSuccessful();
} finally {
database.endTransaction();
}
mObservedTableTracker.onSyncCompleted();
} finally {
closeLock.unlock();
}
}
} catch (IllegalStateException | SQLiteException exception) {
// may happen if db is closed. just log.
Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?",
exception);
}
}
這個(gè)方法看起來(lái)很長(zhǎng),其實(shí)是在做一件事.ObservedTableTracker
維護(hù)了一個(gè)需要被觀察的表的列表详民,就是發(fā)現(xiàn)有新的表需要被觀察就執(zhí)行startTrackingTable(database, tableId);
,有表不需要被觀察了就執(zhí)行stopTrackingTable(database, tableId);
延欠。
繼續(xù)往下看,看看這兩個(gè)方法做了什么沈跨?
private void stopTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
final String tableName = mShadowTableLookup.get(tableId, mTableNames[tableId]);
StringBuilder stringBuilder = new StringBuilder();
for (String trigger : TRIGGERS) {
stringBuilder.setLength(0);
stringBuilder.append("DROP TRIGGER IF EXISTS ");
appendTriggerName(stringBuilder, tableName, trigger);
writableDb.execSQL(stringBuilder.toString());
}
}
private void startTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
final String tableName = mShadowTableLookup.get(tableId, mTableNames[tableId]);
StringBuilder stringBuilder = new StringBuilder();
for (String trigger : TRIGGERS) {
stringBuilder.setLength(0);
stringBuilder.append("CREATE TEMP TRIGGER IF NOT EXISTS ");
appendTriggerName(stringBuilder, tableName, trigger);
stringBuilder.append(" AFTER ")
.append(trigger)
.append(" ON `")
.append(tableName)
.append("` BEGIN INSERT OR REPLACE INTO ")
.append(UPDATE_TABLE_NAME)
.append(" VALUES(null, ")
.append(tableId)
.append("); END");
writableDb.execSQL(stringBuilder.toString());
}
}
插曲: InvalidationTracker自己維護(hù)了一個(gè)叫room_table_modification_log
的表由捎,有兩個(gè)字段,一個(gè)是version
它是自增的饿凛,還有一個(gè)是table_id狞玛,是被觀察的表的標(biāo)識(shí)。
其實(shí)就是當(dāng)需要去觀察一個(gè)表的時(shí)候startTrackingTable ()
就在數(shù)據(jù)庫(kù)上創(chuàng)建了三個(gè)數(shù)據(jù)庫(kù)的 Trigger
笤喳。關(guān)于Trigger
是什么這是數(shù)據(jù)庫(kù)基礎(chǔ)知識(shí)为居,請(qǐng)自備。也就是說(shuō)杀狡,只要在這個(gè)表上發(fā)生了插入修改或者刪除蒙畴,就會(huì)往room_table_modification_log
表里面插入一條數(shù)據(jù)INSERT OR REPLACE INTO room_table_modification_log VALUES(null, table_id)
。
當(dāng)不需要觀察一個(gè)表的時(shí)候,就通過(guò)stopTrackingTable
把這三個(gè)Trigger
刪除掉膳凝。
以上就是我們?cè)趧?chuàng)建一個(gè)Query做的事情碑隆。
我們先對(duì)創(chuàng)建一個(gè)Query的流程做一個(gè)小的總結(jié):
- 通過(guò)自動(dòng)生成的代碼創(chuàng)建一個(gè)Flowable
- RxRoom會(huì)根據(jù)這個(gè)Flowable創(chuàng)建一個(gè)InvalidationTracker.Observer
- InvalidationTracker把這個(gè)Observer加到自己的觀察列表中
- 如果之前沒(méi)有人觀察過(guò)這個(gè)表,會(huì)去創(chuàng)建這個(gè)表上修改的Trigger
到這里蹬音,我們似乎應(yīng)該有一點(diǎn)頭緒了上煤,既然每次有數(shù)據(jù)更新的時(shí)候就會(huì)往這個(gè)表中插入一條數(shù)據(jù),那在每一個(gè)Trascation
結(jié)束之后去查這個(gè)表就應(yīng)該可以知道哪些表上的Query可以更新著淆。所以我們回到上面的RoomDatabase
中看看endTrasction
之后的mInvalidationTracker.refreshVersionsAsync();
到底做了什么劫狠?
/**
* Enqueues a task to refresh the list of updated tables.
* <p>
* This method is automatically called when {@link RoomDatabase#endTransaction()} is called but
* if you have another connection to the database or directly use {@link
* SupportSQLiteDatabase}, you may need to call this manually.
*/
@SuppressWarnings("WeakerAccess")
public void refreshVersionsAsync() {
// TODO we should consider doing this sync instead of async.
if (mPendingRefresh.compareAndSet(false, true)) {
mDatabase.getQueryExecutor().execute(mRefreshRunnable);
}
}
@VisibleForTesting
Runnable mRefreshRunnable = new Runnable() {
@Override
public void run() {
final Lock closeLock = mDatabase.getCloseLock();
boolean hasUpdatedTable = false;
try {
closeLock.lock();
if (!ensureInitialization()) {
return;
}
if (!mPendingRefresh.compareAndSet(true, false)) {
// no pending refresh
return;
}
if (mDatabase.inTransaction()) {
// current thread is in a transaction. when it ends, it will invoke
// refreshRunnable again. mPendingRefresh is left as false on purpose
// so that the last transaction can flip it on again.
return;
}
mCleanupStatement.executeUpdateDelete();
mQueryArgs[0] = mMaxVersion;
if (mDatabase.mWriteAheadLoggingEnabled) {
// This transaction has to be on the underlying DB rather than the RoomDatabase
// in order to avoid a recursive loop after endTransaction.
SupportSQLiteDatabase db = mDatabase.getOpenHelper().getWritableDatabase();
try {
db.beginTransaction();
hasUpdatedTable = checkUpdatedTable();
db.setTransactionSuccessful();
} finally {
db.endTransaction();
}
} else {
hasUpdatedTable = checkUpdatedTable();
}
} catch (IllegalStateException | SQLiteException exception) {
// may happen if db is closed. just log.
Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?",
exception);
} finally {
closeLock.unlock();
}
if (hasUpdatedTable) {
synchronized (mObserverMap) {
for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
entry.getValue().notifyByTableVersions(mTableVersions);
}
}
}
}
private boolean checkUpdatedTable() {
boolean hasUpdatedTable = false;
Cursor cursor = mDatabase.query(SELECT_UPDATED_TABLES_SQL, mQueryArgs);
//noinspection TryFinallyCanBeTryWithResources
try {
while (cursor.moveToNext()) {
final long version = cursor.getLong(0);
final int tableId = cursor.getInt(1);
mTableVersions[tableId] = version;
hasUpdatedTable = true;
// result is ordered so we can safely do this assignment
mMaxVersion = version;
}
} finally {
cursor.close();
}
return hasUpdatedTable;
}
}
static final String SELECT_UPDATED_TABLES_SQL = "SELECT * FROM " + UPDATE_TABLE_NAME
+ " WHERE " + VERSION_COLUMN_NAME
+ " > ? ORDER BY " + VERSION_COLUMN_NAME + " ASC;";
它實(shí)際上是執(zhí)行了mRefreshRunnable
的,這個(gè)runnerable的邏輯非常清晰,先做一些邊界檢測(cè)永部,然后去checkUpdatedTable
,看有沒(méi)有用表在變化独泞,怎么檢測(cè)√β瘢看上面的sql語(yǔ)句懦砂,就是去查room_table_modification_log
中相同的table_id的version,如果有大于之前保存的maxversion的數(shù)據(jù)组橄,說(shuō)明有新的修改荞膘。然后調(diào)用ObserverWrapper 中的notifyByTableVersions
去通知表上的觀察者。
這也就回到了上面最后一個(gè)問(wèn)題當(dāng)一個(gè)表發(fā)生更改玉工,監(jiān)聽(tīng)一個(gè)查詢的實(shí)時(shí)變化是怎么做到的羽资?
。
MultiInstanceInvalidation
到這里我們還漏了一點(diǎn)沒(méi)有講到瓮栗。那就是剛才說(shuō)InvalidationTracker在RoomDatabase中的整個(gè)生命周期中的調(diào)用情況的時(shí)候還有初始化的時(shí)候和關(guān)閉數(shù)據(jù)庫(kù)的時(shí)候執(zhí)行了
mInvalidationTracker.startMultiInstanceInvalidation(configuration.context,configuration.name);
和
mInvalidationTracker.stopMultiInstanceInvalidation();
因?yàn)槲覀冊(cè)谝弥胁豢赡苡肋h(yuǎn)是單標(biāo)上的查詢削罩。也就是說(shuō)我們一個(gè)查詢可能是連表的查詢,那么這個(gè)查詢的更新就會(huì)依賴于多個(gè)表的觀察操作费奸。這就引出了框架中的一個(gè)經(jīng)典的CS結(jié)構(gòu)的兩個(gè)類MultiInstanceInvalidationClient
, MultiInstanceInvalidationService
在初始化RoomDatabase
的時(shí)候我們會(huì)開(kāi)啟一個(gè)Client也就是startMultiInstanceInvalidation
,其實(shí)就是創(chuàng)建了有一個(gè)Client
void startMultiInstanceInvalidation(Context context, String name) {
mMultiInstanceInvalidationClient = new MultiInstanceInvalidationClient(context, name, this,
mDatabase.getQueryExecutor());
}
看一下Client初始化的過(guò)程
MultiInstanceInvalidationClient(Context context, String name,
InvalidationTracker invalidationTracker, Executor executor) {
mContext = context.getApplicationContext();
mName = name;
mInvalidationTracker = invalidationTracker;
mExecutor = executor;
mObserver = new InvalidationTracker.Observer(invalidationTracker.mTableNames) {
@Override
public void onInvalidated(@NonNull Set<String> tables) {
if (mStopped.get()) {
return;
}
try {
mService.broadcastInvalidation(mClientId,
tables.toArray(new String[0]));
} catch (RemoteException e) {
Log.w(Room.LOG_TAG, "Cannot broadcast invalidation", e);
}
}
@Override
boolean isRemote() {
return true;
}
};
Intent intent = new Intent(mContext, MultiInstanceInvalidationService.class);
mContext.bindService(intent, mServiceConnection, Context.BIND_AUTO_CREATE);
}
從上面來(lái)看弥激,其實(shí)在創(chuàng)建RoomDatabase
的時(shí)候創(chuàng)建Client的時(shí)候,我們就也創(chuàng)建了一個(gè)InvalidationTracker.Observer
愿阐,并且添加進(jìn)InvalidationTracker
的觀察列表微服,當(dāng)這個(gè)表發(fā)生更新的時(shí)候會(huì)通過(guò)服務(wù)端Service broadcastInvalidation
方法去通知客戶端Client。
@SuppressWarnings("WeakerAccess")
final Runnable mSetUpRunnable = new Runnable() {
@Override
public void run() {
try {
final IMultiInstanceInvalidationService service = mService;
if (service != null) {
mClientId = service.registerCallback(mCallback, mName);
mInvalidationTracker.addObserver(mObserver);
}
} catch (RemoteException e) {
Log.w(Room.LOG_TAG, "Cannot register multi-instance invalidation callback", e);
}
}
};
而每個(gè)Client在Setup的時(shí)候會(huì)去service.registerCallback
final IMultiInstanceInvalidationCallback mCallback =
new IMultiInstanceInvalidationCallback.Stub() {
@Override
public void onInvalidation(final String[] tables) {
mExecutor.execute(new Runnable() {
@Override
public void run() {
mInvalidationTracker.notifyObserversByTableNames(tables);
}
});
}
};
這個(gè)callback
就是是說(shuō)收到broadcastInvalidation
的信息的時(shí)候會(huì)去執(zhí)行缨历。
這個(gè)流程就是在多個(gè)RoomDatabase
之間是如何溝通的以蕴,也就是說(shuō)在其他的RoomDatabase
也修改了你這個(gè)表,那是如何通知到你發(fā)生改變的辛孵。
小結(jié)
到這里我們?cè)谡w上把這個(gè)Room是如何做到響應(yīng)式的做了一個(gè)框架的解析丛肮。基本上也已經(jīng)瀏覽了整個(gè)Room的核心代碼魄缚。當(dāng)然其中還有很多的細(xì)節(jié)宝与,如果感興趣可以自己去好好讀一下焚廊。因?yàn)榭赡芪乙膊惶宄N乙彩浅趼缘淖x了一下做了一些自己的分析习劫∨匚粒肯定有理解不對(duì)的地方。大家閱讀過(guò)程中請(qǐng)辯證看待诽里,多多指正袒餐。