Maison  >  Article  >  développement back-end  >  Explication détaillée de la file d'attente de messages beanstalkd en php et partage de classes

Explication détaillée de la file d'attente de messages beanstalkd en php et partage de classes

黄舟
黄舟original
2018-05-19 16:10:592625parcourir

Cet article partage principalement l'exemple de code de classe de file d'attente de messages php-beanstalkd pour tout le monde, qui a une certaine valeur de référence. Les amis intéressés peuvent se référer à

Aperçu :
Beanstalkd, un A de haut niveau. Système de file d'attente de mémoire distribuée léger et performant, conçu à l'origine pour réduire le délai d'accès aux pages des systèmes d'applications Web de grande capacité en exécutant de manière asynchrone des tâches fastidieuses en arrière-plan, il a pris en charge l'application Facebook Causes avec 9,5 millions d'utilisateurs. Plus tard, il est devenu open source et PostRank est désormais déployé et utilisé à grande échelle, traitant des millions de tâches chaque jour. Beanstalkd est une conception typique de type Memcached. Le protocole et l'utilisation sont les mêmes, donc les utilisateurs qui ont utilisé Memcached auront l'impression que Beanstalkd semble familier.
Concepts de base de beanstalk :
Job : une tâche qui nécessite un traitement asynchrone doit être placée dans un tube.
tube : une file d'attente de tâches bien connue utilisée pour stocker des jobs d'un type uniforme
producteur : le producteur du job
consommateur : le consommateur du job
Bref, le processus n'en est qu'un phrase :
Le producteur génère une tâche et pousse la tâche dans un tube
Ensuite, le consommateur sort la tâche du tube et l'exécute (bien sûr, la prémisse de toutes ces opérations est que le service beanstalk. est en marche).

Une tâche a READY (toujours prête à être exécutée par un consommateur), RESERVED (la tâche est en cours de traitement par un consommateur), DELAYED (retarde la tâche et entre dans l'état prêt après le délai défini) , ENTERRÉ (Pendant l'hibernation, l'état doit être transféré avant l'opération) quatre états. Lorsque le producteur met directement une tâche, la tâche est à l'état PRÊT, attendant que le consommateur la traite. Si vous choisissez de retarder la mise, la tâche entrera d'abord dans l'état RETARDÉ, puis migrera vers l'état PRÊT après la mise en place. temps d'attente. Une fois que le consommateur a obtenu le travail READY actuel, le statut du travail est migré vers RESERVED, afin que les autres consommateurs ne puissent plus exécuter le travail. Une fois que le consommateur a terminé le travail, il peut choisir l'opération de suppression, de libération ou d'enfouissement ; après la suppression, le travail disparaît du système et ne peut plus être obtenu ; l'opération de libération peut revenir au statut READY (elle peut également retarder le statut). opération de migration). Cela permet à d'autres consommateurs de continuer à obtenir et à exécuter le travail ; ce qui est intéressant est l'opération d'enfouissement, qui peut mettre le travail en veille, puis remettre le travail dormant à l'état READY si nécessaire, ou supprimer le travail. travail à l’état ENTERRÉ. C'est précisément grâce à ces opérations et états intéressants que de nombreuses applications intéressantes peuvent être créées sur leur base. Par exemple, si vous souhaitez implémenter une file d'attente circulaire, vous pouvez mettre en veille les tâches à l'état RÉSERVÉ, puis attendre le résultat. les tâches à l'état READY doivent être mises en veille. La tâche revient à l'état READY en une seule fois.

Exemple d'analyse : Weibo est un exemple très typique :
1. Postez un Weibo
2 Poussez-le à ses fans (s'il y a 1 million de fans, cet endroit sera obstrué pendant longtemps. ce que les utilisateurs ressentent, c'est du retard)
Pour publier un élément de contenu sur Weibo, vous devez faire les deux choses ci-dessus pour être complet. La publication d'un Weibo ne nécessite qu'une simple opération de base de données,
mais les fans qui poussent. la base de données doit être exploitée 1 million de fois, ce qui oblige les utilisateurs à attendre un long délai avant d'envoyer un résultat Weibo avant que le résultat ne soit envoyé avec succès.
En utilisant la méthode de file d'attente, l'utilisateur envoie un message Weibo et le résultat est renvoyé immédiatement. Si l'envoi réussit, le push restant sera mis dans la file d'attente pour une exécution asynchrone.
Le push n'a pas besoin d'être effectué. particulièrement opportun, et le retard sera de quelques secondes ou dizaines de secondes est acceptable.

