forked from airtai/faststream
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add OTel baggage support (airtai#1692)
* feat: add current span in faststream context * feat: add custom baggage * test: add baggage test * feat: refactor baggage * test: fix confluent tests * docs: generate API References * feat: refactor * fix: linters * fix: mypy * docs: add baggage and CurrentSpan docs * docs: fix trailing whitespaces --------- Co-authored-by: draincoder <draincoder@users.noreply.github.com>
- Loading branch information
1 parent
7aaafdd
commit e2e3023
Showing
12 changed files
with
480 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
--- | ||
# 0.5 - API | ||
# 2 - Release | ||
# 3 - Contributing | ||
# 5 - Template Page | ||
# 10 - Default | ||
search: | ||
boost: 0.5 | ||
--- | ||
|
||
::: faststream.opentelemetry.Baggage |
11 changes: 11 additions & 0 deletions
11
docs/docs/en/api/faststream/opentelemetry/baggage/Baggage.md
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
--- | ||
# 0.5 - API | ||
# 2 - Release | ||
# 3 - Contributing | ||
# 5 - Template Page | ||
# 10 - Default | ||
search: | ||
boost: 0.5 | ||
--- | ||
|
||
::: faststream.opentelemetry.baggage.Baggage |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,12 @@ | ||
from faststream.opentelemetry.annotations import CurrentBaggage, CurrentSpan | ||
from faststream.opentelemetry.baggage import Baggage | ||
from faststream.opentelemetry.middleware import TelemetryMiddleware | ||
from faststream.opentelemetry.provider import TelemetrySettingsProvider | ||
|
||
__all__ = ( | ||
"Baggage", | ||
"CurrentBaggage", | ||
"CurrentSpan", | ||
"TelemetryMiddleware", | ||
"TelemetrySettingsProvider", | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
from opentelemetry.trace import Span | ||
from typing_extensions import Annotated | ||
|
||
from faststream import Context | ||
from faststream.opentelemetry.baggage import Baggage | ||
|
||
# `Span` type annotated with the FastStream context key "span". | ||
# NOTE(review): presumably lets a handler parameter receive the current span from the context - confirm. | ||
CurrentSpan = Annotated[Span, Context("span")] | ||
# `Baggage` type annotated with the FastStream context key "baggage". | ||
CurrentBaggage = Annotated[Baggage, Context("baggage")] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
from typing import TYPE_CHECKING, Any, List, Optional, cast | ||
|
||
from opentelemetry import baggage, context | ||
from opentelemetry.baggage.propagation import W3CBaggagePropagator | ||
from typing_extensions import Self | ||
|
||
if TYPE_CHECKING: | ||
from faststream.broker.message import StreamMessage | ||
from faststream.types import AnyDict | ||
|
||
# Single module-level propagator instance, shared by Baggage.to_headers (inject) and Baggage.from_msg (extract). | ||
_BAGGAGE_PROPAGATOR = W3CBaggagePropagator() | ||
|
||
|
||
class Baggage:
    """Mutable container for the OpenTelemetry baggage attached to a message.

    Stores one key/value mapping (the "current" baggage) and, optionally, a
    list with one mapping per message of a batch. Both are copied on the way
    in and on the way out, so callers never share mutable state with the
    instance.
    """

    __slots__ = ("_baggage", "_batch_baggage")

    def __init__(
        self, payload: "AnyDict", batch_payload: Optional[List["AnyDict"]] = None
    ) -> None:
        """Initialise the baggage from plain mappings.

        Args:
            payload: Key/value pairs of the current baggage; copied.
            batch_payload: Optional per-message baggage of a batch; every
                entry is copied. ``None`` or an empty list yields no batch
                baggage.
        """
        self._baggage = dict(payload)
        self._batch_baggage = [dict(b) for b in batch_payload] if batch_payload else []

    def get_all(self) -> "AnyDict":
        """Get a copy of the current baggage."""
        return self._baggage.copy()

    def get_all_batch(self) -> List["AnyDict"]:
        """Get a copy of all batch baggage if exists.

        Returns:
            A new list holding a copy of each per-message mapping (empty if
            there is no batch baggage).
        """
        # Copy every entry, not just the list: a shallow list copy would hand
        # out the internal dicts and let callers mutate the stored baggage.
        return [dict(entry) for entry in self._batch_baggage]

    def get(self, key: str) -> Optional[Any]:
        """Get a value from the baggage by key (``None`` if the key is absent)."""
        return self._baggage.get(key)

    def remove(self, key: str) -> None:
        """Remove a baggage item by key; a missing key is ignored."""
        self._baggage.pop(key, None)

    def set(self, key: str, value: Any) -> None:
        """Set a key-value pair in the baggage, replacing any existing value."""
        self._baggage[key] = value

    def clear(self) -> None:
        """Clear the current baggage (batch baggage is left untouched)."""
        self._baggage.clear()

    def to_headers(self, headers: Optional["AnyDict"] = None) -> "AnyDict":
        """Convert baggage items to headers format for propagation.

        Args:
            headers: Carrier mapping handed to the propagator. When ``None``
                a new dict is created.

        Returns:
            The carrier mapping that was passed to the propagator (the same
            object as ``headers`` when one was given).
        """
        current_context = context.get_current()
        if headers is None:
            headers = {}

        # set_baggage returns a new context each time, so thread it through.
        for k, v in self._baggage.items():
            current_context = baggage.set_baggage(k, v, context=current_context)

        _BAGGAGE_PROPAGATOR.inject(headers, current_context)
        return headers

    @classmethod
    def from_msg(cls, msg: "StreamMessage[Any]") -> Self:
        """Create a Baggage instance from a StreamMessage.

        With at most one entry in ``msg.batch_headers`` the baggage is
        extracted from ``msg.headers`` and no batch baggage is stored.
        Otherwise baggage is extracted from every entry of
        ``msg.batch_headers``: each result is kept as batch baggage and all
        of them are merged, in order, into the current baggage (on a key
        clash the later message wins).
        """
        if len(msg.batch_headers) <= 1:
            payload = baggage.get_all(_BAGGAGE_PROPAGATOR.extract(msg.headers))
            return cls(cast("AnyDict", payload))

        cumulative_baggage: "AnyDict" = {}
        batch_baggage: List["AnyDict"] = []

        for headers in msg.batch_headers:
            payload = baggage.get_all(_BAGGAGE_PROPAGATOR.extract(headers))
            cumulative_baggage.update(payload)
            batch_baggage.append(cast("AnyDict", payload))

        return cls(cumulative_baggage, batch_baggage)

    def __repr__(self) -> str:
        """Return the repr of the current baggage mapping."""
        return self._baggage.__repr__()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.