summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorKhang Ha (Kelvin) <[email protected]>2025-10-30 13:04:06 +0700
committerGitHub <[email protected]>2025-10-30 01:04:06 -0500
commit4b5e44796156a39f386b26b88f16743011b66323 (patch)
treef36cb0f4079ed197f1129e94384f1fcc0fc3b700
parent7a2b8eae76380780d7e9e0a1e57c2e3a3afb1f64 (diff)
downloadopencode-4b5e44796156a39f386b26b88f16743011b66323.tar.gz
opencode-4b5e44796156a39f386b26b88f16743011b66323.zip
Fix "bufio.Scanner token too long" error by replacing Scanner with Reader in SSE (#3531)
-rw-r--r--packages/tui/cmd/opencode/main.go7
-rw-r--r--packages/tui/internal/decoders/decoder.go118
-rw-r--r--packages/tui/internal/decoders/decoder_test.go194
3 files changed, 319 insertions, 0 deletions
diff --git a/packages/tui/cmd/opencode/main.go b/packages/tui/cmd/opencode/main.go
index 22841fc89..3a7d1848a 100644
--- a/packages/tui/cmd/opencode/main.go
+++ b/packages/tui/cmd/opencode/main.go
@@ -13,9 +13,11 @@ import (
flag "github.com/spf13/pflag"
"github.com/sst/opencode-sdk-go"
"github.com/sst/opencode-sdk-go/option"
+ "github.com/sst/opencode-sdk-go/packages/ssestream"
"github.com/sst/opencode/internal/api"
"github.com/sst/opencode/internal/app"
"github.com/sst/opencode/internal/clipboard"
+ "github.com/sst/opencode/internal/decoders"
"github.com/sst/opencode/internal/tui"
"github.com/sst/opencode/internal/util"
"golang.org/x/sync/errgroup"
@@ -61,6 +63,11 @@ func main() {
}
}
+ // Register custom SSE decoder to handle large events (>32MB)
+ // This is a workaround for the bufio.Scanner token size limit in the auto-generated SDK
+ // See: packages/tui/internal/decoders/decoder.go
+ ssestream.RegisterDecoder("text/event-stream", decoders.NewUnboundedDecoder)
+
httpClient := opencode.NewClient(
option.WithBaseURL(url),
)
diff --git a/packages/tui/internal/decoders/decoder.go b/packages/tui/internal/decoders/decoder.go
new file mode 100644
index 000000000..efb699202
--- /dev/null
+++ b/packages/tui/internal/decoders/decoder.go
@@ -0,0 +1,118 @@
+package decoders
+
+import (
+ "bufio"
+ "bytes"
+ "io"
+
+ "github.com/sst/opencode-sdk-go/packages/ssestream"
+)
+
+// UnboundedDecoder is an SSE decoder that uses bufio.Reader instead of bufio.Scanner
+// to avoid the 32MB token size limit. This is a workaround for large SSE events until
+// the upstream Stainless SDK is fixed.
+//
+// This decoder handles SSE events of unlimited size by reading line-by-line with
+// bufio.Reader.ReadBytes('\n'), which dynamically grows the buffer as needed.
+type UnboundedDecoder struct {
+ reader *bufio.Reader
+ closer io.ReadCloser
+ evt ssestream.Event
+ err error
+}
+
+// NewUnboundedDecoder creates a new unbounded SSE decoder with a 1MB initial buffer size
+func NewUnboundedDecoder(rc io.ReadCloser) ssestream.Decoder {
+ reader := bufio.NewReaderSize(rc, 1024*1024) // 1MB initial buffer
+ return &UnboundedDecoder{
+ reader: reader,
+ closer: rc,
+ }
+}
+
+// Next reads and decodes the next SSE event from the stream
+func (d *UnboundedDecoder) Next() bool {
+ if d.err != nil {
+ return false
+ }
+
+ event := ""
+ data := bytes.NewBuffer(nil)
+
+ for {
+ line, err := d.reader.ReadBytes('\n')
+ if err != nil {
+ if err == io.EOF && len(line) == 0 {
+ return false
+ }
+ if err != io.EOF {
+ d.err = err
+ return false
+ }
+ }
+
+ // Remove trailing newline characters
+ line = bytes.TrimRight(line, "\r\n")
+
+ // Empty line indicates end of event
+ if len(line) == 0 {
+ if data.Len() > 0 || event != "" {
+ d.evt = ssestream.Event{
+ Type: event,
+ Data: data.Bytes(),
+ }
+ return true
+ }
+ continue
+ }
+
+ // Skip comments (lines starting with ':')
+ if line[0] == ':' {
+ continue
+ }
+
+ // Parse field
+ name, value, found := bytes.Cut(line, []byte(":"))
+ if !found {
+ // Field with no value
+ continue
+ }
+
+ // Remove leading space from value
+ if len(value) > 0 && value[0] == ' ' {
+ value = value[1:]
+ }
+
+ switch string(name) {
+ case "":
+ // An empty line in the form ": something" is a comment and should be ignored
+ continue
+ case "event":
+ event = string(value)
+ case "data":
+ _, d.err = data.Write(value)
+ if d.err != nil {
+ return false
+ }
+ _, d.err = data.WriteRune('\n')
+ if d.err != nil {
+ return false
+ }
+ }
+ }
+}
+
+// Event returns the current event
+func (d *UnboundedDecoder) Event() ssestream.Event {
+ return d.evt
+}
+
+// Close closes the underlying reader
+func (d *UnboundedDecoder) Close() error {
+ return d.closer.Close()
+}
+
+// Err returns any error that occurred during decoding
+func (d *UnboundedDecoder) Err() error {
+ return d.err
+}
diff --git a/packages/tui/internal/decoders/decoder_test.go b/packages/tui/internal/decoders/decoder_test.go
new file mode 100644
index 000000000..e5ad1d55a
--- /dev/null
+++ b/packages/tui/internal/decoders/decoder_test.go
@@ -0,0 +1,194 @@
+package decoders
+
+import (
+ "bytes"
+ "io"
+ "strings"
+ "testing"
+)
+
+func TestUnboundedDecoder_SmallEvent(t *testing.T) {
+ data := "event: test\ndata: hello world\n\n"
+ rc := io.NopCloser(strings.NewReader(data))
+ decoder := NewUnboundedDecoder(rc)
+
+ if !decoder.Next() {
+ t.Fatal("Expected Next() to return true")
+ }
+
+ evt := decoder.Event()
+ if evt.Type != "test" {
+ t.Errorf("Expected event type 'test', got '%s'", evt.Type)
+ }
+ if string(evt.Data) != "hello world\n" {
+ t.Errorf("Expected data 'hello world\\n', got '%s'", string(evt.Data))
+ }
+
+ if decoder.Next() {
+ t.Error("Expected Next() to return false at end of stream")
+ }
+
+ if err := decoder.Err(); err != nil {
+ t.Errorf("Expected no error, got %v", err)
+ }
+}
+
+func TestUnboundedDecoder_LargeEvent(t *testing.T) {
+ // Create a large event (50MB)
+ size := 50 * 1024 * 1024
+ largeData := strings.Repeat("x", size)
+
+ var buf bytes.Buffer
+ buf.WriteString("event: large\n")
+ buf.WriteString("data: ")
+ buf.WriteString(largeData)
+ buf.WriteString("\n\n")
+
+ rc := io.NopCloser(&buf)
+ decoder := NewUnboundedDecoder(rc)
+
+ if !decoder.Next() {
+ t.Fatal("Expected Next() to return true")
+ }
+
+ evt := decoder.Event()
+ if evt.Type != "large" {
+ t.Errorf("Expected event type 'large', got '%s'", evt.Type)
+ }
+
+ expectedData := largeData + "\n"
+ if string(evt.Data) != expectedData {
+ t.Errorf("Data size mismatch: expected %d, got %d", len(expectedData), len(evt.Data))
+ }
+
+ if decoder.Next() {
+ t.Error("Expected Next() to return false at end of stream")
+ }
+
+ if err := decoder.Err(); err != nil {
+ t.Errorf("Expected no error, got %v", err)
+ }
+}
+
+func TestUnboundedDecoder_MultipleEvents(t *testing.T) {
+ data := "event: first\ndata: data1\n\nevent: second\ndata: data2\n\n"
+ rc := io.NopCloser(strings.NewReader(data))
+ decoder := NewUnboundedDecoder(rc)
+
+ // First event
+ if !decoder.Next() {
+ t.Fatal("Expected Next() to return true for first event")
+ }
+ evt := decoder.Event()
+ if evt.Type != "first" {
+ t.Errorf("Expected event type 'first', got '%s'", evt.Type)
+ }
+ if string(evt.Data) != "data1\n" {
+ t.Errorf("Expected data 'data1\\n', got '%s'", string(evt.Data))
+ }
+
+ // Second event
+ if !decoder.Next() {
+ t.Fatal("Expected Next() to return true for second event")
+ }
+ evt = decoder.Event()
+ if evt.Type != "second" {
+ t.Errorf("Expected event type 'second', got '%s'", evt.Type)
+ }
+ if string(evt.Data) != "data2\n" {
+ t.Errorf("Expected data 'data2\\n', got '%s'", string(evt.Data))
+ }
+
+ // No more events
+ if decoder.Next() {
+ t.Error("Expected Next() to return false at end of stream")
+ }
+
+ if err := decoder.Err(); err != nil {
+ t.Errorf("Expected no error, got %v", err)
+ }
+}
+
+func TestUnboundedDecoder_MultilineData(t *testing.T) {
+ data := "event: multiline\ndata: line1\ndata: line2\ndata: line3\n\n"
+ rc := io.NopCloser(strings.NewReader(data))
+ decoder := NewUnboundedDecoder(rc)
+
+ if !decoder.Next() {
+ t.Fatal("Expected Next() to return true")
+ }
+
+ evt := decoder.Event()
+ if evt.Type != "multiline" {
+ t.Errorf("Expected event type 'multiline', got '%s'", evt.Type)
+ }
+
+ expectedData := "line1\nline2\nline3\n"
+ if string(evt.Data) != expectedData {
+ t.Errorf("Expected data '%s', got '%s'", expectedData, string(evt.Data))
+ }
+}
+
+func TestUnboundedDecoder_Comments(t *testing.T) {
+ data := ": this is a comment\nevent: test\n: another comment\ndata: hello\n\n"
+ rc := io.NopCloser(strings.NewReader(data))
+ decoder := NewUnboundedDecoder(rc)
+
+ if !decoder.Next() {
+ t.Fatal("Expected Next() to return true")
+ }
+
+ evt := decoder.Event()
+ if evt.Type != "test" {
+ t.Errorf("Expected event type 'test', got '%s'", evt.Type)
+ }
+ if string(evt.Data) != "hello\n" {
+ t.Errorf("Expected data 'hello\\n', got '%s'", string(evt.Data))
+ }
+}
+
+func TestUnboundedDecoder_NoEventType(t *testing.T) {
+ data := "data: hello\n\n"
+ rc := io.NopCloser(strings.NewReader(data))
+ decoder := NewUnboundedDecoder(rc)
+
+ if !decoder.Next() {
+ t.Fatal("Expected Next() to return true")
+ }
+
+ evt := decoder.Event()
+ if evt.Type != "" {
+ t.Errorf("Expected empty event type, got '%s'", evt.Type)
+ }
+ if string(evt.Data) != "hello\n" {
+ t.Errorf("Expected data 'hello\\n', got '%s'", string(evt.Data))
+ }
+}
+
+func BenchmarkUnboundedDecoder_LargeEvent(b *testing.B) {
+ // Create a 10MB event for benchmarking
+ size := 10 * 1024 * 1024
+ largeData := strings.Repeat("x", size)
+
+ var buf bytes.Buffer
+ buf.WriteString("event: bench\n")
+ buf.WriteString("data: ")
+ buf.WriteString(largeData)
+ buf.WriteString("\n\n")
+
+ data := buf.Bytes()
+
+ b.ResetTimer()
+ b.SetBytes(int64(len(data)))
+
+ for i := 0; i < b.N; i++ {
+ rc := io.NopCloser(bytes.NewReader(data))
+ decoder := NewUnboundedDecoder(rc)
+
+ if !decoder.Next() {
+ b.Fatal("Expected Next() to return true")
+ }
+
+ _ = decoder.Event()
+ }
+}