Skip to content

Commit

Permalink
Replace while-sleep with Event (airtai#1683)
Browse files (browse the repository at this point in the history)
* Replace while-sleep with event

* should_exit can't be reverted

* rm should_exit property

* update deprecation version

* apply linter
  • Loading branch information
Olegt0rr authored Sep 10, 2024
1 parent 27125d8 commit 98e94aa
Showing 1 changed file with 11 additions and 10 deletions.
21 changes: 11 additions & 10 deletions faststream/app.py
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,3 @@
import logging
import logging.config
from typing import (
TYPE_CHECKING,
Expand All @@ -14,7 +13,7 @@
)

import anyio
from typing_extensions import ParamSpec
from typing_extensions import Annotated, ParamSpec, deprecated

from faststream._compat import ExceptionGroup
from faststream.asyncapi.proto import AsyncAPIApplication
Expand Down Expand Up @@ -104,7 +103,7 @@ def __init__(
else fake_context
)

self.should_exit = False
self._should_exit = anyio.Event()

# AsyncAPI information
self.title = title
Expand Down Expand Up @@ -163,7 +162,13 @@ async def run(
self,
log_level: int = logging.INFO,
run_extra_options: Optional[Dict[str, "SettingField"]] = None,
sleep_time: float = 0.1,
sleep_time: Annotated[
float,
deprecated(
"Deprecated in **FastStream 0.5.24**. "
"Argument will be removed in **FastStream 0.6.0**."
),
] = 0.1,
) -> None:
"""Run FastStream Application."""
assert self.broker, "You should setup a broker" # nosec B101
Expand All @@ -176,11 +181,7 @@ async def run(
try:
async with anyio.create_task_group() as tg:
tg.start_soon(self._startup, log_level, run_extra_options)

# TODO: mv it to event trigger after nats-py fixing
while not self.should_exit: # noqa: ASYNC110
await anyio.sleep(sleep_time)

await self._should_exit.wait()
await self._shutdown(log_level)
tg.cancel_scope.cancel()
except ExceptionGroup as e:
Expand All @@ -189,7 +190,7 @@ async def run(

def exit(self) -> None:
"""Stop application manually."""
self.should_exit = True
self._should_exit.set()

async def start(
self,
Expand Down

0 comments on commit 98e94aa

Please sign in to comment.