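The overall flow is shown in the following diagram:
1. Usage
When the native platform needs to send messages to Dart, EventChannel is used.
1.1 Registering on the Android side
On Android, the channel is registered like this: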
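import android.os.Bundle
import io.flutter.app.FlutterActivity
import io.flutter.plugin.common.EventChannel

class MainActivity : FlutterActivity() {
    private val DATA_RESULT_CHANNEL = "com.yourname.yourname/typeData"
    // Sink handed over once Dart starts listening; null until then and after cancel.
    private var listenEvents: EventChannel.EventSink? = null

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        EventChannel(flutterView, DATA_RESULT_CHANNEL).setStreamHandler(object : EventChannel.StreamHandler {
            override fun onListen(arguments: Any?, events: EventChannel.EventSink?) {
                listenEvents = events
            }
            override fun onCancel(arguments: Any?) {
                listenEvents = null
            }
        })
    }

    // Forwards data to Dart; does nothing while no listener is attached.
    fun receiveMessage(data: Any?) {
        if (data == null) {
            listenEvents?.error("1", "data == null", null)
        } else {
            listenEvents?.success(data)
        }
    }
}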
1.2 Consuming messages on the Flutter side
On the Flutter side, messages are received like this:
class InteractUtil {
static const EventChannel eventChannel =
const EventChannel("com.yourname.yourname/typeData");
List<InteractListener> listenerList;
factory InteractUtil() => _getInstance();
static InteractUtil get instance => _getInstance();
static InteractUtil _instance;
InteractUtil._internal() {
listenerList = [];
eventChannel.receiveBroadcastStream().listen(_onEvent, onError: _onError);
}
static InteractUtil _getInstance() {
if (_instance == null) {
_instance = new InteractUtil._internal();
}
return _instance;
}
void addListener(InteractListener listener) {
if (listener == null) {
return;
}
listenerList.add(listener);
}
void _onEvent(Object event) {
print("_onEvent is invoke$event");
for (InteractListener listener in listenerList) {
listener.onEvent(event);
}
}
void _onError(Object error) {
for (InteractListener listener in listenerList) {
listener.onError(error);
}
}
}
abstract class InteractListener {
void onEvent(Object event);
void onError(Object error);
}
2. Android side
2.1 EventChannel construction
Here are the EventChannel constructors:
shell\platform\android\io\flutter\plugin\common\EventChannel.java
public EventChannel(BinaryMessenger messenger, String name) {
this(messenger, name, StandardMethodCodec.INSTANCE);
}
public EventChannel(BinaryMessenger messenger, String name, MethodCodec codec) {
this.messenger = messenger;
this.name = name;
this.codec = codec;
}
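As with MethodChannel, the constructor defaults to StandardMethodCodec, which in turn wraps a StandardMessageCodec. The former encodes and decodes method names and arguments; the latter encodes and decodes the message payload.
The messenger here is the FlutterView.
2.2 Setting the message handler
setStreamHandler installs the handler that will receive the callbacks.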
shell\platform\android\io\flutter\plugin\common\EventChannel.java
public void setStreamHandler(final StreamHandler handler) {
messenger.setMessageHandler(name, handler == null ? null : new IncomingStreamRequestHandler(handler));
}
The handler here is the EventChannel.StreamHandler defined in the Android MainActivity.
2.2.1 IncomingStreamRequestHandler
IncomingStreamRequestHandler is also defined inside EventChannel. Its definition:
private final class IncomingStreamRequestHandler implements BinaryMessageHandler {
private final StreamHandler handler;
private final AtomicReference<EventSink> activeSink = new AtomicReference<>(null);
IncomingStreamRequestHandler(StreamHandler handler) {
this.handler = handler;
}
@Override
public void onMessage(ByteBuffer message, final BinaryReply reply) {
final MethodCall call = codec.decodeMethodCall(message);
if (call.method.equals("listen")) {
onListen(call.arguments, reply);
} else if (call.method.equals("cancel")) {
onCancel(call.arguments, reply);
} else {
reply.reply(null);
}
}
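private void onListen(Object arguments, BinaryReply callback) {
final EventSink eventSink = new EventSinkImplementation();
// Make the new sink the active one; a previous sink means listen was called again.
final EventSink oldSink = activeSink.getAndSet(eventSink);
if (oldSink != null) {
handler.onCancel(null);
}
handler.onListen(arguments, eventSink);
callback.reply(codec.encodeSuccessEnvelope(null));
}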
private void onCancel(Object arguments, BinaryReply callback) {
final EventSink oldSink = activeSink.getAndSet(null);
handler.onCancel(arguments);
callback.reply(codec.encodeSuccessEnvelope(null));
}
}
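It implements onMessage and, depending on the method name, calls onListen or onCancel.
onMessage is called back after the Dart side registers a listener; that flow is traced in the Dart section.
Depending on the method Dart invokes, the onListen or onCancel in the Kotlin code is called. The walkthrough below follows onListen.
2.3 onListen receives the event sink
onListen creates an EventSinkImplementation and passes it to the onListen callback registered on the Android side. All later data sent from Android goes through this object.
2.3.1 EventSinkImplementation
Its definition: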
private final class EventSinkImplementation implements EventSink {
final AtomicBoolean hasEnded = new AtomicBoolean(false);
@Override
@UiThread
public void success(Object event) {
if (hasEnded.get() || activeSink.get() != this) {
return;
}
EventChannel.this.messenger.send(name, codec.encodeSuccessEnvelope(event));
}
@Override
@UiThread
public void error(String errorCode, String errorMessage, Object errorDetails) {
if (hasEnded.get() || activeSink.get() != this) {
return;
}
EventChannel.this.messenger.send(
name, codec.encodeErrorEnvelope(errorCode, errorMessage, errorDetails));
}
@Override
@UiThread
public void endOfStream() {
if (hasEnded.getAndSet(true) || activeSink.get() != this) {
return;
}
EventChannel.this.messenger.send(name, null);
}
}
Depending on the outcome, the Kotlin code calls success, error, or endOfStream to send the corresponding data to Dart.
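The two guards in EventSinkImplementation (hasEnded and the comparison with activeSink) can be illustrated with a minimal Java sketch. ActiveSinkDemo, Sink, and the delivered list are hypothetical names invented for this illustration, and events are recorded in a list instead of being sent through a messenger, so this is not Flutter's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the stream request handler; not Flutter's class.
final class ActiveSinkDemo {
    // Everything that would reach Dart is recorded here for inspection.
    final List<String> delivered = new ArrayList<>();
    private final AtomicReference<Sink> activeSink = new AtomicReference<>(null);

    final class Sink {
        private final AtomicBoolean hasEnded = new AtomicBoolean(false);

        void success(String event) {
            // Drop events from a sink that has ended or has been replaced.
            if (hasEnded.get() || activeSink.get() != this) {
                return;
            }
            delivered.add(event);
        }

        void endOfStream() {
            // getAndSet marks the sink as ended exactly once.
            if (hasEnded.getAndSet(true) || activeSink.get() != this) {
                return;
            }
            delivered.add("<end>");
        }
    }

    // Called for every "listen" request; the newest sink becomes the active one.
    Sink listen() {
        Sink sink = new Sink();
        activeSink.set(sink);
        return sink;
    }

    static List<String> run() {
        ActiveSinkDemo demo = new ActiveSinkDemo();
        Sink first = demo.listen();
        first.success("a");
        Sink second = demo.listen();
        first.success("stale");  // ignored: first is no longer the active sink
        second.success("b");
        second.endOfStream();
        second.success("late");  // ignored: the stream has already ended
        return demo.delivered;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Only events from the current, still-open sink are recorded; a sink kept around from an earlier listen call becomes a no-op.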
2.3.2 success: sending a success payload
Taking success as the example, the event is first encoded by codec, which is a StandardMethodCodec. Here is how encodeSuccessEnvelope encodes it:
shell\platform\android\io\flutter\plugin\common\StandardMethodCodec.java
@Override
public ByteBuffer encodeSuccessEnvelope(Object result) {
final ExposedByteArrayOutputStream stream = new ExposedByteArrayOutputStream();
stream.write(0);
messageCodec.writeValue(stream, result);
final ByteBuffer buffer = ByteBuffer.allocateDirect(stream.size());
buffer.put(stream.buffer(), 0, stream.size());
return buffer;
}
ExposedByteArrayOutputStream extends ByteArrayOutputStream, a byte-array output stream that bytes can be written to.
It first writes the success flag 0, then the data. messageCodec is a StandardMessageCodec; here is how it writes values:
shell\platform\android\io\flutter\plugin\common\StandardMessageCodec.java
protected void writeValue(ByteArrayOutputStream stream, Object value) {
if (value == null || value.equals(null)) {
stream.write(NULL);
} else if (value == Boolean.TRUE) {
stream.write(TRUE);
} else if (value == Boolean.FALSE) {
stream.write(FALSE);
} else if (value instanceof Number) {
if (value instanceof Integer || value instanceof Short || value instanceof Byte) {
stream.write(INT);
writeInt(stream, ((Number) value).intValue());
} else if (value instanceof Long) {
stream.write(LONG);
writeLong(stream, (long) value);
} else if (value instanceof Float || value instanceof Double) {
stream.write(DOUBLE);
writeAlignment(stream, 8);
writeDouble(stream, ((Number) value).doubleValue());
} else if (value instanceof BigInteger) {
stream.write(BIGINT);
writeBytes(stream, ((BigInteger) value).toString(16).getBytes(UTF8));
} else {
throw new IllegalArgumentException("Unsupported Number type: " + value.getClass());
}
} else if (value instanceof String) {
stream.write(STRING);
writeBytes(stream, ((String) value).getBytes(UTF8));
} else if (value instanceof byte[]) {
stream.write(BYTE_ARRAY);
writeBytes(stream, (byte[]) value);
} else if (value instanceof int[]) {
stream.write(INT_ARRAY);
final int[] array = (int[]) value;
writeSize(stream, array.length);
writeAlignment(stream, 4);
for (final int n : array) {
writeInt(stream, n);
}
} else if (value instanceof long[]) {
stream.write(LONG_ARRAY);
final long[] array = (long[]) value;
writeSize(stream, array.length);
writeAlignment(stream, 8);
for (final long n : array) {
writeLong(stream, n);
}
} else if (value instanceof double[]) {
stream.write(DOUBLE_ARRAY);
final double[] array = (double[]) value;
writeSize(stream, array.length);
writeAlignment(stream, 8);
for (final double d : array) {
writeDouble(stream, d);
}
} else if (value instanceof List) {
stream.write(LIST);
final List<?> list = (List) value;
writeSize(stream, list.size());
for (final Object o : list) {
writeValue(stream, o);
}
} else if (value instanceof Map) {
stream.write(MAP);
final Map<?, ?> map = (Map) value;
writeSize(stream, map.size());
for (final Entry<?, ?> entry : map.entrySet()) {
writeValue(stream, entry.getKey());
writeValue(stream, entry.getValue());
}
} else {
throw new IllegalArgumentException("Unsupported value: " + value);
}
}
The encoding writes a one-byte type tag first, then the size for variable-length values, and finally the data itself. The supported type tags are:
private static final byte NULL = 0;
private static final byte TRUE = 1;
private static final byte FALSE = 2;
private static final byte INT = 3;
private static final byte LONG = 4;
private static final byte BIGINT = 5;
private static final byte DOUBLE = 6;
private static final byte STRING = 7;
private static final byte BYTE_ARRAY = 8;
private static final byte INT_ARRAY = 9;
private static final byte LONG_ARRAY = 10;
private static final byte DOUBLE_ARRAY = 11;
private static final byte LIST = 12;
private static final byte MAP = 13;
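BigInteger is an immutable arbitrary-precision integer. All operations behave as if BigInteger were represented in two's-complement notation.
The supported types are null, bool, int, long, BigInteger, double, String, byte[], int[], long[], double[], List, and Map. Elements of a List or Map must themselves be supported types; nested lists and maps work because writeValue recurses.
A single int or long is written without padding. A double is aligned to 8 bytes first; the elements of an int[] are aligned to 4 bytes, and those of a long[] or double[] to 8 bytes.
A String is converted to UTF-8 bytes before being written.
Once everything has been written to the stream, allocateDirect allocates a ByteBuffer of the stream's size and the stream's contents are copied into it.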
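To make the byte layout concrete, here is a minimal Java sketch that builds success envelopes for a short string and for a double. EnvelopeSketch and its method names are hypothetical; the tag values come from the constants listed above. The sketch assumes sizes below 254 fit in a single size byte and that the double is stored little-endian, so treat it as an illustration of the layout rather than the codec itself.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of the envelope layout; not the Flutter codec.
final class EnvelopeSketch {
    static final byte DOUBLE = 6;
    static final byte STRING = 7;

    // Pads with zero bytes until the stream size is a multiple of alignment.
    static void writeAlignment(ByteArrayOutputStream stream, int alignment) {
        int mod = stream.size() % alignment;
        if (mod != 0) {
            for (int i = 0; i < alignment - mod; i++) {
                stream.write(0);
            }
        }
    }

    // Layout: success flag 0, STRING tag, one size byte, UTF-8 bytes.
    static byte[] encodeString(String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        if (utf8.length >= 254) {
            throw new IllegalArgumentException("sketch handles sizes below 254 only");
        }
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        stream.write(0);
        stream.write(STRING);
        stream.write(utf8.length);
        stream.write(utf8, 0, utf8.length);
        return stream.toByteArray();
    }

    // Layout: success flag 0, DOUBLE tag, zero padding to 8 bytes, 8 value bytes.
    static byte[] encodeDouble(double value) {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        stream.write(0);
        stream.write(DOUBLE);
        writeAlignment(stream, 8);
        // Little-endian byte order is an assumption of this sketch.
        byte[] raw = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putDouble(value).array();
        stream.write(raw, 0, raw.length);
        return stream.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(encodeString("hi")));
        System.out.println(encodeDouble(1.5).length);
    }
}
```

The double case shows why alignment matters: after the flag and the tag the stream holds 2 bytes, so 6 padding bytes are written before the 8-byte value.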
2.4 send: sending the data
2.4.1 FlutterView
The ByteBuffer produced in the previous step is now sent. The messenger is in fact the FlutterView; its send methods:
shell\platform\android\io\flutter\view\FlutterView.java
@Override
@UiThread
public void send(String channel, ByteBuffer message) {
send(channel, message, null);
}
@Override
@UiThread
public void send(String channel, ByteBuffer message, BinaryReply callback) {
if (!isAttached()) {
Log.d(TAG, "FlutterView.send called on a detached view, channel=" + channel);
return;
}
mNativeView.send(channel, message, callback);
}
2.4.2 FlutterNativeView
mNativeView is a FlutterNativeView. Its send method:
@Override
@UiThread
public void send(String channel, ByteBuffer message) {
dartExecutor.getBinaryMessenger().send(channel, message);
}
2.4.3 DefaultBinaryMessenger
getBinaryMessenger returns the binaryMessenger field, a DefaultBinaryMessenger that wraps dartMessenger. It is initialized in the DartExecutor constructor:
DartExecutor
public DartExecutor(@NonNull FlutterJNI flutterJNI, @NonNull AssetManager assetManager) {
this.flutterJNI = flutterJNI;
this.assetManager = assetManager;
this.dartMessenger = new DartMessenger(flutterJNI);
dartMessenger.setMessageHandler("flutter/isolate", isolateChannelMessageHandler);
this.binaryMessenger = new DefaultBinaryMessenger(dartMessenger);
}
The send method in DefaultBinaryMessenger:
DefaultBinaryMessenger
@Override
@UiThread
public void send(@NonNull String channel, @Nullable ByteBuffer message) {
messenger.send(channel, message, null);
}
2.4.4 DartMessenger
messenger is the dartMessenger, an instance of DartMessenger. Its send methods:
DartMessenger
@Override
@UiThread
public void send(@NonNull String channel, @NonNull ByteBuffer message) {
send(channel, message, null);
}
@Override
public void send(
@NonNull String channel,
@Nullable ByteBuffer message,
@Nullable BinaryMessenger.BinaryReply callback) {
int replyId = 0;
if (message == null) {
flutterJNI.dispatchEmptyPlatformMessage(channel, replyId);
} else {
flutterJNI.dispatchPlatformMessage(channel, message, message.position(), replyId);
}
}
2.4.5 FlutterJNI
Here message is not null, so dispatchPlatformMessage is called. FlutterJNI is where the call crosses into the native layer:
@UiThread
public void dispatchPlatformMessage(
@NonNull String channel, @Nullable ByteBuffer message, int position, int responseId) {
if (isAttached()) {
nativeDispatchPlatformMessage(nativePlatformViewId, channel, message, position, responseId);
} else {
}
}
// Send a data-carrying platform message to Dart.
private native void nativeDispatchPlatformMessage(
long nativePlatformViewId,
@NonNull String channel,
@Nullable ByteBuffer message,
int position,
int responseId);
nativeDispatchPlatformMessage calls into the native layer, in shell\platform\android\platform_view_android_jni.cc. Here is where it is registered:
platform_view_android_jni.cc
bool RegisterApi(JNIEnv* env) {
static const JNINativeMethod flutter_jni_methods[] = {
{
.name = "nativeDispatchPlatformMessage",
.signature = "(JLjava/lang/String;Ljava/nio/ByteBuffer;II)V",
.fnPtr = reinterpret_cast<void*>(&DispatchPlatformMessage),
},
}
}
The call chain starting from DispatchPlatformMessage:
platform_view_android_jni.cc
static void DispatchPlatformMessage(JNIEnv* env,
jobject jcaller,
jlong shell_holder,
jstring channel,
jobject message,
jint position,
jint responseId) {
ANDROID_SHELL_HOLDER->GetPlatformView()->DispatchPlatformMessage(
env, //
fml::jni::JavaStringToString(env, channel), //
message, //
position, //
responseId //
);
}
shell\platform\android\platform_view_android.cc
void PlatformViewAndroid::DispatchPlatformMessage(JNIEnv* env,
std::string name,
jobject java_message_data,
jint java_message_position,
jint response_id) {
uint8_t* message_data =
static_cast<uint8_t*>(env->GetDirectBufferAddress(java_message_data));
std::vector<uint8_t> message =
std::vector<uint8_t>(message_data, message_data + java_message_position);
fml::RefPtr<flutter::PlatformMessageResponse> response;
if (response_id) {
response = fml::MakeRefCounted<PlatformMessageResponseAndroid>(
response_id, java_object_, task_runners_.GetPlatformTaskRunner());
}
PlatformView::DispatchPlatformMessage(
fml::MakeRefCounted<flutter::PlatformMessage>(
std::move(name), std::move(message), std::move(response)));
}
This step wraps name and message in a PlatformMessage object.
shell\common\platform_view.cc
void PlatformView::DispatchPlatformMessage(
fml::RefPtr<PlatformMessage> message) {
delegate_.OnPlatformViewDispatchPlatformMessage(std::move(message));
}
shell\common\shell.cc
// |PlatformView::Delegate|
void Shell::OnPlatformViewDispatchPlatformMessage(
fml::RefPtr<PlatformMessage> message) {
FML_DCHECK(is_setup_);
FML_DCHECK(task_runners_.GetPlatformTaskRunner()->RunsTasksOnCurrentThread());
task_runners_.GetUITaskRunner()->PostTask(
[engine = engine_->GetWeakPtr(), message = std::move(message)] {
if (engine) {
engine->DispatchPlatformMessage(std::move(message));
}
});
}
shell\common\engine.cc
void Engine::DispatchPlatformMessage(fml::RefPtr<PlatformMessage> message) {
if (message->channel() == kLifecycleChannel) {
if (HandleLifecyclePlatformMessage(message.get()))
return;
} else if (message->channel() == kLocalizationChannel) {
if (HandleLocalizationPlatformMessage(message.get()))
return;
} else if (message->channel() == kSettingsChannel) {
HandleSettingsPlatformMessage(message.get());
return;
}
if (runtime_controller_->IsRootIsolateRunning() &&
runtime_controller_->DispatchPlatformMessage(std::move(message))) {
return;
}
// If there's no runtime_, we may still need to set the initial route.
if (message->channel() == kNavigationChannel) {
HandleNavigationPlatformMessage(std::move(message));
return;
}
FML_DLOG(WARNING) << "Dropping platform message on channel: "
<< message->channel();
}
Execution continues into runtime_controller_->DispatchPlatformMessage:
runtime\runtime_controller.cc
bool RuntimeController::DispatchPlatformMessage(
fml::RefPtr<PlatformMessage> message) {
if (auto* window = GetWindowIfAvailable()) {
TRACE_EVENT1("flutter", "RuntimeController::DispatchPlatformMessage",
"mode", "basic");
window->DispatchPlatformMessage(std::move(message));
return true;
}
return false;
}
lib\ui\window\window.cc
void Window::DispatchPlatformMessage(fml::RefPtr<PlatformMessage> message) {
std::shared_ptr<tonic::DartState> dart_state = library_.dart_state().lock();
if (!dart_state) {
FML_DLOG(WARNING)
<< "Dropping platform message for lack of DartState on channel: "
<< message->channel();
return;
}
tonic::DartState::Scope scope(dart_state);
Dart_Handle data_handle =
(message->hasData()) ? ToByteData(message->data()) : Dart_Null();
if (Dart_IsError(data_handle)) {
FML_DLOG(WARNING)
<< "Dropping platform message because of a Dart error on channel: "
<< message->channel();
return;
}
int response_id = 0;
if (auto response = message->response()) {
response_id = next_response_id_++;
pending_responses_[response_id] = response;
}
tonic::LogIfError(
tonic::DartInvokeField(library_.value(), "_dispatchPlatformMessage",
{tonic::ToDart(message->channel()), data_handle,
tonic::ToDart(response_id)}));
}
The message data is converted to a Dart_Handle, and _dispatchPlatformMessage is finally invoked with the channel name and the data.
3. Flutter side
3.1 Method mapping
_dispatchPlatformMessage is defined in hooks.dart; it forwards the message through _invoke3.
lib\ui\hooks.dart
void _dispatchPlatformMessage(String name, ByteData data, int responseId) {
if (name == ChannelBuffers.kControlChannelName) {
try {
channelBuffers.handleMessage(data);
} catch (ex) {
_printDebug('Message to "$name" caused exception $ex');
} finally {
window._respondToPlatformMessage(responseId, null);
}
} else if (window.onPlatformMessage != null) {
_invoke3<String, ByteData, PlatformMessageResponseCallback>(
window.onPlatformMessage,
window._onPlatformMessageZone,
name,
data,
(ByteData responseData) {
window._respondToPlatformMessage(responseId, responseData);
},
);
} else {
channelBuffers.push(name, data, (ByteData responseData) {
window._respondToPlatformMessage(responseId, responseData);
});
}
}
This calls onPlatformMessage with the name and data arguments of _dispatchPlatformMessage.
3.2 ServicesBinding initialization
Where is onPlatformMessage set? Recall that the initInstances method of ServicesBinding assigns it:
packages\flutter\lib\src\services\binding.dart\ServicesBinding
@override
void initInstances() {
super.initInstances();
_instance = this;
_defaultBinaryMessenger = createBinaryMessenger();
window
..onPlatformMessage = defaultBinaryMessenger.handlePlatformMessage;
initLicenses();
SystemChannels.system.setMessageHandler(handleSystemMessage);
}
defaultBinaryMessenger is a _DefaultBinaryMessenger, so whenever onPlatformMessage is invoked, its handlePlatformMessage method runs.
3.3 _DefaultBinaryMessenger: the message dispatcher
The method body:
packages\flutter\lib\src\services\binding.dart\_DefaultBinaryMessenger
@override
Future<void> handlePlatformMessage(
String channel,
ByteData data,
ui.PlatformMessageResponseCallback callback,
) async {
ByteData response;
try {
final MessageHandler handler = _handlers[channel];
if (handler != null) {
response = await handler(data);
} else {
ui.channelBuffers.push(channel, data, callback);
callback = null;
}
} catch (exception, stack) {
} finally {
if (callback != null) {
callback(response);
}
}
}
Since no handler has been registered for the channel yet, execution reaches ui.channelBuffers.push(channel, data, callback). Here is how the data is pushed:
pkg\sky_engine\lib\ui\channel_buffers.dart
bool push(String channel, ByteData data, PlatformMessageResponseCallback callback) {
_RingBuffer<_StoredMessage> queue = _messages[channel];
if (queue == null) {
queue = _makeRingBuffer(kDefaultBufferSize);
_messages[channel] = queue;
}
final bool didOverflow = queue.push(_StoredMessage(data, callback));
if (didOverflow) {
}
return didOverflow;
}
There is a message queue per channel: each incoming message is stored in the queue that belongs to its channel.
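The per-channel queue can be sketched in Java as a map from channel name to a bounded deque. ChannelBufferSketch, its capacity parameter, and the drop-oldest policy on overflow are assumptions made for this illustration; the engine's _RingBuffer is not reproduced here.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-channel bounded buffer; payloads are plain strings here.
final class ChannelBufferSketch {
    private final int capacity;
    private final Map<String, ArrayDeque<String>> messages = new HashMap<>();

    ChannelBufferSketch(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
    }

    // Stores a message; returns true if an older message had to be dropped.
    boolean push(String channel, String data) {
        ArrayDeque<String> queue = messages.computeIfAbsent(channel, k -> new ArrayDeque<>());
        boolean didOverflow = false;
        while (queue.size() >= capacity) {
            queue.removeFirst();
            didOverflow = true;
        }
        queue.addLast(data);
        return didOverflow;
    }

    // Returns the oldest buffered message, or null if none is buffered.
    String pop(String channel) {
        ArrayDeque<String> queue = messages.get(channel);
        return (queue == null) ? null : queue.pollFirst();
    }

    public static void main(String[] args) {
        ChannelBufferSketch buffers = new ChannelBufferSketch(2);
        buffers.push("demo", "a");
        buffers.push("demo", "b");
        System.out.println(buffers.push("demo", "c"));
        System.out.println(buffers.pop("demo"));
    }
}
```

The boolean returned by push mirrors didOverflow in the Dart code: it tells the caller that a message was lost because nobody drained the channel in time.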
3.4 Registering the listener in Flutter
Registration on the Flutter side supplies two callbacks, _onEvent and _onError.
eventChannel.receiveBroadcastStream().listen(_onEvent, onError: _onError);
3.5 EventChannel construction
Its constructor:
packages\flutter\lib\src\services\platform_channel.dart
const EventChannel(this.name, [this.codec = const StandardMethodCodec(), BinaryMessenger binaryMessenger])
: assert(name != null),
assert(codec != null),
_binaryMessenger = binaryMessenger;
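codec is a StandardMethodCodec, which encodes and decodes methods and their arguments. binaryMessenger is the _DefaultBinaryMessenger described in the previous section, defined and initialized in ServicesBinding.
3.5.1 receiveBroadcastStream: obtaining the Stream
This is a method of the EventChannel class.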
packages\flutter\lib\src\services\platform_channel.dart
Stream<dynamic> receiveBroadcastStream([ dynamic arguments ]) {
final MethodChannel methodChannel = MethodChannel(name, codec);
StreamController<dynamic> controller;
controller = StreamController<dynamic>.broadcast(onListen: () async {
}, onCancel: () async {
});
return controller.stream;
}
It first builds a MethodChannel from the name and codec given when the EventChannel was constructed.
3.5.2 StreamController: controlling the stream
Next it calls the StreamController.broadcast factory, whose stream delivers received messages to listeners.
pkg\sky_engine\lib\async\stream_controller.dart\StreamController
factory StreamController.broadcast(
{void onListen(), void onCancel(), bool sync: false}) {
return sync
? new _SyncBroadcastStreamController<T>(onListen, onCancel)
: new _AsyncBroadcastStreamController<T>(onListen, onCancel);
}
The previous step passes no sync argument, so it defaults to false and an _AsyncBroadcastStreamController is returned.
3.5.3 _AsyncBroadcastStreamController: asynchronous stream handling
Its constructor:
pkg\sky_engine\lib\async\broadcast_stream_controller.dart
class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
_AsyncBroadcastStreamController(void onListen(), void onCancel())
: super(onListen, onCancel);
It extends _BroadcastStreamController; the super constructor:
pkg\sky_engine\lib\async\broadcast_stream_controller.dart
_BroadcastStreamController(this.onListen, this.onCancel)
: _state = _STATE_INITIAL;
The two callbacks are stored in the controller's own onListen and onCancel fields.
receiveBroadcastStream ends by returning controller.stream, which is defined in _BroadcastStreamController:
pkg\sky_engine\lib\async\broadcast_stream_controller.dart
Stream<T> get stream => new _BroadcastStream<T>(this);
3.5.4 _BroadcastStream construction
Its constructor:
pkg\sky_engine\lib\async\broadcast_stream_controller.dart
class _BroadcastStream<T> extends _ControllerStream<T> {
_BroadcastStream(_StreamControllerLifecycle<T> controller)
: super(controller);
bool get isBroadcast => true;
}
controller here is a _BroadcastStreamController, because _BroadcastStreamController implements _StreamControllerBase, which in turn extends _StreamControllerLifecycle.
3.5.5 _ControllerStream construction
_BroadcastStream extends _ControllerStream. Its constructor:
pkg\sky_engine\lib\async\stream_controller.dart
class _ControllerStream<T> extends _StreamImpl<T> {
_ControllerStream(this._controller);
}
3.5.6 _StreamImpl construction
_ControllerStream extends the _StreamImpl class, which is defined here:
pkg\sky_engine\lib\async\stream_impl.dart
abstract class _StreamImpl<T> extends Stream<T> {
}
The stream holds a reference to its StreamController. That is as far as the inheritance chain needs to be followed.
3.6 listen
After our own Dart code calls receiveBroadcastStream, it calls listen.
listen is defined in _StreamImpl:
pkg\sky_engine\lib\async\stream_impl.dart\_StreamImpl
StreamSubscription<T> listen(void onData(T data),
{Function onError, void onDone(), bool cancelOnError}) {
cancelOnError = identical(true, cancelOnError);
StreamSubscription<T> subscription =
_createSubscription(onData, onError, onDone, cancelOnError);
_onListen(subscription);
return subscription;
}
// -------------------------------------------------------------------
/** Create a subscription object. Called by [subcribe]. */
StreamSubscription<T> _createSubscription(void onData(T data),
Function onError, void onDone(), bool cancelOnError) {
return new _BufferingStreamSubscription<T>(
onData, onError, onDone, cancelOnError);
}
onData is the _onEvent passed at registration, and the onError named argument is the _onError passed at registration.
However, _ControllerStream, a subclass of _StreamImpl, also defines _createSubscription:
pkg\sky_engine\lib\async\stream_controller.dart
class _ControllerStream<T> extends _StreamImpl<T> {
StreamSubscription<T> _createSubscription(void onData(T data),
Function onError, void onDone(), bool cancelOnError) =>
_controller._subscribe(onData, onError, onDone, cancelOnError);
}
Which one gets called? A small demo on https://dartpad.dev/ settles it:
abstract class Test1{
void listen(){
print("class init");
_test1();
print("class init1");
}
void _test1(){
print("_test1 1");
}
}
class Demo1 extends Test1{
void _test1(){
print("_test1 2");
}
}
class Demo2 extends Demo1{
}
void main() {
Demo2().listen();
}
The output is:
class init
_test1 2
class init1
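The same experiment can be sketched in Java, which resolves overridden methods the same way. DispatchDemo and its log list are hypothetical names, and the method is package-private rather than private because a private Java method cannot be overridden (in Dart, a leading underscore only makes the member library-private).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical Java counterpart of the DartPad demo above.
final class DispatchDemo {
    static final List<String> log = new ArrayList<>();

    abstract static class Test1 {
        void listen() {
            log.add("class init");
            test1();  // resolved against the runtime class of the object
            log.add("class init1");
        }

        void test1() {
            log.add("_test1 1");
        }
    }

    static class Demo1 extends Test1 {
        @Override
        void test1() {
            log.add("_test1 2");
        }
    }

    // Inherits both listen() and the overriding test1().
    static class Demo2 extends Demo1 {
    }

    static List<String> run() {
        log.clear();
        new Demo2().listen();
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```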
So the _createSubscription in _ControllerStream is the one that gets called. _controller is the _AsyncBroadcastStreamController, and _subscribe is actually defined in its parent class _BroadcastStreamController:
pkg\sky_engine\lib\async\broadcast_stream_controller.dart
StreamSubscription<T> _subscribe(void onData(T data), Function onError,
void onDone(), bool cancelOnError) {
if (isClosed) {
onDone ??= _nullDoneHandler;
return new _DoneStreamSubscription<T>(onDone);
}
StreamSubscription<T> subscription = new _BroadcastSubscription<T>(
this, onData, onError, onDone, cancelOnError);
_addListener(subscription);
if (identical(_firstSubscription, _lastSubscription)) {
// Only one listener, so it must be the first listener.
_runGuarded(onListen);
}
return subscription;
}
The inheritance chain of _BroadcastSubscription is: _BroadcastSubscription, _ControllerSubscription, _BufferingStreamSubscription.
The _addListener method:
pkg\sky_engine\lib\async\broadcast_stream_controller.dart
void _addListener(_BroadcastSubscription<T> subscription) {
assert(identical(subscription._next, subscription));
subscription._eventState = (_state & _STATE_EVENT_ID);
// Insert in linked list as last subscription.
_BroadcastSubscription<T> oldLast = _lastSubscription;
_lastSubscription = subscription;
subscription._next = null;
subscription._previous = oldLast;
if (oldLast == null) {
_firstSubscription = subscription;
} else {
oldLast._next = subscription;
}
}
When the first listener is added, _firstSubscription and _lastSubscription are identical, so _runGuarded runs and calls onListen. That is the onListen closure originally passed to broadcast.
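The rule that only the first subscription triggers onListen can be sketched in Java as follows. BroadcastSketch is a hypothetical class; it uses an ArrayList in place of the linked list of subscriptions and delivers events synchronously, unlike the asynchronous controller discussed here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical broadcast controller: onListen fires for the first listener only.
final class BroadcastSketch<T> {
    private final Runnable onListen;
    private final List<Consumer<T>> subscriptions = new ArrayList<>();

    BroadcastSketch(Runnable onListen) {
        this.onListen = onListen;
    }

    void listen(Consumer<T> onData) {
        subscriptions.add(onData);
        // Exactly one subscription means this is the first listener.
        if (subscriptions.size() == 1) {
            onListen.run();
        }
    }

    // Delivers the event to every subscription in registration order.
    void add(T event) {
        for (Consumer<T> subscription : subscriptions) {
            subscription.accept(event);
        }
    }

    static List<String> run() {
        List<String> seen = new ArrayList<>();
        BroadcastSketch<String> controller = new BroadcastSketch<>(() -> seen.add("onListen"));
        controller.listen(e -> seen.add("a:" + e));
        controller.listen(e -> seen.add("b:" + e));  // does not trigger onListen again
        controller.add("x");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the EventChannel case this is what keeps the platform side from receiving a second listen call when another Dart listener subscribes to the same broadcast stream.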
3.7 Back to the Dart registration
Back where the callbacks were first registered, the onListen callback inside EventChannel.receiveBroadcastStream now runs:
packages\flutter\lib\src\services\platform_channel.dart
onListen: () async {
binaryMessenger.setMessageHandler(name, (ByteData reply) async {
if (reply == null) {
controller.close();
} else {
try {
controller.add(codec.decodeEnvelope(reply));
} on PlatformException catch (e) {
controller.addError(e);
}
}
return null;
});
try {
await methodChannel.invokeMethod<void>('listen', arguments);
} catch (exception, stack) {
}
}
3.7.1 Setting the message handler
binaryMessenger is a _DefaultBinaryMessenger. Its setMessageHandler method:
packages\flutter\lib\src\services\binding.dart
@override
void setMessageHandler(String channel, MessageHandler handler) {
if (handler == null)
_handlers.remove(channel);
else
_handlers[channel] = handler;
ui.channelBuffers.drain(channel, (ByteData data, ui.PlatformMessageResponseCallback callback) async {
await handlePlatformMessage(channel, data, callback);
});
}
The MessageHandler is the closure passed as the second argument to setMessageHandler inside onListen. It is first stored in the _handlers map, then drain runs:
pkg\sky_engine\lib\ui\channel_buffers.dart
Future<void> drain(String channel, DrainChannelCallback callback) async {
while (!_isEmpty(channel)) {
final _StoredMessage message = _pop(channel);
await callback(message.data, message.callback);
}
}
The _isEmpty method:
bool _isEmpty(String channel) {
final _RingBuffer<_StoredMessage> queue = _messages[channel];
return (queue == null) ? true : queue.isEmpty;
}
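It looks up the queue for the channel. If there is none it returns true; otherwise it reports whether the queue is empty, that is, whether its head equals its tail.
If the queue holds messages, callback is invoked for each of them with the message data and message.callback.
callback in turn calls handlePlatformMessage in _DefaultBinaryMessenger: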
packages\flutter\lib\src\services\binding.dart
@override
Future<void> handlePlatformMessage(
String channel,
ByteData data,
ui.PlatformMessageResponseCallback callback,
) async {
ByteData response;
try {
final MessageHandler handler = _handlers[channel];
if (handler != null) {
response = await handler(data);
} else {
ui.channelBuffers.push(channel, data, callback);
callback = null;
}
} catch (exception, stack) {
} finally {
if (callback != null) {
callback(response);
}
}
}
This time handler is not null, so the closure passed as the second argument to binaryMessenger.setMessageHandler is called:
packages\flutter\lib\src\services\platform_channel.dart
binaryMessenger.setMessageHandler(name, (ByteData reply) async {
controller.add(codec.decodeEnvelope(reply));
}
The received data is decoded first, in StandardMethodCodec:
packages\flutter\lib\src\services\message_codecs.dart
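dynamic decodeEnvelope(ByteData envelope) {
final ReadBuffer buffer = ReadBuffer(envelope);
// The first byte is 0 for a success envelope.
if (buffer.getUint8() == 0)
return messageCodec.readValue(buffer);
// Otherwise the error code, message and details are read and thrown as a PlatformException.
}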
It reads the value and returns it. The return type is dynamic, so it is up to us to know which types the two sides send and receive.
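The decoding step can be illustrated with a small Java sketch that reads back a success envelope holding a short string. EnvelopeDecodeSketch and decodeStringEnvelope are hypothetical names, and the sketch handles only the STRING tag with a one-byte size, so it illustrates the idea rather than the codec.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical decoder for a success envelope that carries a short string.
final class EnvelopeDecodeSketch {
    static final byte STRING = 7;

    static String decodeStringEnvelope(byte[] envelope) {
        ByteBuffer buffer = ByteBuffer.wrap(envelope);
        if (!buffer.hasRemaining()) {
            throw new IllegalArgumentException("expected envelope, got nothing");
        }
        // The first byte is 0 for success and non-zero for an error envelope.
        if (buffer.get() != 0) {
            throw new IllegalStateException("error envelope");
        }
        if (buffer.get() != STRING) {
            throw new IllegalArgumentException("sketch decodes strings only");
        }
        int size = buffer.get() & 0xff;
        if (size >= 254) {
            throw new IllegalArgumentException("sketch handles sizes below 254 only");
        }
        byte[] utf8 = new byte[size];
        buffer.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(decodeStringEnvelope(new byte[] {0, 7, 2, 'h', 'i'}));
    }
}
```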
3.7.2 invokeMethod: sending a message to Android
This runs after the Dart side calls listen: the onListen closure is called back and uses the MethodChannel to invoke a method named listen, which ends up in onMessage of IncomingStreamRequestHandler on the Android side. The detailed flow is covered in the earlier article "Flutter communication mechanism: MethodChannel". This is what initializes the EventChannel.EventSink variable on the Android side.
3.7.3 Adding data to the stream
Next, controller.add is called:
pkg\sky_engine\lib\async\broadcast_stream_controller.dart\_BroadcastStreamController
void add(T data) {
if (!_mayAddEvent) throw _addEventError();
_sendData(data);
}
add is defined in _BroadcastStreamController; _sendData is implemented in _AsyncBroadcastStreamController:
void _sendData(T data) {
for (_BroadcastSubscription<T> subscription = _firstSubscription;
subscription != null;
subscription = subscription._next) {
subscription._addPending(new _DelayedData<T>(data));
}
}
Next comes _addPending, in _BufferingStreamSubscription:
pkg\sky_engine\lib\async\stream_impl.dart
void _addPending(_DelayedEvent event) {
_StreamImplEvents<T> pending = _pending;
if (_pending == null) {
pending = _pending = new _StreamImplEvents<T>();
}
pending.add(event);
if (!_hasPending) {
_state |= _STATE_HAS_PENDING;
if (!_isPaused) {
_pending.schedule(this);
}
}
}
3.7.4 Scheduling the callback
The schedule method of _StreamImplEvents:
pkg\sky_engine\lib\async\stream_impl.dart\_PendingEvents
void schedule(_EventDispatch<T> dispatch) {
if (isScheduled) return;
assert(!isEmpty);
if (_eventScheduled) {
assert(_state == _STATE_CANCELED);
_state = _STATE_SCHEDULED;
return;
}
scheduleMicrotask(() {
int oldState = _state;
_state = _STATE_UNSCHEDULED;
if (oldState == _STATE_CANCELED) return;
handleNext(dispatch);
});
_state = _STATE_SCHEDULED;
}
handleNext is implemented in _StreamImplEvents, which extends _PendingEvents:
pkg\sky_engine\lib\async\stream_impl.dart
void handleNext(_EventDispatch<T> dispatch) {
assert(!isScheduled);
_DelayedEvent event = firstPendingEvent;
firstPendingEvent = event.next;
if (firstPendingEvent == null) {
lastPendingEvent = null;
}
event.perform(dispatch);
}
This calls perform, which here is the one in _DelayedData:
pkg\sky_engine\lib\async\stream_impl.dart
void perform(_EventDispatch<T> dispatch) {
dispatch._sendData(value);
}
When _addPending in _BufferingStreamSubscription calls schedule, it passes this as dispatch, so _sendData resolves to the one in _BufferingStreamSubscription:
pkg\sky_engine\lib\async\stream_impl.dart
void _sendData(T data) {
bool wasInputPaused = _isInputPaused;
_state |= _STATE_IN_CALLBACK;
_zone.runUnaryGuarded(_onData, data);
_state &= ~_STATE_IN_CALLBACK;
_checkState(wasInputPaused);
}
_zone.runUnaryGuarded(_onData, data) finally calls the _onEvent method defined in our own Dart code, so the data can be handled in the void _onEvent(Object event) {} callback.
4. scheduleMicrotask: task scheduling
The schedule method of _PendingEvents calls scheduleMicrotask. Here is what it does:
pkg\sky_engine\lib\async\schedule_microtask.dart
void scheduleMicrotask(void callback()) {
_Zone currentZone = Zone.current;
if (identical(_rootZone, currentZone)) {
// No need to bind the callback. We know that the root's scheduleMicrotask
// will be invoked in the root zone.
_rootScheduleMicrotask(null, null, _rootZone, callback);
return;
}
_ZoneFunction implementation = currentZone._scheduleMicrotask;
if (identical(_rootZone, implementation.zone) &&
_rootZone.inSameErrorZone(currentZone)) {
_rootScheduleMicrotask(
null, null, currentZone, currentZone.registerCallback(callback));
return;
}
Zone.current.scheduleMicrotask(Zone.current.bindCallbackGuarded(callback));
}
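4.1 Zones
A zone represents an environment that remains stable across asynchronous calls. Code always executes in the context of a zone, available as Zone.current. The initial main function runs in Zone.root. Code can run in a different zone either through runZoned, which creates a new zone, or through Zone.run, which runs it in the context of an existing zone, for example one created with Zone.fork.
Asynchronous callbacks always run in the context of the zone where they were scheduled. This is implemented in two steps:
1. The callback is registered with registerCallback, registerUnaryCallback, or registerBinaryCallback. This lets the zone record that the callback exists and possibly modify it, for example by returning a different callback. The zone that performs the registration is the zone the callback will later run in.
2. At some later point, the registered callback is run in that zone.
For convenience, zones provide bindCallback (and bindUnaryCallback and bindBinaryCallback), which wraps both steps: the zone where the callback was registered is the zone where the wrapped callback runs when it is invoked asynchronously.
Similarly, zones provide bindCallbackGuarded (and bindUnaryCallbackGuarded and bindBinaryCallbackGuarded), which run the callback through Zone.runGuarded.
4.2 Running the task
Here Zone.current is the _RootZone, the same zone main runs in, so _rootScheduleMicrotask is called: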
pkg\sky_engine\lib\async\zone.dart
void _rootScheduleMicrotask(
Zone self, ZoneDelegate parent, Zone zone, void f()) {
if (!identical(_rootZone, zone)) {
bool hasErrorHandler = !_rootZone.inSameErrorZone(zone);
if (hasErrorHandler) {
f = zone.bindCallbackGuarded(f);
} else {
f = zone.bindCallback(f);
}
// Use root zone as event zone if the function is already bound.
zone = _rootZone;
}
_scheduleAsyncCallback(f);
}
_rootZone and zone are identical, so _scheduleAsyncCallback is called to run the callback f asynchronously:
pkg\sky_engine\lib\async\schedule_microtask.dart
void _scheduleAsyncCallback(_AsyncCallback callback) {
_AsyncCallbackEntry newEntry = new _AsyncCallbackEntry(callback);
if (_nextCallback == null) {
_nextCallback = _lastCallback = newEntry;
if (!_isInCallbackLoop) {
_AsyncRun._scheduleImmediate(_startMicrotaskLoop);
}
} else {
_lastCallback.next = newEntry;
_lastCallback = newEntry;
}
}
Next, _startMicrotaskLoop runs:
pkg\sky_engine\lib\async\schedule_microtask.dart
void _startMicrotaskLoop() {
_isInCallbackLoop = true;
try {
// Moved to separate function because try-finally prevents
// good optimization.
_microtaskLoop();
} finally {
_lastPriorityCallback = null;
_isInCallbackLoop = false;
if (_nextCallback != null) {
_AsyncRun._scheduleImmediate(_startMicrotaskLoop);
}
}
}
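In the finally block, _startMicrotaskLoop is scheduled again whenever _nextCallback is still not null, so _microtaskLoop keeps being called as long as callbacks remain. The callbacks are deferred rather than run inline by the caller, although they still execute on the same isolate. The function that actually runs them is _microtaskLoop: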
void _microtaskLoop() {
while (_nextCallback != null) {
_lastPriorityCallback = null;
_AsyncCallbackEntry entry = _nextCallback;
_nextCallback = entry.next;
if (_nextCallback == null) _lastCallback = null;
(entry.callback)();
}
}
_nextCallback is the _AsyncCallbackEntry that wraps the asynchronous callback. Before a callback runs, _nextCallback is advanced to the next entry.
callback is the one wrapped in _scheduleAsyncCallback, that is, f in _rootScheduleMicrotask, which is the closure passed to scheduleMicrotask in the previous section. It eventually calls back into our _onEvent method.
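The behaviour of the microtask queue can be sketched in Java with a plain FIFO queue. MicrotaskSketch is a hypothetical class; it models only the ordering (callbacks run after the current synchronous code, in scheduling order, including ones scheduled while the loop is draining) and leaves out zones and error handling.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded microtask queue; models ordering only.
final class MicrotaskSketch {
    private final ArrayDeque<Runnable> queue = new ArrayDeque<>();

    void scheduleMicrotask(Runnable callback) {
        queue.addLast(callback);
    }

    // Runs callbacks in FIFO order, including ones scheduled while draining.
    void runLoop() {
        Runnable next;
        while ((next = queue.pollFirst()) != null) {
            next.run();
        }
    }

    static List<String> run() {
        List<String> order = new ArrayList<>();
        MicrotaskSketch loop = new MicrotaskSketch();
        loop.scheduleMicrotask(() -> {
            order.add("first");
            loop.scheduleMicrotask(() -> order.add("nested"));
        });
        loop.scheduleMicrotask(() -> order.add("second"));
        order.add("sync");  // synchronous code finishes before any microtask runs
        loop.runLoop();
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The nested callback runs after "second" because it joins the back of the queue, which matches how _lastCallback.next is appended in _scheduleAsyncCallback.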
5. Android notifying Flutter
Back to handlePlatformMessage in Flutter's _DefaultBinaryMessenger:
@override
Future<void> handlePlatformMessage(
String channel,
ByteData data,
ui.PlatformMessageResponseCallback callback,
) async {
ByteData response;
try {
final MessageHandler handler = _handlers[channel];
if (handler != null) {
response = await handler(data);
} else {
ui.channelBuffers.push(channel, data, callback);
callback = null;
}
} catch (exception, stack) {
} finally {
if (callback != null) {
callback(response);
}
}
}
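While no MessageHandler has been set through binaryMessenger.setMessageHandler, messages are held in the queue and are dispatched to the handler as soon as one is registered. When the map already holds a MessageHandler for the channel, it is called directly, which lands back in the setMessageHandler closure and repeats the flow that follows 3.7.2.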
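The two cases in this summary can be sketched in Java: buffer until a handler exists, then deliver directly. BufferingMessengerSketch is a hypothetical class with string payloads and an unbounded buffer, whereas the real channel buffers are bounded, so it only illustrates the control flow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical messenger: buffers messages until a handler is registered.
final class BufferingMessengerSketch {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();
    private final Map<String, ArrayDeque<String>> buffered = new HashMap<>();

    void handlePlatformMessage(String channel, String data) {
        Consumer<String> handler = handlers.get(channel);
        if (handler != null) {
            handler.accept(data);  // a handler exists: deliver directly
        } else {
            buffered.computeIfAbsent(channel, k -> new ArrayDeque<>()).addLast(data);
        }
    }

    void setMessageHandler(String channel, Consumer<String> handler) {
        if (handler == null) {
            handlers.remove(channel);
            return;
        }
        handlers.put(channel, handler);
        // Drain whatever arrived before the handler was registered.
        ArrayDeque<String> queue = buffered.get(channel);
        while (queue != null && !queue.isEmpty()) {
            handlePlatformMessage(channel, queue.pollFirst());
        }
    }

    static List<String> run() {
        List<String> received = new ArrayList<>();
        BufferingMessengerSketch messenger = new BufferingMessengerSketch();
        messenger.handlePlatformMessage("demo", "early");  // buffered: no handler yet
        messenger.setMessageHandler("demo", received::add);
        messenger.handlePlatformMessage("demo", "late");   // delivered directly
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```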