goro

v0.0.0-...-055fb2d
Published: Feb 14, 2019 License: MIT Imports: 11 Imported by: 1

README

Goro

Goro is a Go client library for Event Store.

Example:

package main

import (
    "context"
    "fmt"

    "github.com/vectorhacker/goro"
)

func main() {
    // create a client to use the Event Store
    client := goro.Connect("http://localhost:2113", goro.WithBasicAuth("admin", "changeit"))


    writer := client.Writer("messages")
    reader := client.FowardsReader("messages")
    catchupSubscription := client.CatchupSubscription("messages", 0) // start from 0

    data := []byte("{\"message\": \"hello world\"}")

    // write the event
    ctx := context.Background()
    event := goro.CreateEvent(
        "message",
        data,
        nil, // nil metadata
        0,
    )
    err := writer.Write(ctx, goro.ExpectedVersionAny, event)
    if err != nil {
        panic(err)
    }

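    // subscribe to a stream of events; main does not wait for this goroutine
    go func() {
        ctx := context.Background()
        messages := catchupSubscription.Subscribe(ctx)

        for message := range messages {
            // a StreamMessage carries either an Event or an Error
            if message.Error != nil {
                fmt.Println(message.Error)
                continue
            }
            fmt.Printf("%s\n", message.Event.Data)
        }
    }()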

    // read events
    events, err := reader.Read(ctx, 0, 1)
    if err != nil {
        panic(err)
    }

    for _, event := range events {
        fmt.Printf("%s\n", event.Data)
    }
}

TODO

  • Tests
  • Competing Consumers
  • Projections
  • Read Events
  • Stream Events
  • Write Events
  • User Management

Documentation

Overview

Package goro is a pure Go client library for Event Store (versions 3.2.0 and later). It includes a high-level API for reading and writing events. Usage examples for the high-level APIs are provided inline with their full documentation.

Index

Constants

const (
	ActionPark  Action = "park"
	ActionRetry        = "retry"
	ActionSkip         = "skip"
	ActionStop         = "stop"
)

Action enum

const (
	ExpectedVersionAny   int64 = -2
	ExpectedVersionNone  int64 = -1
	ExpectedVersionEmpty int64 = 0
)

ExpectedVersions

Variables

var (
	ErrStreamNeverCreated = errors.New("stream never created")
	ErrInvalidContentType = errors.New("invalid content type")
	ErrStreamNotFound     = errors.New("the stream was not found")
	ErrUnauthorized       = errors.New("no access")
	ErrInternalError      = errors.New("internall error has occurred")
)

errors

Functions

This section is empty.

Types

type Acknowledger

type Acknowledger interface {
	Ack() error
	Nack(action Action) error
}

Acknowledger can acknowledge (Ack) or negatively acknowledge (Nack) an Event in a Persistent Subscription

type Action

type Action string

Action represents the action to take when Nacking an Event

type Author

type Author struct {
	Name string `json:"name"`
}

Author represents the Author of an Event

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a connection to an event store

func Connect

func Connect(host string, options ...ClientOption) *Client

Connect creates a new client

func (Client) BackwardsReader

func (c Client) BackwardsReader(stream string) Reader

BackwardsReader creates a new Reader that reads backwards on a stream

func (Client) CatchupSubscription

func (c Client) CatchupSubscription(stream string, start int64) Subscriber

CatchupSubscription creates a new catchup style subscription that starts reading at an event number and continues forwards

func (Client) FowardsReader

func (c Client) FowardsReader(stream string) Reader

FowardsReader creates a new Reader that reads forwards on a stream

func (Client) PersistentSubscription

func (c Client) PersistentSubscription(stream, subscriptionName string, settings PersistentSubscriptionSettings) (Subscriber, error)

PersistentSubscription creates a new competing consumer style subscription with the given settings

func (Client) Sling

func (c Client) Sling() *sling.Sling

Sling creates a new Sling object

func (Client) Writer

func (c Client) Writer(stream string) Writer

Writer creates a new Writer for a stream

type ClientOption

type ClientOption func(*Client)

ClientOption applies options to a client

func WithBasicAuth

func WithBasicAuth(username, password string) ClientOption

WithBasicAuth adds basic authentication to the Event Store

func WithHTTPClient

func WithHTTPClient(httpClient *http.Client) ClientOption

WithHTTPClient sets the http.Client for the Client
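
ClientOption follows the functional options pattern: Connect takes any number of functions that each mutate the client. The sketch below illustrates the pattern with local stand-ins only. The `client`, `option`, `connect`, `withBasicAuth`, and `withHTTPClient` names and the struct fields are hypothetical, since the real Client's fields are unexported and its internals are not documented here.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// client is a hypothetical stand-in for goro.Client; the field names are assumptions.
type client struct {
	host       string
	username   string
	password   string
	httpClient *http.Client
}

// option mirrors the shape of goro.ClientOption.
type option func(*client)

// withBasicAuth returns an option that stores credentials on the client.
func withBasicAuth(username, password string) option {
	return func(c *client) {
		c.username = username
		c.password = password
	}
}

// withHTTPClient returns an option that replaces the default HTTP client.
func withHTTPClient(h *http.Client) option {
	return func(c *client) { c.httpClient = h }
}

// connect sets defaults first, then applies each option in order,
// so later options override earlier ones.
func connect(host string, opts ...option) *client {
	c := &client{host: host, httpClient: http.DefaultClient}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := connect("http://localhost:2113",
		withBasicAuth("admin", "changeit"),
		withHTTPClient(&http.Client{Timeout: 5 * time.Second}),
	)
	fmt.Println(c.host, c.username, c.httpClient.Timeout)
}
```

Because options are plain functions, callers can add their own without any change to the Connect signature.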

type Event

type Event struct {
	At             time.Time       `json:"updated,omitempty"`
	Author         Author          `json:"author,omitempty"`
	Stream         string          `json:"streamId,omitempty"`
	Type           string          `json:"eventType"`
	PositionStream string          `json:"positionStreamId,omitempty"`
	Data           json.RawMessage `json:"data,omitempty"`
	Metadata       json.RawMessage `json:"metadata,omitempty"`
	ID             uuid.UUID       `json:"eventID"`
	Version        int64           `json:"eventNumber"`
	Position       int64           `json:"positionEventNumber,omitempty"`
}

Event represents an Event in Event Store. The Data and Metadata must be JSON encoded.
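
One way to meet the JSON requirement is to marshal a Go value with encoding/json and pass the resulting bytes as the event data. This is a minimal sketch using only the standard library. The `message` type and `encodeData` helper are hypothetical and not part of goro.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a hypothetical payload type used only for illustration.
type message struct {
	Message string `json:"message"`
}

// encodeData marshals any value into a json.RawMessage, the type used by
// the Data and Metadata fields of Event.
func encodeData(v interface{}) (json.RawMessage, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	return json.RawMessage(b), nil
}

func main() {
	data, err := encodeData(message{Message: "hello world"})
	if err != nil {
		panic(err)
	}
	// json.Valid confirms the bytes are well-formed JSON before they are sent.
	fmt.Println(string(data), json.Valid(data))
}
```

Marshaling a typed value avoids the hand-escaped string literals that are easy to get wrong.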

func CreateEvent

func CreateEvent(eventType string, data, metadata json.RawMessage, version int64) Event

CreateEvent initializes a new Event with an event type, some data, metadata, and a version you specify. It then creates a random uuid and sets the time it was created at.

type Events

type Events []Event

Events is a slice of events that implements sort.Interface.

func (Events) Len

func (e Events) Len() int

Len implements sort.Interface

func (Events) Less

func (e Events) Less(a, b int) bool

Less implements sort.Interface

func (Events) Swap

func (e Events) Swap(a, b int)

Swap implements sort.Interface
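
A type with Len, Less, and Swap can be passed to sort.Sort. The sketch below shows this with local stand-ins for Event and Events. Ordering by Version is an assumption: the documentation above does not say which field Less compares. The `event`, `events`, and `sortedVersions` names are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// event is a local stand-in for goro.Event with only the fields needed here.
type event struct {
	Type    string
	Version int64
}

// events mirrors the shape of goro.Events. Comparing by Version is an
// assumption; the goro documentation does not state what Less compares.
type events []event

func (e events) Len() int           { return len(e) }
func (e events) Less(a, b int) bool { return e[a].Version < e[b].Version }
func (e events) Swap(a, b int)      { e[a], e[b] = e[b], e[a] }

// sortedVersions sorts a copy of the input and returns the versions in order.
func sortedVersions(in events) []int64 {
	out := make(events, len(in))
	copy(out, in)
	sort.Sort(out)
	versions := make([]int64, len(out))
	for i, ev := range out {
		versions[i] = ev.Version
	}
	return versions
}

func main() {
	fmt.Println(sortedVersions(events{{"c", 2}, {"a", 0}, {"b", 1}}))
}
```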

type PersistentSubscriptionSettings

type PersistentSubscriptionSettings struct {
	ResolveLinkTos              bool   `json:"resolveLinktos,omitempty"`
	StartFrom                   int64  `json:"startFrom,omitempty"`
	ExtraStatistics             bool   `json:"extraStatistics,omitempty"`
	CheckPointAfterMilliseconds int64  `json:"checkPointAfterMilliseconds,omitempty"`
	LiveBufferSize              int    `json:"liveBufferSize,omitempty"`
	ReadBatchSize               int    `json:"readBatchSize,omitempty"`
	BufferSize                  int    `json:"bufferSize,omitempty"`
	MaxCheckPointCount          int    `json:"maxCheckPointCount,omitempty"`
	MaxRetryCount               int    `json:"maxRetryCount,omitempty"`
	MaxSubscriberCount          int    `json:"maxSubscriberCount,omitempty"`
	MessageTimeoutMilliseconds  int64  `json:"messageTimeoutMilliseconds,omitempty"`
	MinCheckPointCount          int    `json:"minCheckPointCount,omitempty"`
	NamedConsumerStrategy       string `json:"namedConsumerStrategy,omitempty"`
}

PersistentSubscriptionSettings represents the settings for creating and updating a Persistent subscription. You can read more about those settings in the Event Store documentation: https://eventstore.org/docs/http-api/4.0.2/competing-consumers/#creating-a-persistent-subscription

type Reader

type Reader interface {
	Read(ctx context.Context, start int64, count int) (Events, error)
}

Reader reads up to count Events from a stream, beginning at start

func NewBackwardsReader

func NewBackwardsReader(slinger Slinger, stream string) Reader

NewBackwardsReader creates a Reader that reads events backwards

func NewForwardsReader

func NewForwardsReader(slinger Slinger, stream string) Reader

NewForwardsReader creates a Reader that reads events forwards

type Slinger

type Slinger interface {
	Sling() *sling.Sling
}

Slinger is something that can return a sling object

type SlingerFunc

type SlingerFunc func() *sling.Sling

SlingerFunc is a function type that implements Slinger by calling itself

func (SlingerFunc) Sling

func (f SlingerFunc) Sling() *sling.Sling

Sling implements the Slinger interface
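
SlingerFunc appears to follow the same adapter idiom as http.HandlerFunc: a function type with a method that calls the function, so an ordinary function can satisfy the interface. The sketch below reproduces the idiom with local types. `request` is a hypothetical stand-in for *sling.Sling, and `slinger`, `slingerFunc`, and `baseOf` are illustrative names only.

```go
package main

import "fmt"

// request is a hypothetical stand-in for *sling.Sling.
type request struct{ base string }

// slinger mirrors the shape of goro.Slinger.
type slinger interface {
	Sling() *request
}

// slingerFunc mirrors goro.SlingerFunc: a function type whose method calls
// the function itself, so any matching function satisfies slinger.
type slingerFunc func() *request

func (f slingerFunc) Sling() *request { return f() }

// baseOf accepts the interface, not the concrete function type.
func baseOf(s slinger) string { return s.Sling().base }

func main() {
	s := slingerFunc(func() *request {
		return &request{base: "http://localhost:2113"}
	})
	fmt.Println(baseOf(s))
}
```

This makes it possible to hand a closure to constructors such as NewWriter or NewForwardsReader without declaring a new struct type.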

type StreamMessage

type StreamMessage struct {
	Event        Event
	Acknowledger Acknowledger
	Error        error
}

StreamMessage contains an Event or an error

func (StreamMessage) Ack

func (m StreamMessage) Ack() error

Ack acknowledges an Event or fails

func (StreamMessage) Nack

func (m StreamMessage) Nack(action Action) error

Nack rejects an Event or fails
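
A consumer of a persistent subscription typically decides between Ack and Nack after handling each event. The sketch below shows one possible policy using local stand-ins: ack on success, retry on a transient error, park anything else. The policy itself, the `recorder` test double, and the `errTransient` and `errPermanent` values are assumptions for illustration and do not come from goro.

```go
package main

import (
	"errors"
	"fmt"
)

// action mirrors goro.Action; only the two values used here are declared.
type action string

const (
	actionPark  action = "park"
	actionRetry action = "retry"
)

// acknowledger mirrors the shape of goro.Acknowledger.
type acknowledger interface {
	Ack() error
	Nack(a action) error
}

// recorder is a hypothetical test double that remembers the last call.
type recorder struct{ last string }

func (r *recorder) Ack() error          { r.last = "ack"; return nil }
func (r *recorder) Nack(a action) error { r.last = "nack:" + string(a); return nil }

var (
	errTransient = errors.New("transient failure")
	errPermanent = errors.New("bad payload")
)

// settle acks on success, asks for a retry on errTransient,
// and parks the event for any other error.
func settle(ack acknowledger, handleErr error) error {
	switch {
	case handleErr == nil:
		return ack.Ack()
	case errors.Is(handleErr, errTransient):
		return ack.Nack(actionRetry)
	default:
		return ack.Nack(actionPark)
	}
}

// outcome runs settle against a recorder and reports what was called.
func outcome(handleErr error) string {
	r := &recorder{}
	if err := settle(r, handleErr); err != nil {
		return "error"
	}
	return r.last
}

func main() {
	fmt.Println(outcome(nil), outcome(errTransient), outcome(errPermanent))
}
```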

type Subscriber

type Subscriber interface {
	Subscribe(ctx context.Context) <-chan StreamMessage
}

Subscriber streams events
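
Subscribe returns a receive-only channel, so a consumer ranges over it and checks each message's Error field. The sketch below uses an in-memory producer in place of a real subscription. `streamMessage`, `fakeSubscribe`, `collect`, and `demo` are hypothetical, and stopping at the first error is a design choice of this sketch: the documentation does not say whether the real channel keeps delivering after an error.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// streamMessage is a reduced stand-in for goro.StreamMessage.
type streamMessage struct {
	Data  string
	Error error
}

// fakeSubscribe is a hypothetical in-memory source with the same channel
// shape as Subscriber.Subscribe. It stops sending when ctx is cancelled.
func fakeSubscribe(ctx context.Context, msgs ...streamMessage) <-chan streamMessage {
	out := make(chan streamMessage)
	go func() {
		defer close(out)
		for _, m := range msgs {
			select {
			case out <- m:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// collect reads payloads from the channel and returns at the first error.
func collect(ch <-chan streamMessage) ([]string, error) {
	var got []string
	for m := range ch {
		if m.Error != nil {
			return got, m.Error
		}
		got = append(got, m.Data)
	}
	return got, nil
}

// demo wires the producer and consumer together.
func demo() ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // releases the producer goroutine when collect returns early
	ch := fakeSubscribe(ctx,
		streamMessage{Data: "first"},
		streamMessage{Data: "second"},
		streamMessage{Error: errors.New("stream closed")},
		streamMessage{Data: "never delivered"},
	)
	return collect(ch)
}

func main() {
	got, err := demo()
	fmt.Println(got, err)
}
```

Cancelling the context is what lets the producer goroutine exit once the consumer stops reading.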

func NewCatchupSubscription

func NewCatchupSubscription(slinger Slinger, stream string, startFrom int64) Subscriber

NewCatchupSubscription creates a Subscriber that starts reading a stream from a specific event and then catches up to the head of the stream.

func NewPersistentSubscription

func NewPersistentSubscription(slinger Slinger, stream, subscriptionName string, settings PersistentSubscriptionSettings) (Subscriber, error)

NewPersistentSubscription creates a new subscription that implements the competing consumers pattern

func UpdatePersistentSubscription

func UpdatePersistentSubscription(subscription Subscriber, newSettings PersistentSubscriptionSettings) (Subscriber, error)

UpdatePersistentSubscription updates an existing subscription if it's Persistent

type Writer

type Writer interface {
	Write(ctx context.Context, expectedVersion int64, events ...Event) error
}

Writer writes events to a stream
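
Because Writer is an interface, code that publishes events can be exercised against an in-memory double instead of a running Event Store. The sketch below shows the idea with local stand-ins. `event`, `writer`, `memoryWriter`, `publishGreeting`, and `demo` are hypothetical, and the double deliberately ignores expectedVersion, which a real server would check.

```go
package main

import (
	"context"
	"fmt"
)

// event is a reduced stand-in for goro.Event.
type event struct {
	Type string
	Data string
}

// writer mirrors the shape of goro.Writer.
type writer interface {
	Write(ctx context.Context, expectedVersion int64, events ...event) error
}

// memoryWriter is a hypothetical test double. It ignores expectedVersion and
// only checks for context cancellation before appending.
type memoryWriter struct {
	stored []event
}

func (w *memoryWriter) Write(ctx context.Context, expectedVersion int64, events ...event) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	w.stored = append(w.stored, events...)
	return nil
}

// publishGreeting depends only on the interface, so it needs no server.
// The value -2 matches ExpectedVersionAny in the constants listed above.
func publishGreeting(ctx context.Context, w writer) error {
	return w.Write(ctx, -2, event{Type: "message", Data: `{"message":"hello world"}`})
}

// demo publishes one event and reports how many were stored.
func demo() (int, error) {
	w := &memoryWriter{}
	err := publishGreeting(context.Background(), w)
	return len(w.stored), err
}

func main() {
	n, err := demo()
	fmt.Println(n, err)
}
```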

func NewWriter

func NewWriter(slinger Slinger, stream string) Writer

NewWriter creates a new Writer for a stream

Directories

Path Synopsis
example
