Home  >  Article  >  Backend Development  >  AMAZON SQS(一)PHP Producer

AMAZON SQS(一)PHP Producer

WBOY
WBOYOriginal
2016-06-13 12:23:251680browse

AMAZON SQS(1)PHP Producer
AMAZON SQS(1)PHP Producer

1. Some Basic Information
Price: 1 million 2$
Send Throughput: 
Receive Throughput:
Message Latency:  500 ms
Message Size:  256 KB
Batch Size: 10 message

2. Env Set Up
Install composer
> curl -sS https://getcomposer.org/installer | php

Install the latest Package
> php composer.phar require aws/aws-sdk-php

this will install all the dependencies
> php composer.phar install

Here is my composer.json which will identify the dependencies.
{
    "require": {
        "aws/aws-sdk-php": "3.0.6",
        "predis/predis": "1.0.1"  
    }
}

And command below will do the magic, something maven or ivy
>php composer.phar install

Some import classes will be as follow,
SQSConnector.inc

require 'vendor/autoload.php';

use Aws\Sqs\SqsClient;

class SQSConnector {
const queueURL = "https://sqs.us-east-1.amazonaws.com/xx_test";

protected $client = null;

function __construct(){
try {
$this->client = SqsClient::factory(array(
'region'  => 'us-east-1',
'version' => 'latest'
));
echo "Successfully connected to SQS\n";
} catch (Exception $e) {
    echo "Couldn't connected to SQS";
    echo $e->getMessage();
}
}
}

A time track class which can provide us more detail about the time consuming during the process, TimeTracker.inc

class TimeTracker implements Countable {

protected $count = 0;
protected $distribution;
protected $distributionPrecision = 0;
protected $startTime;
protected $totalTime;
protected $name;

public function __construct($name, $distributionPrecision = 2) {
$this->name = $name;
$this->distribution = array();
$this->distributionPrecision = $distributionPrecision;
$this->start();
}

public function count() {
return $this->count;
}

public function start($time = null) {
return $this->startTime = ($time === null) ? microtime(true) : $time;
}

public function stop($count = 1, $time = null) {
$interval = (($time === null) ? microtime(true) : $time) - $this->startTime;
$this->totalTime += $interval;
$key = (string)round($interval, $this->distributionPrecision);
if  (!array_key_exists($key, $this->distribution)) $this->distribution[$key] = 0;
$this->distribution[$key]++;
$this->count += $count;
return $interval;
}

public function __toString() {
ksort($this->distribution);
//return "{$this->name}: " . print_r($this->distribution, true) .
return "{$this->name}: {$this->count}m " . round($this->totalTime,2) . 's ' .
round($this->count/$this->totalTime,1) . "m/s\n";
}

}

Async Send the message via PHP in SQSSender.php
#!/usr/bin/php

require_once 'TimeTracker.inc';
require_once 'SQSConnector.inc';
use GuzzleHttp\Promise;

define('MSGNUM_REQUEST', 10);
define('MAXSIZE_REQUEST', 262144);

class SQSSender extends SQSConnector {
function sendMessage($job) {
$timer = new TimeTracker('sendMessage');
$promise_array = array();

if(strlen($job) >= MAXSIZE_REQUEST)
{
echo "The size of message is larger than 256KB\n";
var_dump($job);
return;
}

$promise = $this->client->sendMessageAsync(array(
'QueueUrl' => self::queueURL,
'MessageBody' => $job,
));
$promise->wait();
$timer->stop();

print $timer;
}

function sendMessageBatch($jobs, $timer) {
$promise_array = array();


$id=1;
$message = '';
$msg_num = 0;
foreach($jobs as $job)
{
$maxsize_msg = (int)(MAXSIZE_REQUEST/MSGNUM_REQUEST);
$tmp_msg = $message.$job;
if(strlen($job) >= $maxsize_msg)
{
$this->sendMessage($job);
}
else if(strlen($tmp_msg) >= $maxsize_msg)
{
$entries[] = array(
'Id' => $id,
'MessageBody' => $message
);
$id++;
$message = $job;
if($id > 10){
$promise = $this->client->sendMessageBatchAsync(array(
'QueueUrl' => self::queueURL,
'Entries' => $entries,
));
$entries = array();
$id = 1;

$promise_array['key' . count($timer)] = $promise;
/*if(count($promise_array) % 50 == 0){
Promise\unwrap($promise_array); //unwrap the promise list and wait for all promise complete
$promise_array = array();
}*/

$timer->stop($msg_num);
$msg_num = 0;
$timer->start();
}
$msg_num++;
}
else
{
$message = $tmp_msg;
$msg_num++;
}
}
$entries[] = array(
'Id' => $id,
'MessageBody' => $message
);
$promise = $this->client->sendMessageBatchAsync(array(
'QueueUrl' => self::queueURL,
'Entries' => $entries,
));
$promise_array['key' . count($timer)] = $promise;
Promise\unwrap($promise_array); //unwrap the promise list and wait for all promise complete
$promise_array = array();
$timer->stop($msg_num);
print $timer;
$timer->start();
}
}

/*$sender = new SQSSender;
$promise = $sender->send(500);
if(count($promise) > 0){
Promise\unwrap($promises);
}*/

?>

Before we run the test, the AMAZON require us to configure the key here>
carl-mac:.aws carl$ pwd
/Users/carl/.aws
carl-mac:.aws carl$ ls -l
total 8
-rw-r--r--  1 carl  staff  116 Jul  6 17:00 credentials
carl-mac:.aws carl$

The content will be as follow:
[default]
aws_access_key_id = xxxxxxxx
aws_secret_access_key = xxxxxxxxxxxxx

References:
http://colby.id.au/benchmarking-sqs/
http://www.warski.org/blog/2014/06/benchmarking-sqs/
http://nsono.net/amazon-sqs-vs-rabbitmq/

kafka
http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

SQS
http://docs.aws.amazon.com/aws-sdk-php/guide/latest/service-sqs.html
http://aws.amazon.com/documentation/sqs/
http://docs.aws.amazon.com/aws-sdk-php/v3/guide/guide/promises.html

http://docs.aws.amazon.com/aws-sdk-php/v3/api/index.html

scala queue
https://github.com/adamw/mqperf/tree/master/src/main/scala/com/softwaremill/mqperf/mq

SQS auto scaling
https://aws.amazon.com/articles/Amazon-SQS/1464

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn