RabbitmqClient.h
#pragma once
#include <string>
#include "amqp_tcp_socket.h"
using std::string;
//using std::vector;
/**
* @brief Client wrapper around one AMQP broker connection, built on the
*        amqp_* C API.
*
* Copying is disabled: the copy constructor and the assignment operator are
* declared private.
*/
class CRabbitmqClient {
public:
CRabbitmqClient();
~CRabbitmqClient();
/**
* @brief Connect Opens a TCP connection to the broker and logs in.
* @param [in] strHostname broker host name or IP address
* @param [in] iPort broker port
* @param [in] strUser login user name
* @param [in] strPasswd login password
* @return 0 on success, a negative value on error
*/
int Connect(const string &strHostname, int iPort, const string &strUser, const string &strPasswd);
/**
* @brief Disconnect Closes the broker connection if one is open.
* @return 0 on success (or when there is no connection), a negative value on error
*/
int Disconnect();
/**
* @brief ExchangeDeclare Declares an exchange.
* @param [in] strExchange exchange name
* @param [in] strType exchange type (the notes on Publish describe direct, fanout and topic)
* @return 0 if the exchange was declared, a negative value on error
*/
int ExchangeDeclare(const string &strExchange, const string &strType);
/**
* @brief QueueDeclare Declares a message queue.
* @param [in] strQueueName queue name
* @return 0 if the queue was declared, a negative value on error
*/
int QueueDeclare(const string &strQueueName);
/**
* @brief QueueBind Binds a queue to an exchange with a binding key, creating a routing entry.
* @param [in] strQueueName queue name
* @param [in] strExchange exchange name
* @param [in] strBindKey binding key, e.g. "msg.#" or "msg.weather.**"
* @return 0 if the binding was created, a negative value on error
*/
int QueueBind(const string &strQueueName, const string &strExchange, const string &strBindKey);
/**
* @brief QueueUnbind Removes the binding between a queue and an exchange.
* @param [in] strQueueName queue name
* @param [in] strExchange exchange name
* @param [in] strBindKey binding key, e.g. "msg.#" or "msg.weather.**"
* @return 0 if the binding was removed, a negative value on error
*/
int QueueUnbind(const string &strQueueName, const string &strExchange, const string &strBindKey);
/**
* @brief QueueDelete Deletes a message queue.
* @param [in] strQueueName queue name
* @param [in] iIfUnused forwarded as the if_unused argument of amqp_queue_delete.
*        NOTE(review): in AMQP a non-zero if-unused asks the broker to delete the
*        queue only when it has no consumers; the previous comment here said the
*        opposite - confirm against the broker documentation.
* @return 0 if the queue was deleted, a negative value on error
*/
int QueueDelete(const string &strQueueName, int iIfUnused);
/**
* @brief Publish Publishes one message.
* @param [in] strMessage message body
* @param [in] strExchange exchange to publish to
* @param [in] strRoutekey routing key. How it is used depends on the exchange type:
* 1. Direct exchange - uses the routing key: a queue bound to the exchange
*    receives the message only when the routing key matches its binding key exactly.
* 2. Fanout exchange - ignores the routing key: every message sent to the
*    exchange is forwarded to all queues bound to it.
* 3. Topic exchange - matches the routing key against a pattern the queue was
*    bound with. "#" matches zero or more words, "*" matches exactly one word,
*    so "audit.#" matches "audit.irs.corporate" while "audit.*" only matches "audit.irs".
* @return 0 if the message was sent, a negative value on error
*/
int Publish(const string &strMessage, const string &strExchange, const string &strRoutekey);
/**
* @brief Consumer Consumes messages from a queue.
* @param [in] strQueueName queue to consume from
* @param [out] message_array overwritten with the body of each message as it arrives
* @param [in] GetNum passed to amqp_basic_qos as the prefetch count
* @param [in] timeout passed to amqp_consume_message for every receive; NULL means
*        wait without a time limit (blocking)
* @return a negative value on error; diagnostics are printed through ErrorMsg.
*         The receive loop in the implementation has no normal exit path, so a
*         call that starts consuming does not return 0.
*/
int Consumer(const string &strQueueName, string &message_array, int GetNum = 1, struct timeval *timeout = NULL);
private:
// Declared private to forbid copying of the client.
CRabbitmqClient(const CRabbitmqClient & rh);
void operator=(const CRabbitmqClient & rh);
// Prints a diagnostic for a non-normal RPC reply; returns 0 for a normal reply, -1 otherwise.
int ErrorMsg(amqp_rpc_reply_t x, char const *context);
string m_strHostname; // AMQP broker host
int m_iPort; // AMQP broker port
string m_strUser; // login user name
string m_strPasswd; // login password
int m_iChannel; // channel number used for every operation
amqp_socket_t *m_pSock; // TCP socket created for m_pConn
amqp_connection_state_t m_pConn; // connection state; NULL when there is no connection
};
RabbitmqClient.cpp
#include "RabbitmqClient.h"
#include "RabbitmqClient.h"
#include <io.h>
#include <process.h>
/**
 * @brief Builds an unconnected client: empty host and credentials, port 0,
 *        channel 1, and no socket or connection.
 */
