Releases: airtai/faststream

v0.5.8

23 May 19:36
82853db

What's Changed

It's time for new NATS features! FastStream now natively supports NATS Key-Value and Object Storage subscription features (big thanks to @sheldygg)!

  1. KeyValue creation and watching API added (see the updated documentation section for details):

     from faststream import FastStream, Logger
     from faststream.nats import NatsBroker
     
     broker = NatsBroker()
     app = FastStream(broker)
     
     @broker.subscriber("some-key", kv_watch="bucket")
     async def handler(msg: int, logger: Logger):
         logger.info(msg)
     
     @app.after_startup
     async def test():
         kv = await broker.key_value("bucket")
         await kv.put("some-key", b"1")
  2. ObjectStore API added as well (see the updated documentation section for details):

    from faststream import FastStream, Logger
    from faststream.nats import NatsBroker
    
    broker = NatsBroker()
    app = FastStream(broker)
    
    @broker.subscriber("file-bucket", obj_watch=True)
    async def handler(filename: str, logger: Logger):
        logger.info(filename)
    
    @app.after_startup
    async def test():
        object_store = await broker.object_storage("file-bucket")
        await object_store.put("some-file.txt", b"1")
  3. You can now also use just pull_sub=True instead of pull_sub=PullSub() in the basic case:

     from faststream import FastStream, Logger
     from faststream.nats import NatsBroker
     
     broker = NatsBroker()
     app = FastStream(broker)
     
     @broker.subscriber("test", stream="stream", pull_sub=True)
     async def handler(msg, logger: Logger):
         logger.info(msg)

Finally, we have a new feature that applies to all brokers: a special flag to suppress automatic RPC and reply_to responses:

@broker.subscriber("tests", no_reply=True)
async def handler():
    ...

# will fail with timeout, because there is no automatic response
msg = await broker.publish("msg", "tests", rpc=True)
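
To see why such an RPC call times out, here is a minimal stdlib-only sketch of the request/reply pattern. It does not use FastStream: `deliver`, `rpc_call`, and the `no_reply` argument below are hypothetical stand-ins for the broker machinery, assumed purely for illustration.

```python
import asyncio


async def deliver(handler, body, reply: asyncio.Future, no_reply: bool) -> None:
    """Hypothetical broker-side delivery: run the handler, then maybe reply."""
    result = await handler(body)
    if not no_reply and not reply.done():
        reply.set_result(result)  # the automatic response to the caller


async def rpc_call(handler, body, *, no_reply: bool, timeout: float = 0.1):
    """Hypothetical client side: publish, then wait for a reply or time out."""
    reply = asyncio.get_running_loop().create_future()
    task = asyncio.create_task(deliver(handler, body, reply, no_reply))
    try:
        return await asyncio.wait_for(reply, timeout)
    finally:
        await task  # make sure the delivery task has finished


async def handler(body: str) -> str:
    return body.upper()


async def main() -> tuple:
    answered = await rpc_call(handler, "msg", no_reply=False)
    try:
        await rpc_call(handler, "msg", no_reply=True)
        timed_out = False
    except asyncio.TimeoutError:
        # The handler ran, but nothing was ever sent back to the caller.
        timed_out = True
    return answered, timed_out


result = asyncio.run(main())
```

In this model the handler still runs when replies are suppressed; only the response leg is skipped, which is why the waiting side can do nothing but time out.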

New Contributors

Full Changelog: 0.5.7...0.5.8

v0.5.7

19 May 12:11
e0ec055

What's Changed

Finally, FastStream supports OpenTelemetry in a native way to collect the full trace of your services! Big thanks to @draincoder for that!

First of all you need to install required dependencies to support OpenTelemetry:

pip install faststream[otel]

Then you can just add a middleware for your broker and that's it!

from faststream import FastStream
from faststream.nats import NatsBroker
from faststream.nats.opentelemetry import NatsTelemetryMiddleware

broker = NatsBroker(
    middlewares=(
        NatsTelemetryMiddleware(),
    )
)
app = FastStream(broker)

For detailed information, visit our documentation about telemetry.

P.S. This release includes basic OpenTelemetry support: message tracing and basic metrics. Baggage support and correct span linking for batch processing will be added soon.

New Contributors

Full Changelog: 0.5.6...0.5.7

v0.5.6

16 May 17:52
1bcbcf5

What's Changed

  • feature: add --factory param by @Sehat1137 in #1440
  • feat: add RMQ channels options, support for prefix for routing_key, a… by @Lancetnik in #1448
  • feature: Add from faststream.rabbit.annotations import Connection, Channel shortcuts
  • Bugfix: RabbitMQ RabbitRouter prefix now affects the queue routing key as well
  • Feature (close #1402): add broker.add_middleware public API to append a middleware to an already created broker
  • Feature: add RabbitBroker(channel_number: int, publisher_confirms: bool, on_return_raises: bool) options to set up channel settings
  • Feature (close #1447): add StreamMessage.batch_headers attribute to provide access to the headers of all messages in a batch

New Contributors

Full Changelog: 0.5.5...0.5.6

v0.5.5

09 May 11:47
1d32af5

What's Changed

Add support for explicit partition assignment in aiokafka KafkaBroker (special thanks to @spataphore1337):

from faststream import FastStream
from faststream.kafka import KafkaBroker, TopicPartition

broker = KafkaBroker()

topic_partition_first = TopicPartition("my_topic", 1)
topic_partition_second = TopicPartition("my_topic", 2)

@broker.subscriber(partitions=[topic_partition_first, topic_partition_second])
async def some_consumer(msg):
    ...

Full Changelog: 0.5.4...0.5.5

v0.5.4

04 May 18:15
8b8a82e

What's Changed

Full Changelog: 0.5.3...0.5.4

0.5.3

23 Apr 12:15
08e91d2

What's Changed

Full Changelog: 0.5.2...0.5.3

v0.5.2

18 Apr 15:00
3e39c88

What's Changed

Just a little bugfix patch. Fixes #1379 and #1376.

Full Changelog: 0.5.1...0.5.2

v0.5.1

17 Apr 11:53
a8d3142

What's Changed

We already have some fixes related to RedisBroker (#1375, #1376) and some new features for you:

  1. broker.include_router(...) now accepts arguments that configure the router at inclusion time instead of at creation:

broker.include_router(
   router,
   prefix="test_",
   dependencies=[Depends(...)],
   middlewares=[BrokerMiddleware],
   include_in_schema=False,
)
  2. KafkaBroker().subscriber(...) now accepts an aiokafka.ConsumerRebalanceListener object (close #1319).
     You can find more information about it in the official aiokafka documentation.

broker = KafkaBroker()

broker.subscriber(..., listener=MyRebalancer())

The pattern option was added too, but it is still experimental and does not support Path.

  3. Path feature performance was improved. Path now also works with NATS PullSub batch subscriptions.

from faststream import Path
from faststream.nats import NatsBroker, PullSub

broker = NatsBroker()

@broker.subscriber(
    "logs.{level}",
    stream="test-stream",
    pull_sub=PullSub(batch=True),
)
async def base_handler(
    msg,
    level: str = Path(),
):
    ...
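
As a rough mental model of what Path extracts from a subject such as `logs.{level}`, the sketch below compiles a subject template into a regular expression with one named group per placeholder. This is only a stdlib illustration under that assumption, not FastStream's actual implementation; `compile_subject` and `extract_path` are hypothetical helper names.

```python
import re
from typing import Optional


def compile_subject(template: str) -> re.Pattern:
    """Turn 'logs.{level}' into a regex with one named group per placeholder."""
    # re.split with a capture group alternates: literal, name, literal, ...
    parts = re.split(r"\{(\w+)\}", template)
    pattern = "".join(
        f"(?P<{part}>[^.]+)" if i % 2 else re.escape(part)
        for i, part in enumerate(parts)
    )
    return re.compile(f"^{pattern}$")


def extract_path(template: str, subject: str) -> Optional[dict]:
    """Return the placeholder values, or None if the subject does not match."""
    match = compile_subject(template).match(subject)
    return match.groupdict() if match else None


level = extract_path("logs.{level}", "logs.info")
```

Each placeholder matches exactly one dot-separated token here, so a subject with an extra token, such as `logs.info.extra`, does not match the template.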

Full Changelog: 0.5.0...0.5.1

0.5.0

15 Apr 09:12
7d757dc

What's Changed

This is the biggest change since the creation of FastStream. We have completely refactored the entire package, changing the object registration mechanism, message processing pipeline, and application lifecycle. However, you should hardly notice it: all public APIs are preserved without breaking changes. The only feature not compatible with previous code is the new middleware.

New features:

  1. The await FastStream.stop() method and the StopApplication exception were added to stop a FastStream worker.

  2. broker.subscriber() and router.subscriber() functions now return a Subscriber object you can use later.

from typing import Any

subscriber = broker.subscriber("test")

@subscriber(filter=lambda msg: msg.content_type == "application/json")
async def handler(msg: dict[str, Any]):
    ...

# default handler for messages that did not pass the filter above
@subscriber()
async def default_handler(msg: dict[str, Any]):
    ...

This is the preferred syntax for filtering now (the old one will be removed in 0.6.0)
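
The filtering rule can be pictured as first-match dispatch: each handler attached to the same subscriber carries a predicate, and a handler without one acts as the default. The sketch below models that idea in plain Python. The `Subscriber` and `Message` classes here are hypothetical stand-ins, not FastStream's classes, and the matching order is an assumption rather than something these notes confirm.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class Message:
    body: Any
    content_type: str


@dataclass
class Subscriber:
    """Hypothetical stand-in: keeps (filter, handler) pairs in registration order."""

    calls: list = field(default_factory=list)

    def __call__(self, filter: Optional[Callable[[Message], bool]] = None):
        def decorator(func):
            # No filter means "accept everything", i.e. the default handler.
            self.calls.append((filter or (lambda msg: True), func))
            return func

        return decorator

    def consume(self, msg: Message):
        # The first handler whose filter accepts the message processes it.
        for accepts, handler in self.calls:
            if accepts(msg):
                return handler(msg)
        raise LookupError("no handler accepted the message")


subscriber = Subscriber()


@subscriber(filter=lambda msg: msg.content_type == "application/json")
def json_handler(msg: Message) -> str:
    return f"json: {msg.body}"


@subscriber()
def default_handler(msg: Message) -> str:
    return f"default: {msg.body}"


json_result = subscriber.consume(Message({"a": 1}, "application/json"))
text_result = subscriber.consume(Message("hi", "text/plain"))
```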

  3. The router.publisher() function now returns the correct Publisher object you can use later (after broker startup).

publisher = router.publisher("test")

@router.subscriber("in")
async def handler():
    await publisher.publish("msg")

(Before 0.5.0, this worked only with broker.publisher.)

  4. A list of middlewares can be passed to a broker.publisher as well:

broker = Broker(..., middlewares=())

@broker.subscriber(..., middlewares=())
@broker.publisher(..., middlewares=())  # new feature
async def handler():
    ...

  5. Broker-level middlewares now affect all ways to publish a message, so you can encode application outgoing messages here.

  6. ⚠️ BREAKING CHANGE ⚠️: both subscriber and publisher middlewares must now be async callables that receive call_next and the message:

async def subscriber_middleware(call_next, msg):
    return await call_next(msg)

async def publisher_middleware(call_next, msg, **kwargs):
    return await call_next(msg, **kwargs)

@broker.subscriber(
    "in",
    middlewares=(subscriber_middleware,),
)
@broker.publisher(
    "out",
    middlewares=(publisher_middleware,),
)
async def handler(msg):
    return msg

These changes enable two previously unavailable features:

  • suppress any exceptions and pass a fall-back message body to publishers, and
  • patch any outgoing message headers and other parameters.

Without these features we could not implement an Observability Middleware or any similar tool, so this work simply had to be done.

  7. Better FastAPI compatibility: fastapi.BackgroundTasks and the response_class subscriber option are supported.

  8. All .pyi files are removed, and explicit docstrings and method options are added.

  9. New subscribers can be registered at runtime (with an already-started broker):

subscriber = broker.subscriber("dynamic")
subscriber(handler_method)
...
broker.setup_subscriber(subscriber)
await subscriber.start()
...
await subscriber.close()

  10. The faststream[docs] distribution is removed.

New Contributors

Full Changelog: 0.4.7...0.5.0

v0.5.0rc2

09 Apr 13:08
9656f4b
Pre-release

What's Changed

This is the final API change before the stable 0.5.0 release.

⚠️ HAS BREAKING CHANGE

In this release, we stabilize the behavior of publisher and subscriber middlewares:

async def subscriber_middleware(call_next, msg):
    return await call_next(msg)

async def publisher_middleware(call_next, msg, **kwargs):
    return await call_next(msg, **kwargs)

@broker.subscriber(
    "in",
    middlewares=(subscriber_middleware,),
)
@broker.publisher(
    "out",
    middlewares=(publisher_middleware,),
)
async def handler(msg):
    return msg

These changes enable two previously unavailable features:

  • suppress any exceptions and pass a fall-back message body to publishers
  • patch any outgoing message headers and other parameters

Without these features we could not implement an Observability Middleware or any similar tool, so this work had to be done.

Now you can hook into any message processing stage, and we are one step closer to the framework we would like to create!
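
The two capabilities listed above can be sketched with plain coroutines that follow the same `call_next` signature as the release example. Nothing here imports FastStream: `apply_middlewares`, `failing_handler`, and `fake_publish` are hypothetical helpers, and the way the middlewares are chained is an assumption made for illustration only.

```python
import asyncio
from functools import partial


def apply_middlewares(call, middlewares):
    """Wrap `call` so the first middleware in the sequence runs outermost."""
    for middleware in reversed(middlewares):
        call = partial(middleware, call)
    return call


async def fallback_middleware(call_next, msg):
    # Suppress handler errors and hand a fall-back body to the publisher.
    try:
        return await call_next(msg)
    except ValueError:
        return "fallback"


async def headers_middleware(call_next, msg, **kwargs):
    # Patch outgoing headers before the message is actually published.
    headers = {**kwargs.pop("headers", {}), "x-trace-id": "demo"}
    return await call_next(msg, headers=headers, **kwargs)


async def failing_handler(msg):
    raise ValueError(f"cannot process {msg!r}")


published = []


async def fake_publish(msg, **kwargs):
    # Hypothetical publisher: records what would be sent to the broker.
    published.append((msg, kwargs))


async def main():
    consume = apply_middlewares(failing_handler, (fallback_middleware,))
    publish = apply_middlewares(fake_publish, (headers_middleware,))
    result = await consume("in-message")
    await publish(result, headers={"origin": "demo"})


asyncio.run(main())
```

Because each middleware decides whether and how to call `call_next`, it can wrap the call in `try`/`except` or rewrite the keyword arguments on the way out.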

Full Changelog: 0.5.0rc0...0.5.0rc2