PHP pthreads v3下worker和pool的使用方法示例

吾爱主题 阅读:144 2021-09-29 13:47:00 评论:0

本文实例讲述了PHP pthreads v3下worker和pool的使用方法。分享给大家供大家参考,具体如下:

有些人会想,明明用thread已经可以很好的工作了,为什么还要搞个worker和pool?

之所以要用到worker和pool还是因为效率,因为系统创建一个新线程代价是比较昂贵,每个创建的线程会复制当前执行的整个上下文。

尽可能的重用线程可以让我们的程序更高效。

一个简单的worker例子:

  1. <?php
  2. //创建自定义work类,给work取个名字,方便查看
  3. class Work extends Worker
  4. {
  5. private $name;
  6.  
  7. public function __construct($name)
  8. {
  9. $this->name = $name;
  10. }
  11.  
  12. public function getName()
  13. {
  14. return $this->name;
  15. }
  16. }
  17.  
  18. class Task extends Thread
  19. {
  20. private $num;
  21.  
  22. public function __construct($num)
  23. {
  24. $this->num = $num;
  25. }
  26.  
  27. public function run()
  28. {
  29. //计算累加和
  30. $total = 0;
  31. for ($i = 0; $i < $this->num; $i++) {
  32. $total += $i;
  33. }
  34. echo "work : {$this->worker->getName()} task : {$total} \n";
  35. sleep(1);
  36. }
  37. }
  38.  
  39. //创建一个worker线程
  40. $work = new Work('a');
  41.  
  42. $work->start();
  43.  
  44. for ($i = 1; $i <= 10; $i++) {
  45. //将Task对象压栈到worker线程中
  46. //这个时候Task对象就可以使用worker线程上下文(变量,函数等)
  47. $work->stack(new Task($i));
  48. }
  49.  
  50. //循环的清理任务,会阻塞主线程,直到栈中任务都执行完毕
  51. while ($work->collect()) ;
  52.  
  53. //关闭worker
  54. $work->shutdown();

上面代码在运行的时候,计算结果会每隔一秒出来一条,也就是10个task对象是运行在1个worker线程上的。

如果10个task对象是分别在独立空间运行的,sleep()函数就不会起作用,他们各自sleep并不会影响其他线程。

把上面的代码修改一下:

  1. <?php
  2. //创建自定义work类,给work取个名字,方便查看
  3. class Work extends Worker
  4. {
  5. private $name;
  6.  
  7. public function __construct($name)
  8. {
  9. $this->name = $name;
  10. }
  11.  
  12. public function getName()
  13. {
  14. return $this->name;
  15. }
  16. }
  17.  
  18. class Task extends Thread
  19. {
  20. private $num;
  21.  
  22. public function __construct($num)
  23. {
  24. $this->num = $num;
  25. }
  26.  
  27. public function run()
  28. {
  29. //计算累加和
  30. $total = 0;
  31. for ($i = 0; $i < $this->num; $i++) {
  32. $total += $i;
  33. }
  34. echo "work : {$this->worker->getName()} task : {$total} \n";
  35. sleep(1);
  36. }
  37. }
  38.  
  39. //创建二个worker线程
  40. $work1 = new Work('a');
  41. $work2 = new Work('b');
  42.  
  43. $work1->start();
  44. $work2->start();
  45.  
  46. for ($i = 1; $i <= 10; $i++) {
  47. if ($i <= 5) {
  48. $work1->stack(new Task($i));
  49. } else {
  50. $work2->stack(new Task($i));
  51. }
  52. }
  53.  
  54. //循环的清理任务,会阻塞主线程,直到栈中任务都执行完毕
  55. while ($work1->collect() || $work2->collect()) ;
  56.  
  57. //关闭worker
  58. $work1->shutdown();
  59. $work2->shutdown();

这里我们创建2个worker线程,让10个task对象分别压栈到2个worker中。

这时可以看到,计算结果是一对一对的出来,说明10个task对象跑在了2个worker线程上。

至于需要创建多少个worker线程,和多少个task对象,就看自已的需求了。

