Lapin implements the AMQP 0-9-1 specification on top of an async I/O layer. It is runtime-agnostic: the same code works with tokio (default), smol, or async-global-executor.
For full API documentation see docs.rs/lapin.
use futures_lite::stream::StreamExt;
use lapin::{
options::*, types::FieldTable, BasicProperties, Connection,
ConnectionProperties, Result,
};
#[tokio::main]
async fn main() -> Result<()> {
let addr = std::env::var("AMQP_ADDR")
.unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
let conn = Connection::connect(&addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
channel
.queue_declare("hello".into(), QueueDeclareOptions::durable(), FieldTable::default())
.await?;
channel
.basic_publish(
"".into(),
"hello".into(),
BasicPublishOptions::default(),
b"Hello, world!",
BasicProperties::default(),
)
.await?
.await?;
let mut consumer = channel
.basic_consume(
"hello".into(),
"my_consumer".into(),
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
while let Some(delivery) = consumer.next().await {
let delivery = delivery?;
delivery.ack(BasicAckOptions::default()).await?;
}
Ok(())
}Pass .enable_auto_recover() in ConnectionProperties to have lapin
automatically reconnect and replay exchanges, queues, bindings, and consumers
after a network failure:
use lapin::ConnectionProperties;
let props = ConnectionProperties::default().enable_auto_recover();After catching a recoverable error on a channel, call
channel.wait_for_recovery(error).await to block until recovery is complete.
| Flag | Notes |
|---|---|
tokio (default) |
Requires a running Tokio runtime |
smol |
Uses the smol executor |
async-global-executor |
Uses async-global-executor |
| Flag | Notes |
|---|---|
rustls (default) |
TLS via rustls |
native-tls |
TLS via the platform native library |
openssl |
TLS via OpenSSL |
| Flag | Notes |
|---|---|
rustls-platform-verifier (default) |
Platform trust store |
rustls-native-certs |
Native root certificates |
rustls-webpki-roots-certs |
Bundled webpki root set |
| Flag | Notes |
|---|---|
rustls--aws_lc_rs (default) |
Uses aws-lc-rs |
rustls--ring |
Uses ring (more portable) |
| Flag | Notes |
|---|---|
hickory-dns |
Hickory DNS resolver (avoids spurious network hangs) |
codegen |
Force protocol code regeneration at build time |
verbose-errors |
More detailed AMQP parser error messages |
Lapin can use any runtime by supplying an async_rs::Runtime value:
use lapin::{Connection, ConnectionProperties, Result};
async fn connect_with_custom_runtime() -> Result<()> {
let runtime = async_rs::Runtime::tokio_current();
let conn = Connection::connect_with_runtime(
"amqp://localhost",
ConnectionProperties::default(),
runtime,
).await?;
drop(conn);
Ok(())
}See async-rs for available runtime wrappers.