
Conversation

@vshekar (Contributor) commented Nov 6, 2025

Checklist

  • Add a Changelog entry
  • Add the ticket number which this PR closes to the comment section

Copilot AI left a comment

Pull request overview

This PR introduces a new in-memory TTLCache-based streaming datastore as an alternative to Redis, along with a refactoring that extracts common WebSocket handler logic. The implementation provides a lightweight option for development and testing scenarios where Redis is not available.

Key Changes:

  • Added TTLCacheDatastore class using cachetools.TTLCache for in-memory sequence and data storage
  • Introduced PubSub class for in-process pub/sub messaging using weakrefs (see the sketch after this list)
  • Refactored common WebSocket handler logic into _make_ws_handler_common function to reduce code duplication between Redis and TTLCache backends
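
For orientation, here is a minimal sketch of an in-process pub/sub built on weakly referenced queues. The publish loop mirrors the snippet quoted later in this review; the subscribe side and the stale-reference cleanup are illustrative assumptions, not necessarily the PR's exact implementation.

import asyncio
import weakref
from collections import defaultdict

class PubSub:
    """In-process pub/sub: subscribers own their queues, while the broker
    holds only weak references, so abandoned subscribers can be collected."""

    def __init__(self):
        # topic name -> set of weakrefs to subscriber queues
        self._topics = defaultdict(set)

    def subscribe(self, topic: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._topics[topic].add(weakref.ref(queue))
        return queue

    async def publish(self, topic: str, message):
        for ref in list(self._topics.get(topic, ())):
            q = ref()
            if q is None:
                # Subscriber queue was garbage-collected; drop the stale ref.
                self._topics[topic].discard(ref)
            else:
                q.put_nowait(message)

A subscriber would typically hold the returned queue for the lifetime of its WebSocket connection and await queue.get() in a loop.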


Comment on lines 223 to 237
def __init__(self, settings: Dict[str, Any]):
    self._settings = settings
    maxsize = self._settings.get("maxsize", 1000)
    seq_ttl = self._settings["seq_ttl"]
    self._seq_cache = cachetools.TTLCache(
        maxsize=maxsize,
        ttl=seq_ttl,
    )
    self._seq_counters = cachetools.TTLCache(
        maxsize=maxsize,
        ttl=seq_ttl,
    )
    self._data_cache = cachetools.TTLCache(
        maxsize=maxsize, ttl=self._settings["data_ttl"]
    )
Copilot AI commented Dec 8, 2025

Missing required configuration validation. The TTLCacheDatastore expects the seq_ttl and data_ttl settings but doesn't handle the case where they're missing. Unlike RedisStreamingDatastore, which requires these settings, TTLCacheDatastore should either provide defaults or validate their presence.

The code will raise a KeyError on lines 226 and 236 if these settings are not provided. Consider either:

  1. Adding validation to raise a clearer error message
  2. Providing sensible defaults, as the StreamingCache config does (data_ttl=3600, seq_ttl=2592000); a sketch combining both options follows this list
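
A hedged sketch combining both suggestions, assuming the defaults above (data_ttl=3600, seq_ttl=2592000) are also appropriate for this backend; the class-level constants are illustrative:

from typing import Any, Dict

import cachetools

class TTLCacheDatastore:
    # Illustrative defaults, borrowed from the StreamingCache config.
    DEFAULT_SEQ_TTL = 2592000  # 30 days, in seconds
    DEFAULT_DATA_TTL = 3600    # 1 hour, in seconds

    def __init__(self, settings: Dict[str, Any]):
        self._settings = settings
        maxsize = self._settings.get("maxsize", 1000)
        # .get() with a default avoids the bare KeyError noted above.
        seq_ttl = self._settings.get("seq_ttl", self.DEFAULT_SEQ_TTL)
        data_ttl = self._settings.get("data_ttl", self.DEFAULT_DATA_TTL)
        self._seq_cache = cachetools.TTLCache(maxsize=maxsize, ttl=seq_ttl)
        self._seq_counters = cachetools.TTLCache(maxsize=maxsize, ttl=seq_ttl)
        self._data_cache = cachetools.TTLCache(maxsize=maxsize, ttl=data_ttl)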

@danielballan (Member)

The feedback from Copilot is solid.

@vshekar (Contributor, Author) commented Dec 9, 2025

> The feedback from Copilot is solid.

Agreed, will start addressing them

@vshekar requested a review from Copilot on December 16, 2025 at 15:17
Copilot AI left a comment

Pull request overview

Copilot reviewed 1 out of 1 changed files in this pull request and generated 3 comments.



Comment on lines 138 to 142
async def publish(self, topic: str, message):
    for ref in list(self._topics.get(topic, ())):
        q = ref()
        if q:
            q.put_nowait(message)
Copilot AI commented Dec 16, 2025

The put_nowait call could raise asyncio.QueueFull if a subscriber's queue is bounded and full. An uncaught exception would abort the delivery loop and prevent the message from reaching the remaining subscribers. Consider wrapping this call in a try-except block that logs the error and continues delivery to the other subscribers.
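
One possible shape for that fix, shown as the PubSub.publish method in isolation; the logger name and the drop-on-full policy are assumptions:

import asyncio
import logging

logger = logging.getLogger(__name__)

async def publish(self, topic: str, message):
    for ref in list(self._topics.get(topic, ())):
        q = ref()
        if q:
            try:
                q.put_nowait(message)
            except asyncio.QueueFull:
                # Skip the saturated subscriber but keep delivering
                # to the remaining ones.
                logger.warning(
                    "Dropping message for topic %r: subscriber queue is full",
                    topic,
                )

Whether to drop, disconnect the slow subscriber, or block is a policy choice; dropping with a warning is the least disruptive default.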
