轉(zhuǎn)載自:http://blog.csdn.net/chenzba/article/details/51224715
最近使用redis的c接口——hiredis道媚,使客戶端與redis服務(wù)器通信贴硫,實(shí)現(xiàn)消息訂閱和發(fā)布(PUB/SUB)的功能男旗,我把遇到的一些問題和解決方法列出來供大家學(xué)習(xí)糯而。
廢話不多說诱贿,先貼代碼愁溜。
redis_publisher.h
[cpp]view plaincopy
/*************************************************************************
>?File?Name:?redis_publisher.h
>?Author:?chenzengba
>?Mail:?chenzengba@gmail.com
>?Created?Time:?Sat?23?Apr?2016?10:15:09?PM?CST
>?Description:?封裝hiredis芍殖,實(shí)現(xiàn)消息發(fā)布給redis功能
************************************************************************/
#ifndef?REDIS_PUBLISHER_H
#define?REDIS_PUBLISHER_H
#include?
#include?
#include?
#include?
#include?
#include?
#include?
#include?
#include?
classCRedisPublisher
{
public:
CRedisPublisher();
~CRedisPublisher();
boolinit();
booluninit();
boolconnect();
booldisconnect();
boolpublish(conststd::string?&channel_name,
conststd::string?&message);
private:
//?下面三個(gè)回調(diào)函數(shù)供redis服務(wù)調(diào)用
//?連接回調(diào)
staticvoidconnect_callback(constredisAsyncContext?*redis_context,
intstatus);
//?斷開連接的回調(diào)
staticvoiddisconnect_callback(constredisAsyncContext?*redis_context,
intstatus);
//?執(zhí)行命令回調(diào)
staticvoidcommand_callback(redisAsyncContext?*redis_context,
void*reply,void*privdata);
//?事件分發(fā)線程函數(shù)
staticvoid*event_thread(void*data);
void*event_proc();
private:
//?libevent事件對(duì)象
event_base?*_event_base;
//?事件線程ID
pthread_t?_event_thread;
//?事件線程的信號(hào)量
sem_t?_event_sem;
//?hiredis異步對(duì)象
redisAsyncContext?*_redis_context;
};
#endif
redis_publisher.cpp
[cpp]view plaincopy
/*************************************************************************
>?File?Name:?redis_publisher.cpp
>?Author:?chenzengba
>?Mail:?chenzengba@gmail.com
>?Created?Time:?Sat?23?Apr?2016?10:15:09?PM?CST
>?Description:
************************************************************************/
#include?
#include?
#include?
#include?"redis_publisher.h"
CRedisPublisher::CRedisPublisher():_event_base(0),?_event_thread(0),
_redis_context(0)
{
}
CRedisPublisher::~CRedisPublisher()
{
}
boolCRedisPublisher::init()
{
//?initialize?the?event
_event_base?=?event_base_new();//?創(chuàng)建libevent對(duì)象
if(NULL?==?_event_base)
{
printf(":?Create?redis?event?failed.\n");
returnfalse;
}
memset(&_event_sem,?0,sizeof(_event_sem));
intret?=?sem_init(&_event_sem,?0,?0);
if(ret?!=?0)
{
printf(":?Init?sem?failed.\n");
returnfalse;
}
returntrue;
}
boolCRedisPublisher::uninit()
{
_event_base?=?NULL;
sem_destroy(&_event_sem);
returntrue;
}
boolCRedisPublisher::connect()
{
//?connect?redis
_redis_context?=?redisAsyncConnect("127.0.0.1",?6379);//?異步連接到redis服務(wù)器上豪嗽,使用默認(rèn)端口
if(NULL?==?_redis_context)
{
printf(":?Connect?redis?failed.\n");
returnfalse;
}
if(_redis_context->err)
{
printf(":?Connect?redis?error:?%d,?%s\n",
_redis_context->err,?_redis_context->errstr);//?輸出錯(cuò)誤信息
returnfalse;
}
//?attach?the?event
redisLibeventAttach(_redis_context,?_event_base);//?將事件綁定到redis?context上,使設(shè)置給redis的回調(diào)跟事件關(guān)聯(lián)
//?創(chuàng)建事件處理線程
intret?=?pthread_create(&_event_thread,?0,?&CRedisPublisher::event_thread,this);
if(ret?!=?0)
{
printf(":?create?event?thread?failed.\n");
disconnect();
returnfalse;
}
//?設(shè)置連接回調(diào)豌骏,當(dāng)異步調(diào)用連接后龟梦,服務(wù)器處理連接請(qǐng)求結(jié)束后調(diào)用,通知調(diào)用者連接的狀態(tài)
redisAsyncSetConnectCallback(_redis_context,
&CRedisPublisher::connect_callback);
//?設(shè)置斷開連接回調(diào)肯适,當(dāng)服務(wù)器斷開連接后变秦,通知調(diào)用者連接斷開,調(diào)用者可以利用這個(gè)函數(shù)實(shí)現(xiàn)重連
redisAsyncSetDisconnectCallback(_redis_context,
&CRedisPublisher::disconnect_callback);
//?啟動(dòng)事件線程
sem_post(&_event_sem);
returntrue;
}
boolCRedisPublisher::disconnect()
{
if(_redis_context)
{
redisAsyncDisconnect(_redis_context);
redisAsyncFree(_redis_context);
_redis_context?=?NULL;
}
returntrue;
}
boolCRedisPublisher::publish(conststd::string?&channel_name,
conststd::string?&message)
{
intret?=?redisAsyncCommand(_redis_context,
&CRedisPublisher::command_callback,this,"PUBLISH?%s?%s",
channel_name.c_str(),?message.c_str());
if(REDIS_ERR?==?ret)
{
printf("Publish?command?failed:?%d\n",?ret);
returnfalse;
}
returntrue;
}
voidCRedisPublisher::connect_callback(constredisAsyncContext?*redis_context,
intstatus)
{
if(status?!=?REDIS_OK)
{
printf(":?Error:?%s\n",?redis_context->errstr);
}
else
{
printf(":?Redis?connected!\n");
}
}
voidCRedisPublisher::disconnect_callback(
constredisAsyncContext?*redis_context,intstatus)
{
if(status?!=?REDIS_OK)
{
//?這里異常退出框舔,可以嘗試重連
printf(":?Error:?%s\n",?redis_context->errstr);
}
}
//?消息接收回調(diào)函數(shù)
voidCRedisPublisher::command_callback(redisAsyncContext?*redis_context,
void*reply,void*privdata)
{
printf("command?callback.\n");
//?這里不執(zhí)行任何操作
}
void*CRedisPublisher::event_thread(void*data)
{
if(NULL?==?data)
{
printf(":?Error!\n");
assert(false);
returnNULL;
}
CRedisPublisher?*self_this?=reinterpret_cast(data);
returnself_this->event_proc();
}
void*CRedisPublisher::event_proc()
{
sem_wait(&_event_sem);
//?開啟事件分發(fā)蹦玫,event_base_dispatch會(huì)阻塞
event_base_dispatch(_event_base);
returnNULL;
}
redis_subscriber.h
[cpp]view plaincopy
/*************************************************************************
>?File?Name:?redis_subscriber.h
>?Author:?chenzengba
>?Mail:?chenzengba@gmail.com
>?Created?Time:?Sat?23?Apr?2016?10:15:09?PM?CST
>?Description:?封裝hiredis,實(shí)現(xiàn)消息訂閱redis功能
************************************************************************/
#ifndef?REDIS_SUBSCRIBER_H
#define?REDIS_SUBSCRIBER_H
#include?
#include?
#include?
#include?
#include?
#include?
#include?
#include?
#include?
classCRedisSubscriber
{
public:
typedef? std::tr1::function <void(const char*,const char*,int)> NotifyMessageFn;//?回調(diào)函數(shù)對(duì)象類型刘绣,當(dāng)接收到消息后調(diào)用回調(diào)把消息發(fā)送出去
CRedisSubscriber();
~CRedisSubscriber();
boolinit(const NotifyMessageFn?&fn);//?傳入回調(diào)對(duì)象
bool uninit();
bool connect();
bool disconnect();
//?可以多次調(diào)用樱溉,訂閱多個(gè)頻道
bool subscribe(conststd::string?&channel_name);
private:
//?下面三個(gè)回調(diào)函數(shù)供redis服務(wù)調(diào)用
//?連接回調(diào)
static void connect_callback(constredisAsyncContext?*redis_context,
intstatus);
//?斷開連接的回調(diào)
staticvoiddisconnect_callback(constredisAsyncContext?*redis_context,
intstatus);
//?執(zhí)行命令回調(diào)
staticvoidcommand_callback(redisAsyncContext?*redis_context,
void*reply,void*privdata);
//?事件分發(fā)線程函數(shù)
staticvoid*event_thread(void*data);
void*event_proc();
private:
//?libevent事件對(duì)象
event_base?*_event_base;
//?事件線程ID
pthread_t?_event_thread;
//?事件線程的信號(hào)量
sem_t?_event_sem;
//?hiredis異步對(duì)象
redisAsyncContext?*_redis_context;
//?通知外層的回調(diào)函數(shù)對(duì)象
NotifyMessageFn?_notify_message_fn;
};
#endif
redis_subscriber.cpp:
[cpp]view plaincopy
/*************************************************************************
>?File?Name:?redis_subscriber.cpp
>?Author:?chenzengba
>?Mail:?chenzengba@gmail.com
>?Created?Time:?Sat?23?Apr?2016?10:15:09?PM?CST
>?Description:
************************************************************************/
#include?
#include?
#include?
#include?"redis_subscriber.h"
CRedisSubscriber::CRedisSubscriber():_event_base(0),?_event_thread(0),
_redis_context(0)
{
}
CRedisSubscriber::~CRedisSubscriber()
{
}
boolCRedisSubscriber::init(constNotifyMessageFn?&fn)
{
//?initialize?the?event
_notify_message_fn?=?fn;
_event_base?=?event_base_new();//?創(chuàng)建libevent對(duì)象
if(NULL?==?_event_base)
{
printf(":?Create?redis?event?failed.\n");
returnfalse;
}
memset(&_event_sem,?0,sizeof(_event_sem));
intret?=?sem_init(&_event_sem,?0,?0);
if(ret?!=?0)
{
printf(":?Init?sem?failed.\n");
returnfalse;
}
returntrue;
}
boolCRedisSubscriber::uninit()
{
_event_base?=?NULL;
sem_destroy(&_event_sem);
returntrue;
}
boolCRedisSubscriber::connect()
{
//?connect?redis
_redis_context?=?redisAsyncConnect("127.0.0.1",?6379);//?異步連接到redis服務(wù)器上,使用默認(rèn)端口
if(NULL?==?_redis_context)
{
printf(":?Connect?redis?failed.\n");
returnfalse;
}
if(_redis_context->err)
{
printf(":?Connect?redis?error:?%d,?%s\n",
_redis_context->err,?_redis_context->errstr);//?輸出錯(cuò)誤信息
returnfalse;
}
//?attach?the?event
redisLibeventAttach(_redis_context,?_event_base);//?將事件綁定到redis?context上纬凤,使設(shè)置給redis的回調(diào)跟事件關(guān)聯(lián)
//?創(chuàng)建事件處理線程
intret?=?pthread_create(&_event_thread,?0,?&CRedisSubscriber::event_thread,this);
if(ret?!=?0)
{
printf(":?create?event?thread?failed.\n");
disconnect();
returnfalse;
}
//?設(shè)置連接回調(diào)福贞,當(dāng)異步調(diào)用連接后,服務(wù)器處理連接請(qǐng)求結(jié)束后調(diào)用停士,通知調(diào)用者連接的狀態(tài)
redisAsyncSetConnectCallback(_redis_context,
&CRedisSubscriber::connect_callback);
//?設(shè)置斷開連接回調(diào)挖帘,當(dāng)服務(wù)器斷開連接后完丽,通知調(diào)用者連接斷開,調(diào)用者可以利用這個(gè)函數(shù)實(shí)現(xiàn)重連
redisAsyncSetDisconnectCallback(_redis_context,
&CRedisSubscriber::disconnect_callback);
//?啟動(dòng)事件線程
sem_post(&_event_sem);
returntrue;
}
boolCRedisSubscriber::disconnect()
{
if(_redis_context)
{
redisAsyncDisconnect(_redis_context);
redisAsyncFree(_redis_context);
_redis_context?=?NULL;
}
returntrue;
}
boolCRedisSubscriber::subscribe(conststd::string?&channel_name)
{
intret?=?redisAsyncCommand(_redis_context,
&CRedisSubscriber::command_callback,this,"SUBSCRIBE?%s",
channel_name.c_str());
if(REDIS_ERR?==?ret)
{
printf("Subscribe?command?failed:?%d\n",?ret);
returnfalse;
}
printf(":?Subscribe?success:?%s\n",?channel_name.c_str());
returntrue;
}
voidCRedisSubscriber::connect_callback(constredisAsyncContext?*redis_context,
intstatus)
{
if(status?!=?REDIS_OK)
{
printf(":?Error:?%s\n",?redis_context->errstr);
}
else
{
printf(":?Redis?connected!");
}
}
voidCRedisSubscriber::disconnect_callback(
constredisAsyncContext?*redis_context,intstatus)
{
if(status?!=?REDIS_OK)
{
//?這里異常退出拇舀,可以嘗試重連
printf(":?Error:?%s\n",?redis_context->errstr);
}
}
//?消息接收回調(diào)函數(shù)
voidCRedisSubscriber::command_callback(redisAsyncContext?*redis_context,
void*reply,void*privdata)
{
if(NULL?==?reply?||?NULL?==?privdata)?{
return;
}
//?靜態(tài)函數(shù)中逻族,要使用類的成員變量,把當(dāng)前的this指針傳進(jìn)來骄崩,用this指針間接訪問
CRedisSubscriber?*self_this?=reinterpret_cast(privdata);
redisReply?*redis_reply?=reinterpret_cast(reply);
//?訂閱接收到的消息是一個(gè)帶三元素的數(shù)組
if(redis_reply->type?==?REDIS_REPLY_ARRAY?&&
redis_reply->elements?==?3)
{
printf(":?Recieve?message:%s:%d:%s:%d:%s:%d\n",
redis_reply->element[0]->str,?redis_reply->element[0]->len,
redis_reply->element[1]->str,?redis_reply->element[1]->len,
redis_reply->element[2]->str,?redis_reply->element[2]->len);
//?調(diào)用函數(shù)對(duì)象把消息通知給外層
self_this->_notify_message_fn(redis_reply->element[1]->str,
redis_reply->element[2]->str,?redis_reply->element[2]->len);
}
}
void*CRedisSubscriber::event_thread(void*data)
{
if(NULL?==?data)
{
printf(":?Error!\n");
assert(false);
returnNULL;
}
CRedisSubscriber?*self_this?=reinterpret_cast(data);
returnself_this->event_proc();
}
void*CRedisSubscriber::event_proc()
{
sem_wait(&_event_sem);
//?開啟事件分發(fā)聘鳞,event_base_dispatch會(huì)阻塞
event_base_dispatch(_event_base);
returnNULL;
}
問題1:hiredis官網(wǎng)沒有異步接口的實(shí)現(xiàn)例子。
hiredis提供了幾個(gè)異步通信的API要拂,一開始根據(jù)API名字的理解抠璃,我們實(shí)現(xiàn)了跟redis服務(wù)器建立連接、訂閱和發(fā)布的功能脱惰,可在實(shí)際使用的時(shí)候搏嗡,程序并沒有像我們預(yù)想的那樣,除了能夠建立連接外枪芒,任何事情都沒發(fā)生彻况。
網(wǎng)上查了很多資料谁尸,原來hiredis的異步實(shí)現(xiàn)是通過事件來分發(fā)redis發(fā)送過來的消息的舅踪,hiredis可以使用libae、libev良蛮、libuv和libevent中的任何一個(gè)實(shí)現(xiàn)事件的分發(fā)抽碌,網(wǎng)上的資料提示使用libae、libev和libuv可能發(fā)生其他問題决瞳,這里為了方便就選用libevent货徙。hireds官網(wǎng)并沒有對(duì)libevent做任何介紹,也沒用說明使用異步機(jī)制需要引入事件的接口皮胡,所以一開始走了很多彎路痴颊。
關(guān)于libevent的使用這里就不再贅述,詳情可以見libevent官網(wǎng)屡贺。
libevent官網(wǎng):http://libevent.org/
libevent api文檔:https://www.monkey.org/~provos/libevent/doxygen-2.0.1/include_2event2_2event_8h.html#6e9827de8c3014417b11b48f2fe688ae
CRedisPublisher和CRedisSubscriber的初始化過程:
初始化事件處理蠢棱,并獲得事件處理的實(shí)例:
[cpp]view plaincopy
_event_base?=?event_base_new();
在獲得redisAsyncContext *之后,調(diào)用
[cpp]view plaincopy
redisLibeventAttach(_redis_context,?_event_base);
這樣就將事件處理和redis關(guān)聯(lián)起來甩栈,最后在另一個(gè)線程調(diào)用
[cpp]view plaincopy
event_base_dispatch(_event_base);
啟動(dòng)事件的分發(fā)泻仙,這是一個(gè)阻塞函數(shù),因此量没,創(chuàng)建了一個(gè)新的線程處理事件分發(fā)玉转,值得注意的是,這里用信號(hào)燈_event_sem控制線程的啟動(dòng)殴蹄,意在程序調(diào)用
[cpp]view plaincopy
redisAsyncSetConnectCallback(_redis_context,
&CRedisSubscriber::connect_callback);
redisAsyncSetDisconnectCallback(_redis_context,
&CRedisSubscriber::disconnect_callback);
之后究抓,能夠完全捕捉到這兩個(gè)回調(diào)猾担。
問題2 奇特的‘ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context’錯(cuò)誤
有些人會(huì)覺得這兩個(gè)類設(shè)計(jì)有點(diǎn)冗余,我們發(fā)現(xiàn)CRedisPublisher和CRedisSubscriber很多邏輯是一樣的刺下,為什么不把他們整合到一起成一個(gè)類垒探,既能夠發(fā)布消息也能夠訂閱消息。其實(shí)一開始我就是這么干的怠李,在使用的時(shí)候發(fā)現(xiàn)圾叼,用同個(gè)redisAsynContex *對(duì)象進(jìn)行消息訂閱和發(fā)布,與redis服務(wù)連接會(huì)自動(dòng)斷開捺癞,disconnect_callback回調(diào)會(huì)被調(diào)用夷蚊,并且返回奇怪的錯(cuò)誤:ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context,因此髓介,不能使用同個(gè)redisAsyncContext *對(duì)象實(shí)現(xiàn)發(fā)布和訂閱惕鼓。這里為了減少設(shè)計(jì)的復(fù)雜性,就將兩個(gè)類的邏輯分開了唐础。
當(dāng)然箱歧,你也可以將相同的邏輯抽象到一個(gè)基類里,并實(shí)現(xiàn)publish和subscribe接口一膨。
編譯之前呀邢,需要安裝hiredis、libevent和boost庫豹绪,我是用的是Ubuntu x64系統(tǒng)价淌。
hiredis官網(wǎng):https://github.com/redis/hiredis
下載源碼解壓,進(jìn)入解壓目錄瞒津,執(zhí)行make && make install命令蝉衣。
libevent官網(wǎng):http://libevent.org/下載最新的穩(wěn)定版
解壓后進(jìn)入解壓目錄,執(zhí)行命令
./configure -prefix=/usr
sudo make && make install
boost庫:直接執(zhí)行安裝:sudo apt-get install libboost-dev
如果你不是用std::tr1::function的函數(shù)對(duì)象來給外層通知消息巷蚪,就不需要boost庫病毡。你可以用接口的形式實(shí)現(xiàn)回調(diào),把接口傳給CRedisSubscribe類屁柏,讓它在接收到消息后調(diào)用接口回調(diào)啦膜,通知外層。
最后貼出例子代碼前联。
publisher.cpp功戚,實(shí)現(xiàn)發(fā)布消息:
[cpp]view plaincopy
/*************************************************************************
>?File?Name:?publisher.cpp
>?Author:?chenzengba
>?Mail:?chenzengba@gmail.com
>?Created?Time:?Sat?23?Apr?2016?12:13:24?PM?CST
************************************************************************/
#include?"redis_publisher.h"
intmain(intargc,char*argv[])
{
CRedisPublisher?publisher;
boolret?=?publisher.init();
if(!ret)
{
printf("Init?failed.\n");
return0;
}
ret?=?publisher.connect();
if(!ret)
{
printf("connect?failed.");
return0;
}
while(true)
{
publisher.publish("test-channel","Test?message");
sleep(1);
}
publisher.disconnect();
publisher.uninit();
return0;
}
subscriber.cpp實(shí)現(xiàn)訂閱消息:
[cpp]view plaincopy
/*************************************************************************
>?File?Name:?subscriber.cpp
>?Author:?chenzengba
>?Mail:?chenzengba@gmail.com
>?Created?Time:?Sat?23?Apr?2016?12:26:42?PM?CST
************************************************************************/
#include?"redis_subscriber.h"
voidrecieve_message(constchar*channel_name,
constchar*message,intlen)
{
printf("Recieve?message:\n????channel?name:?%s\n????message:?%s\n",
channel_name,?message);
}
intmain(intargc,char*argv[])
{
CRedisSubscriber?subscriber;
CRedisSubscriber::NotifyMessageFn?fn?=
bind(recieve_message,?std::tr1::placeholders::_1,
std::tr1::placeholders::_2,?std::tr1::placeholders::_3);
boolret?=?subscriber.init(fn);
if(!ret)
{
printf("Init?failed.\n");
return0;
}
ret?=?subscriber.connect();
if(!ret)
{
printf("Connect?failed.\n");
return0;
}
subscriber.subscribe("test-channel");
while(true)
{
sleep(1);
}
subscriber.disconnect();
subscriber.uninit();
return0;
}
關(guān)于編譯的問題:在g++中編譯,注意要加上-lhiredis -levent參數(shù)似嗤,下面是一個(gè)簡單的Makefile:
[cpp]view plaincopy
EXE=server_main?client_main
CC=g++
FLAG=-lhiredis?-levent
OBJ=redis_publisher.o?publisher.o?redis_subscriber.o?subscriber.o
all:$(EXE)
$(EXE):$(OBJ)
$(CC)?-o?publisher?redis_publisher.o?publisher.o?$(FLAG)
$(CC)?-o?subscriber?redis_subscriber.o?subscriber.o?$(FLAG)
redis_publisher.o:redis_publisher.h
redis_subscriber.o:redis_subscriber.h
publisher.o:publisher.cpp
$(CC)?-c?publisher.cpp
subscriber.o:subscriber.cpp
$(CC)?-c?subscriber.cpp
clean:
rm?publisher?subscriber?*.o
致謝:
redis異步API使用libevent:http://www.tuicool.com/articles/N73uuu