API: Proposal for More Spec-Compliant and Standard SSE Implementation #1089

@BobDickinson

Summary

While I understand that the API is currently undocumented, unsupported, and in flux, I think it is worth pointing out some potential improvements to its event streaming.

The cagent API currently uses SSE (Server-Sent Events) for streaming agent responses, but the implementation could be more aligned with SSE best practices and the SSE specification. This proposal suggests two changes:

  1. Persistent stream endpoint: Use a single GET /api/sessions/:id/stream endpoint per session for SSE events, with separate POST/PUT endpoints for sending messages and commands
  2. Use event: field: Include the SSE event: field in addition to data: to enable efficient event routing without requiring JSON parsing of the payload

This approach would enable better client library support and more idiomatic use, and is consistent with the Model Context Protocol (MCP) event streaming pattern.

Current Implementation

Current Endpoint Pattern

POST /api/sessions/:id/agent/:agent
Body: [{ role: 'user', content: 'Hello' }]
→ Returns SSE stream (closes when this agent execution completes)
→ Session persists - can POST again to continue conversation, but generates new connection and event stream

Current SSE Format

data: {"type":"agent_choice","content":"Hello!","agent_name":"root"}

data: {"type":"tool_call","tool_call":{...}}

Issues:

  • Requires client POST support (many SSE clients, notably the browser EventSource API, only issue GET requests)
  • New connection per message
  • Event type embedded in JSON payload (must parse JSON to determine event type)
  • Not leveraging SSE's native event typing (can't use the addEventListener pattern, various middleware patterns, etc.)

Proposed Implementation

New Endpoint Pattern

GET  /api/sessions/:id/stream
→ Persistent SSE stream (stays open for session lifetime)
→ Receives all events for the session

POST /api/sessions/:id/messages
Body: [{ role: 'user', content: 'Hello' }]
→ Adds messages to session, triggers agent response
→ Returns immediately (no stream) - events come through /stream endpoint

POST /api/sessions/:id/agent/*
Body: [{ role: 'user', content: 'Hello' }]
→ Sets agent configuration and/or adds messages to session
→ Returns JSON (unchanged response format) - events come through /stream endpoint

POST /api/sessions/:id/resume
Body: { confirmation: 'approve' | 'reject' | 'approve-session' }
→ Continues after tool call confirmation
→ Returns JSON (unchanged) - events come through /stream endpoint

POST /api/sessions/:id/elicitation
Body: { action: 'accept' | 'decline' | 'cancel', content: {...} }
→ Responds to elicitation request
→ Returns JSON (unchanged) - events come through /stream endpoint

POST /api/sessions/:id/tools/toggle
→ Toggles YOLO mode (auto-approve all tools) for session
→ Returns JSON (unchanged) - no events

Note: All endpoints follow one pattern: they return JSON responses, and events flow through the persistent /stream endpoint. The new /messages endpoint follows this pattern. The existing /resume, /elicitation, and /tools/toggle endpoints keep their current JSON responses. The /agent/* endpoint changes from returning an SSE stream to returning a JSON response, like the others. The only functional change is that events are delivered through the dedicated stream endpoint rather than in the response to the request that triggered them.
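To make the split between command endpoints and the stream endpoint concrete, here is a minimal sketch of the fan-out it implies, written in JavaScript to match the client examples in this issue rather than the actual Go server. `SessionEventHub`, `subscribe`, and `publish` are hypothetical names for illustration, not existing cagent code.

```javascript
// Hypothetical in-memory hub: one set of subscribers per session id.
class SessionEventHub {
  constructor() {
    this.subscribers = new Map(); // sessionId -> Set of callbacks
  }

  // Would be called by the GET /api/sessions/:id/stream handler on connect.
  // Returns a function that removes the subscription (call it on disconnect).
  subscribe(sessionId, onEvent) {
    if (!this.subscribers.has(sessionId)) {
      this.subscribers.set(sessionId, new Set());
    }
    const listeners = this.subscribers.get(sessionId);
    listeners.add(onEvent);
    return () => {
      listeners.delete(onEvent);
      // Only drop the map entry if it still points at this (now empty) set.
      if (listeners.size === 0 && this.subscribers.get(sessionId) === listeners) {
        this.subscribers.delete(sessionId);
      }
    };
  }

  // Would be called while handling POST /messages, /resume, etc.
  // Returns the number of subscribers the event was delivered to.
  publish(sessionId, eventType, payload) {
    const listeners = [...(this.subscribers.get(sessionId) ?? [])];
    for (const onEvent of listeners) onEvent(eventType, payload);
    return listeners.length;
  }
}
```

In this sketch, events published while no stream is connected are simply dropped. A real implementation would need to decide whether to buffer them, for example when a client POSTs a message before opening the stream.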

New SSE Payload Format

event: agent_choice
data: {"content":"Hello!","agent_name":"root"}

event: tool_call
data: {"tool_call":{...}}

event: stream_stopped
data: {"agent_name":"root"}

Note: The type attribute is removed from the data payload and promoted to the standard SSE event: field.
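As an illustration of that promotion, the sketch below converts an event object in the current shape (with type inside the payload) into a frame in the proposed wire format. It is JavaScript for consistency with the client examples here, and `toSseFrame` is a hypothetical helper name, not part of cagent.

```javascript
// Hypothetical helper: turn a current-format event object into a proposed-format frame.
function toSseFrame(legacyEvent) {
  // Pull type out of the payload; everything else stays in data.
  const { type, ...payload } = legacyEvent;
  // An SSE field value cannot span lines, so reject anything unusable as event:.
  if (typeof type !== 'string' || type === '' || /[\r\n]/.test(type)) {
    throw new Error('event type must be a non-empty single-line string');
  }
  // JSON.stringify without indentation emits no raw newlines,
  // so a single data: line is sufficient.
  return `event: ${type}\ndata: ${JSON.stringify(payload)}\n\n`;
}
```

The trailing blank line is what tells an SSE client that the event is complete and can be dispatched.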

Benefits

1. Works well with client APIs (especially EventSource)

Current (doesn't work):

// EventSource doesn't support POST
const eventSource = new EventSource(url); // ❌ Can't use POST

Proposed (works):

// Connect once with EventSource
const eventSource = new EventSource(`/api/sessions/${sessionId}/stream`); // This now works because it's a GET endpoint

eventSource.addEventListener('agent_choice', (event) => { // This now works because the payload contains the event type in `event`
  const data = JSON.parse(event.data);
  console.log(data.content);
});

// Send messages separately
await fetch(`/api/sessions/${sessionId}/messages`, {
  method: 'POST',
  body: JSON.stringify([{ role: 'user', content: 'Hello' }])
});

2. Efficient Event Routing

Current (must parse JSON for every event):

for (const line of lines) {
  if (line.startsWith('data: ')) {
    // Must parse JSON for EVERY event to check type
    const data = JSON.parse(line.slice(6));
    if (data.type === 'agent_choice') {
      // Handle it
    }
    // Wasted: parsed JSON for tool_call, stream_started, etc.
  }
}

Proposed (check event type before parsing JSON, if not using a client library that does this for you):

let currentEvent = null;
for (const line of lines) {
  if (line === '') {
    // Blank line ends the event; the event type does not carry over
    currentEvent = null;
  } else if (line.startsWith('event: ')) {
    // Just a string slice - no JSON parsing!
    currentEvent = line.slice(7);
  } else if (line.startsWith('data: ') && currentEvent === 'agent_choice') {
    // Only parse JSON for events we care about
    const data = JSON.parse(line.slice(6));
    console.log(data.content);
  }
}

Performance Impact: In many scenarios this allows us to avoid JSON parsing the payload of the majority of events.
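For clients that cannot use EventSource (for example, when reading a fetch response body), the routing idea can be sketched as a small incremental parser. This is a simplified take on the SSE parsing rules, not a full implementation: it ignores id: and retry:, and it does not handle bare-CR line endings. `createSseRouter` and `feed` are hypothetical names.

```javascript
// Sketch of an incremental SSE parser that routes by the event: field.
// handlers maps event type -> function(parsedJson).
function createSseRouter(handlers) {
  let buffer = '';     // holds a trailing partial line between chunks
  let eventType = '';  // reset after every dispatch
  let dataLines = [];

  function dispatch() {
    const name = eventType || 'message'; // unnamed events default to "message"
    const wanted = Object.prototype.hasOwnProperty.call(handlers, name);
    // JSON is parsed only when a handler is registered for this event type.
    if (wanted && dataLines.length > 0) {
      handlers[name](JSON.parse(dataLines.join('\n')));
    }
    eventType = '';
    dataLines = [];
  }

  function processLine(line) {
    if (line === '') return dispatch();  // blank line ends the event
    if (line.startsWith(':')) return;    // comment line (often a keep-alive)
    const colon = line.indexOf(':');
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? '' : line.slice(colon + 1);
    if (value.startsWith(' ')) value = value.slice(1); // strip one optional space
    if (field === 'event') eventType = value;
    else if (field === 'data') dataLines.push(value);
  }

  return {
    // Accepts text chunks of any size; lines may be split across chunks.
    feed(chunk) {
      buffer += chunk;
      const lines = buffer.split(/\r?\n/);
      buffer = lines.pop(); // keep the incomplete last line for the next chunk
      lines.forEach(processLine);
    },
  };
}
```

A caller would register only the event types it cares about, e.g. `createSseRouter({ agent_choice: (d) => console.log(d.content) })`, and pass each decoded chunk to `feed`. Payloads of all other event types are never handed to `JSON.parse`.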

3. Better Library Support

SSE libraries across languages can work with GET endpoints and leverage the event: field:

Python (sseclient):

for event in client.events():
    if event.event == 'agent_choice':  # Check event type (no JSON parsing)
        data = json.loads(event.data)  # Only parse JSON for wanted events

Go:

for event := range sseClient.Events() {
    if event.Event == "agent_choice" {  // Check event type (no JSON parsing)
        var data map[string]interface{}
        json.Unmarshal([]byte(event.Data), &data)  // Only parse JSON for wanted events
    }
}

Rust (eventsource-stream):

while let Some(event) = stream.next().await {
    let event = event?;  // Each stream item is a Result
    if event.event == "agent_choice" {  // Check event type (no JSON parsing)
        let data: Value = serde_json::from_str(&event.data)?;  // Only parse JSON for wanted events
    }
}

4. Single Persistent Connection

Current: New HTTP connection per message
Proposed: One connection for entire session lifecycle

This is more efficient and matches how the TUI works (one runtime per session). It also greatly simplifies client implementation (not having to manage the connection and stream listener per message).

5. Cleaner Separation of Concerns

  • Stream endpoint: Handles all events
  • Message endpoint: Adds messages to session
  • Command endpoints: Agent configuration, tool approvals, elicitation responses, and session settings

This separation makes the API more intuitive and easier to use - one endpoint for events, separate endpoints for different types of commands.

Consistency with Model Context Protocol (MCP)

The Model Context Protocol uses a similar pattern:

  • GET /sse or /mcp: For event stream
  • POST/PUT: Submit requests/commands
  • Events use event: field: MCP events include the event: field for routing, allowing efficient processing by EventSource or other clients

This proposal aligns cagent's API with the same pattern, making it familiar to developers who work with MCP (and other SSE / streamable protocols). Note: Many of the reasons for these changes (given above) are the same reasons MCP uses this pattern.

Implementation Details

Server-Side Changes

  1. New endpoint: GET /api/sessions/:id/stream

    • Returns persistent SSE stream per session
    • Emits all events for the session
    • Stays open until session ends or client disconnects
  2. Modify event emission: Include event: field

    // Current
    fmt.Fprintf(c.Response(), "data: %s\n\n", string(data))
    
    // Proposed
    eventType := getEventType(event) // Extract from event.Type field
    fmt.Fprintf(c.Response(), "event: %s\ndata: %s\n\n", eventType, string(data))
  3. Submit message endpoint: POST /api/sessions/:id/messages

    • Adds messages to session
    • Triggers agent processing
    • Events flow through the stream endpoint
  4. Update existing agent endpoint: POST /api/sessions/:id/agent/*

    • Currently returns SSE stream
    • Would return JSON immediately (like other endpoints)
    • Events flow through the stream endpoint
  5. Existing per-session endpoints (unchanged request/response format, but events flow through stream):

    • POST /api/sessions/:id/resume - Continues after tool approval
    • POST /api/sessions/:id/elicitation - Responds to elicitation requests
    • POST /api/sessions/:id/tools/toggle - Toggles YOLO mode

Client-Side Example

// Connect to stream once
const eventSource = new EventSource(`/api/sessions/${sessionId}/stream`);

// Register handlers for specific events
eventSource.addEventListener('agent_choice', (event) => {
  const data = JSON.parse(event.data);
  displayMessage(data.content);
});

eventSource.addEventListener('tool_call_confirmation', async (event) => {
  const data = JSON.parse(event.data);
  const approved = await promptToolApproval(data);
  // Respond via per-session endpoint
  await fetch(`/api/sessions/${sessionId}/resume`, {
    method: 'POST',
    body: JSON.stringify({ confirmation: approved ? 'approve' : 'reject' })
  });
  // Subsequent events come through the stream
});

eventSource.addEventListener('elicitation_request', async (event) => {
  const data = JSON.parse(event.data);
  const response = await promptElicitation(data);
  // Respond via per-session endpoint
  await fetch(`/api/sessions/${sessionId}/elicitation`, {
    method: 'POST',
    body: JSON.stringify({ action: 'accept', content: response })
  });
  // Subsequent events come through the stream
});

eventSource.addEventListener('stream_stopped', (event) => {
  console.log('Agent finished');
});

// Send messages as needed
async function sendMessage(message) {
  await fetch(`/api/sessions/${sessionId}/messages`, {
    method: 'POST',
    body: JSON.stringify([{ role: 'user', content: message }])
  });
  // Response comes through EventSource stream
}

Backward Compatibility

Request formats are unchanged: every existing endpoint accepts the same requests as today. What changes is how responses are delivered:

  • Current: Some endpoints return SSE streams with events, others return JSON responses
  • Proposed: All endpoints return JSON responses. All events flow through the persistent GET /api/sessions/:id/stream endpoint

This is therefore a breaking change for clients that read events from the response of an endpoint that currently streams (/agent/*), and for clients that rely on the type attribute inside the data payload.
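A client that has to work against both server versions during a transition could normalize the two payload formats before routing. The sketch below is one possible approach, not something the proposal requires. `normalizeEvent` is a hypothetical helper, and its input shape (`{ event, data }`) is an assumption about what the surrounding SSE client provides.

```javascript
// Hypothetical compatibility shim for clients that must handle both formats.
// sseEvent: { event?: string, data: string } as produced by an SSE client.
function normalizeEvent(sseEvent) {
  const named = Boolean(sseEvent.event) && sseEvent.event !== 'message';
  if (named) {
    // Proposed format: the type comes from the event: field.
    // JSON parsing is deferred until the caller asks for the payload.
    return { type: sseEvent.event, parse: () => JSON.parse(sseEvent.data) };
  }
  // Current format: the type is only available inside the JSON payload.
  const { type, ...payload } = JSON.parse(sseEvent.data);
  return { type, parse: () => payload };
}
```

With this shape, routing code can switch on `type` in both cases and call `parse()` only for the events it handles, so the parsing savings apply whenever the server sends the event: field.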

Conclusion

This proposal would make cagent's API:

  • More spec-compliant: Uses SSE event: field as intended
  • More idiomatic: Follows common SSE patterns (persistent stream + separate commands)
  • More efficient: Avoids unnecessary JSON parsing
  • More accessible: Works with EventSource and standard SSE libraries
  • More consistent: Aligns with MCP's SSE / streamable pattern

The changes are relatively straightforward to implement and provide significant benefits for API consumers.

If the maintainers are in agreement with these changes, I'd be happy to take a stab at a PR, and equally happy to yield to someone more familiar with the code.
