RabbitMQ (AMQP 0-9-1) client library for Python
Introduction · Documentation · Example · Examples · Adapters · Contributing
Pika is a pure-Python implementation of the AMQP 0-9-1 protocol including RabbitMQ's extensions.
- Supports Python 3.7+ (CPython and PyPy; versions are listed on PyPI). Pika 1.1.0 was the last release that supported Python 2.7.
- Since threads aren't appropriate to every situation, it doesn't require
threads. Pika core takes care not to forbid them, either. The same goes for
greenlets, callbacks, continuations, and generators. Most connection adapters
are single-threaded; use
ThreadSafeConnectionwhen you need to publish or consume from multiple threads. - People may be using direct sockets, plain old
select(), or any of the wide variety of ways of getting network events to and from a Python application. Pika tries to stay compatible with all of these, and to make adapting it to a new environment as simple as possible.
Pika's documentation can be found here.
Here is the most simple example of use, sending a message with the pika.BlockingConnection adapter:
import pika
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_publish(exchange='test', routing_key='test',
body=b'Test message.')
connection.close()And an example of writing a blocking consumer:
import pika
connection = pika.BlockingConnection()
channel = connection.channel()
for method_frame, properties, body in channel.consume('test'):
# Display the message parts and acknowledge the message
print(method_frame, properties, body)
channel.basic_ack(method_frame.delivery_tag)
# Escape out of the loop after 10 messages
if method_frame.delivery_tag == 10:
break
# Cancel the consumer and return any pending messages
requeued_messages = channel.cancel()
print('Requeued %i messages' % requeued_messages)
connection.close()Pika provides the following adapters:
pika.adapters.asyncio_connection.AsyncioConnection- asynchronous adapter for Python 3 AsyncIO's I/O loop.pika.BlockingConnection- synchronous adapter on top of library for simple usage.pika.SelectConnection- asynchronous adapter without third-party dependencies.pika.adapters.gevent_connection.GeventConnection- asynchronous adapter for use with Gevent's I/O loop.pika.adapters.tornado_connection.TornadoConnection- asynchronous adapter for use with Tornado's I/O loop.pika.adapters.twisted_connection.TwistedProtocolConnection- asynchronous adapter for use with Twisted's I/O loop.pika.adapters.thread_safe_connection.ThreadSafeConnection- thread-safe adapter that runs SelectConnection's IOLoop in a background thread. All channel methods are safe to call from any thread simultaneously.
You can also pass multiple pika.ConnectionParameters instances for fault-tolerance as in the code snippet below (host names are just examples, of course). To enable retries, set connection_attempts and retry_delay as needed in the last pika.ConnectionParameters element of the sequence. Retries occur after connection attempts using all of the given connection parameters fail.
import pika
parameters = (
pika.ConnectionParameters(host='rabbitmq.zone1.yourdomain.com'),
pika.ConnectionParameters(host='rabbitmq.zone2.yourdomain.com',
connection_attempts=5, retry_delay=1))
connection = pika.BlockingConnection(parameters)With non-blocking adapters, such as pika.SelectConnection and pika.adapters.asyncio_connection.AsyncioConnection, you can request a connection using multiple connection parameter instances via the connection adapter's create_connection() class method.
pika.adapters.thread_safe_connection.ThreadSafeConnection is the simplest way to use Pika from multiple threads. It runs the IOLoop in a background thread and provides a blocking API that is safe to call from any number of threads simultaneously. Consumer callbacks run on a per-channel worker thread, so slow processing never stalls heartbeats:
from pika.adapters.thread_safe_connection import ThreadSafeConnection
conn = ThreadSafeConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
def on_message(channel, method, properties, body):
process(body)
channel.basic_ack(method.delivery_tag) # safe, no callback scheduling needed
ch.basic_consume('work', on_message)See examples/basic_consumer_threaded.py and examples/basic_publisher_threaded.py.
For other adapters, each connection instance is confined to a single thread. The single-threaded constraint may result in a dropped connection due to AMQP heartbeat timeout in consumers that take a long time to process a message. A common solution is to delegate processing to another thread while the connection's IOLoop thread services heartbeats.
Messages processed in another thread may not be acknowledged directly from that thread. Instead, schedule a callback in the adapter's IOLoop thread:
def ack_message(channel, delivery_tag):
"""Note that `channel` must be the same Pika channel instance via which
the message being acknowledged was retrieved (AMQP protocol constraint).
"""
if channel.is_open:
channel.basic_ack(delivery_tag)
else:
# Channel is already closed, so we can't acknowledge this message;
# log and/or do something that makes sense for your app in this case.
passThe code running in the other thread may request the ack_message() function to be executed in the connection adapter's I/O loop thread using an adapter-specific mechanism:
pika.BlockingConnectionabstracts its I/O loop from the application and thus exposespika.BlockingConnection.add_callback_threadsafe(). Refer to this method's docstring for additional information. For example:
connection.add_callback_threadsafe(functools.partial(ack_message, channel, delivery_tag))- When using a non-blocking connection adapter, such as
pika.adapters.asyncio_connection.AsyncioConnectionorpika.SelectConnection, you use the underlying asynchronous framework's native API for requesting an I/O loop-bound callback from another thread. For example,pika.SelectConnection's I/O loop providesadd_callback_threadsafe(),pika.adapters.tornado_connection.TornadoConnection's I/O loop hasadd_callback(), whilepika.adapters.asyncio_connection.AsyncioConnection's I/O loop exposescall_soon_threadsafe().
This threadsafe callback request mechanism may also be used to delegate publishing of messages, etc., from a background thread to the connection adapter's thread.
Some RabbitMQ clients (Bunny, Java, .NET, Objective-C, Swift) provide a way to automatically recover a connection, its channels and topology (e.g. queues, bindings and consumers) after a network failure. Others require connection recovery to be performed by the application code and strive to make it a straightforward process. Pika falls into the second category.
Pika supports multiple connection adapters. They take different approaches to connection recovery.
For pika.BlockingConnection adapter exception handling can be used to check for connection errors. Here is a very basic example:
import pika
while True:
try:
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_consume('test', on_message_callback)
channel.start_consuming()
# Don't recover if connection was closed by broker
except pika.exceptions.ConnectionClosedByBroker:
break
# Don't recover on channel errors
except pika.exceptions.AMQPChannelError:
break
# Recover on all other connection errors
except pika.exceptions.AMQPConnectionError:
continueThis example can be found in examples/consume_recover.py.
Generic operation retry libraries such as retry can be used. Decorators make it possible to configure some additional recovery behaviours, like delays between retries and limiting the number of retries:
from retry import retry
@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
def consume():
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_consume('test', on_message_callback)
try:
channel.start_consuming()
# Don't recover connections closed by server
except pika.exceptions.ConnectionClosedByBroker:
pass
consume()This example can be found in examples/consume_recover_retry.py.
For asynchronous adapters, use on_close_callback to react to connection failure events. This callback can be used to clean up and recover the connection.
An example of recovery using on_close_callback can be found in examples/asynchronous_consumer_example.py.
See the Contributing guide for test setup, documentation builds, code formatting, and pull request expectations.
New non-blocking adapters may be implemented in either of the following ways:
- By subclassing
pika.BaseConnection, implementing its abstract method and passing its constructor an implementation ofpika.adapters.utils.nbio_interface.AbstractIOServices.pika.BaseConnectionimplementspika.connection.Connection's abstract methods, including internally-initiated connection logic. For examples, refer to the implementations ofpika.adapters.asyncio_connection.AsyncioConnection,pika.adapters.gevent_connection.GeventConnectionandpika.adapters.tornado_connection.TornadoConnection. - By subclassing
pika.connection.Connectionand implementing its abstract methods. This approach facilitates implementation of custom connection-establishment and transport mechanisms. For an example, refer to the implementation ofpika.adapters.twisted_connection.TwistedProtocolConnection.