鴻蒙開發(fā)實戰(zhàn)系列之二:事件總線EventBus/RxBus

鴻蒙開發(fā)實戰(zhàn)系列之一:鴻蒙開發(fā)實戰(zhàn)系列之一:圓角

前言

上一篇跟大家分享了如何在鴻蒙系統(tǒng)中實現(xiàn)圓角,這一期我們來跟大家分享一下如何實現(xiàn)發(fā)布/訂閱的事件總線,也就是我們在Android開發(fā)中常用的EventBus、RxBus等框架晒衩。

開始之前饿这,我們先回顧下Android里面的事件發(fā)布/訂閱是個什么東西怜浅?
像EventBus凡蚜、RxBus是我們在Android應(yīng)用開發(fā)中常選用的發(fā)布/訂閱事件框架,用來代替?zhèn)鹘y(tǒng)的Intent披泪,Handler,Broadcast搬瑰,在Activity款票,F(xiàn)ragment,Service線程之間傳遞數(shù)據(jù)泽论,執(zhí)行方法艾少。

它有很多優(yōu)點:簡化應(yīng)用組件間的通信;解耦事件的發(fā)送者和接收者翼悴;避免復雜和容易出錯的依賴和生命周期的問題缚够;速度快,專門為高性能優(yōu)化過等等鹦赎。

主要工作原理:事件源將產(chǎn)生的消息發(fā)送到事件總線的特定通道之上谍椅,然后監(jiān)聽器在事先會訂閱事務(wù)總線之中不同的通道以區(qū)分消息的響應(yīng),然后當消息被發(fā)送到事務(wù)總線的特定通道之中時古话,所對應(yīng)的監(jiān)聽器會監(jiān)聽到消息雏吭,然后監(jiān)聽器根據(jù)程序中設(shè)置的響應(yīng)函數(shù)進行執(zhí)行。

rxbus.jpg

那在鴻蒙系統(tǒng)中陪踩,肯定是沒有現(xiàn)成的EventBus或者RxBus給我們直接用的杖们,那么我們?nèi)绾卧邙櫭上到y(tǒng)中的Slice,Ability膊毁,Service胀莹,線程之間傳遞消息,執(zhí)行方法呢婚温?
磨刀不誤砍柴工啊描焰,兄dei,要是不先把這個工具庫寫出來,這么幾周的時間怎么完成一個鴻蒙App呢荆秦,產(chǎn)品手中的30m大刀可不是鬧著玩的啊篱竭。

由于鴻蒙只能使用Java代碼,所以我們看上了RxJava這個小兄弟步绸,何不像在Android中一樣掺逼,在rxjava的基礎(chǔ)上封裝一個RxBus呢?跟Android大哥看齊不香么瓤介?

好的吕喘,說干就干,我們來手lu一個鴻蒙版的RxBus刑桑。

實現(xiàn)鴻蒙Rxbus

1氯质、先引入Rxjava庫

implementation 'io.reactivex.rxjava3:rxjava:3.0.4'

2、創(chuàng)建鴻蒙線程調(diào)度HarmonySchedulers

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.Executor;

public class HarmonySchedulers implements Executor {

    private static HarmonySchedulers instance;
    private final Scheduler mMainScheduler;
    private TaskDispatcher uiTaskDispatcher;

    private HarmonySchedulers() {
        mMainScheduler = Schedulers.from(this);
    }

    public static synchronized Scheduler mainThread() {
        if (instance == null) {
            instance = new HarmonySchedulers();
        }
        return instance.mMainScheduler;
    }

    @Override
    public void execute(@NonNull Runnable command) {
         if (uiTaskDispatcher == null) {
            uiTaskDispatcher = getMainAbility().getUITaskDispatcher();//注意祠斧,這里要用Ability來獲取UI線程的任務(wù)發(fā)射器闻察,Ability自己想辦法獲取
        }
        uiTaskDispatcher.delayDispatch(runnable, delayTime);
    }
}