L'exemple de cet article partage le code spécifique de la classe de file d'attente de messages php beanstalkd pour votre référence. Le contenu spécifique est le suivant

<?php
namespace Common\Business;
/**
 * beanstalk: A minimalistic PHP beanstalk client.
 *
 * Copyright (c) 2009-2015 David Persson
 *
 * Distributed under the terms of the MIT License.
 * Redistributions of files must retain the above copyright notice.
 */
 
use RuntimeException;
 
/**
 * An interface to the beanstalk queue service. Implements the beanstalk
 * protocol spec 1.9. Where appropriate the documentation from the protocol
 * has been added to the docblocks in this class.
 *
 * @link https://github.com/kr/beanstalkd/blob/master/doc/protocol.txt
 */
class BeanStalk {
 
  /**
   * Minimum priority value which can be assigned to a job. The minimum
   * priority value is also the _highest priority_ a job can have.
   *
   * @var integer
   */
  const MIN_PRIORITY = 0;
 
  /**
   * Maximum priority value which can be assigned to a job. The maximum
   * priority value is also the _lowest priority_ a job can have.
   *
   * @var integer
   */
  const MAX_PRIORITY = 4294967295;
 
  /**
   * Holds a boolean indicating whether a connection to the server is
   * currently established or not.
   *
   * @var boolean
   */
  public $connected = false;
 
  /**
   * Holds configuration values.
   *
   * @var array
   */
  protected $_config = [];
 
  /**
   * The current connection resource handle (if any).
   *
   * @var resource
   */
  protected $_connection;
 
  /**
   * Constructor.
   *
   * @param array $config An array of configuration values:
   *    - `&#39;persistent&#39;` Whether to make the connection persistent or
   *             not, defaults to `true` as the FAQ recommends
   *             persistent connections.
   *    - `&#39;host&#39;`    The beanstalk server hostname or IP address to
   *             connect to, defaults to `127.0.0.1`.
   *    - `&#39;port&#39;`    The port of the server to connect to, defaults
   *             to `11300`.
   *    - `&#39;timeout&#39;`   Timeout in seconds when establishing the
   *             connection, defaults to `1`.
   *    - `&#39;logger&#39;`   An instance of a PSR-3 compatible logger.
   *
   * @link https://github.com/php-fig/fig-standards/blob/master/accepted/PSR-3-logger-interface.md
   * @return void
   */
  public function __construct(array $config = []) {
    $defaults = [
      &#39;persistent&#39; => true,
      &#39;host&#39; => &#39;127.0.0.1&#39;,
      &#39;port&#39; => 11300,
      &#39;timeout&#39; => 1,
      &#39;logger&#39; => null
    ];
    $this->_config = $config + $defaults;
  }
 
  /**
   * Destructor, disconnects from the server.
   *
   * @return void
   */
  public function __destruct() {
    $this->disconnect();
  }
 
