加入收藏 | 设为首页 | 会员中心 | 我要投稿 开发网_郴州站长网 (http://www.0735zz.com/)- 云通信、区块链、物联设备、云计算、站长网!
当前位置: 首页 > 站长学院 > PHP教程 > 正文

PHP使用Beanstalkd实例说明

发布时间:2022-06-15 11:19:45 所属栏目:PHP教程 来源:互联网
导读:有关Beanstalkd的基本概念,编译和yum的安装方法已经在上篇文章《Beanstalkd消息/任务队列的详解》中介绍了,今天练习下PHP使用Beanstalkd的过程,我选择的是使用Pheanstalk类来连接Beanstalkd 1.使用Composer安装Pheanstalk composer require pda/pheanstal
  有关Beanstalkd的基本概念,编译和yum的安装方法已经在上篇文章《Beanstalkd消息/任务队列的详解》中介绍了,今天练习下PHP使用Beanstalkd的过程,我选择的是使用Pheanstalk类来连接Beanstalkd
 
  1.使用Composer安装Pheanstalk
 
  composer require pda/pheanstalk
 
  2.实现代码
 
  php查看beanstalkd状态脚本Status.php
 
  <?php
   
  /**
   
   * Created by PhpStorm.
   
   * User: jmsite.cn
   
   * Date: 2019/1/21
   
   * Time: 10:32
   
   */
   
  require "../vendor/autoload.php";
   
  use PheanstalkPheanstalk;
   
  $pheanstalk = new Pheanstalk('192.168.75.135',11300);
   
  print_r($pheanstalk->stats());
  生产者代码Producter.php
 
  <?php
   
  /**
   
   * Created by PhpStorm.
   
   * User: jmsite.cn
   
   * Date: 2019/1/20
   
   * Time: 16:30
   
   */
   
  require "../vendor/autoload.php";
   
  use PheanstalkPheanstalk;
   
  $pheanstalk = new Pheanstalk('192.168.75.135',11300);
   
  for ($i=0;$i<50;$i++){
   
      $data = array(
   
          'key' => 'testkey'.$i,
   
          'value' => 'testvalue',
   
          'time' => time(),
  //phpfensi.com
      );
   
      $ret = $pheanstalk->putInTube('test-tube', json_encode($data), Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, Pheanstalk::DEFAULT_TTR);
   
      var_dump($ret);
   
  }
  消费者代码Consumer.php
 
  <?php
   
  /**
   
   * Created by PhpStorm.
   
   * User: jmsite.cn
   
   * Date: 2019/1/20
   
   * Time: 16:31
   
   */
   
  set_time_limit(0);
   
  ini_set('default_socket_timeout', 900);
   
  require "../vendor/autoload.php";
   
  use PheanstalkPheanstalk;
   
  $pheanstalk = new Pheanstalk('192.168.75.135',11300);
   
  while (true){
   
      $job = $pheanstalk
   
          ->watch('test-tube')
   
          ->ignore('default')
   
          ->reserve();
   
      if ($job){
   
          sleep(2);
   
          echo $job->getData();
   
          echo "n";
   
          $pheanstalk->delete($job);
   
      }
   
  }
  打开命令行/终端窗口,执行生产者,会向tube写入50条任务
 
  PS E:repositoryworkbeanstalk> php .Producter.php
   
  int(101)
   
  int(102)
   
  int(103)
   
  int(104)
   
  int(105)
   
  int(106)
   
  int(107)
   
  int(108)
   
  int(109)
   
  int(110)
   
  int(111)
   
  int(112)
   
  int(113)
   
  int(114)
   
  ......
  由此可见,$pheanstalk->putInTube成功后返回的是job的id
 
  查看状态
 
  PS E:repositoryworkbeanstalk> php Status.php
   
  PheanstalkResponseArrayResponse Object
   
  (
   
      [_name:PheanstalkResponseArrayResponse:private] => OK
   
      [storage:ArrayObject:private] => Array
   
          (
   
              [current-jobs-urgent] => 0
   
              [current-jobs-ready] => 50
   
              [current-jobs-reserved] => 0
   
              [current-jobs-delayed] => 0
   
              [current-jobs-buried] => 0
   
              ......
  结果中显示处于ready待读取状态的job是50个
 
  打开两个或以上命令行/终端窗口,执行消费者,模拟多消费者竞争
 
  消费者1
 
  PS E:repositoryworkbeanstalk> php .Consumer.php
   
  {"key":"testkey0","value":"testvalue","time":1548039103}
   
  {"key":"testkey1","value":"testvalue","time":1548039103}
   
  {"key":"testkey2","value":"testvalue","time":1548039103}
   
  {"key":"testkey4","value":"testvalue","time":1548039103}
   
  {"key":"testkey6","value":"testvalue","time":1548039103}
   
  {"key":"testkey8","value":"testvalue","time":1548039103}
   
  {"key":"testkey10","value":"testvalue","time":1548039103}
   
  {"key":"testkey12","value":"testvalue","time":1548039103}
   
  {"key":"testkey14","value":"testvalue","time":1548039103}
   
  {"key":"testkey16","value":"testvalue","time":1548039103}
   
  {"key":"testkey18","value":"testvalue","time":1548039103}
   
  {"key":"testkey20","value":"testvalue","time":1548039103}
   
  {"key":"testkey22","value":"testvalue","time":1548039103}
   
  {"key":"testkey24","value":"testvalue","time":1548039103}
   
  {"key":"testkey26","value":"testvalue","time":1548039103}
   
  {"key":"testkey28","value":"testvalue","time":1548039103}
   
  {"key":"testkey30","value":"testvalue","time":1548039103}
   
  {"key":"testkey32","value":"testvalue","time":1548039103}
   
  {"key":"testkey34","value":"testvalue","time":1548039103}
   
  {"key":"testkey36","value":"testvalue","time":1548039103}
   
  {"key":"testkey38","value":"testvalue","time":1548039103}
   
  {"key":"testkey40","value":"testvalue","time":1548039103}
   
  {"key":"testkey42","value":"testvalue","time":1548039103}
   
  {"key":"testkey44","value":"testvalue","time":1548039103}
   
  {"key":"testkey46","value":"testvalue","time":1548039103}
   
  {"key":"testkey48","value":"testvalue","time":1548039103}
  消费者2
 
  PS E:repositoryworkbeanstalk> php .Consumer.php
   
  {"key":"testkey3","value":"testvalue","time":1548039103}
   
  {"key":"testkey5","value":"testvalue","time":1548039103}
   
  {"key":"testkey7","value":"testvalue","time":1548039103}
   
  {"key":"testkey9","value":"testvalue","time":1548039103}
   
  {"key":"testkey11","value":"testvalue","time":1548039103}
   
  {"key":"testkey13","value":"testvalue","time":1548039103}
   
  {"key":"testkey15","value":"testvalue","time":1548039103}
   
  {"key":"testkey17","value":"testvalue","time":1548039103}
   
  {"key":"testkey19","value":"testvalue","time":1548039103}
   
  {"key":"testkey21","value":"testvalue","time":1548039103}
   
  {"key":"testkey23","value":"testvalue","time":1548039103}
   
  {"key":"testkey25","value":"testvalue","time":1548039103}
   
  {"key":"testkey27","value":"testvalue","time":1548039103}
   
  {"key":"testkey29","value":"testvalue","time":1548039103}
   
  {"key":"testkey31","value":"testvalue","time":1548039103}
   
  {"key":"testkey33","value":"testvalue","time":1548039103}
   
  {"key":"testkey35","value":"testvalue","time":1548039103}
   
  {"key":"testkey37","value":"testvalue","time":1548039103}
   
  {"key":"testkey39","value":"testvalue","time":1548039103}
   
  {"key":"testkey41","value":"testvalue","time":1548039103}
   
  {"key":"testkey43","value":"testvalue","time":1548039103}
   
  {"key":"testkey45","value":"testvalue","time":1548039103}
   
  {"key":"testkey47","value":"testvalue","time":1548039103}
   
  {"key":"testkey49","value":"testvalue","time":1548039103}
  两个消费者竞争着完成了全部任务,由于我的beanstalkd启动时开启了binlog持久,所以beanstalkd重启后任务也不会丢失
 
  3.需要注意的事项
 
  1.创建job时,设置的超时时间Pheanstalk::DEFAULT_TTR一定要比消费者处理一个job的时间要长,否则job在超时之后会被tube更改为ready状态,被其他消费者获取,而此时当前消费者还在处理该job,这就出现了一个job被多个消费者重复执行的可怕现象
 
  2.Pheanstalk的维护者发生了变化,在新版的Pheanstalk中是不支持长连接的,当客户端socket连接服务器时间超过php.ini中设置的default_socket_timeout时,如果未能从服务端tube获得job,连接将会被断开,所以消费者进程需要维护,以便在退出后可以重新开启进程,推荐使用supervisord维护消费者进程。
 
  判断socket超时的代码
 
  public function getLine($length = null)
   
      {
   
          $timeout = ini_get('default_socket_timeout');
   
          $timer   = microtime(true);
   
          do {
   
              $data = isset($length) ?
   
                  $this->_wrapper()->fgets($this->_socket, $length) :
   
                  $this->_wrapper()->fgets($this->_socket);
   
              if ($this->_wrapper()->feof($this->_socket)) {
   
                  throw new ExceptionSocketException('Socket closed by server!');
   
              }
   
              if (($data === false) && microtime(true) - $timer > $timeout) {
   
                  $this->disconnect();
   
                  throw new ExceptionSocketException('Socket timed out!');
   
              }
   
          } while ($data === false);
   
          return rtrim($data);
   
      } 

(编辑:开发网_郴州站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    热点阅读