CRabbitmqClient::CRabbitmqClient()
    : m_strHostname(),
      m_iPort(0),
      m_strUser(),
      m_strPasswd(),
      m_iChannel(1),  // channel 1 is used by default; the number itself does not matter
      m_pSock(NULL),
      m_pConn(NULL) {}
/**
 * @brief Disconnects from the broker when a connection is still held.
 */
CRabbitmqClient::~CRabbitmqClient() {
    if (NULL == m_pConn) {
        return;  // nothing to release
    }
    Disconnect();
    m_pConn = NULL;
}
int CRabbitmqClient::Connect(const string &strHostname, int iPort, const string &strUser, const string &strPasswd) {
m_strHostname = strHostname;
m_iPort = iPort;
m_strUser = strUser;
m_strPasswd = strPasswd;
m_pConn = amqp_new_connection(); //返回連接狀態(tài)的指針
if (NULL == m_pConn) {
fprintf(stderr, "amqp new connection failed\n");
return -1;
}
m_pSock = amqp_tcp_socket_new(m_pConn); //創(chuàng)建一個(gè)tcp套接字
if (NULL == m_pSock) {
fprintf(stderr, "amqp tcp new socket failed\n");
return -2;
}
int status = amqp_socket_open(m_pSock, m_strHostname.c_str(), m_iPort); //綁定ip和端口打開(kāi)一個(gè)連接
if (status<0) {
fprintf(stderr, "amqp socket open failed\n");
return -3;
}
// amqp_login(amqp_connection_state_t state,char const *vhost, int channel_max, int frame_max, int heartbeat, amqp_sasl_method_enum sasl_method, ..)
if (0 != ErrorMsg(amqp_login(m_pConn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, m_strUser.c_str(), m_strPasswd.c_str()), "Logging in")) //用戶(hù)名和密碼登錄mq
{
return -4;
}
return 0;
}
int CRabbitmqClient::Disconnect() {
if (NULL != m_pConn) {
if (0 != ErrorMsg(amqp_connection_close(m_pConn, AMQP_REPLY_SUCCESS), "Closing connection"))
return -1;
if (amqp_destroy_connection(m_pConn) < 0)
return -2;
m_pConn = NULL;
}
return 0;
}
int CRabbitmqClient::ExchangeDeclare(const string &strExchange, const string &strType) {
amqp_channel_open(m_pConn, m_iChannel);
amqp_bytes_t _exchange = amqp_cstring_bytes(strExchange.c_str());
amqp_bytes_t _type = amqp_cstring_bytes(strType.c_str());
int _passive = 0;
int _durable = 0; // 交換機(jī)是否持久化
amqp_exchange_declare(m_pConn, m_iChannel, _exchange, _type, _passive, _durable, 0, 0, amqp_empty_table);
if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "exchange_declare")) {
amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
return -1;
}
amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
return 0;
}
int CRabbitmqClient::QueueDeclare(const string &strQueueName) {
if (NULL == m_pConn) {
fprintf(stderr, "QueueDeclare m_pConn is null\n");
return -1;
}
amqp_channel_open(m_pConn, m_iChannel);
amqp_bytes_t _queue = amqp_cstring_bytes(strQueueName.c_str());
int32_t _passive = 0;
int32_t _durable = 0;
int32_t _exclusive = 0;
int32_t _auto_delete = 1;
amqp_queue_declare(m_pConn, m_iChannel, _queue, _passive, _durable, _exclusive, _auto_delete, amqp_empty_table);
if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "queue_declare")) {
amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
return -1;
}
amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
return 0;
}
int CRabbitmqClient::QueueBind(const string &strQueueName, const string &strExchange, const string &strBindKey) {
if (NULL == m_pConn) {
fprintf(stderr, "QueueBind m_pConn is null\n");
return -1;
}
amqp_channel_open(m_pConn, m_iChannel);
amqp_bytes_t _queue = amqp_cstring_bytes(strQueueName.c_str());
amqp_bytes_t _exchange = amqp_cstring_bytes(strExchange.c_str());
amqp_bytes_t _routkey = amqp_cstring_bytes(strBindKey.c_str());
amqp_queue_bind(m_pConn, m_iChannel, _queue, _exchange, _routkey, amqp_empty_table);
if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "queue_bind")) {
amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
return -1;
}
amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
return 0;
}
int CRabbitmqClient::QueueUnbind(const string &strQueueName, const string &strExchange, const string &strBindKey) {
if (NULL == m_pConn) {
fprintf(stderr, "QueueUnbind m_pConn is null\n");
return -1;
}
amqp_channel_open(m_pConn, m_iChannel);
amqp_bytes_t _queue = amqp_cstring_bytes(strQueueName.c_str());
amqp_bytes_t _exchange = amqp_cstring_bytes(strExchange.c_str());
amqp_bytes_t _routkey = amqp_cstring_bytes(strBindKey.c_str());
amqp_queue_unbind(m_pConn, m_iChannel, _queue, _exchange, _routkey, amqp_empty_table);
if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "queue_unbind")) {
amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
return -1;
}
amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
return 0;
}
int CRabbitmqClient::QueueDelete(const string &strQueueName, int iIfUnused) {
if (NULL == m_pConn) {
fprintf(stderr, "QueueDelete m_pConn is null\n");
return -1;
}
amqp_channel_open(m_pConn, m_iChannel);
if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "open channel")) {
amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
return -2;
}
amqp_queue_delete(m_pConn, m_iChannel, amqp_cstring_bytes(strQueueName.c_str()), iIfUnused, 0);
if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "delete queue")) {
amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
return -3;
}
amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
return 0;
}
int CRabbitmqClient::Publish(const string &strMessage, const string &strExchange, const string &strRoutekey) {
if (NULL == m_pConn) {
fprintf(stderr, "publish m_pConn is null, publish failed\n");
return -1;
}
amqp_channel_open(m_pConn, m_iChannel);
if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "open channel")) {
amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
return -2;
}
amqp_bytes_t message_bytes;
message_bytes.len = strMessage.length();
message_bytes.bytes = (void *)(strMessage.c_str());
//fprintf(stderr, "publish message(%d): %.*s\n", (int)message_bytes.len, (int)message_bytes.len, (char *)message_bytes.bytes);
/*
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes(m_type.c_str());
props.delivery_mode = m_durable; // persistent delivery mode
*/
amqp_bytes_t exchange = amqp_cstring_bytes(strExchange.c_str());
amqp_bytes_t routekey = amqp_cstring_bytes(strRoutekey.c_str());
//if (0 != amqp_basic_publish(m_pConn, m_iChannel, exchange, routekey, 0, 0, &props, message_bytes)) {
if (0 != amqp_basic_publish(m_pConn, m_iChannel, exchange, routekey, 0, 0, NULL, message_bytes)) {
fprintf(stderr, "publish amqp_basic_publish failed\n");
if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "amqp_basic_publish")) {
amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
return -3;
}
}
amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
return 0;
}
// Flag set to true by CRabbitmqClient::Consumer() each time a message body
// has been stored in its output string; main.cpp declares it extern and
// CoutData() clears it after printing.
// NOTE(review): plain bool written and read from different threads with no
// synchronization visible in this file.
bool bhave = false;
int CRabbitmqClient::Consumer(const string &strQueueName, string &message_array, int GetNum, struct timeval *timeout) {
if (NULL == m_pConn)
{
fprintf(stderr, "Consumer m_pConn is null, Consumer failed\n");
return -1;
}
amqp_channel_open(m_pConn, m_iChannel); //打開(kāi)信道
if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "open channel")) //返回打開(kāi)信道的結(jié)果
{
amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
return -2;
}
amqp_basic_qos(m_pConn, m_iChannel, 0, GetNum, 0);
int ack = 1; // no_ack 是否需要確認(rèn)消息后再?gòu)年?duì)列中刪除消息
amqp_bytes_t queuename = amqp_cstring_bytes(strQueueName.c_str()); //將c字符串轉(zhuǎn)為amap_bytes_t類(lèi)型的字符串
amqp_basic_consume(m_pConn, m_iChannel, queuename, amqp_empty_bytes, 0, ack, 0, amqp_empty_table);
if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "Consuming")) {
amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
return -3;
}
int hasget = 0;
while (1)
{
amqp_rpc_reply_t res;
amqp_envelope_t envelope;
amqp_maybe_release_buffers(m_pConn);
res = amqp_consume_message(m_pConn, &envelope, timeout, 0);
if (AMQP_RESPONSE_NORMAL != res.reply_type)
{
/*fprintf(stderr, "Consumer amqp_channel_close failed\n");
amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
if (0 == hasget)
return -res.reply_type;
else
return 0;*/
continue;
}
string str((char *)envelope.message.body.bytes, (char *)envelope.message.body.bytes + envelope.message.body.len);
message_array = str;
//int rtn = amqp_basic_ack(m_pConn, m_iChannel, envelope.delivery_tag, 1);
amqp_destroy_envelope(&envelope);
/*if (rtn != 0)
{
amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
return -4;
}*/
bhave = true;
//GetNum--;
//hasget++;
//usleep(1);
}
return 0;
}
/**
 * @brief Translates an RPC reply into a status code and prints a diagnostic
 *        to stderr for every recognised non-normal reply.
 * @param [in] x reply to inspect
 * @param [in] context label printed in front of the diagnostic
 * @return 0 when the reply type is AMQP_RESPONSE_NORMAL, -1 otherwise
 */
