var a = 0.obs
我們看下會發(fā)生什么?
a就被RxInt包裹坐儿,
extension IntExtension on int {
/// Returns a `RxInt` with [this] `int` as initial value.
RxInt get obs => RxInt(this);
}
Rx<T>繼承_RxImpl
class Rx<T> extends _RxImpl<T> {
}
接著看
abstract class _RxImpl<T> extends RxNotifier<T> with RxObjectMixin<T> {
//這里觸發(fā)賦值操作
_RxImpl(T initial) {
_value = initial;
}
void update(void Function(T? val) fn) {
fn(_value);
subject.add(_value);
}
void trigger(T v) {
var firstRebuild = this.firstRebuild;
value = v;
if (!firstRebuild && !sentToStream) {
subject.add(v);
}
}
}
mixin RxObjectMixin<T> on NotifyManager<T> {
late T _value;
bool firstRebuild = true;
bool sentToStream = false;
String get string => value.toString();
set value(T val) {
if (subject.isClosed) return;
sentToStream = false;
if (_value == val && !firstRebuild) return;
firstRebuild = false;
_value = val;
sentToStream = true;
subject.add(_value);
}
T get value {
//這里的subject是GetStream; RxInterface.proxy == observer(build里面?zhèn)魅氲膐bserver)
RxInterface.proxy?.addListener(subject);
return _value;
}
}
class GetStream<T> {
List<LightSubscription<T>>? _onData = <LightSubscription<T>>[];
FutureOr<void> addSubscription(LightSubscription<T> subs) async {
if (!_isBusy!) {
return _onData!.add(subs);
} else {
await Future.delayed(Duration.zero);
return _onData!.add(subs);
}
}
void _notifyData(T data) {
_isBusy = true;
for (final item in _onData!) {
if (!item.isPaused) {
item._data?.call(data);
}
}
_isBusy = false;
}
void add(T event) {
assert(!isClosed, 'You cannot add event to closed Stream');
_value = event;
_notifyData(event);
}
LightSubscription<T> listen(void Function(T event) onData,
{Function? onError, void Function()? onDone, bool? cancelOnError}) {
final subs = LightSubscription<T>(
removeSubscription,
onPause: onPause,
onResume: onResume,
onCancel: onCancel,
)
..onData(onData)
..onError(onError)
..onDone(onDone)
..cancelOnError = cancelOnError;
addSubscription(subs);
onListen?.call();
return subs;
}
}
總結(jié):
當我們給觀察者對象添加.obs后垮耳,被封裝成了Rx<T>颈渊,RX繼承_RxImpl,_RxImpl 繼承RxNotifier with RxObjectMixin,在Obx中使用(a.value)會調(diào)用RxObjectMixin的get方法,get方法內(nèi)部 觀察者監(jiān)聽stream的變化(RxInterface.proxy?.addListen(subject))终佛,當賦值的時候(a.value=xx)會調(diào)用set方法俊嗽,如果該被訂閱對象已經(jīng)失效或者被訂閱對象的值沒有發(fā)生變化并且非首次構(gòu)建則不會刷新,否則會賦值新值铃彰,并且調(diào)用subject(getStream)的add方法绍豁,然后調(diào)用GetStream類的_notifyData方法,遍歷onData(onData是我們在Obx的時候訂閱的集合,Obx繼承ObxWidget繼承StatefulWidget牙捉,initState方法里面綁定觀察者訂閱關(guān)系(final _observer = RxNotifier();)訂閱者(late StreamSubscription subs;)竹揍,subs = _observer.listen(_updateTree,false);RXNotifier混入了NotifyManager,NotifyManager里的(GetStream<T> subject = GetStream<T>();)里面有l(wèi)isten方法,然后再調(diào)用subject.listen(),會調(diào)用addSubscription(subs),然后 _onData!.add(subs);添加到_onData集合),調(diào)用item._data.call()方法邪铲。當RxNotifier接受到通知后會通過listen回調(diào)調(diào)用_updateTree,這里面調(diào)用的是setState({})更新UI;
注釋:Obx原理
鬼佣,obx繼承自ObxWidget,ObxWidget在initstate方法中綁定了觀察者訂閱關(guān)系霜浴。build方法中調(diào)用RxInterface.notifyChildren 把_observer作為RxInterface.proxy 的臨時屬性晶衷,調(diào)用builder的后恢復原有的屬性, 注意builder(controller)函數(shù)里一定要包含obs.value,否則在if (!observer.canUpdate) 檢測時,由于沒有觀察對象阴孟,會拋出提示異常晌纫。
class Obx extends ObxWidget {
final WidgetCallback builder;
const Obx(this.builder, {Key? key}) : super(key: key);
@override
Widget build() => builder();
}
abstract class ObxWidget extends StatefulWidget {
const ObxWidget({Key? key}) : super(key: key);
@override
ObxState createState() => ObxState();
@protected
Widget build();
}
class ObxState extends State<ObxWidget> {
final _observer = RxNotifier();
late StreamSubscription subs;
@override
void initState() {
super.initState();
subs = _observer.listen(_updateTree, cancelOnError: false);
}
void _updateTree(_) {
if (mounted) {
setState(() {});
}
}
@override
Widget build(BuildContext context) =>
RxInterface.notifyChildren(_observer, widget.build);
}
class RxNotifier<T> = RxInterface<T> with NotifyManager<T>;
mixin NotifyManager<T> {
GetStream<T> subject = GetStream<T>();
final _subscriptions = <GetStream, List<StreamSubscription>>{};
bool get canUpdate => _subscriptions.isNotEmpty;
/// This is an internal method.
/// Subscribe to changes on the inner stream.
void addListener(GetStream<T> rxGetx) {
if (!_subscriptions.containsKey(rxGetx)) {
final subs = rxGetx.listen((data) {
if (!subject.isClosed) subject.add(data);
});
final listSubscriptions =
_subscriptions[rxGetx] ??= <StreamSubscription>[];
listSubscriptions.add(subs);
}
}
StreamSubscription<T> listen(
void Function(T) onData, {
Function? onError,
void Function()? onDone,
bool? cancelOnError,
}) =>
subject.listen(
onData,
onError: onError,
onDone: onDone,
cancelOnError: cancelOnError ?? false,
);
}
}
abstract class RxInterface<T> {
static RxInterface? proxy;
bool get canUpdate;
static T notifyChildren<T>(RxNotifier observer, ValueGetter<T> builder) {
final oldObserver = RxInterface.proxy;
RxInterface.proxy = observer;
final result = builder();
if (!observer.canUpdate) {
RxInterface.proxy = oldObserver;
}
RxInterface.proxy = oldObserver;
return result;
}
}
為什么要有這一步 RxInterface.proxy = observer;
T get value {
RxInterface.proxy?.addListener(subject);
return _value;
}
Rx 最終是混入了 RxObjectMixin 類,即在 Rx 的獲取數(shù)據(jù)中調(diào)用了 RxInterface.proxy?.addListener 永丝,那什么時候獲取 Rx 的數(shù)據(jù)呢锹漱?
Obx(() {
return Text("${a.value}");
});
就是在 Obx 的 builder 方法里,這就清楚了為什么在 RxInterface.notifyChildren 方法里是先將傳入的 observer 賦值給 proxy 然后再調(diào)用 builder 方法了慕嚷,因為這樣在調(diào)用 builder 方法時調(diào)用了 Rx.value 哥牍,而在 get value 中調(diào)用了 RxInterface.proxy?.addListener ,且 addListener 傳入的 subject 是 Rx 的 GetStream喝检, 而 proxy 是 _ObxState 里創(chuàng)建的 RxNotifier嗅辣。
文字理解
obx 刷新原理
var a = 1.obs
將變量a包裝成RxInt 繼承自Rx<T> 繼承自抽象類_RxImpl 在這里初始化賦值, 繼承RxNotifier并且混入了RxObjectMiXin
obx 繼承自obxWidget->statefulWidget
obxState中
final _observer = RxNotifier()
var streamSubScription _subs
initState 中綁定觀察者和訂閱關(guān)系
_subs = _oberver.listen(_updateTree,cancleError:false)
接著會調(diào)用
getStream中的listen
listen(){
創(chuàng)建subs綁定傳入的updateTree函數(shù),然后調(diào)用addSubscription(subs)挠说,添加到_onData集中(這是個訂閱者集合)
}
接下來看builder
builder中調(diào)用了
RxInterface.notifyChildren(_observer,builder)
實現(xiàn)關(guān)鍵代碼: 臨時變量保存 const oldObserver = Rxinterface.proxy , Rxinterface.proxy = _observer,調(diào)用bulder方法澡谭,將原來的oldObserve賦值給Rxinterface.proxy = oldObserver
Obx的bulder方法中使用了a.value
a.value 就調(diào)用了 rxObjectMixin中的get方法
RxInterface.proxy.addListener(subject) //subject是getStream對象 RxInterface.proxy = build中傳入的observer(RxNotifier)
rxNotifier實現(xiàn)了addListener() 是因為Rxnotifier混入了NotifyManager
addListener(subject){
判斷subject對象是否有訂閱對象 如果沒有就調(diào)用getStream的listen方法將訂閱者加入到訂閱者集合
}
當我們給a.value= 2賦值的時候
會調(diào)用RxObjectMixin 的set方法
set(val){
首先判斷訂閱是否關(guān)閉
subject.close return
_value == val && !firsrBuld return
_value = val
subject.add(_value) //調(diào)用GetStream中的add方法
add(event){
_value = event
_notifyData(event)
}
_notifyData(){
for(item in _onData){
item._data.call() //這里就會調(diào)用updateTree 接著調(diào)用setState({})進行刷新
}
}
}