Bug/semaphore lock #647
base: main
Conversation
Greptile Overview
Summary
Replaces Semaphore(1) with Lock to fix concurrent engine start/stop issues, but introduces critical deadlock bug.
Major Issues:
- `astart()` acquires the mutex but never releases it, causing a deadlock on any subsequent call
- `astop()` logic is broken: it checks whether the mutex is locked, but relies on a manual acquire/release pattern
- The implementation doesn't actually solve the original problem described in the PR
Original Issue (from PR):
The author reports that multiple concurrent async with engine: calls increment a semaphore counter beyond its limit. However, Semaphore(1) acts as a binary semaphore/mutex - it shouldn't allow counter > 1.
Root Cause:
The original code's semaphore pattern was correct. The real issue is likely that multiple tasks call __aenter__() → astart(), and the semaphore correctly serializes these calls. But the implementation needs proper context manager pattern (async with) not manual acquire/release.
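The context-manager pattern the review recommends can be sketched as follows. This is a minimal stand-in, not the actual `engine.py`: the `BatchHandler` calls are stubbed out as comments, and the attribute names (`_running_mutex`, `running`) simply follow the review's wording.

```python
# Sketch: guarding start/stop with "async with" so the lock is always
# released, even if spawn()/shutdown() raises.
import asyncio

class Engine:
    def __init__(self):
        self._running_mutex = asyncio.Lock()
        self.running = False

    async def astart(self):
        async with self._running_mutex:  # released on exit, exception-safe
            if not self.running:
                self.running = True
                # await self._batch_handler.spawn() would go here

    async def astop(self):
        async with self._running_mutex:
            if self.running:
                self.running = False
                # await self._batch_handler.shutdown() would go here

    async def __aenter__(self):
        await self.astart()
        return self

    async def __aexit__(self, *exc):
        await self.astop()

async def main():
    engine = Engine()
    async with engine:  # first context starts the engine
        assert engine.running
    async with engine:  # second context works: the lock was released
        assert engine.running
    print("no deadlock")

asyncio.run(main())
```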
Confidence Score: 0/5
- This PR will cause immediate deadlocks in production and must not be merged
- The score reflects a critical logic error: the mutex is acquired but never released in `astart()`, causing a permanent deadlock on the second engine start. This makes the async context manager pattern (`async with engine:`) completely unusable.
- `libs/infinity_emb/infinity_emb/engine.py` requires a complete reimplementation of the locking mechanism
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| libs/infinity_emb/infinity_emb/engine.py | 1/5 | Critical deadlock bug: mutex acquired in astart() but never released, blocking all subsequent starts |
Sequence Diagram
```mermaid
sequenceDiagram
    participant Job1 as Job 1
    participant Job2 as Job 2
    participant Engine as AsyncEmbeddingEngine
    participant Mutex as _running_mutex
    participant Handler as BatchHandler
    Note over Job1,Job2: Multiple async with engine: contexts
    Job1->>Engine: __aenter__()
    Engine->>Engine: astart()
    Engine->>Mutex: acquire()
    Mutex-->>Engine: acquired
    Engine->>Engine: Check not self.running
    Engine->>Engine: Set self.running = True
    Engine->>Handler: spawn()
    Handler-->>Engine: started
    Note over Engine,Mutex: ❌ DEADLOCK: Lock never released!
    Engine-->>Job1: enters context
    Job2->>Engine: __aenter__()
    Engine->>Engine: astart()
    Engine->>Mutex: acquire()
    Note over Mutex: ❌ BLOCKED FOREVER: waiting for lock
    Job1->>Engine: __aexit__()
    Engine->>Engine: astop()
    Engine->>Mutex: locked()?
    Note over Engine: Lock is held, continues
    Engine->>Engine: Check self.running
    Engine->>Handler: shutdown()
    Engine->>Mutex: release()
    Note over Mutex: Lock finally released
    Note over Job2: Still blocked in astart()!
```
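The blocking shown in the diagram can be demonstrated in isolation: an `asyncio.Lock` acquired without a matching release blocks the next acquirer indefinitely. This is a standalone illustration, not the engine code; the second acquire is cut short with a timeout so the script terminates.

```python
import asyncio

async def main():
    mutex = asyncio.Lock()

    await mutex.acquire()  # first astart(): acquires, never releases

    try:
        # second astart(): waits on the same lock; give up after 0.1 s
        await asyncio.wait_for(mutex.acquire(), timeout=0.1)
        print("acquired")  # never reached
    except asyncio.TimeoutError:
        print("deadlock: second acquire timed out")

asyncio.run(main())
```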
Additional Comments (3)
- `libs/infinity_emb/infinity_emb/engine.py`, line 84 (link) — logic: critical deadlock: the mutex is acquired but never released in `astart()`; any second call will block forever. Wrap the entire method body in `async with self._running_mutex:` instead of just calling `acquire()`.
- `libs/infinity_emb/infinity_emb/engine.py`, lines 97-104 (link) — logic: `astop()` will return early if the mutex is not locked, but the mutex should always be locked after a successful `astart()`, so this check doesn't protect against anything meaningful. Should use the `async with self._running_mutex:` pattern instead.
- `libs/infinity_emb/infinity_emb/engine.py`, line 55 (link) — style: the mutex is initialized in `__init__`, but in the old code the semaphore was lazily initialized in `astart()`. If multiple engine instances are created but never started, this pre-allocates locks unnecessarily. Consider lazy initialization for consistency with the original design.
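The lazy-initialization suggestion in the last comment could look like this (a sketch under the review's naming, not the PR's code): the lock attribute starts as `None` and is created on first use inside the running event loop.

```python
import asyncio
from typing import Optional

class Engine:
    def __init__(self):
        # No lock allocated yet: creating many unstarted engines stays cheap.
        self._running_mutex: Optional[asyncio.Lock] = None

    async def astart(self):
        if self._running_mutex is None:
            # Safe in asyncio's single-threaded model: there is no await
            # between the check and the assignment, so no race here.
            self._running_mutex = asyncio.Lock()
        async with self._running_mutex:
            pass  # start logic would go here
```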
1 file reviewed, 3 comments
Force-pushed from ab3cd53 to 07dfe74
Thanks, looking into it.
Codecov Report

❌ Patch coverage is

```diff
@@ Coverage Diff @@
##             main     #647      +/-   ##
==========================================
- Coverage   79.54%   79.49%   -0.06%
==========================================
  Files          43       43
  Lines        3495     3501       +6
==========================================
+ Hits         2780     2783       +3
- Misses        715      718       +3
```

☔ View full report in Codecov by Sentry.
@vasilypht Can you comment on

```diff
     await self._batch_handler.spawn()

-    async def astop(self):
+    async def astop(self, *, force: bool = False):
```
why is force=True needed?
I think this could be useful when used without a context manager, to ensure the process is terminated. For example, using the on_shutdown event in some framework.
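What the commenter describes could look roughly like this (a hypothetical sketch, not the PR's actual code): `force=True` skips the is-running check so a framework shutdown hook can always tear the engine down, even if it never tracked whether the engine was started.

```python
import asyncio

class Engine:
    def __init__(self):
        self._running_mutex = asyncio.Lock()
        self.running = False

    async def astop(self, *, force: bool = False):
        async with self._running_mutex:
            if not self.running and not force:
                return              # normal path: nothing to stop
            self.running = False    # force=True tears down regardless
            # batch-handler shutdown would go here
```

A framework's shutdown hook could then call `await engine.astop(force=True)` unconditionally, without a context manager.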
The solution is generally debatable; I've only provided one possible solution.
In my project, I encountered a deadlock when using the context manager in asynchronously launched tasks. Ultimately, I settled on a one-time call to the start method.
The review was based on the first commit, which did contain a questionable decision that could have caused the blocking.
@codex review
Additional Notes
Hi!
When running multiple jobs asynchronously, a semaphore deadlock occurs: multiple jobs increment the semaphore counter to >3, and after the first job completes, the lock remains held.
I suggest using a mutex instead, so all new jobs simply wait for it to be released and then proceed.
Example code with locking:
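(The author's original example did not survive the page export. As a stand-in, here is a minimal illustration of how a plain `asyncio.Semaphore(1)` can end up with a counter above its initial value: unlike `BoundedSemaphore`, it silently accepts extra `release()` calls, so an unbalanced release path lets several jobs in at once.)

```python
import asyncio

async def main():
    sem = asyncio.Semaphore(1)

    await sem.acquire()  # counter 1 -> 0
    sem.release()        # 0 -> 1
    sem.release()        # 1 -> 2: plain Semaphore does not complain
    sem.release()        # 2 -> 3

    # Three acquires now succeed without blocking: mutual exclusion is gone.
    for _ in range(3):
        await sem.acquire()
    print("three concurrent entries allowed")

    # A BoundedSemaphore raises on the extra release instead.
    bounded = asyncio.BoundedSemaphore(1)
    try:
        bounded.release()
    except ValueError:
        print("BoundedSemaphore rejects the extra release")

asyncio.run(main())
```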
I understand that constantly starting and stopping the engine is incorrect usage, but someone might still want to do it. :)