Smart sockets for simple network communication.
Attribution: And1mu [CC BY-SA 4.0 (https://creativecommons.org/licenses/by-sa/4.0)]
- Simple bind/connect semantics for multiple peers
- Automatic reconnection
- Message-based, not stream-based
- Built-in heartbeating
- Built-in reliability options (at-most-once, at-least-once)
- Optional TLS, native to each language (no OpenSSL/C toolchain in Rust)
aiomsg is a family of native language implementations of one shared wire
protocol. A socket written in any of these languages interoperates on the wire
with a socket written in any other. Each implementation is idiomatic to its
language (no C bindings, no FFI wrappers), and each lives in its own
subdirectory:
Language icons are served by the Devicon CDN.
Every implementation supports TLS, using its language's idiomatic facility
(rustls with the pure-Rust ring backend for both Rust crates, crypto/tls
for Go, the standard ssl module for Python, OpenSSL for C/C++/Zig, JSSE for
Java, Node's tls module for JavaScript, SslStream for C#, and LuaSec for
Lua). TLS sockets interoperate across languages just as plain ones do. The
cross-language conformance suite exercises both transports. See each
implementation's README for the TLS API.
The canonical, language-independent wire specification lives in PROTOCOL.md. The overall plan and per-language design notes are in DESIGN.md.
You do not need to wait for each implementation to be published to every
language registry. Pin this repository to a commit, tag, or branch and point
your language's normal dependency tool at the implementation directory you use.
In the examples below, <git-ref> means a normal Git reference: a commit SHA,
a tag, or a branch name. Tags work with the Git-ref forms shown here; for
reproducible builds, prefer a commit SHA or release tag over main. If your
package manager cannot install from a Git subdirectory directly, add the repo as
a Git submodule and use the shown path-based reference. Commit the submodule
pointer to pin the exact revision:
git submodule add https://github.com/cjrh/aiomsg.git vendor/aiomsg
pip install "aiomsg @ git+https://github.com/cjrh/aiomsg.git@<git-ref>#subdirectory=python-lib"
uv add "aiomsg @ git+https://github.com/cjrh/aiomsg.git@<git-ref>#subdirectory=python-lib"
For a local checkout or submodule:
pip install ./vendor/aiomsg/python-lib
uv add ./vendor/aiomsg/python-lib
Cargo does not need a subdirectory field here. It searches the Git repository
for a package with the requested package name:
- package = "aiomsg" selects rust-lib-async/Cargo.toml.
- package = "aiomsg-sync" selects rust-lib-sync/Cargo.toml.
Use exactly one of tag, rev, or branch to pin the Git reference. For
example, pinned to a tag:
Async Tokio implementation from rust-lib-async/:
[dependencies]
aiomsg = { git = "https://github.com/cjrh/aiomsg.git", package = "aiomsg", tag = "<tag>" }
Synchronous threaded implementation from rust-lib-sync/:
[dependencies]
aiomsg = { git = "https://github.com/cjrh/aiomsg.git", package = "aiomsg-sync", tag = "<tag>" }
For a commit SHA or branch, replace tag = "<tag>" with rev = "<commit-sha>"
or branch = "<branch>".
For a local checkout or submodule, use path instead of git:
Async Tokio implementation:
[dependencies]
aiomsg = { path = "vendor/aiomsg/rust-lib-async" }
Synchronous threaded implementation:
[dependencies]
aiomsg = { package = "aiomsg-sync", path = "vendor/aiomsg/rust-lib-sync" }
go get github.com/cjrh/aiomsg/golang-lib@<git-ref>
Commit SHAs and branches work directly. If you use semantic release tags for the
Go module, tag them with the subdirectory prefix, such as golang-lib/v1.0.0.
Then import it in Go code:
import aiomsg "github.com/cjrh/aiomsg/golang-lib"
For a local checkout or submodule, add a replace directive:
replace github.com/cjrh/aiomsg/golang-lib => ./vendor/aiomsg/golang-lib
CMake 3.18+ can fetch one implementation directory from this repository. Set
SOURCE_SUBDIR to c-lib, cpp-lib-sync, or cpp-lib-async:
include(FetchContent)
FetchContent_Declare(aiomsg
GIT_REPOSITORY https://github.com/cjrh/aiomsg.git
GIT_TAG <git-ref>
SOURCE_SUBDIR c-lib)
FetchContent_MakeAvailable(aiomsg)
target_link_libraries(myapp PRIVATE aiomsg::aiomsg)
For a local checkout or submodule:
add_subdirectory(vendor/aiomsg/c-lib aiomsg-c)
target_link_libraries(myapp PRIVATE aiomsg::aiomsg)
Zig can fetch Git packages directly when the Git repository root is the Zig
package root. In this monorepo the Zig package root is zig-lib/, and Zig does
not currently have a Git URL subdirectory selector. If the Zig implementation
is ever split into its own repository, the direct form would be:
zig fetch --save git+https://...#<git-ref>
For this repository, pin it with Git, then reference the Zig package directory
from build.zig.zon:
.dependencies = .{
.aiomsg = .{ .path = "vendor/aiomsg/zig-lib" },
},
Then import the module in build.zig:
const aiomsg = b.dependency("aiomsg", .{ .target = target, .optimize = optimize });
exe.root_module.addImport("aiomsg", aiomsg.module("aiomsg"));
Node package managers can install a Git repository directly when package.json
is at the repository root. Here it lives in javascript-lib/, and npm has no
standard Git URL subdirectory selector. If the JavaScript implementation is ever
split into its own repository, the direct form would be a normal Git dependency:
{
"dependencies": {
"aiomsg": "github:cjrh/aiomsg-js#<git-ref>"
}
}
For this repository, use a Git submodule or local checkout and a normal file:
dependency:
{
"dependencies": {
"aiomsg": "file:vendor/aiomsg/javascript-lib"
}
}
Or install it into the current project:
npm install ./vendor/aiomsg/javascript-lib
Gradle and Maven do not have a simple standard dependency notation for a Git subdirectory. Use a Git submodule or local checkout and include the Java build as a Gradle composite build.
In settings.gradle.kts:
includeBuild("vendor/aiomsg/java-lib") {
dependencySubstitution {
substitute(module("aiomsg:aiomsg")).using(project(":"))
}
}
In build.gradle.kts:
dependencies {
implementation("aiomsg:aiomsg:0")
}
NuGet does not consume arbitrary Git repositories directly. Use a Git submodule or local checkout and reference the library project:
dotnet add reference vendor/aiomsg/csharp-lib/Aiomsg/Aiomsg.csproj
Equivalent .csproj entry:
<ItemGroup>
<ProjectReference Include="vendor/aiomsg/csharp-lib/Aiomsg/Aiomsg.csproj" />
</ItemGroup>
LuaRocks can install the rockspec from this repository at a pinned revision:
luarocks install https://raw.githubusercontent.com/cjrh/aiomsg/<git-ref>/lua-lib/aiomsg-1.0-1.rockspec
For a local checkout or submodule:
luarocks make vendor/aiomsg/lua-lib/aiomsg-1.0-1.rockspec
The code examples throughout this document are in Python (the reference), but the design principles, message-distribution patterns, and developer experience described below are shared by every implementation. Only the spelling differs (async/await in Python and Rust-async; goroutines and channels in Go; blocking calls in Rust-sync).
Let's make two microservices; one will send the current time to the other. Here's the end that binds to a port (a.k.a. the "server"):
import asyncio, time
from aiomsg import Søcket
async def main():
    async with Søcket() as sock:
        await sock.bind('127.0.0.1', 25000)
        while True:
            await sock.send(time.ctime().encode())
            await asyncio.sleep(1)
asyncio.run(main())
Running as a different process, here is the end that does the connecting (a.k.a. the "client"):
import asyncio
from aiomsg import Søcket
async def main():
    async with Søcket() as sock:
        await sock.connect('127.0.0.1', 25000)
        async for msg in sock.messages():
            print(msg.decode())
asyncio.run(main())
Note that these are both complete, runnable programs, not fragments.
Looks a lot like conventional socket programming, except that these sockets have a few extra tricks. These are described in more detail in the rest of this document.
Let's look at the same demo in the other language implementations:
The wire protocol is identical, so any of these ends can talk to the Python
ends above (and to each other). Each snippet is two complete programs, the
bind end ("server") and the connect end ("client"), matching the runnable
examples/ in each implementation.
Blocking calls on background threads; no async runtime. The bind end:
use aiomsg::{SendMode, Socket};
fn main() -> aiomsg::Result<()> {
let sock = Socket::builder().send_mode(SendMode::Publish).build();
sock.bind("127.0.0.1:25000")?;
loop {
sock.send(format!("{:?}", std::time::SystemTime::now()))?;
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
The connect end:
use aiomsg::Socket;
fn main() -> aiomsg::Result<()> {
let sock = Socket::new();
sock.connect("127.0.0.1:25000")?;
for msg in sock.messages() {
println!("{}", String::from_utf8_lossy(&msg));
}
Ok(())
}
The same shape with tokio; bind/connect/send/recv are .awaited. The
bind end:
use aiomsg::{SendMode, Socket};
#[tokio::main]
async fn main() -> aiomsg::Result<()> {
let sock = Socket::builder().send_mode(SendMode::Publish).build();
sock.bind("127.0.0.1:25000").await?;
loop {
sock.send(format!("{:?}", std::time::SystemTime::now())).await?;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
The connect end:
use aiomsg::Socket;
#[tokio::main]
async fn main() -> aiomsg::Result<()> {
let sock = Socket::new();
sock.connect("127.0.0.1:25000").await?;
while let Some(msg) = sock.recv().await {
println!("{}", String::from_utf8_lossy(&msg));
}
Ok(())
}
Goroutines and channels under the hood; the public API is plain blocking calls. The bind end:
package main
import (
"time"
aiomsg "github.com/cjrh/aiomsg/golang-lib"
)
func main() {
sock := aiomsg.NewSocket(aiomsg.WithSendMode(aiomsg.Publish))
defer sock.Close()
sock.Bind("127.0.0.1:25000")
for {
sock.Send([]byte(time.Now().Format(time.RFC3339)))
time.Sleep(time.Second)
}
}
The connect end:
package main
import (
"fmt"
aiomsg "github.com/cjrh/aiomsg/golang-lib"
)
func main() {
sock := aiomsg.NewSocket()
defer sock.Close()
sock.Connect("127.0.0.1:25000")
for msg := range sock.Messages() {
fmt.Println(string(msg.Data))
}
}
Built on std.Io (the 0.16 interface), which hands main its io backend and
allocator. The bind end:
const std = @import("std");
const aiomsg = @import("aiomsg");
pub fn main(init: std.process.Init) !void {
const sock = try aiomsg.Socket.init(init.gpa, init.io, .{ .mode = .publish });
defer sock.deinit();
try sock.bind("127.0.0.1", 25000);
var i: usize = 0;
while (true) : (i += 1) {
var buf: [64]u8 = undefined;
sock.send(std.fmt.bufPrint(&buf, "tick {d}", .{i}) catch unreachable);
init.io.sleep(std.Io.Duration.fromMilliseconds(1000), .boot) catch {};
}
}
The connect end (recv returns null only once the socket is closed and
drained; the caller owns each payload):
const std = @import("std");
const aiomsg = @import("aiomsg");
pub fn main(init: std.process.Init) !void {
const sock = try aiomsg.Socket.init(init.gpa, init.io, .{});
defer sock.deinit();
try sock.connect("127.0.0.1", 25000);
while (sock.recv()) |m| {
defer init.gpa.free(m.data);
std.debug.print("{s}\n", .{m.data});
}
}
Node.js, ES modules, zero runtime dependencies. The bind end:
import { Socket, SendMode } from "aiomsg";
const sock = new Socket({ sendMode: SendMode.PUBLISH });
await sock.bind("127.0.0.1", 25000);
setInterval(() => sock.send(Buffer.from(new Date().toString())), 1000);
The connect end (messages() is an async iterator; it ends when the socket
closes):
import { Socket } from "aiomsg";
const sock = new Socket();
await sock.connect("127.0.0.1", 25000);
for await (const msg of sock.messages()) {
console.log(msg.toString("utf8"));
}
Idiomatic async/await; Socket is IAsyncDisposable. The bind end:
using Aiomsg;
await using var sock = new Socket(SendMode.Publish);
await sock.BindAsync("127.0.0.1", 25000);
while (true)
{
sock.Send(System.Text.Encoding.UTF8.GetBytes(DateTime.Now.ToString("O")));
await Task.Delay(1000);
}
The connect end:
using Aiomsg;
await using var sock = new Socket();
sock.Connect("127.0.0.1", 25000);
await foreach (var message in sock.Messages())
    Console.WriteLine(System.Text.Encoding.UTF8.GetString(message));
A cooperative reactor on LuaSocket/LuaSec, driven by the calls that wait on it
(run pumps without receiving; recv/messages pump while receiving). The
bind end:
local aiomsg = require("aiomsg")
local sock = aiomsg.Socket.new({ send_mode = aiomsg.SendMode.PUBLISH })
sock:bind("127.0.0.1", 25000)
while true do
sock:send(os.date())
sock:run(1.0)
end
The connect end:
local aiomsg = require("aiomsg")
local sock = aiomsg.Socket.new()
sock:connect("127.0.0.1", 25000)
for message in sock:messages() do
print(message)
end
Looks a lot like ZeroMQ, yes? No? Well, if you don't know anything about
ZeroMQ, that's fine too. The rest of this document will assume that you
don't know anything about ZeroMQ. aiomsg is heavily influenced
by ZeroMQ.
There are some differences; hopefully they make things simpler than zmq. For one thing, the Python implementation of aiomsg is pure Python, so no compilation step is required, and it relies only on the Python standard library (and that won't change).
Also, we don't have special kinds of socket pairs like ZeroMQ has. There is
only the one Søcket class. The only role distinction you need to make
between different socket instances is this: some sockets will bind
and others will connect.
This is the leaky part of the API that comes from the underlying BSD socket API. A bind socket will bind to a local interface and port. A connect socket must connect to a bind socket, which can be on the same machine or a remote machine. This is the only complicated bit. You must decide, in a distributed microservices architecture, which sockets must bind and which must connect. A useful heuristic is that the service which is more likely to require horizontal scaling should have the connect sockets. This is because the hostnames to which they will connect (these will be the bind sockets) will be long-lived.
What you see above in the demo is pretty much a typical usage of
network sockets. So what's special about aiomsg? These are
the high-level features:
-
Messages, not streams:
Send and receive are message-based, not stream-based. Much easier! This does mean that if you want to transmit large amounts of data, you're going to have to break them up yourself, send the pieces, and put them back together on the other side.
-
Automatic reconnection
These sockets automatically reconnect. You don't have to write special code for it. If the bind end (a.k.a. the "server") is restarted, the connecting end will automatically reconnect. This works in either direction. Try it! Run the demo code and kill one of the processes, then start it up again. The connection will be re-established.
-
Many connections on a single "socket"
The bind end can receive multiple connections, but you do all your
.send() and .recv() calls on a single object. (No callback handlers or protocol objects.)
More impressive is that the connecting end is exactly the same; it can make outgoing connect() calls to multiple peers (bind sockets), and you make all your send() and recv() calls on a single object.
This will be described in more detail further on in this document.
-
Message distribution patterns
Receiving messages is pretty simple: new messages just show up (remember that messages from all connected peers come through the same call):
async with Søcket() as sock:
    await sock.bind()
    async for msg in sock.messages():
        print(f"Received: {msg}")
However, when sending messages you have choices. The choices affect which peers get the message. The options are:
- Publish: every connected peer is sent a copy of the message
- Round-robin: each connected peer is sent a unique message; the messages are distributed to each connection in a circular pattern.
- By peer identity: you can also send to a specific peer by using its identity directly.
The choice between pub-sub and round-robin must be made when creating the
Søcket():
from aiomsg import Søcket, SendMode
async with Søcket(send_mode=SendMode.PUBLISH) as sock:
    await sock.bind()
    async for msg in sock.messages():
        await sock.send(msg)
This example receives a message from any connected peer, and sends that same message to every connected peer (including the original sender). By changing PUBLISH to ROUNDROBIN, the message distribution pattern changes so that each "sent" message goes to only one connected peer. The next "sent" message will go to a different connected peer, and so on.
Identity-based message sending is available at any time, regardless of what you choose for the send_mode parameter; for example:
import asyncio
from aiomsg import Søcket, SendMode
async def main():
    async with Søcket() as sock1, Søcket(send_mode=SendMode.PUBLISH) as sock2:
        await sock1.bind(port=25000)
        await sock2.bind(port=25001)
        while True:
            peer_id, msg = await sock1.recv_identity()
            # Imagine that the sender constructs each message with
            # an id like <[prefix][id][\x00][data]>
            msg_id, _, data = msg.partition(b"\x00")
            # This goes to all peers (publish mode)
            await sock2.send(data)
            # This only goes to the specified peer (identity)
            await sock1.send(msg_id + b"\x00ok", identity=peer_id)
asyncio.run(main())
This example shows how you can receive messages on one socket (sock1, which could have thousands of connected peers), and relay those messages to thousands of other peers connected on a different socket (sock2).
For this example, the send_mode of sock1 doesn't matter because if identity is specified in the send() call, it'll ignore send_mode completely.
Oh, and the example above is a complete, runnable program, which is pretty amazing!
-
Built-in heartbeating
Because ain't nobody got time to mess around with TCP keepalive settings. The heartbeating is internal and opaque to your application code. You won't even know it's happening, unless you enable debug logs. Heartbeats are sent only during periods of inactivity, so they won't interfere with your application messages.
In theory, you really shouldn't need heartbeating because TCP is a very robust protocol; but in practice, various intermediate servers and routers sometimes do silly things to your connection if they think a connection has been idle for too long. So, automatic heartbeating is baked in to let all intermediate hops know you want the connection to stay up, and if the connection goes down, you will know much sooner than the standard TCP keepalive timeout duration (which can be very long!).
If either a heartbeat or a message isn't received within a specific timeframe, that connection is destroyed. Whichever peer is making the connect() call will then automatically try to reconnect, as discussed earlier.
-
Built-in reliability choices
Ah, so what do "reliability choices" mean exactly...?
It turns out that it's quite hard to send messages in a reliable way. Or, stated another way, it's quite hard to avoid dropping messages: one side sends and the other side never gets the message.
aiomsg already buffers messages when being sent. Consider the following example:
import asyncio
from aiomsg import Søcket, SendMode
async with Søcket(send_mode=SendMode.PUBLISH) as sock:
    await sock.bind()
    while True:
        await sock.send(b'123')
        await asyncio.sleep(1.0)
The server above will send the bytes b"123" to all connected peers; but what happens if there are no connected peers? In this case the message will be buffered internally until there is at least one connected peer, and when that happens, all buffered messages will immediately be sent. To be clear, you don't have to do anything extra. This is just the normal behaviour, and it works the same with the ROUNDROBIN send mode.
Message buffering happens whenever there are no connected peers available to receive a message. Sounds great, right? Unfortunately, this is not quite enough to prevent messages from getting lost. It is still easy to have your process killed immediately after sending data into a kernel socket buffer, but right before the bytes actually get transmitted. In other words, your code thinks the message got sent, but it didn't actually get sent.
The only real solution for adding robustness is to have peers reply to you saying that they received the message. Then, if you never receive this notification, you should assume that the message might not have been received, and send it again. aiomsg will do this for you (so again there is no work on your part), but you do have to turn it on.
This option is called the DeliveryGuarantee. The default option, which is just basic message buffering in the absence of any connected peers, is called DeliveryGuarantee.AT_MOST_ONCE. It means, literally, that any "sent" message will be received by a connected peer no more than once (of course, it may also be received zero times, as described above).
The alternative is to set DeliveryGuarantee.AT_LEAST_ONCE, which enables the internal "retry" feature. Under certain conditions, any given message could be received more than once, depending on timing and situation. This is how the code looks if you enable it:
import asyncio
from aiomsg import Søcket, SendMode, DeliveryGuarantee
async with Søcket(
    send_mode=SendMode.ROUNDROBIN,
    delivery_guarantee=DeliveryGuarantee.AT_LEAST_ONCE
) as sock:
    await sock.bind()
    while True:
        await sock.send(b'123')
        await asyncio.sleep(1.0)
It's pretty much exactly the same as before, but we added the AT_LEAST_ONCE option. Note that AT_LEAST_ONCE does not work for the PUBLISH sending mode. (Would it make sense to enable?)
As a minor point, you should note that when AT_LEAST_ONCE is enabled, it does not mean that every send waits for acknowledgement before the next send. That would incur too much latency. Instead, there is a "reply checker" that runs on a timer, and if a reply hasn't been received for a particular message in a certain timeframe (5.0 seconds by default), that message will be sent again.
The connection may have gone down and back up within those 5 seconds, and there may be new messages buffered for sending before the retry send happens. In this case, the retry message will arrive after those buffered messages. This is a long way of saying that the way that message reliability has been implemented can result in messages being received in a different order to what they were sent. In exchange for this, you get a lower overall latency because sending new messages is not waiting on previous messages getting acknowledged.
Finally: if the target peer goes down, and the sending peer goes down, all memory of pending retries is lost. This is because aiomsg keeps the retry log in memory. If your use-case requires greater resilience than what the delivery guarantee provides, then you should not use AT_LEAST_ONCE, but instead build your own retry system where you track pending replies in an actual database that can persist state across restarts. Currently, aiomsg does not do this. (It's an interesting feature and I may work on it in the future, but it isn't there right now.)
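Because AT_LEAST_ONCE can deliver the same message more than once, a receiver may want to make its handling idempotent. The following is a minimal sketch of one way to do that, not something aiomsg provides: it assumes each payload is framed as an id, a zero byte, then the data (the same illustrative framing used in the identity example above), and the names Deduplicator and handle are hypothetical.

```python
from collections import OrderedDict


class Deduplicator:
    """Remember recently seen message ids so redelivered messages can be skipped."""

    def __init__(self, max_ids=10_000):
        self._seen = OrderedDict()  # insertion-ordered set of ids
        self._max_ids = max_ids

    def is_new(self, msg_id):
        """Return True the first time an id is seen, False for a repeat."""
        if msg_id in self._seen:
            return False
        self._seen[msg_id] = None
        if len(self._seen) > self._max_ids:
            self._seen.popitem(last=False)  # forget the oldest id
        return True


def handle(raw, dedup, processed):
    """Process a payload framed as id + zero byte + data, at most once per id."""
    msg_id, _, data = raw.partition(b"\x00")
    if dedup.is_new(msg_id):
        processed.append(data)
```

In a worker, each payload returned by sock.recv() would be passed to handle(). The memory of seen ids is bounded and lives only in the process, so, like the retry log itself, it does not survive a restart.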
The message distribution patterns are what make aiomsg powerful. It
is the way you connect up a whole bunch of microservices that brings the
greatest leverage. We'll go through the different scenarios using a
cookbook format. These examples are in Python, but the same patterns apply to all implementations.
In the code snippets that follow, you should assume that each snippet is a complete working program, except that some boilerplate is omitted. This is the basic template:
import asyncio
from aiomsg import Søcket, SendMode, DeliveryGuarantee
<main() function>
asyncio.run(main())
Just substitute in the main() function from the snippets below to
make the complete programs.
The choice of "which peer should bind" is unaffected by the sending mode of the socket.
flowchart LR
subgraph BindPublisher["Publisher binds"]
PB["publisher<br/>bind socket<br/>SendMode.PUBLISH"] -->|"News!"| BPA["connected peer A"]
PB -->|"News!"| BPB["connected peer B"]
end
subgraph ConnectPublisher["Publisher connects"]
BB["stable peer<br/>bind socket"] <-->|"connect()"| PC["publisher<br/>connect socket<br/>SendMode.PUBLISH"]
PC -->|"News!"| BB
end
Compare
# Publisher that binds
async def main():
    async with Søcket(send_mode=SendMode.PUBLISH) as sock:
        await sock.bind()
        while True:
            await sock.send(b'News!')
            await asyncio.sleep(1)
versus
# Publisher that connects
async def main():
    async with Søcket(send_mode=SendMode.PUBLISH) as sock:
        await sock.connect()
        while True:
            await sock.send(b'News!')
            await asyncio.sleep(1)
The same is true for the round-robin sending mode. You will usually choose the bind peer based on which service is least likely to require dynamic scaling. This means that the mental conception of socket peers as either a server or client is not that useful.
In this recipe, one service needs to send messages to another service that is horizontally scaled.
The trick here is that we don't want to use bind sockets on horizontally-scaled services, because other peers that need to make a connect call will need to know what hostname to use. Each instance in a horizontally-scaled service has a different IP address, and it becomes difficult to keep the "connect" side up-to-date about which peers are available. This can also change as the horizontally-scaled service increases or decreases the number of instances. (In ZeroMQ documentation, this is described as the Dynamic Discovery Problem).
aiomsg handles this very easily: just make sure that the
dynamically-scaled service is making the connect calls:
flowchart LR
DNS["DNS<br/>jobcreator.com"] -. "resolves to" .-> JC["jobcreator.py<br/>bind 0.0.0.0:25001<br/>SendMode.ROUNDROBIN"]
W1["worker.py instance A<br/>connect jobcreator.com:25001"] -->|"TCP connect"| JC
W2["worker.py instance B<br/>connect jobcreator.com:25001"] -->|"TCP connect"| JC
WN["worker.py instance N<br/>connect jobcreator.com:25001"] -->|"TCP connect"| JC
JC -->|"job (round-robin)"| W1
JC -->|"job (round-robin)"| W2
JC -->|"job (round-robin)"| WN
This is the manually-scaled service (has a specific domain name):
# jobcreator.py -> DNS for "jobcreator.com" should point to this machine.
async def main():
    async with Søcket(send_mode=SendMode.ROUNDROBIN) as sock:
        await sock.bind(hostname="0.0.0.0", port=25001)
        while True:
            await sock.send(b"job")
            await asyncio.sleep(1)
These are the downstream workers (don't need a domain name):
# worker.py -> can be on any number of machines
async def main():
    async with Søcket() as sock:
        await sock.connect(hostname='jobcreator.com', port=25001)
        while True:
            work = await sock.recv()
            # <do work>
With this code, after you start up jobcreator.py on the machine
to which DNS resolves the domain name "jobcreator.com", you can start
up multiple instances of worker.py on other machines, and work
will get distributed among them. You can even change the number of
worker instances dynamically, and everything will "just work", with
the main instance distributing work out to all the connected workers
in a circular pattern.
This core recipe provides a foundation on which many of the other recipes are built.
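To make the "circular pattern" concrete, here is a small standalone model of round-robin distribution. It is only an illustration of the idea, not aiomsg's internal implementation, and the distribute function and worker names are made up for the example.

```python
from collections import Counter
from itertools import cycle


def distribute(jobs, workers):
    """Assign each job to the next worker in a fixed circular order."""
    if not workers:
        raise ValueError("need at least one worker")
    ring = cycle(workers)
    return [(next(ring), job) for job in jobs]


jobs = [f"job-{i}" for i in range(6)]
plan = distribute(jobs, ["worker-A", "worker-B", "worker-C"])
counts = Counter(worker for worker, _ in plan)
print(counts)  # each of the three workers is assigned two jobs
```

The model raises an error when there are no workers, which is where it differs from the real socket: as described earlier, aiomsg buffers outgoing messages until a peer connects.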
In this scenario, there are actually two instances of the job-creating service, not one. This would typically be done for reliability, and each instance would be placed in a different availability zone. Each instance will have a different domain name.
It turns out that the required setup follows directly from the previous one: you just add another connect call in the workers.
flowchart TB
subgraph Names["Stable DNS names"]
direction LR
DNSA["DNS<br/>a.jobcreator.com"]
DNSB["DNS<br/>b.jobcreator.com"]
end
subgraph Creators["2 jobcreator.py instances"]
direction LR
JCA["AZ A<br/>bind 0.0.0.0:25001<br/>SendMode.ROUNDROBIN"]
JCB["AZ B<br/>bind 0.0.0.0:25001<br/>SendMode.ROUNDROBIN"]
end
Workers["dynamic worker pool<br/>worker.py instances A ... N<br/>connect a.jobcreator.com:25001<br/>connect b.jobcreator.com:25001"]
DNSA -. "resolves to" .-> JCA
DNSB -. "resolves to" .-> JCB
JCA -->|"jobs"| Workers
JCB -->|"jobs"| Workers
The manually-scaled service is as before, but you start one instance of
jobcreator.py on machine "a.jobcreator.com", and start another
on machine "b.jobcreator.com". Obviously, it is DNS that is configured
to point to the correct IP addresses of those machines (or you could
use IP addresses too, if these are internal services).
# jobcreator.py -> Configure DNS to point to these instances
async def main():
    async with Søcket(send_mode=SendMode.ROUNDROBIN) as sock:
        await sock.bind(hostname="0.0.0.0", port=25001)
        while True:
            await sock.send(b"job")
            await asyncio.sleep(1)
As before, here are the downstream workers, but this time each worker makes
multiple connect() calls; one to each job creator's domain name:
# worker.py -> can be on any number of machines
async def main():
    async with Søcket() as sock:
        await sock.connect(hostname='a.jobcreator.com', port=25001)
        await sock.connect(hostname='b.jobcreator.com', port=25001)
        while True:
            work = await sock.recv()
            # <do work>
aiomsg will return work from the sock.recv() call above as
it comes in from either job creation service. And as before, the number
of worker instances can be dynamically scaled, up or down, and all the
connection and reconnection logic will be handled internally.
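If the list of job creators comes from configuration rather than being hard-coded, the repeated connect() calls can be driven by a loop. This is a sketch under assumptions: connect_all is a hypothetical helper, and it relies only on the connect(hostname=..., port=...) call shape used in the worker above. A stand-in socket is used here so the example runs without aiomsg installed.

```python
import asyncio


async def connect_all(sock, hostnames, port):
    """Call sock.connect() once for each hostname, in order."""
    for hostname in hostnames:
        await sock.connect(hostname=hostname, port=port)


class RecordingSocket:
    """Stand-in for a real socket; it only records the connect() calls."""

    def __init__(self):
        self.calls = []

    async def connect(self, hostname, port):
        self.calls.append((hostname, port))


fake = RecordingSocket()
asyncio.run(connect_all(fake, ["a.jobcreator.com", "b.jobcreator.com"], 25001))
print(fake.calls)
```

In a real worker, the Søcket instance from the async with block would be passed in place of the stand-in.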
If both services need to be dynamically-scaled, and can have
varying numbers of instances at any time, we can no longer rely
on having one end do the socket bind to a dedicated domain name.
We really would like each to make connect() calls, as we've
seen in previous examples.
How to solve it?
The answer is to create an intermediate proxy service that has two bind sockets, with long-lived domain names. This is what will allow the other two dynamically-scaled services to have a dynamic number of instances.
flowchart LR
DNS["DNS<br/>proxy.jobcreator.com"] -. "resolves to" .-> P1
DC1["dynamiccreator.py instance A<br/>connect proxy.jobcreator.com:25001"] -->|"TCP connect"| P1
DCN["dynamiccreator.py instance N<br/>connect proxy.jobcreator.com:25001"] -->|"TCP connect"| P1
subgraph Proxy["proxy.py"]
P1["sock1<br/>bind 0.0.0.0:25001"] -->|"recv work; forward"| P2["sock2<br/>bind 0.0.0.0:25002<br/>SendMode.ROUNDROBIN"]
end
W1["worker.py instance A<br/>connect proxy.jobcreator.com:25002"] -->|"TCP connect"| P2
WN["worker.py instance N<br/>connect proxy.jobcreator.com:25002"] -->|"TCP connect"| P2
P2 -->|"job (round-robin)"| W1
P2 -->|"job (round-robin)"| WN
Here is the new job creator, whose name we change to dynamiccreator.py
to reflect that it is now dynamically scalable:
# dynamiccreator.py -> can be on any number of machines
async def main():
    async with Søcket(send_mode=SendMode.ROUNDROBIN) as sock:
        await sock.connect(hostname="proxy.jobcreator.com", port=25001)
        while True:
            await sock.send(b"job")
            await asyncio.sleep(1)
Note that our job creator above is now making a connect() call to
proxy.jobcreator.com:25001 rather than binding to a local port.
Let's see what it's connecting to. Here is the intermediate proxy
service, which needs a dedicated domain name, and two ports allocated
for each of the bind sockets.
# proxy.py -> Set up DNS to point "proxy.jobcreator.com" to this instance
async def main():
    async with Søcket() as sock1, \
            Søcket(send_mode=SendMode.ROUNDROBIN) as sock2:
        await sock1.bind(hostname="0.0.0.0", port=25001)
        await sock2.bind(hostname="0.0.0.0", port=25002)
        while True:
            work = await sock1.recv()
            await sock2.send(work)
Note that sock1 is bound to port 25001; this is what our job creator
is connecting to. The other socket, sock2, is bound to port 25002, and
this is the one that our workers will be making their connect() calls
to. Hopefully it's clear in the code that work is being received from
sock1 and being sent onto sock2. This is pretty much a feature
complete proxy service, and with only minor additions for error-handling
can be used for real work.
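As one possible shape for those "minor additions for error-handling", the forwarding loop could log a failed send and carry on instead of crashing. This is a sketch, not part of aiomsg: forward is a hypothetical helper written against two plain coroutine functions, and max_messages exists only so the loop can be stopped in tests.

```python
import asyncio
import logging

logger = logging.getLogger("proxy")


async def forward(recv, send, max_messages=None):
    """Relay messages from recv() to send(), logging failures instead of crashing.

    Returns the number of messages forwarded successfully.
    """
    forwarded = 0
    handled = 0
    while max_messages is None or handled < max_messages:
        work = await recv()
        handled += 1
        try:
            await send(work)
        except Exception:
            # Log the failure with its traceback and keep the proxy running.
            logger.exception("failed to forward %r", work)
            continue
        forwarded += 1
    return forwarded
```

In proxy.py the loop body would then become a single call, await forward(sock1.recv, sock2.send). Whether a failed message should be dropped, retried, or parked somewhere is an application decision that this sketch leaves open.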
For completeness, here are the downstream workers:
# worker.py -> can be on any number of machines
async def main():
    async with Søcket() as sock:
        await sock.connect(hostname='proxy.jobcreator.com', port=25002)
        while True:
            work = await sock.recv()
            # <do work>
Note that the workers are connecting to port 25002, as expected.
You might be wondering: isn't this just moving our performance problem to a different place? If the proxy service is not scalable, then surely that becomes the "weakest link" in our system architecture?
This is a pretty typical reaction, but there are a couple of reasons why it might not be as bad as you think:
- The proxy service is doing very, very little work. Thus, we expect it to suffer from performance problems only at a much higher scale compared to our other two services which are likely to be doing more CPU-bound work (in real code, not my simple examples above).
- We could compile only the proxy service into faster low-level code using
  any number of tools such as Cython, C, C++, Rust, D and so on, in order
  to improve its performance, if necessary (this would require implementing
  the aiomsg protocols in that other language though). This allows us to
  retain the benefits of using a dynamic language like Python in the
  dynamically scaled services where much greater business logic is captured
  (these can then be horizontally scaled quite easily to handle performance
  issues if necessary).
- Performance is not the only reason services are dynamically scaled. It is
  always a good idea, even in low-throughput services, to have multiple
  instances of a service running in different availability zones. Outages do
  happen, yes, even in your favourite cloud provider's systems.
- A separate proxy service as shown above isolates a really complex problem and removes it from your business logic code. It might not be easy to appreciate how significant that is. As your dev team is rapidly iterating on business features, and redeploying new versions several times a day, the proxy service is unchanging, and doesn't require redeployment. In this sense, it plays a similar role to more traditional messaging systems like RabbitMQ and ActiveMQ.
- We can still run multiple instances of our proxy service using an earlier technique, as we'll see in the next recipe.
This scenario is exactly like the previous one, except that we're nervous about having only a single proxy service, since it is a single point of failure. Instead, we're going to have 3 instances of the proxy service running in parallel.
Let's jump straight into code. The proxy code itself is actually unchanged from before. We just need to run more copies of it on different machines. Each machine will have a different domain name.
flowchart TB
ENV["PROXY_HOSTNAMES<br/>px1.jobcreator.com; px2.jobcreator.com; px3.jobcreator.com"]
DC["dynamiccreator.py pool<br/>instances A ... N<br/>connect px1.jobcreator.com:25001<br/>connect px2.jobcreator.com:25001<br/>connect px3.jobcreator.com:25001"]
subgraph Proxies["proxy.py fleet"]
direction TB
PX1["px1.jobcreator.com<br/>sock1 bind 0.0.0.0:25001<br/>sock2 bind 0.0.0.0:25002"]
PX2["px2.jobcreator.com<br/>sock1 bind 0.0.0.0:25001<br/>sock2 bind 0.0.0.0:25002"]
PX3["px3.jobcreator.com<br/>sock1 bind 0.0.0.0:25001<br/>sock2 bind 0.0.0.0:25002"]
end
W["worker.py pool<br/>instances A ... N<br/>connect px1.jobcreator.com:25002<br/>connect px2.jobcreator.com:25002<br/>connect px3.jobcreator.com:25002"]
ENV -. "configured in" .-> DC
ENV -. "configured in" .-> W
DC -->|"jobs to port 25001"| PX1
DC -->|"jobs to port 25001"| PX2
DC -->|"jobs to port 25001"| PX3
PX1 -->|"forwarded jobs from port 25002"| W
PX2 -->|"forwarded jobs from port 25002"| W
PX3 -->|"forwarded jobs from port 25002"| W
# proxy.py -> unchanged from the previous recipe
async def main():
    async with Søcket() as sock1, \
            Søcket(send_mode=SendMode.ROUNDROBIN) as sock2:
        await sock1.bind(hostname="0.0.0.0", port=25001)
        await sock2.bind(hostname="0.0.0.0", port=25002)
        while True:
            work = await sock1.recv()
            await sock2.send(work)

For the other two dynamically scaled services, we need to tell them all the domain names to connect to. We could set that up in an environment variable:

$ export PROXY_HOSTNAMES="px1.jobcreator.com;px2.jobcreator.com;px3.jobcreator.com"

Then, it's really easy to modify our services to make use of that. First, the dynamically-scaled job creator:
# dynamiccreator.py -> can be on any number of machines
async def main():
    async with Søcket(send_mode=SendMode.ROUNDROBIN) as sock:
        for proxy in os.environ['PROXY_HOSTNAMES'].split(";"):
            await sock.connect(hostname=proxy, port=25001)
        while True:
            await sock.send(b"job")
            await asyncio.sleep(1)

And the change for the worker code is identical (making sure the correct port is being used, 25002):
# worker.py -> can be on any number of machines
async def main():
    async with Søcket() as sock:
        for proxy in os.environ['PROXY_HOSTNAMES'].split(";"):
            await sock.connect(hostname=proxy, port=25002)
        while True:
            work = await sock.recv()
            <do work>

Three proxies, each running in a different availability zone, should be adequate for most common scenarios.
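Splitting the variable directly, as in the services above, raises KeyError if PROXY_HOSTNAMES is unset and yields an empty hostname if the value has a trailing semicolon. Here is a slightly more defensive sketch. The helper name proxy_hostnames is hypothetical and not part of aiomsg; it only uses the standard library.

```python
import os


def proxy_hostnames(env_var: str = "PROXY_HOSTNAMES") -> list:
    """Return the non-empty, whitespace-trimmed hostnames in env_var."""
    raw = os.environ.get(env_var, "")
    hosts = [part.strip() for part in raw.split(";") if part.strip()]
    if not hosts:
        # Fail early with a clear message instead of connecting to nothing.
        raise RuntimeError(f"{env_var} is not set or contains no hostnames")
    return hosts
```

In the services above you would then loop over proxy_hostnames() instead of splitting os.environ['PROXY_HOSTNAMES'] inline.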
TODO: more scenarios involving identity (like ROUTER-DEALER)
Secure connectivity is extremely important, even in an internal microservices infrastructure. From a design perspective, the single biggest positive impact that can be made on security is to make it easy for users to do the "right thing".
For this reason, aiomsg does nothing new at all. It uses the existing
support for secure connectivity in the Python standard library, and
uses the same APIs exactly as-is.
All you have to do is create an SSLContext object, exactly as you normally would for conventional Python sockets, and pass that in.
Mutual TLS authentication (mTLS)
is where the client verifies the server and the server verifies
the client. In aiomsg, names like "client" and "server" are less
useful, so let's rather say that the connect socket verifies the
target bind socket, and the bind socket also verifies the incoming
connecting socket.
It sounds complicated, but at a high level you just need to supply
an SSLContext instance to the bind socket, and a different SSLContext
instance to the connect socket (usually on a different computer). The details
are all stored in the SSLContext objects.
flowchart LR
C["connect socket<br/>connect 127.0.0.1:25000<br/>SSLContext SERVER_AUTH"] <-->|"TLS connection"| B["bind socket<br/>bind 127.0.0.1:25000<br/>SSLContext CLIENT_AUTH"]
ServerCert["server.cert + server.key"] -. "load_cert_chain" .-> B
ClientCert["client.cert + client.key"] -. "load_cert_chain" .-> C
CTrust["connect context<br/>load_verify_locations(server.cert)<br/>check_hostname = True"] -->|"verifies bind peer"| B
BTrust["bind context<br/>load_verify_locations(client.cert)"] -->|"verifies connect peer"| C
Let's first look at how that looks for a typical bind socket and connect socket:
# bind end
import ssl
import asyncio, time
from aiomsg import Søcket
async def main():
    ctx = ssl.SSLContext(...)    # <--------- NEW!
    async with Søcket() as sock:
        await sock.bind('127.0.0.1', 25000, ssl_context=ctx)
        while True:
            await sock.send(time.ctime().encode())
            await asyncio.sleep(1)  # pace the sends, as in the earlier examples

asyncio.run(main())

# connect end
import ssl
import asyncio
from aiomsg import Søcket
async def main():
    ctx = ssl.SSLContext(...)    # <--------- NEW!
    async with Søcket() as sock:
        await sock.connect('127.0.0.1', 25000, ssl_context=ctx)
        async for msg in sock.messages():
            print(msg.decode())

asyncio.run(main())

If you compare these two code snippets to what was shown in the Demo
section, you'll see they're almost exactly the same, except that we're
passing a new ssl_context argument, ctx, into the respective bind() and
connect() calls; ctx is an instance of SSLContext.
So if you already know how to work with Python's built-in SSLContext
object, you can already create secure connections with aiomsg and
there's nothing more you need to learn.
You might not know how to set up the SSLContext object.
Here, I'll give a crash course, but please remember that I am
not a security expert so make sure to ask an actual security expert
to review your work if you're working on a production system.
The best way to create an SSLContext object is not with its
constructor, but rather a helper function called create_default_context(),
which sets a lot of sensible defaults that you would otherwise have to
do manually. So that's how you get the context instance.
You do have to specify whether the purpose of the context object is to verify a client or a server. Let's have a look at that:
# bind socket, or "server"
ctx: ssl.SSLContext = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)

So here, above, we're creating a context object for a bind socket. The
purpose of the context is going to be to verify incoming client connections,
which is why the CLIENT_AUTH purpose was given. As you might imagine,
on the other end, i.e., the connect socket (or "client"), the purpose
is going to be to verify the server:

# connect socket, or "client"
ctx: ssl.SSLContext = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)

Once you've created the context, the remaining parameters have the same meaning for both client and server.
The way TLS works (the artist formerly known as SSL) is that each end of a connection has two pieces of information:
- A certificate (may be shared publicly)
- A key (MUST NOT BE SHARED! SECRET!)
When the two sockets establish a connection, they trade certificates, but do not trade keys. Anyway, let's look at what you need to actually set in the code. We'll start with the connect socket (client).
# connect socket, or "client"
ctx: ssl.SSLContext = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.check_hostname = True
ctx.load_verify_locations(<something that can verify the server cert>)

The above will let the client verify that the server it is connecting to is the correct one. When the socket connects, the server socket will send back a certificate and the client checks that against one of those mysterious "verify locations".
For mutual TLS, the server also wants to check the client. What does it check? Well, the client must also provide a certificate back to the server. So that requires an additional line in the code block above:
# connect socket, or "client"
ctx: ssl.SSLContext = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.check_hostname = True
ctx.load_verify_locations(<something that can verify the server cert>)
# Client needs a pair of "cert" and "key"
ctx.load_cert_chain(certfile="client.cert", keyfile="client.key")

So that completes everything we need to do for the SSL context on the client side. On the server side, everything is almost exactly the same:
# bind socket, or "server"
ctx: ssl.SSLContext = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(<something that can verify the client cert>)
# Server needs a pair of "cert" and "key"
ctx.load_cert_chain(certfile="server.cert", keyfile="server.key")

That describes everything you need to do to set up mutual TLS using
SSLContext instances.
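If you're wondering why the bind context sets verify_mode explicitly while the connect context seems to repeat what it already has, it helps to inspect what create_default_context() gives you for each purpose. This small check uses only the standard library; describe is just a helper name for this sketch.

```python
import ssl


def describe(ctx: ssl.SSLContext) -> tuple:
    """Return (verify mode name, hostname checking enabled) for a context."""
    return ctx.verify_mode.name, ctx.check_hostname


# Context for a connect socket: its purpose is to authenticate the server.
connect_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
# Context for a bind socket: its purpose is to authenticate clients.
bind_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)

if __name__ == "__main__":
    print("connect:", describe(connect_ctx))  # ('CERT_REQUIRED', True)
    print("bind:   ", describe(bind_ctx))     # ('CERT_NONE', False)
```

The SERVER_AUTH context already requires and checks the peer certificate, while the CLIENT_AUTH context starts out not asking the client for one. Setting verify_mode = ssl.CERT_REQUIRED on the bind side is therefore the line that actually makes the TLS mutual.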
There are a few loose ends to tie up though. Where do you get the
certfile and keyfile from? And what is this mysterious
"verify location"? The first question is easier. The cert and key can be
generated using the OpenSSL command-line application:
$ openssl req -newkey rsa:2048 -nodes -keyout server.key \
    -x509 -days 365 -out server.cert \
    -subj '/C=GB/ST=Blah/L=Blah/O=Blah/OU=Blah/CN=example.com'

Running the above command will create two new files, server.cert and
server.key; these are the files you pass to load_cert_chain() in the
snippets above. Generating these files for the client is exactly the
same, but you use different names (client.cert and client.key).
You could also use Let's Encrypt
to generate the cert and key, in which case you don't have to run the
above command. If you use Let's Encrypt, you've also solved the
other problem of supplying a "verify location", and in fact you won't need
to call load_verify_locations() in the client code at all. This is
because most operating systems ship with a set of trusted root
certificates, and the root behind Let's Encrypt is one of those.
However, for the sake of argument, let's say you want to make your own certificates and you don't want to rely on system-provided root certificates at all; how to do the verification? Well it turns out that a very simple solution is to just use the target certificate itself to be the "verify location". For example, here is the client context again:
# connect socket, or "client"
ctx: ssl.SSLContext = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.check_hostname = True
ctx.load_verify_locations("server.cert")   # <--- Same one as the server
# Client needs a pair of "cert" and "key"
ctx.load_cert_chain(certfile="client.cert", keyfile="client.key")

and then in the server's context, you could also use the client's cert as the "verify location":
# bind socket, or "server"
ctx: ssl.SSLContext = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations("client.cert")   # <--- Same as on client
# Server needs a pair of "cert" and "key"
ctx.load_cert_chain(certfile="server.cert", keyfile="server.key")

Obviously, the client code and the server code are running on different computers and you need to make sure that the right files are on the right computers in the right places.
There are a lot of ways to make this more sophisticated, but it's probably a good idea to get the simple case working, as described above, before looking at the more complicated cases. A cool option is to make your own root certificate authority, which can be a standard "verify location" in all your microservices; then, when you make certs and keys for each microservice, you just have to "sign" them with the root key. This process is described in Be your own certificate authority by Moshe Zadka.
Hope that helps!
The slashed O is used in homage to ØMQ, a truly wonderful library that changed my thinking around what socket programming could be like.
You don't have to reverse-engineer anything: the complete, language-independent
wire protocol is specified in PROTOCOL.md. It is deliberately
tiny: length-prefixed frames carrying a small typed envelope
(HELLO, HEARTBEAT, DATA, DATA_REQ, ACK), so reimplementing
it in a new language is straightforward. Anything written to that spec will
interoperate with every existing implementation.
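To give a feel for what "length-prefixed frames" means in practice, here is a generic framing sketch. PROTOCOL.md is the authority on the real layout: the 4-byte big-endian prefix used here is an assumption made for illustration, the function names are hypothetical, and the typed envelope (HELLO, HEARTBEAT, and so on) is not modelled at all.

```python
import struct

# ASSUMPTION for illustration only: a 4-byte big-endian unsigned length.
# Check PROTOCOL.md for the actual prefix size and byte order.
HEADER = struct.Struct(">I")


def encode_frame(payload: bytes) -> bytes:
    """Prefix payload with its length so the receiver can find its end."""
    return HEADER.pack(len(payload)) + payload


def decode_frames(buffer: bytes) -> tuple:
    """Split buffer into complete payloads plus any unconsumed tail bytes."""
    frames = []
    offset = 0
    while len(buffer) - offset >= HEADER.size:
        (length,) = HEADER.unpack_from(buffer, offset)
        end = offset + HEADER.size + length
        if end > len(buffer):
            break  # frame not fully received yet; wait for more bytes
        frames.append(buffer[offset + HEADER.size:end])
        offset = end
    return frames, buffer[offset:]


if __name__ == "__main__":
    # Two complete frames followed by a partially received third one.
    stream = encode_frame(b"job") + encode_frame(b"") + encode_frame(b"hello")[:5]
    print(decode_frames(stream))
```

The point of the prefix is that a TCP stream has no message boundaries of its own; the receiver keeps the leftover tail and retries once more bytes arrive, which is what lets the sockets be message-based rather than stream-based.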
If the language you want already has an implementation (see Implementations above), you don't need to implement anything at all, just use it.
This repository hosts several language implementations, each in its own
subdirectory with its own tooling. Every implementation exposes a uniform
just test recipe, and the top-level justfile dispatches to each.
Install these packages before running just test-all, just build-agents, or
the cross-language conformance suite. They cover the Python, Rust, Go, C,
C++, Zig, Java, JavaScript, C#, and Lua implementations.
Fedora Linux 44:
sudo dnf install -y \
  git just uv python3 python3-devel \
  rust cargo golang \
  gcc gcc-c++ cmake make pkgconf-pkg-config openssl-devel asio-devel \
  zig java-25-openjdk-devel nodejs24 nodejs24-npm dotnet-sdk-9.0 \
  lua lua-devel lua-socket lua-sec luarocks

Ubuntu/Debian-family systems:
sudo apt update
sudo apt install -y \
  git just curl \
  rustup golang-go \
  build-essential cmake pkg-config libssl-dev libasio-dev \
  openjdk-25-jdk nodejs npm dotnet-sdk-9.0 \
  python3 python3-dev python3-venv python3-pip \
  lua5.4 liblua5.4-dev luarocks lua-socket lua-sec
curl -LsSf https://astral.sh/uv/install.sh | sh

Ubuntu packages for zig, openjdk-25-jdk, and dotnet-sdk-9.0 vary by
release and enabled repositories. The Zig implementation uses Zig 0.16's
std.Io, so install Zig 0.16 or newer from your distro package, Snap, or
https://ziglang.org/download/ if apt install zig is older. For .NET, enable
Microsoft's package repository if your Ubuntu release does not provide
dotnet-sdk-9.0 directly.
Arch Linux:
sudo pacman -Syu --needed \
  git just uv python \
  rust go \
  base-devel cmake pkgconf openssl asio \
  zig jdk-openjdk nodejs npm dotnet-sdk \
  lua luarocks lua-socket lua-sec

The Java implementation is tested with OpenJDK 25 locally, but any recent JDK
with virtual-thread support should work. The C# implementation targets .NET 9;
on rolling distributions such as Arch, dotnet-sdk should be version 9 or
newer.
The Python reference implementation uses uv and just:
$ git clone https://github.com/cjrh/aiomsg
$ cd aiomsg/python-lib
$ just sync # create the venv with test + lint deps
$ just test # run the test suite
$ just lint # ruff check
Releasing the Python package is documented in python-lib/RELEASING.md;
in short, just release patch (from python-lib/) bumps the version,
tags, and pushes, and a pushed v* tag triggers PyPI publishing via GitHub
Actions.
See each implementation's own README for language-specific developer
instructions.