Skip to content

Commit

Permalink
feat (airtai#1252): respect Redis StreamSub last_id with consumer gro…
Browse files Browse the repository at this point in the history
…up (airtai#1256)

Co-authored-by: Kumaran Rajendhiran <kumaran@airt.ai>
  • Loading branch information
Lancetnik and kumaranvpl authored Feb 23, 2024
1 parent da825e4 commit e5ec0cb
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 11 deletions.
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Simple and fast framework to create message brokers based microservices."""
__version__ = "0.4.3"
__version__ = "0.4.4"


INSTALL_YAML = """
Expand Down
4 changes: 2 additions & 2 deletions faststream/redis/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def __init__(
self.subscription = None
self.task = None

self.last_id = stream.last_id if stream else "$"
self.last_id = getattr(stream, "last_id", "$")

super().__init__(
log_context_builder=log_context_builder,
Expand Down Expand Up @@ -228,7 +228,7 @@ async def _consume_stream_msg(
read = client.xreadgroup(
groupname=stream.group,
consumername=stream.consumer,
streams={stream.name: ">"},
streams={stream.name: self.last_id},
block=stream.polling_interval,
noack=stream.no_ack,
)
Expand Down
11 changes: 3 additions & 8 deletions faststream/redis/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,12 @@ def __init__(
raise ValueError("You should specify `group` and `consumer` both")

if group and consumer:
msg: Optional[str] = None

if last_id:
msg = "`last_id` has no effect with consumer group"
if last_id is None:
last_id = ">"

if no_ack:
msg = "`no_ack` has no effect with consumer group"

if msg:
warnings.warn(
message=msg,
message="`no_ack` has no effect with consumer group",
category=RuntimeWarning,
stacklevel=1,
)
Expand Down
24 changes: 24 additions & 0 deletions tests/brokers/redis/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,30 @@ async def handler(msg):

mock.assert_called_once_with("hello")

@pytest.mark.asyncio()
async def test_consume_group(
self,
queue: str,
full_broker: RedisBroker,
):
@full_broker.subscriber(stream=StreamSub(queue, group="group", consumer=queue))
async def handler(msg: RedisMessage):
...

assert next(iter(full_broker.handlers.values())).last_id == ">"

@pytest.mark.asyncio()
async def test_consume_group_with_last_id(
self,
queue: str,
full_broker: RedisBroker,
):
@full_broker.subscriber(stream=StreamSub(queue, group="group", consumer=queue, last_id="1"))
async def handler(msg: RedisMessage):
...

assert next(iter(full_broker.handlers.values())).last_id == "1"

async def test_consume_stream_native(
self,
broker: RedisBroker,
Expand Down

0 comments on commit e5ec0cb

Please sign in to comment.