search
HomeBackend DevelopmentPHP TutorialRabbitMQ与PHP(二)—— 相关服务安装及怎么用PHP作为守护模式处理消息

RabbitMQ与PHP(二)—— 相关服务安装及如何用PHP作为守护模式处理消息

在上一节中,详细介绍了RabbitMQ的exchange/routingkey/queue等概念,以及示例了如何使用PHP发送和处理消息的代码。这一节,将介绍在项目中如何使用PHP多线程的进行消息实时处理,以及简要介绍一些RabbitMQ的安装相关。熟悉的可以将安装这部分跳过。

一、RabbitMQ的安装:

需要首先安装erlang

#安装jdk环境sudo apt-get install openjdk-7-jdk#安装相关类库和工具包sudo apt-get install libncurses5-dev m4 fop freeglut3-dev libwxgtk2.8-dev g++ libssl-dev xsltproc build-essential tk8.5  unixodbc unixodbc-dev libxml2-utils#下载erlang安装包并安装wget http://www.erlang.org/download/otp_src_R16B03-1.tar.gztar -xzvf otp_src_R16B03-1.tar.gzcd otp_src_R16B03-1./configure --prefix=/usr/local/erlangmakesudo make install

注意:?这里将erlang安装到了指定的目录:?/usr/local/erlang,而不是使用默认的路径。这是一个好的习惯,对于版本控制等都会有好处。但是这会导致后面?rabbitMQ报错:找不到erl?执行文件,需要多做一些处理才行。

更新环境变量:

sudo vi /etc/profile

在最后一行加上

export PATH=/usr/local/erlang/bin:$PATH

保存退出后

source /etc/profile

然后到命令行中输入erl看是否安装成功

安装RabbitMQ

wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.2.3/rabbitmq-server-generic-unix-3.2.3.tar.gztar -xzvf rabbitmq-server-generic-unix-3.2.3.tar.gzmv rabbitmq_server-3.2.3/ /usr/local/cd /usr/local/rabbitmq_server-3.2.3/sbin./rabbitmq-server

到这里如果出现一个报错信息:?./rabbitmq-server:?line?86:?erl:?command?not?found??这是因为erlang指定了安装路径,在系统的PATH中找不到。只要export?PATH=$PATH:/usr/local/erlang/bin?就可以了。

如果为了rc.local启动方便,可以将?export?PATH=$PATH:/usr/local/erlang/bin?这一行写入到?rabbitmq-server?文件中:

1f178a82b9014a90f9dd87cda8773912b21beedb

执行后,ps?-aux?一下,看到进程中有/usr/local/erlang/lib/erlang/erts-5.10.1/bin/epmd?-daemon?和?/usr/local/erlang/lib/erlang/erts-5.10.1/bin/beam.smp?就OK了。

sbin目录下还有一个脚本:?rabbitmqctl?也很常用,与?rabbitmq-server?一样需要指明erlang的路径才能正确工作。

常用的方法:

rabbitmqctl?start_app?启动后,执行一下这个比较保险rabbitmqctl?list_exchanges?显示当前所有的交换机rabbitmqctl?list_queue?查看当前有效队列情况

二、 PHP?extension的安装:

PHP操作rabbitmq需要AMQP扩展的支持。下载扩展:?http://pecl.php.net/package/amqp?,安装过程与一般扩展一样,

/usr/local/php/bin/phpize./configure?--with-php-config=/usr/local/php/bin/php-configsudo make?&&?make?install

在编译安装过程中如果报错信息,可参考这篇文章:Ubuntu 12.04安装RabbitMQ的PHP扩展出现的问题及解决办法。

然后编辑php.ini?插入:

[amqp]extension?=?amqp.so

重启apache或nginx,查看phpinfo其中有关于anqp的段落,就OK了。

0df431adcbef760936f25aef2fdda3cc7dd99ec1

三、如何使用PHP进行实时后端消息处理

首先,要保证PHP文件可以正确的以堵塞方式处理消息。代码可参见上一节RabbitMQ与PHP(一)。

然后,我们来借助Python实现一个多线程的守护进程,由这个守护进程来调用PHP,把PHP作为工作线程。

启动脚本:start_jobs.py

