
Conversation

@vasilypht

Related Issue

Checklist

  • I have read the CONTRIBUTING guidelines.
  • I have added tests to cover my changes.
  • I have updated the documentation (docs folder) accordingly.

Additional Notes

Hi!

When running multiple jobs concurrently, the engine ends up stuck on its semaphore: together the jobs increment the semaphore counter beyond its limit, and after the first job completes the lock remains held.

I suggest using a mutex instead, so that new jobs simply wait for it to be released and then proceed normally.

Example code that reproduces the lock:

import uuid
import asyncio

from infinity_emb import AsyncEmbeddingEngine, EngineArgs


async def run_job_a(engine: AsyncEmbeddingEngine, texts: list[str]) -> list[list[float]]:
    task_id = uuid.uuid4()
    print(f"{task_id=} started")
    async with engine:
        result, usage = await engine.embed(sentences=texts)
    print(f"{task_id=} completed")
    return result


async def main() -> None:
    args = EngineArgs(
        model_name_or_path="/home/models/e5-small-v2",
    )
    engine = AsyncEmbeddingEngine.from_args(engine_args=args)
    
    # All three jobs enter `async with engine:` at the same time.
    outputs = await asyncio.gather(
        run_job_a(engine=engine, texts=["a", "b", "c", "d"]),
        run_job_a(engine=engine, texts=["a", "b", "c", "d"]),
        run_job_a(engine=engine, texts=["a", "b", "c", "d"])
    )

if __name__ == "__main__":
    asyncio.run(main())

I understand that constantly starting and stopping the engine like this is not the intended usage, but someone may still want to do it.
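
To make the proposal concrete, here is a minimal sketch of the locking pattern I have in mind. The class below is illustrative only (the batch-handler details are elided), so treat it as a sketch of the idea rather than the actual engine.py code:

import asyncio


class EngineLockingSketch:
    """Illustrative only: shows the intended locking pattern, not the real engine internals."""

    def __init__(self) -> None:
        self.running = False
        self._running_mutex = asyncio.Lock()  # instead of asyncio.Semaphore(1)

    async def astart(self) -> None:
        # `async with` releases the lock even if startup raises, so a second
        # `async with engine:` simply waits here and then proceeds.
        async with self._running_mutex:
            if not self.running:
                self.running = True
                # ... spawn the batch handler here ...

    async def astop(self) -> None:
        async with self._running_mutex:
            if self.running:
                self.running = False
                # ... shut down the batch handler here ...

    async def __aenter__(self):
        await self.astart()
        return self

    async def __aexit__(self, *exc) -> None:
        await self.astop()

In this sketch the first job to exit still shuts the engine down for the others; the point is only that overlapping enter/exit calls wait on the lock instead of deadlocking.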

Contributor

@greptile-apps greptile-apps bot left a comment


Greptile Overview

Summary

Replaces Semaphore(1) with Lock to fix concurrent engine start/stop issues, but introduces a critical deadlock bug.

Major Issues:

  • astart() acquires mutex but never releases it, causing deadlock on any subsequent call
  • astop() logic is broken: it checks whether the mutex is locked but relies on a manual acquire/release pattern
  • The implementation doesn't actually solve the original problem described in the PR

Original Issue (from PR):
The author reports that multiple concurrent async with engine: calls increment a semaphore counter beyond its limit. However, Semaphore(1) acts as a binary semaphore/mutex - it shouldn't allow counter > 1.

Root Cause:
The original code's semaphore pattern was correct. The real issue is likely that multiple tasks call __aenter__() → astart(), and the semaphore correctly serializes these calls. But the implementation needs the proper context-manager pattern (async with), not manual acquire/release.

Confidence Score: 0/5

  • This PR will cause immediate deadlocks in production and must not be merged
  • Score reflects critical logic error where mutex is acquired but never released in astart(), causing permanent deadlock on second engine start. This makes the async context manager pattern (async with engine:) completely unusable.
  • libs/infinity_emb/infinity_emb/engine.py requires complete reimplementation of the locking mechanism

Important Files Changed

File Analysis

Filename: libs/infinity_emb/infinity_emb/engine.py
Score: 1/5
Overview: Critical deadlock bug: mutex acquired in astart() but never released, blocking all subsequent starts

Sequence Diagram

sequenceDiagram
    participant Job1 as Job 1
    participant Job2 as Job 2
    participant Engine as AsyncEmbeddingEngine
    participant Mutex as _running_mutex
    participant Handler as BatchHandler
    
    Note over Job1,Job2: Multiple async with engine: contexts
    
    Job1->>Engine: __aenter__()
    Engine->>Engine: astart()
    Engine->>Mutex: acquire()
    Mutex-->>Engine: acquired
    Engine->>Engine: Check not self.running
    Engine->>Engine: Set self.running = True
    Engine->>Handler: spawn()
    Handler-->>Engine: started
    Note over Engine,Mutex: ❌ DEADLOCK: Lock never released!
    Engine-->>Job1: enters context
    
    Job2->>Engine: __aenter__()
    Engine->>Engine: astart()
    Engine->>Mutex: acquire()
    Note over Mutex: ❌ BLOCKED FOREVER: waiting for lock
    
    Job1->>Engine: __aexit__()
    Engine->>Engine: astop()
    Engine->>Mutex: locked()?
    Note over Engine: Lock is held, continues
    Engine->>Engine: Check self.running
    Engine->>Handler: shutdown()
    Engine->>Mutex: release()
    Note over Mutex: Lock finally released
    
    Note over Job2: Still blocked in astart()!

Additional Comments (3)

  1. libs/infinity_emb/infinity_emb/engine.py, line 84 (link)

    logic: critical deadlock: mutex acquired but never released in astart(). any second call will block forever. wrap the entire method body in async with self._running_mutex: instead of just calling acquire()

  2. libs/infinity_emb/infinity_emb/engine.py, line 97-104 (link)

    logic: logic error: astop() will return early if mutex is not locked, but the mutex should always be locked after a successful astart(). this check doesn't protect against anything meaningful. should use async with self._running_mutex: pattern instead

  3. libs/infinity_emb/infinity_emb/engine.py, line 55 (link)

    style: the mutex is initialized in __init__, but in the old code the semaphore was lazily initialized in astart(). if multiple engine instances are created but never started, this pre-allocates locks unnecessarily. consider lazy initialization for consistency with the original design (a sketch of this follows below)
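
A sketch of that lazy-initialization idea, for illustration only (the class name and elided body are placeholders, not the actual diff):

import asyncio
from typing import Optional


class LazyLockSketch:
    """Illustrative only: create the lock on first start, as in the old semaphore design."""

    def __init__(self) -> None:
        self.running = False
        self._running_mutex: Optional[asyncio.Lock] = None  # nothing allocated until astart()

    async def astart(self) -> None:
        # Safe without extra synchronization: asyncio runs tasks on a single
        # thread and there is no await between the check and the assignment.
        # astop() would need the same guard (or a no-op while the lock is None).
        if self._running_mutex is None:
            self._running_mutex = asyncio.Lock()
        async with self._running_mutex:
            if not self.running:
                self.running = True
                # ... spawn the batch handler here ...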

1 file reviewed, 3 comments


@michaelfeil
Owner

Thanks, looking into it.

@codecov-commenter

codecov-commenter commented Oct 19, 2025

⚠️ Please install the Codecov GitHub app to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

❌ Patch coverage is 93.33333% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 79.49%. Comparing base (fcb951c) to head (07dfe74).

File with missing lines: libs/infinity_emb/infinity_emb/engine.py
Patch coverage: 93.33%
Missing lines: 1 ⚠️
❗ Your organization needs to install the Codecov GitHub app to enable full functionality.
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #647      +/-   ##
==========================================
- Coverage   79.54%   79.49%   -0.06%     
==========================================
  Files          43       43              
  Lines        3495     3501       +6     
==========================================
+ Hits         2780     2783       +3     
- Misses        715      718       +3     

☔ View full report in Codecov by Sentry.

@michaelfeil
Copy link
Owner

@vasilypht Can you comment on "This PR will cause immediate deadlocks in production and must not be merged"?

    await self._batch_handler.spawn()

-   async def astop(self):
+   async def astop(self, *, force: bool = False):
Owner


why is force=True needed?

Author


I think this could be useful when the engine is used without a context manager, to ensure it is shut down, for example from an on_shutdown event in some framework.
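
For example, something along these lines; FastAPI is only an illustration of "some framework", the model path is a placeholder, and only astart()/astop(force=...) come from this PR:

from fastapi import FastAPI

from infinity_emb import AsyncEmbeddingEngine, EngineArgs

app = FastAPI()
engine = AsyncEmbeddingEngine.from_args(
    engine_args=EngineArgs(model_name_or_path="/home/models/e5-small-v2")
)


@app.on_event("startup")
async def _startup() -> None:
    await engine.astart()  # start once for the whole application


@app.on_event("shutdown")
async def _shutdown() -> None:
    # force=True ensures the engine is torn down on shutdown even if the
    # start/stop bookkeeping got out of sync somewhere.
    await engine.astop(force=True)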

Author


The solution is certainly debatable; I've only offered one possible approach.

In my project, I encountered a deadlock when using the context manager in asynchronously launched tasks. Ultimately, I settled on a one-time call to the start method.
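
For reference, a sketch of that workaround, reusing the names from the example above (not the exact code from my project):

import asyncio

from infinity_emb import AsyncEmbeddingEngine, EngineArgs


async def run_job(engine: AsyncEmbeddingEngine, texts: list[str]) -> list[list[float]]:
    # No `async with engine:` here -- the engine is already running.
    result, usage = await engine.embed(sentences=texts)
    return result


async def main() -> None:
    args = EngineArgs(model_name_or_path="/home/models/e5-small-v2")
    engine = AsyncEmbeddingEngine.from_args(engine_args=args)

    await engine.astart()  # start once, up front
    try:
        await asyncio.gather(
            run_job(engine, ["a", "b", "c", "d"]),
            run_job(engine, ["a", "b", "c", "d"]),
            run_job(engine, ["a", "b", "c", "d"]),
        )
    finally:
        await engine.astop()  # stop once, at the end


if __name__ == "__main__":
    asyncio.run(main())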

@vasilypht
Author

@vasilypht Can you comment on "This PR will cause immediate deadlocks in production and must not be merged"?

The review was of the first commit, which indeed contained a poor decision that could have caused the deadlock.

@michaelfeil
Owner

@codex review

