linux下使用hiredis异步API实现subpub消息订阅和发布的功能Word文档格式.docx
- 文档编号:877180
- 上传时间:2023-04-29
- 格式:DOCX
- 页数:9
- 大小:18.60KB
linux下使用hiredis异步API实现subpub消息订阅和发布的功能Word文档格式.docx
《linux下使用hiredis异步API实现subpub消息订阅和发布的功能Word文档格式.docx》由会员分享,可在线阅读,更多相关《linux下使用hiredis异步API实现subpub消息订阅和发布的功能Word文档格式.docx(9页珍藏版)》请在冰点文库上搜索。
booldisconnect();
boolpublish(conststd:
:
string&
amp;
channel_name,conststd:
message);
private:
//下面三个回调函数供redis服务调用//连接回调staticvoidconnect_callback(constredisAsyncContext*redis_context,intstatus);
//断开连接的回调staticvoiddisconnect_callback(constredisAsyncContext*redis_context,intstatus);
//执行命令回调staticvoidcommand_callback(redisAsyncContext*redis_context,void*reply,void*privdata);
//事件分发线程函数staticvoid*event_thread(void*data);
void*event_proc();
//libevent事件对象event_base*_event_base;
//事件线程IDpthread_t_event_thread;
//事件线程的信号量sem_t_event_sem;
//hiredis异步对象redisAsyncContext*_redis_context;
};
#endif
redis_publisher.cpp
redis_publisher.cpp&
************************************************************************/#include#include#include#include"
redis_publisher.h"
CRedisPublisher:
CRedisPublisher():
_event_base(0),_event_thread(0),_redis_context(0){}CRedisPublisher:
~CRedisPublisher(){}boolCRedisPublisher:
init(){//initializetheevent_event_base=event_base_new();
//创建libevent对象if(NULL==_event_base){printf("
Createrediseventfailed.\n"
);
returnfalse;
}memset(&
_event_sem,0,sizeof(_event_sem));
intret=sem_init(&
_event_sem,0,0);
if(ret!
=0){printf("
Initsemfailed.\n"
}returntrue;
}boolCRedisPublisher:
uninit(){_event_base=NULL;
sem_destroy(&
_event_sem);
returntrue;
connect(){//connectredis_redis_context=redisAsyncConnect("
127.0.0.1"
6379);
//异步连接到redis服务器上,使用默认端口if(NULL==_redis_context){printf("
Connectredisfailed.\n"
}if(_redis_context-&
err){printf("
Connectrediserror:
%d,%s\n"
_redis_context-&
err,_redis_context-&
errstr);
//输出错误信息returnfalse;
}//attachtheeventredisLibeventAttach(_redis_context,_event_base);
//将事件绑定到rediscontext上,使设置给redis的回调跟事件关联//创建事件处理线程intret=pthread_create(&
_event_thread,0,&
CRedisPublisher:
event_thread,this);
createeventthreadfailed.\n"
disconnect();
}//设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态redisAsyncSetConnectCallback(_redis_context,&
connect_callback);
//设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连redisAsyncSetDisconnectCallback(_redis_context,&
disconnect_callback);
//启动事件线程sem_post(&
disconnect(){if(_redis_context){redisAsyncDisconnect(_redis_context);
redisAsyncFree(_redis_context);
_redis_context=NULL;
publish(conststd:
message){intret=redisAsyncCommand(_redis_context,&
command_callback,this,"
PUBLISH%s%s"
channel_name.c_str(),message.c_str());
if(REDIS_ERR==ret){printf("
Publishcommandfailed:
%d\n"
ret);
}voidCRedisPublisher:
connect_callback(constredisAsyncContext*redis_context,intstatus){if(status!
=REDIS_OK){printf("
Error:
%s\n"
redis_context-&
}else{printf("
Redisconnected!
\n"
}}voidCRedisPublisher:
disconnect_callback(constredisAsyncContext*redis_context,intstatus){if(status!
=REDIS_OK){//这里异常退出,可以尝试重连printf("
}}//消息接收回调函数voidCRedisPublisher:
command_callback(redisAsyncContext*redis_context,void*reply,void*privdata){printf("
commandcallback.\n"
//这里不执行任何操作}void*CRedisPublisher:
event_thread(void*data){if(NULL==data){printf("
Error!
assert(false);
returnNULL;
}CRedisPublisher*self_this=reinterpret_cast(data);
returnself_this-&
event_proc();
}void*CRedisPublisher:
event_proc(){sem_wait(&
//开启事件分发,event_base_dispatch会阻塞event_base_dispatch(_event_base);
}redis_subscriber.h
redis_subscriber.h&
封装hiredis,实现消息订阅redis功能************************************************************************/#ifndefREDIS_SUBSCRIBER_H#defineREDIS_SUBSCRIBER_H#include#include#include#include#include#include#include#include#includeclassCRedisSubscriber{public:
typedefstd:
tr1:
function\NotifyMessageFn;
//回调函数对象类型,当接收到消息后调用回调把消息发送出去CRedisSubscriber();
~CRedisSubscriber();
boolinit(constNotifyMessageFn&
fn);
//传入回调对象booluninit();
//可以多次调用,订阅多个频道boolsubscribe(conststd:
channel_name);
//通知外层的回调函数对象NotifyMessageFn_notify_message_fn;
#endifredis_subscriber.cpp:
/*************************************************************************&
redis_subscriber.cpp&
redis_subscriber.h"
CRedisSubscriber:
CRedisSubscriber():
_event_base(0),_event_thread(0),_redis_context(0){}CRedisSubscriber:
~CRedisSubscriber(){}boolCRedisSubscriber:
init(constNotifyMessageFn&
fn){//initializetheevent_notify_message_fn=fn;
_event_base=event_base_new();
}boolCRedisSubscriber:
CRedisSubscriber:
}
boolCRedisSubscriber:
subscribe(conststd:
channel_name){intret=redisAsyncCommand(_redis_context,&
SUBSCRIBE%s"
channel_name.c_str());
Subscribecommandfailed:
}printf("
Subscribesuccess:
}voidCRedisSubscriber:
"
}}voidCRedisSubscriber:
}}//消息接收回调函数voidCRedisSubscriber:
command_callback(redisAsyncContext*redis_context,void*reply,void*privdata){if(NULL==reply||NULL==privdata){return;
}//静态函数中,要使用类的成员变量,把当前的this指针传进来,用this指针间接访问CRedisSubscriber*self_this=reinterpret_cast(privdata);
redisReply*redis_reply=reinterpret_cast(reply);
//订阅接收到的消息是一个带三元素的数组if(redis_reply-&
type==REDIS_REPLY_ARRAY&
&
redis_reply-&
elements==3){printf("
Recievemessage:
%s:
%d:
%d\n"
redis_reply-&
element[0]-&
str,redis_reply-&
len,redis_reply-&
element[1]-&
element[2]-&
len);
//调用函数对象把消息通知给外层self_this-&
_notify_message_fn(redis_reply-&
}}void*CRedisSubscriber:
event_thread
- 配套讲稿:
如PPT文件的首页显示word图标,表示该PPT已包含配套word讲稿。双击word图标可打开word文档。
- 特殊限制:
部分文档作品中含有的国旗、国徽等图片,仅作为作品整体效果示例展示,禁止商用。设计者仅对作品中独创性部分享有著作权。
- 关 键 词:
- linux 使用 hiredis 异步 API 实现 subpub 消息 订阅 发布 功能