织梦CMS - 轻松建站从此开始!

罗索

message queue的设计

落鹤生 发布于 2013-10-22 08:54 点击:次 
为了在各线程之间高效的传递消息,必须设计一种高效率的消息队列,传统的做法是mutex加queue,这种做法在每次执行push和pop时都要加锁,效率相对较低。其次还有使用循环队列,可以做到完全无锁,但只能实现1:1的消息传递。还有一些lock-free队列的实现,但基于其实现的相
TAG:

为了在各线程之间高效的传递消息,必须设计一种高效率的消息队列,传统的做法是mutex加queue,这种做法在每次执行push和pop时都要加锁,效率相对较低。其次还有使用循环队列,可以做到完全无锁,但只能实现1:1的消息传递。还有一些lock-free队列的实现,但基于其实现的相对复杂性,我不打算使用。

我的队列设计是使用tls维护一个local list,每个线程执行push时,首先将元素放入属于本线程的local list中,此时是无需加锁的,然后检查队列中元素的总数,如果发现总数超过一个阀值,则将local list中的所有元素一次性提交到share list中,此时需要加锁,share list中的元素是对全局可见的。

当读者执行pop操作时,首先从检查自己的local list中是否有元素,如果有就返回一个,如果没有则尝试从share list中将所有元素同步到自己的local list中.

local list和message queue的结构如下:

  1. struct per_thread_struct 
  2.     list_node   next; 
  3.     struct double_link_node block; 
  4.     struct link_list *local_q; 
  5.     condition_t cond; 
  6. }; 
  7.  
  8. struct mq 
  9.     uint32_t           push_size; 
  10.     pthread_key_t      t_key; 
  11.     mutex_t            mtx; 
  12.     struct double_link blocks; 
  13.     struct link_list  *share_list; 
  14.     struct link_list  *local_lists; 
  15.  
  16. }; 

对于push操作,提供了两个接口:

  1. void mq_push(mq_t,struct list_node*); 
  2. void mq_push_now(mq_t,struct list_node*); 

mq_push将元素插入local list但只有当local list中的元素到达一定阀值时才会执行提交操作mq_sync_push.

而mq_push_now将元素插入local list之后马上就会执行提交操作.

然后还有一个问题,如果local list中的元素较长时间内都达不到阀值,会导致消息传递的延时,所以提供了mq_force_sync函数,此函数的作用是

强制将执行一次提交操作,将local list中的所有元素提交到share list中去。producer线程可在其主循环内以固定的频率执行mq_force_sync,将一个

时间循环内剩余未被提交的消息提交出去.

下面贴下测试代码:

  1. #include <stdio.h> 
  2. #include <stdlib.h> 
  3. #include "KendyNet.h" 
  4. #include "thread.h" 
  5. #include "SocketWrapper.h" 
  6. #include "atomic.h" 
  7. #include "SysTime.h" 
  8. #include "mq.h" 
  9.  
  10. list_node *node_list1[5]; 
  11. list_node *node_list2[5]; 
  12. mq_t mq1; 
  13.  
  14. void *Routine1(void *arg) 
  15.     int j = 0; 
  16.     for( ; ; ) 
  17.     { 
  18.         int i = 0; 
  19.         for(; i < 10000000; ++i) 
  20.         { 
  21.             mq_push(mq1,&node_list1[j][i]); 
  22.         } 
  23.         mq_force_sync(mq1); 
  24.         j = (j + 1)%5;  
  25.         sleepms(100); 
  26.  
  27.     } 
  28.  
  29. void *Routine3(void *arg) 
  30.     int j = 0; 
  31.     for( ; ; ) 
  32.     { 
  33.         int i = 0; 
  34.         for(; i < 10000000; ++i) 
  35.         { 
  36.             mq_push(mq1,&node_list2[j][i]); 
  37.         } 
  38.         mq_force_sync(mq1); 
  39.         j = (j + 1)%5;  
  40.         sleepms(100); 
  41.  
  42.     } 
  43.  
  44. void *Routine2(void *arg) 
  45.     uint64_t count = 0; 
  46.     uint32_t tick = GetCurrentMs(); 
  47.     for( ; ; ) 
  48.     { 
  49.         list_node *n = mq_pop(mq1,50); 
  50.         if(n) 
  51.         { 
  52.             ++count; 
  53.         } 
  54.         uint32_t now = GetCurrentMs(); 
  55.         if(now - tick > 1000) 
  56.         { 
  57.             printf("recv:%d\n",(count*1000)/(now-tick)); 
  58.             tick = now; 
  59.             count = 0; 
  60.         } 
  61.     } 
  62.  
  63.  
  64. int main() 
  65.     int i = 0; 
  66.     for( ; i < 5; ++i) 
  67.     { 
  68.         node_list1[i] = calloc(10000000,sizeof(list_node)); 
  69.         node_list2[i] = calloc(10000000,sizeof(list_node)); 
  70.     } 
  71.     mq1 = create_mq(4096); 
  72.     init_system_time(10); 
  73.     thread_t t1 = create_thread(0); 
  74.     start_run(t1,Routine1,NULL); 
  75.  
  76.     thread_t t3 = create_thread(0); 
  77.     start_run(t3,Routine3,NULL);     
  78.  
  79.     thread_t t2 = create_thread(0); 
  80.     start_run(t2,Routine2,NULL); 
  81.  
  82.     getchar(); 
  83.  
  84.     return 0; 

因为主要是测试mq的效率,所以预先生成了1亿个消息,分为两个写者一个读者,两个写者循环不断的发消息,每发送1000W休眠一小会.

读者仅仅是从mq中pop一个消息出来,然后更新统计值.在我的i3 2.93双核台式机上运行rhel 6虚拟机,每秒pop出来的消息数量大概在8000W上下。

这个数据足已满足任何高性能的应用需求了.

https://github.com/sniperHW/kendylib/blob/master/src/mq.c
(sniperHW)
本站文章除注明转载外,均为本站原创或编译欢迎任何形式的转载,但请务必注明出处,尊重他人劳动,同学习共成长。转载请注明:文章转载自:罗索实验室 [http://www.rosoo.net/a/201310/16786.html]
本文出处:博客园 作者:sniperHW
顶一下
(0)
0%
踩一下
(0)
0%
------分隔线----------------------------
发表评论
请自觉遵守互联网相关的政策法规,严禁发布色情、暴力、反动的言论。
评价:
表情:
用户名: 验证码:点击我更换图片
栏目列表
将本文分享到微信
织梦二维码生成器
推荐内容