Installation and use of php multi-threaded pthreads

Installation and use of php multi-threaded pthreads

Installing Pthreads basically requires recompiling PHP, adding the --enable-maintainer-zts parameter, but using This document is very sparse; there will be many bugs and unexpected problems, and the production environment can only be haha, so just play around with this thing, for real multi-threading, use Python, C, etc.

Most of the following codes come from the Internet

1. Installation

The one used here is php-7.0.2

./configure \
--prefix=/usr/local/php7 \
--with-config-file-path=/etc \
--with-config-file-scan-dir=/etc/php.d \
--enable-debug \
--enable-maintainer-zts \
--enable-pcntl \
--enable-fpm \
--enable-opcache \
--enable-embed=shared \
--enable-json=shared \
--enable-phpdbg \
--with-curl=shared \
--with-mysql=/usr/local/mysql \
--with-mysqli=/usr/local/mysql/bin/mysql_config \

make && make install

Install pthreads

pecl install pthreads

2. Thread

$thread = new class extends Thread {
	public function run() {
		echo "Hello World {$this->getThreadId()}\n";                                                                                  

$thread->start() && $thread->join();


class workerThread extends Thread { 
	public function __construct($i){

	public function run(){
			echo $this->i."\n";

	$workers[$i]=new workerThread($i);


3. Worker and Stackable

<p class="programlisting">Stackables are tasks that are executed by Worker threads. You can synchronize with, read, and write Stackable objects before, after and during their execution.</p>
class SQLQuery extends Stackable {

	public function __construct($sql) {
		$this->sql = $sql;

	public function run() {
		$dbh  = $this->worker->getConnection();
		$row = $dbh->query($this->sql);
		while($member = $row->fetch(PDO::FETCH_ASSOC)){


class ExampleWorker extends Worker {
	public static $dbh;
	public function __construct($name) {

	public function run(){
		self::$dbh = new PDO('mysql:host=;dbname=testdb','root','123456');
	public function getConnection(){
		return self::$dbh;

$worker = new ExampleWorker("My Worker Thread");

$sql1 = new SQLQuery('select * from test order by id desc limit 1,5');

$sql2 = new SQLQuery('select * from test order by id desc limit 5,5');


4. Mutex lock

Under what circumstances is a mutex lock used? It can be used when you need to control multiple threads and only one thread can work at the same time. A simple counter program to illustrate the difference with or without a mutex

$counter = 0;
$handle=fopen("/tmp/counter.txt", "w");
fwrite($handle, $counter );

class CounterThread extends Thread {
	public function __construct($mutex = null){
		$this->mutex = $mutex;
		$this->handle = fopen("/tmp/counter.txt", "w+");
	public function __destruct(){
	public function run() {

		$counter = intval(fgets($this->handle));
		fputs($this->handle, $counter );
		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);


for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread();


$mutex = Mutex::create(true);
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread($mutex);


for ($i=0;$i<50;$i++){


Multiple threads and shared memory

In the example of shared memory, no lock is used and it may still work normally. It is possible that the working memory operation itself has the function of locking

$tmp = tempnam(__FILE__, 'PHP');
$key = ftok($tmp, 'a');

$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );

class CounterThread extends Thread {
	public function __construct($shmid){
		$this->shmid = $shmid;
	public function run() {

		$counter = shm_get_var( $this->shmid, 1 );
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
for ($i=0;$i<100;$i++){


for ($i=0;$i<100;$i++){
shm_remove( $shmid );
shm_detach( $shmid );

5. Thread synchronization

In some scenarios, we don’t want thread->start() to start running the program, but we want the thread to wait for our command. The test function of $thread->wait(); is that the thread will not run immediately after thread->start(). It will only run after receiving the signal from $thread->notify();

$tmp = tempnam(__FILE__, 'PHP');
$key = ftok($tmp, 'a');

$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );

class CounterThread extends Thread {
	public function __construct($shmid){
		$this->shmid = $shmid;
	public function run() {

				}, $this);

		$counter = shm_get_var( $this->shmid, 1 );
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
for ($i=0;$i<100;$i++){


for ($i=0;$i<100;$i++){
			}, $threads[$i]);

for ($i=0;$i<100;$i++){
shm_remove( $shmid );
shm_detach( $shmid );

6. Thread pool

A Pool class

class Update extends Thread {

    public $running = false;
    public $row = array();
    public function __construct($row) {

	$this->row = $row;
        $this->sql = null;

    public function run() {

	if(strlen($this->row['bankno']) > 100 ){
		$bankno = safenet_decrypt($this->row['bankno']);
		$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);
		file_put_contents("bankno_error.log", $error, FILE_APPEND);

	if( strlen($bankno) > 7 ){
		$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);

		$this->sql = $sql;



class Pool {
	public $pool = array();
	public function __construct($count) {
		$this->count = $count;
	public function push($row){
		if(count($this->pool) < $this->count){
			$this->pool[] = new Update($row);
			return true;
			return false;
	public function start(){
		foreach ( $this->pool as $id => $worker){
	public function join(){
		foreach ( $this->pool as $id => $worker){
	public function clean(){
		foreach ( $this->pool as $id => $worker){
			if(! $worker->isRunning()){

try {
	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(

	$sql  = "select id,bankno from members order by id desc";
	$row = $dbh->query($sql);
	$pool = new Pool(5);
	while($member = $row->fetch(PDO::FETCH_ASSOC))

			if($pool->push($member)){ //压入任务到池中
			}else{ //如果池已经满,就开始启动线程

	$dbh = null;

} catch (Exception $e) {
    echo '[' , date('H:i:s') , ']', '系统错误', $e->getMessage(), "\n";

Dynamic queue thread pool

The above example is to execute start when the thread pool is full. The following example is to create a new thread as soon as there is free space in the thread pool.

class Update extends Thread {

	public $running = false;
	public $row = array();
	public function __construct($row) {

		$this->row = $row;
		$this->sql = null;

	public function run() {

		if(strlen($this->row['bankno']) > 100 ){
			$bankno = safenet_decrypt($this->row['bankno']);
			$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);
			file_put_contents("bankno_error.log", $error, FILE_APPEND);

		if( strlen($bankno) > 7 ){
			$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);

			$this->sql = $sql;



try {
	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(

	$sql     = "select id,bankno from members order by id desc limit 50";

	$row = $dbh->query($sql);
	$pool = array();
	while($member = $row->fetch(PDO::FETCH_ASSOC))
		$id 	= $member['id'];
		while (true){
			if(count($pool) < 5){
				$pool[$id] = new Update($member);
				foreach ( $pool as $name => $worker){
					if(! $worker->isRunning()){


	$dbh = null;

} catch (Exception $e) {
	echo '【' , date('H:i:s') , '】', '【系统错误】', $e->getMessage(), "\n";

pthreads Pool class


class WebWorker extends Worker {

	public function __construct(SafeLog $logger) {
		$this->logger = $logger;

	protected $loger;

class WebWork extends Stackable {

	public function isComplete() {
		return $this->complete;

	public function run() {
			->log("%s executing in Thread #%lu",
					__CLASS__, $this->worker->getThreadId());
		$this->complete = true;

	protected $complete;

class SafeLog extends Stackable {

	protected function log($message, $args = []) {
		$args = func_get_args();

		if (($message = array_shift($args))) {
			echo vsprintf(
					"{$message}\n", $args);

$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);

$pool->submit($w=new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());

		return $work->isComplete();


7. Multi-threaded file safe reading and writing

  • LOCK_SH obtains shared lock (reading program)

  • LOCK_EX obtains an exclusive lock (written program

  • LOCK_UN releases the lock (whether shared or exclusive)

  • LOCK_NB if you don’t want flock() to block on lock


$fp = fopen("/tmp/lock.txt", "r+");
if (flock($fp, LOCK_EX)) {  // 进行排它型锁定
	ftruncate($fp, 0);      // truncate file
	fwrite($fp, "Write something here\n");
	fflush($fp);            // flush output before releasing the lock
	flock($fp, LOCK_UN);    // 释放锁定
} else {
	echo "Couldn't get the lock!";

$fp = fopen('/tmp/lock.txt', 'r+');
if(!flock($fp, LOCK_EX | LOCK_NB)) {
	echo 'Unable to obtain lock';

8. Multi-threading and data connection

When using pthreads and pdo at the same time, one thing to note is that you need to statically declare public static $dbh; and access the database connection through singleton mode.

Worker and PDO

class Work extends Stackable {

        public function __construct() {

        public function run() {
                $dbh  = $this->worker->getConnection();
                $sql     = "select id,name from members order by id desc limit 50";
                $row = $dbh->query($sql);
                while($member = $row->fetch(PDO::FETCH_ASSOC)){


class ExampleWorker extends Worker {
        public static $dbh;
        public function __construct($name) {

        * The run method should just prepare the environment for the work that is coming ...
        public function run(){
                self::$dbh = new PDO('mysql:host=;dbname=example','www','123456');
        public function getConnection(){
                return self::$dbh;

$worker = new ExampleWorker("My Worker Thread");

$work=new Work();


Pool and PDO

Connect the database in the thread pool

# cat pool.php
class ExampleWorker extends Worker {

	public function __construct(Logging $logger) {
		$this->logger = $logger;

	protected $logger;

/* the collectable class implements machinery for Pool::collect */
class Work extends Stackable {
	public function __construct($number) {
		$this->number = $number;
	public function run() {
                $dbhost = 'db.example.com';               // 数据库服务器
                $dbuser = 'example.com';                 // 数据库用户名
                $dbpw = 'password';                               // 数据库密码
                $dbname = 'example_real';
		$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
                        PDO::MYSQL_ATTR_COMPRESS => true,
		$sql = "select OPEN_TIME, `COMMENT` from MT4_TRADES where LOGIN='".$this->number['name']."' and CMD='6' and `COMMENT` = '".$this->number['order'].":DEPOSIT'";
		#echo $sql;
		$row = $dbh->query($sql);
		$mt4_trades  = $row->fetch(PDO::FETCH_ASSOC);

			$row = null;

			$sql = "UPDATE db_example.accounts SET paystatus='成功', deposit_time='".$mt4_trades['OPEN_TIME']."' where `order` = '".$this->number['order']."';";
		$dbh = null;
		printf("runtime: %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$this->number['order']);


class Logging extends Stackable {
	protected  static $dbh;
	public function __construct() {
		$dbhost = 'db.example.com';			// 数据库服务器
	        $dbuser = 'example.com';                 // 数据库用户名
        	$dbpw = 'password';                               // 数据库密码
		$dbname = 'example_real';			// 数据库名

		self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(

	protected function log($message, $args = []) {
		$args = func_get_args();

		if (($message = array_shift($args))) {
			echo vsprintf("{$message}\n", $args);

	protected function getConnection(){
                return self::$dbh;

$pool = new Pool(200, \ExampleWorker::class, [new Logging()]);

$dbhost = 'db.example.com';                      // 数据库服务器
$dbuser = 'example.com';                 // 数据库用户名
$dbpw = 'password';                               // 数据库密码
$dbname = 'db_example';
$dbh    = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
                        PDO::MYSQL_ATTR_COMPRESS => true
$sql = "select `order`,name from accounts where deposit_time is null order by id desc";

$row = $dbh->query($sql);
while($account = $row->fetch(PDO::FETCH_ASSOC))
        $pool->submit(new Work($account));



To further improve the above program, we use the singleton mode $this->worker->getInstance(); to only make a database connection globally, and the thread uses a shared database connection

class ExampleWorker extends Worker {

	#public function __construct(Logging $logger) {
	#	$this->logger = $logger;

	#protected $logger;
	protected  static $dbh;
	public function __construct() {

	public function run(){
		$dbhost = 'db.example.com';			// 数据库服务器
	    $dbuser = 'example.com';        	// 数据库用户名
        $dbpw = 'password';             	// 数据库密码
		$dbname = 'example';				// 数据库名

		self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(

	protected function getInstance(){
        return self::$dbh;


/* the collectable class implements machinery for Pool::collect */
class Work extends Stackable {
	public function __construct($data) {
		$this->data = $data;

	public function run() {
		#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );

		try {
			$dbh  = $this->worker->getInstance();
               		$id = $this->data['id'];
			$mobile = safenet_decrypt($this->data['mobile']);
			#printf("%d, %s \n", $id, $mobile);
			if(strlen($mobile) > 11){
				$mobile = substr($mobile, -11);
			if($mobile == 'null'){
			#	$sql = "UPDATE members_digest SET mobile = '".$mobile."' where id = '".$id."'";
			#	printf("%s\n",$sql);
			#	$dbh->query($sql);
				$mobile = '';
				$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";
				$sql = "UPDATE members_digest SET mobile = md5(:mobile) where id = :id";
			$sth = $dbh->prepare($sql);
			$sth->bindValue(':mobile', $mobile);
			$sth->bindValue(':id', $id);
			#echo $sth->debugDumpParams();
		catch(PDOException $e) {
			$error = sprintf("%s,%s\n", $mobile, $id );
			file_put_contents("mobile_error.log", $error, FILE_APPEND);

		#$dbh = null;
		printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);
		#printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number);

$pool = new Pool(100, \ExampleWorker::class, []);

#foreach (range(0, 100) as $number) {
#	$pool->submit(new Work($number));

$dbhost = 'db.example.com';                     // 数据库服务器
$dbuser = 'example.com';                 		// 数据库用户名
$dbpw = 'password';                             // 数据库密码
$dbname = 'example';
$dbh    = new PDO("mysql:host=$dbhost;port=3307;dbname=$dbname", $dbuser, $dbpw, array(
                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
                        PDO::MYSQL_ATTR_COMPRESS => true

#$sql = "select id, mobile from members where id < :id";
#$sth = $dbh->prepare($sql);
#$result = $sth->fetchAll();
#$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";
#$sth = $dbh->prepare($sql);
#$sth->bindValue(':mobile', 'aa');
#echo $sth->execute();
#echo $sth->queryString;
#echo $sth->debugDumpParams();

$sql = "select id, mobile from members order by id asc"; // limit 1000";
$row = $dbh->query($sql);
while($members = $row->fetch(PDO::FETCH_ASSOC))
        #$order =  $account['order'];
        $pool->submit(new Work($members));



Summary of operating databases in multi-threads

In general, pthreads is still under development and there are still some shortcomings. We can also see that pthreads’ git is constantly improving the project

Persistent database connections are important, otherwise each thread will open a database connection once and then close it, resulting in many link timeouts.

$dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, array(


