linux下使用hiredis异步API实现subpub消息订阅和发布的功能.docx
- 文档编号:659483
- 上传时间:2023-04-29
- 格式:DOCX
- 页数:9
- 大小:18.60KB
linux下使用hiredis异步API实现subpub消息订阅和发布的功能.docx
《linux下使用hiredis异步API实现subpub消息订阅和发布的功能.docx》由会员分享,可在线阅读,更多相关《linux下使用hiredis异步API实现subpub消息订阅和发布的功能.docx(9页珍藏版)》请在冰点文库上搜索。
linux下使用hiredis异步API实现subpub消息订阅和发布的功能
linux下使用hiredis异步API实现subpub消息订阅和发布的功能
最近使用redis的c接口——hiredis,使客户端与redis服务器通信,实现消息订阅和发布(PUB/SUB)的功能,我把遇到的一些问题和解决方法列出来供大家学习。
redis_publisher.h
[cpp]viewplaincopy/*************************************************************************>FileName:
redis_publisher.h>CreatedTime:
Sat23Apr201610:
15:
09PMCST>Description:
封装hiredis,实现消息发布给redis功能************************************************************************/#ifndefREDIS_PUBLISHER_H#defineREDIS_PUBLISHER_H#include#include#include#include#include#include#include#include#includeclassCRedisPublisher{public:
CRedisPublisher();~CRedisPublisher();boolinit();booluninit();boolconnect();booldisconnect();boolpublish(conststd:
:
string&channel_name,conststd:
:
string&message);private:
//下面三个回调函数供redis服务调用//连接回调staticvoidconnect_callback(constredisAsyncContext*redis_context,intstatus);//断开连接的回调staticvoiddisconnect_callback(constredisAsyncContext*redis_context,intstatus);//执行命令回调staticvoidcommand_callback(redisAsyncContext*redis_context,void*reply,void*privdata);//事件分发线程函数staticvoid*event_thread(void*data);void*event_proc();private:
//libevent事件对象event_base*_event_base;//事件线程IDpthread_t_event_thread;//事件线程的信号量sem_t_event_sem;//hiredis异步对象redisAsyncContext*_redis_context;};#endif
redis_publisher.cpp
[cpp]viewplaincopy/*************************************************************************>FileName:
redis_publisher.cpp>CreatedTime:
Sat23Apr201610:
15:
09PMCST>Description:
************************************************************************/#include#include#include#include"redis_publisher.h"CRedisPublisher:
:
CRedisPublisher():
_event_base(0),_event_thread(0),_redis_context(0){}CRedisPublisher:
:
~CRedisPublisher(){}boolCRedisPublisher:
:
init(){//initializetheevent_event_base=event_base_new();//创建libevent对象if(NULL==_event_base){printf(":
Createrediseventfailed.\n");returnfalse;}memset(&_event_sem,0,sizeof(_event_sem));intret=sem_init(&_event_sem,0,0);if(ret!
=0){printf(":
Initsemfailed.\n");returnfalse;}returntrue;}boolCRedisPublisher:
:
uninit(){_event_base=NULL;sem_destroy(&_event_sem);returntrue;}boolCRedisPublisher:
:
connect(){//connectredis_redis_context=redisAsyncConnect("127.0.0.1",6379);//异步连接到redis服务器上,使用默认端口if(NULL==_redis_context){printf(":
Connectredisfailed.\n");returnfalse;}if(_redis_context->err){printf(":
Connectrediserror:
%d,%s\n",_redis_context->err,_redis_context->errstr);//输出错误信息returnfalse;}//attachtheeventredisLibeventAttach(_redis_context,_event_base);//将事件绑定到rediscontext上,使设置给redis的回调跟事件关联//创建事件处理线程intret=pthread_create(&_event_thread,0,&CRedisPublisher:
:
event_thread,this);if(ret!
=0){printf(":
createeventthreadfailed.\n");disconnect();returnfalse;}//设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态redisAsyncSetConnectCallback(_redis_context,&CRedisPublisher:
:
connect_callback);//设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连redisAsyncSetDisconnectCallback(_redis_context,&CRedisPublisher:
:
disconnect_callback);//启动事件线程sem_post(&_event_sem);returntrue;}boolCRedisPublisher:
:
disconnect(){if(_redis_context){redisAsyncDisconnect(_redis_context);redisAsyncFree(_redis_context);_redis_context=NULL;}returntrue;}boolCRedisPublisher:
:
publish(conststd:
:
string&channel_name,conststd:
:
string&message){intret=redisAsyncCommand(_redis_context,&CRedisPublisher:
:
command_callback,this,"PUBLISH%s%s",channel_name.c_str(),message.c_str());if(REDIS_ERR==ret){printf("Publishcommandfailed:
%d\n",ret);returnfalse;}returntrue;}voidCRedisPublisher:
:
connect_callback(constredisAsyncContext*redis_context,intstatus){if(status!
=REDIS_OK){printf(":
Error:
%s\n",redis_context->errstr);}else{printf(":
Redisconnected!
\n");}}voidCRedisPublisher:
:
disconnect_callback(constredisAsyncContext*redis_context,intstatus){if(status!
=REDIS_OK){//这里异常退出,可以尝试重连printf(":
Error:
%s\n",redis_context->errstr);}}//消息接收回调函数voidCRedisPublisher:
:
command_callback(redisAsyncContext*redis_context,void*reply,void*privdata){printf("commandcallback.\n");//这里不执行任何操作}void*CRedisPublisher:
:
event_thread(void*data){if(NULL==data){printf(":
Error!
\n");assert(false);returnNULL;}CRedisPublisher*self_this=reinterpret_cast(data);returnself_this->event_proc();}void*CRedisPublisher:
:
event_proc(){sem_wait(&_event_sem);//开启事件分发,event_base_dispatch会阻塞event_base_dispatch(_event_base);returnNULL;}redis_subscriber.h
[cpp]viewplaincopy/*************************************************************************>FileName:
redis_subscriber.h>CreatedTime:
Sat23Apr201610:
15:
09PMCST>Description:
封装hiredis,实现消息订阅redis功能************************************************************************/#ifndefREDIS_SUBSCRIBER_H#defineREDIS_SUBSCRIBER_H#include#include#include#include#include#include#include#include#includeclassCRedisSubscriber{public:
typedefstd:
:
tr1:
:
function\NotifyMessageFn;//回调函数对象类型,当接收到消息后调用回调把消息发送出去CRedisSubscriber();~CRedisSubscriber();boolinit(constNotifyMessageFn&fn);//传入回调对象booluninit();boolconnect();booldisconnect();//可以多次调用,订阅多个频道boolsubscribe(conststd:
:
string&channel_name);private:
//下面三个回调函数供redis服务调用//连接回调staticvoidconnect_callback(constredisAsyncContext*redis_context,intstatus);//断开连接的回调staticvoiddisconnect_callback(constredisAsyncContext*redis_context,intstatus);//执行命令回调staticvoidcommand_callback(redisAsyncContext*redis_context,void*reply,void*privdata);//事件分发线程函数staticvoid*event_thread(void*data);void*event_proc();private:
//libevent事件对象event_base*_event_base;//事件线程IDpthread_t_event_thread;//事件线程的信号量sem_t_event_sem;//hiredis异步对象redisAsyncContext*_redis_context;//通知外层的回调函数对象NotifyMessageFn_notify_message_fn;};#endifredis_subscriber.cpp:
/*************************************************************************>FileName:
redis_subscriber.cpp>CreatedTime:
Sat23Apr201610:
15:
09PMCST>Description:
************************************************************************/#include#include#include#include"redis_subscriber.h"CRedisSubscriber:
:
CRedisSubscriber():
_event_base(0),_event_thread(0),_redis_context(0){}CRedisSubscriber:
:
~CRedisSubscriber(){}boolCRedisSubscriber:
:
init(constNotifyMessageFn&fn){//initializetheevent_notify_message_fn=fn;_event_base=event_base_new();//创建libevent对象if(NULL==_event_base){printf(":
Createrediseventfailed.\n");returnfalse;}memset(&_event_sem,0,sizeof(_event_sem));intret=sem_init(&_event_sem,0,0);if(ret!
=0){printf(":
Initsemfailed.\n");returnfalse;}returntrue;}boolCRedisSubscriber:
:
uninit(){_event_base=NULL;sem_destroy(&_event_sem);returntrue;}boolCRedisSubscriber:
:
connect(){//connectredis_redis_context=redisAsyncConnect("127.0.0.1",6379);//异步连接到redis服务器上,使用默认端口if(NULL==_redis_context){printf(":
Connectredisfailed.\n");returnfalse;}if(_redis_context->err){printf(":
Connectrediserror:
%d,%s\n",_redis_context->err,_redis_context->errstr);//输出错误信息returnfalse;}//attachtheeventredisLibeventAttach(_redis_context,_event_base);//将事件绑定到rediscontext上,使设置给redis的回调跟事件关联//创建事件处理线程intret=pthread_create(&_event_thread,0,&CRedisSubscriber:
:
event_thread,this);if(ret!
=0){printf(":
createeventthreadfailed.\n");disconnect();returnfalse;}//设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态redisAsyncSetConnectCallback(_redis_context,&CRedisSubscriber:
:
connect_callback);//设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连redisAsyncSetDisconnectCallback(_redis_context,&CRedisSubscriber:
:
disconnect_callback);//启动事件线程sem_post(&_event_sem);returntrue;}boolCRedisSubscriber:
:
disconnect(){if(_redis_context){redisAsyncDisconnect(_redis_context);redisAsyncFree(_redis_context);_redis_context=NULL;}returntrue;}
boolCRedisSubscriber:
:
subscribe(conststd:
:
string&channel_name){intret=redisAsyncCommand(_redis_context,&CRedisSubscriber:
:
command_callback,this,"SUBSCRIBE%s",channel_name.c_str());if(REDIS_ERR==ret){printf("Subscribecommandfailed:
%d\n",ret);returnfalse;}printf(":
Subscribesuccess:
%s\n",channel_name.c_str());returntrue;}voidCRedisSubscriber:
:
connect_callback(constredisAsyncContext*redis_context,intstatus){if(status!
=REDIS_OK){printf(":
Error:
%s\n",redis_context->errstr);}else{printf(":
Redisconnected!
");}}voidCRedisSubscriber:
:
disconnect_callback(constredisAsyncContext*redis_context,intstatus){if(status!
=REDIS_OK){//这里异常退出,可以尝试重连printf(":
Error:
%s\n",redis_context->errstr);}}//消息接收回调函数voidCRedisSubscriber:
:
command_callback(redisAsyncContext*redis_context,void*reply,void*privdata){if(NULL==reply||NULL==privdata){return;}//静态函数中,要使用类的成员变量,把当前的this指针传进来,用this指针间接访问CRedisSubscriber*self_this=reinterpret_cast(privdata);redisReply*redis_reply=reinterpret_cast(reply);//订阅接收到的消息是一个带三元素的数组if(redis_reply->type==REDIS_REPLY_ARRAY&&redis_reply->elements==3){printf(":
Recievemessage:
%s:
%d:
%s:
%d:
%s:
%d\n",redis_reply->element[0]->str,redis_reply->element[0]->len,redis_reply->element[1]->str,redis_reply->element[1]->len,redis_reply->element[2]->str,redis_reply->element[2]->len);//调用函数对象把消息通知给外层self_this->_notify_message_fn(redis_reply->element[1]->str,redis_reply->element[2]->str,redis_reply->element[2]->len);}}void*CRedisSubscriber:
:
event_thread
- 配套讲稿:
如PPT文件的首页显示word图标,表示该PPT已包含配套word讲稿。双击word图标可打开word文档。
- 特殊限制:
部分文档作品中含有的国旗、国徽等图片,仅作为作品整体效果示例展示,禁止商用。设计者仅对作品中独创性部分享有著作权。
- 关 键 词:
- linux 使用 hiredis 异步 API 实现 subpub 消息 订阅 发布 功能