2015-09-01

herokuでRabbitMQ Bigwig試してみた

AMQPとかよくわかりませんが、herokuでRabbitMQ BigWigを試してみましたー。 今回はPHPでRabbitMQのチュートリアルを試してみました。

AMQPに関しては以下のサイトが詳しいです。

heroku関連の設定

まずはRabbitMQ BigWigとredisのアドオンを追加。redisはConsumerが取得したメッセージの格納用に使います。

$ heroku addons:create rabbitmq-bigwig
$ heroku addons:create redistogo

Procfileはこんな感じで。

web: vendor/bin/heroku-php-apache2 public
worker: php consumer.php

AMQPライブラリのインストール

composer.jsonはこんな感じに書いて、composer installしてください。

{
  "require": {
    "videlalvaro/php-amqplib": "2.5.*",
    "predis/predis": "*"
  }
}

Publisherのコード

{rootディレクトリ}/public/publisher.phpに以下のコードを書きます。

<?php

require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

define("CHANNEL_NAME", "hello");
define("RABBITMQ_BIGWIG_TX_URL", getenv("RABBITMQ_BIGWIG_TX_URL"));

$connection = connectRabbitMQ();
$channel = setupRabbitMQ($connection);

$body = json_encode(Array(
    "id" => ceil(microtime(true)*1000),
    "body" => "Hello World" . time()
));
$msg = new AMQPMessage($body);
$channel->basic_publish($msg, '', CHANNEL_NAME);

echo " [x] Sent 'Hello World!'\n";

$channel->close();
$connection->close();

function connectRabbitMQ() {
    $url = parse_url(RABBITMQ_BIGWIG_TX_URL);
    $connection = new AMQPStreamConnection($url["host"], $url["port"], $url["user"],
        $url["pass"], substr($url["path"], 1));
    return $connection;
}

function setupRabbitMQ($connection) {
    $channel = $connection->channel();
    $channel->queue_declare(CHANNEL_NAME, false, false, false, false);
    return $channel;
}

環境変数RABBITMQ_BIGWIG_TX_URLはPublisher用の接続URLになります。

Consumerのコード

{rootディレクトリ}/consumer.phpに以下のコードを書きます。こちらはwebではなくworkerとして起動させます。拾ってきたキューメッセージをredisに格納するだけの単純なプログラムです。

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
Predis\Autoloader::register();

define("CHANNEL_NAME", "hello");
define("RABBITMQ_BIGWIG_RX_URL", getenv("RABBITMQ_BIGWIG_RX_URL"));
define("REDISTOGO_URL", getenv("REDISTOGO_URL"));

$channel = setupRabbitMQ();
$client = setupRedis();
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg) {
  global $client;
  var_dump(json_decode($msg->body));
  $body = json_decode($msg->body);
  $val = $client->set($body->id, $body->body);
};

$channel->basic_consume(CHANNEL_NAME, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

function setupRabbitMQ() {
    $url = parse_url(RABBITMQ_BIGWIG_RX_URL);
    $connection = new AMQPStreamConnection($url["host"], $url["port"], $url["user"],
        $url["pass"], substr($url["path"], 1));
    $channel = $connection->channel();
    $channel->queue_declare(CHANNEL_NAME, false, false, false, false);
    return $channel;
}

function setupRedis() {
    $url = parse_url(REDISTOGO_URL);

    // Named array of connection parameters:
    return new Predis\Client([
        'scheme' => 'tcp',
        'host'   => $url['host'],
        'password'   => $url['pass'],
        'port'   => $url['port'],
    ]);
}

環境変数RABBITMQ_BIGWIG_RX_URLはConsumer用の接続URLになります。

動作確認

https://{heroku app domain}/publisher.phpにアクセスするとpublisher.php内でRabbitMQにメッセージを送り、workerがそのメッセージを取り出してredisに格納します。RabbitMQ BigWigの管理コンソール上ではこんな感じでメッセージ処理されている様子を確認できます。

rabbitmq-management-chart

このエントリーをはてなブックマークに追加