织梦CMS - 轻松建站从此开始!

罗索

BOOST 线程完全攻略 - 扩展 - 事务线程

落鹤生 发布于 2010-12-21 09:35 点击:次 
我们写一个IM客户端的登录子线程,则该子线程会有这么几个事务要处理:No.1 TCP Socket物理连接 No.2 逻辑登录 No.3 好友在线查询 No.4 状态更新
TAG:

什么叫事务线程
举个例子:
我们写一个IM客户端的登录子线程,则该子线程会有这么几个事务要处理
No.1 TCP Socket物理连接
No.2 逻辑登录
No.3 好友在线查询
No.4 状态更新

我们通常的代码写法是
void ThreadLogin()
{
  try
  {
     if(fail(物理连接))
        throw;
     if(fail(登录))
        throw;
     if(fail(查询好友))
        throw;
     if(fail(更新))
        throw;
  }
  catch(exception)
  {
  }
}
串行的逻辑用串行的代码写,不太好看,况且中途如果主线程发出取消指令,还不好处理。
这里扩展的thread类,就是来解决这个问题的,他会提供给程序员一种事件处理的模式
class threadLogin
{
void onEventConnect()
{
  物理连接
}
void onEventLogin()
{
 登录
}
void onEventQuery()
{
查询
}
void onEventUpdate()
{
更新
}
}

源码如下

  1. // thread.hpp : controlled_module_ex类的扩展 
  2. // 增强线程事务处理能力 
  3. #pragma once 
  4. #include "controlled_module_ex.hpp" 
  5. class threadpublic controlled_module_ex 
  6. protected
  7. static const int NONE = -1; 
  8. static const int WAITING =-2; 
  9. static const int DONE =-3; 
  10. static const int FAILED =-4; 
  11. protected
  12. struct process 
  13. int level; 
  14. int status; 
  15. int sequence; 
  16. int trycount; 
  17. int tryindex; 
  18. std::string lasterror; 
  19. double timeout; 
  20. bool bTimeout; 
  21. }; 
  22. process m_process; 
  23. controlled_timer m_timer_process; 
  24. int m_process_begin,m_process_end; 
  25. double m_timeout_default; 
  26. public
  27. void startprocess(int process_begin,int process_end
  28. ,double timeout_default=1.0,int cycle=1000) 
  29. m_process_begin = process_begin; 
  30. m_process_end = process_end; 
  31. m_timeout_default = timeout_default; 
  32. m_process.level = m_process_begin; 
  33. m_process.tryindex = 0; 
  34. this->postmessage(BM_RING_PROCESS); 
  35. m_timer_process.starttimer(cycle,this); 
  36. void tryagain() 
  37. if(this->m_process.level==thread::NONE) 
  38. return
  39. this->m_process.tryindex++; 
  40. if(this->m_process.trycount>0
  41.  && this->m_process.tryindex>=this->m_process.trycount) 
  42. this->fail(); 
  43. else 
  44. this->postmessage(BM_RING_PROCESS); 
  45. void next() 
  46. if(this->m_process.level==thread::NONE) 
  47. return
  48. if(this->m_process.level>=this->m_process_end) 
  49. this->m_timer_process.stoptimer(); 
  50. this->postmessage(BM_RING_PROCESSEND); 
  51. else 
  52. this->m_process.tryindex = 0; 
  53. this->m_process.level++; 
  54. this->m_process.bTimeout = false
  55. this->postmessage(BM_RING_PROCESS); 
  56. void fail() 
  57. m_process.level = thread::NONE; 
  58. this->m_timer_process.stoptimer(); 
  59. this->postmessage(BM_RING_PROCESSFAIL); 
  60. virtual void on_safestart() 
  61. m_process.level = thread::NONE; 
  62. m_process.status = thread::NONE; 
  63. m_process_begin = m_process_end = thread::NONE; 
  64. controlled_module_ex::on_safestart(); 
  65. virtual void on_safestop() 
  66. m_timer_process.stoptimer(); 
  67. controlled_module_ex::on_safestop(); 
  68. virtual void message(const _command & cmd) 
  69. controlled_module_ex::message(cmd); 
  70. if(cmd.nCmd==BM_RING_PROCESS) 
  71. this->on_process(); 
  72. if(cmd.nCmd==BM_RING_PROCESSEND) 
  73. this->m_process.level = thread::NONE; 
  74. this->on_process_end(); 
  75. if(cmd.nCmd==BM_RING_PROCESSFAIL) 
  76. this->m_process.level = thread::NONE; 
  77. this->on_process_fail(); 
  78. virtual void on_timer(const controlled_timer * p) 
  79. if(p==this->m_timer_process) 
  80. if(this->m_process.level!=thread::NONE) 
  81. if(this->m_process.level>=this->m_process_begin
  82.  && this->m_process.level<=this->m_process_end) 
  83. if(this->m_process.status==thread::NONE) 
  84. this->m_process.level = this->m_process_begin; 
  85. m_process.tryindex = 0; 
  86. on_process(); 
  87. else if(this->m_process.status==thread::WAITING) 
  88. if(this->m_process.timeout>0) 
  89. time_t cur; 
  90. time(&cur); 
  91. if(difftime(cur,(time_t)this->m_process.sequence)>this->m_process.timeout) 
  92. this->m_process.bTimeout = true
  93. this->tryagain(); 
  94. else if(this->m_process.status==thread::FAILED) 
  95. this->tryagain(); 
  96. else if(this->m_process.status==thread::DONE) 
  97. this->m_process.level++; 
  98. m_process.tryindex = 0; 
  99. this->on_process(); 
  100. virtual void on_process() 
  101. time((time_t*)&m_process.sequence); 
  102. m_process.timeout = m_timeout_default; 
  103. m_process.status = thread::WAITING; 
  104. m_process.trycount = -1; 
  105. virtual void on_process_end(){} 
  106. virtual void on_process_fail(){} 
  107. int get_sequence(){return m_process.sequence;} 
  108. void put_timeout(double v){m_process.timeout = v;} 
  109. void put_trycount(int v){m_process.trycount = v;} 
  110. int get_level(){return m_process.level;} 
  111. void put_level(int v){m_process.level=v;} 
  112. std::string get_lasterror(){return m_process.lasterror;} 
  113. void put_lasterror(std::string v){m_process.lasterror=v;} 
  114. __declspec(property(put=put_trycount)) int trycount; 
  115. __declspec(property(put=put_timeout)) double timeout; 
  116. __declspec(property(get=get_level,put=put_level)) int level; 
  117. __declspec(property(get=get_sequence)) int sequence; 
  118. __declspec(property(get=get_lasterror,put=put_lasterror)) std::string lasterror; 
  119. }; 

