织梦CMS - 轻松建站从此开始!

罗索

BOOST 线程完全攻略 - 扩展 - 线程消息通讯

jackyhwei 发布于 2010-12-09 20:14 点击:次 
boost之controlled_module_ex.hpp : controlled_module类的扩展,增强线程之间消息通讯,增加线程安全启动和安全关闭功能 增加定时器功能
TAG:

controlled_module_ex.hpp : controlled_module类的扩展
增强线程之间消息通讯
增加线程安全启动和安全关闭功能
增加定时器功能

  1. #pragma once 
  2. #include <boost/shared_ptr.hpp> 
  3. #include <boost/any.hpp> 
  4. #include "controlled_module.hpp" 
  5.  
  6. struct _command 
  7.     typedef boost::shared_ptr<_command> CCmdPtr; 
  8.     unsigned int nCmd; 
  9.     boost::any anyParam; 
  10. }; 
  11. struct _wait_command 
  12. {       
  13.     boost::any par; 
  14.     unsigned int command; 
  15.     void * event; 
  16.     boost::shared_ptr<boost::any> resp; 
  17. }; 
  18. class controlled_module_ex; 
  19. struct _notify 
  20.     controlled_module_ex * sender; 
  21.     int id; 
  22.     boost::any par; 
  23. }; 
  24. #define BM_RESERVE 1000 
  25. #define BM_RING_START BM_RESERVE+1 
  26. #define BM_RING_STOP BM_RESERVE+2 
  27. #define BM_RING_SETTIME  BM_RESERVE+3 
  28. #define BM_RING_SETPARENT BM_RESERVE+4 
  29. #define BM_RING_CYCLE BM_RESERVE+5 
  30. #define BM_RING_PROCESS BM_RESERVE+6 
  31. #define BM_RING_PROCESSEND BM_RESERVE+7 
  32. #define BM_RING_PROCESSFAIL BM_RESERVE+8 
  33. #define BM_TIMER    BM_RESERVE+9 
  34. #define BM_COMMAND  BM_RESERVE+10 
  35. #define BM_NOTIFY   BM_RESERVE+11 
  36.  
  37. #define BM_USER 9000 
  38. class controlled_timer; 
  39. class controlled_module_ex: public controlled_module 
  40. public
  41.     controlled_module_ex() 
  42.     { 
  43.         m_safe = false
  44.     } 
  45.     ~controlled_module_ex() 
  46.     { 
  47.         safestop(); 
  48.     } 
  49. public
  50.     template<typename T> 
  51.     bool postmessage(unsigned int nCmd, const boost::shared_ptr<T>& p) 
  52.     { 
  53.         if(this==0||!m_safe)return false
  54.         boost::mutex::scoped_lock lock(m_mutex_command); 
  55.         _command::CCmdPtr cmd(new _command); 
  56.         cmd->nCmd = nCmd; 
  57.         cmd->anyParam = p; 
  58.         m_list_command.push_back(cmd); 
  59.         return true
  60.     } 
  61.     boost::any execute(unsigned int command,boost::any par,int timeout=-1) 
  62.     { 
  63.         boost::shared_ptr<_wait_command> shared(new _wait_command); 
  64.         _wait_command & cmd = *shared; 
  65.         cmd.command = command; 
  66.         cmd.event = (void *)CreateEvent(0,FALSE,FALSE,0); 
  67.         cmd.par = par; 
  68.         cmd.resp = boost::shared_ptr<boost::any>(new boost::any); 
  69.         if(this->postmessage(BM_COMMAND,shared)) 
  70.         { 
  71.             DWORD dw = WaitForSingleObject(cmd.event,timeout); 
  72.             CloseHandle(cmd.event); 
  73.             if(dw!=WAIT_OBJECT_0) 
  74.                 return boost::any(); 
  75.             else 
  76.                 return *cmd.resp; 
  77.         } 
  78.         else 
  79.         { 
  80.             CloseHandle(cmd.event); 
  81.             return boost::any(); 
  82.         } 
  83.     } 
  84.     void notify(_notify p) 
  85.     { 
  86.         this->postmessage(BM_NOTIFY,p); 
  87.     } 
  88.     bool postmessage(unsigned int nCmd,boost::any p) 
  89.     { 
  90.         if(this==0||!m_safe) 
  91.             return false
  92.         boost::mutex::scoped_lock lock(m_mutex_command); 
  93.         _command::CCmdPtr cmd(new _command); 
  94.         cmd->nCmd = nCmd; 
  95.         cmd->anyParam = p; 
  96.         m_list_command.push_back(cmd); 
  97.         return true
  98.     } 
  99.     bool postmessage(unsigned int nCmd) 
  100.     { 
  101.         if(this==0||!m_safe) 
  102.             return false
  103.         boost::mutex::scoped_lock lock(m_mutex_command); 
  104.         _command::CCmdPtr cmd(new _command); 
  105.         cmd->nCmd = nCmd; 
  106.         cmd->anyParam = 0; 
  107.         m_list_command.push_back(cmd); 
  108.         return true
  109.     } 
  110.     virtual bool work() 
  111.     { 
  112.         if(!getmessage()) 
  113.             return false
  114.         else 
  115.         { 
  116.             Sleep(this->m_sleeptime); 
  117.             return true
  118.         } 
  119.     } 
  120.     virtual void message(const _command & cmd) 
  121.     { 
  122.         if(cmd.nCmd==BM_RING_START) 
  123.         { 
  124.             this->on_safestart(); 
  125.         } 
  126.         else if(cmd.nCmd==BM_RING_STOP) 
  127.         { 
  128.             this->on_safestop(); 
  129.         } 
  130.         else if(cmd.nCmd==BM_TIMER) 
  131.         { 
  132.             this->on_timer(boost::any_cast<controlled_timer*>(cmd.anyParam)); 
  133.         } 
  134.         else if(cmd.nCmd==BM_COMMAND) 
  135.         { 
  136.             boost::shared_ptr<_wait_command> shared =
  137.  boost::any_cast< boost::shared_ptr<_wait_command> >(cmd.anyParam); 
  138.             _wait_command & cmd = *shared; 
  139.             *cmd.resp = this->on_command(cmd.command,cmd.par); 
  140.             SetEvent((HANDLE)cmd.event); 
  141.         } 
  142.         else if(cmd.nCmd==BM_NOTIFY) 
  143.         { 
  144.             try 
  145.             { 
  146.                 _notify par = boost::any_cast<_notify>(cmd.anyParam); 
  147.                 this->on_notify(par); 
  148.             } 
  149.             catch(boost::bad_any_cast) 
  150.             { 
  151.             } 
  152.         } 
  153.     } 
  154.     virtual void release() 
  155.     { 
  156.         boost::mutex::scoped_lock lock(m_mutex_command); 
  157.         m_list_command.clear(); 
  158.     } 
  159.     void safestart() 
  160.     { 
  161.         if(!islive()) 
  162.             start(); 
  163.         m_safe = true
  164.         m_safestart_event = (void*)CreateEvent(NULL,FALSE,FALSE,0); 
  165.         postmessage(BM_RING_START); 
  166.         ::WaitForSingleObject((HANDLE)m_safestart_event,INFINITE); 
  167.         CloseHandle(m_safestart_event); 
  168.     } 
  169.     void safestop() 
  170.     { 
  171.     if(this->islive()) 
  172.     { 
  173.         m_safe = false
  174.         m_safestop_event = (void*)CreateEvent(NULL,FALSE,FALSE,0); 
  175.         { 
  176.             boost::mutex::scoped_lock lock(m_mutex_command); 
  177.             _command::CCmdPtr cmd(new _command); 
  178.             cmd->nCmd = BM_RING_STOP; 
  179.             cmd->anyParam = 0; 
  180.             m_list_command.push_back(cmd); 
  181.         } 
  182.         DWORD dw = ::WaitForSingleObject((HANDLE)m_safestop_event,3*1000); 
  183.         if(WAIT_OBJECT_0!=dw) 
  184.         { 
  185.         } 
  186.         CloseHandle(m_safestop_event); 
  187.         stop(); 
  188.     } 
  189.     } 
  190.     virtual void on_timer(const controlled_timer * p){} 
  191.     virtual void on_safestart() 
  192.     {
  193.         SetEvent(m_safestart_event); 
  194.     } 
  195.     virtual void on_safestop() 
  196.     { 
  197.         SetEvent(m_safestop_event); 
  198.     } 
  199.     virtual void on_notify(const _notify & p) 
  200.     { 
  201.     } 
  202. protected
  203.     virtual boost::any on_command(const unsigned int command,const boost::any par) 
  204.     { 
  205.         return boost::any(); 
  206.     } 
  207.     bool getmessage() 
  208.     { 
  209.         std::list<_command::CCmdPtr> cache; 
  210.         { 
  211.             boost::mutex::scoped_lock lock(m_mutex_command); 
  212.             while(!m_list_command.empty()) 
  213.             { 
  214.                 _command::CCmdPtr p = m_list_command.front(); 
  215.                 m_list_command.pop_front(); 
  216.                 cache.push_back(p); 
  217.             } 
  218.         } 
  219.         _command::CCmdPtr stop_command; 
  220.         std::list<_command::CCmdPtr>::iterator item; 
  221.         for(item = cache.begin();item!=cache.end();item++) 
  222.         { 
  223.             if((*(*item)).nCmd==BM_RING_STOP) 
  224.             { 
  225.                 stop_command = *item;               
  226.                 break
  227.             } 
  228.         } 
  229.         if(stop_command.get()==0) 
  230.         { 
  231.             while(!cache.empty()) 
  232.             { 
  233.                 _command::CCmdPtr p = cache.front(); 
  234.                 cache.pop_front(); 
  235.                 try 
  236.                 { 
  237.                     if((*p).nCmd!=BM_RING_START) 
  238.                     { 
  239.                         if(!this->m_safe) 
  240.                             continue
  241.                     } 
  242.                     this->message(*p); 
  243.                 } 
  244.                 catch(boost::bad_any_cast &) 
  245.                 { 
  246.                 } 
  247.             } 
  248.             return true
  249.         } 
  250.         else 
  251.         { 
  252.             cache.clear(); 
  253.             this->message(*stop_command); 
  254.             return false
  255.         } 
  256.     } 
  257. private
  258.     void*   m_safestart_event; 
  259.     void* m_safestop_event; 
  260.     bool m_safe;//在多线程,尤其牵涉到线程之间有类似socket级别关联时,
  261. //当父线程safestop以后有可能会收到其他线程的 postmessage,这时会引起线程死锁,
  262. //这个m_safe就是解决这个问题的,当safestop以后不再接收新消息处理 
  263.     boost::mutex m_mutex_command; 
  264.     std::list<_command::CCmdPtr> m_list_command; 
  265. }; 
  266. class controlled_timer: public controlled_module_ex 
  267. public
  268.   controlled_timer() 
  269.   { 
  270.     this->m_time = 0; 
  271.     this->m_parent = 0; 
  272.     this->m_step = 0; 
  273.   } 
  274.   ~controlled_timer(){ 
  275.   } 
  276. protected
  277.   controlled_module_ex* m_parent; 
  278.   int m_time; 
  279.   int m_step; 
  280. public
  281.   void starttimer(int time,controlled_module_ex* parent) 
  282.   { 
  283.     this->safestart(); 
  284.     this->postmessage(BM_RING_SETPARENT,parent); 
  285.     this->postmessage(BM_RING_SETTIME,time); 
  286.   } 
  287.   void stoptimer() 
  288.   { 
  289.     this->safestop(); 
  290.   } 
  291. public
  292.   virtual void on_safestop() 
  293.   { 
  294.     m_time = 0; 
  295.     controlled_module_ex::on_safestop(); 
  296.   } 
  297.   virtual void message(const _command & cmd) 
  298.   { 
  299.     controlled_module_ex::message(cmd); 
  300.     if(cmd.nCmd==BM_RING_SETTIME) 
  301.     { 
  302.         int time = boost::any_cast<int>(cmd.anyParam); 
  303.         this->m_time = time/this->m_sleeptime; 
  304.         this->postmessage(BM_RING_CYCLE); 
  305.     } 
  306.     else if(cmd.nCmd==BM_RING_SETPARENT) 
  307.     { 
  308.         this->m_parent  = boost::any_cast<controlled_module_ex*>(cmd.anyParam); 
  309.     }   
  310.     else if(cmd.nCmd==BM_RING_CYCLE) 
  311.     { 
  312.         if(m_time>0) 
  313.         { 
  314.             if(m_step>m_time) 
  315.             { 
  316.                 m_parent->postmessage(BM_TIMER,this); 
  317.                 m_step=0; 
  318.             } 
  319.             m_step++; 
  320.         } 
  321.         this->postmessage(BM_RING_CYCLE); 
  322.     } 
  323.   } 
  324. }; 

1.向线程PostMessage
  函数controlled_module_ex::postmessage完成消息推送。
  虚拟函数controlled_module_ex::message(const _command & cmd)实现消息接收。

  1. #include "controlled_module_ex.hpp" 
  2. class thdex: public controlled_module_ex 
  3. protected
  4.     virtual void message(const _command & cmd) 
  5.     { 
  6.         controlled_module_ex::message(cmd); 
  7.         if(cmd.nCmd==BM_USER+1) 
  8.         { 
  9.             cout << "get message" << endl; 
  10.         } 
  11.     } 
  12. }; 
  13.  
  14. int _tmain(int argc, _TCHAR* argv[]) 
  15.     thdex t; 
  16.     t.safestart(); 
  17.     t.postmessage(BM_USER+1); 
  18.     char buf[10]; 
  19.     gets_s(buf,sizeof buf); 
  20.     t.safestop(); 
  21.     return 0; 

下面介绍向线程PostMessage,并携带简单对象参数的实现方法。
(IamNieo)

本站文章除注明转载外,均为本站原创或编译欢迎任何形式的转载,但请务必注明出处,尊重他人劳动,同学习共成长。转载请注明:文章转载自:罗索实验室 [http://www.rosoo.net/a/201012/10589.html]
本文出处:CSDN博客 作者:IamNieo
顶一下
(1)
50%
踩一下
(1)
50%
------分隔线----------------------------
发表评论
请自觉遵守互联网相关的政策法规,严禁发布色情、暴力、反动的言论。
评价:
表情:
用户名: 验证码:点击我更换图片
栏目列表
将本文分享到微信
织梦二维码生成器
推荐内容