
[2.0] Refactor MQTT session management to use store-based persistence#1806

Open
jfallows wants to merge 45 commits into
developfrom
claude/mqtt-session-kafka-removal-rGtCy

jfallows commented May 29, 2026

Description

This PR refactors MQTT session management to use a store-based persistence model instead of Kafka consumer group coordination. The changes introduce a new session storage mechanism that allows MQTT brokers to persist and retrieve session state independently, enabling better session takeover and redirect capabilities.

Fixes #1798
Fixes #1799
Fixes #1800

Key Changes

Core Session Management:

  • Introduced MqttKafkaSessionOffsetsHelper to manage session offset persistence in a dedicated store
  • Added MqttKafkaSessionOffsetsFW type for serializing session offset metadata
  • Removed Kafka consumer group-based session coordination (KafkaGroup, KafkaGroupBeginExFW, KafkaGroupFlushExFW)
  • Updated MqttKafkaSessionFactory to use the new store-based approach instead of group coordination

MQTT Server Enhancements:

  • Added SERVER_BUSY reason code support for connection rejection
  • Introduced InstanceId class for unique broker instance identification
  • Added Closeable interface support for proper resource cleanup
  • Enhanced session takeover and redirect handling with lease-based ownership tracking

Configuration & Storage:

  • Added memory store configuration support to MQTT options
  • Updated MqttConfiguration to support configurable session stores
  • Modified MqttBindingConfig to include store references
  • Updated example configuration to include memory store definition

Test Updates:

  • Added MqttKafkaSessionOffsetsHelperTest for session offset persistence logic
  • Removed obsolete Kafka group-based session test scenarios
  • Updated MQTT-Kafka integration tests to work with new session model
  • Added new MQTT session redirect and takeover test scenarios

Specification Changes:

  • Updated IDL to include MqttKafkaSessionOffsets structure
  • Removed test scripts for deprecated Kafka group coordination flows
  • Simplified session management test scripts by removing group-based coordination
  • Added new test scenarios for store-based session management

Motivation

The previous Kafka consumer group-based approach had limitations for session management across multiple broker instances. The new store-based model provides:

  • Better isolation between session state and message publishing
  • Simpler session takeover and redirect logic
  • Support for external session stores (memory, persistent, etc.)
  • Cleaner separation of concerns between session management and message flow

Testing

Existing unit and integration tests have been updated to validate the new session persistence model. The test suite covers session creation, takeover, redirect, and expiry scenarios with the store-based approach.

https://claude.ai/code/session_01Gaf31VM5LZAFmUvtuBmqfz

claude added 30 commits May 26, 2026 22:33

Add an optional, currently-unused `store` field to the mqtt-kafka binding
options so configurations can prepare for StoreHandler-based session
ownership coordination. When `store` is absent, log a one-time deprecation
warning per binding instance announcing the upcoming requirement. Existing
highlander-group ownership behavior is unchanged.

https://claude.ai/code/session_0186rZE8mW1U947XTN4kzZ1x
Add a failing schema test asserting that an mqtt-kafka proxy configuration
without the 'store' option is rejected, ahead of making 'store' required as
part of migrating session ownership to the Store SPI. The companion
proxy.missing.store.yaml omits 'store'; validation currently accepts it
(store remains optional per #1797), so the test is red until the schema
marks 'store' required.

https://claude.ai/code/session_01MDjS4CuoxTHjPGS2E1wGJR
Introduce internal MqttKafkaConfiguration Duration properties
session.lease and session.renew, sizing the StoreHandler-based session
ownership lease TTL and renewal interval. Defaults (30s lease, 10s renew)
favor fast cooperative takeover with a bounded TTL fallback. These are
runtime tunables, not binding options, and will be consumed by the
Store-based ownership coordination in MqttKafkaSessionFactory.
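The two tunables can be pictured as ISO-8601 `Duration` values with the quoted defaults. The sketch below is hypothetical. The property-map lookup stands in for the engine configuration API, and the check that renew must be shorter than lease is an added assumption, not something stated in the commit.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical sketch: resolves session.lease / session.renew from a plain
// property map (an assumption; the real MqttKafkaConfiguration uses the engine
// configuration API), falling back to the documented 30s / 10s defaults.
final class SessionLeaseConfig
{
    static final Duration DEFAULT_LEASE = Duration.ofSeconds(30);
    static final Duration DEFAULT_RENEW = Duration.ofSeconds(10);

    final Duration lease;
    final Duration renew;

    SessionLeaseConfig(
        Map<String, String> properties)
    {
        this.lease = parse(properties, "session.lease", DEFAULT_LEASE);
        this.renew = parse(properties, "session.renew", DEFAULT_RENEW);

        // Assumed invariant: renewal must fire inside the lease TTL, otherwise
        // a healthy owner could lose its lease between renewals.
        if (renew.compareTo(lease) >= 0)
        {
            throw new IllegalArgumentException("session.renew must be shorter than session.lease");
        }
    }

    private static Duration parse(
        Map<String, String> properties,
        String name,
        Duration defaultValue)
    {
        final String value = properties.get(name);
        // ISO-8601 duration text, e.g. "PT30S"
        return value != null ? Duration.parse(value) : defaultValue;
    }
}
```

The defaults keep cooperative takeover fast. A holder that stops renewing loses the lease within the 30s TTL at most.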

https://claude.ai/code/session_01MDjS4CuoxTHjPGS2E1wGJR
…1798)

Mark the 'store' option required in the mqtt-kafka schema and declare a
store on every binding configuration, completing the deprecation begun in
#1797. Spec/test configs reference a 'test' store; the runnable example
uses a 'memory' store. SchemaTest now also registers the engine test-store
schema patch so 'type: test' validates, and shouldRejectProxyWithoutStore
passes now that the option is required.

https://claude.ai/code/session_01MDjS4CuoxTHjPGS2E1wGJR
Resolve the configured store reference to a StoreHandler when attaching an
mqtt-kafka binding, exposing it on MqttKafkaBindingConfig for the session
ownership coordination that will replace the Kafka consumer-group protocol.
Mirrors the binding.resolveId + supplyStore pattern used by binding-mcp.

https://claude.ai/code/session_01MDjS4CuoxTHjPGS2E1wGJR
Define MqttKafkaSessionOffsets / MqttKafkaSessionOffsetMetadata flyweights
to carry per-(topic,partition) consumed offset, idempotent producer state,
and in-flight QoS packet IDs as a single compacted mqtt-sessions record,
replacing per-clientId Kafka consumer-group offset commit metadata.
…1799)

KafkaSessionOffsetsHelper encodes/decodes the per-session QoS offset state
(per-(topic,partition) consumed offset, idempotent producer state, in-flight
packet IDs) to/from the MqttKafkaSessionOffsets flyweight, with a round-trip
unit test. Adds partitionId/producerSequence carriers to KafkaOffsetMetadata.
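An encode/decode round trip of this kind can be sketched with a plain `ByteBuffer`. The layout below is hypothetical: the version byte, field order, and field widths are assumptions made for illustration and do not reproduce the `MqttKafkaSessionOffsets` IDL.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical codec for a versioned session-offsets snapshot; the wire layout
// is an illustrative assumption, not the MqttKafkaSessionOffsets IDL.
final class SessionOffsetsCodec
{
    static final int VERSION = 1;

    // One entry per (topic, partition): consumed offset, idempotent producer
    // state, and in-flight QoS 2 packet ids.
    record Entry(
        String topic,
        int partitionId,
        long offset,
        long producerId,
        short producerEpoch,
        int producerSequence,
        int[] packetIds)
    {
    }

    static void encode(
        ByteBuffer buffer,
        List<Entry> entries)
    {
        buffer.put((byte) VERSION);
        buffer.putInt(entries.size());
        for (int i = 0; i < entries.size(); i++)
        {
            final Entry entry = entries.get(i);
            final byte[] topic = entry.topic().getBytes(StandardCharsets.UTF_8);
            buffer.putShort((short) topic.length);
            buffer.put(topic);
            buffer.putInt(entry.partitionId());
            buffer.putLong(entry.offset());
            buffer.putLong(entry.producerId());
            buffer.putShort(entry.producerEpoch());
            buffer.putInt(entry.producerSequence());
            buffer.putInt(entry.packetIds().length);
            for (int packetId : entry.packetIds())
            {
                buffer.putShort((short) packetId); // MQTT packet ids are 16-bit
            }
        }
    }

    static List<Entry> decode(
        ByteBuffer buffer)
    {
        final int version = buffer.get();
        if (version != VERSION)
        {
            throw new IllegalStateException("unsupported version " + version);
        }
        final int count = buffer.getInt();
        final List<Entry> entries = new ArrayList<>(count);
        for (int i = 0; i < count; i++)
        {
            final byte[] topic = new byte[Short.toUnsignedInt(buffer.getShort())];
            buffer.get(topic);
            final int partitionId = buffer.getInt();
            final long offset = buffer.getLong();
            final long producerId = buffer.getLong();
            final short producerEpoch = buffer.getShort();
            final int producerSequence = buffer.getInt();
            final int[] packetIds = new int[buffer.getInt()];
            for (int p = 0; p < packetIds.length; p++)
            {
                packetIds[p] = Short.toUnsignedInt(buffer.getShort());
            }
            entries.add(new Entry(new String(topic, StandardCharsets.UTF_8), partitionId,
                offset, producerId, producerEpoch, producerSequence, packetIds));
        }
        return entries;
    }
}
```

The leading version byte lets a reader reject a snapshot whose layout it does not understand.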

Add mqtt_kafka:sessionOffsets() / matchSessionOffsets() functions so .rpt
scripts can construct and assert the MqttKafkaSessionOffsets blob as a
mqtt-sessions record value, with round-trip and match/mismatch unit tests.
#1799)

Replace the per-clientId consumer-group offsetFetch/offsetCommit Kafka
choreography with a client#offsets record on the compacted mqtt-sessions
topic: a single merged FETCH reads the prior offsets/producer/in-flight
state, and each commit point becomes a merged PRODUCE of a full
MqttKafkaSessionOffsets snapshot. initProducerId + meta retained; #migrate
and highlander group left for a later removal pass.

Note: k3po ITs cannot run in the current execution environment (control
agent fails to start), so this scenario is validated by CI, not locally.

Replace the per-clientId Kafka consumer-group offsetFetch/offsetCommit
choreography with a client#offsets record on the compacted mqtt-sessions
topic. Session resume reads the record via a merged FETCH_ONLY stream and
decodes the MqttKafkaSessionOffsets blob (offsets, idempotent producer
state, in-flight QoS2 packet IDs); each commit point produces a full
snapshot via a merged PRODUCE_ONLY stream keyed clientId#offsets. meta,
initProducerId, group and #migrate are unchanged.

Verified: MqttKafkaPublishProxyIT#shouldSendMessageQos2 passes against the
engine; KafkaIT#shouldPublishQoS2Message and the helper unit test stay green.
Sibling QoS2 scenarios remain on the old choreography pending spec rewrite.
…o #offsets (#1799)

Rewrite the publish.qos2.retained and publish.mixture.qos kafka scripts to
the client#offsets record model. Verified against the engine:
MqttKafkaPublishProxyIT#shouldSendMessageQos2Retained and
#shouldSendMessageMixtureQos both pass.
…1799)

During session resume the offset-fetch branch created the session stream
immediately AND again after the seed #offsets produce was acked, so the
duplicate stream left the PUBREL->doCommitOffsetComplete->produce completion
path unreachable (no PUBCOMP). Drop the premature doCreateSessionStream so
recovery matches the fresh-session path (session stream created once, after
the seed snapshot is acked). Migrates publish.qos2.recovery spec to #offsets.

Verified: MqttKafkaPublishProxyIT#shouldSendMessageQos2DuringRecovery passes,
no regression across the qos2/retained/mixture scenarios.
…ort specs to #offsets (#1799)

Rewrite publish.qos2.init.producer.abort and publish.qos2.offset.commit.abort
phase1/phase2 kafka scripts to the client#offsets model (FETCH_ONLY preamble
and PRODUCE_ONLY commit streams). Verified against the engine: the three
corresponding MqttKafkaPublishProxyIT abort methods pass (meta.abort already
passed unchanged).
…ct (#1799)

When the merged FETCH_ONLY client#offsets stream aborts before delivering
the offsets record, the QoS2 mqtt reply is not yet open (deferred for
publishQosMax==2), so doMqttAbort on the unopened reply was unobservable and
the client hung. Also reset the mqtt initial in KafkaOffsetFetchStream
onKafkaAbort, mirroring how meta.abort already surfaces connect-abort.
Migrates publish.qos2.offset.fetch.abort spec to #offsets.

Verified: full MqttKafkaPublishProxyIT passes (36 tests, 0 failures).
…ry (#1799)

Rename the mqtt-sessions record key postfixes #will-signal -> #will and
#expiry-signal -> #expiry across the session factory constants and all 65
affected spec scenarios. The type header values (will-signal/expiry-signal)
and the will-message key prefix (#will-) are unchanged; clientId extraction
strips the constant's length so it stays correct. Clean rename, no legacy
read-both fallback.
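The key convention after the rename can be sketched as below. The constant names are hypothetical. The point is that the clientId is recovered by stripping the postfix constant's length, so the shorter postfixes require no change to the parser.

```java
// Hypothetical sketch of "<clientId>#will" / "<clientId>#expiry" record keys;
// the constant names are illustrative, not the factory's actual identifiers.
final class SessionKeys
{
    static final String WILL_KEY_POSTFIX = "#will";
    static final String EXPIRY_KEY_POSTFIX = "#expiry";

    static String willKey(
        String clientId)
    {
        return clientId + WILL_KEY_POSTFIX;
    }

    static String expiryKey(
        String clientId)
    {
        return clientId + EXPIRY_KEY_POSTFIX;
    }

    // Returns the clientId when key ends with postfix, otherwise null.
    static String clientId(
        String key,
        String postfix)
    {
        return key.endsWith(postfix)
            ? key.substring(0, key.length() - postfix.length())
            : null;
    }
}
```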

Verified against the engine: MqttKafkaSessionProxyIT (33), MqttKafkaPublishProxyIT
(36) and MqttKafkaSubscribeProxyIT (54) all pass.
…1798)

Thread the configured StoreHandler into MqttSessionProxy and coordinate
single-owner session ownership through the store lease primitives. On
session begin the proxy locks clientId#owner for the session.lease TTL;
on success it renews the lease on the session.renew interval via a new
SIGNAL_RENEW_SESSION_OWNERSHIP timer, and releases the lock on end,
abort, and reset.
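The lock / renew / release lifecycle on `clientId#owner` can be illustrated with a hypothetical in-memory lease store. The method names, the `-1` failure value, and the explicit millisecond clock are assumptions chosen for a deterministic sketch; this is not the Zilla Store SPI.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory lease store illustrating lock / renew / unlock on a
// "<clientId>#owner" key; not the Zilla Store SPI.
final class LeaseStore
{
    private record Lease(long token, long expiresAt)
    {
    }

    private final Map<String, Lease> leases = new HashMap<>();
    private long nextToken = 1;

    // Returns a fresh token if the key is free or its lease has expired, else -1.
    long lock(
        String key,
        long ttlMillis,
        long nowMillis)
    {
        final Lease current = leases.get(key);
        if (current != null && current.expiresAt() > nowMillis)
        {
            return -1;
        }
        final long token = nextToken++;
        leases.put(key, new Lease(token, nowMillis + ttlMillis));
        return token;
    }

    // Extends the lease only while the caller still holds the live token.
    boolean renew(
        String key,
        long token,
        long ttlMillis,
        long nowMillis)
    {
        final Lease current = leases.get(key);
        if (current == null || current.token() != token || current.expiresAt() <= nowMillis)
        {
            return false;
        }
        leases.put(key, new Lease(token, nowMillis + ttlMillis));
        return true;
    }

    // Releases the lease only if the token matches; stale tokens are no-ops.
    boolean unlock(
        String key,
        long token)
    {
        final Lease current = leases.get(key);
        if (current == null || current.token() != token)
        {
            return false;
        }
        leases.remove(key);
        return true;
    }
}
```

With a 30s lease renewed every 10s, a live owner always renews well before expiry. A crashed owner's lease lapses on its own, and the key then becomes available to the next `lock`.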

This is the additive foundation for cooperative takeover: ownership is
recorded in the store alongside the existing highlander group, which
remains until the cooperative takeover and redirect paths replace it.
No wire frames change, so existing session behavior is unaffected.
Build the cooperative-takeover protocol on top of the store-backed
ownership lease added previously.

On session begin the proxy reads the current owner record for the
clientId from the store. When the owner is a different replica with a
strong (serverRef-advertised) identity, the client is redirected to it
via an MQTT reset carrying the server reference; otherwise the proxy
publishes its own ownership record and claims the lease cooperatively.

The owning proxy watches its ownership key; when a different replica
publishes a competing record it stops renewal, releases the lock, and
closes its session so the challenger can take over. If the incumbent is
unresponsive and never yields, the contending proxy steals the lease
once it expires via a bounded retry. Identity strength is derived from
the configured serverRef and published in the ownership record.
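The decision made on session begin reduces to a small predicate. In the hypothetical sketch below, the type and method names are illustrative, and an absent owner record is represented as `null`.

```java
// Hypothetical sketch of the redirect-or-claim decision taken on session begin;
// names are illustrative, and a null Owner means no record exists in the store.
final class OwnershipDecision
{
    enum Action { CLAIM, REDIRECT }

    record Owner(String identity, boolean strong)
    {
    }

    static Action decide(
        Owner current,
        String localIdentity)
    {
        // Only a different replica with an externally reachable (strong)
        // identity can be a redirect target; every other case is claimed.
        final boolean remoteStrongOwner = current != null &&
            current.strong() &&
            !current.identity().equals(localIdentity);
        return remoteStrongOwner ? Action.REDIRECT : Action.CLAIM;
    }
}
```

A weak owner cannot be addressed by the client, so the proxy claims the session cooperatively and does not redirect.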

This runs alongside the existing highlander group, which remains as the
deprecated fallback until it is removed. Existing publish, subscribe and
session integration tests pass unchanged.
… hostname (#1800)

Source the replica's externally-reachable identity from the engine
'service.hostname' configuration (zilla.engine.service.hostname) rather
than only the per-binding 'serverRef' option, reusing the existing engine
configuration surface already consumed by binding-mcp.

The binding-level 'serverRef' continues to work and takes precedence when
set, emitting a deprecation warning that points to the engine service
hostname. When 'serverRef' is absent, the engine service hostname supplies
the strong-identity address published in the store ownership record; when
neither is configured the replica identity remains weak.
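The precedence rules can be sketched as follows. The `Identity` record and the warning callback are hypothetical. Using the replica id as the weak identity follows the later commit that describes a weak owner's identity as opaque.

```java
import java.util.function.Consumer;

// Hypothetical sketch of identity precedence: binding serverRef (deprecated),
// then engine service hostname, otherwise a weak opaque replica identity.
final class ReplicaIdentity
{
    record Identity(String address, boolean strong)
    {
    }

    static Identity resolve(
        String serverRef,
        String serviceHostname,
        String replicaId,
        Consumer<String> warn)
    {
        if (serverRef != null)
        {
            warn.accept("serverRef is deprecated; configure zilla.engine.service.hostname instead");
            return new Identity(serverRef, true);
        }
        if (serviceHostname != null)
        {
            return new Identity(serviceHostname, true);
        }
        return new Identity(replicaId, false);
    }
}
```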
…isenberg-NjXPY

# Conflicts:
#	runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java
#	specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/config/proxy.store.yaml
#	specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/config/SchemaTest.java
Restore 100% instruction coverage on the binding-mqtt-kafka.spec module
by exercising every mismatch branch of the MqttKafkaSessionOffsetsMatcher:
per-field mismatches (partition, consumed offset, producer id/epoch/sequence,
packet ids), version mismatch, entry-count mismatch, the no-constraint
(null) result, version-only match, and the empty-buffer path.
…1798)

Delete the session.group.* scenarios and their IT methods. These exercise
the Kafka consumer-group coordinator (describe-config/session-timeout/
not-authorized validation and server-sent reset) that is being removed in
favor of store-based cooperative session ownership; three were already
@Ignore'd. Drops the now-unused @Ignore import.
…#1798)

mqtt-kafka now requires a store for cooperative session ownership, which
broke the asyncapi proxy whose composite generates an mqtt-kafka binding
with no store. Generate a memory store in the composite namespace and
reference it from the generated mqtt-kafka binding; add store-memory as a
provided dependency so the type resolves in tests (the dist already
bundles it).
…ator

The OWNERSHIP_FIELD_SEPARATOR char literal embedded a raw NUL byte, which
caused git, grep, and ripgrep to treat MqttKafkaSessionFactory.java as a
binary file. Use the equivalent unicode escape instead so the source stays
text and diffs and searches work.
…ngle identity token

Store one identity token per clientId#owner record instead of a
separator-delimited tuple. A strong (externally-addressable) owner stores its
server reference verbatim; a weak owner stores its opaque replica identity
behind an ETag-style "W/" prefix. This drops the redundant second identity
field and the unused acquisition timestamp, and removes the field separator
entirely so the record is a plain string that test stores can seed from YAML.
The session owner token / redirect server reference is a bare hostname; the
listener port is implicit per service and shared across replicas, so the
service.hostname property carries no port.
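The single-token encoding can be sketched as below, assuming only the `W/` convention described above. The class and record names are hypothetical. A hostname cannot contain `/`, so a bare strong token can never be mistaken for a weak one.

```java
// Hypothetical sketch of the single-token owner record: strong owners store a
// bare hostname, weak owners store an opaque identity behind "W/".
final class OwnerToken
{
    static final String WEAK_PREFIX = "W/";

    record Owner(String identity, boolean strong)
    {
    }

    static String encode(
        Owner owner)
    {
        return owner.strong() ? owner.identity() : WEAK_PREFIX + owner.identity();
    }

    static Owner decode(
        String token)
    {
        return token.startsWith(WEAK_PREFIX)
            ? new Owner(token.substring(WEAK_PREFIX.length()), false)
            : new Owner(token, true);
    }
}
```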
…rence

The redirect target is the owning replica's hostname; the listener port is
implicit per service, so the redirect server reference carries no port.
…nership

Remove the pre-Store session-ownership machinery now that Store-based
cooperative ownership is in place:

- Delete the highlander Kafka consumer-group election (KafkaGroupStream) and
  the #migrate compacted-topic takeover signal (KafkaSessionSignalStream,
  sendMigrateSignal, MIGRATE_KEY_POSTFIX, clientIdMigrate).
- Stop creating the mqtt-clients consumer group and stop carrying the redirect
  hint on KafkaResetEx.consumerId across the session, publish, and subscribe
  streams; redirect now flows from the Store ownership record.
- Resolve Store ownership before opening any Kafka stream: onOwnershipLocked
  drives session establishment (qos<2 replies and opens the session-state
  stream; qos2 fetches metadata/offsets first), and the will-fetch flush feeds
  into the same path. A strong remote owner is redirected with no Kafka stream
  opened.
- Drop the "highlander" protocol handling from KafkaClientGroupFactory.

KafkaResetEx.consumerId remains in the Kafka IDL (still used by binding-kafka).
…n.subscribe kafka script

Strip the #migrate produce stream, the highlander group stream, and the
mqtt-clients groupId + #migrate merged-stream filter from the session.subscribe
kafka scripts. The proxy now opens a single session-state stream keyed on the
clientId once it owns the session via the store. Verified against the engine:
MqttKafkaSessionProxyIT.shouldSubscribeSaveSubscriptionsInSession passes.
Seed the test store with a strong remote owner (client-1#owner =
mqtt-2.example.com) so the proxy resolves a different strong-identity owner on
connect and redirects the client, exercising the store-based redirect path
without any Kafka session stream.
…subscribe specs

Remove the #migrate produce streams, highlander group streams, mqtt-clients
groupId, #migrate merged-stream filters, and consumerId redirect hints from the
kafka-side spec scripts across all session/publish/subscribe scenarios, matching
the removed ownership machinery. Verified against the engine: publish (36/36),
subscribe (54/54), and the session scenarios (except the cross-replica ones and
session.exists.clean.start, handled separately) pass; peer-to-peer KafkaIT and
MqttIT are green.

Also drop session.connect.override.{max,min}.session.expiry: the server-side
session-expiry override was a side effect of the Kafka consumer-group
session.timeout negotiation, which no longer exists after removing the group.
claude added 9 commits May 27, 2026 23:23
…-timeout under analysis)

Redesign session.redirect for the store-based ownership model:
- Factory: on a strong remote owner, doRedirect resets the mqtt session stream
  immediately (doMqttReset) without opening any kafka stream (ownership-first).
- Add kafka != null guards to the base KafkaSessionStream do* frame-writers so a
  never-begun session stream (the redirect path) is not written to.
- Specs: delete the kafka-side session.redirect scripts (no kafka activity on
  redirect); mqtt-side client uses connect aborted, server uses accept/rejected.
- ITs: shouldRedirect runs mqtt-side only against proxy.redirect.yaml (store
  seeded with client-1#owner = mqtt-2.example.com); peer MqttIT#shouldRedirect
  passes.

Known issue for analysis: the runtime MqttKafkaSessionProxyIT#shouldRedirect
hits an engine-side close timeout with doMqttReset alone — the proxy's reply
direction is not released. doMqttAbort on the reply clears it but is being
reviewed; committed without it for reproduction.
newStream returns a non-null consumer, so the engine fully establishes the
bidirectional mqtt session stream before ownership is resolved. Because the
redirect decision is deferred behind the async store lookup, the proxy has
accepted the stream and owes a reply terminal; resetting only the initial
leaves the reply undischarged and the engine synthetically resets it on close.
Abort the reply alongside the initial RESET so the stream is fully torn down.

MqttKafkaSessionProxyIT#shouldRedirect and peer MqttIT#shouldRedirect pass.
…on takeover

The ownership challenge previously surrendered only to a different identity
(cross-replica), so two connections for the same clientId on one replica could
not take over from each other — that path had relied on the removed highlander
group. Carry a globally-unique per-connection nonce (replicaId + initialId) in
the ownership record and surrender in onOwnershipChallenged whenever the
challenger's nonce differs from ours. A newer connection's store.put fires the
incumbent's watch, which then unlocks and resets, letting the newcomer acquire
the lock. Identity is still used for the strong-remote redirect decision.
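The nonce-based challenge check can be sketched as follows. The `:` separator inside the nonce is a hypothetical choice. What matters is that two connections on the same replica get different nonces because their `initialId` values differ.

```java
// Hypothetical sketch of the per-connection nonce and the surrender check run
// when the watched ownership record changes; the ':' separator is assumed.
final class OwnershipChallenge
{
    static String nonce(
        String replicaId,
        long initialId)
    {
        return replicaId + ":" + initialId;
    }

    // The incumbent yields whenever the published record carries another nonce.
    static boolean shouldSurrender(
        String ownNonce,
        String observedNonce)
    {
        return observedNonce != null && !observedNonce.equals(ownNonce);
    }
}
```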
… re-model

The three takeover scenarios (session.client.takeover, session.exists.clean.start,
session.will.message.takeover.deliver.will) relied on the removed highlander
group. Same-replica takeover now works via the per-connection ownership nonce,
but the scripts/ITs will be re-modeled when the ownership logic moves to the
mqtt server binding. Ignore them with a TODO until then so the removal lands
green.
Merge develop (#1804 et al) into the removal branch. develop's #1804 moved the
'store' option and the session-ownership deprecation warning from mqtt-kafka
to the mqtt server binding, so this branch's mqtt-kafka-side store-based
ownership cannot stay where it is. Strip every ownership artifact out of
mqtt-kafka so the merge compiles; the ownership logic will be added to
binding-mqtt in a follow-up commit.

Removed from binding-mqtt-kafka:
- MqttKafkaBindingConfig.store + supplyStore plumbing through MqttKafkaProxyFactory.
- MqttKafkaSessionFactory: OwnershipRecord, all on/do*Ownership* handlers,
  doRedirect, ownerIdentity/Nonce/Record helpers, OWNER_KEY_POSTFIX,
  SIGNAL_RENEW_SESSION_OWNERSHIP, SIGNAL_STEAL_SESSION_OWNERSHIP,
  REDIRECT_AVAILABLE_MASK, hasRedirectCapability, and the MqttSessionProxy
  store/ownerKey/ownerToken/owns/claimed/ownerWatch/renewAt/stealAt/redirect
  fields.
- MqttKafkaConfiguration: SESSION_LEASE/SESSION_RENEW property defs.
- The redirect spec scenario (proxy.redirect.yaml + session.redirect/{client,server}.rpt),
  shouldRedirect runtime + peer ITs.
- Conflict-resolution cleanup: drop 'store' from the mqtt-kafka schema's
  required list and strip 'store: test0' (plus the stores: block) from the
  config-fixture yamls.

mqtt-kafka now assumes ownership is achieved when it receives a session BEGIN
and proceeds straight to doEstablishSession in onMqttBegin. The kafka != null
guards on KafkaSessionStream stay; highlander/migrate removal, will-aware
doEstablishSession, and the bulk spec strip are preserved.
…binding

Move the store-based MQTT session-ownership state machine into the mqtt server
binding (where it conceptually belongs: ownership is a generic MQTT concern,
not a Kafka-mapping detail). mqtt-kafka now bootstraps the Kafka session as
soon as it receives the application BEGIN; binding-mqtt only forwards that
BEGIN after ownership is achieved.

binding-mqtt:
- MqttBindingConfig.store: resolved from options.store via supplyStore, plumbed
  through MqttBinding/MqttBindingContext.
- MqttConfiguration: INSTANCE_ID, SESSION_LEASE (PT30S), SESSION_RENEW (PT10S)
  plus a serviceHostname() accessor for the engine's service hostname.
- New InstanceId helper (mirrors mqtt-kafka's).
- MqttServerFactory: per-connection ownership state on MqttServer
  (ownerKey/Token/Watch, renewAt, stealAt, owns/claimed/ownershipResolved,
  pendingSession* for the deferred-establish), OwnershipRecord with the
  globally-unique per-connection nonce (replicaId + initialId), signal ids
  SIGNAL_RENEW_SESSION_OWNERSHIP (4) + SIGNAL_STEAL_SESSION_OWNERSHIP (5)
  wired into onNetworkSignal. Hook in onDecodeConnectPayload: when store is
  configured, defer session establishment to the lock callback by parking
  decoder = decodeAwaitOwnership. On strong remote owner doEncodeConnack
  SERVER_MOVED + serverReference and never open the downstream session; on
  claim+lock, doEstablishSession (which creates MqttSessionStream and sends
  BEGIN) + register store.watch for cooperative challenge. Same-replica
  takeover via different-nonce challenge. Re-entrant guards on the store
  callbacks for connection-already-closed paths. doReleaseOwnership called
  from cleanupNetwork and closeStreams.

The legacy session.server.redirect.before.connack path (mqtt-kafka can still
send a session RESET with serverRef post-BEGIN) is kept as a complementary
late-redirect mechanism.

specs/binding-mqtt.spec:
- server.redirect.yaml seeds the engine test store with
  client-1#owner = mqtt-2.example.com:1883.
- streams/network/v5/session.redirect/{client,server}.rpt encode the
  CONNACK SERVER_MOVED + Server Reference response.
- v5/SessionIT peer test and the runtime v5/SessionIT both cover shouldRedirect.
…nership

Wire up store-based MQTT session takeover end-to-end at the binding-mqtt layer
and retire the obsolete mqtt-kafka tests.

binding-mqtt:
- MqttServerFactory.onOwnershipChallenged now routes through the existing
  onDecodeError(SESSION_TAKEN_OVER) teardown (DISCONNECT 0x8E to the displaced
  client + END to publishes/subscribes + END to session + doReleaseOwnership),
  with a pre-emptive session.cleanupAbort when a will is present so the
  session ends as ABORT (per the will-on-abort path) and the will fires.
- OwnershipRecord FIELD_SEPARATOR: use the unicode escape form in source for
  clarity.

binding-mqtt specs:
- New shouldDeliverWillMessageOnSessionTakeover at v5 network + application
  layers; shouldTakeOverSession / shouldRemoveSessionAtCleanStart already
  covered v5; add a runtime IT method for each, on a tight-lease store config.
- Correct session.redirect's reason-code byte to 0x9d
  (MqttReasonCodes.SERVER_MOVED); was 0x87 (NOT_AUTHORIZED) in a pre-existing
  script bug, so the previously-passing shouldRedirect was matching the wrong byte.

mqtt-kafka cleanup:
- Delete the three @ignored takeover ITs in MqttKafkaSessionProxyIT, the
  KafkaIT/MqttIT peer methods, and the corresponding kafka/mqtt-side
  spec scenario dirs.
- Drop the dead KafkaGroup inner class and its uses in MqttKafkaPublishFactory
  / MqttKafkaPublishMetadata (left over from the kafka-side ownership strip;
  removing closes the mqtt-kafka jacoco class-coverage gate).

Verified: ./mvnw clean install across runtime/binding-mqtt, runtime/binding-mqtt-kafka,
specs/binding-mqtt.spec, specs/binding-mqtt-kafka.spec — BUILD SUCCESS with
full ITs and jacoco; coverage checks met on both bindings.
…over

The store-based ownership lock previously left a contended second connection
waiting for the broker's sessionLease to expire when the prior holder was
unresponsive (no watch event fires). Embed the live lock token in the
OwnershipRecord so a contender can pre-empt the unresponsive holder via
store.unlock(ownerKey, observedToken, ...), bounded by sessionRenew instead
of sessionLease.

- OwnershipRecord gains a fourth field, `token`. decode handles 0/1/2
  trailing separators so externally-seeded records (identity only) and
  legacy records (identity + nonce, no token) still parse cleanly.
- Reorder the claim flow: lock-then-put. ownershipRecord(token) is now only
  called from the lock-success path, so every record this code publishes
  carries the live token.
- Extract completeOwnership(traceId, token) shared by onOwnershipLocked
  (initial claim) and the new onStealComplete (post-steal); it writes the
  token-carrying record, installs the watch, schedules renew, runs
  doEstablishSession, and unsticks the decoder.
- onStealOwnership now: store.get → if observedToken present, store.unlock →
  retry store.lock → onStealComplete. The unlock-with-token only succeeds
  if that token is still the live holder; stale tokens are no-ops, leaving
  the cooperative path intact for responsive holders that surrender via the
  watch first.
- onStealComplete: success → completeOwnership; failure → CONNACK
  SERVER_BUSY (0x89) + network end (mirrors the doRedirect teardown
  pattern). No session cleanup needed because doEstablishSession only runs
  on success.
- onOwnershipRenewed: when renew returns null (token invalidated by a
  force-unlock), the displaced holder now surrenders via the same path as
  the watch-driven challenge — extracted as doSurrenderOwnership(traceId)
  (cancel renew, unlock if held, conditional session.cleanupAbort when a
  will is set, onDecodeError SESSION_TAKEN_OVER). onOwnershipChallenged
  also delegates to this helper, removing the duplication.
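Tolerant decoding of 0, 1, or 2 separators can be sketched with `indexOf`. This sketch makes several assumptions. It reuses the NUL separator that the earlier mqtt-kafka `OWNERSHIP_FIELD_SEPARATOR` commit mentions, applies the `W/` weak prefix to the identity field, and represents absent trailing fields as `null`.

```java
// Hypothetical sketch of OwnershipRecord encode/decode tolerating identity-only
// (seeded), identity+nonce (legacy) and identity+nonce+token (current) values.
// The NUL separator and null-for-absent fields are assumptions.
final class OwnershipRecordCodec
{
    static final char SEPARATOR = '\u0000';
    static final String WEAK_PREFIX = "W/";

    record OwnershipRecord(boolean strong, String identity, String nonce, String token)
    {
    }

    static String encode(
        OwnershipRecord record)
    {
        final String identity = record.strong() ? record.identity() : WEAK_PREFIX + record.identity();
        return identity + SEPARATOR + record.nonce() + SEPARATOR + record.token();
    }

    static OwnershipRecord decode(
        String value)
    {
        final int first = value.indexOf(SEPARATOR);
        final int second = first < 0 ? -1 : value.indexOf(SEPARATOR, first + 1);

        final String identity = first < 0 ? value : value.substring(0, first);
        final String nonce = first < 0 ? null
            : second < 0 ? value.substring(first + 1) : value.substring(first + 1, second);
        final String token = second < 0 ? null : value.substring(second + 1);

        final boolean strong = !identity.startsWith(WEAK_PREFIX);
        final String bareIdentity = strong ? identity : identity.substring(WEAK_PREFIX.length());
        return new OwnershipRecord(strong, bareIdentity, nonce, token);
    }
}
```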

Tests: shouldTakeOverSession / shouldDeliverWillMessageOnSessionTakeover /
shouldRemoveSessionAtCleanStart / shouldRedirect remain green. The
SERVER_BUSY rejection branch and the renew-failure surrender are not
covered by an IT (would require the test store to surface a "renew refused"
outcome and a contender holding through both lock attempts); jacoco gates
on both bindings still pass.
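The token-checked steal can be sketched with a hypothetical in-memory store. To isolate the token check, the sketch omits lease expiry, and the API names are illustrative. An unlock with a stale token is a no-op, so a responsive holder that has already rotated or released its token is never displaced this way.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the bounded steal: unlock with the observed token,
// then retry the lock. Lease expiry is omitted; not the Zilla Store SPI.
final class StealSketch
{
    static final class TokenStore
    {
        private final Map<String, String> holders = new HashMap<>();
        private int nextToken = 1;

        String lock(
            String key)
        {
            if (holders.containsKey(key))
            {
                return null;
            }
            final String token = "t" + nextToken++;
            holders.put(key, token);
            return token;
        }

        // Succeeds only if token is still the live holder; stale tokens are no-ops.
        boolean unlock(
            String key,
            String token)
        {
            return token != null && holders.remove(key, token);
        }
    }

    // Returns the new token, or null when the contender should reject (SERVER_BUSY).
    static String steal(
        TokenStore store,
        String key,
        String observedToken)
    {
        if (observedToken != null)
        {
            store.unlock(key, observedToken);
        }
        return store.lock(key);
    }
}
```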
@jfallows jfallows changed the title Refactor MQTT session management to use store-based persistence [2.0] Refactor MQTT session management to use store-based persistence May 29, 2026
@jfallows jfallows self-assigned this May 29, 2026
@jfallows jfallows requested a review from akrambek May 29, 2026 06:27
akrambek commented Jun 4, 2026

Code review

Found 1 issue:

  1. KafkaOffsetFetchStream does not handle FlushFW, so a QoS2 client with no prior session-offsets record never gets a CONNACK.

The merged offset-fetch stream is opened FETCH_ONLY with a key filter and KafkaEvaluation.EAGER (newOffsetFetchStream). When the keyed #offsets record is absent — a fresh QoS2 client — the cache delivers a FLUSH after catch-up rather than DATA. But onOffsetFetchMessage only switches on Begin/Data/End/Abort/Reset; the FLUSH is silently dropped, so onOffsetFetched (the sole gate into onProducerInit → offset-commit → session establishment) is never called and the session stalls.

{
    switch (msgTypeId)
    {
    case BeginFW.TYPE_ID:
        final BeginFW begin = beginRO.wrap(buffer, index, index + length);
        onKafkaBegin(begin);
        break;
    case DataFW.TYPE_ID:
        final DataFW data = dataRO.wrap(buffer, index, index + length);
        onKafkaData(data);
        break;
    case EndFW.TYPE_ID:
        final EndFW end = endRO.wrap(buffer, index, index + length);
        onKafkaEnd(end);
        break;
    case AbortFW.TYPE_ID:
        final AbortFW abort = abortRO.wrap(buffer, index, index + length);
        onKafkaAbort(abort);
        break;
    case ResetFW.TYPE_ID:
        final ResetFW reset = resetRO.wrap(buffer, index, index + length);
        onKafkaReset(reset);
        break;
    }
}

The sibling stream with identical FETCH_ONLY + key + EAGER semantics, KafkaFetchWillSignalStream, handles exactly this no-data case by driving establishment from its flush handler — confirming the missing-key path arrives as a FLUSH:

protected void onKafkaFlush(
    FlushFW flush)
{
    final long sequence = flush.sequence();
    final long acknowledge = flush.acknowledge();
    final long traceId = flush.traceId();
    final long authorization = flush.authorization();
    final long reserved = flush.reserved();
    assert acknowledge <= sequence;
    assert sequence >= replySeq;
    replySeq = sequence + reserved;
    assert replyAck <= replySeq;
    delegate.session.doKafkaEnd(traceId, authorization);
    delegate.doEstablishSession(traceId, authorization);

Note that onOffsetFetched already anticipates empty entries (initProducer branch), but that branch is only reachable via onKafkaData, so it cannot run when delivery is a FLUSH. Existing QoS2 specs always pre-seed the #offsets record, so this path is untested.

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

public List<KafkaOffsetMetadata> decode(
    MqttKafkaSessionOffsetsFW offsets)
{
    final List<KafkaOffsetMetadata> entries = new ArrayList<>();


Creating garbage.

@Specification({
    "${mqtt}/session.server.sent.reset/client",
    "${kafka}/session.group.server.sent.reset/server"})
public void shouldGroupStreamReceiveServerSentReset() throws Exception


Just trying to understand: if we are only refactoring the implementation, why do we need to remove spec tests?

Contributor Author

We are no longer using consumer groups, so the spec scripts associated with them are no longer needed.

}
}

// Shared success path for both the initial lock acquisition and a steal that succeeded


Feels like too many comments.

{
    stealAt = NO_CANCEL_ID;

    if (MqttState.initialClosed(state) || MqttState.replyClosed(state))


I think this can be simplified with the condition below.

}
}

record = new OwnershipRecord(!weak, identity, nonce, token);


Looks like this creates garbage.

{
    sessionOffsetsRW.wrap(offsetBuffer, 0, offsetBuffer.capacity());
    sessionOffsetsRW.version(SESSION_OFFSETS_VERSION);
    entries.forEach(metadata -> sessionOffsetsRW.entriesItem(item -> encodeEntry(item, metadata)));


Following up on the "Creating garbage." note here: encode()/encodeEntry() run on the per-QoS2 offset-commit path (MqttKafkaPublishFactory.doCommitOffset*), so these are warm-path allocations rather than one-time setup:

  • L178 entries.forEach(metadata -> sessionOffsetsRW.entriesItem(item -> encodeEntry(item, metadata))): the inner item -> encodeEntry(item, metadata) captures metadata, so a fresh lambda is allocated per entry, per commit (and the outer lambda captures this).
  • L217 metadata.packetIds.forEach(p -> item.appendPacketIds(p.shortValue())): packetIds is an Agrona IntArrayList, and forEach(Consumer) boxes every int to Integer, plus the capturing lambda. IntArrayList.forEachInt(IntConsumer) avoids both.

Both are avoidable with index loops over entries/packetIds (and forEachInt), keeping this on the no-alloc-on-the-hot-path convention. decode() just below already uses the primitive (IntConsumer) packetIds::add for the same reason.
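The suggested style can be sketched in plain Java. This is a hypothetical stand-in: an `int[]` plus a count replaces Agrona's `IntArrayList`, whose `forEachInt(IntConsumer)` is the other option named above, and the `Entry` shape and wire layout are assumptions.

```java
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical sketch of allocation-free encoding: index loops instead of
// capturing lambdas, and primitive int reads instead of Integer boxing.
final class NoAllocEncoder
{
    record Entry(int partitionId, long offset, int[] packetIds, int packetIdCount)
    {
    }

    static int encode(
        ByteBuffer buffer,
        List<Entry> entries)
    {
        final int start = buffer.position();
        buffer.putInt(entries.size());
        for (int i = 0; i < entries.size(); i++) // index loop: no lambda capture
        {
            final Entry entry = entries.get(i);
            buffer.putInt(entry.partitionId());
            buffer.putLong(entry.offset());
            buffer.putInt(entry.packetIdCount());
            final int[] packetIds = entry.packetIds();
            for (int p = 0; p < entry.packetIdCount(); p++) // primitive read: no boxing
            {
                buffer.putShort((short) packetIds[p]);
            }
        }
        return buffer.position() - start;
    }
}
```

The index loop is only cheap for random-access lists such as `ArrayList`. A `LinkedList` would make each `get(i)` linear.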

claude added 6 commits June 4, 2026 17:20
- KafkaSessionOffsetsHelper.encode: cache lambdas and stash transient state
  on per-worker fields to drop the per-entry lambda capture and Integer
  boxing on the QoS2 offset-commit warm path; use IntArrayList.forEachInt
  and an index loop over the List signature.
- MqttServerFactory ownership block: trim verbose explanatory comments;
  rename `weak` to `strong` in OwnershipRecord.decode so the constructor
  call carries the positive boolean directly.
…client

The merged offset-fetch stream is opened FETCH_ONLY with a key filter and
KafkaEvaluation.EAGER. When the keyed `#offsets` record is absent (fresh
QoS2 client), the cache delivers a FLUSH after catch-up rather than DATA.
KafkaOffsetFetchStream.onOffsetFetchMessage previously dispatched only
Begin/Data/End/Abort/Reset, silently dropping the FLUSH so session
bring-up stalled and no CONNACK ever reached the client. Add a FlushFW
case that calls delegate.onOffsetFetched with an empty entries list,
taking the same initProducer branch the empty-data path already supports.

Add publish.qos2.no.prior.offsets spec scenario (mqtt + kafka client/
server) whose kafka offsets-fetch arm responds via FLUSH instead of DATA,
plus matching peer-to-peer (KafkaIT, MqttIT) and runtime
(MqttKafkaPublishProxyIT) test methods. Confirmed failing on TestTimedOut
before the fix and passing after.
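The shape of the fix can be shown with a hypothetical, self-contained dispatcher. The frame type constants, the `Delegate` interface, and the `List<Long>` standing in for decoded offset entries are all illustrative stand-ins for the Zilla flyweights. The only point is that FLUSH is routed into `onOffsetFetched` with an empty list.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: FLUSH on a keyed FETCH_ONLY/EAGER stream means "no
// record", so it is treated as a fetch with no entries instead of being dropped.
final class OffsetFetchDispatch
{
    static final int BEGIN = 1, DATA = 2, END = 3, ABORT = 4, FLUSH = 5, RESET = 6;

    interface Delegate
    {
        void onOffsetFetched(List<Long> entries);
    }

    static void onOffsetFetchMessage(
        int msgTypeId,
        List<Long> decoded,
        Delegate delegate)
    {
        switch (msgTypeId)
        {
        case DATA:
            delegate.onOffsetFetched(decoded);
            break;
        case FLUSH:
            // No prior #offsets record: continue with an empty snapshot so the
            // initProducer branch runs and session establishment proceeds.
            delegate.onOffsetFetched(Collections.emptyList());
            break;
        default:
            // BEGIN / END / ABORT / RESET handling is omitted in this sketch.
            break;
        }
    }
}
```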
…enew path

Cache ownerIdentity, ownerNonce, and the ownership-record prefix as final
fields on MqttServer, computed once in the constructor from factory-level
serviceHostname/replicaId and the per-stream initialId. ownershipRecord(token)
now does one concat instead of four; ownerIdentity/ownerNonce comparisons
read fields instead of recomputing. The renew-callback lambda is replaced
by a cached method reference whose pending traceId is stashed in a field.
After catching up develop's PR #1814 (TestStoreHandler dispatch change),
the steal/renew signaler.signalAt deliveries stopped firing on the
peer-initiated initialId stream and the takeover ITs stalled. Switch
all three ownership signalAt sites to replyId, matching the convention
used by every other server-controlled timer in this factory
(keepAliveTimeout, connectTimeout).

3 participants