AMQPとかよくわかりませんが、herokuでRabbitMQ BigWigを試してみましたー。 今回はPHPでRabbitMQのチュートリアルを試してみました。
AMQPに関しては以下のサイトが詳しいです。
heroku関連の設定
まずはRabbitMQ BigWigとredisのアドオンを追加。redisはConsumerが取得したメッセージの格納用に使います。
$ heroku addons:create rabbitmq-bigwig
$ heroku addons:create redistogo
Procfileはこんな感じで。
web: vendor/bin/heroku-php-apache2 public
worker: php consumer.php
AMQPライブラリのインストール
composer.jsonはこんな感じに書いて、composer installしてください。
{
"require": {
"videlalvaro/php-amqplib": "2.5.*",
"predis/predis": "*"
}
}
Publisherのコード
{rootディレクトリ}/public/publisher.phpに以下のコードを書きます。
<?php
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
define("CHANNEL_NAME", "hello");
define("RABBITMQ_BIGWIG_TX_URL", getenv("RABBITMQ_BIGWIG_TX_URL"));
$connection = connectRabbitMQ();
$channel = setupRabbitMQ($connection);
$body = json_encode(Array(
"id" => ceil(microtime(true)*1000),
"body" => "Hello World" . time()
));
$msg = new AMQPMessage($body);
$channel->basic_publish($msg, '', CHANNEL_NAME);
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();
function connectRabbitMQ() {
$url = parse_url(RABBITMQ_BIGWIG_TX_URL);
$connection = new AMQPStreamConnection($url["host"], $url["port"], $url["user"],
$url["pass"], substr($url["path"], 1));
return $connection;
}
function setupRabbitMQ($connection) {
$channel = $connection->channel();
$channel->queue_declare(CHANNEL_NAME, false, false, false, false);
return $channel;
}
環境変数RABBITMQ_BIGWIG_TX_URLはPublisher用の接続URLになります。
Consumerのコード
{rootディレクトリ}/consumer.phpに以下のコードを書きます。こちらはwebではなくworkerとして起動させます。拾ってきたキューメッセージをredisに格納するだけの単純なプログラムです。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
Predis\Autoloader::register();
define("CHANNEL_NAME", "hello");
define("RABBITMQ_BIGWIG_RX_URL", getenv("RABBITMQ_BIGWIG_RX_URL"));
define("REDISTOGO_URL", getenv("REDISTOGO_URL"));
$channel = setupRabbitMQ();
$client = setupRedis();
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg) {
global $client;
var_dump(json_decode($msg->body));
$body = json_decode($msg->body);
$val = $client->set($body->id, $body->body);
};
$channel->basic_consume(CHANNEL_NAME, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
function setupRabbitMQ() {
$url = parse_url(RABBITMQ_BIGWIG_RX_URL);
$connection = new AMQPStreamConnection($url["host"], $url["port"], $url["user"],
$url["pass"], substr($url["path"], 1));
$channel = $connection->channel();
$channel->queue_declare(CHANNEL_NAME, false, false, false, false);
return $channel;
}
function setupRedis() {
$url = parse_url(REDISTOGO_URL);
// Named array of connection parameters:
return new Predis\Client([
'scheme' => 'tcp',
'host' => $url['host'],
'password' => $url['pass'],
'port' => $url['port'],
]);
}
環境変数RABBITMQ_BIGWIG_RX_URLはConsumer用の接続URLになります。
動作確認
https://{heroku app domain}/publisher.phpにアクセスするとpublisher.php内でRabbitMQにメッセージを送り、workerがそのメッセージを取り出してredisに格納します。RabbitMQ BigWigの管理コンソール上ではこんな感じでメッセージ処理されている様子を確認できます。