虚拟函数thread::on_process()处理各种事务事件
虚拟函数thread::on_process_end()是所有事务处理完毕事件
虚拟函数thread::on_process_fail()是事务处理出现错误,这时所有事务被取消,线程终止
这里给一个简单的范例,
总共线程要完成3件事务,其中第二个事务要求用户确认是否继续

  1. #define PROCESS_1   1 
  2. #define PROCESS_2   2 
  3. #define PROCESS_3   3 
  4. class thdex: public thread 
  5. public
  6.     virtual void on_process() 
  7.     { 
  8.         thread::on_process(); 
  9.         if(this->level==PROCESS_1) 
  10.         { 
  11.             cout << "work on process 1..." << endl; 
  12.             Sleep(100); 
  13.             cout << "process 1 done." << endl; 
  14.             this->next(); 
  15.         } 
  16.         else if(this->level==PROCESS_2) 
  17.         { 
  18.             cout << "work on process 2..." << endl; 
  19.             this->timeout = -1; 
  20.             if(IDNO==::MessageBox(0,
  21. "are your want continue?","ask",MB_ICONQUESTION|MB_YESNO)) 
  22.             { 
  23.                 this->lasterror = "canceled by user"
  24.                 this->fail(); 
  25.             } 
  26.             else 
  27.             { 
  28.                 Sleep(100); 
  29.                 cout << "process 2 done." << endl; 
  30.                 this->next(); 
  31.             } 
  32.         } 
  33.         else if(this->level==PROCESS_3) 
  34.         { 
  35.             cout << "work on process 3..." << endl; 
  36.             Sleep(100); 
  37.             cout << "process 3 done." << endl; 
  38.             this->next(); 
  39.         } 
  40.     } 
  41.     virtual void on_process_fail() 
  42.     { 
  43.         cout << this->lasterror << endl; 
  44.     } 
  45.     virtual void on_process_end() 
  46.     { 
  47.         cout << "all process done." << endl; 
  48.     } 
  49. }; 
  50. int _tmain(int argc, _TCHAR* argv[]) 
  51.     thdex t; 
  52.     t.safestart(); 
  53.     t.startprocess(PROCESS_1,PROCESS_3); 
  54.     char buf[10]; 
  55.     gets_s(buf,sizeof buf); 
  56.     t.safestop(); 
  57.     return 0; 

thread事务还支持超时设定和重试次数设定,这里就不做介绍,读者可以自己研究代码。

(IamNieo )
本站文章除注明转载外,均为本站原创或编译欢迎任何形式的转载,但请务必注明出处,尊重他人劳动,同学习共成长。转载请注明:文章转载自:罗索实验室 [http://www.rosoo.net/a/201012/10623.html]
本文出处:CSDN博客 作者:IamNieo
顶一下
(0)
0%
踩一下
(0)
0%
------分隔线----------------------------
发表评论
请自觉遵守互联网相关的政策法规,严禁发布色情、暴力、反动的言论。
评价:
表情:
用户名: 验证码:点击我更换图片
栏目列表
将本文分享到微信
织梦二维码生成器
推荐内容