# _*_ coding:utf-8 _*_'''yoka at 实现多线程处理任务的守护进程Created on 2012-4-7@author: xwarrior@update: [email protected]'''#在此引入项目需要的数据包from MyJobs import MyJobsfrom MyThread import MyThreadimport loggingimport timedef main():    logger = logging.getLogger('main()')    logger.info('server start!')    worker_threads = 2 #定义需要启动的线程数量    timeline = 2 #线程检查时间间隔,秒    thread_pool = {}    for i in range(0, worker_threads ):        param = 'some param'        job = MyJobs( param )        thread = MyThread( job, i )        thread.setDaemon = True        thread.start()        logger.info('start thread %s' %( thread.getName() ))        thread_pool[i] = thread    #干完就结束模式    #for eachKey in thread_pool.keys():    # thread_pool[eachKey].join()    #保持线程数量模式    while 1:        time.sleep(timeline)        # 检查每一个线程        for eachKey in thread_pool.keys():            if thread_pool[eachKey].isAlive():                print 'thread alive:' + str(i)            else:                print 'thread down:' + str(i)                thread_pool[eachKey].run()    logger.info('main exist!')    returnif __name__ == '__main__':    #init config format    FORMAT = '%(asctime)-15s %(name)s %(levelname)s file %(filename)s:lineno %(lineno)s - %(message)s'    logging.basicConfig(format=FORMAT,level=logging.INFO)    main()    pass

线程脚本:?MyThread.py

# _*_ coding:utf-8 _*_'''Created on 2013-03-25@author: [email protected]'''from threading import Threadclass MyThread(Thread):    '''    创建线程    '''    def __init__(self,job,thread_id):        '''        Constructor        '''        self.job = job        Thread.__init__(self, name = 'my_thread_%s' %(thread_id))    def run(self):        self.job.run()    def stop(self):        self.job.exit()

任务脚本:?MyJobs.py

# _*_ coding:utf-8 _*_'''Created on 2013-03-25@author: [email protected]'''import osimport urllib2class MyJobs(object):    def __init__(self, param ):        #do something        self.param = param    def __del__(self):        ''' destruct '''        self.exit()    def exit(self):        ''' 退出'''        self.quit = True    def run(self):        ''' 开始处理 '''        #使用shell模式        #cmd = '/usr/bin/curl "http://at.yoka.com/try/amqp_consume.php?key=' + str(self.param) + '"'        cmd = '/usr/local/php/bin/php -c /usr/local/php/lib/nginx.ini /home/jimmy/at/DocumentRoot/try/amqp_consume.php ' + str(self.param)        re = os.system(cmd)        #使用web模式        #req = urllib2.Request('http://at.yoka.com/try/amqp_consume.php?key=' + str(self.param))        #response = urllib2.urlopen(req)        #re = response.read()        #print re

在任务调度(start_jobs.py)中,设计了两种工作模式:

一种工作模式是一共启动N个线程去干活,适合于尽快完成一个大任务;

另一种是保持进程数量,当发现某个进程完成后,再重新将进程启动起来。显然,用户守护处理消息适合这种模式。

具体工作在MyJob.py中,提供了系统Shell调用和采用URL调用两种方式。推荐使用shell直接调用php的方式,这样可以灵活控制Php.ini,比如增加auto_prepend_file、增长max_execution_time等。

实际项目中,假定有5种类型的消息,可以启动20个线程,将thread_id当作参数传递给PHP。PHP将thread_id%5当作待处理类型,就可以得到每种类型有4个线程工作的场景了。

考虑到PHP的执行时间限制及内存泄露问题,可以将consume.php脚本进行一下改进,让PHP脚本每次处理指定数量的消息后就退出,由Python多线程框架重新启动线程,以保证运行稳定可靠。另外,将应答改为手工应答,确保消息获得正确有效处理。

/$q->consume('processMessage'); //需手动应答/*** 消费回调函数*/function processMessage($envelope, $queue) {    global $counter;    $msg = $envelope->getBody();    echo $msg."\n"; //处理消息    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答    if($counter++ > 5)return FALSE; //处理5个消息后退出}

用?两个线程,检查间隔2秒,SHELL模式?测试运行结果:

f703738da9773912b0eab410f9198618377ae2f6

由上图可见,运行开始后,检查2个线程都处于活跃状态,并对消息进行了正确处理,当处理到一定数量后,PHP程序结束,父进程检查到有进程处于完成状态,重新将其启动(第二个绿色框)。完全与预期相符。

?

http://nonfu.me/p/8838.html

Statement
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
在Go语言中使用RabbitMQ:完整指南在Go语言中使用RabbitMQ:完整指南Jun 19, 2023 am 08:10 AM

随着现代应用程序的复杂性增加,消息传递已成为一种强大的工具。在这个领域,RabbitMQ已成为一个非常受欢迎的消息代理,可以用于在不同的应用程序之间传递消息。在这篇文章中,我们将探讨如何在Go语言中使用RabbitMQ。本指南将涵盖以下内容:RabbitMQ简介RabbitMQ安装RabbitMQ基础概念Go语言中的RabbitMQ入门RabbitMQ和Go

SpringBoot怎么整合RabbitMQ实现延迟队列SpringBoot怎么整合RabbitMQ实现延迟队列May 16, 2023 pm 08:31 PM