3、創(chuàng)建RxBus類琢锋,實現(xiàn)訂閱辕漂、注冊、取消注冊等功能

@SuppressWarnings("unused")
public class RxBus {
    public static final String LOG_BUS = "RXBUS_LOG";
    private static volatile RxBus defaultInstance;

    private Map<Class, List<Disposable>> subscriptionsByEventType = new HashMap<>();

    private Map<Object, List<Class>> eventTypesBySubscriber = new HashMap<>();

    private Map<Class, List<SubscriberMethod>> subscriberMethodByEventType = new HashMap<>();

    private final Subject<Object> bus;

    private RxBus() {
        this.bus = PublishSubject.create().toSerialized();
    }

    public static RxBus get() {
        RxBus rxBus = defaultInstance;
        if (defaultInstance == null) {
            synchronized (RxBus.class) {
                rxBus = defaultInstance;
                if (defaultInstance == null) {
                    rxBus = new RxBus();
                    defaultInstance = rxBus;
                }
            }
        }
        return rxBus;
    }

    /**
     * 根據(jù)傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者
     *
     * @param eventType 事件類型
     * @return return
     */
    private <T> Flowable<T> toObservable(Class<T> eventType) {
        return bus.toFlowable(BackpressureStrategy.BUFFER).ofType(eventType);
    }

    /**
     * 根據(jù)傳遞的code和 eventType 類型返回特定類型(eventType)的 被觀察者
     *
     * @param code      事件code
     * @param eventType 事件類型
     */
    private <T> Flowable<T> toObservable(final int code, final Class<T> eventType) {
        return bus.toFlowable(BackpressureStrategy.BUFFER).ofType(Message.class)
                .filter(new Predicate<Message>() {
                    @Override
                    public boolean test(Message o) throws Exception {
                        return o.getCode() == code && eventType.isInstance(o.getObject());
                    }
                }).map(new Function<Message, Object>() {
                    @Override
                    public Object apply(Message o) throws Exception {
                        return o.getObject();
                    }
                }).cast(eventType);
    }

    /**
     * 注冊
     *
     * @param subscriber 訂閱者
     */
    public void register(Object subscriber) {
        Class<?> subClass = subscriber.getClass();
        Method[] methods = subClass.getDeclaredMethods();
        for (Method method : methods) {
            if (method.isAnnotationPresent(Subscribe.class)) {
                //獲得參數(shù)類型
                Class[] parameterType = method.getParameterTypes();
                //參數(shù)不為空 且參數(shù)個數(shù)為1
                if (parameterType != null && parameterType.length == 1) {

                    Class eventType = parameterType[0];

                    addEventTypeToMap(subscriber, eventType);
                    Subscribe sub = method.getAnnotation(Subscribe.class);
                    int code = sub.code();
                    ThreadMode threadMode = sub.threadMode();

                    SubscriberMethod subscriberMethod = new SubscriberMethod(subscriber, method, eventType, code, threadMode);
                    addSubscriberToMap(eventType, subscriberMethod);

                    addSubscriber(subscriberMethod);
                } else if (parameterType == null || parameterType.length == 0) {

                    Class eventType = BusData.class;

                    addEventTypeToMap(subscriber, eventType);
                    Subscribe sub = method.getAnnotation(Subscribe.class);
                    int code = sub.code();
                    ThreadMode threadMode = sub.threadMode();

                    SubscriberMethod subscriberMethod = new SubscriberMethod(subscriber, method, eventType, code, threadMode);
                    addSubscriberToMap(eventType, subscriberMethod);

                    addSubscriber(subscriberMethod);

                }
            }
        }
    }
    
    /**
     * 將event的類型以訂閱中subscriber為key保存到map里
     *
     * @param subscriber 訂閱者
     * @param eventType  event類型
     */
    private void addEventTypeToMap(Object subscriber, Class eventType) {
        List<Class> eventTypes = eventTypesBySubscriber.get(subscriber);
        if (eventTypes == null) {
            eventTypes = new ArrayList<>();
            eventTypesBySubscriber.put(subscriber, eventTypes);
        }

        if (!eventTypes.contains(eventType)) {
            eventTypes.add(eventType);
        }
    }