worker还有一个好处就是可以重用worker中的对象和方法。我们可以在worker中创建一个连接数据库对象,方便各task调用。

  1. <?php
  2. class DB extends Worker
  3. {
  4. //注意这里设置为静态成员,pdo连接本身是不能在上下文中共享的
  5. //声明为静态成员,让每个worker有自已的pdo连接
  6. private static $db = null;
  7. public $msg = 'i from db';
  8.  
  9. public function run()
  10. {
  11. self::$db = new PDO('mysql:host=192.168.33.226;port=3306;dbname=test;charset=utf8', 'root', '');
  12. }
  13.  
  14. public function getDb()
  15. {
  16. return self::$db;
  17. }
  18. }
  19.  
  20. class Task extends Thread
  21. {
  22. private $id;
  23. //注意,这里不要给成员设置默认值,$result成员是线程对象是不可变的,不能被改写
  24. private $result;
  25.  
  26. public function __construct($id)
  27. {
  28. $this->id = $id;
  29. }
  30.  
  31. public function run()
  32. {
  33. //获取worker中的数据库连接
  34. $db = $this->worker->getDb();
  35. $ret = $db->query("select * from tb_user where id = {$this->id}");
  36. $this->result = $ret->fetch(PDO::FETCH_ASSOC);
  37. //访问worker中的成员变量msg
  38. echo "data : {$this->result['id']} {$this->result['name']} \t worker data : {$this->worker->msg} \n";
  39. }
  40. }
  41.  
  42. //创建一个worker线程
  43. $work = new DB();
  44.  
  45. $work->start();
  46.  
  47. for ($i = 1; $i <= 5; $i++) {
  48. $work->stack(new Task($i));
  49. }
  50.  
  51. //循环的清理任务,会阻塞主线程,直到栈中任务都执行完毕
  52. while ($work->collect()) ;
  53.  
  54. //关闭worker
  55. $work->shutdown();

tb_user表大家可以随意创建,我这里为了演示只创建了id和name字段

运行结果如下:

如果说worker是对线程的重用,那么pool就是对worker更高的抽象了,可以同时管理多个worker。

  1. <?php
  2. //之所以要创建一个Id线程类,主要是为了给work取个不同的ID,方便查看,哪些task线程属于哪个work中
  3. class Id extends Thread
  4. {
  5. private $id;
  6.  
  7. public function getId()
  8. {
  9. //防止出现id混乱,这里使用同步操作
  10. $this->synchronized(function () {
  11. ++$this->id;
  12. });
  13. return $this->id;
  14. }
  15. }
  16.  
  17. class Work extends Worker
  18. {
  19. private $id;
  20.  
  21. public function __construct(Id $obj)
  22. {
  23. $this->id = $obj->getId();
  24. }
  25.  
  26. public function getId()
  27. {
  28. return $this->id;
  29. }
  30. }
  31.  
  32. class Task extends Thread
  33. {
  34. private $num = 0;
  35.  
  36. public function __construct($num)
  37. {
  38. $this->num = $num;
  39. }
  40.  
  41. //计算累加和
  42. public function run()
  43. {
  44. $total = 0;
  45. for ($i = 0; $i < $this->num; $i++) {
  46. $total += $i;
  47. }
  48. echo "work id : {$this->worker->getId()} task : {$total} \n";
  49. }
  50. }
  51.  
  52. //创建pool,可容纳3个work对象
  53. $pool = new Pool(3, 'Work', [new Id()]);
  54.  
  55. //循环的把20个task线程提交到pool中的work对象上运行
  56. for ($i = 1; $i <= 20; $i++) {
  57. $pool->submit(new Task($i));
  58. }
  59.  
  60. //循环的清理任务,会阻塞主线程,直到任务都执行完毕
  61. while ($pool->collect()) ;
  62.  
  63. //关闭pool
  64. $pool->shutdown();

运行结果如下:

希望本文所述对大家PHP程序设计有所帮助。

原文链接:https://www.cnblogs.com/jkko123/p/8876783.html

可以去百度分享获取分享代码输入这里。
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

【腾讯云】云服务器产品特惠热卖中
搜索
标签列表
    关注我们

    了解等多精彩内容