如何保证消息不丢失rabbitmq消息投递路径生产者->交换机->队列->消费者总的来说分为三个阶段。1.生产者保证消息投递可靠性。2.mq内部消息不丢失。3.消费者消费成功。什么是消息投递可靠性简单点说就是消息百分百发送到消息队列中。我们可以开启confirmCallback生产者投递消息后,mq会给生产者一个ack.根据ack,生产者就可以确认这条消息是否发送到mq.开启confirmCallback修改配置文件#NONE:禁用发布确认模式,是默认值,CORRELATED:

go-zero与RabbitMQ的应用实践go-zero与RabbitMQ的应用实践Jun 23, 2023 pm 12:54 PM

现在越来越多的企业开始采用微服务架构模式,而在这个架构中,消息队列成为一种重要的通信方式,其中RabbitMQ被广泛应用。而在go语言中,go-zero是近年来崛起的一种框架,它提供了很多实用的工具和方法,让开发者更加轻松地使用消息队列,下面我们将结合实际应用,来介绍go-zero和RabbitMQ的使用方法和应用实践。1.RabbitMQ概述Rabbit

Java中的Runnable和Thread的区别有哪些?Java中的Runnable和Thread的区别有哪些?May 07, 2023 pm 05:19 PM

在java中可有两种方式实现多线程,一种是继承Thread类,一种是实现Runnable接口;Thread类是在java.lang包中定义的。一个类只要继承了Thread类同时覆写了本类中的run()方法就可以实现多线程操作了,但是一个类只能继承一个父类,这是此方法的局限。下面看例子:packageorg.thread.demo;classMyThreadextendsThread{privateStringname;publicMyThread(Stringname){super();this

Swoole与RabbitMQ集成实践:打造高可用性消息队列系统Swoole与RabbitMQ集成实践:打造高可用性消息队列系统Jun 14, 2023 pm 12:56 PM

随着互联网时代的到来,消息队列系统变得越来越重要。它可以使不同的应用之间实现异步操作、降低耦合度、提高可扩展性,进而提升整个系统的性能和用户体验。在消息队列系统中,RabbitMQ是一个强大的开源消息队列软件,它支持多种消息协议、被广泛应用于金融交易、电子商务、在线游戏等领域。在实际应用中,往往需要将RabbitMQ和其他系统进行集成。本文将介绍如何使用sw

SpringBoot怎么整合RabbitMQ处理死信队列和延迟队列SpringBoot怎么整合RabbitMQ处理死信队列和延迟队列May 15, 2023 pm 03:28 PM

简介RabbitMQ消息简介RabbitMQ的消息默认不会超时。什么是死信队列?什么是延迟队列?死信队列:DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(deadmessage)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。以下几种情况会导致消息变成死信:消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false;消息过期;队

PHP开发:使用 RabbitMQ 实现任务队列PHP开发:使用 RabbitMQ 实现任务队列Jun 15, 2023 pm 05:33 PM

随着互联网的不断发展,网站的流量越来越大,访问量的增长带来的问题也越来越多。当用户量过大时,服务器负载会增大,这时就需要使用一些技术手段来解决这些问题。任务队列就是其中的一种方式,可以将一些耗时的操作异步执行,从而缓解服务器压力。本文将介绍如何使用RabbitMQ实现任务队列。一、什么是RabbitMQRabbitMQ是一个开源的消息中间件,它实现了

怎么用SpringBoot+RabbitMQ实现消息可靠传输怎么用SpringBoot+RabbitMQ实现消息可靠传输May 29, 2023 pm 10:34 PM

环境配置SpringBoot整合RabbitMQ实现消息的发送。1.添加maven依赖org.springframework.bootspring-boot-starterorg.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-amqp2.添加application.yml配置文件spring:rabbitmq:host:192.168.3.19port:5672user

See all articles

Hot AI Tools

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Undress AI Tool

Undress AI Tool

Undress images for free

Clothoff.io

Clothoff.io

AI clothes remover

AI Hentai Generator

AI Hentai Generator

Generate AI Hentai for free.

Hot Article

Repo: How To Revive Teammates
1 months agoBy尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Energy Crystals Explained and What They Do (Yellow Crystal)
2 weeks agoBy尊渡假赌尊渡假赌尊渡假赌
Hello Kitty Island Adventure: How To Get Giant Seeds
1 months agoBy尊渡假赌尊渡假赌尊渡假赌

Hot Tools

SublimeText3 Linux new version

SublimeText3 Linux new version

SublimeText3 Linux latest version

SublimeText3 Chinese version

SublimeText3 Chinese version

Chinese version, very easy to use

EditPlus Chinese cracked version

EditPlus Chinese cracked version

Small size, syntax highlighting, does not support code prompt function

WebStorm Mac version

WebStorm Mac version

Useful JavaScript development tools

Notepad++7.3.1

Notepad++7.3.1

Easy-to-use and free code editor