    /**
     * 將注解方法信息以event類型為key保存到map中
     *
     * @param eventType        event類型
     * @param subscriberMethod 注解方法信息
     */
    private void addSubscriberToMap(Class eventType, SubscriberMethod subscriberMethod) {
        List<SubscriberMethod> subscriberMethods = subscriberMethodByEventType.get(eventType);
        if (subscriberMethods == null) {
            subscriberMethods = new ArrayList<>();
            subscriberMethodByEventType.put(eventType, subscriberMethods);
        }

        if (!subscriberMethods.contains(subscriberMethod)) {
            subscriberMethods.add(subscriberMethod);
        }
    }

    /**
     * 將訂閱事件以event類型為key保存到map,用于取消訂閱時用
     *
     * @param eventType  event類型
     * @param disposable 訂閱事件
     */
    private void addSubscriptionToMap(Class eventType, Disposable disposable) {
        List<Disposable> disposables = subscriptionsByEventType.get(eventType);
        if (disposables == null) {
            disposables = new ArrayList<>();
            subscriptionsByEventType.put(eventType, disposables);
        }

        if (!disposables.contains(disposable)) {
            disposables.add(disposable);
        }
    }

    /**
     * 用RxJava添加訂閱者
     *
     * @param subscriberMethod d
     */
    @SuppressWarnings("unchecked")
    private void addSubscriber(final SubscriberMethod subscriberMethod) {
        Flowable flowable;
        if (subscriberMethod.code == -1) {
            flowable = toObservable(subscriberMethod.eventType);
        } else {
            flowable = toObservable(subscriberMethod.code, subscriberMethod.eventType);
        }
        Disposable subscription = postToObservable(flowable, subscriberMethod)
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        callEvent(subscriberMethod, o);
                    }
                });
        addSubscriptionToMap(subscriberMethod.subscriber.getClass(), subscription);
    }

    /**
     * 用于處理訂閱事件在那個線程中執(zhí)行
     *
     * @param observable       d
     * @param subscriberMethod d
     * @return Observable
     */
    private Flowable postToObservable(Flowable observable, SubscriberMethod subscriberMethod) {
        Scheduler scheduler;
        switch (subscriberMethod.threadMode) {
            case MAIN:
                scheduler = HarmonySchedulers.mainThread();
                break;

            case NEW_THREAD:
                scheduler = Schedulers.newThread();
                break;

            case CURRENT_THREAD:
                scheduler = Schedulers.trampoline();
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode);
        }
        return observable.observeOn(scheduler);
    }

    /**
     * 回調(diào)到訂閱者的方法中
     *
     * @param method code
     * @param object obj
     */
    private void callEvent(SubscriberMethod method, Object object) {
        Class eventClass = object.getClass();
        List<SubscriberMethod> methods = subscriberMethodByEventType.get(eventClass);
        if (methods != null && methods.size() > 0) {
            for (SubscriberMethod subscriberMethod : methods) {
                Subscribe sub = subscriberMethod.method.getAnnotation(Subscribe.class);
                int c = sub.code();
                if (c == method.code && method.subscriber.equals(subscriberMethod.subscriber) && method.method.equals(subscriberMethod.method)) {
                    subscriberMethod.invoke(object);
                }

            }
        }
    }

    /**
     * 取消注冊
     *
     * @param subscriber object
     */
    public void unRegister(Object subscriber) {
        List<Class> subscribedTypes = eventTypesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                unSubscribeByEventType(subscriber.getClass());
                unSubscribeMethodByEventType(subscriber, eventType);
            }
            eventTypesBySubscriber.remove(subscriber);
        }
    }

    /**
     * subscriptions unsubscribe
     *
     * @param eventType eventType
     */
    private void unSubscribeByEventType(Class eventType) {
        List<Disposable> disposables = subscriptionsByEventType.get(eventType);
        if (disposables != null) {
            Iterator<Disposable> iterator = disposables.iterator();
            while (iterator.hasNext()) {
                Disposable disposable = iterator.next();
                if (disposable != null && !disposable.isDisposed()) {
                    disposable.dispose();
                    iterator.remove();
                }
            }
        }
    }

    /**
     * 移除subscriber對應(yīng)的subscriberMethods
     *
     * @param subscriber subscriber
     * @param eventType  eventType
     */
    private void unSubscribeMethodByEventType(Object subscriber, Class eventType) {
        List<SubscriberMethod> subscriberMethods = subscriberMethodByEventType.get(eventType);
        if (subscriberMethods != null) {
            Iterator<SubscriberMethod> iterator = subscriberMethods.iterator();
            while (iterator.hasNext()) {
                SubscriberMethod subscriberMethod = iterator.next();
                if (subscriberMethod.subscriber.equals(subscriber)) {
                    iterator.remove();
                }
            }
        }
    }

    public void send(int code, Object o) {
        bus.onNext(new Message(code, o));
    }

    public void send(Object o) {
        bus.onNext(o);
    }

    public void send(int code) {
        bus.onNext(new Message(code, new BusData()));
    }

    private class Message {
        private int code;
        private Object object;

        public Message() {
        }

        private Message(int code, Object o) {
            this.code = code;
            this.object = o;
        }

        private int getCode() {
            return code;
        }

        public void setCode(int code) {
            this.code = code;
        }

        private Object getObject() {
            return object;
        }

        public void setObject(Object object) {
            this.object = object;
        }
    }
}

