An option for Ollama streaming on non-openai compatible endpoints #43
base: main
Conversation
Reviewer's Guide

This PR adds NDJSON streaming support for the Ollama provider by introducing a unified StreamDecoder interface (covering SSE and NDJSON), updating providerStream selection logic, extending the Ollama ParseStreamResponse to emit EOF on completion, and providing unit tests for both decoders.

Class diagram for StreamDecoder abstraction and decoders

```mermaid
classDiagram
    class StreamDecoder {
        <<interface>>
        +Next() bool
        +Event() Event
        +Err() error
    }
    class SSEDecoder {
        +Next() bool
        +Event() Event
        +Err() error
    }
    class NDJSONDecoder {
        +Next() bool
        +Event() Event
        +Err() error
    }
    StreamDecoder <|.. SSEDecoder
    StreamDecoder <|.. NDJSONDecoder
```
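In Go terms, the abstraction sketched above might look roughly like this; the `Event` shape is an assumption for illustration, not the PR's actual definition:

```go
// Sketch only: names and field shapes are illustrative, not the PR's exact code.
package llm

// Event is assumed to carry one decoded chunk of a streaming response.
type Event struct {
	Data []byte
}

// StreamDecoder abstracts over SSE and NDJSON framing so providerStream
// can iterate a response body without caring about the wire format.
type StreamDecoder interface {
	Next() bool   // advance to the next event; false on EOF or error
	Event() Event // the event read by the last successful Next
	Err() error   // the terminal error, if any, after Next returns false
}
```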
Class diagram for providerStream changes

```mermaid
classDiagram
    class providerStream {
        -decoder: StreamDecoder
        -provider: Provider
        -config: StreamConfig
        -buffer: []byte
    }
    class Provider {
        <<interface>>
        +Name() string
    }
    class StreamConfig
    providerStream --> Provider
    providerStream --> StreamConfig
    providerStream o-- StreamDecoder
```
Class diagram for OllamaProvider ParseStreamResponse update

```mermaid
classDiagram
    class OllamaProvider {
        +ParseStreamResponse(chunk []byte) (string, error)
    }
```
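As a rough sketch of how the EOF-on-completion behavior could work (the struct fields and the stub type below are assumptions for illustration, not the PR's actual code): Ollama's native stream marks its final chunk with a done flag, which the parser can translate into io.EOF.

```go
package llm

import (
	"encoding/json"
	"io"
)

// OllamaProvider is a stand-in stub; the real provider lives elsewhere in gollm.
type OllamaProvider struct{}

// ParseStreamResponse sketch: extract the text of one NDJSON chunk and return
// io.EOF once a chunk marked "done": true arrives, ending the stream.
func (p *OllamaProvider) ParseStreamResponse(chunk []byte) (string, error) {
	var msg struct {
		Response string `json:"response"`
		Done     bool   `json:"done"`
	}
	if err := json.Unmarshal(chunk, &msg); err != nil {
		return "", err
	}
	if msg.Done {
		return msg.Response, io.EOF
	}
	return msg.Response, nil
}
```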
Ollama's default endpoint uses newline-delimited JSON for streaming.
Force-pushed from 69e4538 to ebc48c9
Hey @martinemde - I've reviewed your changes and they look great!
Here's what I looked at during the review
- 🟡 General issues: 2 issues found
- 🟢 Security: all looks good
- 🟡 Testing: 3 issues found
- 🟢 Complexity: all looks good
- 🟢 Documentation: all looks good
```go
	if !d.scanner.Scan() {
		d.err = d.scanner.Err()
		return false
	}

	line := d.scanner.Bytes()
	if len(line) == 0 {
```
suggestion (performance): Avoid recursion when skipping empty lines
Using recursion here may cause stack overflow with many empty lines. Use an iterative loop to skip blank lines instead.
Suggested change:

```go
	var line []byte
	for {
		if !d.scanner.Scan() {
			d.err = d.scanner.Err()
			return false
		}
		line = d.scanner.Bytes()
		if len(line) != 0 {
			break
		}
		// keep scanning until a non-empty line is found
	}
```
```go
func NewNDJSONDecoder(reader io.Reader) *NDJSONDecoder {
	return &NDJSONDecoder{
		scanner: bufio.NewScanner(reader),
	}
}
```
suggestion (bug_risk): Increase bufio.Scanner buffer or switch reader for long lines
The default 64 KB buffer may truncate large JSON lines. Increase the buffer size with scanner.Buffer(...) or use bufio.Reader with json.Decoder for larger payloads.
Suggested change:

```go
func NewNDJSONDecoder(reader io.Reader) *NDJSONDecoder {
	scanner := bufio.NewScanner(reader)
	// Increase buffer size to 1MB to handle large JSON lines
	const maxCapacity = 1024 * 1024 // 1MB
	buf := make([]byte, maxCapacity)
	scanner.Buffer(buf, maxCapacity)
	return &NDJSONDecoder{
		scanner: scanner,
	}
}
```
| "testing" | ||
| ) | ||
|
|
||
| func TestSSEDecoder(t *testing.T) { |
suggestion (testing): Enhance TestSSEDecoder with empty input, error conditions, and Err() validation.
Please add tests for empty input, reader errors, malformed SSE data, and check the return value of decoder.Err() in these cases.
Suggested implementation:
```go
func TestSSEDecoder(t *testing.T) {
	sseData := `event: message
data: Hello
event: message
data: World
`
	reader := strings.NewReader(sseData)
	decoder := NewSSEDecoder(reader)

	// First event
	if !decoder.Next() {
		t.Fatal("expected first event")
	}
	if decoder.Event() != "message" {
		t.Errorf("expected event 'message', got %q", decoder.Event())
	}
	if decoder.Data() != "Hello" {
		t.Errorf("expected data 'Hello', got %q", decoder.Data())
	}

	// Second event
	if !decoder.Next() {
		t.Fatal("expected second event")
	}
	if decoder.Event() != "message" {
		t.Errorf("expected event 'message', got %q", decoder.Event())
	}
	if decoder.Data() != "World" {
		t.Errorf("expected data 'World', got %q", decoder.Data())
	}

	// No more events
	if decoder.Next() {
		t.Fatal("expected no more events")
	}
	if err := decoder.Err(); err != nil {
		t.Errorf("expected no error, got %v", err)
	}

	// Test empty input
	emptyReader := strings.NewReader("")
	emptyDecoder := NewSSEDecoder(emptyReader)
	if emptyDecoder.Next() {
		t.Error("expected no events for empty input")
	}
	if err := emptyDecoder.Err(); err != nil {
		t.Errorf("expected no error for empty input, got %v", err)
	}

	// Test malformed SSE data (missing colon)
	malformedData := `event message
data Hello
`
	malformedReader := strings.NewReader(malformedData)
	malformedDecoder := NewSSEDecoder(malformedReader)
	if malformedDecoder.Next() {
		t.Error("expected no events for malformed input")
	}
	if err := malformedDecoder.Err(); err == nil {
		t.Error("expected error for malformed input, got nil")
	}

	// Test reader error (requires "io" in the imports)
	errReader := &errorReader{}
	errDecoder := NewSSEDecoder(errReader)
	if errDecoder.Next() {
		t.Error("expected no events for error reader")
	}
	if err := errDecoder.Err(); err == nil {
		t.Error("expected error from error reader, got nil")
	}
}

// errorReader always fails; declared at package level because Go does not
// allow method declarations inside a function body.
type errorReader struct{}

func (e *errorReader) Read(p []byte) (int, error) {
	return 0, io.ErrUnexpectedEOF
}
```

The next comment refers to the end of TestSSEDecoder and the start of TestNDJSONDecoder:

```go
	}
}

func TestNDJSONDecoder(t *testing.T) {
```
suggestion (testing): Enhance TestNDJSONDecoder with empty input, error conditions, and Err() validation.
Please add tests for empty input, error scenarios from the underlying scanner, and ensure decoder.Err() is validated in these cases.
```go
	if decoder.Next() {
		t.Error("Expected no more events")
	}
}
```
suggestion (testing): Add test for NDJSONDecoder with input containing only empty lines.
Please also add a test where the input is only empty lines (e.g., strings.NewReader("\n\n\n")). In this case, decoder.Next() should always return false and decoder.Err() should remain nil.
Suggested change:

```go
	}
}

func TestNDJSONDecoderOnlyEmptyLines(t *testing.T) {
	ndjsonData := "\n\n\n"
	decoder := NewNDJSONDecoder(strings.NewReader(ndjsonData))
	if decoder.Next() {
		t.Error("Expected Next() to return false for only empty lines")
	}
	if decoder.Err() != nil {
		t.Errorf("Expected Err() to be nil, got %v", decoder.Err())
	}
}
```
@sourcery-ai review
Hey @martinemde - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `llm/stream.go:174` </location>
<code_context>
+	}
+}
+
+func (d *NDJSONDecoder) Next() bool {
+	if d.err != nil {
+		return false
</code_context>
<issue_to_address>
Recursive call in Next() could cause stack overflow on many empty lines.
Replace the recursive call with a loop to skip empty lines and prevent potential stack overflow.
</issue_to_address>
By design, Ollama does not return SSE format from the standard /api/generate or /api/chat/completions endpoints when streaming. Instead it returns NDJSON (newline delimited JSON).
Gollm is currently designed to handle ollama streaming via the openai compatible endpoint `/v1/api/generate` (which emits SSE).

Unfortunately, initializing an ollama provider with the openai compatible ollama endpoint `http://localhost:11434/v1` causes gollm to fail checking `/v1/api/tags`, because the tags endpoint doesn't exist under the `/v1` namespace. Initializing without that endpoint, using the default, results in streaming responses being ignored because they don't match the SSE `data:` start.

Therefore, it seems clear to me that no one is using ollama streaming under the current conditions.
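To make the mismatch concrete, here is roughly what the two framings look like on the wire (illustrative, abbreviated payloads, not verbatim server output): an SSE-only parser that requires the `data:` prefix drops every line of the NDJSON form.

```go
// Illustrative comparison of the two stream framings; payloads are
// abbreviated examples, not verbatim Ollama output.
package main

import (
	"fmt"
	"strings"
)

func main() {
	sse := "data: {\"choices\":[{\"delta\":{\"content\":\"Hi\"}}]}\n\n"   // OpenAI-compatible /v1 endpoint (SSE)
	ndjson := "{\"model\":\"llama3\",\"response\":\"Hi\",\"done\":false}\n" // native /api/generate endpoint (NDJSON)

	for _, chunk := range []string{sse, ndjson} {
		if strings.HasPrefix(chunk, "data: ") {
			fmt.Println("parsed as SSE payload:", strings.TrimPrefix(strings.TrimSpace(chunk), "data: "))
		} else {
			fmt.Println("ignored by an SSE-only parser:", strings.TrimSpace(chunk))
		}
	}
}
```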
We should be careful not to break any current usage (which only makes sense if people are avoiding streaming). I see a few options for solving it.
- Advantage: Doesn't break existing users.
- Advantage: Adds streaming without requiring changes to anyone's code.
- Downside: Adds NDJSONDecoder, which is only used for Ollama.

- Advantage: Compatible with most of the rest of gollm, since it uses the openai pattern.
- Downside: Ollama declares this compatibility experimental, notes that it could have breaking changes, and provides an incomplete feature set.
- Downside: Need to add a whole new provider.
Details
I guess we would do something like this, which also allows us to add json schema support.

I'm a little confused about whether there's some reason this isn't working for me. Is it working for others? It seems like there's an XOR between using the stream-compatible openai version and passing the `/api/tags` check. If I'm wrong about this, let me know and feel free to close this.

Summary by Sourcery
Enable streaming support for Ollama by implementing a StreamDecoder abstraction and an NDJSONDecoder, updating the Ollama provider to signal completion, and covering both SSE and NDJSON streams with unit tests.