Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Sending Logs to a Remote Server Using RabbitMQ

DZone's Guide to

Sending Logs to a Remote Server Using RabbitMQ

Learn all about sending logs via RabbitMQ to a remote server in this great tutorial.

· Performance Zone
Free Resource

Some time ago I wrote an article to show how to send Silex logs to a remote server. Today I want to use a messaging queue to do it. Normally, when I need queues, I use Gearman but today I want to play with RabbitMQ.

When we work with web applications it’s important to have, in some way or another, one way to decouple operations from the main request. Messaging queues are great tools to perform those operations. They even allow us to create our workers with different languages than the main request. For example, these days, I’m working with modbus devices. The whole modbus logic is written in Python and I want to use a Frontend with PHP. I can rewrite the modbus logic with PHP (there’re PHP libraries to connect with modbus devices), but I’m not that crazy. In situations like this, queues are our friends.

The idea in this post is the same than the previous post. We’ll use event dispatcher to emit events and we’ll send those events to a RabbitMQ queue. We’ll use a Service Provider called.


<?php
include __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use RabbitLogger\LoggerServiceProvider;
use Silex\Application;
use Symfony\Component\HttpKernel\Event;
use Symfony\Component\HttpKernel\KernelEvents;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();

$app = new Application(['debug' => true]);
$app->register(new LoggerServiceProvider($connection, $channel));

$app->on(KernelEvents::TERMINATE, function (Event\PostResponseEvent $event) use ($app) {
    $app['rabbit.logger']->info('TERMINATE');
});

$app->on(KernelEvents::CONTROLLER, function (Event\FilterControllerEvent $event) use ($app) {
    $app['rabbit.logger']->info('CONTROLLER');
});

$app->on(KernelEvents::EXCEPTION, function (Event\GetResponseForExceptionEvent $event) use ($app) {
    $app['rabbit.logger']->info('EXCEPTION');
});

$app->on(KernelEvents::FINISH_REQUEST, function (Event\FinishRequestEvent $event) use ($app) {
    $app['rabbit.logger']->info('FINISH_REQUEST');
});

$app->on(KernelEvents::RESPONSE, function (Event\FilterResponseEvent $event) use ($app) {
    $app['rabbit.logger']->info('RESPONSE');
});

$app->on(KernelEvents::REQUEST, function (Event\GetResponseEvent $event) use ($app) {
    $app['rabbit.logger']->info('REQUEST');
});

$app->on(KernelEvents::VIEW, function (Event\GetResponseForControllerResultEvent $event) use ($app) {
    $app['rabbit.logger']->info('VIEW');
});

$app->get('/', function (Application $app) {
    $app['rabbit.logger']->info('inside route');
    return "HELLO";
});

$app->run();

Here we can see the service provider:


<?php
namespace RabbitLogger;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Silex\Application;
use Silex\ServiceProviderInterface;

class LoggerServiceProvider implements ServiceProviderInterface
{
    private $connection;
    private $channel;

    public function __construct(AMQPStreamConnection $connection, AMQPChannel $channel)
    {
        $this->connection = $connection;
        $this->channel    = $channel;
    }

    public function register(Application $app)
    {
        $app['rabbit.logger'] = $app->share(
            function () use ($app) {
                $channelName = isset($app['logger.channel.name']) ? $app['logger.channel.name'] : 'logger.channel';
                return new Logger($this->connection, $this->channel, $channelName);
            }
        );
    }

    public function boot(Application $app)
    {
    }
}

And here the logger:


<?php
namespace RabbitLogger;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;
use Psr\Log\LogLevel;
use Silex\Application;

class Logger implements LoggerInterface
{
    private $connection;
    private $channel;
    private $queueName;

    public function __construct(AMQPStreamConnection $connection, AMQPChannel $channel, $queueName = 'logger')
    {
        $this->connection = $connection;
        $this->channel    = $channel;
        $this->queueName  = $queueName;
        $this->channel->queue_declare($queueName, false, false, false, false);
    }

    function __destruct()
    {
        $this->channel->close();
        $this->connection->close();
    }

    public function emergency($message, array $context = [])
    {
        $this->sendLog($message, $context, LogLevel::EMERGENCY);
    }

    public function alert($message, array $context = [])
    {
        $this->sendLog($message, $context, LogLevel::ALERT);
    }

    public function critical($message, array $context = [])
    {
        $this->sendLog($message, $context, LogLevel::CRITICAL);
    }

    public function error($message, array $context = [])
    {
        $this->sendLog($message, $context, LogLevel::ERROR);
    }

    public function warning($message, array $context = [])
    {
        $this->sendLog($message, $context, LogLevel::WARNING);
    }

    public function notice($message, array $context = [])
    {
        $this->sendLog($message, $context, LogLevel::NOTICE);
    }

    public function info($message, array $context = [])
    {
        $this->sendLog($message, $context, LogLevel::INFO);
    }

    public function debug($message, array $context = [])
    {
        $this->sendLog($message, $context, LogLevel::DEBUG);
    }
    public function log($level, $message, array $context = [])
    {
        $this->sendLog($message, $context, $level);
    }

    private function sendLog($message, array $context = [], $level = LogLevel::INFO)
    {
        $msg = new AMQPMessage(json_encode([$message, $context, $level]), ['delivery_mode' => 2]);
        $this->channel->basic_publish($msg, '', $this->queueName);
    }
}

And finally, the RabbitMQ Worker to process our logs


require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('logger.channel', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg){
    echo " [x] Received ", $msg->body, "\n";
    //$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//$channel->basic_qos(null, 1, null);
$channel->basic_consume('logger.channel', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
    $channel->wait();
}
$channel->close();
$connection->close();

To run the example we must:

Start RabbitMQ server


rabbitmq-server

start Silex server


php -S 0.0.0.0:8080 -t www

start worker


php worker/worker.php

You can see the whole project in my GitHub account

Topics:
rabbitmq

Published at DZone with permission of Gonzalo Ayuso, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}