Releases: airtai/faststream
v0.5.8
What's Changed
This is the time for a new NATS features! FastStream supports NATS Key-Value and Object Storage subscribption features in a native way now (big thx for @sheldygg)!
-
KeyValue creation and watching API added (you can read updated documentation section for changes):
from faststream import FastStream, Logger from faststream.nats import NatsBroker broker = NatsBroker() app = FastStream(broker) @broker.subscriber("some-key", kv_watch="bucket") async def handler(msg: int, logger: Logger): logger.info(msg) @app.after_startup async def test(): kv = await broker.key_value("bucket") await kv.put("some-key", b"1")
-
ObjectStore API added as well (you can read updated documentation section for changes):
from faststream import FastStream, Logger from faststream.nats import NatsBroker broker = NatsBroker() app = FastStream(broker) @broker.subscriber("file-bucket", obj_watch=True) async def handler(filename: str, logger: Logger): logger.info(filename) @app.after_startup async def test(): object_store = await broker.object_storage("file-bucket") await object_store.put("some-file.txt", b"1")
-
Also now you can use just
pull_sub=True
instead ofpull_sub=PullSub()
in basic case:from faststream import FastStream, Logger from faststream.nats import NatsBroker broker = NatsBroker() app = FastStream(broker) @broker.subscriber("test", stream="stream", pull_sub=True) async def handler(msg, logger: Logger): logger.info(msg)
Finally, we have a new feature, related to all brokers: special flag to suppress automatic RPC and reply_to responses:
@broker.subscriber("tests", no_reply=True)
async def handler():
....
# will fail with timeout, because there is no automatic response
msg = await broker.publish("msg", "test", rpc=True)
- fix: when headers() returns None in AsyncConfluentParser, replace it with an empty tuple by @andreaimprovised in #1460
- Implement Kv/Obj watch. by @sheldygg in #1383
- feat: add subscriber no-reply option by @Lancetnik in #1461
New Contributors
- @andreaimprovised made their first contribution in #1460
Full Changelog: 0.5.7...0.5.8
v0.5.7
What's Changed
Finally, FastStream supports OpenTelemetry in a native way to collect the full trace of your services! Big thanks for @draincoder for that!
First of all you need to install required dependencies to support OpenTelemetry:
pip install faststream[otel]
Then you can just add a middleware for your broker and that's it!
from faststream import FastStream
from faststream.nats import NatsBroker
from faststream.nats.opentelemetry import NatsTelemetryMiddleware
broker = NatsBroker(
middlewares=(
NatsTelemetryMiddleware(),
)
)
app = FastStream(broker)
To find detailt information just visit our documentation aboout telemetry
P.S. The release includes basic OpenTelemetry support - messages tracing & basic metrics. Baggage support and correct spans linking in batch processing case will be added soon.
- fix: serialize TestClient rpc output to mock the real message by @Lancetnik in #1452
- feature (#916): Observability by @draincoder in #1398
New Contributors
- @draincoder made their first contribution in #1398
Full Changelog: 0.5.6...0.5.7
v0.5.6
What's Changed
- feature: add --factory param by @Sehat1137 in #1440
- feat: add RMQ channels options, support for prefix for routing_key, a… by @Lancetnik in #1448
- feature: Add
from faststream.rabbit.annotations import Connection, Channel
shortcuts - Bugfix: RabbitMQ RabbitRouter prefix now affects to queue routing key as well
- Feature (close #1402): add
broker.add_middleware
public API to append a middleware to already created broker - Feature: add
RabbitBroker(channel_number: int, publisher_confirms: bool, on_return_raises: bool)
options to setup channel settings - Feature (close #1447): add
StreamMessage.batch_headers
attribute to provide with access to whole batch messages headers
New Contributors
- @Sehat1137 made their first contribution in #1440
Full Changelog: 0.5.5...0.5.6
v0.5.5
What's Changed
Add support for explicit partition assignment in aiokafka KafkaBroker
(special thanks to @spataphore1337):
from faststream import FastStream
from faststream.kafka import KafkaBroker, TopicPartition
broker = KafkaBroker()
topic_partition_fisrt = TopicPartition("my_topic", 1)
topic_partition_second = TopicPartition("my_topic", 2)
@broker.subscribe(partitions=[topic_partition_fisrt, topic_partition_second])
async def some_consumer(msg):
...
- Update Release Notes for 0.5.4 by @faststream-release-notes-updater in #1421
- feature: manual partition assignment to Kafka by @spataphore1337 in #1422
- Chore/update deps by @Lancetnik in #1429
- Fix/correct dynamic subscriber registration by @Lancetnik in #1433
- chore: bump version by @Lancetnik in #1435
Full Changelog: 0.5.4...0.5.5
v0.5.4
What's Changed
- Update Release Notes for 0.5.3 by @faststream-release-notes-updater in #1400
- fix (#1415): raise SetupError if rpc and reply_to are using in TestCL… by @Lancetnik in #1419
- Chore/update deps2 by @Lancetnik in #1418
- refactor: correct security with kwarg params merging by @Lancetnik in #1417
- fix (#1414): correct Messag.ack error processing by @Lancetnik in #1420
Full Changelog: 0.5.3...0.5.4
0.5.3
What's Changed
- Update Release Notes for 0.5.2 by @faststream-release-notes-updater in #1382
- Fix/setup at broker connection instead of starting by @Lancetnik in #1385
- Tests/add path tests by @Lancetnik in #1388
- Fix/path with router prefix by @Lancetnik in #1395
- chore: update dependencies by @Lancetnik in #1396
- chore: bump version by @Lancetnik in #1397
- chore: polishing by @davorrunje in #1399
Full Changelog: 0.5.2...0.5.3
v0.5.2
What's Changed
Just a little bugfix patch. Fixes #1379 and #1376.
- Update Release Notes for 0.5.1 by @faststream-release-notes-updater in #1378
- Tests/fastapi background by @Lancetnik in #1380
- Fix/0.5.2 by @Lancetnik in #1381
Full Changelog: 0.5.1...0.5.2
v0.5.1
What's Changed
We already have some fixes related to RedisBroker
(#1375, #1376) and some new features for you:
- Now
broke.include_router(...)
allows to pass some arguments to setup router at including moment instead of creation
broker.include_router(
router,
prefix="test_",
dependencies=[Depends(...)],
middlewares=[BrokerMiddleware],
include_in_schema=False,
)
KafkaBroker().subscriber(...)
now consumesaiokafka.ConsumerRebalanceListener
object.
You can find more information about it in the official aiokafka doc
(close #1319)
broker = KafkaBroker()
broker.subscriber(..., listener=MyRebalancer())
pattern
option was added too, but it is still experimental and does not support Path
Path
feature perfomance was increased. Also,Path
is suitable for NATSPullSub
batch subscribtion as well now.
from faststream import NatsBroker, PullSub
broker = NastBroker()
@broker.subscriber(
"logs.{level}",
steam="test-stream",
pull_sub=PullSub(batch=True),
)
async def base_handler(
...,
level: str = Path(),
):
...
- Update Release Notes for 0.5.0 by @faststream-release-notes-updater in #1366
- chore: bump version by @Lancetnik in #1372
- feat: kafka listener, extended include_router by @Lancetnik in #1374
- Fix/1375 by @Lancetnik in #1377
Full Changelog: 0.5.0...0.5.1
0.5.0
What's Changed
This is the biggest change since the creation of FastStream. We have completely refactored the entire package, changing the object registration mechanism, message processing pipeline, and application lifecycle. However, you won't even notice it—we've preserved all public APIs from breaking changes. The only feature not compatible with the previous code is the new middleware.
New features:
-
await FastStream.stop()
method andStopApplication
exception to stop aFastStream
worker are added. -
broker.subscriber()
androuter.subscriber()
functions now return aSubscriber
object you can use later.
subscriber = broker.subscriber("test")
@subscriber(filter = lambda msg: msg.content_type == "application/json")
async def handler(msg: dict[str, Any]):
...
@subscriber()
async def handler(msg: dict[str, Any]):
...
This is the preferred syntax for filtering now (the old one will be removed in 0.6.0
)
- The
router.publisher()
function now returns the correctPublisher
object you can use later (after broker startup).
publisher = router.publisher("test")
@router.subscriber("in")
async def handler():
await publisher.publish("msg")
(Until 0.5.0
you could use it in this way with broker.publisher
only)
- A list of
middlewares
can be passed to abroker.publisher
as well:
broker = Broker(..., middlewares=())
@broker.subscriber(..., middlewares=())
@broker.publisher(..., middlewares=()) # new feature
async def handler():
...
-
Broker-level middlewares now affect all ways to publish a message, so you can encode application outgoing messages here.
-
⚠️ BREAKING CHANGE⚠️ : bothsubscriber
andpublisher
middlewares should be async context manager type
async def subscriber_middleware(call_next, msg):
return await call_next(msg)
async def publisher_middleware(call_next, msg, **kwargs):
return await call_next(msg, **kwargs)
@broker.subscriber(
"in",
middlewares=(subscriber_middleware,),
)
@broker.publisher(
"out",
middlewares=(publisher_middleware,),
)
async def handler(msg):
return msg
Such changes allow you two previously unavailable features:
- suppress any exceptions and pass fall-back message body to publishers, and
- patch any outgoing message headers and other parameters.
Without those features we could not implement Observability Middleware or any similar tool, so it is the job that just had to be done.
7. A better FastAPI compatibility: fastapi.BackgroundTasks
and response_class
subscriber option are supported.
-
All
.pyi
files are removed, and explicit docstrings and methods options are added. -
New subscribers can be registered in runtime (with an already-started broker):
subscriber = broker.subscriber("dynamic")
subscriber(handler_method)
...
broker.setup_subscriber(subscriber)
await subscriber.start()
...
await subscriber.close()
faststream[docs]
distribution is removed.
- Update Release Notes for 0.4.7 by @faststream-release-notes-updater in #1295
- 1129 - Create a publish command for the CLI by @MRLab12 in #1151
- Chore: packages upgraded by @davorrunje in #1306
- docs: fix typos by @omahs in #1309
- chore: update dependencies by @Lancetnik in #1323
- docs: fix misc by @Lancetnik in #1324
- docs (#1327): correct RMQ exhcanges behavior by @Lancetnik in #1328
- fix: typer 0.12 exclude by @Lancetnik in #1341
- 0.5.0 by @Lancetnik in #1326
- Generate docs and linter fixes by @davorrunje in #1348
- Fix types by @davorrunje in #1349
- chore: update dependencies by @Lancetnik in #1358
- feat: final middlewares by @Lancetnik in #1357
- Docs/0.5.0 features by @Lancetnik in #1360
New Contributors
Full Changelog: 0.4.7...0.5.0
v0.5.0rc2
What's Changed
This is the final API change before stable 0.5.0
release
In it, we stabilize the behavior of publishers & subscribers middlewares
async def subscriber_middleware(call_next, msg):
return await call_next(msg)
async def publisher_middleware(call_next, msg, **kwargs):
return await call_next(msg, **kwargs)
@broker.subscriber(
"in",
middlewares=(subscriber_middleware,),
)
@broker.publisher(
"out",
middlewares=(publisher_middleware,),
)
async def handler(msg):
return msg
Such changes allows you two features previously unavailable
- suppress any exceptions and pas fall-back message body to publishers
- patch any outgoing message headers and other parameters
Without these features we just can't impelement Observability Middleware or any similar tool, so it is the job to be done.
Now you are free to get access at any message processing stage and we are one step closer to the framework we would like to create!
- Update Release Notes for 0.5.0rc0 by @faststream-release-notes-updater in #1347
- Generate docs and linter fixes by @davorrunje in #1348
- Fix types by @davorrunje in #1349
- chore: update dependencies by @Lancetnik in #1358
- feat: final middlewares by @Lancetnik in #1357
Full Changelog: 0.5.0rc0...0.5.0rc2