4吴超、添加其他附加類

BusData.java 事件數(shù)據(jù)封裝

public class BusData {
    String id;
    String status;
    public BusData() {}
    public BusData(String id, String status) {
        this.id = id;
        this.status = status;
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getStatus() {
        return status;
    }
    public void setStatus(String status) {
        this.status = status;
    }
}

Subscribe.java 注解類

@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Subscribe {
    int code() default -1;

    ThreadMode threadMode() default ThreadMode.CURRENT_THREAD;
}

SubscriberMethod.java 執(zhí)行注冊方法封裝

public class SubscriberMethod {
    public Method method;
    public ThreadMode threadMode;
    public Class<?> eventType;
    public Object subscriber;
    public int code;

    public SubscriberMethod(Object subscriber, Method method, Class<?> eventType, int code,ThreadMode threadMode) {
        this.method = method;
        this.threadMode = threadMode;
        this.eventType = eventType;
        this.subscriber = subscriber;
        this.code = code;
    }
    
    /**
     * 調(diào)用方法
     * @param o 參數(shù)
     */
    public void invoke(Object o){
        try {
            Class[] parameterType = method.getParameterTypes();
            if(parameterType != null && parameterType.length == 1){
                method.invoke(subscriber, o);
            } else if(parameterType == null || parameterType.length == 0){
                method.invoke(subscriber);
            }
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }
    }
}

ThreadMode.java 線程模型钉嘹,用來標識事件運行線程

public enum ThreadMode {
    /**
     * current thread
     */
    CURRENT_THREAD,
    /**
     * android main thread
     */
    MAIN,
    /**
     * new thread
     */
    NEW_THREAD
}

使用Rxbus

1、定義事件參數(shù)類

public class RxbusEvent {}

2烛芬、定義事件接收者

public class RxBusDemoAbilitySlice extends AbilitySlice {
    @Override
    protected void onStart(Intent intent) {
        super.onStart(intent);
        RxBus.get().register(this);//注測rxbus
    }
    @Override
    protected void onStop() {
        super.onStop();
        RxBus.get().unRegister(this);//注銷rxbus
    }
    /**
     * 接收事件
     * @param rxbusEvent
     */
    @Subscribe(threadMode = ThreadMode.MAIN)
    public void rxBusRxbusEvent(RxbusEvent rxbusEvent) {
        if (rxbusEvent == null) {
            return;
        }
        //執(zhí)行對應(yīng)操作
    }    
}

3隧期、發(fā)送事件

RxBus.get().send(new RxbusEvent());//發(fā)送事件

總結(jié)

打完收工,最后使用起來跟Android中基本一樣赘娄,本期代碼比較多仆潮,歡迎大家使用測試并反饋bug,后期代碼整理好會一并傳到github上去遣臼。好了性置,繼續(xù)搞鴻蒙其他功能去了。

這是本系列的第二篇揍堰,后面還會為大家?guī)砀嗟镍櫭筛韶浥羟常凑埰诖?/p>

如果文章對您有一點啟發(fā)的話,希望您能點個贊屏歹,來個關(guān)注收藏不迷路隐砸。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市蝙眶,隨后出現(xiàn)的幾起案子季希,更是在濱河造成了極大的恐慌褪那,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,599評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件式塌,死亡現(xiàn)場離奇詭異博敬,居然都是意外死亡,警方通過查閱死者的電腦和手機峰尝,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,629評論 3 385
  • 文/潘曉璐 我一進店門偏窝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人武学,你說我怎么就攤上這事祭往。” “怎么了火窒?”我有些...
    開封第一講書人閱讀 158,084評論 0 348
  • 文/不壞的土叔 我叫張陵链沼,是天一觀的道長。 經(jīng)常有香客問我沛鸵,道長,這世上最難降的妖魔是什么缆八? 我笑而不...
    開封第一講書人閱讀 56,708評論 1 284
  • 正文 為了忘掉前任曲掰,我火速辦了婚禮,結(jié)果婚禮上奈辰,老公的妹妹穿的比我還像新娘栏妖。我一直安慰自己,他們只是感情好奖恰,可當我...
    茶點故事閱讀 65,813評論 6 386
  • 文/花漫 我一把揭開白布吊趾。 她就那樣靜靜地躺著,像睡著了一般瑟啃。 火紅的嫁衣襯著肌膚如雪论泛。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,021評論 1 291
  • 那天蛹屿,我揣著相機與錄音屁奏,去河邊找鬼。 笑死错负,一個胖子當著我的面吹牛坟瓢,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播犹撒,決...
    沈念sama閱讀 39,120評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼折联,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了识颊?” 一聲冷哼從身側(cè)響起诚镰,我...
    開封第一講書人閱讀 37,866評論 0 268
  • 序言:老撾萬榮一對情侶失蹤酌儒,失蹤者是張志新(化名)和其女友劉穎滞时,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,308評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡甚亭,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,633評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了盼忌。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片再愈。...
    茶點故事閱讀 38,768評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖跌帐,靈堂內(nèi)的尸體忽然破棺而出首懈,到底是詐尸還是另有隱情,我是刑警寧澤谨敛,帶...
    沈念sama閱讀 34,461評論 4 333
  • 正文 年R本政府宣布究履,位于F島的核電站,受9級特大地震影響脸狸,放射性物質(zhì)發(fā)生泄漏最仑。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,094評論 3 317
  • 文/蒙蒙 一炊甲、第九天 我趴在偏房一處隱蔽的房頂上張望泥彤。 院中可真熱鬧,春花似錦卿啡、人聲如沸吟吝。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,850評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽剑逃。三九已至,卻和暖如春官辽,著一層夾襖步出監(jiān)牢的瞬間蛹磺,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,082評論 1 267
  • 我被黑心中介騙來泰國打工同仆, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留称开,地道東北人。 一個月前我還...
    沈念sama閱讀 46,571評論 2 362
  • 正文 我出身青樓乓梨,卻偏偏與公主長得像鳖轰,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子扶镀,可洞房花燭夜當晚...
    茶點故事閱讀 43,666評論 2 350