LiveData的不足
- LiveData 是一個專用于 Android 的具備自主生命周期感知能力的可觀察的數(shù)據(jù)存儲器類席吴,被有意簡化設(shè)計,這使得開發(fā)者很容易上手,但其不足有如下兩點:
- LiveData只能在主線程更新數(shù)據(jù)(postValue底層也是切換到主線程的,而且可能會有丟數(shù)據(jù)的問題)抢腐;
- LiveData操作符不夠強大, 對于較為復(fù)雜的交互數(shù)據(jù)流場景姑曙,建議使用 RxJava 或 Flow;
- LiveData與Android平臺緊密相連,雖然LiveData在表現(xiàn)層中運行良好迈倍,但它并不適合領(lǐng)域?qū)由丝浚驗轭I(lǐng)域?qū)幼詈檬仟毩⒂谄脚_的;
- LiveData 對于 Java 開發(fā)者、初學(xué)者或是一些簡單場景而言仍是可行的解決方案啼染。對于MVVM架構(gòu)而言宴合,View和ViewModel之間可以通過LiveData交互(看了下面就知道其實也可以用StateFlow), ViewModel和Repository之間就可以通過Flow交互;
RxJava的不足
- RxJava還是相當(dāng)強大的,基于事件流的鏈?zhǔn)秸{(diào)用迹鹅,進(jìn)行耗時任務(wù)卦洽,線程切換,是一個很好的異步操作庫, 但是對于Android開發(fā)來說其也有一些不足之處
- 強大意味著復(fù)雜斜棚,其繁多的操作符簡直是初學(xué)者的噩夢阀蒂;
- 它是非官方的,google自然也就不會花大力氣去推廣和優(yōu)化弟蚀;
- 為項目的包體積帶來了額外的增加蚤霞;
Flow
- Flow 是一種 "冷流"(Cold Stream)。"冷流" 是一種數(shù)據(jù)源义钉,該類數(shù)據(jù)源的生產(chǎn)者會在每個監(jiān)聽者開始消費事件的時候執(zhí)行(即不消費則不生產(chǎn)數(shù)據(jù)昧绣,而LiveData的發(fā)送端并不依賴于接收端),從而在每個訂閱上創(chuàng)建新的數(shù)據(jù)流(有多個訂閱者的時候捶闸,他們各自的事件是獨立的)夜畴。一旦消費者停止監(jiān)聽或者生產(chǎn)者的阻塞結(jié)束,數(shù)據(jù)流將會被自動關(guān)閉删壮。
- Flow 是 Kotlin 協(xié)程與響應(yīng)式編程模型結(jié)合的產(chǎn)物贪绘,支持線程切換、背壓央碟,通過協(xié)程取消功能提供自動清理功能兔簇,因此傾向于執(zhí)行一些重型任務(wù)。
- 使用 take, first, toList 等操作符可以簡化 Flow 的相關(guān)代碼測試硬耍。
- Flow本身并不了解Android的生命周期,也不提供Android生命周期狀態(tài)變化時收集器的自動暫停和恢復(fù),可以使用LifecycleCoroutineScope的擴展边酒,如 launchWhenStarted來啟動coroutine來收集我們的Flow--這些收集器將自動暫停经柴,并與組件的Lifecycle同步恢復(fù)。
- 相較于 Channel墩朦,F(xiàn)low 末端操作符 會觸發(fā)數(shù)據(jù)流的執(zhí)行坯认,同時會根據(jù)生產(chǎn)者一側(cè)流操作來決定是成功完成操作還是拋出異常,因此 Flows 會自動地關(guān)閉數(shù)據(jù)流,不會在生產(chǎn)者一側(cè)泄漏資源牛哺;而一旦 Channel 沒有正確關(guān)閉陋气,生產(chǎn)者可能不會清理大型資源,因此 Channels 更容易造成資源泄漏引润。
Flow的一些常用操作符
// val flow = flowOf(1,2,3,4,5)
// val flow: Flow<Int> = flow {
// List(20) {
// emit(it)//發(fā)送數(shù)據(jù)
// delay(300)
// }
// }
val flow = (1..10).asFlow()
lifecycleScope.launch {
flow.flowOn(Dispatchers.IO)//設(shè)定它運行時所使用的調(diào)度器,設(shè)置的調(diào)度器只對它之前的操作有影響
.onStart { log("onStart") }
.flowOn(Dispatchers.Main)
.onEach {
log("onEach:$it")
delay(300)
}
.filter {//過濾
it % 2 == 0
}
.map {//變換
log("map:$it*$it")
it * it
}
.transform<Int,String> {
"num=$it"
// emit("num1=$it")
// emit("num2=$it")
}
.flowOn(Dispatchers.IO)
.onCompletion {//訂閱流的完成,執(zhí)行在流完成時的邏輯
log("onCompletion: $it")
}
.catch {//捕獲 Flow 的異常,catch 函數(shù)只能捕獲它的上游的異常
log("catch: $it")
}
.flowOn(Dispatchers.Main)
.collect {//消費Flow
log("collect1_1: $it")
}
//Flow 可以被重復(fù)消費
flow.collect { log("collect1_2: $it") }
//除了可以在 collect 處消費 Flow 的元素以外巩趁,還可以通過 onEach 來做到這一點。
// 這樣消費的具體操作就不需要與末端操作符放到一起淳附,collect 函數(shù)可以放到其他任意位置調(diào)用
flow.onEach {
log("onEach2:$it")
}
withContext(Dispatchers.IO) {
delay(1000)
flow.collect()
}
//除了使用子協(xié)程執(zhí)行上流外议慰,我們還可以使用launchIn函數(shù)來讓Flow使用全新的協(xié)程上下文
flow.onEach {
log("onEach2:$it")
}.launchIn(CoroutineScope(Dispatchers.IO))
.join()//主線程等待這個協(xié)程執(zhí)行結(jié)束
Flow的取消
lifecycleScope.launch(Dispatchers.IO) {
val flow2 = (1..10).asFlow().onEach { delay(1000) }
val job: Job = lifecycleScope.launch {
log("lifecycleScope.launch")
flow2.flowOn(Dispatchers.IO)//設(shè)定它運行時所使用的調(diào)度器
.collect {//消費Flow
log("flow2:$it")
}
}
delay(2000)
job.cancelAndJoin()
}
Flow 的背壓
- 只要是響應(yīng)式編程,就一定會有背壓問題奴曙,我們先來看看背壓究竟是什么别凹。
- 背壓問題在生產(chǎn)者的生產(chǎn)速率高于消費者的處理速率的情況下出現(xiàn)。為了保證數(shù)據(jù)不丟失洽糟,我們也會考慮添加緩存來緩解問題:
//為 Flow 添加緩沖
flow {
List(5) {
emit(it)
}
}.buffer().collect {
log("flow buffer collect:$it")
}
- 也可以為 buffer 指定一個容量炉菲。不過,如果我們只是單純地添加緩存坤溃,而不是從根本上解決問題就始終會造成數(shù)據(jù)積壓拍霜。
- 問題產(chǎn)生的根本原因是生產(chǎn)和消費速率的不匹配,除直接優(yōu)化消費者的性能以外浇雹,我們也可以采取一些取舍的手段沉御。
- 第一種是 conflate。與 Channel 的 Conflate 模式一致昭灵,新數(shù)據(jù)會覆蓋老數(shù)據(jù)吠裆,
flow {
List(10) {
emit(it)
}
}
.conflate()
.collect { value ->
log("flow conflate Collecting $value")
delay(100)
log("$value collected flow conflate ")
}
- 第二種是 collectLatest。顧名思義烂完,只處理最新的數(shù)據(jù)试疙,這看上去似乎與 conflate 沒有區(qū)別,其實區(qū)別大了:它并不會直接用新數(shù)據(jù)覆蓋老數(shù)據(jù)抠蚣,而是每一個都會被處理祝旷,只不過如果前一個還沒被處理完后一個就來了的話,處理前一個數(shù)據(jù)的邏輯就會被取消除 collectLatest 之外還有 mapLatest嘶窄、flatMapLatest 等等怀跛,都是這個作用。
flow {
List(10) {
emit(it)
}
}.collectLatest { value ->
log("flow collectLatest Collecting $value")
delay(100)
log("$value collected flow collectLatest ")
}
使用更為安全的方式收集 Android UI 數(shù)據(jù)流
- 在 Android 開發(fā)中柄冲,請使用 LifecycleOwner.addRepeatingJob吻谋、suspend Lifecycle.repeatOnLifecycle 或 Flow.flowWithLifecycle 從 UI 層安全地收集數(shù)據(jù)流。(lifecycle-runtime-ktx:2.4.+ 庫中所提供的)
lifecycleScope.launch {
delay(500)
repeatOnLifecycle(Lifecycle.State.STARTED) {
flow.collect { log("collect2: $it") }
}
}
lifecycleScope.launchWhenStarted {
delay(1000)
flow.collect { log("collect3: $it") }
}
lifecycleScope.launch {
delay(1500)
flow.flowWithLifecycle(lifecycle,Lifecycle.State.STARTED)
.collect { log("collect4: $it") }
}
SharedFlow
- 冷流和訂閱者只能是一對一的關(guān)系现横,當(dāng)我們要實現(xiàn)一個流漓拾,多個訂閱者的需求時阁最,就需要熱流了,SharedFlow就是一種熱流
- 其構(gòu)造函數(shù)如下
public fun <T> MutableSharedFlow(
replay: Int = 0,//當(dāng)新的訂閱者Collect時骇两,發(fā)送幾個已經(jīng)發(fā)送過的數(shù)據(jù)給它速种,默認(rèn)為0,即默認(rèn)新訂閱者不會獲取以前的數(shù)據(jù)
extraBufferCapacity: Int = 0,//表示減去replay低千,MutableSharedFlow還緩存多少數(shù)據(jù)配阵,默認(rèn)為0
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND//表示緩存策略,即緩沖區(qū)滿了之后Flow如何處理
//BufferOverflow.SUSPEND 策略栋操,也就是掛起策略, 默認(rèn)為掛起
//BufferOverflow.DROP_OLDEST: 丟棄舊數(shù)據(jù)
//BufferOverflow.DROP_LATEST: 丟棄最新的數(shù)據(jù)
)
val sharedFlow = MutableSharedFlow<String>()
lifecycleScope.launch(Dispatchers.IO) {
delay(1000)
sharedFlow.emit("aaa")
delay(1000)
sharedFlow.emit("bbb")
delay(1000)
sharedFlow.emit("ccc")
}
lifecycleScope.launch {
delay(500)
sharedFlow.collect { log("collect1:$it") }
}
lifecycleScope.launch {
delay(1500)
sharedFlow.collect { log("collect2:$it") }
}
lifecycleScope.launch {
delay(2500)
sharedFlow.collect { log("collect3:$it") }
}
lifecycleScope.launch {
delay(3500)
sharedFlow.collect { log("collect4:$it") }
}
- 將冷流Flow轉(zhuǎn)化為SharedFlow
lifecycleScope.launch {
(1..5).asFlow().shareIn(
//1. 共享開始時所在的協(xié)程作用域范圍
scope = lifecycleScope,
//2. 控制共享的開始和結(jié)束的策略
// started = SharingStarted.Lazily,//當(dāng)首個訂閱者出現(xiàn)時開始闸餐,在scope指定的作用域被結(jié)束時終止
// started = SharingStarted.Eagerly,//立即開始,而在scope指定的作用域被結(jié)束時終止
//對于那些只執(zhí)行一次的操作矾芙,您可以使用Lazily或者Eagerly舍沙。然而,如果您需要觀察其他的流剔宪,就應(yīng)該使用WhileSubscribed來實現(xiàn)細(xì)微但又重要的優(yōu)化工作
//WhileSubscribed策略會在沒有收集器的情況下取消上游數(shù)據(jù)流
started = SharingStarted.WhileSubscribed(
500,//stopTimeoutMillis 控制一個以毫秒為單位的延遲值拂铡,指的是最后一個訂閱者結(jié)束訂閱與停止上游流的時間差。默認(rèn)值是 0(比如當(dāng)用戶旋轉(zhuǎn)設(shè)備時葱绒,原來的視圖會先被銷毀感帅,然后數(shù)秒鐘內(nèi)重建)
Long.MAX_VALUE//replayExpirationMillis表示數(shù)據(jù)重播的過時時間,如果用戶離開應(yīng)用太久地淀,此時您不想讓用戶看到陳舊的數(shù)據(jù)失球,你可以用到這個參數(shù)
),
//3. 狀態(tài)流的重播個數(shù)
replay = 0
).collect { log("shareIn.collect:$it") }
}
StateFlow
- StateFlow繼承于SharedFlow,是SharedFlow的一個特殊變種
- 構(gòu)造函數(shù)如下殷绍,只需要傳入一個默認(rèn)值
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
- StateFlow本質(zhì)上是一個replay為1部脚,并且沒有緩沖區(qū)的SharedFlow,因此第一次訂閱時會先獲得默認(rèn)值
- StateFlow僅在值已更新忱叭,并且值發(fā)生了變化時才會返回瘫证,即如果更新后的值沒有變化,也不會回調(diào)Collect方法该互,這點與LiveData不同
- StateFlow 與 LiveData是最接近的,因為:
1. 它始終是有值的斗蒋。
2. 它的值是唯一的煤惩。
3. 它允許被多個觀察者共用 (因此是共享的數(shù)據(jù)流)爷肝。
4. 它永遠(yuǎn)只會把最新的值重現(xiàn)給訂閱者猾浦,這與活躍觀察者的數(shù)量是無關(guān)的。
log("StateFlow 默認(rèn)值:111")
val stateFlow = MutableStateFlow("111")
lifecycleScope.launch {
delay(500)
stateFlow.collect { log("StateFlow collect1:$it") }
}
lifecycleScope.launch {
delay(1500)
stateFlow.collect { log("StateFlow collect2:$it") }
}
lifecycleScope.launch {
delay(2500)
stateFlow.collect { log("StateFlow collect3:$it") }
}
lifecycleScope.launch(Dispatchers.IO) {
delay(5000)
log("StateFlow re emit:111")
stateFlow.emit("111")
delay(1000)
log("StateFlow emit:222")
stateFlow.emit("222")
}
- 普通流Flow轉(zhuǎn)化成StateFlow
val stateFlow2: StateFlow<Int> = flow {
List(10) {
delay(300)
emit(it)
}
}.stateIn(
scope = lifecycleScope,
started = WhileSubscribed(5000),//等待5秒后仍然沒有訂閱者存在就終止協(xié)程
initialValue = 666//默認(rèn)值
)
lifecycleScope.launchWhenStarted {//STARTED狀態(tài)時會開始收集流灯抛,并且在RESUMED狀態(tài)時保持收集金赦,進(jìn)入STOPPED狀態(tài)時結(jié)束收集過程
stateFlow2.collect { log("StateFlow shareIn.collect:$it") }
}
StateFlow與SharedFlow 的區(qū)別
- SharedFlow配置更為靈活,支持配置replay,緩沖區(qū)大小等对嚼,StateFlow是SharedFlow的特化版本夹抗,replay固定為1,緩沖區(qū)大小默認(rèn)為0;
- StateFlow與LiveData類似猪半,支持通過myFlow.value獲取當(dāng)前狀態(tài)兔朦,如果有這個需求,必須使用StateFlow;
- SharedFlow支持發(fā)出和收集重復(fù)值磨确,而StateFlow當(dāng)value重復(fù)時沽甥,不會回調(diào)collect;
- 對于新的訂閱者,StateFlow只會重播當(dāng)前最新值乏奥,SharedFlow可配置重播元素個數(shù)(默認(rèn)為0摆舟,即不重播);
基于SharedFlow封裝FlowBus
創(chuàng)建消息類EventMessage
class EventMessage {
/**
* 消息的key
*/
var key: Int
/**
* 消息的主體message
*/
var message: Any? = null
private var messageMap: HashMap<String, Any?>? = null
constructor(key: Int, message: Any?) {
this.key = key
this.message = message
}
constructor(key: Int) {
this.key = key
}
fun put(key: String, message: Any?) {
if (messageMap == null) {
messageMap = HashMap<String, Any?>()
}
messageMap?.set(key, message)
}
operator fun <T> get(key: String?): T? {
if (messageMap != null) {
try {
return messageMap!![key] as T?
} catch (e: ClassCastException) {
e.printStackTrace()
}
}
return null
}
}
創(chuàng)建FlowBus
class FlowBus : ViewModel() {
companion object {
val instance by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { FlowBus() }
}
//正常事件
private val events = mutableMapOf<String, Event<*>>()
//粘性事件
private val stickyEvents = mutableMapOf<String, Event<*>>()
fun with(key: String, isSticky: Boolean = false): Event<Any> {
return with(key, Any::class.java, isSticky)
}
fun <T> with(eventType: Class<T>, isSticky: Boolean = false): Event<T> {
return with(eventType.name, eventType, isSticky)
}
@Synchronized
fun <T> with(key: String, type: Class<T>?, isSticky: Boolean): Event<T> {
val flows = if (isSticky) stickyEvents else events
if (!flows.containsKey(key)) {
flows[key] = Event<T>(key, isSticky)
}
return flows[key] as Event<T>
}
class Event<T>(private val key: String, isSticky: Boolean) {
// private mutable shared flow
private val _events = MutableSharedFlow<T>(
replay = if (isSticky) 1 else 0,
extraBufferCapacity = Int.MAX_VALUE
)
// publicly exposed as read-only shared flow
val events = _events.asSharedFlow()
/**
* need main thread execute
*/
fun observeEvent(
lifecycleOwner: LifecycleOwner,
dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
action: (t: T) -> Unit
) {
lifecycleOwner.lifecycle.addObserver(object : DefaultLifecycleObserver {
override fun onDestroy(owner: LifecycleOwner) {
super.onDestroy(owner)
LjyLogUtil.d("EventBus.onDestroy:remove key=$key")
val subscriptCount = _events.subscriptionCount.value
if (subscriptCount <= 0)
instance.events.remove(key)
}
})
lifecycleOwner.lifecycleScope.launch(dispatcher) {
lifecycleOwner.lifecycle.whenStateAtLeast(minActiveState) {
events.collect {
try {
action(it)
} catch (e: Exception) {
LjyLogUtil.d("ker=$key , error=${e.message}")
}
}
}
}
}
/**
* send value
*/
suspend fun setValue(
event: T,
dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate
) {
withContext(dispatcher) {
_events.emit(event)
}
}
}
}
使用FlowBus
FlowBus.instance.with(EventMessage::class.java).observeEvent(this) {
LjyLogUtil.d("FlowBus.register1:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
lifecycleScope.launch(Dispatchers.IO) {
withContext(Dispatchers.Main) {//不創(chuàng)建新的協(xié)程,指定協(xié)程上運行代碼塊,可以切換線程
FlowBus.instance.with(EventMessage::class.java)
.observeEvent(this@EventBusActivity) {
LjyLogUtil.d("FlowBus.register2:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
}
}
FlowBus.instance.with(EventMessage::class.java).observeEvent(this) {
LjyLogUtil.d("FlowBus.register3:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
lifecycleScope.launch(Dispatchers.Main) {
val event = EventMessage(111)
LjyLogUtil.d(
"FlowBus:send1_${Thread.currentThread().name}_${
GsonUtils.toJson(
event
)
}"
)
FlowBus.instance.with(EventMessage::class.java).setValue(event)
delay(2000)
FlowBus.instance.with(EventMessage::class.java)
.setValue(EventMessage(101))
FlowBus.instance.with(EventMessage::class.java)
.setValue(EventMessage(102))
FlowBus.instance.with(EventMessage::class.java)
.setValue(EventMessage(103))
FlowBus.instance.with(EventMessage::class.java)
.setValue(EventMessage(104))
FlowBus.instance.with(EventMessage::class.java)
.setValue(EventMessage(105))
}
lifecycleScope.launch(Dispatchers.IO) {
delay(4000)
val event = EventMessage(222, "bbb")
LjyLogUtil.d(
"FlowBus:send2_${Thread.currentThread().name}_${
GsonUtils.toJson(
event
)
}"
)
FlowBus.instance.with(EventMessage::class.java).setValue(event)
}
lifecycleScope.launch(Dispatchers.Default) {
delay(6000)
withContext(Dispatchers.Main) {
val event = EventMessage(333, "ccc")
event.put("key1", 123)
event.put("key2", "abc")
LjyLogUtil.d(
"FlowBus:send3_${Thread.currentThread().name}_${
GsonUtils.toJson(
event
)
}"
)
FlowBus.instance.with(EventMessage::class.java).setValue(event)
}
}
進(jìn)一步優(yōu)化
- 利用擴展函數(shù)邓了,ViewModelStoreOwner恨诱,及預(yù)傳EventMessage::class.javas是當(dāng)前項目中的使用更加簡單
//利用擴展函數(shù)
fun LifecycleOwner.observeEvent(
dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
isSticky: Boolean = false,
action: (t: EventMessage) -> Unit
) {
ApplicationScopeViewModelProvider
.getApplicationScopeViewModel(FlowBus::class.java)
.with(EventMessage::class.java, isSticky = isSticky)
.observeEvent(this@observeEvent, dispatcher, minActiveState, action)
}
fun postValue(
event: EventMessage,
delayTimeMillis: Long = 0,
isSticky: Boolean = false,
dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
) {
LjyLogUtil.d("FlowBus:send_${Thread.currentThread().name}_${GsonUtils.toJson(event)}")
ApplicationScopeViewModelProvider
.getApplicationScopeViewModel(FlowBus::class.java)
.viewModelScope
.launch(dispatcher) {
delay(delayTimeMillis)
ApplicationScopeViewModelProvider
.getApplicationScopeViewModel(FlowBus::class.java)
.with(EventMessage::class.java, isSticky = isSticky)
.setValue(event)
}
}
private object ApplicationScopeViewModelProvider : ViewModelStoreOwner {
private val eventViewModelStore: ViewModelStore = ViewModelStore()
override fun getViewModelStore(): ViewModelStore {
return eventViewModelStore
}
private val mApplicationProvider: ViewModelProvider by lazy {
ViewModelProvider(
ApplicationScopeViewModelProvider,
ViewModelProvider.AndroidViewModelFactory.getInstance(FlowBusInitializer.application)
)
}
fun <T : ViewModel> getApplicationScopeViewModel(modelClass: Class<T>): T {
return mApplicationProvider[modelClass]
}
}
object FlowBusInitializer {
lateinit var application: Application
//在Application中初始化
fun init(application: Application) {
FlowBusInitializer.application = application
}
}
lifecycleScope.launch(Dispatchers.IO) {
observeEvent {
LjyLogUtil.d("FlowBus.register1:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
observeEvent(Dispatchers.IO) {
LjyLogUtil.d("FlowBus.register2:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
observeEvent(Dispatchers.Main) {
LjyLogUtil.d("FlowBus.register3:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
}
lifecycleScope.launch(Dispatchers.IO) {
delay(1000)
postValue(EventMessage(100))
postValue(EventMessage(101), 1000)
postValue(EventMessage(102, "bbb"), dispatcher = Dispatchers.IO)
val event3 = EventMessage(103, "ccc")
event3.put("key1", 123)
event3.put("key2", "abc")
postValue(event3, 2000, dispatcher = Dispatchers.Main)
}
參考
我是今陽句葵,如果想要進(jìn)階和了解更多的干貨厕鹃,歡迎關(guān)注微信公眾號 “今陽說” 接收我的最新文章