Summary
While I understand that the API is currently undocumented, unsupported, and in flux, I think it is worth pointing out some potential improvements to the event streaming it currently employs.
The cagent API currently uses SSE (Server-Sent Events) for streaming agent responses, but the implementation could be more aligned with SSE best practices and the SSE specification. This proposal suggests two changes:
- Persistent stream endpoint: Use a single GET /api/sessions/:id/stream endpoint per session for SSE events, with separate POST/PUT endpoints for sending messages and commands
- Use event: field: Include the SSE event: field in addition to data: to enable efficient event routing without requiring JSON parsing of the payload
This approach would enable better client library support and more idiomatic use, and it is consistent with the Model Context Protocol (MCP) event streaming pattern.
Current Implementation
Current Endpoint Pattern
POST /api/sessions/:id/agent/:agent
Body: [{ role: 'user', content: 'Hello' }]
→ Returns SSE stream (closes when this agent execution completes)
→ Session persists - can POST again to continue conversation, but generates new connection and event stream
Current SSE Format
data: {"type":"agent_choice","content":"Hello!","agent_name":"root"}
data: {"type":"tool_call","tool_call":{...}}
Issues:
- Requires client POST support (many client libraries, especially EventSource, don't support POST for streaming events)
- New connection per message
- Event type embedded in JSON payload (must parse JSON to determine event type)
- Not leveraging SSE's native event typing (can't use the addEventListener pattern, various middleware patterns, etc.)
Proposed Implementation
New Endpoint Pattern
GET /api/sessions/:id/stream
→ Persistent SSE stream (stays open for session lifetime)
→ Receives all events for the session
POST /api/sessions/:id/messages
Body: [{ role: 'user', content: 'Hello' }]
→ Adds messages to session, triggers agent response
→ Returns immediately (no stream) - events come through /stream endpoint
POST /api/sessions/:id/agent/*
Body: [{ role: 'user', content: 'Hello' }]
→ Sets agent configuration and/or adds messages to session
→ Returns JSON (unchanged response format) - events come through /stream endpoint
POST /api/sessions/:id/resume
Body: { confirmation: 'approve' | 'reject' | 'approve-session' }
→ Continues after tool call confirmation
→ Returns JSON (unchanged) - events come through /stream endpoint
POST /api/sessions/:id/elicitation
Body: { action: 'accept' | 'decline' | 'cancel', content: {...} }
→ Responds to elicitation request
→ Returns JSON (unchanged) - events come through /stream endpoint
POST /api/sessions/:id/tools/toggle
→ Toggles YOLO mode (auto-approve all tools) for session
→ Returns JSON (unchanged) - no events
Note: All endpoints follow a consistent pattern: they return JSON responses, and events flow through the persistent /stream endpoint. The new /messages endpoint follows this pattern, and the existing /resume, /elicitation, and /tools/toggle endpoints keep their current JSON responses. The only functional change is /agent/*, which changes from returning an SSE stream to returning a JSON response; its events now flow through the dedicated stream endpoint.
New SSE Payload Format
event: agent_choice
data: {"content":"Hello!","agent_name":"root"}
event: tool_call
data: {"tool_call":{...}}
event: stream_stopped
data: {"agent_name":"root"}
Note: The type attribute is removed from the data payload and promoted to the standard SSE event: field.
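To make the format change concrete, here is a minimal sketch of a serializer that promotes the embedded type to the event: field. It is written in JavaScript to match the client examples in this proposal; the helper name formatSseFrame is hypothetical and not part of cagent, and the real change would live in the server's emission code.

```javascript
// Hypothetical helper (not part of cagent): converts a current-style event
// object, which embeds `type`, into a proposed-style SSE frame.
function formatSseFrame(event) {
  const { type, ...payload } = event;
  if (typeof type !== 'string' || type === '' || /[\r\n]/.test(type)) {
    throw new Error('event.type must be a non-empty single-line string');
  }
  // JSON.stringify without indentation never emits raw newlines,
  // so a single data: line is sufficient.
  return `event: ${type}\ndata: ${JSON.stringify(payload)}\n\n`;
}

const frame = formatSseFrame({
  type: 'agent_choice',
  content: 'Hello!',
  agent_name: 'root',
});
console.log(frame);
```

The trailing blank line is what terminates a frame in SSE, so the helper always ends its output with two newlines.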
Benefits
1. Works well with client APIs (especially EventSource)
Current (doesn't work):
// EventSource doesn't support POST
const eventSource = new EventSource(url); // ❌ Can't use POST
Proposed (works):
// Connect once with EventSource; this works because the stream is a GET endpoint
const eventSource = new EventSource(`/api/sessions/${sessionId}/stream`);
// Named listeners work because each frame carries its type in the `event:` field
eventSource.addEventListener('agent_choice', (event) => {
  const data = JSON.parse(event.data);
  console.log(data.content);
});
// Send messages separately
await fetch(`/api/sessions/${sessionId}/messages`, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify([{ role: 'user', content: 'Hello' }])
});
2. Efficient Event Routing
Current (must parse JSON for every event):
for (const line of lines) {
  if (line.startsWith('data: ')) {
    // Must parse JSON for EVERY event to check type
    const data = JSON.parse(line.slice(6));
    if (data.type === 'agent_choice') {
      // Handle it
    }
    // Wasted: parsed JSON for tool_call, stream_started, etc.
  }
}
Proposed (check the event type before parsing JSON, if not using a client library that does this for you):
let currentEvent = null;
for (const line of lines) {
  if (line === '') {
    // A blank line ends the frame, so reset the event type
    currentEvent = null;
  } else if (line.startsWith('event: ')) {
    // Just a string slice - no JSON parsing!
    currentEvent = line.slice(7);
  } else if (line.startsWith('data: ') && currentEvent === 'agent_choice') {
    // Only parse JSON for events we care about
    const data = JSON.parse(line.slice(6));
    console.log(data.content);
  }
}
Performance Impact: In many scenarios this avoids JSON-parsing the payload of the majority of events.
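For clients that cannot use EventSource, a slightly fuller hand-rolled parser might look like the sketch below. It follows the SSE parsing rules I am confident about (a blank line dispatches the frame, lines starting with a colon are comments, one optional space after the colon is stripped, multiple data: lines are joined with newlines). The function name dispatchSse and the handler map are illustrative assumptions, and the sketch assumes the input contains only complete frames.

```javascript
// Minimal SSE dispatch sketch. `dispatchSse` and `handlers` are hypothetical
// names; `text` is assumed to contain only complete frames.
function dispatchSse(text, handlers) {
  let eventType = '';
  let dataLines = [];
  for (const line of text.split(/\r\n|\n|\r/)) {
    if (line === '') {
      // Blank line: end of frame. Look up the handler by event name first,
      // and parse JSON only if someone is listening.
      const handler = handlers.get(eventType || 'message');
      if (dataLines.length > 0 && handler) {
        handler(JSON.parse(dataLines.join('\n')));
      }
      eventType = '';
      dataLines = [];
      continue;
    }
    if (line.startsWith(':')) continue; // comment or keep-alive line
    const colon = line.indexOf(':');
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? '' : line.slice(colon + 1);
    if (value.startsWith(' ')) value = value.slice(1); // strip one leading space
    if (field === 'event') eventType = value;
    else if (field === 'data') dataLines.push(value);
    // Other fields (id, retry) are ignored in this sketch.
  }
}

const contents = [];
const handlers = new Map([['agent_choice', (data) => contents.push(data.content)]]);
dispatchSse(
  ': keep-alive\n' +
    'event: tool_call\ndata: {this payload is never parsed}\n\n' +
    'event: agent_choice\ndata: {"content":"Hello!","agent_name":"root"}\n\n',
  handlers
);
console.log(contents);
```

The tool_call payload in the usage example is deliberately not valid JSON: because no handler is registered for that event name, it is skipped without ever reaching JSON.parse.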
3. Better Library Support
SSE libraries across languages can work with GET endpoints and leverage the event: field:
Python (sseclient):
for event in client.events():
    if event.event == 'agent_choice':  # Check event type (no JSON parsing)
        data = json.loads(event.data)  # Only parse JSON for wanted events
Go:
for event := range sseClient.Events() {
	if event.Event == "agent_choice" { // Check event type (no JSON parsing)
		var data map[string]interface{}
		json.Unmarshal([]byte(event.Data), &data) // Only parse JSON for wanted events
	}
}
Rust (eventsource-stream):
while let Some(event) = stream.next().await {
    let event = event?; // Each stream item is a Result
    if event.event == "agent_choice" { // Check event type (no JSON parsing)
        let data: Value = serde_json::from_str(&event.data)?; // Only parse JSON for wanted events
    }
}
4. Single Persistent Connection
Current: New HTTP connection per message
Proposed: One connection for entire session lifecycle
This is more efficient and matches how the TUI works (one runtime per session). It also greatly simplifies client implementation (not having to manage the connection and stream listener per message).
5. Cleaner Separation of Concerns
- Stream endpoint: Handles all events
- Message endpoint: Adds messages to session
- Command endpoints: Agent configuration, tool approvals, elicitation responses, and session settings
This separation makes the API more intuitive and easier to use - one endpoint for events, separate endpoints for different types of commands.
Consistency with Model Context Protocol (MCP)
The Model Context Protocol uses a similar pattern:
- GET /sse or /mcp: For event stream
- POST/PUT: Submit requests/commands
- Events use event: field: MCP events include the event: field for routing, allowing efficient processing by EventSource or other clients
This proposal aligns cagent's API with the same pattern, making it familiar to developers who work with MCP (and other SSE / streamable protocols). Note: Many of the reasons for these changes (given above) are the same reasons MCP uses this pattern.
Implementation Details
Server-Side Changes
- New endpoint: GET /api/sessions/:id/stream
  - Returns persistent SSE stream per session
  - Emits all events for the session
  - Stays open until session ends or client disconnects
- Modify event emission: Include event: field
  // Current
  fmt.Fprintf(c.Response(), "data: %s\n\n", string(data))
  // Proposed
  eventType := getEventType(event) // Extract from event.Type field
  fmt.Fprintf(c.Response(), "event: %s\ndata: %s\n\n", eventType, string(data))
- Submit message endpoint: POST /api/sessions/:id/messages
  - Adds messages to session
  - Triggers agent processing
  - Events flow through the stream endpoint
- Update existing agent endpoint: POST /api/sessions/:id/agent/*
  - Currently returns SSE stream
  - Would return JSON immediately (like other endpoints)
  - Events flow through the stream endpoint
- Existing per-session endpoints (unchanged request/response format, but events flow through stream):
  - POST /api/sessions/:id/resume - Continues after tool approval
  - POST /api/sessions/:id/elicitation - Responds to elicitation requests
  - POST /api/sessions/:id/tools/toggle - Toggles YOLO mode
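The routing implied by these changes (command endpoints publish, the stream endpoint subscribes) could be sketched roughly as follows. This is an illustration in JavaScript for consistency with the client examples, not a description of cagent's internals: SessionEventHub, subscribe, and publish are hypothetical names, and events published while no stream is connected are simply dropped here, which a real implementation may want to handle differently (for example by buffering).

```javascript
// Hypothetical in-memory hub: one set of stream subscribers per session.
class SessionEventHub {
  constructor() {
    this.subscribers = new Map(); // sessionId -> Set of frame callbacks
  }

  // Would be called by the GET /stream handler; returns an unsubscribe function.
  subscribe(sessionId, onFrame) {
    if (!this.subscribers.has(sessionId)) {
      this.subscribers.set(sessionId, new Set());
    }
    const set = this.subscribers.get(sessionId);
    set.add(onFrame);
    return () => {
      set.delete(onFrame);
      // Only remove the map entry if it still refers to this (now empty) set.
      if (set.size === 0 && this.subscribers.get(sessionId) === set) {
        this.subscribers.delete(sessionId);
      }
    };
  }

  // Would be called when the agent emits an event; returns how many
  // open streams received the frame.
  publish(sessionId, eventType, payload) {
    const set = this.subscribers.get(sessionId);
    if (!set) return 0;
    const frame = `event: ${eventType}\ndata: ${JSON.stringify(payload)}\n\n`;
    for (const onFrame of set) onFrame(frame);
    return set.size;
  }
}

const hub = new SessionEventHub();
const written = [];
const unsubscribe = hub.subscribe('session-1', (frame) => written.push(frame));
const delivered = hub.publish('session-1', 'agent_choice', { content: 'Hello!' });
unsubscribe(); // e.g. when the client disconnects
const deliveredAfterClose = hub.publish('session-1', 'stream_stopped', {});
console.log(delivered, deliveredAfterClose, written.length);
```

In this shape, POST /messages, /resume, and /elicitation only trigger work and return JSON, while every resulting event reaches clients through whichever stream callbacks are subscribed for that session.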
Client-Side Example
// Connect to stream once
const eventSource = new EventSource(`/api/sessions/${sessionId}/stream`);
// Register handlers for specific events
eventSource.addEventListener('agent_choice', (event) => {
  const data = JSON.parse(event.data);
  displayMessage(data.content);
});
eventSource.addEventListener('tool_call_confirmation', async (event) => {
  const data = JSON.parse(event.data);
  const approved = await promptToolApproval(data);
  // Respond via per-session endpoint
  await fetch(`/api/sessions/${sessionId}/resume`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ confirmation: approved ? 'approve' : 'reject' })
  });
  // Subsequent events come through the stream
});
eventSource.addEventListener('elicitation_request', async (event) => {
  const data = JSON.parse(event.data);
  const response = await promptElicitation(data);
  // Respond via per-session endpoint
  await fetch(`/api/sessions/${sessionId}/elicitation`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ action: 'accept', content: response })
  });
  // Subsequent events come through the stream
});
eventSource.addEventListener('stream_stopped', (event) => {
  console.log('Agent finished');
});
// Send messages as needed
async function sendMessage(message) {
  await fetch(`/api/sessions/${sessionId}/messages`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify([{ role: 'user', content: message }])
  });
  // Response comes through EventSource stream
}
Backward Compatibility
All existing endpoints remain functionally the same - they accept the same requests. The response format changes:
- Current: Some endpoints return SSE streams with events, others return JSON responses
- Proposed: All endpoints return JSON responses. All events flow through the persistent GET /api/sessions/:id/stream endpoint
The request format and functionality remain unchanged - only endpoints that currently return streams change to return JSON responses, with events now flowing through the dedicated stream endpoint.
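If a client needs to handle both wire formats for a while (an assumption on my part; this proposal does not define a transition period), a small adapter could hide the difference. The sketch below is illustrative only, and normalizeEvent is a hypothetical name.

```javascript
// Hypothetical adapter: accepts an event in either wire format and returns
// a uniform { type, payload } shape.
function normalizeEvent(sseEventName, rawData) {
  const parsed = JSON.parse(rawData);
  // Proposed format: the type arrives in the SSE event: field.
  // Unnamed SSE events are delivered under the default name 'message'.
  if (sseEventName && sseEventName !== 'message') {
    return { type: sseEventName, payload: parsed };
  }
  // Current format: the type is embedded in the JSON payload.
  const { type, ...payload } = parsed;
  return { type, payload };
}

const fromProposed = normalizeEvent('agent_choice', '{"content":"Hi"}');
const fromCurrent = normalizeEvent('message', '{"type":"agent_choice","content":"Hi"}');
console.log(fromProposed, fromCurrent);
```

Unlike the routing examples earlier, this adapter parses every payload, so it trades the parsing savings for compatibility and would only make sense as a temporary shim.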
References
- W3C Server-Sent Events Specification
- Model Context Protocol - Server overview
- MDN: Using server-sent events
Conclusion
This proposal would make cagent's API:
- More spec-compliant: Uses SSE event: field as intended
- More idiomatic: Follows common SSE patterns (persistent stream + separate commands)
- More efficient: Avoids unnecessary JSON parsing
- More accessible: Works with EventSource and standard SSE libraries
- More consistent: Aligns with MCP's SSE / streamable pattern
The changes are relatively straightforward to implement and provide significant benefits for API consumers.
If the maintainers are in agreement with these changes, I'd be happy to take a stab at a PR, and equally happy to yield to someone more familiar with the code.