Fix race and disconnection after candidate replace #913
Codecov Report
❌ Patch coverage is
Additional details and impacted files

    @@            Coverage Diff             @@
    ##             main     #913      +/-   ##
    ==========================================
    - Coverage   88.64%   88.58%   -0.06%
    ==========================================
      Files          45       45
      Lines        5811     5872      +61
    ==========================================
    + Hits         5151     5202      +51
    - Misses        451      463      +12
    + Partials      209      207       -2
Pull request overview
This PR addresses race conditions and a connectivity regression introduced by peer-reflexive candidate replacement by avoiding in-place mutation of shared candidate pair state, serializing remote-candidate cache access, and preserving candidate activity timestamps during replacement.
Changes:
- Serialize non-STUN remote-candidate validation/cache updates onto the agent task loop.
- Replace candidate pair objects (instead of mutating `pair.Remote`) and preserve pair stats/state during replacement.
- Preserve `LastReceived`/`LastSent` across candidate replacement and update tests to assert the new behavior.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| candidate_base.go | Moves non-STUN candidate validation/cache operations into agent.loop.Run to eliminate concurrent map access. |
| agent.go | Adds helpers to clone candidate pair state and copy candidate activity; updates prflx replacement to swap pair objects and update selected pair. |
| agent_test.go | Updates peer-reflexive replacement tests to assert pair object replacement and preserved activity timestamps. |
    validRemote := false
    if err := agent.loop.Run(c, func(_ context.Context) {
        if c.validateSTUNTrafficCache(srcAddr) {
            validRemote = true

            return
        }

        remoteCandidate := agent.findRemoteCandidate(c.NetworkType(), srcAddr)
        if remoteCandidate == nil {
            return
        }

        remoteCandidate.seen(false)
        c.addRemoteCandidateCache(remoteCandidate, srcAddr)
        validRemote = true
    }); err != nil {
handleInboundPacket now calls agent.loop.Run(...) for every non-STUN packet (including application/data traffic). This makes the recv loop synchronously depend on the agent task loop and can become a throughput/latency bottleneck under media load. Consider making remoteCandidateCaches concurrency-safe (e.g., RWMutex/sync.Map/copy-on-write) so cache-hit validation can stay on the recv goroutine, and only fall back to loop.Run for cache misses/updates.
Restored previous code and changed remoteCandidateCaches to sync.Map
    remoteCandidate := agent.findRemoteCandidate(c.NetworkType(), srcAddr)
    if remoteCandidate == nil {
        return
    }

    remoteCandidate.seen(false)
    c.addRemoteCandidateCache(remoteCandidate, srcAddr)
The new non-STUN validation path duplicates logic that already exists in Agent.validateNonSTUNTraffic and leaves that helper unused (no remaining call sites). To reduce drift and keep the validation rules centralized, consider refactoring so handleInboundPacket reuses a single helper (possibly extending it to also manage remoteCandidateCaches).
Restored previous code
    }

    if !validRemote {
        agent.log.Warnf("Discarded message from %s, not a valid remote candidate", c.addr())
This warning logs c.addr() (the local candidate address) as the message source. For troubleshooting, it should log the actual remote source address (srcAddr).
    - agent.log.Warnf("Discarded message from %s, not a valid remote candidate", c.addr())
    + agent.log.Warnf("Discarded message from %s, not a valid remote candidate", srcAddr)
    replacement := replacePairRemote(pair, cand)
    replacement.setPriorityOverride(oldPriority)
    a.checklist[i] = replacement
    a.pairsByID[replacement.id] = replacement

    if a.getSelectedPair() == pair {
        a.setSelectedPair(replacement)
    }
replaceRedundantPeerReflexiveCandidates now replaces *CandidatePair objects in the checklist/map. Any other components holding a pointer to the old pair (notably controllingSelector.nominatedPair) will keep pointing at the stale object and won’t see the updated remote candidate/state. Consider updating selector state when nominatedPair == pair (and any other known pair holders) to point at replacement, similar to the selected-pair update.
    - replacement := replacePairRemote(pair, cand)
    - replacement.setPriorityOverride(oldPriority)
    - a.checklist[i] = replacement
    - a.pairsByID[replacement.id] = replacement
    - if a.getSelectedPair() == pair {
    -     a.setSelectedPair(replacement)
    - }
    + oldID := pair.id
    + replacement := replacePairRemote(pair, cand)
    + replacement.setPriorityOverride(oldPriority)
    + // Preserve the original pointer so any other component
    + // holding a reference to this pair observes the updated
    + // remote candidate and subsequent state changes.
    + *pair = *replacement
    + a.checklist[i] = pair
    + if oldID != pair.id {
    +     delete(a.pairsByID, oldID)
    + }
    + a.pairsByID[pair.id] = pair
There was more to fix here, added retargetKnownPairHolders to address it.
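The thread above is about stale pointers: once a pair object is replaced, every component that still holds the old pointer must be retargeted. The source does not show what `retargetKnownPairHolders` does, so the following is only a sketch of the general pattern. All types and field names here (`agent`, `candidatePair`, `nominatedPair`, `replaceRemote`) are simplified, hypothetical stand-ins, and the code assumes it runs on a single goroutine such as the agent task loop.

```go
package main

// candidate and candidatePair are hypothetical, simplified stand-ins.
type candidate struct {
	kind string
}

type candidatePair struct {
	id     uint64
	remote *candidate
}

// agent keeps several references to the same pair object.
type agent struct {
	checklist     []*candidatePair
	pairsByID     map[uint64]*candidatePair
	selectedPair  *candidatePair
	nominatedPair *candidatePair // e.g. state kept by a selector
}

// replaceRemote swaps checklist[i] for a new pair that carries a different
// remote candidate, then retargets every known holder of the old pointer.
// The old pair object itself is left untouched.
func (a *agent) replaceRemote(i int, remote *candidate) *candidatePair {
	old := a.checklist[i]
	replacement := &candidatePair{id: old.id, remote: remote}

	a.checklist[i] = replacement
	a.pairsByID[replacement.id] = replacement
	if a.selectedPair == old {
		a.selectedPair = replacement
	}
	if a.nominatedPair == old {
		a.nominatedPair = replacement
	}
	return replacement
}
```

The design trade-off is that every holder has to be known and updated explicitly, whereas copying into the original pointer (as the suggestion above proposes) reintroduces in-place mutation of a shared object.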
Fixes two race conditions introduced by prflx candidate replacement and a regression where an agent could transition from Connected to Disconnected/Failed shortly after replacement.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated no new comments.
…ace on republish
When the same MediaMTX path is republished within a few seconds, MediaMTX
1.17.1 creates WebRTC sessions for it but ICE never completes ("deadline
exceeded while waiting connection") for 1-2 minutes, so viewers stare at the
"Verbinde mit dem Stream…" ("Connecting to the stream…") overlay through a
long retry loop. Root cause is
upstream in pion/ice — fixed by v4.2.5 / MediaMTX 1.18.0+ ("Fix race and
disconnection after candidate replace", pion/ice#913). We can't take that
bump because of mediamtx#5728 (AV1 RTMP regression still open as of 1.18.1),
so side-step the bug by giving every publish a brand-new path: append an
8-hex nonce, so `channel-<cid>-<uid>` becomes `channel-<cid>-<uid>-<nonce>`.
The nonce lives in the stream-token (`stream:token:<token>.nonce`) and is
verified by the auth-hook against the path. On a successful publish-auth
the auth-hook now writes the *full path* (with nonce) into
`stream:active:channel-<cid>-<uid>.path`; media-svc's WHEP-URL lookup reads
that key instead of computing the path from (cid, uid), so viewers always
hit the currently-live path. Poller stays keyed by (cid, uid) — the nonce is
per-publish, not per-streamer-presence.
nginx WHEP resource regex updated for the new shape so DELETE on the
session resource still routes to MediaMTX.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This PR fixes two race conditions introduced by peer-reflexive candidate replacement and addresses a connectivity regression where an agent could transition from Connected to Disconnected/Failed shortly after replacement.
Problems
- `replaceRedundantPeerReflexiveCandidates` updated `pair.Remote` in place.
- […] (`CandidatePair.Write`) could read `pair.Remote` without synchronization.
- `candidateBase.handleInboundPacket` (non-STUN path) read/wrote `remoteCandidateCaches` from recv loop goroutines.
- […] (`replaceRemoteCandidateCacheValues`).
- […] `LastReceived`/`LastSent`.
- `validateSelectedPair` uses `selectedPair.Remote.LastReceived()`, which could cause quick `Connected -> Disconnected -> Failed` transitions.

Root Cause
Replacement logic preserved pair priority but mutated shared objects in place and did not preserve candidate activity timestamps across candidate object replacement.
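To illustrate the first half of this root cause, here is a minimal sketch of publishing a new object instead of mutating a shared one. It is not the pion/ice implementation: the `pair`, `remote`, and `selected` types are hypothetical, and it assumes a single writer (for example, an agent task loop) with any number of concurrent readers.

```go
package main

import "sync/atomic"

// remote and pair are hypothetical stand-ins. A pair is treated as immutable
// once it has been published.
type remote struct {
	addr string
}

type pair struct {
	id     uint64
	remote *remote
}

// selected publishes the current pair to concurrent readers.
type selected struct {
	current atomic.Pointer[pair]
}

// remoteAddr models a send path that reads the remote of the selected pair.
// It never observes a half-updated pair because pairs are never mutated.
func (s *selected) remoteAddr() string {
	p := s.current.Load()
	if p == nil {
		return ""
	}
	return p.remote.addr
}

// replaceRemote builds a new pair rather than writing to the old one, then
// publishes it atomically. Assumes only one goroutine calls replaceRemote.
func (s *selected) replaceRemote(r *remote) {
	old := s.current.Load()
	if old == nil {
		return
	}
	s.current.Store(&pair{id: old.id, remote: r})
}
```

Readers that loaded the old pointer before the swap keep a consistent, if outdated, view; they never see a field change underneath them.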
Fixes
1) Replace candidate pair objects instead of mutating `pair.Remote`
- […] `replacePairRemote(pair, remote)` to clone pair state into a new `CandidatePair`.
- […] `id`, role, state, nomination flags, binding counters.
- […] `atomic.Value` fields.
- […]
  - `a.checklist[i]`
  - `a.pairsByID[pair.id]`
  - `a.setSelectedPair(replacement)`.

2) Serialize remote cache access on agent loop
- `candidateBase.handleInboundPacket`: […] `seen(false)`, and cache insert now run inside `agent.loop.Run(...)`.

3) Preserve candidate activity on replacement
- […] `copyCandidateActivity(dst, src)` to transfer:
  - `LastReceived`
  - `LastSent`