Python響應式類庫RxPy簡介

RxPy是非常流行的響應式框架Reactive X的Python版本蛋勺,其實這些版本都是一樣的瓦灶,只不過是各個語言的實現(xiàn)不同而已。因此抱完,如果學會了其中一種贼陶,那么使用其他的響應式版本也是輕而易舉的。之前我就聽說過這個框架巧娱,最近決定好好研究一下碉怔。

基本概念

Reactive X中有幾個核心的概念,先來簡單介紹一下禁添。

Observable和Observer(可觀察對象和觀察者)

首先是Observable和Observer撮胧,它們分別是可觀察對象和觀察者。Observable可以理解為一個異步的數(shù)據(jù)源上荡,會發(fā)送一系列的值趴樱。Observer則類似于消費者,需要先訂閱Observable酪捡,然后才可以接收到其發(fā)射的值叁征。可以說這組概念是設(shè)計模式中的觀察者模式和生產(chǎn)者-消費者模式的綜合體逛薇。

Operator(操作符)

另外一個非常重要的概念就是操作符了捺疼。操作符作用于Observable的數(shù)據(jù)流上,可以對其施加各種各樣的操作永罚。更重要的是啤呼,操作符還可以鏈式組合起來。這樣的鏈式函數(shù)調(diào)用不僅將數(shù)據(jù)和操作分隔開來呢袱,而且代碼更加清晰可讀官扣。一旦熟練掌握之后,你就會愛上這種感覺的羞福。

Single(單例)

在RxJava和其變體中惕蹄,還有一個比較特殊的概念叫做Single,它是一種只會發(fā)射同一個值的Observable治专,說白了就是單例怖辆。當然如果你對Java等語言比較熟悉室埋,那么單例想必也很熟悉。

Subject(主體)

主體這個概念非常特殊,它既是Observable又是Observer画拾。正是因為這個特點熟菲,所以Subject可以訂閱其他Observable朋贬,也可以將發(fā)射對象給其他Observer。在某些場景中铣揉,Subject會有很大的作用。

Scheduler(調(diào)度器)

默認情況下Reactive X只運行在當前線程下婿滓,但是如果有需要的話老速,也可以用調(diào)度器來讓Reactive X運行在多線程環(huán)境下。有很多調(diào)度器和對應的操作符凸主,可以處理多線程場景下的各種要求橘券。

Observer和Observable

先來看看一個最簡單的例子,運行的結(jié)果會依次打印這些數(shù)字卿吐。這里的of是一個操作符旁舰,可以根據(jù)給定的參數(shù)創(chuàng)建一個新的Observable。創(chuàng)建之后嗡官,就可以訂閱Observable箭窜,三個回調(diào)方法在對應的時機執(zhí)行。一旦Observer訂閱了Observable衍腥,就會接收到后續(xù)Observable發(fā)射的各項值磺樱。

from rx import of

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.subscribe(
    on_next=lambda i: print(f'Received: {i}'),
    on_error=lambda e: print(f'Error: {e}'),
    on_completed=lambda: print('Completed')

)

這個例子看起來好像很簡單,并且看起來沒什么用婆咸。但是當你了解了Rx的一些核心概念竹捉,就會理解到這是一個多么強大的工具。更重要的是尚骄,Observable生成數(shù)據(jù)和訂閱的過程是異步的块差,如果你熟悉的話,就可以利用這個特性做很多事情倔丈。

操作符

在RxPy中另一個非常重要的概念就是操作符了憨闰,甚至可以說操作符就是最重要的一個概念了。幾乎所有的功能都可以通過組合各個操作符來實現(xiàn)需五。熟練掌握操作符就是學好RxPy的關(guān)鍵了鹉动。操作符之間也可以用pipe函數(shù)連接起來,構(gòu)成復雜的操作鏈宏邮。

from rx import of, operators as op
import rx

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.pipe(
    op.map(lambda i: i ** 2),
    op.filter(lambda i: i >= 10)
).subscribe(lambda i: print(f'Received: {i}'))

