Redis的pub/Sub(訂閱與發(fā)布)java 代碼實(shí)現(xiàn)

1.什么是pub/sub

Pub/Sub功能(means Publish, Subscribe)即發(fā)布及訂閱功能汗唱」常基于事件的系統(tǒng)中,Pub/Sub是目前廣泛使用的通信模型哩罪,它采用事件作為基本的通信機(jī)制授霸,提供大規(guī)模系統(tǒng)所要求的松散耦合的交互模式:訂閱者(如客戶端)以事件訂閱的方式表達(dá)出它有興趣接收的一個(gè)事件或一類事件;發(fā)布者(如服務(wù)器)可將訂閱者感興趣的事件隨時(shí)通知相關(guān)訂閱者际插。熟悉設(shè)計(jì)模式的朋友應(yīng)該了解這與23種設(shè)計(jì)模式中的觀察者模式極為相似碘耳。

同樣,Redis的pub/sub是一種消息通信模式,主要的目的是解除消息發(fā)布者和消息訂閱者之間的耦合,Redis作為一個(gè)pub/sub的server,在訂閱者和發(fā)布者之間起到了消息路由的功能框弛。

2.Redis pub/sub的實(shí)現(xiàn)

Redis通過(guò)publish和subscribe命令實(shí)現(xiàn)訂閱和發(fā)布的功能辛辨。訂閱者可以通過(guò)subscribe向redis server訂閱自己感興趣的消息類型。redis將信息類型稱為通道(channel)瑟枫。當(dāng)發(fā)布者通過(guò)publish命令向redis server發(fā)送特定類型的信息時(shí)斗搞,訂閱該消息類型的全部訂閱者都會(huì)收到此消息。

客戶端1訂閱CCTV1:

127.0.0.1:6379> subscribe CCTV1

Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "CCTV1"3) (integer) 1

127.0.0.1:6379> subscribe CCTV1

Reading messages... (press Ctrl-C to quit)


客戶端2訂閱CCTV1和CCTV2:

127.0.0.1:6379> subscribe CCTV1 CCTV2

Reading messages... (press Ctrl-C to quit)

1) "subscribe"

2) "CCTV1"

3) (integer) 1

1) "subscribe"

2) "CCTV2"

3) (integer) 2

————————————————


此時(shí)這兩個(gè)客戶端分別監(jiān)聽這指定的頻道】睹睿現(xiàn)在另一個(gè)客戶端向服務(wù)器推送了關(guān)于這兩個(gè)頻道的信息僻焚。

127.0.0.1:6379> publish CCTV1 "cctv1 is good"

(integer) 2

//返回2表示兩個(gè)客戶端接收了次消息。被接收到消息的客戶端如下所示膝擂。

1) "message"

2) "CCTV1"

3) "cctv1 is good"

1) "message"

2) "CCTV1"

3) "cctv1 is good"

如上的訂閱/發(fā)布也稱訂閱發(fā)布到頻道(使用publish與subscribe命令)虑啤,此外還有訂閱發(fā)布到模式(使用psubscribe來(lái)訂閱一個(gè)模式)

訂閱CCTV的全部頻道

127.0.0.1:6379> psubscribe CCTV*

Reading messages... (press Ctrl-C to quit)

1) "psubscribe"

2) "CCTV*"

3) (integer) 1


當(dāng)依然先如上推送一個(gè)CCTV1的消息時(shí)隙弛,該客戶端正常接收。

Pub/Sub在java中的實(shí)現(xiàn):

導(dǎo)入Redis驅(qū)動(dòng):

dependencies {

? ? compile 'redis.clients:jedis:2.4.2'

}

Redis驅(qū)動(dòng)包提供了一個(gè)抽象類:JedisPubSub…繼承這個(gè)類就完成了對(duì)客戶端對(duì)訂閱的監(jiān)聽咐旧。示例代碼:

public class TestPubSub extends JedisPubSub {

? ? @Override

? ? public void onMessage(String channel, String message) {

? ? ? ? // TODO Auto-generated method stub

? ? ? ? System.out.println(channel + "," + message);

? ? }

? ? @Override

? ? public void onPMessage(String pattern, String channel, String message) {

? ? ? ? // TODO Auto-generated method stub

? ? ? ? System.out.println(pattern + "," + channel + "," + message);

? ? }

? ? @Override

? ? public void onSubscribe(String channel, int subscribedChannels) {

? ? ? ? // TODO Auto-generated method stub

? ? ? ? System.out.println("onSubscribe: channel[" + channel + "]," + "subscribedChannels[" + subscribedChannels + "]");

? ? }

? ? @Override

? ? public void onUnsubscribe(String channel, int subscribedChannels) {

? ? ? ? // TODO Auto-generated method stub

? ? ? ? System.out.println(

? ? ? ? ? ? ? ? "onUnsubscribe: channel[" + channel + "], " + "subscribedChannels[" + subscribedChannels + "]");

? ? }

? ? @Override

? ? public void onPUnsubscribe(String pattern, int subscribedChannels) {

? ? ? ? // TODO Auto-generated method stub

? ? ? ? System.out.println("onPUnsubscribe: pattern[" + pattern + "]," +

? ? ? ? ? ? ? ? "subscribedChannels[" + subscribedChannels + "]");

? ? }

? ? @Override

? ? public void onPSubscribe(String pattern, int subscribedChannels) {

? ? ? ? System.out.println("onPSubscribe: pattern[" + pattern + "], " +

? ? ? ? ? ? ? ? "subscribedChannels[" + subscribedChannels + "]");

? ? }

}


