Getting Started with HumusAmqp and RabbitMQ

About this guide

This guide is a quick tutorial that helps you get started with RabbitMQ and HumusAmqp. Reading it and studying the provided code examples should take about 20 minutes. This guide covers:

  • Installing RabbitMQ, a mature and popular message broker.
  • Installing HumusAmqp via Composer.
  • Producing and consuming messages from the command line.
  • Running a JSON-RPC server and client.

Installing RabbitMQ

The RabbitMQ site has a good installation guide that addresses many operating systems. On Mac OS X, the fastest way to install RabbitMQ is with Homebrew:

$ brew install rabbitmq

then run it:

$ rabbitmq-server

On Debian and Ubuntu, you can either download the RabbitMQ .deb package and install it with dpkg or make use of the apt repository that the RabbitMQ team provides.

For RPM-based distributions such as Red Hat or CentOS, the RabbitMQ team provides an RPM package.

Installing HumusAmqp

  1. Use Composer to install HumusAmqp:
$ php composer.phar require prolic/humus-amqp ^1.0
  2. Select a driver. Currently, only php-amqplib and php-amqp (the PHP extension) are supported.

Configure your container to return the DriverFactory for the Driver class. If your container supports configuration by array, as ZendServiceManager does, the configuration should look similar to the following. For integration with the Symfony Framework, you can use the Symfony container-interop bundle.

return [
    'dependencies' => [
        'factories' => [
            Driver::class => Container\DriverFactory::class,
        ]
    ]
];
  3. Configure your application to use the desired driver:
return [
    'humus' => [
        'amqp' => [
            'driver' => 'amqp-extension',
        ]
    ]
];
  4. Notes about drivers:

1) The PHP extension (php-amqp) is the fastest driver. We strongly recommend using version 1.7.1 or building it yourself from master to be able to use all features.

There is currently a bug in PhpAmqpLib; see this pull-request. Until the fix is merged and released, you have to apply the patch manually, sorry!

You can do this from the command-line with:

sed -i '/$message = $this->get_and_unset_message($delivery_tag);/a $message->delivery_info["delivery_tag"] = $delivery_tag;' vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AMQPChannel.php

2) When using php-amqplib as the driver, note that a StreamConnection (the same goes for SSLConnection) cannot time out. If you want the consumer to time out when no more messages are received, use the SocketConnection instead (assuming you don’t need an SSL connection).
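
The socket-based setup described in note 2 might be configured along the following lines. This is a minimal sketch, not a confirmed schema: the 'php-amqplib' driver value and the 'type' => 'socket' connection key are assumptions about HumusAmqp's configuration format and should be checked against the connection documentation. The remaining connection keys are the ones used in the sample configuration in this guide.

```php
<?php
// Sketch only: 'php-amqplib' and 'type' => 'socket' are assumed values,
// not confirmed by this guide.
return [
    'humus' => [
        'amqp' => [
            'driver' => 'php-amqplib', // assumed name of the php-amqplib driver
            'connection' => [
                'default-amqp-connection' => [
                    'type' => 'socket', // assumption: selects the SocketConnection
                    'host' => 'localhost',
                    'port' => 5672,
                    'login' => 'guest',
                    'password' => 'guest',
                    'vhost' => '/',
                    'read_timeout' => 3,  // seconds
                    'write_timeout' => 1, // seconds
                ],
            ],
        ],
    ],
];
```

With a connection that can time out, a consumer-side setting such as idle_timeout has something to act on when the queue runs dry.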

Sample Configuration

A sample configuration might look like this. More details and explanations follow in the coming chapters.

return [
    'dependencies' => [
        'factories' => [
            Driver::class => Container\DriverFactory::class,
            'default-amqp-connection' => [Container\ConnectionFactory::class, 'default-amqp-connection'],
            'demo-producer' => [Container\ProducerFactory::class, 'demo-producer'],
            'topic-producer' => [Container\ProducerFactory::class, 'topic-producer'],
            'demo-consumer' => [Container\CallbackConsumerFactory::class, 'demo-consumer'],
            'topic-consumer-error' => [Container\CallbackConsumerFactory::class, 'topic-consumer-error'],
            'demo-rpc-server' => [Container\JsonRpcServerFactory::class, 'demo-rpc-server'],
            'demo-rpc-server2' => [Container\JsonRpcServerFactory::class, 'demo-rpc-server2'],
            'demo-rpc-client' => [Container\JsonRpcClientFactory::class, 'demo-rpc-client'],
            'my_callback' => $my_callback_factory,
        ],
    ],
    'humus' => [
        'amqp' => [
            'driver' => 'amqp-extension',
            'exchange' => [
                'demo' => [
                    'name' => 'demo',
                    'type' => 'direct',
                    'connection' => 'default-amqp-connection',
                ],
                'demo.error' => [
                    'name' => 'demo.error',
                    'type' => 'direct',
                    'connection' => 'default-amqp-connection',
                ],
                'topic-exchange' => [
                    'name' => 'topic-exchange',
                    'type' => 'topic',
                    'connection' => 'default-amqp-connection',
                ],
                'demo-rpc-client' => [
                    'name' => 'demo-rpc-client',
                    'type' => 'direct',
                    'connection' => 'default-amqp-connection',
                ],
                'demo-rpc-server' => [
                    'name' => 'demo-rpc-server',
                    'type' => 'direct',
                    'connection' => 'default-amqp-connection',
                ],
                'demo-rpc-server2' => [
                    'name' => 'demo-rpc-server2',
                    'type' => 'direct',
                    'connection' => 'default-amqp-connection',
                ],
            ],
            'queue' => [
                'foo' => [
                    'name' => 'foo',
                    'exchanges' => [
                        'demo' => [
                            [
                                'arguments' => [
                                    'x-dead-letter-exchange' => 'demo.error', // must be defined as exchange before
                                ],
                            ],
                        ],
                    ],
                    'connection' => 'default-amqp-connection',
                ],
                'demo-rpc-client' => [
                    'name' => '',
                    'exchanges' => [
                        'demo-rpc-client' => [],
                    ],
                    'connection' => 'default-amqp-connection',
                ],
                'demo-rpc-server' => [
                    'name' => 'demo-rpc-server',
                    'exchanges' => [
                        'demo-rpc-server' => [],
                    ],
                    'connection' => 'default-amqp-connection',
                ],
                'demo-rpc-server2' => [
                    'name' => 'demo-rpc-server2',
                    'exchanges' => [
                        'demo-rpc-server2' => [],
                    ],
                    'connection' => 'default-amqp-connection',
                ],
                'info-queue' => [
                    'name' => 'info-queue',
                    'exchanges' => [
                        'topic-exchange' => [
                            [
                                'routing_keys' => [
                                    '#.err',
                                ],
                            ]
                        ],
                    ],
                    'connection' => 'default-amqp-connection',
                ],
            ],
            'connection' => [
                'default-amqp-connection' => [
                    'host' => 'localhost',
                    'port' => 5672,
                    'login' => 'guest',
                    'password' => 'guest',
                    'vhost' => '/',
                    'persistent' => true,
                    'read_timeout' => 3, //sec, float allowed
                    'write_timeout' => 1, //sec, float allowed
                ],
            ],
            'producer' => [
                'demo-producer' => [
                    'type' => 'plain',
                    'exchange' => 'demo',
                    'qos' => [
                        'prefetch_size' => 0,
                        'prefetch_count' => 10,
                    ],
                    'auto_setup_fabric' => true,
                ],
                'topic-producer' => [
                    'exchange' => 'topic-exchange',
                    'auto_setup_fabric' => true,
                ],
            ],
            'callback_consumer' => [
                'demo-consumer' => [
                    'queue' => 'foo',
                    'callback' => 'echo',
                    'idle_timeout' => 10,
                    'delivery_callback' => 'my_callback',
                ],
                'topic-consumer-error' => [
                    'queue' => 'info-queue',
                    'qos' => [
                        'prefetch_count' => 100,
                    ],
                    'auto_setup_fabric' => true,
                    'callback' => 'echo',
                    'logger' => 'consumer-logger',
                ],
            ],
            'json_rpc_server' => [
                'demo-rpc-server' => [
                    'callback' => 'poweroftwo',
                    'queue' => 'demo-rpc-server',
                    'auto_setup_fabric' => true,
                ],
                'demo-rpc-server2' => [
                    'callback' => 'randomint',
                    'queue' => 'demo-rpc-server2',
                    'auto_setup_fabric' => true,
                ],
            ],
            'json_rpc_client' => [
                'demo-rpc-client' => [
                    'queue' => 'demo-rpc-client',
                    'auto_setup_fabric' => true,
                ],
            ],
        ],
    ],
];
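
The comment on x-dead-letter-exchange notes that the exchange must be defined before it is referenced. A check like the one below could catch such mistakes before the fabric is set up. It is a minimal sketch: findUndefinedExchanges is a hypothetical helper, not part of HumusAmqp, and it assumes only the array layout shown in the sample configuration (exchanges under 'exchange', queue bindings under 'queue' => [...]['exchanges']).

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of HumusAmqp): lists exchanges that a queue
 * refers to, either as a binding or as a dead-letter exchange, but that are
 * not declared under ['humus']['amqp']['exchange'].
 *
 * @return string[] one human-readable message per problem found
 */
function findUndefinedExchanges(array $config): array
{
    $amqp = $config['humus']['amqp'] ?? [];
    $exchanges = $amqp['exchange'] ?? [];
    $problems = [];

    foreach ($amqp['queue'] ?? [] as $queueName => $queue) {
        foreach ($queue['exchanges'] ?? [] as $exchangeName => $bindings) {
            // The queue binds to this exchange, so it has to be declared.
            if (!array_key_exists($exchangeName, $exchanges)) {
                $problems[] = "queue '$queueName' binds to undefined exchange '$exchangeName'";
            }
            // Each binding may name a dead-letter exchange in its arguments.
            foreach ($bindings as $binding) {
                $dlx = $binding['arguments']['x-dead-letter-exchange'] ?? null;
                if ($dlx !== null && !array_key_exists($dlx, $exchanges)) {
                    $problems[] = "queue '$queueName' dead-letters to undefined exchange '$dlx'";
                }
            }
        }
    }

    return $problems;
}

// Reduced example: 'demo.error' is referenced but never declared.
$config = [
    'humus' => [
        'amqp' => [
            'exchange' => [
                'demo' => ['name' => 'demo', 'type' => 'direct'],
            ],
            'queue' => [
                'foo' => [
                    'name' => 'foo',
                    'exchanges' => [
                        'demo' => [
                            ['arguments' => ['x-dead-letter-exchange' => 'demo.error']],
                        ],
                    ],
                ],
            ],
        ],
    ],
];

foreach (findUndefinedExchanges($config) as $problem) {
    echo $problem, PHP_EOL;
}
// prints: queue 'foo' dead-letters to undefined exchange 'demo.error'
```

Run against the full sample configuration above, the same check should report nothing, because demo.error is declared alongside demo.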

Tell Us What You Think!

Please take a moment to tell us what you think about this guide: send an e-mail, say hello in the HumusAmqp Gitter chat, or raise an issue on GitHub.

Let us know what was unclear or what has not been covered. Maybe you do not like the guide's style or grammar, or you have discovered spelling mistakes. Reader feedback is key to making the documentation better.