在RxPy中有大量操作符泽示,可以完成各種各樣的功能。我們來簡單看看其中一些常用的操作符蜀铲。如果你熟悉Java8的流類庫或者其他函數(shù)式編程類庫的話边琉,應該對這些操作符感到非常親切属百。

創(chuàng)建型操作符

首先是創(chuàng)建Observable的操作符记劝,列舉了一些比較常用的創(chuàng)建型操作符。

操作符 作用
just(n) 只包含1個值的Observable
repeated_value(v,n) 重復n次值為v的Observable
of(a,b,c,d) 包含所有參數(shù)的Observable
empty() 一個空的Observable
from_iterable(iter) 用iterable創(chuàng)建一個Observable
generate(0, lambda x: x < 10, lambda x: x + 1) 用初始值和循環(huán)條件生成Observable
interval(n) 以n秒為間隔定時發(fā)送整數(shù)序列的Observable

過濾型操作符

過濾型操作符的主要作用是對Observable進行篩選和過濾族扰。

操作符 作用
debounce 按時間間隔過濾厌丑,在范圍內(nèi)的值會被忽略
distinct 忽略重復的值
elementAt 只發(fā)射第n位的值
filter 按條件過濾值
first/last 發(fā)射首/尾值
skip 跳過前n個值
take 只取前n個值

轉(zhuǎn)換型操作符

操作符 作用
flatMap 轉(zhuǎn)換多個Observable的值并將它們合并為一個Observable
groupBy 對值進行分組定欧,返回多個Observable
map 將Observable映射為另一個Observable
scan 將函數(shù)應用到Observable的每個值上,然后返回后面的值

算術(shù)操作符

操作符 作用
average 平均數(shù)
count 個數(shù)
max 最大值
min 最小值
reduce 將函數(shù)應用到每個值上怒竿,然后返回最終的計算結(jié)果
sum 求和

Subject

Subject是一種特殊的對象砍鸠,它既是Observer又是Observable。不過這個對象一般不太常用耕驰,但是假如某些用途還是很有用的爷辱。所以還是要介紹一下。下面的代碼朦肘,因為訂閱的時候第一個值已經(jīng)發(fā)射出去了饭弓,所以只會打印訂閱之后才發(fā)射的值。

from rx.subject import Subject, AsyncSubject, BehaviorSubject, ReplaySubject

# Subject同時是Observer和Observable

print('--------Subject---------')
subject = Subject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4

另外還有幾個特殊的Subject媒抠,下面來介紹一下弟断。

ReplaySubject

ReplaySubject是一個特殊的Subject,它會記錄所有發(fā)射過的值趴生,不論什么時候訂閱的阀趴。所以它可以用來當做緩存來使用。ReplaySubject還可以接受一個bufferSize參數(shù)苍匆,指定可以緩存的最近數(shù)據(jù)數(shù)刘急,默認情況下是全部。

下面的代碼和上面的代碼幾乎完全一樣锉桑,但是因為使用了ReplaySubject排霉,所以所有的值都會被打印。當然大家也可以試試把訂閱語句放到其他位置民轴,看看輸出是否會產(chǎn)生變化攻柠。

# ReplaySubject會緩存所有值,如果指定參數(shù)的話只會緩存最近的幾個值
print('--------ReplaySubject---------')
subject = ReplaySubject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 1 2 3 4

BehaviorSubject

BehaviorSubject是一個特殊的Subject后裸,它只會記錄最近一次發(fā)射的值瑰钮。而且在創(chuàng)建它的時候,必須指定一個初始值微驶,所有訂閱它的對象都可以接收到這個初始值浪谴。當然如果訂閱的晚了,這個初始值同樣會被后面發(fā)射的值覆蓋因苹,這一點要注意苟耻。

# BehaviorSubject會緩存上次發(fā)射的值,除非Observable已經(jīng)關(guān)閉
print('--------BehaviorSubject---------')
subject = BehaviorSubject(0)
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4