如上所示,抽象類中存在六個(gè)方法驶鹉。分別表示

監(jiān)聽到訂閱模式接受到消息時(shí)的回調(diào) (onPMessage)

監(jiān)聽到訂閱頻道接受到消息時(shí)的回調(diào) (onMessage )

訂閱頻道時(shí)的回調(diào)( onSubscribe )

取消訂閱頻道時(shí)的回調(diào)( onUnsubscribe )

訂閱頻道模式時(shí)的回調(diào) ( onPSubscribe )

取消訂閱模式時(shí)的回調(diào)( onPUnsubscribe )

運(yùn)行我們剛剛編寫的類:

@Test

? ? public void pubsubjava() {

? ? ? ? // TODO Auto-generated method stub

? ? ? ? Jedis jr = null;

? ? ? ? try {? ? ?

? ? ? ? jr = new Jedis("127.0.0.1", 6379, 0);// redis服務(wù)地址和端口號(hào)

? ? ? ? ? ? jr.auth("wx950709");

? ? ? ? ? ? TestPubSub sp = new TestPubSub();

? ? ? ? ? ? // jr客戶端配置監(jiān)聽兩個(gè)channel

? ? ? ? ? ? sp.subscribe(jr.getClient(), "news.share", "news.blog");

? ? ? ? } catch (Exception e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? } finally{

? ? ? ? ? ? if(jr!=null){

? ? ? ? ? ? ? ? jr.disconnect();

? ? ? ? ? ? }

? ? ? ? }

? ? }

從代碼中我們不難看出,我們聲明的一個(gè)redis鏈接在設(shè)置監(jiān)聽后就可以執(zhí)行一些操作铣墨,例如發(fā)布消息室埋,訂閱消息等。伊约。姚淆。

當(dāng)運(yùn)行上述代碼后會(huì)在控制臺(tái)輸出:

onSubscribe: channel[news.share],subscribedChannels[1]

onSubscribe: channel[news.blog],subscribedChannels[2]

//onSubscribe方法成功運(yùn)行

此時(shí)當(dāng)在有客戶端向new.share或者new.blog通道publish消息時(shí),onMessage方法即可被相應(yīng)屡律。(jedis.publish(channel, message))腌逢。

Pub/Sub在Spring中的實(shí)踐

導(dǎo)入依賴jar

dependencies {

? ? compile 'org.springframework.data:spring-data-redis:1.7.2.RELEASE'

? ? compile 'redis.clients:jedis:2.4.2'

}

Spring配置redis的鏈接:

@Configuration

public class AppConfig {

? ? @Bean

? ? JedisConnectionFactory jedisConnectionFactory() {

? ? ? ? JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();

? ? ? ? jedisConnectionFactory.setPassword("xxxx");

? ? ? ? return jedisConnectionFactory;

? ? }

? ? @Bean

? ? RedisTemplate<String, Object> redisTemplate() {

? ? ? ? final RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();

? ? ? ? template.setConnectionFactory(jedisConnectionFactory());

? ? ? ? template.setDefaultSerializer(new StringRedisSerializer());

? ? ? ? return template;

? ? }

? ? @Bean

? ? MessageListenerAdapter messageListener() {

? ? ? ? return new MessageListenerAdapter(new RedisMessageListener());

? ? }

? ? @Bean

? ? RedisMessageListenerContainer redisContainer() {

? ? ? ? final RedisMessageListenerContainer container = new RedisMessageListenerContainer();

? ? ? ? container.setConnectionFactory(jedisConnectionFactory());

? ? ? ? container.addMessageListener(messageListener(), topic());

? ? ? ? return container;

? ? }

? ? @Bean

? ? RedisPublisherImpl redisPublisher() {

? ? ? ? return new RedisPublisherImpl(redisTemplate(), topic());

? ? }

? ? @Bean

? ? ChannelTopic topic() {

? ? ? ? return new ChannelTopic( "pubsub:queue" );

? ? }

}


