sseparser

package module
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 19, 2024 License: MIT Imports: 6 Imported by: 1

README

SSE Parser

This package provides functionality for parsing server-sent event streams, defined by the SSE specification.

Usage

In this example, we make a streaming chat completion request to the OpenAI platform API, and scan for response chunks:

package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"net/http"
	"os"

	"github.com/jclem/sseparser"
)

func main() {
	openaiKey := os.Getenv("OPENAI_KEY")

	if openaiKey == "" {
		log.Fatal("OPENAI_KEY environment variable must be set")
	}

	params := map[string]interface{}{
		"model":  "gpt-3.5-turbo",
		"stream": true,
		"messages": []map[string]string{
			{
				"role":    "user",
				"content": "Hello, how are you?",
			},
		},
	}

	body, err := json.Marshal(params)
	if err != nil {
		log.Fatal(err)
	}

	req, err := http.NewRequest("POST", "https://api.openai.com/v1/chat/completions", bytes.NewReader(body))
	if err != nil {
		log.Fatal(err)
	}

	req.Header.Set("Authorization", "Bearer "+openaiKey)
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}

	if resp.StatusCode != 200 {
		log.Fatal(resp.Status)
	}

	defer resp.Body.Close()

	// We create a new stream scanner that reads the HTTP response body.
	scanner := sseparser.NewStreamScanner(resp.Body)

	for {
		// Then, we call `UnmarshalNext`, and log each completion chunk, until we
		// encounter an error or reach the end of the stream.
		var e event
		_, err := scanner.UnmarshalNext(&e)
		if err != nil {
			if errors.Is(err, sseparser.ErrStreamEOF) {
				os.Exit(0)
			}

			log.Fatal(err)
		}

		if len(e.Data.Choices) > 0 {
			fmt.Print(e.Data.Choices[0].Delta.Content)
		}
	}
}

// The event struct uses tags to specify how to unmarshal the event data.
type event struct {
	Data chunk `sse:"data"`
}

type chunk struct {
	Choices []choice
}

// The chunk struct implements the `sseparser.UnmarshalerSSEValue` interface,
// which makes it easy to unmarshal event field values which in this case are
// complete JSON payloads.
func (c *chunk) UnmarshalSSEValue(v string) error {
	if v == "[DONE]" {
		return nil
	}

	return json.Unmarshal([]byte(v), c)
}

type choice struct {
	Delta delta
}

type delta struct {
	Content string
}

Documentation

Overview

Package sseparser provides a parser for Server-Sent Events (SSE). The SSE specification this package is modeled on can be found here: https://html.spec.whatwg.org/multipage/server-sent-events.html

The primary means of utilizing this package is through the StreamScanner type, which scans an io.Reader for SSEs.

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrStreamEOF = streamEOFError{io.EOF}

ErrStreamEOF is returned when the end of the stream is reached. It wraps io.EOF.

Functions

This section is empty.

Types

type Comment

type Comment string

Comment is a comment in an SSE.

func ParseComment

func ParseComment(input []byte) (Comment, error)

ParseComment parses a byte slice into a Comment.

Example
package main

import (
	"fmt"

	"github.com/jclem/sseparser"
)

func main() {
	comment, err := sseparser.ParseComment([]byte(":hello\n"))
	if err != nil {
		panic(err)
	}

	_, _ = fmt.Println(comment)
}
Output:

hello

type Event

type Event []any

Event is a server-sent event (SSE), which is a set of zero or more comments and/or fields.

The underlying type is a []any, but each value will be either a Field or a Comment.

func ParseEvent

func ParseEvent(input []byte) (Event, error)

ParseEvent parses an input into an Event.

Example
package main

import (
	"fmt"

	"github.com/jclem/sseparser"
)

func main() {
	event, err := sseparser.ParseEvent([]byte(":hello\n:bar\nfoo:bar\n\n"))
	if err != nil {
		panic(err)
	}

	_, _ = fmt.Printf("%+v\n", event)
}
Output:

[hello bar {Name:foo Value:bar}]

func (Event) Comments

func (e Event) Comments() []Comment

Comments returns the comments in an SSE.

func (Event) Fields

func (e Event) Fields() []Field

Fields returns the fields in an SSE.

type Field

type Field struct {
	// Name is the field name.
	Name string
	// Value is the field value. This may be an empty string.
	Value string
}

Field is an SSE field: A field name and an optional value.

func ParseField

func ParseField(input []byte) (Field, error)

ParseField parses a byte slice into a Field.

Example
package main

import (
	"fmt"

	"github.com/jclem/sseparser"
)

func main() {
	field, err := sseparser.ParseField([]byte("foo: bar\n"))
	if err != nil {
		panic(err)
	}

	_, _ = fmt.Println(field.Name)
	_, _ = fmt.Println(field.Value)
}
Output:

foo
bar

type Stream

type Stream []Event

Stream is a SSE stream, which is a set of zero or more Event.

func ParseStream

func ParseStream(input []byte) (Stream, error)

ParseStream parses a byte slice into a Stream.

Example
package main

import (
	"fmt"

	"github.com/jclem/sseparser"
)

func main() {
	stream, err := sseparser.ParseStream([]byte(":hello\n:bar\nfoo:bar\n\nbaz:qux\n\n"))
	if err != nil {
		panic(err)
	}

	_, _ = fmt.Printf("%+v\n", stream)
}
Output:

