A blazing-fast, async, sans-io Apache Pulsar client for Rust.
Status: pre-alpha. The wire protocol layer is feature-rich, the tokio engine is usable end-to-end with supervised reconnect + transparent producer/consumer rebuild, and the moonpool engine carries client/producer/consumer for deterministic-simulation testing. API is unstable. Do not depend on this in production.
Magnetar is a from-scratch Apache Pulsar client driver written in Rust. It mirrors the surface area of the Apache Pulsar Java client and adds two properties that the Java client cannot reach:
- Sans-io core. The protocol state machine (
magnetar-proto) is a pure,quinn-proto-style state machine —handle_bytesin,poll_transmitout,poll_eventfor semantic events,poll_timeoutfor timers. Zero I/O dependencies. Notokio. Noasync. No sockets. It is feed-only. - Multiple swappable engines. The same sans-io state machine is driven by
a production tokio engine (
magnetar-runtime-tokio) and by a deterministic simulation engine (magnetar-runtime-moonpool) for chaos testing of reconnects, partitions, and TLS handshake reorderings under reproducible seeds.
The architecture explicitly bans channels (mpsc, broadcast, watch,
oneshot, crossbeam-channel, flume, async-channel, …). The wake-up
mechanism is Arc<parking_lot::Mutex<State>> plus tokio::sync::Notify plus
core::task::Waker slabs inside the state machine. See
ARCHITECTURE.md for the full rationale.
Magnetar is independent of the existing pulsar-rs crate — it shares neither
code nor dependencies. The goal is feature-complete parity with the Apache
Pulsar Java client.
- Protocol coverage: producer, consumer, reader, partitioned producer, partitioned consumer, multi-topics consumer, pattern (regex) consumer, table view, transactions.
- PIPs implemented or partially wired: PIP-4 (end-to-end encryption),
PIP-30 / PIP-292 (in-band
AUTH_CHALLENGErefresh), PIP-31 (transactions), PIP-37 (chunking + redelivery backoff), PIP-54 (partial-batch ACK), PIP-87 (AutoConsumeSchema broker lookup), PIP-90 (broker-entry metadata), PIP-121 (cluster failover —ServiceUrlProvider+ControlledClusterFailoverAutoClusterFailover), PIP-145 (regex topic discovery), PIP-188 (TOPIC_MIGRATEDwith supervised reconnect), PIP-313 (force unsubscribe). See Supported PIPs.
- Resilience: supervised reconnect with
Connection::reset(Stage 2) + transparent producer / consumer rebuild viarebuild_producers/rebuild_consumers(Stage 3) +memory_limitruntime enforcement (JavaMemoryLimitPolicy::FailImmediately) + global publish-bytes accounting viaAtomicU64CAS inProducer::sendwith release onDrop. - Observability: cumulative counters +
hdrhistogramp50/p99/max latency- rolling-window msgs/sec + bytes/sec rates (
record_rate_window). OpenTelemetry context propagation (traceparent/tracestatevia message properties,feature = "opentelemetry", ADR-0053).
- rolling-window msgs/sec + bytes/sec rates (
- Transports: TCP, TLS 1.3 (
rustls-only — nonative-tls, noopenssl), binary proxy (proxy_to_broker_url), pluggable DNS (DnsResolvertrait +TokioDnsResolverdefault routed throughTransport::connect). - TLS knobs:
tls_trust_certs_file_path,tls_allow_insecure_connection(blanket override),tls_hostname_verification_enable(false)paired with a PEM trust store (chain-on / hostname-off via custom rustls verifier). - Schemas: bytes, string, JSON, Avro, Protobuf, Protobuf-native, KeyValue, Auto-consume, Auto-produce-bytes, plus the full primitive family — Int8, Int16, Int32, Int64, Float, Double, Bool, Date, Time, Timestamp, LocalDate, LocalTime, Instant, LocalDateTime.
- Compression: LZ4, ZSTD, Snappy, ZLIB.
- Auth providers: token, mTLS (the two stock providers in
magnetar-proto::auth), OAuth2ClientCredentialsFlow(working — fetches- caches + auto-refreshes JWTs against a standard OIDC token endpoint),
SASL
PLAIN(RFC 4616, working), SASL Kerberos / GSSAPI vialibgssapiunder theauth-sasl-kerberosfeature (working — multi-roundAUTH_CHALLENGEinitiate loop), Athenz with a pre-fetched role token (AthenzProvider::with_role_token, working), and the opt-in Athenz ZTS round-trip (auth-athenz-zts, working viaAthenzProvider::with_default_signeror a customzts::JwtSigner/zts::ZtsClient).
- caches + auto-refreshes JWTs against a standard OIDC token endpoint),
SASL
- Trackers: ack grouping, unacked-message tracker (ack timeout +
redelivery), negative-ack tracker with
MultiplierRedeliveryBackoff(PIP-37), batch-index ACK set (PIP-54). - Interceptors:
ProducerInterceptor+ConsumerInterceptorSPIs. - Admin REST client: a
reqwest-backed admin client lives inmagnetar-admin. - CLI:
magnetarbinary inmagnetar-clicovers admin lookups and stats today; data-plane subcommands (produce / consume / inspect) are in progress.
Magnetar is not yet on crates.io. Use the Git path until the first release:
[dependencies]
magnetar = { git = "https://github.com/CleverCloud/magnetar", branch = "main" }The default feature set enables the tokio engine. The feature flags catalog:
| Flag | Default | Effect |
|---|---|---|
tokio |
yes | Pulls in magnetar-runtime-tokio plus tokio/futures-util. The public PulsarClient lives behind this flag. |
moonpool |
no | Pulls in magnetar-runtime-moonpool for deterministic-simulation testing. |
admin |
no | Re-exports magnetar-admin under magnetar::admin. |
auth-oauth2 |
no | Pulls in magnetar-auth-oauth2 (OAuth2 ClientCredentialsFlow provider). |
auth-sasl |
no | Pulls in magnetar-auth-sasl (SASL PLAIN + the sans-io Kerberos surface). |
auth-sasl-kerberos |
no | Implies auth-sasl and turns on magnetar-auth-sasl/kerberos, which binds libgssapi. Build host needs the MIT KRB5 / Heimdal headers (krb5-devel / libkrb5-dev) and libclang (clang-libs / libclang-dev) — libgssapi-sys runs bindgen at build time. See ADR-0029. |
auth-athenz |
no | Pulls in magnetar-auth-athenz. |
auth-athenz-zts |
no | Implies auth-athenz and turns on the reqwest-backed ZTS exchange plus in-tree JWT signer support. |
encryption |
no | Pulls in magnetar-messagecrypto plus the PIP-4 bridge type. |
| experimental-v5-client | no | Enables the PIP-466 V5 wrapper surface (magnetar::v5) over the v4 wire commands. |
| scalable-topics | no | Enables the experimental PIP-460 scalable-topic scaffold (topic://, DAG watch, StreamConsumer, topic-info). |
| crypto-aws-lc-rs | yes | rustls crypto provider: aws-lc-rs; brings post-quantum hybrid KEX (X25519MLKEM768). See TLS crypto provider. |
| crypto-ring | no | rustls crypto provider: ring. |
| crypto-openssl | no | rustls crypto provider: rustls-openssl (wraps system OpenSSL via deny.toml carve-out). |
| crypto-fips | no | rustls crypto provider: aws-lc-rs FIPS-validated module (requires cmake + C toolchain). |
The workspace ships eleven crates:
| Crate | Role |
|---|---|
magnetar |
Public façade — re-exports + builder + typed schemas wiring. |
magnetar-proto |
Sans-io protocol crate. The heart of the project. |
magnetar-runtime-tokio |
Production tokio engine with tokio-rustls TLS. |
magnetar-runtime-moonpool |
Deterministic-simulation engine (rustls-over-bytepipe TLS, no native TLS). |
magnetar-admin |
REST admin client (reqwest + rustls-tls). |
magnetar-cli |
magnetar binary — admin lookups today, produce / consume / inspect coming. |
magnetar-fakes |
In-process broker fake (dev-dep). Mirrors Java's MockBrokerService. |
magnetar-auth-oauth2 |
OAuth2 ClientCredentialsFlow auth provider. |
magnetar-auth-sasl |
SASL auth provider. |
magnetar-auth-athenz |
Athenz auth provider. |
magnetar-messagecrypto |
PIP-4 end-to-end encryption (AES-GCM via aws-lc-rs). |
xtask is a workspace member but is not published — it hosts build
helpers (protoc codegen, e2e driver, dependency audits).
The rustls crypto backend is selected at compile time via four
mutually-pluggable Cargo features on the magnetar façade. The wire
protocol — TLS 1.3 (default) / TLS 1.2 — is identical across every
provider; what differs is the audited / FIPS-validated / post-quantum
posture of the underlying primitives.
| Feature | Backend | Post-quantum KEX | FIPS validated | Pure Rust | Default |
|---|---|---|---|---|---|
crypto-aws-lc-rs |
aws-lc-rs | yes (X25519MLKEM768) | no | no (C) | ✓ |
crypto-ring |
ring | no | no | no (C) | |
crypto-openssl |
rustls-openssl | yes | depends on OpenSSL build | no | |
crypto-fips |
aws-lc-fips-sys | (FIPS-approved only) | yes | no (C) |
# Pick a single provider (mutually exclusive at build time).
cargo build -p magnetar --no-default-features --features tokio,crypto-aws-lc-rs
cargo build -p magnetar --no-default-features --features tokio,crypto-ring
cargo build -p magnetar --no-default-features --features tokio,crypto-openssl # needs system OpenSSL
cargo build -p magnetar --no-default-features --features tokio,crypto-fips # needs cmake + C toolchain
# The `magnetar` binary (magnetar-cli) mirrors the same cascade — the
# admin REST client (reqwest + rustls) and the data-plane runtime both
# bind to the selected provider. `cargo build -p magnetar-cli` alone
# defaults to `crypto-aws-lc-rs`.
cargo build -p magnetar-cli --no-default-features --features crypto-ringUnder cargo build --workspace --all-features the compile-time cfg
cascade resolves to aws-lc-rs (highest priority). Single-provider builds
go through cargo run -p xtask -- check-crypto-matrix. A single
compile_error! fires if no crypto-* feature is enabled.
The crypto-aws-lc-rs default picks up rustls 0.23's built-in
prefer-post-quantum feature, so the wire client negotiates the
X25519MLKEM768 hybrid key exchange with brokers that support it.
openssl / openssl-sys are admitted only as transitive deps of
rustls-openssl; the rest of ADR-0005
(no native-tls, rustls everywhere) stays in force. See
ADR-0035 for the binding
decision.
The magnetar binary exposes a sozu / systemd-style identification
banner:
$ magnetar --version
magnetar 0.1.0-dev.0 (a1b2c3d4e5f6-dirty)
built 2026-05-26T14:32:11Z · profile=release · rustc=rustc 1.88.0 (…) · target=x86_64-unknown-linux-gnu
features: +default
pulsar wire protocol: v21
os: linux · report bugs at https://github.com/CleverCloud/magnetar
-Vprints a single-line, never-colorized form:magnetar 0.1.0-dev.0 (sha-dirty).--versionprints the multi-line form above, colorized on a TTY.NO_COLOR=1or piping suppresses ANSI (https://no-color.org).SOURCE_DATE_EPOCH=<unix-seconds>pins the build timestamp for reproducible builds.
Full reference: docs/cli.md.
The high-level PulsarClient builder is the public entry point. It wires the
tokio engine to the sans-io state machine and gives you producer / consumer /
reader / table-view / partitioned / multi-topics / pattern builders.
use magnetar::{OutgoingMessage, PulsarClient};
# async fn run() -> Result<(), Box<dyn std::error::Error>> {
let client = PulsarClient::builder()
.service_url("pulsar://localhost:6650")
.build()
.await?;
let producer = client
.producer("persistent://public/default/orders")
.name("orders-writer")
.compression(magnetar_proto::types::CompressionKind::Zstd)
.batching(/* max_messages */ 256, /* max_bytes */ 128 * 1024)
.create()
.await?;
producer
.send(OutgoingMessage::with_payload(b"hello, pulsar".as_slice()).into())
.await?;
let consumer = client
.consumer("persistent://public/default/orders")
.subscription("worker")
.subscription_type(magnetar_proto::pb::command_subscribe::SubType::Shared)
.subscribe()
.await?;
let msg = consumer.receive().await?;
println!("payload: {:?}", msg.payload);
consumer.ack(msg.message_id).await?;
# Ok(()) }use std::sync::Arc;
use magnetar::{PulsarClient, TypedProducerBuilder, TypedConsumerBuilder};
use magnetar_proto::schema::StringSchema;
# async fn run() -> Result<(), Box<dyn std::error::Error>> {
let client = PulsarClient::builder()
.service_url("pulsar://localhost:6650")
.build()
.await?;
let schema = Arc::new(StringSchema::new());
let producer = client
.typed_producer("persistent://public/default/notes", schema.clone())
.create()
.await?;
producer.new_message().value("a note".to_string()).send().await?;
let consumer = client
.typed_consumer("persistent://public/default/notes", schema)
.subscription("transcriber")
.subscribe()
.await?;
let msg = consumer.receive().await?;
println!("decoded value: {}", msg.value);
consumer.ack(msg.id).await?;
# Ok(()) }use magnetar::PulsarClient;
use magnetar_proto::MessageId;
# async fn run() -> Result<(), Box<dyn std::error::Error>> {
let client = PulsarClient::builder()
.service_url("pulsar://localhost:6650")
.build()
.await?;
let reader = client
.reader("persistent://public/default/events")
.start_message_id(MessageId::EARLIEST)
.create()
.await?;
while let Ok(msg) = reader.receive().await {
println!("entry {:?}", msg.message_id);
}
# Ok(()) }use magnetar::{PulsarClient, MessageRoutingMode};
# async fn run() -> Result<(), Box<dyn std::error::Error>> {
let client = PulsarClient::builder()
.service_url("pulsar://localhost:6650")
.build()
.await?;
let p = client
.partitioned_producer("persistent://public/default/events")
.routing_mode(MessageRoutingMode::RoundRobin)
.batching(/* max_messages */ 128, /* max_bytes */ 64 * 1024)
.create()
.await?;
p.new_message().key("user-42").value(b"event".as_slice()).send().await?;
let c = client
.partitioned_consumer("persistent://public/default/events")
.subscription("workers")
.subscribe()
.await?;
let msg = c.receive().await?;
c.ack(msg.topic(), msg.message_id).await?;
# Ok(()) }use magnetar::PulsarClient;
# async fn run() -> Result<(), Box<dyn std::error::Error>> {
let client = PulsarClient::builder()
.service_url("pulsar://localhost:6650")
.build()
.await?;
let pc = client
.pattern_consumer()
.namespace("public/default")
.pattern("orders-.*")
.subscription("workers")
.subscribe(&client)
.await?;
println!("matched topics: {:?}", pc.topics());
let msg = pc.receive().await?;
pc.ack(msg.topic(), msg.message_id).await?;
# Ok(()) }use magnetar::PulsarClient;
# async fn run() -> Result<(), Box<dyn std::error::Error>> {
let client = PulsarClient::builder()
.service_url("pulsar://localhost:6650")
.build()
.await?;
let view = client
.table_view("persistent://public/default/config")
.subscription("cfg-watcher")
.create()
.await?;
view.for_each(|key, value| println!("{key} = {value:?}"));
let last = view.get("api.threshold");
# Ok(()) }use std::time::Duration;
use magnetar::{PulsarClient, OutgoingMessage};
# async fn run() -> Result<(), Box<dyn std::error::Error>> {
let client = PulsarClient::builder()
.service_url("pulsar://localhost:6650")
.build()
.await?;
// open the transaction-coordinator-backed transaction
let runtime_client = /* obtain magnetar_runtime_tokio::Client */
# unreachable!();
let txn = runtime_client.new_txn(Duration::from_secs(60)).await?;
let producer = client
.producer("persistent://public/default/orders")
.create()
.await?;
producer
.send(OutgoingMessage::with_payload(b"line-item".as_slice()).txn(txn.id()).into())
.await?;
txn.commit().await?;
# Ok(()) }use std::sync::Arc;
use magnetar::{
ConsumerInterceptor, IncomingMessage, OutgoingMessage, ProducerInterceptor, PulsarClient,
send_with_interceptors,
};
#[derive(Debug)]
struct StampSender;
impl ProducerInterceptor for StampSender {
fn before_send(&self, msg: &mut OutgoingMessage) {
msg.properties.push(("client".to_owned(), "magnetar".to_owned()));
}
}
# async fn run() -> Result<(), Box<dyn std::error::Error>> {
let client = PulsarClient::builder()
.service_url("pulsar://localhost:6650")
.build()
.await?;
let producer = client
.producer("persistent://public/default/orders")
.create()
.await?;
let chain: Vec<Arc<dyn ProducerInterceptor>> = vec![Arc::new(StampSender)];
let id = send_with_interceptors(
&producer,
OutgoingMessage::with_payload(b"hi".as_slice()),
&chain,
)
.await?;
println!("acked at {id:?}");
# Ok(()) }A check (✅) is a working public-API surface backed by code in the
workspace. A flag (🟡) means partial — a working subset; check
ARCHITECTURE.md for the open gaps. A cross (❌) is a
known-missing feature.
| Feature | Java | Magnetar | Notes |
|---|---|---|---|
send(...) / sendAsync(...) |
✅ | ✅ | Producer::send returns a SendFut. |
| Producer name | ✅ | ✅ | ProducerBuilder::name. |
| Compression (LZ4, ZSTD, Snappy, ZLIB, NONE) | ✅ | ✅ | ProducerBuilder::compression. |
Batching (BatchMessageContainerImpl) |
✅ | ✅ | ProducerBuilder::batching (max-msgs + max-bytes). |
batchingMaxPublishDelay flush timer |
✅ | ✅ | ProducerBuilder::batching_max_publish_delay. |
| Chunking (PIP-37) | ✅ | ✅ | ProducerBuilder::chunking. Chunks-never-batched enforced. |
initialSequenceId |
✅ | ✅ | ProducerBuilder::initial_sequence_id. |
sendTimeout |
✅ | ✅ | ProducerBuilder::send_timeout. |
accessMode (Shared/Exclusive/WaitForExclusive/Fencing) |
✅ | ✅ | ProducerBuilder::access_mode. PIP-68. |
accessMode getter |
✅ | ✅ | Producer::access_mode. |
getProducerName |
✅ | ✅ | Producer::name. |
getTopic |
✅ | ✅ | Producer::topic. |
isConnected / isClosed |
✅ | ✅ | Producer::is_connected / is_closed. |
getLastSequenceId |
✅ | ✅ | Producer::last_sequence_id. |
getLastSequenceIdPublished |
✅ | ✅ | Producer::last_sequence_id_published. |
getLastDisconnectedTimestamp |
✅ | ✅ | Producer::last_disconnected_timestamp. |
flush() |
✅ | ✅ | Producer::flush. |
close() |
✅ | ✅ | Producer::close. Last-clone Drop additionally fires a best-effort fire-and-forget close (ADR-0057) — beyond Java parity, where an abandoned producer leaks broker-side until disconnect. |
getStats |
✅ | ✅ | Producer::stats — counters + send_latency_{p50,p99,max}_ms via hdrhistogram + rolling per-second msgs_per_sec / bytes_per_sec windows (producer_record_rate_window). |
getCompressionType getter |
✅ | ✅ | Producer::compression. |
Per-message key / orderingKey |
✅ | ✅ | OutgoingMessage::key / ordering_key. |
Per-message eventTime |
✅ | ✅ | OutgoingMessage::event_time_ms. |
deliverAt / deliverAfter |
✅ | ✅ | OutgoingMessage::deliver_at_ms / deliver_after_ms. |
replicationClusters + disableReplication |
✅ | ✅ | OutgoingMessage::replication_clusters / disable_replication. |
newMessage(Transaction) (PIP-31) |
✅ | ✅ | OutgoingMessage::txn(txn_id). |
Properties (per-message key/value) |
✅ | ✅ | OutgoingMessage::property. |
TypedMessageBuilder |
✅ | ✅ | MessageBuilder via ProducerExt::new_message. |
ProducerInterceptor SPI |
✅ | ✅ | magnetar::ProducerInterceptor + send_with_interceptors. |
| OpenTelemetry context propagation | ❌ | ✅ | feature = "opentelemetry" — auto-inject traceparent/tracestate at send boundary (ADR-0053). |
pendingQueueSize getter |
✅ | ✅ | Producer::pending_count (batch_len + batch_bytes are bonus). |
| Feature | Java | Magnetar | Notes |
|---|---|---|---|
subscribe(...) (Exclusive / Shared / Failover / Key_Shared) |
✅ | ✅ | ConsumerBuilder::subscription_type. |
receive / receiveAsync / receive(timeout) |
✅ | ✅ | Consumer::receive + receive_with_timeout. |
batchReceive / batchReceiveAsync |
✅ | ✅ | Consumer::receive_batch_with_bytes_cap (cap on count + bytes). |
acknowledge (individual) |
✅ | ✅ | Consumer::ack. |
acknowledgeCumulative |
✅ | ✅ | Consumer::ack_cumulative. |
acknowledge(messages) (batch ack) |
✅ | ✅ | Consumer::ack_batch. |
acknowledge(MessageId, Map<String,String>) |
✅ | ✅ | Consumer::ack_with_properties. |
acknowledge(MessageId, Transaction) (PIP-31) |
✅ | ✅ | Consumer::ack_with_txn. |
acknowledgeAsync(messages, Transaction) |
✅ | ✅ | Consumer::ack_batch_with_txn. |
acknowledgeCumulative(MessageId, Map) |
✅ | ✅ | Consumer::ack_cumulative_with_properties. |
acknowledgeCumulative(MessageId, Transaction) |
✅ | ✅ | Consumer::ack_cumulative_with_txn. |
| Batch-index ACK (PIP-54) | ✅ | ✅ | ack_set bitset stamped on individual acks. |
acknowledgmentGroupTime (grouping window) |
✅ | ✅ | ConsumerBuilder::ack_group_time + ack_grouped / ack_grouped_cumulative. |
negativeAcknowledge |
✅ | ✅ | Consumer::negative_ack. |
negativeAcknowledge(messages) |
✅ | ✅ | Consumer::negative_ack_batch. |
negativeAcknowledge(MessageId, delay) |
✅ | ✅ | Consumer::negative_ack_with_delay. |
MultiplierRedeliveryBackoff (PIP-37) |
✅ | ✅ | magnetar_proto::trackers::MultiplierRedeliveryBackoff. |
reconsumeLater (retry-letter topic) |
✅ | ✅ | Consumer::reconsume_later + _with_properties. |
ackTimeout (unacked tracker) |
✅ | ✅ | ConsumerBuilder::ack_timeout. |
ackTimeoutRedeliveryBackoff (PIP-37) |
✅ | ✅ | ConsumerBuilder::ack_timeout_backoff. |
negativeAckRedeliveryDelay |
✅ | ✅ | ConsumerBuilder::negative_ack_redelivery_delay. |
seek(MessageId) |
✅ | ✅ | Consumer::seek. |
seek(timestamp) |
✅ | ✅ | Consumer::seek_timestamp. |
seekAsync(Function<String, Object>) (per-partition) |
✅ | ✅ | PartitionedConsumer::seek_per_partition / MultiTopicsConsumer::seek_per_partition — callback returns SeekTarget::MessageId or SeekTarget::PublishTimeMs per topic. |
seekToEarliest / seekToLatest |
✅ | ✅ | Consumer::seek_to_earliest / seek_to_latest. |
pause() / resume() / isPaused() |
✅ | ✅ | Consumer::pause / resume / is_paused. |
hasReachedEndOfTopic |
✅ | ✅ | Consumer::has_reached_end_of_topic. |
redeliverUnacknowledgedMessages |
✅ | ✅ | Consumer::redeliver_unacked. |
getLastMessageId |
✅ | ✅ | Consumer::last_message_id. |
getStats (counters) |
✅ | ✅ | Consumer::stats. Includes total_chunked_msgs_received. |
| Stats: rolling windows (msgs/sec, bytes/sec) | ✅ | ✅ | ConsumerStats::msgs_per_sec / bytes_per_sec + ProducerStats same. Runtime calls Connection::consumer_record_rate_window(handle, now) / producer_record_rate_window on a tokio::time::interval ticker; first call records baseline, subsequent calls compute per-second rates from the delta. |
| Stats: latency hdrhistogram (p50/p99/max) | ✅ | ✅ | Consumer::stats exposes receive_latency_{p50,p99,max}_ms; Producer::stats exposes send_latency_{p50,p99,max}_ms. |
subscriptionProperties |
✅ | ✅ | ConsumerBuilder::subscription_property. |
replicateSubscriptionState |
✅ | ✅ | ConsumerBuilder::replicate_subscription_state. |
priorityLevel |
✅ | ✅ | ConsumerBuilder::priority_level. |
keySharedPolicy (sticky / auto-split / hash) |
✅ | ✅ | ConsumerBuilder::key_shared_policy. PIP-34/119/282/379. |
startMessageId |
✅ | ✅ | ConsumerBuilder::start_message_id. |
startMessageRollbackDuration |
✅ | ✅ | ConsumerBuilder::start_message_rollback_duration. |
readCompacted |
✅ | ✅ | ConsumerBuilder::read_compacted. |
forceTopicCreation |
✅ | ✅ | ConsumerBuilder::force_topic_creation. |
| Dead-letter policy | ✅ | ✅ | ConsumerBuilder::dead_letter_policy + Consumer::drain_dead_letter. PIP-22/58/124/409. |
cryptoFailureAction (PIP-4) |
✅ | ✅ | Fail / Discard / Consume all wired end-to-end in magnetar-runtime-tokio::consumer::deliver_post_process. |
| Encryption (PIP-4) | ✅ | ✅ | ConsumerBuilder::encryption accepts a MessageDecryptor. |
ConsumerInterceptor SPI |
✅ | ✅ | magnetar::ConsumerInterceptor + receive_with_interceptors. |
unsubscribe() |
✅ | ✅ | Consumer / multi-topics expose unsubscribe. |
forceUnsubscribe (PIP-313) |
✅ | ✅ | Wired through CommandUnsubscribe.force. |
availablePermits getter |
✅ | ✅ | Consumer::available_permits. |
availableInQueue getter |
✅ | ✅ | Consumer::available_in_queue. |
hasReceivedAnyMessage getter |
✅ | ✅ | Consumer::has_received_any_message. |
| Feature | Java | Magnetar | Notes |
|---|---|---|---|
| Auto partition discovery | ✅ | ✅ | PulsarClient::partitions_for_topic + builder. |
MessageRoutingMode (RoundRobin / SinglePartition / Custom) |
✅ | ✅ | MessageRoutingMode. |
Custom MessageRouter trait |
✅ | ✅ | MessageRouter trait + message_router(...). |
| Murmur3 + JavaStringHash hashers | ✅ | ✅ | Murmur3HashHasher / JavaStringHashHasher. |
TypedMessageBuilder-equivalent on partitioned producer |
✅ | ✅ | PartitionedMessageBuilder. |
Per-partition stats / lastSequenceId |
✅ | ✅ | Aggregated across child producers. |
| Auto-update partition count (background ticker) | ✅ | ✅ | PartitionedProducerBuilder::auto_update_partitions_interval spawns a tokio::time::interval that signals partitions_changed_notify; user drives refresh_partitions(&client) from the signal. |
| Feature | Java | Magnetar | Notes |
|---|---|---|---|
| Auto partition discovery + one consumer per partition | ✅ | ✅ | PulsarClient::partitioned_consumer. |
Full ConsumerBuilder knob forwarding |
✅ | ✅ | 12 knobs forwarded from builder. |
| Receive / ack / nack / seek / unsubscribe across partitions | ✅ | ✅ | All forwarded. |
| Auto-update partition count | ✅ | ✅ | PartitionedConsumerBuilder::auto_update_partitions_interval mirrors the producer pattern; signal drives refresh_partitions(&client). |
| Feature | Java | Magnetar | Notes |
|---|---|---|---|
| Subscribe to N explicit topics under one subscription | ✅ | ✅ | MultiTopicsConsumerBuilder::topics. |
| Receive / ack / nack / seek across all topics | ✅ | ✅ | Per-topic forwarding. |
negativeAckWithDelay / ackCumulative |
✅ | ✅ | Forwarded. |
Dynamic add_topic / remove_topic |
✅ | ✅ | MultiTopicsConsumer::add_topic / remove_topic — subscribe / unsubscribe at runtime. |
| Auto-update partition count (background ticker) | ✅ | ✅ | MultiTopicsConsumerBuilder::auto_update_partitions_interval spawns a tokio::time::interval that signals partitions_changed_notify; user drives refresh_partitions(&client) + add_topic(...) from the signal. |
| Feature | Java | Magnetar | Notes |
|---|---|---|---|
| Regex topic subscription | ✅ | ✅ | PatternConsumerBuilder::pattern. |
TopicListChanged delta stream |
✅ | ✅ | Client::next_topic_list_change. |
Manual update() reconcile |
✅ | ✅ | PatternConsumer::update(&client) returns a ReconcileReport. |
| Auto-update background ticker | ✅ | ✅ | PatternConsumer::start_auto_reconcile(client, interval) spawns a tokio::time::interval loop that calls update(&client) on every tick; returns a JoinHandle for clean shutdown. |
| Feature | Java | Magnetar | Notes |
|---|---|---|---|
| Non-durable exclusive subscription | ✅ | ✅ | ReaderBuilder builds on ConsumerBuilder. |
startMessageId (Earliest / Latest / explicit) |
✅ | ✅ | ReaderBuilder::start_message_id. |
startMessageIdInclusive rollback duration |
✅ | ✅ | ReaderBuilder::start_message_rollback_duration. |
readCompacted |
✅ | ✅ | ReaderBuilder::read_compacted. |
cryptoKeyReader (PIP-4 decryptor) |
✅ | ✅ | ReaderBuilder::encryption. |
hasMessageAvailable / seek |
✅ | ✅ | Via the underlying consumer surface. |
Stats / closure getters (isClosed, etc.) |
✅ | ✅ | Reader::is_closed, available_in_queue, available_permits. |
| Feature | Java | Magnetar | Notes |
|---|---|---|---|
| Compacted-topic snapshot keyed by message key | ✅ | ✅ | TableView::get / for_each / snapshot / keys / values. |
| Listener registration | ✅ | ✅ | TableView::listen (TableViewListener). |
Schema-aware TypedTableView |
✅ | ✅ | TypedTableView<S> decodes per-read. |
startMessageId / subscriptionProperty / property knobs |
✅ | ✅ | TableViewBuilder knob set. |
| Auto-update-partitions ticker | ✅ | ✅ | TableViewBuilder::auto_update_partitions_interval(Duration) spawns a background timer that signals TableView::partitions_changed_notify; callers drive refresh_partitions(&client) from the signal. |
cryptoKeyReader wired through |
✅ | ✅ | TableViewBuilder::encryption + TypedTableViewBuilder::encryption stamp the decryptor onto the underlying ConsumerBuilder. |
| Feature | Java | Magnetar | Notes |
|---|---|---|---|
| Transaction coordinator client | ✅ | ✅ | magnetar-proto::txn::TxnClient. |
| Begin / commit / abort | ✅ | ✅ | Client::new_txn + Transaction::commit / abort. |
ADD_PARTITION_TO_TXN / ADD_SUBSCRIPTION_TO_TXN |
✅ | ✅ | Client::add_partition_to_txn / add_subscription_to_txn. |
| Producer publish under txn | ✅ | ✅ | OutgoingMessage::txn. |
| Consumer ack under txn (individual + cumulative + batch) | ✅ | ✅ | Consumer::ack_with_txn and friends. |
END_TXN_ON_PARTITION / _ON_SUBSCRIPTION cleanup |
✅ | ✅ | Driven by end_txn. |
| Feature | Java | Magnetar | Notes |
|---|---|---|---|
| Token auth | ✅ | ✅ | magnetar_proto::auth::TokenAuth. |
| mTLS | ✅ | ✅ | magnetar_proto::auth::TlsAuth + tls_trust_certs_pem / tls_trust_certs_file_path. |
| OAuth2 ClientCredentialsFlow | ✅ | ✅ | magnetar_auth_oauth2::ClientCredentialsFlow — POSTs grant_type=client_credentials to the IDP, caches the JWT, refreshes within 30 s of expiry. Reports auth_method_name = "token". |
SASL PLAIN (RFC 4616) |
✅ | ✅ | magnetar_auth_sasl::SaslPlain — \0<username>\0<password> payload. |
| SASL Kerberos / GSSAPI | ✅ | ✅ | magnetar_auth_sasl::SaslKerberos runs the GSSAPI initiate loop via libgssapi (façade feature auth-sasl-kerberos). The multi-round AUTH_CHALLENGE / AUTH_RESPONSE exchange threads through AuthProvider::respond_to_challenge; the four sans-io test layers per ADR-0024 drive a magnetar_auth_sasl::ScriptedGssapiClient so they stay free of a libkrb5 build dep. End-to-end coverage uses a Dockerised KDC fixture. See ADR-0029. |
| Athenz (pre-fetched role token) | ✅ | ✅ | AthenzProvider::with_role_token — bypass the ZTS round-trip when the caller already holds a valid role token. |
| Athenz (ZTS round-trip) | ✅ | ✅ | feature = "auth-athenz-zts" (default off). The pluggable zts::ZtsClient trait (zts::HttpZtsClient does the reqwest-backed POST) exchanges a signed JWT for a role token; AthenzProvider owns the expiry-aware cache and ensure_role_token(now) / needs_refresh(now) (sans-io clock injection). Build via AthenzProvider::with_default_signer(config) (cfg-active in-tree signer) or AthenzProvider::builder() (custom signer / client / wall_clock). The concrete zts::JwtSigner ships in two flavours — jwt_signer::AwsLcRsSigner and jwt_signer::RingSigner — gated on the crypto-provider matrix per ADR-0035; parsed PKCS#8 DER wrapped in zeroize::Zeroizing<…>, byte-identical deterministic RS256 (RFC 8017 §8.2). Full four-layer cross-runtime coverage (tokio/moonpool/differential + e2e) per ADR-0024. See ADR-0041. |
In-band AUTH_CHALLENGE refresh (PIP-30 / PIP-292) |
✅ | ✅ | Driver consults the configured AuthProvider and submits CommandAuthResponse. |
pulsar+ssl:// URLs |
✅ | ✅ | Built-in. |
Binary proxy (proxy_to_broker_url) |
✅ | ✅ | ClientBuilder::proxy_to_broker_url. Both engines ship the full ProxyConnectionPool — tokio in magnetar-runtime-tokio::pool, moonpool in magnetar-runtime-moonpool::pool (see the ADR-0039 2026-06-01 amendment). |
| Feature | Java | Magnetar | Notes |
|---|---|---|---|
MessageEncryptor trait on producer |
✅ | ✅ | ProducerBuilder::encryption. |
MessageDecryptor trait on consumer |
✅ | ✅ | ConsumerBuilder::encryption. |
AES-GCM via aws-lc-rs |
n/a (Java uses BouncyCastle) | ✅ | magnetar-messagecrypto::MessageCrypto. |
cryptoFailureAction |
✅ | ✅ | Fail / Discard / Consume all wired end-to-end in magnetar-runtime-tokio::consumer::deliver_post_process. |
| Feature | Java | Magnetar | Notes |
|---|---|---|---|
BytesSchema |
✅ | ✅ | |
StringSchema |
✅ | ✅ | |
JsonSchema |
✅ | ✅ | Canonicalised via the Avro parser per Codex Q4. |
AvroSchema |
✅ | ✅ | apache-avro 0.21 — canonical-parsing form. |
ProtobufSchema (descriptor) |
✅ | ✅ | |
ProtobufNativeSchema |
✅ | ✅ | Byte-identical Java FileDescriptorSet output. |
KeyValueSchema |
✅ | ✅ | Byte-identical canonical JSON wrapper. |
AutoConsumeSchema (broker lookup) |
✅ | ✅ | TypedConsumer::receive auto-fetches the broker schema on first call via Connection::get_schema; the result is cached on the schema's Arc<Mutex<Option<pb::Schema>>>. |
AutoProduceBytesSchema |
✅ | ✅ | TypedProducer::send warms the broker schema on first send via Producer::get_schema; encode() stays pass-through per Java parity. |
| Int8 / Int16 / Int32 / Int64 / Float / Double / Bool | ✅ | ✅ | |
| Date / Time / Timestamp / LocalDate / LocalTime / Instant / LocalDateTime | ✅ | ✅ | |
| Schema-version negotiation | ✅ | ✅ | Sent on CommandProducer / CommandSubscribe. |
| Feature | Java | Magnetar | Notes |
|---|---|---|---|
serviceUrl |
✅ | ✅ | ClientBuilder::service_url. |
clientVersion |
✅ | ✅ | ClientBuilder::client_version. |
keepAliveInterval |
✅ | ✅ | ClientBuilder::keepalive. |
operationTimeout |
✅ | ✅ | ClientBuilder::operation_timeout. |
maxMessageSize |
✅ | ✅ | ClientBuilder::max_message_size. |
tlsTrustCertsFilePath |
✅ | ✅ | ClientBuilder::tls_trust_certs_file_path. |
tlsAllowInsecureConnection |
✅ | ✅ | ClientBuilder::tls_allow_insecure_connection(true) — accepts any server cert via a custom rustls verifier. Insecure, do not use in production. |
enableTlsHostnameVerification |
✅ | ✅ | ClientBuilder::tls_hostname_verification_enable(bool) — true uses the standard WebPKI verifier; false paired with tls_trust_certs_pem routes through magnetar_runtime_tokio::tls_config_no_hostname which delegates chain check to WebPKI and intercepts only NotValidForName. |
serviceUrlProvider (URL rotation) |
✅ | ✅ | ClientBuilder::service_url_provider(Arc<dyn ServiceUrlProvider>) — the supervised reconnect path calls provider.get_service_url() on every reconnect attempt so cluster-failover policies can swap URLs between attempts. |
proxyServiceUrl (binary proxy) |
✅ | ✅ | ClientBuilder::proxy_to_broker_url. Both engines route proxy lookups through a per-broker ProxyConnectionPool (ADR-0039, moonpool amendment 2026-06-01). |
Authentication plugin |
✅ | ✅ | ClientBuilder::auth(Arc<dyn AuthProvider>). |
memoryLimit |
✅ | ✅ | ClientBuilder::memory_limit(bytes, MemoryLimitPolicy). Both FailImmediately (atomic CAS, ADR-0017) and ProducerBlock (Waker slab, ADR-0020) ship. |
dnsResolver customisation |
✅ | ✅ | ClientBuilder::dns_resolver(Arc<dyn DnsResolver>) — Transport::connect_with_resolver resolves via the provider on every (re)connect; TokioDnsResolver is the default. |
isClosed / shutdown / getLastDisconnectedTimestamp |
✅ | ✅ | All exposed on PulsarClient. |
| Cluster failover (PIP-121) | ✅ | ✅ | ServiceUrlProvider + StaticServiceUrlProvider + ControlledClusterFailover (proto) + AutoClusterFailover (runtime, with user-supplied HealthProbe callback + background tokio task). All three plug into ClientBuilder::service_url_provider. |
- Moonpool engine parity. Java parity is fully satisfied by the
tokio engine (ADR-0019);
the façade's dependent surfaces are engine-generic through ADR-0026 §D1
extension traits. Producers, consumers, readers, partitioned
producer/consumer, multi-topics, pattern consumers, TableView,
transactions, and typed schema builders all dispatch through
impl<E: Engine> PulsarClient<E>where the selected runtime implements the matching*Apitrait. Per-feature, per-engine status lives in the parity matrix above. - PIP-460 scalable topics ship as an experimental scaffold behind
feature = "scalable-topics"(default off); the in-process four-layer test suite is the binding gate. e2e against a real broker waits for an upstream Pulsar release that ships PIP-460 (currentlyDraft, targeting Pulsar 5.0 LTS). - PIP-466 V5 surface ships as an experimental, engine-generic wrapper
behind
feature = "experimental-v5-client"(default off). No wire change — it wraps the v4 surface. - SASL ships both mechanisms end-to-end:
PLAIN(RFC 4616) under the defaultauth-saslfeature, and Kerberos/GSSAPI vialibgssapiunder theauth-sasl-kerberosfeature. The multi-roundAUTH_CHALLENGEexchange threads throughAuthProvider::respond_to_challenge. The four sans-io test layers drive a deterministicScriptedGssapiClient; the e2e layer runs against a Dockerised KDC. See ADR-0029. - Athenz ships both the pre-fetched role-token path and the opt-in
ZTS round-trip (
auth-athenz-zts). Production-style ZMS+ZTS+certificate bootstrap remains out of scope for the local fixture.
| PIP | Title | Status | Lives in |
|---|---|---|---|
| PIP-4 | End-to-end encryption (AES-GCM) | ✅ | magnetar-messagecrypto, crypto_bridge in magnetar |
| PIP-22 | DLQ topic | ✅ | ConsumerBuilder::dead_letter_policy |
| PIP-30 | In-band AUTH_CHALLENGE refresh |
✅ | magnetar-proto::auth, driver |
| PIP-31 | Transactions | ✅ | magnetar-proto::txn, Client::new_txn |
| PIP-37 | Chunking + ack-timeout redelivery backoff | ✅ | magnetar-proto::producer, trackers::nack |
| PIP-54 | Partial-batch ACK (ack_set bitset) | ✅ | magnetar-proto::consumer |
| PIP-58 | Retry-letter topic | ✅ | Consumer::reconsume_later |
| PIP-68 | Exclusive producer access mode | ✅ | ProducerBuilder::access_mode |
| PIP-90 | Broker-entry metadata envelope | ✅ | magnetar-proto::frame (magic 0x0e02), IncomingMessage::broker_* |
| PIP-124 | Multi-DLQ topics for KeyShared | ✅ | DLQ policy infra |
| PIP-145 | Topic list watcher (regex pattern) | ✅ | magnetar-proto::topic_watcher, PatternConsumer |
| PIP-292 | Better in-band auth refresh ergonomics | ✅ | Driver event handler |
| PIP-313 | Force unsubscribe | ✅ | CommandUnsubscribe.force plumbed |
| PIP-34 / 119 / 282 / 379 | Key_Shared family | ✅ | KeySharedConfig + builder |
| PIP-409 | DLQ + retry-letter polish | ✅ | DLQ + reconsume_later wiring |
| PIP-391 | Batch-index ACK polish | ✅ | Pairs with PIP-54 |
| PIP-188 | TOPIC_MIGRATED |
✅ | Wire opcode decoded; tokio driver's event loop catches ConnectionEvent::TopicMigrated, logs the new-broker hint, and returns an error from driver_loop_inner so the supervisor triggers Connection::reset + reconnect. rebuild_producers / rebuild_consumers re-attach every still-open handle on the new socket. |
| local | Anti-thrash policy (ADR-0028) | ✅ (opt-in) | Per-handle ack-then-drop detector + connection-level cooldown. Mitigates broker-driven post-restart cascades (Pulsar PR #14467 / #13428 / #12846 — ServerCnx#handleProducer ↔ AbstractTopic#addProducer race). SupervisorConfig::anti_thrash_threshold default None. |
| PIP-460 | Scalable topics | 🟡 | Experimental scaffold behind feature = "scalable-topics" (default off), ADR-0031. No released Pulsar broker ships PIP-460 (upstream Draft, targets 5.0 LTS) — so this is scaffold-now / e2e-later. Ships the topic:// URL scheme, hand-encoded wire commands (lookup + DAG-watch + DAG-update, behind the feature gate until the Pulsar 5.0 RC vendor bump per ADR-0026 §D4), the DagWatchSession sans-io state machine, the additive default-None MessageId::segment_id field (v4 wire byte-identical), both-engine ScalableTopicsApi impls, magnetar::scalable::StreamConsumer (StreamConsumer-only, drops on DAG change), and the magnetar topic-info CLI. Four-layer in-process tests are the binding gate; e2e is #[ignore]'d. QueueConsumer / CheckpointConsumer / controller-election / repartition stay out of the current scaffold and will land once an upstream release pins the wire commands. See ADR-0031. |
| PIP-466 | V5 client API surface | ✅ | Behind feature = "experimental-v5-client" (default off). Engine-generic per ADR-0032. magnetar::v5 exposes PulsarClientV5<E: Engine = TokioEngine> (with v4() escape hatch), v5::Producer<E>, v5::StreamConsumer<E> (Exclusive / Failover), v5::QueueConsumer<E> (Shared / KeyShared), and the v5::mapping field-translation table. Moonpool callers name PulsarClientV5<MoonpoolEngine<P>> directly. Wraps the v4 surface — no wire change. See ADR-0032. |
| PIP-180 | Shadow topic | ✅ | Admin REST (create_shadow_topic / delete_shadow_topic / get_shadow_topics / get_shadow_source), producer-side send_with_source_message_id propagating CommandSend.message_id, consumer-side MessageReceivedFromShadow event, structural MessageId equality across source ⇄ shadow. See ADR-0033. |
| PIP-415 | getMessageIdByIndex |
✅ | magnetar-admin::AdminClient::topic_get_message_id_by_index — REST-only per PIP-415 (binary-protocol section intentionally empty; canonical implementation apache/pulsar#24222 is admin / broker / CLI only) |
| PIP-33 | Replicated subscriptions | ✅ | ConsumerBuilder::replicate_subscription_state(bool) flips CommandSubscribe field 14; receive-path filter in magnetar-proto::conn drops REPLICATED_SUBSCRIPTION_* markers and surfaces them via PulsarClient::next_replicated_subscription_marker / poll_replicated_subscription_marker. Client never originates markers — broker-side machinery only. See ADR-0034. |
| PIP-121 | Cluster failover (Auto + Controlled) | ✅ | ServiceUrlProvider + StaticServiceUrlProvider + ControlledClusterFailover (proto) + AutoClusterFailover (runtime with HealthProbe). Active URL re-resolved on every supervised-reconnect attempt. |
Magnetar publishes two engines that drive the same sans-io state machine. Pick at compile time via feature flags. The parity matrix above is the per-feature, per-engine status snapshot; this section describes each engine's surface.
- TLS via
tokio-rustls(ring backend); nonative-tls, noopenssl. - One driver task per connection — see ARCHITECTURE.md §"The driver loop".
- The user-facing futures (
Consumer::receive,Producer::send, …) lock the shared state machine, register theirWakerin a slab, and wait. The driver picks them up as the matchingOpOutcomelands. - This is what
magnetar::PulsarClientwires by default (PulsarClient<TokioEngine>).
- Drives the same sans-io state machine as the tokio engine over
moonpool_core::Providers(a bundle ofNetworkProvider,TimeProvider,TaskProvider,RandomProvider,StorageProvider). PlugTokioProvidersfor production-style runs against a real broker, or amoonpool-simprovider bundle for reproducible chaos under a seed. - TLS uses a local
rustls::ClientConnectionadapter (tls.rs) that drivesread_tls/process_new_packets/write_tlsover the moonpool byte pipe — the handshake stays deterministic under chaos. - See
docs/moonpool-engine.mdfor the engine's surface, supervised reconnect, chaos test pack, and the tokio ↔ moonpool differential equivalence harness.
- Pulsar 4.0+ (LTS). The CONNECT frame advertises
ProtocolVersion::V21and the connection falls back to whichever lower version the broker reports onCONNECTED. - The end-to-end suite runs against
apachepulsar/pulsar:4.0.4.
Magnetar targets a feature-complete Apache Pulsar driver with full
Java-client parity, driven by two interchangeable engines (production
tokio + deterministic-simulation moonpool) over the same sans-io
protocol state machine (ADR-0010,
ADR-0019). Java
parity is satisfied on the tokio engine; the moonpool engine is on the
same trajectory and the differential harness enforces tokio ↔ moonpool
EventStream equivalence.
The bulk of the parity matrix above ships on main, including:
- PIP-180 shadow topic (ADR-0033).
- PIP-33 replicated subscriptions (ADR-0034).
- SASL Kerberos / GSSAPI (ADR-0029).
- Pluggable rustls crypto provider (aws-lc-rs / ring / openssl / fips — ADR-0035).
- Daily 128-random-seed moonpool sweep (ADR-0036).
- Anti-thrash supervised reconnect policy (opt-in, ADR-0028).
Known open work is narrow and tracked in
docs/follow-ups.md: PIP-460 e2e waits for a
Pulsar 5.0 RC that pins the scalable-topic wire commands, the moonpool
git dependency waits for a release containing upstream PR #113, and a
few simulation / test-harness gaps remain.
API is unstable until the first tagged release — do not depend on this in production yet.
The whole workspace builds against stable Rust 1.88.
# Build / lint / format
cargo build --workspace --all-features
cargo clippy --workspace --all-features -- -D warnings
cargo +nightly fmt --check
# Unit + integration tests (no broker needed)
cargo test --workspace
# Dependency audits
cargo deny check
# Docs
RUSTDOCFLAGS="-D warnings --cfg tokio_unstable --cfg tracing_unstable" \
cargo doc --workspace --all-features --no-depsEnd-to-end tests against a real broker run as part of
cargo test --workspace --all-features (ADR-0046 — no --features e2e,
no #[ignore]). Docker is the only prerequisite; the suite spins
pulsar:4.0.4 via testcontainers-rs.
Additional xtask checks specific to the sans-io invariants:
cargo run -p xtask -- check-no-channels # greps src/** for banned channel crates
cargo run -p xtask -- check-no-io-deps # magnetar-proto must not depend on any I/O crate
cargo run -p xtask -- codegen --check # asserts proto codegen has no driftApache-2.0 — see LICENSE and NOTICE. The project vendors
a verbatim copy of the Apache Pulsar wire protocol definition
(PulsarApi.proto, PulsarMarkers.proto), released by the Apache Software
Foundation under Apache-2.0.
See GUIDELINES.md and CONTRIBUTING.md for project conventions before sending a patch.