
Conversation

@martinemde commented May 27, 2025

By design, Ollama does not return SSE format from the standard /api/generate or /api/chat endpoints when streaming. Instead it returns NDJSON (newline-delimited JSON).

Gollm is currently designed to handle Ollama streaming via the OpenAI-compatible endpoint /v1/api/generate (which emits SSE).

Unfortunately, initializing an Ollama provider with the OpenAI-compatible base URL http://localhost:11434/v1 causes gollm to fail its check of /v1/api/tags, because the tags endpoint doesn't exist under the /v1 namespace. Initializing with the default endpoint instead results in streaming responses being silently ignored, because they don't match the SSE `data: ` prefix.

Therefore, it seems clear to me that no one is using ollama streaming under the current conditions.

We should be careful not to break any current usage (which only makes sense if people are avoiding streaming). I see a few options for solving it.

  1. Add NDJSON parsing for Ollama streaming (since it doesn't seem plausible that anyone is currently using Ollama streaming).
    Advantage: Doesn't break existing users.
    Advantage: Adds streaming without requiring changes to anyone's code.
    Downside: Adds an NDJSONDecoder that is only used for Ollama.
  2. Add a new ollama-openai provider.
    Advantage: Compatible with most of the rest of gollm, since it uses the OpenAI pattern.
    Downside: Ollama declares this compatibility experimental, notes that it could have breaking changes, and provides an incomplete feature set.
    Downside: Requires adding a whole new provider.
Details: I guess we would do something like this, which would also allow us to add JSON schema support.
@@ -239,6 +240,16 @@ func NewProviderRegistry(providerNames ...string) *ProviderRegistry {
                        SupportsSchema:    false,
                        SupportsStreaming: true,
                },
+               "ollama-openai": {
+                       Name:              "ollama-openai",
+                       Type:              TypeOpenAI,
+                       Endpoint:          "http://localhost:11434/v1/api/generate",
+                       AuthHeader:        "", // Ollama doesn't require authentication
+                       AuthPrefix:        "",
+                       RequiredHeaders:   map[string]string{"Content-Type": "application/json"},
+                       SupportsSchema:    true,
+                       SupportsStreaming: true,
+               },
                "deepseek": {
                        Name:              "deepseek",
                        Type:              TypeOpenAI,

I'm a little confused about whether there's some reason this isn't working for me. Is it working for others? It seems like there's an XOR between using the stream-compatible OpenAI version and passing the /api/tags check. If I'm wrong about this, let me know and feel free to close this.

Summary by Sourcery

Enable streaming support for Ollama by implementing a StreamDecoder abstraction and an NDJSONDecoder, updating the Ollama provider to signal completion, and covering both SSE and NDJSON streams with unit tests.

New Features:

  • Add NDJSONDecoder to parse newline-delimited JSON streaming for the Ollama provider
  • Emit EOF when Ollama provider signals completion via its done flag

Enhancements:

  • Introduce StreamDecoder interface to unify SSE and NDJSON decoders
  • Select the appropriate decoder in providerStream based on provider name

Tests:

  • Add unit tests for SSEDecoder event parsing
  • Add unit tests for NDJSONDecoder, including skipping empty lines


@sourcery-ai sourcery-ai bot commented May 27, 2025

Reviewer's Guide

This PR adds NDJSON streaming support for the Ollama provider by introducing a unified StreamDecoder interface (covering SSE and NDJSON), updating providerStream selection logic, extending the Ollama ParseStreamResponse to emit EOF on completion, and providing unit tests for both decoders.

Class diagram for StreamDecoder abstraction and decoders

classDiagram
    class StreamDecoder {
        <<interface>>
        +Next() bool
        +Event() Event
        +Err() error
    }
    class SSEDecoder {
        +Next() bool
        +Event() Event
        +Err() error
    }
    class NDJSONDecoder {
        +Next() bool
        +Event() Event
        +Err() error
    }
    StreamDecoder <|.. SSEDecoder
    StreamDecoder <|.. NDJSONDecoder

Class diagram for providerStream changes

classDiagram
    class providerStream {
        -decoder: StreamDecoder
        -provider: Provider
        -config: StreamConfig
        -buffer: []byte
    }
    class Provider {
        <<interface>>
        +Name() string
    }
    class StreamConfig
    providerStream --> Provider
    providerStream --> StreamConfig
    providerStream o-- StreamDecoder

Class diagram for OllamaProvider ParseStreamResponse update

classDiagram
    class OllamaProvider {
        +ParseStreamResponse(chunk []byte) (string, error)
    }
    OllamaProvider : +ParseStreamResponse(chunk []byte) (string, error)

File-Level Changes

Introduce a StreamDecoder abstraction and support both SSE and NDJSON formats (llm/stream.go)
  • Define StreamDecoder interface
  • Retain SSEDecoder implementation
  • Implement NewNDJSONDecoder, Next, Event, Err methods

Select decoder at runtime based on provider type (llm/llm.go)
  • Change providerStream.decoder type to StreamDecoder
  • In newProviderStream, use NDJSONDecoder for "ollama" and SSEDecoder otherwise

Emit EOF when Ollama signals streaming completion (providers/ollama.go)
  • Check response.Done flag in ParseStreamResponse
  • Return io.EOF on final chunk

Add unit tests for streaming decoders (llm/stream_test.go)
  • Test SSEDecoder event parsing
  • Test NDJSONDecoder basic parsing and skipping empty lines

Possibly linked issues

  • #0: PR adds NDJSON parsing for Ollama streaming, implementing a type of streaming requested in the issue.


Ollama's default endpoint uses newline delimited json for streaming.
@martinemde force-pushed the martinemde/ollama-streaming branch from 69e4538 to ebc48c9 on May 27, 2025 16:09
@sourcery-ai sourcery-ai bot left a comment

Hey @martinemde - I've reviewed your changes and they look great!

Here's what I looked at during the review
  • 🟡 General issues: 2 issues found
  • 🟢 Security: all looks good
  • 🟡 Testing: 3 issues found
  • 🟢 Complexity: all looks good
  • 🟢 Documentation: all looks good

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +179 to +185
if !d.scanner.Scan() {
	d.err = d.scanner.Err()
	return false
}

line := d.scanner.Bytes()
if len(line) == 0 {

suggestion (performance): Avoid recursion when skipping empty lines

Using recursion here may cause stack overflow with many empty lines. Use an iterative loop to skip blank lines instead.

Suggested change
if !d.scanner.Scan() {
	d.err = d.scanner.Err()
	return false
}
line := d.scanner.Bytes()
if len(line) == 0 {
var line []byte
for {
	if !d.scanner.Scan() {
		d.err = d.scanner.Err()
		return false
	}
	line = d.scanner.Bytes()
	if len(line) != 0 {
		break
	}
}

Comment on lines +168 to +172
func NewNDJSONDecoder(reader io.Reader) *NDJSONDecoder {
	return &NDJSONDecoder{
		scanner: bufio.NewScanner(reader),
	}
}

suggestion (bug_risk): Increase bufio.Scanner buffer or switch reader for long lines

The default 64 KB buffer may truncate large JSON lines. Increase the buffer size with scanner.Buffer(...) or use bufio.Reader with json.Decoder for larger payloads.

Suggested change
func NewNDJSONDecoder(reader io.Reader) *NDJSONDecoder {
	return &NDJSONDecoder{
		scanner: bufio.NewScanner(reader),
	}
}
func NewNDJSONDecoder(reader io.Reader) *NDJSONDecoder {
	scanner := bufio.NewScanner(reader)
	// Increase buffer size to 1MB to handle large JSON lines
	const maxCapacity = 1024 * 1024 // 1MB
	buf := make([]byte, maxCapacity)
	scanner.Buffer(buf, maxCapacity)
	return &NDJSONDecoder{
		scanner: scanner,
	}
}

"testing"
)

func TestSSEDecoder(t *testing.T) {

suggestion (testing): Enhance TestSSEDecoder with empty input, error conditions, and Err() validation.

Please add tests for empty input, reader errors, malformed SSE data, and check the return value of decoder.Err() in these cases.

Suggested implementation:

func TestSSEDecoder(t *testing.T) {
	sseData := `event: message
data: Hello

event: message
data: World

`
	reader := strings.NewReader(sseData)
	decoder := NewSSEDecoder(reader)

	// First event
	if !decoder.Next() {
		t.Fatal("expected first event")
	}
	if decoder.Event() != "message" {
		t.Errorf("expected event 'message', got %q", decoder.Event())
	}
	if decoder.Data() != "Hello" {
		t.Errorf("expected data 'Hello', got %q", decoder.Data())
	}

	// Second event
	if !decoder.Next() {
		t.Fatal("expected second event")
	}
	if decoder.Event() != "message" {
		t.Errorf("expected event 'message', got %q", decoder.Event())
	}
	if decoder.Data() != "World" {
		t.Errorf("expected data 'World', got %q", decoder.Data())
	}

	// No more events
	if decoder.Next() {
		t.Fatal("expected no more events")
	}
	if err := decoder.Err(); err != nil {
		t.Errorf("expected no error, got %v", err)
	}

	// Empty input: Next() returns false and Err() stays nil
	emptyDecoder := NewSSEDecoder(strings.NewReader(""))
	if emptyDecoder.Next() {
		t.Error("expected no events for empty input")
	}
	if err := emptyDecoder.Err(); err != nil {
		t.Errorf("expected no error for empty input, got %v", err)
	}

	// Malformed SSE data (no "field: value" lines): exact behavior is
	// decoder-specific; at minimum it must not panic or loop forever
	malformedDecoder := NewSSEDecoder(strings.NewReader("event message\ndata Hello\n"))
	for malformedDecoder.Next() {
	}

	// Reader error: Err() should surface the underlying read error
	// (uses iotest.ErrReader; add "io" and "testing/iotest" to imports)
	errDecoder := NewSSEDecoder(iotest.ErrReader(io.ErrUnexpectedEOF))
	if errDecoder.Next() {
		t.Error("expected no events for error reader")
	}
	if err := errDecoder.Err(); err == nil {
		t.Error("expected error from error reader, got nil")
	}
}

func TestNDJSONDecoder(t *testing.T) {

suggestion (testing): Enhance TestNDJSONDecoder with empty input, error conditions, and Err() validation.

Please add tests for empty input, error scenarios from the underlying scanner, and ensure decoder.Err() is validated in these cases.

if decoder.Next() {
	t.Error("Expected no more events")
}
}

suggestion (testing): Add test for NDJSONDecoder with input containing only empty lines.

Please also add a test where the input is only empty lines (e.g., strings.NewReader("\n\n\n")). In this case, decoder.Next() should always return false and decoder.Err() should remain nil.

Suggested change
}
}
func TestNDJSONDecoderOnlyEmptyLines(t *testing.T) {
	ndjsonData := "\n\n\n"
	decoder := NewNDJSONDecoder(strings.NewReader(ndjsonData))
	if decoder.Next() {
		t.Error("Expected Next() to return false for only empty lines")
	}
	if decoder.Err() != nil {
		t.Errorf("Expected Err() to be nil, got %v", decoder.Err())
	}
}

@teilomillet (Owner)
@sourcery-ai review

@sourcery-ai sourcery-ai bot left a comment

Hey @martinemde - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments

### Comment 1
<location> `llm/stream.go:174` </location>
<code_context>
+	}
+}
+
+func (d *NDJSONDecoder) Next() bool {
+	if d.err != nil {
+		return false
</code_context>

<issue_to_address>
Recursive call in Next() could cause stack overflow on many empty lines.

Replace the recursive call with a loop to skip empty lines and prevent potential stack overflow.
</issue_to_address>

