-
Notifications
You must be signed in to change notification settings - Fork 11
Description
Is your feature request related to a problem? Please describe.
The @stream decorator does not play nicely with class methods. The reason to use class methods as the streaming function is to reuse state from the class instance, removing the need to reinitialize for every event that is processed, or having to keep state in the module context.
Using the @stream decorator next to the method brings the advantage that the declarative part of the streaming function is right next to the code.
For example:
@stream(
"my-topic",
group_id="consumer-invalidate-cache",
)
async def consume_event(self, cr: ConsumerRecord) -> None:
self.do_something(cr)
As the @stream decorator performs the UDF typing check, it will see both the self and cr parameters, and assume it needs to pass both cr and stream to call the stream function.
When this is subsequently passed into stream_engine.add_stream(self.consume_event), the self argument is not bound to the class.
The workaround I found to still get the desired behaviour:
def add_stream(self, stream: Stream):
stream.func = partial(stream.func, self)
stream.udf_type = UDFType.CR_ONLY_TYPING # Note: this removes the dynamic part of this check :(
self.stream_engine.add_stream(stream)
Describe the solution you'd like
It's not directly clear to me what would be a nice approach, especially as by applying the @stream decorator, the method is no longer a method but it becomes a Stream object within the class. This means passing self.consume_event does not bind the self to the method like you may expect.