  /**
   * Initiates a socket connection to the beanstalk server. The resulting
   * stream will not have any timeout set on it. Which means it can wait
   * an unlimited amount of time until a packet becomes available. This
   * is required for doing blocking reads.
   *
   * @see \Beanstalk\Client::$_connection
   * @see \Beanstalk\Client::reserve()
   * @return boolean `true` if the connection was established, `false` otherwise.
   */
  public function connect() {
    if (isset($this->_connection)) {
      $this->disconnect();
    }
    $errNum = &#39;&#39;;
    $errStr = &#39;&#39;;
    $function = $this->_config[&#39;persistent&#39;] ? &#39;pfsockopen&#39; : &#39;fsockopen&#39;;
    $params = [$this->_config[&#39;host&#39;], $this->_config[&#39;port&#39;], &$errNum, &$errStr];
 
    if ($this->_config[&#39;timeout&#39;]) {
      $params[] = $this->_config[&#39;timeout&#39;];
    }
    $this->_connection = @call_user_func_array($function, $params);
 
    if (!empty($errNum) || !empty($errStr)) {
      $this->_error("{$errNum}: {$errStr}");
    }
 
    $this->connected = is_resource($this->_connection);
 
    if ($this->connected) {
      stream_set_timeout($this->_connection, -1);
    }
    return $this->connected;
  }
 
  /**
   * Closes the connection to the beanstalk server by first signaling
   * that we want to quit then actually closing the socket connection.
   *
   * @return boolean `true` if diconnecting was successful.
   */
  public function disconnect() {
    if (!is_resource($this->_connection)) {
      $this->connected = false;
    } else {
      $this->_write(&#39;quit&#39;);
      $this->connected = !fclose($this->_connection);
 
      if (!$this->connected) {
        $this->_connection = null;
      }
    }
    return !$this->connected;
  }
 
  /**
   * Pushes an error message to the logger, when one is configured.
   *
   * @param string $message The error message.
   * @return void
   */
  protected function _error($message) {
    if ($this->_config[&#39;logger&#39;]) {
      $this->_config[&#39;logger&#39;]->error($message);
    }
  }
 
  public function errors()
  {
    return $this->_config[&#39;logger&#39;];
  }
  /**
   * Writes a packet to the socket. Prior to writing to the socket will
   * check for availability of the connection.
   *
   * @param string $data
   * @return integer|boolean number of written bytes or `false` on error.
   */
  protected function _write($data) {
    if (!$this->connected) {
      $message = &#39;No connecting found while writing data to socket.&#39;;
      throw new RuntimeException($message);
    }
 
    $data .= "\r\n";
    return fwrite($this->_connection, $data, strlen($data));
  }
 
  /**
   * Reads a packet from the socket. Prior to reading from the socket
   * will check for availability of the connection.
   *
   * @param integer $length Number of bytes to read.
   * @return string|boolean Data or `false` on error.
   */
  protected function _read($length = null) {
    if (!$this->connected) {
      $message = &#39;No connection found while reading data from socket.&#39;;
      throw new RuntimeException($message);
    }
    if ($length) {
      if (feof($this->_connection)) {
        return false;
      }
      $data = stream_get_contents($this->_connection, $length + 2);
      $meta = stream_get_meta_data($this->_connection);
 
      if ($meta[&#39;timed_out&#39;]) {
        $message = &#39;Connection timed out while reading data from socket.&#39;;
        throw new RuntimeException($message);
      }
      $packet = rtrim($data, "\r\n");
    } else {
      $packet = stream_get_line($this->_connection, 16384, "\r\n");
    }
    return $packet;
  }
 
  /* Producer Commands */
 
  /**
   * The `put` command is for any process that wants to insert a job into the queue.
   *
   * @param integer $pri Jobs with smaller priority values will be scheduled
   *    before jobs with larger priorities. The most urgent priority is
   *    0; the least urgent priority is 4294967295.
   * @param integer $delay Seconds to wait before putting the job in the
   *    ready queue. The job will be in the "delayed" state during this time.
   * @param integer $ttr Time to run - Number of seconds to allow a worker to
   *    run this job. The minimum ttr is 1.
   * @param string $data The job body.
   * @return integer|boolean `false` on error otherwise an integer indicating
   *     the job id.
   */
  public function put($pri, $delay, $ttr, $data) {
    $this->_write(sprintf("put %d %d %d %d\r\n%s", $pri, $delay, $ttr, strlen($data), $data));
    $status = strtok($this->_read(), &#39; &#39;);
 
    switch ($status) {
      case &#39;INSERTED&#39;:
      case &#39;BURIED&#39;:
        return (integer) strtok(&#39; &#39;); // job id
      case &#39;EXPECTED_CRLF&#39;:
      case &#39;JOB_TOO_BIG&#39;:
      default:
        $this->_error($status);
        return false;
    }
  }
 
  /**
   * The `use` command is for producers. Subsequent put commands will put
   * jobs into the tube specified by this command. If no use command has
   * been issued, jobs will be put into the tube named `default`.
   *
   * @param string $tube A name at most 200 bytes. It specifies the tube to
   *    use. If the tube does not exist, it will be created.
   * @return string|boolean `false` on error otherwise the name of the tube.
   */
  public function useTube($tube) {
    $this->_write(sprintf(&#39;use %s&#39;, $tube));
    $status = strtok($this->_read(), &#39; &#39;);
 
    switch ($status) {
      case &#39;USING&#39;:
        return strtok(&#39; &#39;);
      default:
        $this->_error($status);
        return false;
    }
  }
 
  /**
   * Pause a tube delaying any new job in it being reserved for a given time.
   *
   * @param string $tube The name of the tube to pause.
   * @param integer $delay Number of seconds to wait before reserving any more
   *    jobs from the queue.
   * @return boolean `false` on error otherwise `true`.
   */
  public function pauseTube($tube, $delay) {
    $this->_write(sprintf(&#39;pause-tube %s %d&#39;, $tube, $delay));
    $status = strtok($this->_read(), &#39; &#39;);
 
    switch ($status) {
      case &#39;PAUSED&#39;:
        return true;
      case &#39;NOT_FOUND&#39;:
      default:
        $this->_error($status);
        return false;
    }
  }
 
  /* Worker Commands */
 
  /**
   * Reserve a job (with a timeout).
   *
   * @param integer $timeout If given specifies number of seconds to wait for
   *    a job. `0` returns immediately.
   * @return array|false `false` on error otherwise an array holding job id
   *     and body.
   */
  public function reserve($timeout = null) {
    if (isset($timeout)) {
      $this->_write(sprintf(&#39;reserve-with-timeout %d&#39;, $timeout));
    } else {
      $this->_write(&#39;reserve&#39;);
    }
    $status = strtok($this->_read(), &#39; &#39;);
 
    switch ($status) {
      case &#39;RESERVED&#39;:
        return [
          &#39;id&#39; => (integer) strtok(&#39; &#39;),
          &#39;body&#39; => $this->_read((integer) strtok(&#39; &#39;))
        ];
      case &#39;DEADLINE_SOON&#39;:
      case &#39;TIMED_OUT&#39;:
      default:
        $this->_error($status);
        return false;
    }
  }
 
  /**
   * Removes a job from the server entirely.
   *
   * @param integer $id The id of the job.
   * @return boolean `false` on error, `true` on success.
   */
  public function delete($id) {
    $this->_write(sprintf(&#39;delete %d&#39;, $id));
    $status = $this->_read();
 
    switch ($status) {
      case &#39;DELETED&#39;:
        return true;
      case &#39;NOT_FOUND&#39;:
      default:
        $this->_error($status);
        return false;
    }
  }
 
  /**
   * Puts a reserved job back into the ready queue.
   *
   * @param integer $id The id of the job.
   * @param integer $pri Priority to assign to the job.
   * @param integer $delay Number of seconds to wait before putting the job in the ready queue.
   * @return boolean `false` on error, `true` on success.
   */
  public function release($id, $pri, $delay) {
    $this->_write(sprintf(&#39;release %d %d %d&#39;, $id, $pri, $delay));
    $status = $this->_read();
 
    switch ($status) {
      case &#39;RELEASED&#39;:
      case &#39;BURIED&#39;:
        return true;
      case &#39;NOT_FOUND&#39;:
      default:
        $this->_error($status);
        return false;
    }
  }
 
  /**
   * Puts a job into the `buried` state Buried jobs are put into a FIFO
   * linked list and will not be touched until a client kicks them.
   *
   * @param integer $id The id of the job.
   * @param integer $pri *New* priority to assign to the job.
   * @return boolean `false` on error, `true` on success.
   */
  public function bury($id, $pri) {
    $this->_write(sprintf(&#39;bury %d %d&#39;, $id, $pri));
    $status = $this->_read();
 
    switch ($status) {
      case &#39;BURIED&#39;:
        return true;
      case &#39;NOT_FOUND&#39;:
      default:
        $this->_error($status);
        return false;
    }
  }
 
  /**
   * Allows a worker to request more time to work on a job.
   *
   * @param integer $id The id of the job.
   * @return boolean `false` on error, `true` on success.
   */
  public function touch($id) {
    $this->_write(sprintf(&#39;touch %d&#39;, $id));
    $status = $this->_read();
 
    switch ($status) {
      case &#39;TOUCHED&#39;:
        return true;
      case &#39;NOT_TOUCHED&#39;:
      default:
        $this->_error($status);
        return false;
    }
  }
 
  /**
   * Adds the named tube to the watch list for the current connection.
   *
   * @param string $tube Name of tube to watch.
   * @return integer|boolean `false` on error otherwise number of tubes in watch list.
   */
  public function watch($tube) {
    $this->_write(sprintf(&#39;watch %s&#39;, $tube));
    $status = strtok($this->_read(), &#39; &#39;);
 
    switch ($status) {
      case &#39;WATCHING&#39;:
        return (integer) strtok(&#39; &#39;);
      default:
        $this->_error($status);
        return false;
    }
  }
 
  /**
   * Remove the named tube from the watch list.
   *
   * @param string $tube Name of tube to ignore.
   * @return integer|boolean `false` on error otherwise number of tubes in watch list.
   */
  public function ignore($tube) {
    $this->_write(sprintf(&#39;ignore %s&#39;, $tube));
    $status = strtok($this->_read(), &#39; &#39;);
 
    switch ($status) {
      case &#39;WATCHING&#39;:
        return (integer) strtok(&#39; &#39;);
      case &#39;NOT_IGNORED&#39;:
      default:
        $this->_error($status);
        return false;
    }
  }
 
  /* Other Commands */
 
  /**
   * Inspect a job by its id.
   *
   * @param integer $id The id of the job.
   * @return string|boolean `false` on error otherwise the body of the job.
   */
  public function peek($id) {
    $this->_write(sprintf(&#39;peek %d&#39;, $id));
    return $this->_peekRead();
  }
 
  /**
   * Inspect the next ready job.
   *
   * @return string|boolean `false` on error otherwise the body of the job.
   */
  public function peekReady() {
    $this->_write(&#39;peek-ready&#39;);
    return $this->_peekRead();
  }
 
  /**
   * Inspect the job with the shortest delay left.
   *
   * @return string|boolean `false` on error otherwise the body of the job.
   */
  public function peekDelayed() {
    $this->_write(&#39;peek-delayed&#39;);
    return $this->_peekRead();
  }
 
  /**
   * Inspect the next job in the list of buried jobs.
   *
   * @return string|boolean `false` on error otherwise the body of the job.
   */
  public function peekBuried() {
    $this->_write(&#39;peek-buried&#39;);
    return $this->_peekRead();
  }
 
  /**
   * Handles response for all peek methods.
   *
   * @return string|boolean `false` on error otherwise the body of the job.
   */
  protected function _peekRead() {
    $status = strtok($this->_read(), &#39; &#39;);
 
    switch ($status) {
      case &#39;FOUND&#39;:
        return [
          &#39;id&#39; => (integer) strtok(&#39; &#39;),
          &#39;body&#39; => $this->_read((integer) strtok(&#39; &#39;))
        ];
      case &#39;NOT_FOUND&#39;:
      default:
        $this->_error($status);
        return false;
    }
  }
 
  /**
   * Moves jobs into the ready queue (applies to the current tube).
   *
   * If there are buried jobs those get kicked only otherwise delayed
   * jobs get kicked.
   *
   * @param integer $bound Upper bound on the number of jobs to kick.
   * @return integer|boolean False on error otherwise number of jobs kicked.
   */
  public function kick($bound) {
    $this->_write(sprintf(&#39;kick %d&#39;, $bound));
    $status = strtok($this->_read(), &#39; &#39;);
 
    switch ($status) {
      case &#39;KICKED&#39;:
        return (integer) strtok(&#39; &#39;);
      default:
        $this->_error($status);
        return false;
    }
  }
 
  /**
   * This is a variant of the kick command that operates with a single
   * job identified by its job id. If the given job id exists and is in a
   * buried or delayed state, it will be moved to the ready queue of the
   * the same tube where it currently belongs.
   *
   * @param integer $id The job id.
   * @return boolean `false` on error `true` otherwise.
   */
  public function kickJob($id) {
    $this->_write(sprintf(&#39;kick-job %d&#39;, $id));
    $status = strtok($this->_read(), &#39; &#39;);
 
    switch ($status) {
      case &#39;KICKED&#39;:
        return true;
      case &#39;NOT_FOUND&#39;:
      default:
        $this->_error($status);
        return false;
    }
  }
 
  /* Stats Commands */
 
  /**
   * Gives statistical information about the specified job if it exists.
   *
   * @param integer $id The job id.
   * @return string|boolean `false` on error otherwise a string with a yaml formatted dictionary.
   */
  public function statsJob($id) {
    $this->_write(sprintf(&#39;stats-job %d&#39;, $id));
    return $this->_statsRead();
  }
 
  /**
   * Gives statistical information about the specified tube if it exists.
   *
   * @param string $tube Name of the tube.
   * @return string|boolean `false` on error otherwise a string with a yaml formatted dictionary.
   */
  public function statsTube($tube) {
    $this->_write(sprintf(&#39;stats-tube %s&#39;, $tube));
    return $this->_statsRead();
  }
 
  /**
   * Gives statistical information about the system as a whole.
   *
   * @return string|boolean `false` on error otherwise a string with a yaml formatted dictionary.
   */
  public function stats() {
    $this->_write(&#39;stats&#39;);
    return $this->_statsRead();
  }
 
  /**
   * Returns a list of all existing tubes.
   *
   * @return string|boolean `false` on error otherwise a string with a yaml formatted list.
   */
  public function listTubes() {
    $this->_write(&#39;list-tubes&#39;);
    return $this->_statsRead();
  }
 
  /**
   * Returns the tube currently being used by the producer.
   *
   * @return string|boolean `false` on error otherwise a string with the name of the tube.
   */
  public function listTubeUsed() {
    $this->_write(&#39;list-tube-used&#39;);
    $status = strtok($this->_read(), &#39; &#39;);
 
    switch ($status) {
      case &#39;USING&#39;:
        return strtok(&#39; &#39;);
      default:
        $this->_error($status);
        return false;
    }
  }
 
  /**
   * Returns a list of tubes currently being watched by the worker.
   *
   * @return string|boolean `false` on error otherwise a string with a yaml formatted list.
   */
  public function listTubesWatched() {
    $this->_write(&#39;list-tubes-watched&#39;);
    return $this->_statsRead();
  }
 
  /**
   * Handles responses for all stat methods.
   *
   * @param boolean $decode Whether to decode data before returning it or not. Default is `true`.
   * @return array|string|boolean `false` on error otherwise statistical data.
   */
  protected function _statsRead($decode = true) {
    $status = strtok($this->_read(), &#39; &#39;);
 
    switch ($status) {
      case &#39;OK&#39;:
        $data = $this->_read((integer) strtok(&#39; &#39;));
        return $decode ? $this->_decode($data) : $data;
      default:
        $this->_error($status);
        return false;
    }
  }
 
  /**
   * Decodes YAML data. This is a super naive decoder which just works on
   * a subset of YAML which is commonly returned by beanstalk.
   *
   * @param string $data The data in YAML format, can be either a list or a dictionary.
   * @return array An (associative) array of the converted data.
   */
  protected function _decode($data) {
    $data = array_slice(explode("\n", $data), 1);
    $result = [];
 
    foreach ($data as $key => $value) {
      if ($value[0] === &#39;-&#39;) {
        $value = ltrim($value, &#39;- &#39;);
      } elseif (strpos($value, &#39;:&#39;) !== false) {
        list($key, $value) = explode(&#39;:&#39;, $value);
        $value = ltrim($value, &#39; &#39;);
      }
      if (is_numeric($value)) {
        $value = (integer) $value == $value ? (integer) $value : (float) $value;
      }
      $result[$key] = $value;
    }
    return $result;
  }
}
 
?>
.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn