1.什么是pub/sub
Pub/Sub功能(means Publish, Subscribe)即發(fā)布及訂閱功能汗唱」常基于事件的系統(tǒng)中,Pub/Sub是目前廣泛使用的通信模型哩罪,它采用事件作為基本的通信機(jī)制授霸,提供大規(guī)模系統(tǒng)所要求的松散耦合的交互模式:訂閱者(如客戶端)以事件訂閱的方式表達(dá)出它有興趣接收的一個(gè)事件或一類事件;發(fā)布者(如服務(wù)器)可將訂閱者感興趣的事件隨時(shí)通知相關(guān)訂閱者际插。熟悉設(shè)計(jì)模式的朋友應(yīng)該了解這與23種設(shè)計(jì)模式中的觀察者模式極為相似碘耳。
同樣,Redis的pub/sub是一種消息通信模式,主要的目的是解除消息發(fā)布者和消息訂閱者之間的耦合,Redis作為一個(gè)pub/sub的server,在訂閱者和發(fā)布者之間起到了消息路由的功能框弛。
2.Redis pub/sub的實(shí)現(xiàn)
Redis通過(guò)publish和subscribe命令實(shí)現(xiàn)訂閱和發(fā)布的功能辛辨。訂閱者可以通過(guò)subscribe向redis server訂閱自己感興趣的消息類型。redis將信息類型稱為通道(channel)瑟枫。當(dāng)發(fā)布者通過(guò)publish命令向redis server發(fā)送特定類型的信息時(shí)斗搞,訂閱該消息類型的全部訂閱者都會(huì)收到此消息。
客戶端1訂閱CCTV1:
127.0.0.1:6379> subscribe CCTV1
Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "CCTV1"3) (integer) 1
127.0.0.1:6379> subscribe CCTV1
Reading messages... (press Ctrl-C to quit)
客戶端2訂閱CCTV1和CCTV2:
127.0.0.1:6379> subscribe CCTV1 CCTV2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "CCTV1"
3) (integer) 1
1) "subscribe"
2) "CCTV2"
3) (integer) 2
————————————————
此時(shí)這兩個(gè)客戶端分別監(jiān)聽這指定的頻道】睹睿現(xiàn)在另一個(gè)客戶端向服務(wù)器推送了關(guān)于這兩個(gè)頻道的信息僻焚。
127.0.0.1:6379> publish CCTV1 "cctv1 is good"
(integer) 2
//返回2表示兩個(gè)客戶端接收了次消息。被接收到消息的客戶端如下所示膝擂。
1) "message"
2) "CCTV1"
3) "cctv1 is good"
1) "message"
2) "CCTV1"
3) "cctv1 is good"
如上的訂閱/發(fā)布也稱訂閱發(fā)布到頻道(使用publish與subscribe命令)虑啤,此外還有訂閱發(fā)布到模式(使用psubscribe來(lái)訂閱一個(gè)模式)
訂閱CCTV的全部頻道
127.0.0.1:6379> psubscribe CCTV*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "CCTV*"
3) (integer) 1
當(dāng)依然先如上推送一個(gè)CCTV1的消息時(shí)隙弛,該客戶端正常接收。
Pub/Sub在java中的實(shí)現(xiàn):
導(dǎo)入Redis驅(qū)動(dòng):
dependencies {
? ? compile 'redis.clients:jedis:2.4.2'
}
Redis驅(qū)動(dòng)包提供了一個(gè)抽象類:JedisPubSub…繼承這個(gè)類就完成了對(duì)客戶端對(duì)訂閱的監(jiān)聽咐旧。示例代碼:
public class TestPubSub extends JedisPubSub {
? ? @Override
? ? public void onMessage(String channel, String message) {
? ? ? ? // TODO Auto-generated method stub
? ? ? ? System.out.println(channel + "," + message);
? ? }
? ? @Override
? ? public void onPMessage(String pattern, String channel, String message) {
? ? ? ? // TODO Auto-generated method stub
? ? ? ? System.out.println(pattern + "," + channel + "," + message);
? ? }
? ? @Override
? ? public void onSubscribe(String channel, int subscribedChannels) {
? ? ? ? // TODO Auto-generated method stub
? ? ? ? System.out.println("onSubscribe: channel[" + channel + "]," + "subscribedChannels[" + subscribedChannels + "]");
? ? }
? ? @Override
? ? public void onUnsubscribe(String channel, int subscribedChannels) {
? ? ? ? // TODO Auto-generated method stub
? ? ? ? System.out.println(
? ? ? ? ? ? ? ? "onUnsubscribe: channel[" + channel + "], " + "subscribedChannels[" + subscribedChannels + "]");
? ? }
? ? @Override
? ? public void onPUnsubscribe(String pattern, int subscribedChannels) {
? ? ? ? // TODO Auto-generated method stub
? ? ? ? System.out.println("onPUnsubscribe: pattern[" + pattern + "]," +
? ? ? ? ? ? ? ? "subscribedChannels[" + subscribedChannels + "]");
? ? }
? ? @Override
? ? public void onPSubscribe(String pattern, int subscribedChannels) {
? ? ? ? System.out.println("onPSubscribe: pattern[" + pattern + "], " +
? ? ? ? ? ? ? ? "subscribedChannels[" + subscribedChannels + "]");
? ? }
}
如上所示,抽象類中存在六個(gè)方法驶鹉。分別表示
監(jiān)聽到訂閱模式接受到消息時(shí)的回調(diào) (onPMessage)
監(jiān)聽到訂閱頻道接受到消息時(shí)的回調(diào) (onMessage )
訂閱頻道時(shí)的回調(diào)( onSubscribe )
取消訂閱頻道時(shí)的回調(diào)( onUnsubscribe )
訂閱頻道模式時(shí)的回調(diào) ( onPSubscribe )
取消訂閱模式時(shí)的回調(diào)( onPUnsubscribe )
運(yùn)行我們剛剛編寫的類:
@Test
? ? public void pubsubjava() {
? ? ? ? // TODO Auto-generated method stub
? ? ? ? Jedis jr = null;
? ? ? ? try {? ? ?
? ? ? ? jr = new Jedis("127.0.0.1", 6379, 0);// redis服務(wù)地址和端口號(hào)
? ? ? ? ? ? jr.auth("wx950709");
? ? ? ? ? ? TestPubSub sp = new TestPubSub();
? ? ? ? ? ? // jr客戶端配置監(jiān)聽兩個(gè)channel
? ? ? ? ? ? sp.subscribe(jr.getClient(), "news.share", "news.blog");
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? } finally{
? ? ? ? ? ? if(jr!=null){
? ? ? ? ? ? ? ? jr.disconnect();
? ? ? ? ? ? }
? ? ? ? }
? ? }
從代碼中我們不難看出,我們聲明的一個(gè)redis鏈接在設(shè)置監(jiān)聽后就可以執(zhí)行一些操作铣墨,例如發(fā)布消息室埋,訂閱消息等。伊约。姚淆。
當(dāng)運(yùn)行上述代碼后會(huì)在控制臺(tái)輸出:
onSubscribe: channel[news.share],subscribedChannels[1]
onSubscribe: channel[news.blog],subscribedChannels[2]
//onSubscribe方法成功運(yùn)行
此時(shí)當(dāng)在有客戶端向new.share或者new.blog通道publish消息時(shí),onMessage方法即可被相應(yīng)屡律。(jedis.publish(channel, message))腌逢。
Pub/Sub在Spring中的實(shí)踐
導(dǎo)入依賴jar
dependencies {
? ? compile 'org.springframework.data:spring-data-redis:1.7.2.RELEASE'
? ? compile 'redis.clients:jedis:2.4.2'
}
Spring配置redis的鏈接:
@Configuration
public class AppConfig {
? ? @Bean
? ? JedisConnectionFactory jedisConnectionFactory() {
? ? ? ? JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
? ? ? ? jedisConnectionFactory.setPassword("xxxx");
? ? ? ? return jedisConnectionFactory;
? ? }
? ? @Bean
? ? RedisTemplate<String, Object> redisTemplate() {
? ? ? ? final RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
? ? ? ? template.setConnectionFactory(jedisConnectionFactory());
? ? ? ? template.setDefaultSerializer(new StringRedisSerializer());
? ? ? ? return template;
? ? }
? ? @Bean
? ? MessageListenerAdapter messageListener() {
? ? ? ? return new MessageListenerAdapter(new RedisMessageListener());
? ? }
? ? @Bean
? ? RedisMessageListenerContainer redisContainer() {
? ? ? ? final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
? ? ? ? container.setConnectionFactory(jedisConnectionFactory());
? ? ? ? container.addMessageListener(messageListener(), topic());
? ? ? ? return container;
? ? }
? ? @Bean
? ? RedisPublisherImpl redisPublisher() {
? ? ? ? return new RedisPublisherImpl(redisTemplate(), topic());
? ? }
? ? @Bean
? ? ChannelTopic topic() {
? ? ? ? return new ChannelTopic( "pubsub:queue" );
? ? }
}
如上的配置即配置了對(duì)Redis的鏈接。在RedisTemplate中沒(méi)有設(shè)置ip端口等信息則全部為默認(rèn)的超埋。在配置類中的將ChannelTopic加入IOC容器搏讶。則在Spring啟動(dòng)時(shí)會(huì)在一個(gè)RedisTemplate(一個(gè)對(duì)Redis的鏈接)中設(shè)置的一個(gè)channel,即pubsub:queue霍殴。
在上述配置中媒惕,RedisMessageListener是我們生成的,這個(gè)類即為核心監(jiān)聽類来庭,RedisTemplate接受到數(shù)據(jù)如何處理就是在該類中處理的妒蔚。
public class RedisMessageListener implements MessageListener {
? ? ? ? @Override
? ? ? ? public void onMessage( final Message message, final byte[] pattern ) {
? ? ? ? ? ? System.out.println("Message received: " + message.toString() );
? ? ? ? }
}
現(xiàn)在我們?cè)讷@取RedisTemplate,并給pubsub:queue這個(gè)channel publish數(shù)據(jù)。
public class PubSubMain {
? ? RedisTemplate<String,Object> redisTemplate;
? ? public? void execute() {
? ? ? String channel = "pubsub:queue";
? ? ? redisTemplate.convertAndSend(channel, "from testData");
? ? }
? ? public static void main(String[] args) {
? ? ? ? ApplicationContext applicationContext? = new AnnotationConfigApplicationContext(AppConfig.class);
? ? ? ? PubSubMain pubSubMain = new PubSubMain();
? ? ? ? pubSubMain.redisTemplate = (RedisTemplate<String, Object>) applicationContext.getBean("redisTemplate");
? ? ? ? pubSubMain.execute();
? ? }
}
此時(shí)運(yùn)行main 方法:
Message received: from app 12
//表明接受成功月弛,當(dāng)在命令行中啟動(dòng)一個(gè)客戶端并publish時(shí)依然可以在客戶端打印出message
后面的文章會(huì)多涉及一些關(guān)于Redis的使用場(chǎng)景肴盏。
原文鏈接:https://blog.csdn.net/canot/article/details/51938955