如上的配置即配置了對(duì)Redis的鏈接。在RedisTemplate中沒(méi)有設(shè)置ip端口等信息則全部為默認(rèn)的超埋。在配置類中的將ChannelTopic加入IOC容器搏讶。則在Spring啟動(dòng)時(shí)會(huì)在一個(gè)RedisTemplate(一個(gè)對(duì)Redis的鏈接)中設(shè)置的一個(gè)channel,即pubsub:queue霍殴。

在上述配置中媒惕,RedisMessageListener是我們生成的,這個(gè)類即為核心監(jiān)聽類来庭,RedisTemplate接受到數(shù)據(jù)如何處理就是在該類中處理的妒蔚。

public class RedisMessageListener implements MessageListener {

? ? ? ? @Override

? ? ? ? public void onMessage( final Message message, final byte[] pattern ) {

? ? ? ? ? ? System.out.println("Message received: " + message.toString() );

? ? ? ? }

}

現(xiàn)在我們?cè)讷@取RedisTemplate,并給pubsub:queue這個(gè)channel publish數(shù)據(jù)。

public class PubSubMain {

? ? RedisTemplate<String,Object> redisTemplate;

? ? public? void execute() {

? ? ? String channel = "pubsub:queue";

? ? ? redisTemplate.convertAndSend(channel, "from testData");

? ? }

? ? public static void main(String[] args) {

? ? ? ? ApplicationContext applicationContext? = new AnnotationConfigApplicationContext(AppConfig.class);

? ? ? ? PubSubMain pubSubMain = new PubSubMain();

? ? ? ? pubSubMain.redisTemplate = (RedisTemplate<String, Object>) applicationContext.getBean("redisTemplate");

? ? ? ? pubSubMain.execute();

? ? }

}


此時(shí)運(yùn)行main 方法:

Message received: from app 12

//表明接受成功月弛,當(dāng)在命令行中啟動(dòng)一個(gè)客戶端并publish時(shí)依然可以在客戶端打印出message

后面的文章會(huì)多涉及一些關(guān)于Redis的使用場(chǎng)景肴盏。

原文鏈接:https://blog.csdn.net/canot/article/details/51938955

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市帽衙,隨后出現(xiàn)的幾起案子菜皂,更是在濱河造成了極大的恐慌,老刑警劉巖厉萝,帶你破解...
    沈念sama閱讀 206,214評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件幌墓,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡冀泻,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門蜡饵,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)弹渔,“玉大人,你說(shuō)我怎么就攤上這事溯祸≈ǎ” “怎么了舞肆?”我有些...
    開封第一講書人閱讀 152,543評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)博杖。 經(jīng)常有香客問(wèn)我椿胯,道長(zhǎng),這世上最難降的妖魔是什么剃根? 我笑而不...
    開封第一講書人閱讀 55,221評(píng)論 1 279
  • 正文 為了忘掉前任哩盲,我火速辦了婚禮,結(jié)果婚禮上狈醉,老公的妹妹穿的比我還像新娘廉油。我一直安慰自己,他們只是感情好苗傅,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,224評(píng)論 5 371
  • 文/花漫 我一把揭開白布抒线。 她就那樣靜靜地躺著,像睡著了一般渣慕。 火紅的嫁衣襯著肌膚如雪嘶炭。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,007評(píng)論 1 284
  • 那天逊桦,我揣著相機(jī)與錄音眨猎,去河邊找鬼。 笑死卫袒,一個(gè)胖子當(dāng)著我的面吹牛宵呛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播夕凝,決...
    沈念sama閱讀 38,313評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼宝穗,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了码秉?” 一聲冷哼從身側(cè)響起逮矛,我...
    開封第一講書人閱讀 36,956評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎转砖,沒(méi)想到半個(gè)月后须鼎,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,441評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡府蔗,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,925評(píng)論 2 323
  • 正文 我和宋清朗相戀三年晋控,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片姓赤。...
    茶點(diǎn)故事閱讀 38,018評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡赡译,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出不铆,到底是詐尸還是另有隱情蝌焚,我是刑警寧澤裹唆,帶...
    沈念sama閱讀 33,685評(píng)論 4 322
  • 正文 年R本政府宣布,位于F島的核電站只洒,受9級(jí)特大地震影響许帐,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜毕谴,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,234評(píng)論 3 307
  • 文/蒙蒙 一成畦、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧析珊,春花似錦羡鸥、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至奕剃,卻和暖如春衷旅,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背纵朋。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評(píng)論 1 261
  • 我被黑心中介騙來(lái)泰國(guó)打工柿顶, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人操软。 一個(gè)月前我還...
    沈念sama閱讀 45,467評(píng)論 2 352
  • 正文 我出身青樓嘁锯,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親聂薪。 傳聞我的和親對(duì)象是個(gè)殘疾皇子家乘,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,762評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容