A Python workspace for NATS messaging system, containing:
- nats-py - An asyncio Python client for NATS
- nats-server - Python library for managing NATS servers for development and testing
The main NATS client for Python, providing async/await support for pub/sub and JetStream.
Installation: pip install nats-py
Documentation: See Getting Started and JetStream below
A Python library for managing NATS servers in development and testing environments. Provides async APIs to start, configure, and manage NATS server instances and clusters.
Installation: pip install nats-server
Documentation: See the nats-server package for details
Should be compatible with at least Python +3.8.
import asyncio
import nats
from nats.errors import ConnectionClosedError, TimeoutError, NoServersError
async def main():
# It is very likely that the demo server will see traffic from clients other than yours.
# To avoid this, start your own locally and modify the example to use it.
nc = await nats.connect("nats://demo.nats.io:4222")
# You can also use the following for TLS against the demo server.
#
# nc = await nats.connect("tls://demo.nats.io:4443")
async def message_handler(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
print("Received a message on '{subject} {reply}': {data}".format(
subject=subject, reply=reply, data=data))
# Simple publisher and async subscriber via coroutine.
sub = await nc.subscribe("foo", cb=message_handler)
# Stop receiving after 2 messages.
await sub.unsubscribe(limit=2)
await nc.publish("foo", b'Hello')
await nc.publish("foo", b'World')
await nc.publish("foo", b'!!!!!')
# Synchronous style with iterator also supported.
sub = await nc.subscribe("bar")
await nc.publish("bar", b'First')
await nc.publish("bar", b'Second')
try:
async for msg in sub.messages:
print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
await sub.unsubscribe()
except Exception as e:
pass
async def help_request(msg):
print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
await nc.publish(msg.reply, b'I can help')
# Use queue named 'workers' for distributing requests
# among subscribers.
sub = await nc.subscribe("help", "workers", help_request)
# Send a request and expect a single response
# and trigger timeout if not faster than 500 ms.
try:
response = await nc.request("help", b'help me', timeout=0.5)
print("Received response: {message}".format(
message=response.data.decode()))
except TimeoutError:
print("Request timed out")
# Remove interest in subscription.
await sub.unsubscribe()
# Terminate connection to NATS.
await nc.drain()
if __name__ == '__main__':
asyncio.run(main())Starting v2.0.0 series, the client now has JetStream support:
import asyncio
import nats
from nats.errors import TimeoutError
async def main():
nc = await nats.connect("localhost")
# Create JetStream context.
js = nc.jetstream()
# Persist messages on 'foo's subject.
await js.add_stream(name="sample-stream", subjects=["foo"])
for i in range(0, 10):
ack = await js.publish("foo", f"hello world: {i}".encode())
print(ack)
# Create pull based consumer on 'foo'.
psub = await js.pull_subscribe("foo", "psub")
# Fetch and ack messagess from consumer.
for i in range(0, 10):
msgs = await psub.fetch(1)
for msg in msgs:
await msg.ack()
print(msg)
# Create single ephemeral push based subscriber.
sub = await js.subscribe("foo")
msg = await sub.next_msg()
await msg.ack()
# Create single push based subscriber that is durable across restarts.
sub = await js.subscribe("foo", durable="myapp")
msg = await sub.next_msg()
await msg.ack()
# Create deliver group that will be have load balanced messages.
async def qsub_a(msg):
print("QSUB A:", msg)
await msg.ack()
async def qsub_b(msg):
print("QSUB B:", msg)
await msg.ack()
await js.subscribe("foo", "workers", cb=qsub_a)
await js.subscribe("foo", "workers", cb=qsub_b)
for i in range(0, 10):
ack = await js.publish("foo", f"hello world: {i}".encode())
print("\t", ack)
# Create ordered consumer with flow control and heartbeats
# that auto resumes on failures.
osub = await js.subscribe("foo", ordered_consumer=True)
data = bytearray()
while True:
try:
msg = await osub.next_msg()
data.extend(msg.data)
except TimeoutError:
break
print("All data in stream:", len(data))
await nc.close()
if __name__ == '__main__':
asyncio.run(main())TLS connections can be configured with an ssl context
ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
ssl_ctx.load_verify_locations('ca.pem')
ssl_ctx.load_cert_chain(certfile='client-cert.pem',
keyfile='client-key.pem')
await nats.connect(servers=["tls://127.0.0.1:4443"], tls=ssl_ctx, tls_hostname="localhost")Setting the scheme to tls in the connect URL will make the client create a default ssl context automatically:
import asyncio
import ssl
from nats.aio.client import Client as NATS
async def run():
nc = NATS()
await nc.connect("tls://demo.nats.io:4443")Note: If getting SSL certificate errors in OS X, try first installing the certifi certificate bundle. If using Python 3.7 for example, then run:
$ /Applications/Python\ 3.7/Install\ Certificates.command
-- pip install --upgrade certifi
Collecting certifi
...
-- removing any existing file or link
-- creating symlink to certifi certificate bundle
-- setting permissions
-- update completeSince v0.9.0 release, you can also optionally install NKEYS in order to use the new NATS v2.0 auth features:
pip install nats-py[nkeys]Usage:
await nats.connect("tls://connect.ngs.global:4222", user_credentials="/path/to/secret.creds")NATS client functionality is split across two layers: the core client
(nats-py, this repo) and Orbit,
a separate set of packages with higher-level utilities.
The split exists so the core can stay small, stable, and consistent across NATS clients in every language, while Orbit can iterate quickly on opinionated abstractions without dragging the core API along for the ride.
- Direct API over Core NATS and JetStream as exposed by
nats-server. - Lightweight, unopinionated, performance-oriented.
- API surface kept in parity with other official NATS clients (Rust, Go, .NET, Java, JS, C). A feature shipped here should look the same shape everywhere.
- Stable, conservative versioning. Breaking changes are rare and deliberate.
- Higher-level, opinionated abstractions built on top of the core client.
- Per-package versioning, so an experimental utility can iterate without bumping every other piece.
- Free to be language-specific: a Python-idiomatic API does not need to match the equivalent in other languages.
- May lag, omit, or extend cross-client parity items.
| Concern | Core (nats-py) |
Orbit |
|---|---|---|
| Connect, publish, subscribe, request/reply | ✅ | |
| JetStream publish, consumers, streams, KV, OS | ✅ | |
| Service API (request/reply micro-services) | ✅ | |
| Wire-protocol coverage, auth, TLS, reconnection | ✅ | |
| Cross-client parity, conservative semver | ✅ | |
| Opinionated helpers / sugar over core APIs | ✅ | |
| New experimental patterns (e.g. partitioned groups) | ✅ | |
| KV codecs, distributed counters, NATS contexts | ✅ | |
| Python-idiomatic abstractions with no parity mandate | ✅ | |
| Per-utility versioning, faster API churn allowed | ✅ |
Rule of thumb: if it is a thin mapping of something
nats-serveralready speaks and every official client must expose it, it belongs in core. If it is a pattern, helper, or abstraction layered on top, it belongs in Orbit.
┌──────────────────────────────────────────────────────┐
│ Application code │
└──────────────┬───────────────────────────┬───────────┘
│ │
▼ ▼
┌───────────────────┐ ┌───────────────────┐
│ Orbit packages │ uses │ nats-py (core) │
│ (opinionated, │──────▶│ (parity, stable, │
│ per-pkg semver) │ │ protocol-level) │
└───────────────────┘ └─────────┬─────────┘
│
▼
┌─────────────┐
│ nats-server │
└─────────────┘
See CONTRIBUTING.md for development setup and guidelines.
Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.