
Split transport IO out of Session into a Connection handle #202

Open
Ddystopia wants to merge 2 commits into quartiq:master from Ddystopia:session-io-handle

Ddystopia commented Jun 18, 2026

Contributor

Motivation

Session<'buf, IO> owns the transport for the whole session lifetime, which bakes a single IO
type and lifetime into the session type. That makes it impossible to reuse one Session across
reconnects when the transport's lifetime differs between connections - most painfully over TLS,
where the record buffers are borrowed for the connection's lifetime and so have to outlive the
session. In practice that forces a fresh allocation (a leak) per connection: the old
tls_public_broker example Box::leaks a new record-buffer pair every time it connects.

What changed

Session loses its IO generic. Session::connect::<IO>(io) now returns a
Connection<'_, 'buf, IO> handle that owns the transport and borrows the session:

let mut session = Session::new(config);          // no IO in the type
loop {
    let io = open_transport().await?;            // a fresh IO, any lifetime
    let mut conn = session.connect(io).await?;   // handle owns io, borrows session
    let op = conn.publish(/* … */).await?;
    while conn.is_pending(&op) { conn.poll().await?; }
    // conn drops here -> transport (and its buffer borrows) released -> reuse next loop
}
  • All network operations (drive/poll/recv/publish/subscribe/unsubscribe/disconnect)
    live on the handle; session-state queries (is_pending, can_publish, …) are reachable through
    Deref to Session. connect_event() exposes the Connected/Reconnected result.
  • mem::forget-safe by construction. The handle may be dropped or mem::forget-ed at any
    time (e.g. to release a TLS record-buffer borrow without running Drop), so correctness can't
    depend on Drop. All state carried across reconnects - in-flight QoS replay, keepalive timers,
    the inbound packet reader - is reset at the start of every connect() instead of in a
    disconnect/Drop path. The old handle_disconnect() is split accordingly: transport-drop -> the
    handle's ownership; replay/timer/reader resets -> connect(); and its liveness latch -> a
    connected flag set at the sites that detect a fatal disconnect.
  • is_connected() moves to the Connection handle and reports per-connection liveness: it
    latches false the moment any operation observes a transport- or protocol-level disconnect
    (broker DISCONNECT, transport error, keepalive timeout) or a graceful disconnect, after which
    further operations fail fast with Error::Disconnected instead of driving a dead transport.
  • Connection::disconnect/disconnect_with take &mut self so a cancelled disconnect can be
    retried.
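A minimal sketch of the forget-safe shape described in the bullets, assuming a heavily simplified, synchronous model: `Session`, `Connection`, the `in_flight`/`replay_cursor`/`keepalive_elapsed_ms` fields and `observe_transport_error` are hypothetical stand-ins, not minimq's actual types. Transport-local state is reset at the top of `connect()`, and this sketch keeps the liveness latch in the handle itself, so nothing depends on `Drop` running.

```rust
#[derive(Debug, PartialEq)]
enum Error {
    Disconnected,
}

/// Durable MQTT state (hypothetical, heavily simplified).
#[derive(Default)]
struct Session {
    in_flight: Vec<u16>,       // packet ids awaiting acknowledgement; survives reconnects
    replay_cursor: usize,      // transport-local: packets (re)sent on the current connection
    keepalive_elapsed_ms: u32, // transport-local: time since the last packet on this transport
}

/// Handle that owns the transport and borrows the session.
struct Connection<'s, IO> {
    session: &'s mut Session,
    io: IO,
    live: bool, // per-connection liveness latch
}

impl Session {
    fn connect<IO>(&mut self, io: IO) -> Connection<'_, IO> {
        // Reset transport-local state here rather than in a Drop impl: the previous
        // handle may have been dropped or mem::forget-ed without running any cleanup.
        self.replay_cursor = 0;
        self.keepalive_elapsed_ms = 0;
        Connection { session: self, io, live: true }
    }
}

impl<IO> Connection<'_, IO> {
    fn is_connected(&self) -> bool {
        self.live
    }

    /// Stand-in for any site that detects a fatal disconnect (transport error, keepalive timeout).
    fn observe_transport_error(&mut self) {
        self.live = false;
    }

    fn publish(&mut self, packet_id: u16) -> Result<(), Error> {
        if !self.live {
            return Err(Error::Disconnected); // fail fast instead of driving a dead transport
        }
        self.session.in_flight.push(packet_id);
        self.session.replay_cursor += 1;
        Ok(())
    }
}
```

Because `in_flight` lives in `Session`, the QoS bookkeeping survives a forgotten handle, while the per-connection fields start from zero on the next `connect()`.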

Drop vs. graceful disconnect

Dropping (or mem::forget-ing) the handle is an ungraceful MQTT close: no DISCONNECT packet
is sent (a sync Drop cannot perform the async write), so the broker sees an abnormal disconnect
and will publish the configured Will, if any. Reconnecting afterwards resumes the broker session
(CONNECT clean_start=false -> Reconnected). For a clean shutdown - DISCONNECT sent, Will
suppressed - call conn.disconnect().await before dropping. This is documented on Connection,
connect, and disconnect, and covered by a test asserting a bare drop emits no DISCONNECT and
that the next connect resumes.
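The drop-versus-disconnect distinction can be sketched with a transport that simply records the bytes written to it. This is a hypothetical, synchronous model built on `std::io::Write` (the real API is async), and `ConnectEvent` is derived here from a local flag, whereas a real client would take it from the broker's CONNACK. The bytes `[0xE0, 0x00]` are an MQTT DISCONNECT fixed header with a remaining length of zero.

```rust
use std::io::{self, Write};

#[derive(Debug, PartialEq)]
enum ConnectEvent {
    Connected,
    Reconnected,
}

#[derive(Default)]
struct Session {
    has_connected: bool,
}

struct Connection<'s, IO> {
    session: &'s mut Session,
    io: IO,
    live: bool,
    event: ConnectEvent,
}

impl Session {
    fn connect<IO: Write>(&mut self, io: IO) -> Connection<'_, IO> {
        // Simplification: a real client learns "session present" from the broker.
        let event = if self.has_connected {
            ConnectEvent::Reconnected
        } else {
            ConnectEvent::Connected
        };
        self.has_connected = true;
        Connection { session: self, io, live: true, event }
    }
}

impl<IO: Write> Connection<'_, IO> {
    fn connect_event(&self) -> &ConnectEvent {
        &self.event
    }

    /// Graceful close: write an MQTT DISCONNECT, flush, then latch the handle as dead.
    fn disconnect(&mut self) -> io::Result<()> {
        if !self.live {
            return Ok(()); // already closed; nothing more to send
        }
        self.io.write_all(&[0xE0, 0x00])?;
        self.io.flush()?;
        self.live = false;
        Ok(())
    }
}
```

Usage: a bare drop leaves the wire untouched, and only an explicit `disconnect()` emits the packet.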

Showcase

The tls_public_broker example is reworked to own one pair of TLS record buffers and reuse them
across a reconnect (connect, round-trip, drop the handle, reconnect reusing the same buffers),
with no Box::leak and no per-connection allocation. The previous API could not express this.
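A sketch of that reuse pattern, assuming a hypothetical `RecordIo` transport that borrows two caller-owned record buffers (a real TLS client would run its handshake where `open` merely clears them). Because `Session` has no `IO` parameter, each iteration's borrow of the buffers ends when its `Connection` is dropped, so the same arrays can be lent out again without `Box::leak`.

```rust
/// Hypothetical TLS-like transport borrowing caller-owned record buffers for its lifetime `'a`.
struct RecordIo<'a> {
    read_record: &'a mut [u8],
    write_record: &'a mut [u8],
}

impl<'a> RecordIo<'a> {
    fn open(read_record: &'a mut [u8], write_record: &'a mut [u8]) -> Self {
        // Placeholder for a TLS handshake: just start from clean buffers.
        read_record.fill(0);
        write_record.fill(0);
        RecordIo { read_record, write_record }
    }
}

#[derive(Default)]
struct Session {
    connects: u32,
}

struct Connection<'s, IO> {
    session: &'s mut Session,
    io: IO,
}

impl Session {
    fn connect<IO>(&mut self, io: IO) -> Connection<'_, IO> {
        self.connects += 1;
        Connection { session: self, io }
    }
}

impl Connection<'_, RecordIo<'_>> {
    /// Stand-in for a publish: stage `payload` in the write record.
    fn publish(&mut self, payload: &[u8]) -> usize {
        let n = payload.len().min(self.io.write_record.len());
        self.io.write_record[..n].copy_from_slice(&payload[..n]);
        n
    }
}

/// Reconnect `rounds` times over one pair of buffers.
fn run(session: &mut Session, read_buf: &mut [u8], write_buf: &mut [u8], rounds: u32) -> usize {
    let mut total = 0;
    for _ in 0..rounds {
        let io = RecordIo::open(&mut *read_buf, &mut *write_buf);
        let mut conn = session.connect(io);
        total += conn.publish(b"hello");
    } // `conn` is dropped at the end of each iteration, ending the borrows of both buffers
    total
}
```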

Testing

Whole suite green: 68 unit tests, 59 async_client integration tests, 2 real_broker
tests, 3 doctests - all ported to the handle API, plus new regression tests for the disconnect
latch and the ungraceful-drop/resume behavior.

Breaking changes

  • Session loses its IO type parameter; Session::connect() returns a Connection handle.
  • Network operations move from Session to the handle (queries remain via the handle's Deref).
  • is_connected() moves from Session to Connection.
  • Connection::disconnect/disconnect_with take &mut self.

The README, all examples, and the test suite are updated to match. See the CHANGELOG.md
[Unreleased] entry.

…n handle

`Session<'buf, IO>` previously owned the transport for the whole session
lifetime, which baked a single `IO` type and lifetime into the session type.
That made it impossible to reuse one `Session` across reconnects when the
transport's lifetime changes between connections — most painfully over TLS,
where the record buffers are borrowed for the connection's lifetime and so
had to outlive the session (forcing a leak per connection).

Drop the `IO` generic from `Session`. `Session::connect<IO>(io)` now returns a
`Connection<'_, 'buf, IO>` handle that owns the transport and borrows the
session. All network operations (drive/poll/recv/publish/subscribe/
unsubscribe/disconnect) live on the handle; session-state queries are reachable
through `Deref` to `Session`. The same `Session` can now be reconnected with a
fresh transport of a different `IO` type or buffer lifetime.

The handle can be dropped or `mem::forget`-ed at any time (e.g. to release a
TLS record-buffer borrow without running Drop), so correctness must not depend
on the handle's Drop. All state carried across reconnects — in-flight QoS
replay, keepalive timers, the inbound packet reader — is therefore reset at the
start of every `connect()` rather than in a disconnect/Drop path. The old
`handle_disconnect()` is split accordingly: its transport-drop is now handled by
the handle's ownership, its replay/timer/reader resets move to `connect()`, and
its liveness latch becomes a `connected` flag set at the sites that detect a
fatal disconnect.

Also:
- `Connection::disconnect`/`disconnect_with` take `&mut self` so a cancelled
  disconnect can be retried.
- `is_connected()` moves from `Session` to the `Connection` handle and reports
  per-connection liveness: it latches false the moment any operation observes a
  transport- or protocol-level disconnect (broker `DISCONNECT`, transport error,
  keepalive timeout) or a graceful `disconnect`, after which further operations
  on the handle fail fast with `Error::Disconnected` instead of driving the dead
  transport.
- Ported the unit tests, both integration suites, and the README doctest to the
  handle API.

BREAKING CHANGE: `Session` loses its `IO` type parameter and `Session::connect()`
now returns a `Connection` handle that owns the transport; the network
operations move from `Session` to the handle (session-state queries remain
reachable via the handle's `Deref` to `Session`). `is_connected()` moves to
`Connection`, and `Connection::disconnect`/`disconnect_with` take `&mut self`.

…leaking

The `tls_public_broker` example used to `Box::leak` a fresh pair of TLS record
buffers for every connection, because the old `Session<'buf, IO>` baked the
transport's lifetime into the session type and so required the buffers to live
as long as the session.

Now that the transport lives in the `Connection` handle, the buffers only need
to outlive a single connection. Rework the example to own one pair of record
buffers and reuse them across a reconnect: it connects over TLS, does a
publish/receive round-trip, drops the handle (releasing the buffers), then
reconnects and reuses the same buffers for a second TLS session. No leaks, no
per-connection allocation — which the previous API could not express.

Ddystopia force-pushed the session-io-handle branch from 2d21632 to 5a49ef6 on June 18, 2026 11:25
jordens commented Jun 18, 2026

Member

Thanks for the contribution!
This identifies a real issue but picks too large a solution.

The real issue is not “MQTT needs a connection handle API.” It is: Session<'buf, IO> owns Option<IO>, and some embedded TLS transports borrow connection-local buffers. If minimq drops IO internally on disconnect/error, the caller cannot explicitly recover or release/recycle the TLS object and its borrowed buffers at the right phase boundary.

The handle proposal fixes one class of lifetime pressure by moving IO out of Session, but it spreads the live transport lifetime through the public MQTT API. It also keeps monomorphization in practice because methods become generic over IO; and if users truly need dynamic transport switching, dyn Io is already a plausible escape hatch. Different concrete IO types across one durable MQTT session do not seem important enough to justify redesigning the common API.

The cleaner architecture is:

Session = durable MQTT state
IO = current live transport
TLS/socket buffers = transport-layer resources owned/lent by the app

Session can keep `Option<IO>`. The missing API is explicit detachment, not a new public connection model.

Proposal:

pub async fn disconnect(&mut self) -> Result<Option<IO>, Error<IO::Error>>;
pub async fn disconnect_with(
    &mut self,
    disconnect: Disconnect<'_>,
) -> Result<Option<IO>, Error<IO::Error>>;

pub fn take_connection(&mut self) -> Option<IO>;

Rationale:

  • disconnect() sends MQTT DISCONNECT, flushes, then returns the detached IO.
  • Ok(None) keeps current idempotent “already disconnected” behavior.
  • take_connection() gives an explicit ungraceful detach path for dead/abandoned transports.
  • connect(io) can remain Error::NotReady if an IO is still attached.
  • The TLS-buffer lifecycle becomes simple: disconnect/take old IO, drop it, reuse buffers, build new TLS IO, reconnect.
  • Cancellation safety is preserved if disconnect() writes through self.connection.as_mut() and only take()s after successful completion.

This is a smaller, more conservative change. It preserves minimq’s current model, avoids leaking TLS-specific lifetime mechanics into MQTT, and still solves the practical buffer-reuse problem.
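For comparison, a synchronous sketch of the proposed detach API, assuming a simplified `Session<IO>` that keeps `Option<IO>` and an `Error` enum reduced to `NotReady`; the proposed signatures are async and return `Error<IO::Error>`. `disconnect` writes through `as_mut()` and only calls `take()` once the write and flush have succeeded, which is the cancellation-safety property the rationale describes.

```rust
use std::io::{self, Write};

#[derive(Debug, PartialEq)]
enum Error {
    NotReady,
}

/// Simplified model: the session keeps the live transport in an `Option`.
struct Session<IO> {
    connection: Option<IO>,
}

impl<IO: Write> Session<IO> {
    fn new() -> Self {
        Session { connection: None }
    }

    fn connect(&mut self, io: IO) -> Result<(), Error> {
        if self.connection.is_some() {
            return Err(Error::NotReady); // an IO is still attached
        }
        self.connection = Some(io);
        Ok(())
    }

    /// Graceful detach: send DISCONNECT, flush, then hand the transport back.
    fn disconnect(&mut self) -> io::Result<Option<IO>> {
        let Some(io) = self.connection.as_mut() else {
            return Ok(None); // already disconnected: idempotent
        };
        io.write_all(&[0xE0, 0x00])?;
        io.flush()?;
        // Detach only after the write succeeded, so a failed attempt leaves the IO attached.
        Ok(self.connection.take())
    }

    /// Ungraceful detach for dead or abandoned transports.
    fn take_connection(&mut self) -> Option<IO> {
        self.connection.take()
    }
}
```

In this model `IO`, and any lifetime inside it, remains part of the `Session<IO>` type.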

Ddystopia

Contributor Author

@jordens Look, the issue isn't in dropping the IO, but in the borrow checker. The lifetime of the buffer is part of the type too. The borrow checker doesn't know that `pub fn take_connection(&mut self) -> Option<IO>;` actually moved the IO out from the inside. If you don't want a contribution, then close the PR. I won't contribute anything ever if you'll be spitting AI nonsense without verifying it. I'm sorry for the harsh words.

jordens commented Jun 18, 2026

Member

Looky looky my friend!
The borrow checker absolutely tracks that moving the IO out releases the borrow! That is exactly why Option::take() is the relevant operation here.
In your use case and the (expanded) TLS example the Session<'buf, IO> stays compatible across reconnects.
What you need is something that returns the IO without dropping it so the borrow ends!
If you have a concrete reduced example where take_connection(); drop(io); make_new_tls_from_same_buffers(); session.connect(new_io) still fails, that would be valuable. Without that, your handle rewrite is unmotivated.
Let whoever/whatever wrote your commits read the proposal and give it a try. Assure me that you wrote that PR and commits manually and you'll get a human response.

Ddystopia commented Jun 18, 2026

Contributor Author

So you want to push the verification of your (probably) ideas on me, fine. Is this skeleton (playground) enough for you, or do you want me to push a fork of minimq with take_connection?

Further note: you might next say that I can feed io1 into the next connect (different from your request) and it will compile, but it won't work: a new transport has to be created, and a closed transport can't be reused.
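A hypothetical reduced model of this premise (not the linked playground, whose contents are not reproduced here). With `OwningSession<IO>`, the transport type, and so the buffer borrow's lifetime, is fixed in the session's type for as long as the session is used; dropping the value returned by `take_connection()` therefore does not let the same buffer be borrowed again, and uncommenting the marked line is rejected by rustc with E0499 (cannot borrow as mutable more than once at a time). Moving the transport into a handle, as in the second model, lets each borrow end with its handle. All type names are illustrative.

```rust
/// Hypothetical transport borrowing a caller-owned record buffer for its lifetime `'a`.
struct Tls<'a> {
    record: &'a mut [u8],
}

/// Model 1: the session owns `Option<IO>`, so `IO = Tls<'a>` is part of the session's type.
struct OwningSession<IO> {
    connection: Option<IO>,
}

impl<IO> OwningSession<IO> {
    fn connect(&mut self, io: IO) {
        self.connection = Some(io);
    }
    fn take_connection(&mut self) -> Option<IO> {
        self.connection.take()
    }
}

fn owning_reconnect(buf: &mut [u8]) -> bool {
    let mut session = OwningSession { connection: None };
    session.connect(Tls { record: &mut *buf });
    drop(session.take_connection());
    // E0499 if uncommented: `session` still has type `OwningSession<Tls<'a>>` and is used
    // again below, so the first borrow of `buf` must still be alive here, even though the
    // `Option` is now `None`.
    // session.connect(Tls { record: &mut *buf });
    session.take_connection().is_none()
}

/// Model 2: the session has no `IO` parameter; the handle owns the transport.
struct Session {
    connects: u32,
}

struct Connection<'s, IO> {
    session: &'s mut Session,
    io: IO,
}

impl Session {
    fn connect<IO>(&mut self, io: IO) -> Connection<'_, IO> {
        self.connects += 1;
        Connection { session: self, io }
    }
}

fn handle_reconnect(buf: &mut [u8]) -> u32 {
    let mut session = Session { connects: 0 };
    for round in 0..2u8 {
        let conn = session.connect(Tls { record: &mut *buf });
        conn.io.record[0] = round; // this borrow of `buf` ends when `conn` is dropped
    }
    session.connects
}
```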

> Assure me that you wrote that PR and commits manually and you'll get a human response.

The changes are pure boilerplate, just moving methods around, I didn't write it manually, why write boilerplate manually? Or do you suggest that responding without verifying anything is equivalent to the design process?

cc @jordens

Ddystopia commented Jun 18, 2026

Contributor Author

Note: I don't need your human response. I need your human verification. Please keep the middle man between us, just run the code and fact check before saying.

jordens left a comment

Member


I wanted verification of your premise! That motivates taking IO out of Session and separating the persistent MQTT Session state from the transport/network layer.
This clearly isn't just boilerplate. This is a redesign. If it was boilerplate, we wouldn't be discussing. And if you expect me to review a ~1k-line PR written by an AI, you can be sure I'll have an AI do that if anything. Do not complain about getting an answer from an AI for your AI PR.

I still want the public API kept narrower than this PR: Session should remain MQTT state, and the live transport lifetime should be represented by an explicit live connection/lease,
not via Deref-magic or stale "connected" flags on Session.

Regarding the design, I was thinking of a typestate `Session<'buf>::connect<IO>(self, io) -> Connection<'buf, IO>`, but I suspect that will make any Session owner/user a bit tricky and might also make cancel-safety harder. Happy to get input on that.
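A rough sketch of the typestate idea, with the `'buf` parameter omitted and `close` as a hypothetical name for the method that hands the session back. `connect` consumes the `Session`, so the durable state is only reachable through the `Connection` until `close` returns it. An owner that stores the session in a struct field would typically need an `Option<Session>` (or similar) to move it in and out, and if `connect` were an async fn whose future is dropped mid-way, the session it consumed would be dropped with it; that is one way the cancel-safety concern could show up.

```rust
/// Durable MQTT state (hypothetical; the `'buf` lifetime is left out for brevity).
#[derive(Default)]
struct Session {
    connects: u32,
}

/// Typestate: holding a `Connection` means the session is connected.
struct Connection<IO> {
    session: Session,
    io: IO,
}

impl Session {
    /// Consumes the session; it comes back only through `Connection::close`.
    fn connect<IO>(mut self, io: IO) -> Connection<IO> {
        self.connects += 1;
        Connection { session: self, io }
    }
}

impl<IO> Connection<IO> {
    fn session(&self) -> &Session {
        &self.session
    }

    /// Hand back the durable session and the transport, e.g. to reuse the transport's buffers.
    fn close(self) -> (Session, IO) {
        (self.session, self.io)
    }
}
```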

Comment on lines +183 to +189
impl<'buf, IO> core::ops::Deref for Connection<'_, 'buf, IO> {
    type Target = Session<'buf>;

    fn deref(&self) -> &Self::Target {
        self.session
    }
}

Member


No Deref magic. Do pub fn session(&self) -> &Session.
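A sketch of the suggested accessor, using hypothetical `in_flight`/`max_in_flight` fields for illustration: callers write `conn.session().can_publish()` explicitly instead of relying on auto-deref to reach `Session` methods.

```rust
struct Session {
    in_flight: usize,
    max_in_flight: usize,
}

impl Session {
    fn can_publish(&self) -> bool {
        self.in_flight < self.max_in_flight
    }
}

struct Connection<'s, IO> {
    session: &'s mut Session,
    io: IO,
}

impl<IO> Connection<'_, IO> {
    /// Explicit, read-only view of the durable session state (no `Deref<Target = Session>`).
    fn session(&self) -> &Session {
        self.session
    }
}
```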

/// operation observes a fatal transport- or protocol-level disconnect. State carried to the
/// next connection is reset in `connect`, not here, so the latch does not depend on the
/// handle's `Drop` running.
connected: bool,

Member


This needs to be `live: bool` on `Connection`. Otherwise dropping `Connection` leaves `Session` connected.

The reset of transport-local session state belongs in Session, but the live/dead latch belongs in Connection.

//! Use this as a TLS transport example. Bounded/cooperative session driving is done by wrapping
//! the cancel-safe blocking `Session::poll()` in an external timeout at the call site.
//! It also showcases the connection-handle API: the transport lives in the
//! [`Connection`](minimq::Connection) handle returned by [`Session::connect`], not in the

Member


Do not explain what it doesn't do. That's AI stink.

/// Number of successful round-trips to perform before exiting. A real long-running client would
/// loop forever; we stop after a few so the example terminates while still reconnecting (and so
/// reusing the record buffers) more than once.
const ROUND_TRIPS: usize = 2;

Member


That makes this example complicated and harder to read. Can we stick to the simple existing one? The AI comment lecturing is annoying! If all you need is removing the IO type from Session, we don't need an example to show that this PR does it.

/// Open a TLS connection over the caller-owned record buffers.
///
/// The returned connection borrows `read_record`/`write_record` for its own lifetime `'a`, not
/// `'static`, which is what lets the caller reuse the buffers once the connection is dropped.

Member


Don't bother explaining what it doesn't do.

/// Latch the current transport as disconnected. Called at every site that observes a fatal
/// transport- or protocol-level failure. Only flips the liveness flag; state needed for the
/// next connection is reset in `connect`.
pub(super) fn mark_disconnected(&mut self) {

Member


That's not sufficient.
We need to call something like `self.session.reset_transport_state_for_reconnect();` in `Connection`, like the old `handle_disconnect()`. Otherwise, e.g., `can_publish()` can be true while the `Connection` has been dropped.
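One possible reading of this suggestion, sketched with hypothetical fields: `can_publish()` depends on a session-level `transport_ready` flag, and `Connection::mark_disconnected` both latches the handle's `live` flag and calls `Session::reset_transport_state_for_reconnect()`, much as the old `handle_disconnect()` did. The `Drop` impl covers the ordinary drop path only; `mem::forget` skips it, so `connect` still resets on entry.

```rust
struct Session {
    transport_ready: bool,     // true only while a live transport is attached
    keepalive_elapsed_ms: u32, // transport-local timer
    in_flight: usize,
    max_in_flight: usize,
}

impl Session {
    fn can_publish(&self) -> bool {
        self.transport_ready && self.in_flight < self.max_in_flight
    }

    /// Clear everything that only made sense for the transport that just went away.
    fn reset_transport_state_for_reconnect(&mut self) {
        self.transport_ready = false;
        self.keepalive_elapsed_ms = 0;
    }

    fn connect<IO>(&mut self, io: IO) -> Connection<'_, IO> {
        self.reset_transport_state_for_reconnect();
        self.transport_ready = true;
        Connection { session: self, io, live: true }
    }
}

struct Connection<'s, IO> {
    session: &'s mut Session,
    io: IO,
    live: bool, // the live/dead latch belongs to the handle
}

impl<IO> Connection<'_, IO> {
    /// Called at every site that observes a fatal disconnect.
    fn mark_disconnected(&mut self) {
        self.live = false;
        self.session.reset_transport_state_for_reconnect();
    }
}

impl<'s, IO> Drop for Connection<'s, IO> {
    // Best effort: runs on an ordinary drop, but never after `mem::forget`.
    fn drop(&mut self) {
        self.session.reset_transport_state_for_reconnect();
    }
}
```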

///
/// This is the graceful counterpart to simply dropping the handle: it sends the MQTT
/// `DISCONNECT` so the broker closes cleanly and suppresses the Will. Just dropping (or
/// [`mem::forget`](core::mem::forget)-ing) the handle skips this and is treated by the broker

Member


What's with this repeated riffing on about mem::forget()?

Comment thread CHANGELOG.md
Comment on lines +13 to +23
* **Breaking:** `Session` no longer carries the transport `IO` type parameter. `Session::connect()`
now returns a `Connection` handle that owns the transport and borrows the session; the network
operations (`drive`/`poll`/`recv`/`publish`/`subscribe`/`unsubscribe`/`disconnect`) move onto the
handle, while session-state queries remain reachable through its `Deref` to `Session`. This lets a
single `Session` be reused across reconnects with transports of different types or buffer
lifetimes — for example reusing TLS record buffers instead of leaking a fresh pair per connection.
* **Breaking:** `is_connected()` moves from `Session` to the `Connection` handle and now reports
*per-connection* liveness: it latches `false` as soon as any operation observes a transport- or
protocol-level disconnect (broker `DISCONNECT`, transport error, keepalive timeout) or a graceful
`disconnect`, after which further operations on the handle fail fast with `Error::Disconnected`
instead of driving the dead transport.

Member


Shorten this. Remove the rationalizing.

}

#[test]
fn can_publish_requires_a_live_connection() {

Member


That's a useless test. It only tests its own harness.
And it doesn't show the problem on dropped Connection.

Comment on lines +163 to +176
/// Dropping (or [`mem::forget`](core::mem::forget)-ing) the handle releases the transport and the
/// session borrow; the same `Session` can then be reconnected with a fresh transport — possibly of
/// a different `IO` type or buffer lifetime. State needed across reconnects (in-flight QoS replay,
/// keepalive timers, the inbound packet reader) is reset at the start of the next
/// [`Session::connect`](Session::connect), so correctness does not depend on this handle's `Drop`
/// running.
///
/// Note that dropping or forgetting the handle is an *ungraceful* MQTT close: **no `DISCONNECT`
/// packet is sent** (a sync `Drop` cannot perform the async write), so the broker sees an abnormal
/// disconnect and will publish the configured Will, if any. Whether the underlying transport is
/// actually closed is up to the `IO`'s own `Drop`. For a clean shutdown — `DISCONNECT` sent, Will
/// suppressed — call [`disconnect`](Self::disconnect) (or [`disconnect_with`](Self::disconnect_with))
/// before dropping. Reconnecting afterwards with [`Session::connect`](Session::connect) resumes the
/// broker session via `CONNECT` `clean_start=false`; see [`ConnectEvent`](crate::ConnectEvent).

Member


It doesn't "release". It leaks!

All this doesn't belong here anyway.
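A small sketch of the release-versus-leak distinction, assuming a hypothetical `CountingIo` whose destructor stands in for closing the socket or TLS session: dropping the handle runs the transport's `Drop`, while `mem::forget` never does. The borrow checker still lets the session be reused afterwards, but the forgotten transport is leaked.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical transport that counts how many times its destructor has run.
struct CountingIo {
    closed: Rc<Cell<u32>>,
}

impl Drop for CountingIo {
    fn drop(&mut self) {
        // Stand-in for closing the socket / tearing down the TLS session.
        self.closed.set(self.closed.get() + 1);
    }
}

struct Session;

struct Connection<'s, IO> {
    _session: &'s mut Session,
    _io: IO,
}

impl Session {
    fn connect<IO>(&mut self, io: IO) -> Connection<'_, IO> {
        Connection { _session: self, _io: io }
    }
}
```

Usage: after `drop(conn)` the counter advances; after `std::mem::forget(conn)` it does not, even though the next `connect` compiles and runs.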

Ddystopia

Contributor Author

Great, thanks for the review. I'll follow up around Monday.