AsyncSubject

AsyncSubject是一個特殊的Subject扶檐,顧名思義它是一個異步的Subject凶杖,它只會在Observer完成的時候發(fā)射數(shù)據(jù),而且只會發(fā)射最后一個數(shù)據(jù)款筑。因此下面的代碼僅僅會輸出4.假如注釋掉最后一行co_completed調(diào)用智蝠,那么什么也不會輸出腾么。

# AsyncSubject會緩存上次發(fā)射的值,而且僅會在Observable關(guān)閉后開始發(fā)射
print('--------AsyncSubject---------')
subject = AsyncSubject()
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 4

Scheduler

雖然RxPy算是異步的框架杈湾,但是其實它默認還是運行在單個線程之上的解虱,因此如果使用了某些會阻礙線程運行的操作,那么程序就會卡死漆撞。當然針對這些情況殴泰,我們就可以使用其他的Scheduler來調(diào)度任務,保證程序能夠高效運行浮驳。

下面的例子創(chuàng)建了一個ThreadPoolScheduler艰匙,它是基于線程池的調(diào)度器。兩個Observable用subscribe_on方法指定了調(diào)度器抹恳,因此它們會使用不同的線程來工作员凝。

import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as op

import multiprocessing
import time
import threading
import random


def long_work(value):
    time.sleep(random.randint(5, 20) / 10)
    return value


pool_schedular = ThreadPoolScheduler(multiprocessing.cpu_count())

