sse

package
v1.5.2
Warning: This package is not in the latest version of its module.
Published: Apr 24, 2024 License: Apache-2.0 Imports: 16 Imported by: 1

README

SSE client

The sse package provides basic functionality for working with Casper events streamed by an SSE server. It connects to the server and collects events into a Go channel; on the other side, consumers read from this channel and delegate processing to event-specific handlers.

A simple usage example:

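package main

import (
	"context"
	"log"

	// Import path assumed; it is not shown on this page.
	"github.com/make-software/casper-go-sdk/sse"
)

func main() {
	client := sse.NewClient("http://52.3.38.81:9999/events/main")
	defer client.Stop()
	client.RegisterHandler(
		sse.DeployProcessedEventType,
		func(ctx context.Context, rawEvent sse.RawEvent) error {
			log.Printf("eventID: %d, raw data: %s", rawEvent.EventID, rawEvent.Data)
			deploy, err := rawEvent.ParseAsDeployProcessedEvent()
			if err != nil {
				return err
			}
			log.Printf("Deploy hash: %s", deploy.DeployProcessed.DeployHash)
			return nil
		})
	// Resume from a known event ID and check the error returned by Start.
	lastEventID := 1234
	if err := client.Start(context.TODO(), lastEventID); err != nil {
		log.Println(err)
	}
}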

For more examples, see example_test.go.

The SSE client is flexible and configurable; for advanced usage, see the advanced doc.

Documentation

Constants

const DefaultBufferSize = 4096

Variables

var AllEventsNames = map[EventType]string{
	APIVersionEventType:      "ApiVersion",
	BlockAddedEventType:      "BlockAdded",
	DeployProcessedEventType: "DeployProcessed",
	DeployAcceptedEventType:  "DeployAccepted",
	DeployExpiredEventType:   "DeployExpired",
	StepEventType:            "Step",
	FaultEventType:           "Fault",
	FinalitySignatureType:    "FinalitySignature",
	ShutdownType:             "Shutdown",
}
var ErrFullStreamTimeoutError = errors.New("can't fill the stream, because it full")
var ErrHandlerNotRegistered = errors.New("handler is not registered")

Functions

func ParseEvent added in v1.0.0

func ParseEvent[T interface{}](data []byte) (T, error)
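
The signature suggests a generic decoder: the caller names the target type and passes the raw bytes. A minimal sketch of a function with that shape, assuming the payload is JSON and is decoded with encoding/json (this page does not show the actual body of ParseEvent), might look like this. parseEvent and apiVersionEvent are local names for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// apiVersionEvent mirrors the APIVersionEvent struct listed in this package.
type apiVersionEvent struct {
	APIVersion string `json:"ApiVersion"`
}

// parseEvent is a hypothetical stand-in with the same shape as ParseEvent:
// it decodes a JSON payload into whatever type the caller names.
func parseEvent[T any](data []byte) (T, error) {
	var res T
	if err := json.Unmarshal(data, &res); err != nil {
		return res, err
	}
	return res, nil
}

func main() {
	ev, err := parseEvent[apiVersionEvent]([]byte(`{"ApiVersion":"1.0.0"}`))
	if err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	fmt.Println(ev.APIVersion)
}
```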

Types

type APIVersionEvent

type APIVersionEvent struct {
	APIVersion string `json:"ApiVersion"`
}

type BlockAdded

type BlockAdded struct {
	BlockHash string      `json:"block_hash"`
	Block     types.Block `json:"block"`
}

BlockAdded is the payload carried by a BlockAddedEvent.

type BlockAddedEvent

type BlockAddedEvent struct {
	BlockAdded BlockAdded `json:"BlockAdded"`
}

BlockAddedEvent definition

type Client

type Client struct {
	Streamer *Streamer
	Consumer *Consumer

	EventStream chan RawEvent

	StreamErrorHandler   func(<-chan error)
	ConsumerErrorHandler func(<-chan error)

	WorkersCount int
	// contains filtered or unexported fields
}

Client is a facade that provides a convenient interface for processing data from the stream; it combines a Streamer and a Consumer under the hood. The Client also allows registering global middleware that is applied to all handlers.

func NewClient

func NewClient(url string) *Client

func (*Client) RegisterHandler

func (p *Client) RegisterHandler(eventType EventType, handler HandlerFunc)

func (*Client) RegisterMiddleware

func (p *Client) RegisterMiddleware(one Middleware)

func (*Client) Start

func (p *Client) Start(ctx context.Context, lastEventID int) error

func (*Client) Stop

func (p *Client) Stop()

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a service that registers event handlers and assigns events from the stream to specific handlers.

func NewConsumer

func NewConsumer() *Consumer

func (*Consumer) RegisterHandler

func (c *Consumer) RegisterHandler(eventType EventType, handler HandlerFunc)

func (*Consumer) Run

func (c *Consumer) Run(ctx context.Context, events <-chan RawEvent, errCh chan<- error) error

type CtxWorkerID

type CtxWorkerID string
const CtxWorkerIDKey CtxWorkerID = "workerID"

type DeployAcceptedEvent

type DeployAcceptedEvent struct {
	DeployAccepted types.Deploy `json:"DeployAccepted"`
}

type DeployExpiredEvent added in v1.0.0

type DeployExpiredEvent struct {
	DeployExpired DeployExpiredPayload `json:"DeployExpired"`
}

type DeployExpiredPayload added in v1.0.0

type DeployExpiredPayload struct {
	DeployHash key.Hash `json:"deploy_hash"`
}

type DeployProcessedEvent

type DeployProcessedEvent struct {
	DeployProcessed DeployProcessedPayload `json:"DeployProcessed"`
}

type DeployProcessedPayload added in v1.0.0

type DeployProcessedPayload struct {
	DeployHash      key.Hash                    `json:"deploy_hash"`
	Account         string                      `json:"account"`
	Timestamp       time.Time                   `json:"timestamp"`
	TTL             string                      `json:"ttl"`
	BlockHash       key.Hash                    `json:"block_hash"`
	ExecutionResult types.ExecutionResultStatus `json:"execution_result"`
}

type ErrUnknownEventType

type ErrUnknownEventType struct {
	RawData []byte
	// contains filtered or unexported fields
}

func NewErrUnknownEventType

func NewErrUnknownEventType(data []byte) ErrUnknownEventType

func (ErrUnknownEventType) Error

func (e ErrUnknownEventType) Error() string

type EventData

type EventData = json.RawMessage

type EventParser

type EventParser struct {
	// contains filtered or unexported fields
}

func NewEventParser

func NewEventParser() *EventParser

func (*EventParser) ParseRawEvent

func (e *EventParser) ParseRawEvent(data []byte) (RawEvent, error)

func (*EventParser) RegisterEvent

func (e *EventParser) RegisterEvent(eventType EventType)
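
The names in AllEventsNames line up with the top-level JSON keys in the event structs' tags ("ApiVersion", "BlockAdded", and so on). One plausible way to classify a raw payload, sketched here under the assumption that each payload is a JSON object with a single top-level key, is to decode only the envelope and leave the body encoded. detectEventName is a hypothetical helper; the actual logic of ParseRawEvent is not shown on this page.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// detectEventName returns the single top-level key of an SSE data payload
// together with its still-encoded body.
func detectEventName(data []byte) (string, json.RawMessage, error) {
	var envelope map[string]json.RawMessage
	if err := json.Unmarshal(data, &envelope); err != nil {
		return "", nil, err
	}
	if len(envelope) != 1 {
		return "", nil, errors.New("expected exactly one top-level key")
	}
	for name, body := range envelope {
		return name, body, nil
	}
	return "", nil, errors.New("unreachable")
}

func main() {
	name, body, err := detectEventName([]byte(`{"ApiVersion":"1.0.0"}`))
	if err != nil {
		fmt.Println("detect failed:", err)
		return
	}
	fmt.Printf("%s -> %s\n", name, body)
}
```

Keeping the body as json.RawMessage defers the cost of full decoding until a handler asks for a typed event.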

type EventStreamReader

type EventStreamReader struct {
	MaxBufferSize int
	// contains filtered or unexported fields
}

EventStreamReader scans an io.Reader looking for EventStream messages.

func (*EventStreamReader) ReadEvent

func (e *EventStreamReader) ReadEvent() ([]byte, error)

ReadEvent scans the EventStream for events.

func (*EventStreamReader) RegisterStream

func (e *EventStreamReader) RegisterStream(eventStream io.Reader)

RegisterStream registers a buffered scanner over the given stream for the EventStreamReader.

type EventType

type EventType = int
const (
	APIVersionEventType EventType = iota + 1
	BlockAddedEventType
	DeployProcessedEventType
	DeployAcceptedEventType
	DeployExpiredEventType
	EventIDEventType
	FinalitySignatureType
	StepEventType
	FaultEventType
	ShutdownType
)

type FaultEvent added in v1.0.0

type FaultEvent struct {
	Fault FaultPayload `json:"Fault"`
}

type FaultPayload added in v1.0.0

type FaultPayload struct {
	EraID     uint64            `json:"era_id"`
	PublicKey keypair.PublicKey `json:"public_key"`
	Timestamp types.Timestamp   `json:"timestamp"`
}

type FinalitySignatureEvent added in v1.0.0

type FinalitySignatureEvent struct {
	FinalitySignature FinalitySignaturePayload `json:"FinalitySignature"`
}

type FinalitySignaturePayload added in v1.0.0

type FinalitySignaturePayload struct {
	BlockHash key.Hash          `json:"block_hash"`
	EraID     uint64            `json:"era_id"`
	Signature types.HexBytes    `json:"signature"`
	PublicKey keypair.PublicKey `json:"public_key"`
}

type Handler

type Handler interface {
	Handle(context.Context, RawEvent) error
}

type HandlerFunc

type HandlerFunc func(context.Context, RawEvent) error

HandlerFunc is the function signature that each event-specific handler must implement.

type HttpConnection

type HttpConnection struct {
	Headers map[string]string
	URL     string
	// contains filtered or unexported fields
}

HttpConnection is responsible for establishing a connection with the SSE server: it creates the request, handles HTTP errors, and returns the response.

func NewHttpConnection

func NewHttpConnection(httpClient *http.Client, sourceUrl string) *HttpConnection

func (*HttpConnection) Request

func (c *HttpConnection) Request(ctx context.Context, lastEventID int) (*http.Response, error)

type Middleware

type Middleware func(handler HandlerFunc) HandlerFunc

type MiddlewareHandler

type MiddlewareHandler interface {
	Process(handler HandlerFunc) HandlerFunc
}

type RawEvent

type RawEvent struct {
	EventType EventType
	Data      EventData
	EventID   uint64
}

func (*RawEvent) ParseAsAPIVersionEvent

func (d *RawEvent) ParseAsAPIVersionEvent() (APIVersionEvent, error)

func (*RawEvent) ParseAsBlockAddedEvent

func (d *RawEvent) ParseAsBlockAddedEvent() (BlockAddedEvent, error)

func (*RawEvent) ParseAsDeployAcceptedEvent added in v1.0.0

func (d *RawEvent) ParseAsDeployAcceptedEvent() (DeployAcceptedEvent, error)

func (*RawEvent) ParseAsDeployExpiredEvent added in v1.0.0

func (d *RawEvent) ParseAsDeployExpiredEvent() (DeployExpiredEvent, error)

func (*RawEvent) ParseAsDeployProcessedEvent

func (d *RawEvent) ParseAsDeployProcessedEvent() (DeployProcessedEvent, error)

func (*RawEvent) ParseAsFaultEvent added in v1.0.0

func (d *RawEvent) ParseAsFaultEvent() (FaultEvent, error)

func (*RawEvent) ParseAsFinalitySignatureEvent added in v1.0.0

func (d *RawEvent) ParseAsFinalitySignatureEvent() (FinalitySignatureEvent, error)

func (*RawEvent) ParseAsStepEvent added in v1.0.0

func (d *RawEvent) ParseAsStepEvent() (StepEvent, error)

type StepEvent added in v1.0.0

type StepEvent struct {
	Step StepPayload `json:"step"`
}

type StepPayload added in v1.0.0

type StepPayload struct {
	EraID           uint64       `json:"era_id"`
	ExecutionEffect types.Effect `json:"execution_effect"`
	// TODO: unverified; no example was found to test against.
	Operations *[]types.Operation `json:"operations,omitempty"`
	// TODO: unverified; no example was found to test against.
	Transform *types.TransformKey `json:"transform,omitempty"`
}

type Streamer

type Streamer struct {
	Connection *HttpConnection

	StreamReader *EventStreamReader
	// BlockedStreamLimit is the maximum time the stream's buffer may remain completely full, which could
	// indicate that the workers are working too slowly and are not receiving messages.
	// If this period elapses without any message being received, ErrFullStreamTimeoutError is returned.
	BlockedStreamLimit time.Duration
	// contains filtered or unexported fields
}

Streamer is a service whose main responsibility is to fill the events channel. Connection handling is isolated in this service: it uses an HttpConnection to obtain the HTTP response as a stream resource and passes it to the EventStreamReader, which parses bytes from the response body. The design assumes that connection management and reconnection logic live above this service.

func DefaultStreamer

func DefaultStreamer(url string) *Streamer

DefaultStreamer is a shortcut for getting started quickly with a Streamer.

func NewStreamer

func NewStreamer(
	client *HttpConnection,
	reader *EventStreamReader,
	blockedStreamLimit time.Duration,
) *Streamer

NewStreamer is the idiomatic way to create a Streamer.

func (*Streamer) FillStream

func (i *Streamer) FillStream(ctx context.Context, lastEventID int, stream chan<- RawEvent, errorsCh chan<- error) error

func (*Streamer) RegisterEvent

func (i *Streamer) RegisterEvent(eventType EventType)