int CRabbitmqClient::ErrorMsg(amqp_rpc_reply_t x, char const *context) {
    if (AMQP_RESPONSE_NORMAL == x.reply_type) {
        return 0;
    }

    if (AMQP_RESPONSE_NONE == x.reply_type) {
        fprintf(stderr, "%s: missing RPC reply type!\n", context);
    } else if (AMQP_RESPONSE_LIBRARY_EXCEPTION == x.reply_type) {
        fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x.library_error));
    } else if (AMQP_RESPONSE_SERVER_EXCEPTION == x.reply_type) {
        // The broker closed the connection or the channel; print its reason.
        if (AMQP_CONNECTION_CLOSE_METHOD == x.reply.id) {
            amqp_connection_close_t *closeInfo = static_cast<amqp_connection_close_t *>(x.reply.decoded);
            fprintf(stderr, "%s: server connection error %uh, message: %.*s\n",
                    context, closeInfo->reply_code, static_cast<int>(closeInfo->reply_text.len),
                    static_cast<char *>(closeInfo->reply_text.bytes));
        } else if (AMQP_CHANNEL_CLOSE_METHOD == x.reply.id) {
            amqp_channel_close_t *closeInfo = static_cast<amqp_channel_close_t *>(x.reply.decoded);
            fprintf(stderr, "%s: server channel error %uh, message: %.*s\n",
                    context, closeInfo->reply_code, static_cast<int>(closeInfo->reply_text.len),
                    static_cast<char *>(closeInfo->reply_text.bytes));
        } else {
            fprintf(stderr, "%s: unknown server error, method id 0x%08X\n",
                    context, x.reply.id);
        }
    }
    return -1;
}
main.cpp
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
//#include <unistd.h>
#include <iostream>
#include "RabbitmqClient.h"
#include <thread>
using namespace std;
// Body of the most recently received message: RecvData() passes it to
// Consumer() as the output string and CoutData() prints it.
// NOTE(review): shared between threads with no synchronization visible here.
std::string vecRecvMsg;
// Defined in RabbitmqClient.cpp; set by Consumer() when a message has been
// stored and cleared by CoutData() after printing.
extern bool bhave;
/**
 * @brief Thread entry point: consumes from queue "4.3keti" into vecRecvMsg,
 *        prints the result code once Consumer() returns, then disconnects.
 * @param client connected client to consume with
 */
void RecvData(CRabbitmqClient* client)
{
    const std::string queueName("4.3keti");
    const int consumeRet = client->Consumer(queueName, vecRecvMsg, 1);
    printf("Rabbitmq Consumer Ret: %d\n", consumeRet);
    client->Disconnect();
}
/**
 * @brief Thread entry point: polls the bhave flag forever and prints
 *        vecRecvMsg each time the flag is set, then clears the flag.
 *
 * The loop yields the processor on every idle iteration so the poll does not
 * monopolise a core while no message is pending.
 *
 * NOTE(review): bhave and vecRecvMsg are plain globals shared with the
 * consumer thread; this polling is still unsynchronized and a message that
 * arrives while the previous one is being printed can be missed or torn.
 */
void CoutData()
{
    for (;;)
    {
        if (bhave)
        {
            printf("Consumer: %s\n", vecRecvMsg.c_str());
            bhave = false;
        }
        else
        {
            std::this_thread::yield();  // let other threads run while idle
        }
    }
}
int main()
{
CRabbitmqClient objRabbitmq;
std::string strIP = "192.168.10.203";
int iPort = 5672;
std::string strUser = "rabbitadmin";
std::string strPasswd = "123456";
int iRet = objRabbitmq.Connect(strIP, iPort, strUser, strPasswd);
printf("Rabbitmq Connect Ret: %d\n", iRet);
std::string strExchange = "ExchangeTest";
std::string strRoutekey = "routekeyTest";
//// 可選操作 Declare Exchange
//iRet = objRabbitmq.ExchangeDeclare(strExchange, "direct");
//printf("Rabbitmq ExchangeDeclare Ret: %d\n", iRet);
//// 可選操作(接收) Declare Queue
//iRet = objRabbitmq.QueueDeclare(strQueuename);
//printf("Rabbitmq QueueDeclare Ret: %d\n", iRet);
//// 可選操作(接收) Queue Bind
//iRet = objRabbitmq.QueueBind(strQueuename, strExchange, strRoutekey);
//printf("Rabbitmq QueueBind Ret: %d\n", iRet);
// Send Msg
//std::string strSendMsg1 = "rabbitmq send test msg1";
//std::string strSendMsg2 = "rabbitmq send test msg2";
//iRet = objRabbitmq.Publish(strSendMsg1, strExchange, strRoutekey);
//printf("Rabbitmq Publish 1 Ret: %d\n", iRet);
//iRet = objRabbitmq.Publish(strSendMsg2, strExchange, strRoutekey);
//printf("Rabbitmq Publish 2 Ret: %d\n", iRet);
// Recv Msg
thread t1(RecvData, &objRabbitmq);
t1.detach();
thread t2(CoutData);
t2.detach();
while(1)
{
}
return 0;
}