rx.range(5).pipe(
    op.map(lambda i: long_work(i + 1)),
    op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 1: {threading.current_thread().name}, {i}'))

rx.of(1, 2, 3, 4, 5).pipe(
    op.map(lambda i: i * 2),
    op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 2: {threading.current_thread().name}, {i}'))

如果你觀察過各個操作符的API的話,可以發(fā)現(xiàn)大部分操作符都支持可選的Scheduler參數(shù)奋献,為操作符指定一個調(diào)度器健霹。如果操作符上指定了調(diào)度器的話,會優(yōu)先使用這個調(diào)度器瓶蚂;其次的話糖埋,會使用subscribe方法上指定的調(diào)度器;如果以上都沒有指定的話窃这,就會使用默認的調(diào)度器瞳别。

應用場景

好了,介紹了一些Reactive X的知識之后杭攻,下面來看看如何來使用Reactive X祟敛。在很多應用場景下,都可以利用Reactive X來抽象數(shù)據(jù)處理兆解,把概念簡單化馆铁。

防止重復發(fā)送

很多情況下我們都需要控制事件的發(fā)生間隔,比如有一個按鈕不小心按了好幾次锅睛,只希望第一次按鈕生效埠巨。這種情況下可以使用debounce操作符,它會過濾Observable现拒,小于指定時間間隔的數(shù)據(jù)會被過濾掉辣垒。debounce操作符會等待一段時間,直到過了間隔時間印蔬,才會發(fā)射最后一次的數(shù)據(jù)勋桶。如果想要過濾后面的數(shù)據(jù),發(fā)送第一次的數(shù)據(jù),則要使用throttle_first操作符哥遮。

下面的代碼可以比較好的演示這個操作符,快速按回車鍵發(fā)送數(shù)據(jù)陵究,注意觀察按鍵和數(shù)據(jù)顯示之間的關(guān)系眠饮,還可以把throttle_first操作符換成debounce操作符,然后再看看輸出會發(fā)生什么變化铜邮,還可以完全注釋掉pipe中的操作符仪召,再看看輸出會有什么變化。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

# debounce操作符松蒜,僅在時間間隔之外的可以發(fā)射

ob = Subject()
ob.pipe(
    op.throttle_first(3)
    # op.debounce(3)
).subscribe(
    on_next=lambda i: print(i),
    on_completed=lambda: print('Completed')
)

print('press enter to print, press other key to exit')
while True:
    s = input()
    if s == '':
        ob.on_next(datetime.datetime.now().time())
    else:
        ob.on_completed()
        break

操作數(shù)據(jù)流

如果需要對一些數(shù)據(jù)進行操作扔茅,那么同樣有一大堆操作符可以滿足需求。當然這部分功能并不是Reactive X獨有的秸苗,如果你對Java 8的流類庫有所了解召娜,會發(fā)現(xiàn)這兩者這方面的功能幾乎是完全一樣的。

下面是個簡單的例子惊楼,將兩個數(shù)據(jù)源結(jié)合起來玖瘸,然后找出來其中所有的偶數(shù)。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

# 操作數(shù)據(jù)流
some_data = rx.of(1, 2, 3, 4, 5, 6, 7, 8)
some_data2 = rx.from_iterable(range(10, 20))
some_data.pipe(
    op.merge(some_data2),
    op.filter(lambda i: i % 2 == 0),
    # op.map(lambda i: i * 2)
).subscribe(lambda i: print(i))

再或者一個利用reduce的簡單例子檀咙,求1-100的整數(shù)和雅倒。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

rx.range(1, 101).pipe(
    op.reduce(lambda acc, i: acc + i, 0)
).subscribe(lambda i: print(i))
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市弧可,隨后出現(xiàn)的幾起案子蔑匣,更是在濱河造成了極大的恐慌,老刑警劉巖棕诵,帶你破解...
    沈念sama閱讀 218,682評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件裁良,死亡現(xiàn)場離奇詭異,居然都是意外死亡校套,警方通過查閱死者的電腦和手機趴久,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來搔确,“玉大人彼棍,你說我怎么就攤上這事∩潘悖” “怎么了座硕?”我有些...
    開封第一講書人閱讀 165,083評論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長涕蜂。 經(jīng)常有香客問我华匾,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,763評論 1 295
  • 正文 為了忘掉前任蜘拉,我火速辦了婚禮萨西,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘旭旭。我一直安慰自己谎脯,他們只是感情好,可當我...
    茶點故事閱讀 67,785評論 6 392
  • 文/花漫 我一把揭開白布持寄。 她就那樣靜靜地躺著源梭,像睡著了一般。 火紅的嫁衣襯著肌膚如雪稍味。 梳的紋絲不亂的頭發(fā)上废麻,一...
    開封第一講書人閱讀 51,624評論 1 305
  • 那天,我揣著相機與錄音模庐,去河邊找鬼烛愧。 笑死,一個胖子當著我的面吹牛掂碱,可吹牛的內(nèi)容都是我干的屑彻。 我是一名探鬼主播,決...
    沈念sama閱讀 40,358評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼顶吮,長吁一口氣:“原來是場噩夢啊……” “哼社牲!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起悴了,我...
    開封第一講書人閱讀 39,261評論 0 276
  • 序言:老撾萬榮一對情侶失蹤搏恤,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后湃交,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體熟空,經(jīng)...
    沈念sama閱讀 45,722評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年搞莺,在試婚紗的時候發(fā)現(xiàn)自己被綠了息罗。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,030評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡才沧,死狀恐怖迈喉,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情温圆,我是刑警寧澤挨摸,帶...
    沈念sama閱讀 35,737評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站岁歉,受9級特大地震影響得运,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,360評論 3 330
  • 文/蒙蒙 一熔掺、第九天 我趴在偏房一處隱蔽的房頂上張望饱搏。 院中可真熱鬧,春花似錦置逻、人聲如沸推沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至疯坤,卻和暖如春报慕,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背压怠。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評論 1 270
  • 我被黑心中介騙來泰國打工眠冈, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人菌瘫。 一個月前我還...
    沈念sama閱讀 48,237評論 3 371
  • 正文 我出身青樓蜗顽,卻偏偏與公主長得像,于是被迫代替她去往敵國和親雨让。 傳聞我的和親對象是個殘疾皇子雇盖,可洞房花燭夜當晚...
    茶點故事閱讀 44,976評論 2 355