[[hello bar {Name:foo Value:bar}] [{Name:baz Value:qux}]]

type StreamScanner

type StreamScanner struct {
	// contains filtered or unexported fields
}

StreamScanner scans an io.Reader for SSEs.

Example
package main

import (
	"bytes"
	"errors"
	"fmt"

	"github.com/jclem/sseparser"
)

func main() {
	input := []byte(`:event-1
field-1: value-1
field-2: value-2

:event-2
field-3: value-3

`)
	reader := bytes.NewReader(input)

	scanner := sseparser.NewStreamScanner(reader)

	for {
		e, _, err := scanner.Next()
		if err != nil {
			if errors.Is(err, sseparser.ErrStreamEOF) {
				break
			}

			panic(err)
		}

		_, _ = fmt.Printf("%+v\n", e)
	}

}
Output:

[event-1 {Name:field-1 Value:value-1} {Name:field-2 Value:value-2}]
[event-2 {Name:field-3 Value:value-3}]

func NewStreamScanner

func NewStreamScanner(reader io.Reader) *StreamScanner

NewStreamScanner scans a io.Reader for SSEs.

func (*StreamScanner) Next

func (s *StreamScanner) Next() (Event, []byte, error)

Next returns the next event in the stream. There are three possible return states:

  1. An Event, a byte slice, and nil are returned if an event was parsed.
  2. nil, a byte slice, and ErrStreamEOF are returned if the end of the stream was reached.
  3. nil, a byte slice, and an error are returned if an error occurred while reading from the stream.

In all three cases, the byte slice contains any data that was read from the reader but was not part of the event. This data can be ignored while making subsequent calls to Next, but may be used to recover from errors, or just when not scanning the full stream.

func (*StreamScanner) UnmarshalNext

func (s *StreamScanner) UnmarshalNext(v any) ([]byte, error)

UnmarshalNext unmarshals the next event in the stream into the provided struct. See StreamScanner.Next for details on the []byte and error return values.

Example
package main

import (
	"bytes"
	"errors"
	"fmt"

	"github.com/jclem/sseparser"
)

func main() {
	input := []byte(`:event-1
foo: 1
bar: hello
field-1: value-1
field-2: value-2

:event-2
foo: 1
field-3: value-3

foo: true

`)

	type testStruct struct {
		Foo string `sse:"foo"`
		Bar string `sse:"bar"`
	}

	reader := bytes.NewReader(input)
	scanner := sseparser.NewStreamScanner(reader)

	for {
		var event testStruct
		_, err := scanner.UnmarshalNext(&event)
		if err != nil {
			if errors.Is(err, sseparser.ErrStreamEOF) {
				break
			}

			panic(err)
		}

		_, _ = fmt.Printf("%+v\n", event)
	}

}
Output:

{Foo:1 Bar:hello}
{Foo:1 Bar:}
{Foo:true Bar:}

type UnmarshalerSSE

type UnmarshalerSSE interface {
	// UnmarshalSSE unmarshals the given event into the type.
	UnmarshalSSE(event Event) error
}

UnmarshalerSSE is an interface implemented by types into which an SSE can be unmarshaled.

Example
package main

import (
	"bytes"
	"fmt"

	"github.com/jclem/sseparser"
)

func main() {
	input := []byte(`:event-1
foo: 1
bar: hello

`)

	type testStruct struct {
		Foo string `sse:"foo"`
		Bar string `sse:"bar"`
	}

	reader := bytes.NewReader(input)
	scanner := sseparser.NewStreamScanner(reader)

	var s testStruct
	_, err := scanner.UnmarshalNext(&s)
	if err != nil {
		panic(err)
	}

	_, _ = fmt.Printf("%+v\n", s)
}
Output:

{Foo:1 Bar:hello}

type UnmarshalerSSEValue

type UnmarshalerSSEValue interface {
	// UnmarshalSSEValue unmarshals the given event field value into the type.
	UnmarshalSSEValue(value string) error
}

UnmarshalerSSEValue is an interface implemented by types into which an SSE field value can be unmarshaled.

This is useful for custom unmarshaling of field values, such as when a field value contains a complete JSON payload.

Example
package main

import (
	"bytes"
	"encoding/json"
	"fmt"

	"github.com/jclem/sseparser"
)

func main() {
	input := []byte(`:event-1
meta: {"foo":"bar"}

`)

	// Meta implements the UnmarshalerSSEValue interface:
	//
	// type meta struct {
	// 	Foo string `json:"foo"`
	// }
	//
	// func (m *meta) UnmarshalSSEValue(v string) error {
	// 	return json.Unmarshal([]byte(v), m)
	// }

	type testStruct struct {
		Meta meta `sse:"meta"`
	}

	reader := bytes.NewReader(input)
	scanner := sseparser.NewStreamScanner(reader)

	var s testStruct
	_, err := scanner.UnmarshalNext(&s)
	if err != nil {
		panic(err)
	}

	_, _ = fmt.Printf("%+v\n", s)
}

type meta struct {
	Foo string `json:"foo"`
}

func (m *meta) UnmarshalSSEValue(v string) error {
	if err := json.Unmarshal([]byte(v), m); err != nil {
		return fmt.Errorf("failed to unmarshal meta: %w", err)
	}

	return nil
}
Output:

{Meta:{Foo:bar}}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL