RxPy是非常流行的響應式框架Reactive X的Python版本蛋勺,其實這些版本都是一樣的瓦灶,只不過是各個語言的實現(xiàn)不同而已。因此抱完,如果學會了其中一種贼陶,那么使用其他的響應式版本也是輕而易舉的。之前我就聽說過這個框架巧娱,最近決定好好研究一下碉怔。
基本概念
Reactive X中有幾個核心的概念,先來簡單介紹一下禁添。
Observable和Observer(可觀察對象和觀察者)
首先是Observable和Observer撮胧,它們分別是可觀察對象和觀察者。Observable可以理解為一個異步的數(shù)據(jù)源上荡,會發(fā)送一系列的值趴樱。Observer則類似于消費者,需要先訂閱Observable酪捡,然后才可以接收到其發(fā)射的值叁征。可以說這組概念是設(shè)計模式中的觀察者模式和生產(chǎn)者-消費者模式的綜合體逛薇。
Operator(操作符)
另外一個非常重要的概念就是操作符了捺疼。操作符作用于Observable的數(shù)據(jù)流上,可以對其施加各種各樣的操作永罚。更重要的是啤呼,操作符還可以鏈式組合起來。這樣的鏈式函數(shù)調(diào)用不僅將數(shù)據(jù)和操作分隔開來呢袱,而且代碼更加清晰可讀官扣。一旦熟練掌握之后,你就會愛上這種感覺的羞福。
Single(單例)
在RxJava和其變體中惕蹄,還有一個比較特殊的概念叫做Single,它是一種只會發(fā)射同一個值的Observable治专,說白了就是單例怖辆。當然如果你對Java等語言比較熟悉室埋,那么單例想必也很熟悉。
Subject(主體)
主體這個概念非常特殊,它既是Observable又是Observer画拾。正是因為這個特點熟菲,所以Subject可以訂閱其他Observable朋贬,也可以將發(fā)射對象給其他Observer。在某些場景中铣揉,Subject會有很大的作用。
Scheduler(調(diào)度器)
默認情況下Reactive X只運行在當前線程下婿滓,但是如果有需要的話老速,也可以用調(diào)度器來讓Reactive X運行在多線程環(huán)境下。有很多調(diào)度器和對應的操作符凸主,可以處理多線程場景下的各種要求橘券。
Observer和Observable
先來看看一個最簡單的例子,運行的結(jié)果會依次打印這些數(shù)字卿吐。這里的of是一個操作符旁舰,可以根據(jù)給定的參數(shù)創(chuàng)建一個新的Observable。創(chuàng)建之后嗡官,就可以訂閱Observable箭窜,三個回調(diào)方法在對應的時機執(zhí)行。一旦Observer訂閱了Observable衍腥,就會接收到后續(xù)Observable發(fā)射的各項值磺樱。
from rx import of
ob = of(1, 2, 34, 5, 6, 7, 7)
ob.subscribe(
on_next=lambda i: print(f'Received: {i}'),
on_error=lambda e: print(f'Error: {e}'),
on_completed=lambda: print('Completed')
)
這個例子看起來好像很簡單,并且看起來沒什么用婆咸。但是當你了解了Rx的一些核心概念竹捉,就會理解到這是一個多么強大的工具。更重要的是尚骄,Observable生成數(shù)據(jù)和訂閱的過程是異步的块差,如果你熟悉的話,就可以利用這個特性做很多事情倔丈。
操作符
在RxPy中另一個非常重要的概念就是操作符了憨闰,甚至可以說操作符就是最重要的一個概念了。幾乎所有的功能都可以通過組合各個操作符來實現(xiàn)需五。熟練掌握操作符就是學好RxPy的關(guān)鍵了鹉动。操作符之間也可以用pipe函數(shù)連接起來,構(gòu)成復雜的操作鏈宏邮。
from rx import of, operators as op
import rx
ob = of(1, 2, 34, 5, 6, 7, 7)
ob.pipe(
op.map(lambda i: i ** 2),
op.filter(lambda i: i >= 10)
).subscribe(lambda i: print(f'Received: {i}'))
在RxPy中有大量操作符泽示,可以完成各種各樣的功能。我們來簡單看看其中一些常用的操作符蜀铲。如果你熟悉Java8的流類庫或者其他函數(shù)式編程類庫的話边琉,應該對這些操作符感到非常親切属百。
創(chuàng)建型操作符
首先是創(chuàng)建Observable的操作符记劝,列舉了一些比較常用的創(chuàng)建型操作符。
操作符 | 作用 |
---|---|
just(n) | 只包含1個值的Observable |
repeated_value(v,n) | 重復n次值為v的Observable |
of(a,b,c,d) | 包含所有參數(shù)的Observable |
empty() | 一個空的Observable |
from_iterable(iter) | 用iterable創(chuàng)建一個Observable |
generate(0, lambda x: x < 10, lambda x: x + 1) | 用初始值和循環(huán)條件生成Observable |
interval(n) | 以n秒為間隔定時發(fā)送整數(shù)序列的Observable |
過濾型操作符
過濾型操作符的主要作用是對Observable進行篩選和過濾族扰。
操作符 | 作用 |
---|---|
debounce | 按時間間隔過濾厌丑,在范圍內(nèi)的值會被忽略 |
distinct | 忽略重復的值 |
elementAt | 只發(fā)射第n位的值 |
filter | 按條件過濾值 |
first/last | 發(fā)射首/尾值 |
skip | 跳過前n個值 |
take | 只取前n個值 |
轉(zhuǎn)換型操作符
操作符 | 作用 |
---|---|
flatMap | 轉(zhuǎn)換多個Observable的值并將它們合并為一個Observable |
groupBy | 對值進行分組定欧,返回多個Observable |
map | 將Observable映射為另一個Observable |
scan | 將函數(shù)應用到Observable的每個值上,然后返回后面的值 |
算術(shù)操作符
操作符 | 作用 |
---|---|
average | 平均數(shù) |
count | 個數(shù) |
max | 最大值 |
min | 最小值 |
reduce | 將函數(shù)應用到每個值上怒竿,然后返回最終的計算結(jié)果 |
sum | 求和 |
Subject
Subject是一種特殊的對象砍鸠,它既是Observer又是Observable。不過這個對象一般不太常用耕驰,但是假如某些用途還是很有用的爷辱。所以還是要介紹一下。下面的代碼朦肘,因為訂閱的時候第一個值已經(jīng)發(fā)射出去了饭弓,所以只會打印訂閱之后才發(fā)射的值。
from rx.subject import Subject, AsyncSubject, BehaviorSubject, ReplaySubject
# Subject同時是Observer和Observable
print('--------Subject---------')
subject = Subject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4
另外還有幾個特殊的Subject媒抠,下面來介紹一下弟断。
ReplaySubject
ReplaySubject是一個特殊的Subject,它會記錄所有發(fā)射過的值趴生,不論什么時候訂閱的阀趴。所以它可以用來當做緩存來使用。ReplaySubject還可以接受一個bufferSize參數(shù)苍匆,指定可以緩存的最近數(shù)據(jù)數(shù)刘急,默認情況下是全部。
下面的代碼和上面的代碼幾乎完全一樣锉桑,但是因為使用了ReplaySubject排霉,所以所有的值都會被打印。當然大家也可以試試把訂閱語句放到其他位置民轴,看看輸出是否會產(chǎn)生變化攻柠。
# ReplaySubject會緩存所有值,如果指定參數(shù)的話只會緩存最近的幾個值
print('--------ReplaySubject---------')
subject = ReplaySubject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 1 2 3 4
BehaviorSubject
BehaviorSubject是一個特殊的Subject后裸,它只會記錄最近一次發(fā)射的值瑰钮。而且在創(chuàng)建它的時候,必須指定一個初始值微驶,所有訂閱它的對象都可以接收到這個初始值浪谴。當然如果訂閱的晚了,這個初始值同樣會被后面發(fā)射的值覆蓋因苹,這一點要注意苟耻。
# BehaviorSubject會緩存上次發(fā)射的值,除非Observable已經(jīng)關(guān)閉
print('--------BehaviorSubject---------')
subject = BehaviorSubject(0)
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4
AsyncSubject
AsyncSubject是一個特殊的Subject扶檐,顧名思義它是一個異步的Subject凶杖,它只會在Observer完成的時候發(fā)射數(shù)據(jù),而且只會發(fā)射最后一個數(shù)據(jù)款筑。因此下面的代碼僅僅會輸出4.假如注釋掉最后一行co_completed調(diào)用智蝠,那么什么也不會輸出腾么。
# AsyncSubject會緩存上次發(fā)射的值,而且僅會在Observable關(guān)閉后開始發(fā)射
print('--------AsyncSubject---------')
subject = AsyncSubject()
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 4
Scheduler
雖然RxPy算是異步的框架杈湾,但是其實它默認還是運行在單個線程之上的解虱,因此如果使用了某些會阻礙線程運行的操作,那么程序就會卡死漆撞。當然針對這些情況殴泰,我們就可以使用其他的Scheduler來調(diào)度任務,保證程序能夠高效運行浮驳。
下面的例子創(chuàng)建了一個ThreadPoolScheduler艰匙,它是基于線程池的調(diào)度器。兩個Observable用subscribe_on方法指定了調(diào)度器抹恳,因此它們會使用不同的線程來工作员凝。
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as op
import multiprocessing
import time
import threading
import random
def long_work(value):
time.sleep(random.randint(5, 20) / 10)
return value
pool_schedular = ThreadPoolScheduler(multiprocessing.cpu_count())
rx.range(5).pipe(
op.map(lambda i: long_work(i + 1)),
op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 1: {threading.current_thread().name}, {i}'))
rx.of(1, 2, 3, 4, 5).pipe(
op.map(lambda i: i * 2),
op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 2: {threading.current_thread().name}, {i}'))
如果你觀察過各個操作符的API的話,可以發(fā)現(xiàn)大部分操作符都支持可選的Scheduler參數(shù)奋献,為操作符指定一個調(diào)度器健霹。如果操作符上指定了調(diào)度器的話,會優(yōu)先使用這個調(diào)度器瓶蚂;其次的話糖埋,會使用subscribe方法上指定的調(diào)度器;如果以上都沒有指定的話窃这,就會使用默認的調(diào)度器瞳别。
應用場景
好了,介紹了一些Reactive X的知識之后杭攻,下面來看看如何來使用Reactive X祟敛。在很多應用場景下,都可以利用Reactive X來抽象數(shù)據(jù)處理兆解,把概念簡單化馆铁。
防止重復發(fā)送
很多情況下我們都需要控制事件的發(fā)生間隔,比如有一個按鈕不小心按了好幾次锅睛,只希望第一次按鈕生效埠巨。這種情況下可以使用debounce操作符,它會過濾Observable现拒,小于指定時間間隔的數(shù)據(jù)會被過濾掉辣垒。debounce操作符會等待一段時間,直到過了間隔時間印蔬,才會發(fā)射最后一次的數(shù)據(jù)勋桶。如果想要過濾后面的數(shù)據(jù),發(fā)送第一次的數(shù)據(jù),則要使用throttle_first操作符哥遮。
下面的代碼可以比較好的演示這個操作符,快速按回車鍵發(fā)送數(shù)據(jù)陵究,注意觀察按鍵和數(shù)據(jù)顯示之間的關(guān)系眠饮,還可以把throttle_first操作符換成debounce操作符,然后再看看輸出會發(fā)生什么變化铜邮,還可以完全注釋掉pipe中的操作符仪召,再看看輸出會有什么變化。
import rx
from rx import operators as op
from rx.subject import Subject
import datetime
# debounce操作符松蒜,僅在時間間隔之外的可以發(fā)射
ob = Subject()
ob.pipe(
op.throttle_first(3)
# op.debounce(3)
).subscribe(
on_next=lambda i: print(i),
on_completed=lambda: print('Completed')
)
print('press enter to print, press other key to exit')
while True:
s = input()
if s == '':
ob.on_next(datetime.datetime.now().time())
else:
ob.on_completed()
break
操作數(shù)據(jù)流
如果需要對一些數(shù)據(jù)進行操作扔茅,那么同樣有一大堆操作符可以滿足需求。當然這部分功能并不是Reactive X獨有的秸苗,如果你對Java 8的流類庫有所了解召娜,會發(fā)現(xiàn)這兩者這方面的功能幾乎是完全一樣的。
下面是個簡單的例子惊楼,將兩個數(shù)據(jù)源結(jié)合起來玖瘸,然后找出來其中所有的偶數(shù)。
import rx
from rx import operators as op
from rx.subject import Subject
import datetime
# 操作數(shù)據(jù)流
some_data = rx.of(1, 2, 3, 4, 5, 6, 7, 8)
some_data2 = rx.from_iterable(range(10, 20))
some_data.pipe(
op.merge(some_data2),
op.filter(lambda i: i % 2 == 0),
# op.map(lambda i: i * 2)
).subscribe(lambda i: print(i))
再或者一個利用reduce的簡單例子檀咙,求1-100的整數(shù)和雅倒。
import rx
from rx import operators as op
from rx.subject import Subject
import datetime
rx.range(1, 101).pipe(
op.reduce(lambda acc, i: acc + i, 0)
).subscribe(lambda i: print(i))