Home >Backend Development >PHP Tutorial >PHP memcache implements message queue example_PHP tutorial
Nowadays, memcache is widely used in server cache. Let me introduce an example of memcache implementing message queue waiting. Friends who need to know more can refer to it.
The principle of memche message queue is to make a fuss about the key, which is used to make a continuous number plus a prefix to record the message or log after serialization. Then the content is dropped to the file or database through a scheduled program.
The use of php to implement message queues. For example, if it is time-consuming to send a large number of emails when sending emails, you can use queues.
The lightweight queue server that facilitates queue implementation is:
starling is a lightweight persistence server that supports the memcache protocol
https://github.com/starling/starling
Beanstalkd is lightweight and efficient , supports persistence, and can handle about 3,000 queues per second
http://kr.github.com/beanstalkd/
Memcache/memcached can also be used to implement message queues in php.
The code is as follows | Copy code | ||||
|
Message queue implemented based on PHP shared memory:
The code is as follows | Copy code | ||||
|
For a large message queue, frequent serialization and deserialization of a large database is too time-consuming. Below is a message queue I implemented using PHP. I only need to insert a piece of data at the tail and operate the tail. There is no need to operate the entire message queue to read and operate. However, this message queue is not thread-safe, I just try to avoid the possibility of conflicts. If the messages are not very dense, for example, only one message every few seconds, you can still consider using it this way.
If you want to achieve thread safety, one suggestion is to lock through the file and then operate. The code is as follows:
The code is as follows:
代码如下 | 复制代码 |
class Memcache_Queue { private $memcache; private $name; private $prefix; function __construct($maxSize, $name, $memcache, $prefix = "__memcache_queue__") { if ($memcache == null) { throw new Exception("memcache object is null, new the object first."); } $this->memcache = $memcache; $this->name = $name; $this->prefix = $prefix; $this->maxSize = $maxSize; $this->front = 0; $this->real = 0; $this->size = 0; } function __get($name) { return $this->get($name); } function __set($name, $value) { $this->add($name, $value); return $this; } function isEmpty() { return $this->size == 0; } function isFull() { return $this->size == $this->maxSize; } function enQueue($data) { if ($this->isFull()) { throw new Exception("Queue is Full"); } $this->increment("size"); $this->set($this->real, $data); $this->set("real", ($this->real + 1) % $this->maxSize); return $this; } function deQueue() { if ($this->isEmpty()) { throw new Exception("Queue is Empty"); } $this->decrement("size"); $this->delete($this->front); $this->set("front", ($this->front + 1) % $this->maxSize); return $this; } function getTop() { return $this->get($this->front); } function getAll() { return $this->getPage(); } function getPage($offset = 0, $limit = 0) { if ($this->isEmpty() || $this->size < $offset) { return null; } $keys[] = $this->getKeyByPos(($this->front + $offset) % $this->maxSize); $num = 1; for ($pos = ($this->front + $offset + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize) { $keys[] = $this->getKeyByPos($pos); $num++; if ($limit > 0 && $limit == $num) { break; } } return array_values($this->memcache->get($keys)); } function makeEmpty() { $keys = $this->getAllKeys(); foreach ($keys as $value) { $this->delete($value); } $this->delete("real"); $this->delete("front"); $this->delete("size"); $this->delete("maxSize"); } private function getAllKeys() { if ($this->isEmpty()) { return array(); } $keys[] = $this->getKeyByPos($this->front); for ($pos = ($this->front + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize) { $keys[] = $this->getKeyByPos($pos); } return $keys; } private function add($pos, $data) { $this->memcache->add($this->getKeyByPos($pos), $data); return $this; } private function increment($pos) { return $this->memcache->increment($this->getKeyByPos($pos)); } private function decrement($pos) { $this->memcache->decrement($this->getKeyByPos($pos)); } private function set($pos, $data) { $this->memcache->set($this->getKeyByPos($pos), $data); return $this; } private function get($pos) { return $this->memcache->get($this->getKeyByPos($pos)); } private function delete($pos) { return $this->memcache->delete($this->getKeyByPos($pos)); } private function getKeyByPos($pos) { return $this->prefix . $this->name . $pos; } } |