为了在各线程之间高效的传递消息,必须设计一种高效率的消息队列,传统的做法是mutex加queue,这种做法在每次执行push和pop时都要加锁,效率相对较低。其次还有使用循环队列,可以做到完全无锁,但只能实现1:1的消息传递。还有一些lock-free队列的实现,但基于其实现的相对复杂性,我不打算使用。
我的队列设计是使用tls维护一个local list,每个线程执行push时,首先将元素放入属于本线程的local list中,此时是无需加锁的,然后检查队列中元素的总数,如果发现总数超过一个阀值,则将local list中的所有元素一次性提交到share list中,此时需要加锁,share list中的元素是对全局可见的。
当读者执行pop操作时,首先从检查自己的local list中是否有元素,如果有就返回一个,如果没有则尝试从share list中将所有元素同步到自己的local list中.
local list和message queue的结构如下:
- struct per_thread_struct
- {
- list_node next;
- struct double_link_node block;
- struct link_list *local_q;
- condition_t cond;
- };
-
- struct mq
- {
- uint32_t push_size;
- pthread_key_t t_key;
- mutex_t mtx;
- struct double_link blocks;
- struct link_list *share_list;
- struct link_list *local_lists;
-
- };
对于push操作,提供了两个接口:
- void mq_push(mq_t,struct list_node*);
- void mq_push_now(mq_t,struct list_node*);
mq_push将元素插入local list但只有当local list中的元素到达一定阀值时才会执行提交操作mq_sync_push.
而mq_push_now将元素插入local list之后马上就会执行提交操作.
然后还有一个问题,如果local list中的元素较长时间内都达不到阀值,会导致消息传递的延时,所以提供了mq_force_sync函数,此函数的作用是
强制将执行一次提交操作,将local list中的所有元素提交到share list中去。producer线程可在其主循环内以固定的频率执行mq_force_sync,将一个
时间循环内剩余未被提交的消息提交出去.
下面贴下测试代码:
- #include <stdio.h>
- #include <stdlib.h>
- #include "KendyNet.h"
- #include "thread.h"
- #include "SocketWrapper.h"
- #include "atomic.h"
- #include "SysTime.h"
- #include "mq.h"
-
- list_node *node_list1[5];
- list_node *node_list2[5];
- mq_t mq1;
-
- void *Routine1(void *arg)
- {
- int j = 0;
- for( ; ; )
- {
- int i = 0;
- for(; i < 10000000; ++i)
- {
- mq_push(mq1,&node_list1[j][i]);
- }
- mq_force_sync(mq1);
- j = (j + 1)%5;
- sleepms(100);
-
- }
- }
-
- void *Routine3(void *arg)
- {
- int j = 0;
- for( ; ; )
- {
- int i = 0;
- for(; i < 10000000; ++i)
- {
- mq_push(mq1,&node_list2[j][i]);
- }
- mq_force_sync(mq1);
- j = (j + 1)%5;
- sleepms(100);
-
- }
- }
-
- void *Routine2(void *arg)
- {
- uint64_t count = 0;
- uint32_t tick = GetCurrentMs();
- for( ; ; )
- {
- list_node *n = mq_pop(mq1,50);
- if(n)
- {
- ++count;
- }
- uint32_t now = GetCurrentMs();
- if(now - tick > 1000)
- {
- printf("recv:%d\n",(count*1000)/(now-tick));
- tick = now;
- count = 0;
- }
- }
- }
-
-
- int main()
- {
- int i = 0;
- for( ; i < 5; ++i)
- {
- node_list1[i] = calloc(10000000,sizeof(list_node));
- node_list2[i] = calloc(10000000,sizeof(list_node));
- }
- mq1 = create_mq(4096);
- init_system_time(10);
- thread_t t1 = create_thread(0);
- start_run(t1,Routine1,NULL);
-
- thread_t t3 = create_thread(0);
- start_run(t3,Routine3,NULL);
-
- thread_t t2 = create_thread(0);
- start_run(t2,Routine2,NULL);
-
- getchar();
-
- return 0;
- }
因为主要是测试mq的效率,所以预先生成了1亿个消息,分为两个写者一个读者,两个写者循环不断的发消息,每发送1000W休眠一小会.
读者仅仅是从mq中pop一个消息出来,然后更新统计值.在我的i3 2.93双核台式机上运行rhel 6虚拟机,每秒pop出来的消息数量大概在8000W上下。
这个数据足已满足任何高性能的应用需求了.
https://github.com/sniperHW/kendylib/blob/master/src/mq.c
(sniperHW) |