Split transport IO out of Session into a Connection handle#202
Split transport IO out of Session into a Connection handle#202Ddystopia wants to merge 2 commits into
Session into a Connection handle#202Conversation
…n handle `Session<'buf, IO>` previously owned the transport for the whole session lifetime, which baked a single `IO` type and lifetime into the session type. That made it impossible to reuse one `Session` across reconnects when the transport's lifetime changes between connections — most painfully over TLS, where the record buffers are borrowed for the connection's lifetime and so had to outlive the session (forcing a leak per connection). Drop the `IO` generic from `Session`. `Session::connect<IO>(io)` now returns a `Connection<'_, 'buf, IO>` handle that owns the transport and borrows the session. All network operations (drive/poll/recv/publish/subscribe/ unsubscribe/disconnect) live on the handle; session-state queries are reachable through `Deref` to `Session`. The same `Session` can now be reconnected with a fresh transport of a different `IO` type or buffer lifetime. The handle can be dropped or `mem::forget`-ed at any time (e.g. to release a TLS record-buffer borrow without running Drop), so correctness must not depend on the handle's Drop. All state carried across reconnects — in-flight QoS replay, keepalive timers, the inbound packet reader — is therefore reset at the start of every `connect()` rather than in a disconnect/Drop path. The old `handle_disconnect()` is split accordingly: its transport-drop is now handled by the handle's ownership, its replay/timer/reader resets move to `connect()`, and its liveness latch becomes a `connected` flag set at the sites that detect a fatal disconnect. Also: - `Connection::disconnect`/`disconnect_with` take `&mut self` so a cancelled disconnect can be retried. - `is_connected()` moves from `Session` to the `Connection` handle and reports per-connection liveness: it latches false the moment any operation observes a transport- or protocol-level disconnect (broker `DISCONNECT`, transport error, keepalive timeout) or a graceful `disconnect`, after which further operations on the handle fail fast with `Error::Disconnected` instead of driving the dead transport. - Ported the unit tests, both integration suites, and the README doctest to the handle API. BREAKING CHANGE: `Session` loses its `IO` type parameter and `Session::connect()` now returns a `Connection` handle that owns the transport; the network operations move from `Session` to the handle (session-state queries remain reachable via the handle's `Deref` to `Session`). `is_connected()` moves to `Connection`, and `Connection::disconnect`/`disconnect_with` take `&mut self`.
…leaking The `tls_public_broker` example used to `Box::leak` a fresh pair of TLS record buffers for every connection, because the old `Session<'buf, IO>` baked the transport's lifetime into the session type and so required the buffers to live as long as the session. Now that the transport lives in the `Connection` handle, the buffers only need to outlive a single connection. Rework the example to own one pair of record buffers and reuse them across a reconnect: it connects over TLS, does a publish/receive round-trip, drops the handle (releasing the buffers), then reconnects and reuses the same buffers for a second TLS session. No leaks, no per-connection allocation — which the previous API could not express.
2d21632 to
5a49ef6
Compare
|
Thanks for the contribution! The real issue is not “MQTT needs a connection handle API.” It is: The handle proposal fixes one class of lifetime pressure by moving IO out of The cleaner architecture is: Session = durable MQTT state Session can keep Option. The missing API is explicit detachment, not a new public connection model. Proposal: pub async fn disconnect(&mut self) -> Result<Option<IO>, Error<IO::Error>>;
pub async fn disconnect_with(
&mut self,
disconnect: Disconnect<'_>,
) -> Result<Option<IO>, Error<IO::Error>>;
pub fn take_connection(&mut self) -> Option<IO>;Rationale:
This is a smaller, more conservative change. It preserves minimq’s current model, avoids leaking TLS-specific lifetime mechanics into MQTT, and still solves the practical buffer-reuse problem. |
|
@jordens Look, the issue isn't in drop of the IO, but in the borrow-checker. The lifetime of the buffer is a type too. The borrow-checker doesn't know that |
|
Looky looky my friend! |
|
So you want to push the verification of your (probably) ideas on me, fine. Is this skeleton (playground) enough for you, or you want me to push a fork of minimq with Further notes: you might next say that I can feed the
The changes are pure boilerplate, just moving methods around, I didn't write it manually, why write boilerplate manually? Or do you suggest that responding without verifying anything is equivalent to the design process? cc @jordens |
|
Note: I don't need your human response. I need your human verification. Please keep the middle man between us, just run the code and fact check before saying. |
jordens
left a comment
There was a problem hiding this comment.
I wanted verification of your premise! That motivates taking IO out of Session and separating the persistent MQTT Session state from the transport/network layer.
This clearly isn't just boilerplate. This is a redesign. If it was boilerplate, we wouldn't be discussing. And if you expect me to review a ~1k line PR written from an AI, you can be sure I'll have an AI do that if anything. Do not complain about getting an answer from an AI for your AI PR.
I still want the public API kept narrower than this PR: Session should remain MQTT state, and the live transport lifetime should be represented by an explicit live connection/lease,
not via Deref-magic or stale "connected" flags on Session.
Regarding the design I was thinking typestate Session<'buf>::connect<IO>(self, io) -> Connection<'buf, IO> but I suspect that will make any Session owner/user a bit tricky and might also make cancel-safety harder. Happy to get input on that.
| impl<'buf, IO> core::ops::Deref for Connection<'_, 'buf, IO> { | ||
| type Target = Session<'buf>; | ||
|
|
||
| fn deref(&self) -> &Self::Target { | ||
| self.session | ||
| } | ||
| } |
There was a problem hiding this comment.
No Deref magic. Do pub fn session(&self) -> &Session.
| /// operation observes a fatal transport- or protocol-level disconnect. State carried to the | ||
| /// next connection is reset in `connect`, not here, so the latch does not depend on the | ||
| /// handle's `Drop` running. | ||
| connected: bool, |
There was a problem hiding this comment.
This needs to be live: bool on Connection. Otherwise dropping Connection leaves Session connected.
The reset of transport-local session state belongs in Session, but the live/dead latch belongs in Connection.
| //! Use this as a TLS transport example. Bounded/cooperative session driving is done by wrapping | ||
| //! the cancel-safe blocking `Session::poll()` in an external timeout at the call site. | ||
| //! It also showcases the connection-handle API: the transport lives in the | ||
| //! [`Connection`](minimq::Connection) handle returned by [`Session::connect`], not in the |
There was a problem hiding this comment.
Do not explain what it doesn't do. That's AI stink.
| /// Number of successful round-trips to perform before exiting. A real long-running client would | ||
| /// loop forever; we stop after a few so the example terminates while still reconnecting (and so | ||
| /// reusing the record buffers) more than once. | ||
| const ROUND_TRIPS: usize = 2; |
There was a problem hiding this comment.
That makes this example complicated and harder to read. Can we stick to the simple existing one? The AI comment lecturing is annoying! If all you need is removing the IO type from Session, we don't need an example to show that this PR does it.
| /// Open a TLS connection over the caller-owned record buffers. | ||
| /// | ||
| /// The returned connection borrows `read_record`/`write_record` for its own lifetime `'a`, not | ||
| /// `'static`, which is what lets the caller reuse the buffers once the connection is dropped. |
There was a problem hiding this comment.
Don't bother explaining what it doesn't do.
| /// Latch the current transport as disconnected. Called at every site that observes a fatal | ||
| /// transport- or protocol-level failure. Only flips the liveness flag; state needed for the | ||
| /// next connection is reset in `connect`. | ||
| pub(super) fn mark_disconnected(&mut self) { |
There was a problem hiding this comment.
That's not sufficient.
We need to call something like self.session.reset_transport_state_for_reconnect(); in Connection like the old handle_disconnect(). Otherwise e.g. can_publish() can be true while Connection has been dropped.
| /// | ||
| /// This is the graceful counterpart to simply dropping the handle: it sends the MQTT | ||
| /// `DISCONNECT` so the broker closes cleanly and suppresses the Will. Just dropping (or | ||
| /// [`mem::forget`](core::mem::forget)-ing) the handle skips this and is treated by the broker |
There was a problem hiding this comment.
What's with this repeated riffing on about mem::forget()?
| * **Breaking:** `Session` no longer carries the transport `IO` type parameter. `Session::connect()` | ||
| now returns a `Connection` handle that owns the transport and borrows the session; the network | ||
| operations (`drive`/`poll`/`recv`/`publish`/`subscribe`/`unsubscribe`/`disconnect`) move onto the | ||
| handle, while session-state queries remain reachable through its `Deref` to `Session`. This lets a | ||
| single `Session` be reused across reconnects with transports of different types or buffer | ||
| lifetimes — for example reusing TLS record buffers instead of leaking a fresh pair per connection. | ||
| * **Breaking:** `is_connected()` moves from `Session` to the `Connection` handle and now reports | ||
| *per-connection* liveness: it latches `false` as soon as any operation observes a transport- or | ||
| protocol-level disconnect (broker `DISCONNECT`, transport error, keepalive timeout) or a graceful | ||
| `disconnect`, after which further operations on the handle fail fast with `Error::Disconnected` | ||
| instead of driving the dead transport. |
There was a problem hiding this comment.
Shorten this. Remove the rationalizing.
| } | ||
|
|
||
| #[test] | ||
| fn can_publish_requires_a_live_connection() { |
There was a problem hiding this comment.
That's a useless test. It only tests its own harness.
And it doesn't show the problem on dropped Connection.
| /// Dropping (or [`mem::forget`](core::mem::forget)-ing) the handle releases the transport and the | ||
| /// session borrow; the same `Session` can then be reconnected with a fresh transport — possibly of | ||
| /// a different `IO` type or buffer lifetime. State needed across reconnects (in-flight QoS replay, | ||
| /// keepalive timers, the inbound packet reader) is reset at the start of the next | ||
| /// [`Session::connect`](Session::connect), so correctness does not depend on this handle's `Drop` | ||
| /// running. | ||
| /// | ||
| /// Note that dropping or forgetting the handle is an *ungraceful* MQTT close: **no `DISCONNECT` | ||
| /// packet is sent** (a sync `Drop` cannot perform the async write), so the broker sees an abnormal | ||
| /// disconnect and will publish the configured Will, if any. Whether the underlying transport is | ||
| /// actually closed is up to the `IO`'s own `Drop`. For a clean shutdown — `DISCONNECT` sent, Will | ||
| /// suppressed — call [`disconnect`](Self::disconnect) (or [`disconnect_with`](Self::disconnect_with)) | ||
| /// before dropping. Reconnecting afterwards with [`Session::connect`](Session::connect) resumes the | ||
| /// broker session via `CONNECT` `clean_start=false`; see [`ConnectEvent`](crate::ConnectEvent). |
There was a problem hiding this comment.
It's doesn't "release". It leaks!
All this doesn't belong here anyway.
|
Great, thanks for the review. I'll follow up around Monday. |
Motivation
Session<'buf, IO>owns the transport for the whole session lifetime, which bakes a singleIOtype and lifetime into the session type. That makes it impossible to reuse one
Sessionacrossreconnects when the transport's lifetime differs between connections - most painfully over TLS,
where the record buffers are borrowed for the connection's lifetime and so have to outlive the
session. In practice that forces a fresh allocation (a leak) per connection: the old
tls_public_brokerexampleBox::leaks a new record-buffer pair every time it connects.What changed
Sessionloses itsIOgeneric.Session::connect::<IO>(io)now returns aConnection<'_, 'buf, IO>handle that owns the transport and borrows the session:drive/poll/recv/publish/subscribe/unsubscribe/disconnect)live on the handle; session-state queries (
is_pending,can_publish, …) are reachable throughDereftoSession.connect_event()exposes theConnected/Reconnectedresult.mem::forget-safe by construction. The handle may be dropped ormem::forget-ed at anytime (e.g. to release a TLS record-buffer borrow without running
Drop), so correctness can'tdepend on
Drop. All state carried across reconnects - in-flight QoS replay, keepalive timers,the inbound packet reader - is reset at the start of every
connect()instead of in adisconnect/
Droppath. The oldhandle_disconnect()is split accordingly: transport-drop -> thehandle's ownership; replay/timer/reader resets ->
connect(); and its liveness latch -> aconnectedflag set at the sites that detect a fatal disconnect.is_connected()moves to theConnectionhandle and reports per-connection liveness: itlatches
falsethe moment any operation observes a transport- or protocol-level disconnect(broker
DISCONNECT, transport error, keepalive timeout) or a gracefuldisconnect, after whichfurther operations fail fast with
Error::Disconnectedinstead of driving a dead transport.Connection::disconnect/disconnect_withtake&mut selfso a cancelled disconnect can beretried.
Drop vs. graceful disconnect
Dropping (or
mem::forget-ing) the handle is an ungraceful MQTT close: noDISCONNECTpacketis sent (a sync
Dropcannot perform the async write), so the broker sees an abnormal disconnectand will publish the configured Will, if any. Reconnecting afterwards resumes the broker session
(
CONNECTclean_start=false->Reconnected). For a clean shutdown -DISCONNECTsent, Willsuppressed - call
conn.disconnect().awaitbefore dropping. This is documented onConnection,connect, anddisconnect, and covered by a test asserting a bare drop emits noDISCONNECTandthat the next
connectresumes.Showcase
The
tls_public_brokerexample is reworked to own one pair of TLS record buffers and reuse themacross a reconnect - connect, round-trip, drop the handle, reconnect reusing the same buffers -
with no
Box::leakand no per-connection allocation. The previous API could not express this.Testing
Whole suite green: 68 unit tests, 59
async_clientintegration tests, 2real_brokertests, 3 doctests - all ported to the handle API, plus new regression tests for the disconnect
latch and the ungraceful-drop/resume behavior.
Breaking changes
Sessionloses itsIOtype parameter;Session::connect()returns aConnectionhandle.Sessionto the handle (queries remain via the handle'sDeref).is_connected()moves fromSessiontoConnection.Connection::disconnect/disconnect_withtake&mut self.The README, all examples, and the test suite are updated to match. See the
CHANGELOG.md[Unreleased]entry.