Bug: unbound error on base middleware handler #1379

Closed
chrisgoddard opened this issue Apr 17, 2024 · 4 comments · Fixed by #1381
Labels
bug Something isn't working

Comments

@chrisgoddard

I'm currently using the TaskIQ scheduler integration (see my fork here; I've made the changes for 0.5.x compatibility).

When I run the app with taskiq scheduler module:app and then cancel the process, I get this:

^C[2024-04-17 13:06:09,053][WARNING][run:run_scheduler:221] Shutting down scheduler.
2024-04-17 13:06:09,053 INFO     - FastStream app shutting down...
close!
2024-04-17 13:06:24,053 ERROR    - schedule:util-every-minute | fe498679-e - UnboundLocalError: cannot access local variable 'err' where it is not associated with a value
Traceback (most recent call last):
  File ".venv/lib/python3.12/site-packages/faststream/broker/middlewares/base.py", line 64, in consume_scope
    result = await call_next(await self.on_consume(msg))
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".venv/lib/python3.12/site-packages/faststream/broker/wrapper/call.py", line 206, in decode_wrapper
    return await func(msg)
           ^^^^^^^^^^^^^^^
  File ".venv/lib/python3.12/site-packages/fast_depends/use.py", line 146, in injected_wrapper
    r = await real_model.asolve(
        ^^^^^^^^^^^^^^^^^^^^^^^^
  File ".venv/lib/python3.12/site-packages/fast_depends/core/model.py", line 529, in asolve
    response = await run_async(call, *final_args, **final_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".venv/lib/python3.12/site-packages/fast_depends/utils.py", line 48, in run_async
    return await cast(Callable[P, Awaitable[T]], func)(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".venv/lib/python3.12/site-packages/faststream/utils/functions.py", line 53, in to_async_wrapper
    return await call_or_await(func, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".venv/lib/python3.12/site-packages/fast_depends/utils.py", line 48, in run_async
    return await cast(Callable[P, Awaitable[T]], func)(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "PROJECT/services/scheduler.py", line 123, in minute_utility_tasks
    await redis.psexpire()
  File "PROJECT/components/data/storage/redis/api.py", line 485, in psexpire
    await self.hdel(f'{key}:t', ekey.decode('utf-8'))
  File ".venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 610, in execute_command
    return await conn.retry.call_with_retry(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 584, in _send_command_parse_response
    return await self.parse_response(conn, command_name, **options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 631, in parse_response
    response = await connection.read_response()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".venv/lib/python3.12/site-packages/redis/asyncio/connection.py", line 537, in read_response
    response = await self._parser.read_response(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".venv/lib/python3.12/site-packages/redis/_parsers/resp3.py", line 153, in read_response
    response = await self._read_response(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".venv/lib/python3.12/site-packages/redis/_parsers/resp3.py", line 165, in _read_response
    raw = await self._readline()
          ^^^^^^^^^^^^^^^^^^^^^^
  File ".venv/lib/python3.12/site-packages/redis/_parsers/base.py", line 219, in _readline
    data = await self._stream.readline()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".venv/lib/python3.12/asyncio/streams.py", line 568, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".venv/lib/python3.12/asyncio/streams.py", line 660, in readuntil
    await self._wait_for_data('readuntil')
  File ".venv/lib/python3.12/asyncio/streams.py", line 545, in _wait_for_data
    await self._waiter
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File ".venv/lib/python3.12/site-packages/faststream/broker/subscriber/usecase.py", line 331, in consume
    result_msg = await h.call(
                 ^^^^^^^^^^^^^
  File ".venv/lib/python3.12/site-packages/faststream/broker/subscriber/call_item.py", line 172, in call
    raise e
  File ".venv/lib/python3.12/site-packages/faststream/broker/subscriber/call_item.py", line 164, in call
    result = await call(message)
             ^^^^^^^^^^^^^^^^^^^
  File ".venv/lib/python3.12/site-packages/faststream/broker/middlewares/base.py", line 74, in consume_scope
    await self.after_consume(err)
                             ^^^
UnboundLocalError: cannot access local variable 'err' where it is not associated with a value
2024-04-17 13:06:24,059 INFO     - schedule:util-every-minute | fe498679-e - Processed

Unrelated to my implementation, I'm wondering whether, in broker/middlewares/base.py:consume_scope, we should initialize err with = None before the try block so we don't get the unbound-variable error?

async def consume_scope(
    self,
    call_next: "AsyncFuncAny",
    msg: "StreamMessage[Any]",
) -> Any:
    """Asynchronously consumes a message and returns an asynchronous iterator of decoded messages."""
    err: Optional[Exception] = None  # ADDED = None here
    try:
        result = await call_next(await self.on_consume(msg))

    except Exception as e:
        err = e

    else:
        err = None
        return result

    finally:
        await self.after_consume(err)
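
For context on why err can be unbound even though both the except and else branches assign it: the traceback above ends in asyncio.exceptions.CancelledError, and since Python 3.8 that class derives from BaseException rather than Exception. So except Exception does not catch it, the else branch is skipped, and finally runs with err never assigned. The following is a minimal standalone sketch of that control flow, not the FastStream code itself; scope_without_default, scope_with_default, cancelled_handler, outcome, and the module-level after_consume and seen are hypothetical names used only for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional

# Records what the (hypothetical) after_consume hook receives.
seen: List[Optional[BaseException]] = []


async def after_consume(err: Optional[Exception]) -> None:
    seen.append(err)


async def cancelled_handler() -> Any:
    # Stands in for a handler interrupted during shutdown.
    raise asyncio.CancelledError()


async def scope_without_default(call_next: Callable[[], Awaitable[Any]]) -> Any:
    try:
        result = await call_next()
    except Exception as e:  # CancelledError is a BaseException, so not caught here
        err = e
    else:
        err = None
        return result
    finally:
        # Neither branch above ran on cancellation, so err is unbound here.
        await after_consume(err)


async def scope_with_default(call_next: Callable[[], Awaitable[Any]]) -> Any:
    err: Optional[Exception] = None  # bound before the try block
    try:
        result = await call_next()
    except Exception as e:
        err = e
    else:
        return result
    finally:
        await after_consume(err)


async def outcome(scope: Callable[..., Awaitable[Any]]) -> str:
    """Return the name of the exception that escapes the given scope."""
    try:
        await scope(cancelled_handler)
    except BaseException as exc:
        return type(exc).__name__
    return "no error"


if __name__ == "__main__":
    print(asyncio.run(outcome(scope_without_default)))
    print(asyncio.run(outcome(scope_with_default)))
```

One design point to be aware of: with the default in place, the cancellation propagates unchanged, but the after-consume hook receives None, so it cannot tell a cancelled call from a successful one. Whether that matters depends on what the middleware does with err.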
@chrisgoddard chrisgoddard added the bug Something isn't working label Apr 17, 2024
@Lancetnik
Member

Looks like it, but we already have err = None in the else block, so it should work. I have to check, but if you want, you can remove it from the else block and open a PR that sets it before the try block instead.

@Lancetnik Lancetnik mentioned this issue Apr 18, 2024
13 tasks
github-merge-queue bot pushed a commit that referenced this issue Apr 18, 2024
* chore: update dependencies

* fix (#1376): fix Redis connection options priority in FastAPI integration

* docs: fix typo:

* fix (#1379): unbound BaseModdleware consume scope error

* chore: bump version
@Lancetnik
Member

Btw, can I wait for your PR with 0.5.0 compatibility in taskiq-faststream @chrisgoddard ?

@chrisgoddard
Author

Wait? Not sure I follow. I haven't seen any activity on my PR. I'll bump the version to 0.5.2 when I'm back at my computer this evening.

@Lancetnik
Member
