event_streams

package
v1.0.0 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 22, 2021 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Overview

Package event_streams provides interaction with EventStoreDB event streams. Before accessing streams, a gRPC connection must be established with EventStoreDB through the github.com/pivonroll/EventStore-Client-Go/core/connection package.

Constants

const (
	// AppendToStream_FailedToObtainWriterErr indicates that client failed to receive a protobuf append client.
	AppendToStream_FailedToObtainWriterErr errors.ErrorCode = "AppendToStream_FailedToObtainWriterErr"
	// AppendToStream_FailedSendHeaderErr indicates that client received an unknown error
	// when it tried to send a header to a protobuf stream.
	// Header is sent before client can append any events to a stream.
	AppendToStream_FailedSendHeaderErr errors.ErrorCode = "AppendToStream_FailedSendHeaderErr"
	// AppendToStream_FailedSendMessageErr indicates that there was an unknown error received when client
	// tried to append an event to an EventStoreDB stream.
	AppendToStream_FailedSendMessageErr errors.ErrorCode = "AppendToStream_FailedSendMessageErr"
	// AppendToStream_FailedToCloseStreamErr indicates that there was an unknown error when client
	// tried to close the protobuf stream after it has written all events to an EventStoreDB stream.
	AppendToStream_FailedToCloseStreamErr errors.ErrorCode = "AppendToStream_FailedToCloseStreamErr"
)
const (
	// BatchAppendToStream_FailedToObtainWriterErr indicates that client failed to receive a protobuf append client.
	BatchAppendToStream_FailedToObtainWriterErr errors.ErrorCode = "BatchAppendToStream_FailedToObtainWriterErr"
	// BatchAppendToStream_FailedSendMessageErr indicates that there was an unknown error received when client
	// tried to append a chunk of events to an EventStoreDB stream.
	BatchAppendToStream_FailedSendMessageErr errors.ErrorCode = "BatchAppendToStream_FailedSendMessageErr"
	// BatchAppendToStream_FailedToCloseStreamErr indicates that there was an unknown error when client
	// tried to close the protobuf stream after it has written all event chunks to an EventStoreDB stream.
	BatchAppendToStream_FailedToCloseStreamErr errors.ErrorCode = "BatchAppendToStream_FailedToCloseStreamErr"
)
const (
	FailedToCreateReaderErr                errors.ErrorCode = "FailedToCreateReaderErr"
	FailedToReceiveSubscriptionResponseErr errors.ErrorCode = "FailedToReceiveSubscriptionResponseErr"
)
const (
	DefaultCheckpointIntervalMultiplier = uint32(1)
)
const FailedToDeleteStreamErr errors.ErrorCode = "FailedToDeleteStreamErr"

FailedToDeleteStreamErr indicates that client's Client.DeleteStream received an unknown error when it tried to soft-delete an EventStoreDB stream.

const (
	FailedToObtainStreamReaderErr errors.ErrorCode = "FailedToObtainStreamReaderErr"
)

FailedToObtainStreamReaderErr indicates that client received an unknown error when it tried to construct a protobuf stream reader client.

const FailedToTombstoneStreamErr errors.ErrorCode = "FailedToTombstoneStreamErr"

FailedToTombstoneStreamErr indicates that client's Client.TombstoneStream received an unknown error when it tried to hard-delete (tombstone) an EventStoreDB stream.

const ReadCountMax = ^uint64(0)
const StreamMetadataType = "$metadata"
const WrongExpectedVersionErr errors.ErrorCode = "WrongExpectedVersionErr"
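
ReadCountMax is written as ^uint64(0), the bitwise complement of zero, which is the largest value a uint64 can hold. The standalone sketch below uses only the standard library to confirm that this matches math.MaxUint64. The constant readCountMax is a local copy made for illustration, not an import of this package.

```go
package main

import (
	"fmt"
	"math"
)

// readCountMax mirrors the package constant ReadCountMax for illustration only.
const readCountMax = ^uint64(0)

func main() {
	// Flipping every bit of a zero uint64 sets all 64 bits.
	fmt.Println(readCountMax == math.MaxUint64) // prints "true"
}
```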

Variables

This section is empty.

Functions

This section is empty.

Types

type AppendResponse

type AppendResponse struct {
	// contains filtered or unexported fields
}

AppendResponse is returned from Client.AppendToStream when events are written successfully.

func (AppendResponse) GetCurrentRevision

func (this AppendResponse) GetCurrentRevision() uint64

GetCurrentRevision returns stream's current revision if current revision is not NoStream. If currentRevision is NoStream, it returns 0. Note that stream can have a valid revision 0 if it contains only one event. Use IsCurrentRevisionNoStream to check if current revision of a stream is NoStream.

func (AppendResponse) GetPosition

func (this AppendResponse) GetPosition() (Position, bool)

GetPosition returns a position of last appended event in a stream and a boolean value which indicates if position for last written event was received. If no position was received a zero initialized Position and a false will be returned.

func (AppendResponse) IsCurrentRevisionNoStream

func (this AppendResponse) IsCurrentRevisionNoStream() bool

IsCurrentRevisionNoStream returns true if current revision in append response was set to NoStream. Current revision in response can be NoStream if no events are appended to a non-existing stream.
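
Because GetCurrentRevision returns 0 both for a stream holding a single event and for NoStream, callers need to check IsCurrentRevisionNoStream first. The sketch below models that rule with a hypothetical appendResult type and a describe helper; neither is part of this package, and the real AppendResponse keeps its fields unexported.

```go
package main

import "fmt"

// appendResult is a hypothetical stand-in for AppendResponse.
type appendResult struct {
	revision uint64
	noStream bool
}

// GetCurrentRevision returns 0 when the revision is NoStream,
// which is indistinguishable from a valid revision 0.
func (r appendResult) GetCurrentRevision() uint64 {
	if r.noStream {
		return 0
	}
	return r.revision
}

func (r appendResult) IsCurrentRevisionNoStream() bool { return r.noStream }

// describe reports the revision only after ruling out NoStream.
func describe(r appendResult) string {
	if r.IsCurrentRevisionNoStream() {
		return "no stream"
	}
	return fmt.Sprintf("revision %d", r.GetCurrentRevision())
}

func main() {
	fmt.Println(describe(appendResult{noStream: true})) // no stream
	fmt.Println(describe(appendResult{revision: 0}))    // revision 0
}
```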

type BatchAppendResponse

type BatchAppendResponse struct {
	// contains filtered or unexported fields
}

BatchAppendResponse is a response returned by EventStoreDB after an entire batch of events (all chunks) were appended to a stream in EventStoreDB.

func (BatchAppendResponse) GetCorrelationId added in v0.11.0

func (response BatchAppendResponse) GetCorrelationId() uuid.UUID

GetCorrelationId returns a correlation id the client has sent along with a chunk of events to append to EventStoreDB stream.

func (BatchAppendResponse) GetCurrentRevision added in v0.11.0

func (response BatchAppendResponse) GetCurrentRevision() uint64

GetCurrentRevision returns a current revision of a stream after events were appended to a stream in EventStoreDB.

func (BatchAppendResponse) GetExpectedStreamPosition added in v0.11.0

func (response BatchAppendResponse) GetExpectedStreamPosition() (uint64, bool)

GetExpectedStreamPosition returns the expected stream position along with a boolean that is true if the response contains a finite expected stream position.

func (BatchAppendResponse) GetPosition added in v0.10.0

func (response BatchAppendResponse) GetPosition() (Position, bool)

GetPosition returns the position of the last event appended to a stream, along with a boolean that indicates whether EventStoreDB returned a position. If EventStoreDB has not returned a position, GetPosition returns a zero initialized Position and false.

func (BatchAppendResponse) HasExpectedStreamPosition added in v0.11.0

func (response BatchAppendResponse) HasExpectedStreamPosition() bool

HasExpectedStreamPosition returns true if response contains expected stream position.

func (BatchAppendResponse) IsCurrentRevisionNoStream added in v0.11.0

func (response BatchAppendResponse) IsCurrentRevisionNoStream() bool

IsCurrentRevisionNoStream returns true if current revision of a stream returned by EventStore is NoStream.

func (BatchAppendResponse) IsExpectedStreamPositionAny added in v0.11.0

func (response BatchAppendResponse) IsExpectedStreamPositionAny() bool

IsExpectedStreamPositionAny returns true if expected stream position is Any.

func (BatchAppendResponse) IsExpectedStreamPositionNoStream added in v0.11.0

func (response BatchAppendResponse) IsExpectedStreamPositionNoStream() bool

IsExpectedStreamPositionNoStream returns true if expected stream position is NoStream.

func (BatchAppendResponse) IsExpectedStreamPositionStreamExists added in v0.11.0

func (response BatchAppendResponse) IsExpectedStreamPositionStreamExists() bool

IsExpectedStreamPositionStreamExists returns true if expected stream position is StreamExists.

type BatchError added in v0.10.0

type BatchError struct {
	ProtoCode     int32          // an error code returned by EventStoreDB
	Message       string         // informative message of an error returned by EventStoreDB
	Details       []ErrorDetails // various details about an error
	CorrelationId uuid.UUID      // correlation id sent by a client when it tried to append a batch of events to a stream at EventStoreDB
	StreamId      string         // stream to which we wanted to append events to
	// contains filtered or unexported fields
}

BatchError is an error which can occur when chunks of events are appended to a stream in EventStoreDB.

func (BatchError) Code added in v0.10.0

func (b BatchError) Code() errors.ErrorCode

Code returns a code of an error.

func (BatchError) Error added in v0.10.0

func (b BatchError) Error() string

Error returns a string representation of an error.
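
An error that exposes both Code and Error can be inspected after it has been wrapped. The sketch below shows one way to do that with the standard errors.As function. The codedError type and the codeOf helper are hypothetical and only imitate the shape of BatchError; they are not this package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// codedError is a hypothetical error type that carries a machine-readable code.
type codedError struct {
	code    string
	message string
}

func (e codedError) Code() string  { return e.code }
func (e codedError) Error() string { return e.code + ": " + e.message }

// codeOf extracts the code from err, or returns "" when err carries none.
func codeOf(err error) string {
	var ce codedError
	if errors.As(err, &ce) {
		return ce.Code()
	}
	return ""
}

func main() {
	inner := codedError{code: "WrongExpectedVersionErr", message: "revision mismatch"}
	err := fmt.Errorf("append failed: %w", inner)
	fmt.Println(codeOf(err)) // WrongExpectedVersionErr
}
```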

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client which can interact with EventStoreDB streams.

func NewClient added in v0.11.0

func NewClient(grpcClient connection.GrpcClient) *Client

NewClient creates a new event streams client.

func (*Client) AppendToStream

func (client *Client) AppendToStream(
	ctx context.Context,
	streamID string,
	expectedStreamRevision stream_revision.IsWriteStreamRevision,
	events []ProposedEvent,
) (AppendResponse, errors.Error)

AppendToStream appends a slice of events to a stream.

Events are sent to a stream one by one.

If appending one event fails, EventStoreDB will roll back the whole transaction.

If any error occurs, it is returned with the appropriate error code set.
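
The expectedStreamRevision argument acts as an optimistic concurrency check. The sketch below is a simplified in-memory model of that check, assuming the four expectations named in this documentation (Any, NoStream, StreamExists, and an exact revision). The expectation type, the check function, and errWrongExpectedVersion are hypothetical; the real check is performed by EventStoreDB, not by this code.

```go
package main

import (
	"errors"
	"fmt"
)

// expectation is a hypothetical model of the expected-revision argument.
type expectation struct {
	kind     string // "any", "noStream", "streamExists" or "exact"
	revision uint64 // only used when kind == "exact"
}

var errWrongExpectedVersion = errors.New("wrong expected version")

// check decides whether an append may proceed, given how many
// events the stream already holds.
func check(e expectation, eventCount int) error {
	exists := eventCount > 0
	switch e.kind {
	case "any":
		return nil
	case "noStream":
		if !exists {
			return nil
		}
	case "streamExists":
		if exists {
			return nil
		}
	case "exact":
		// Revisions are zero-based, so one event means revision 0.
		if exists && e.revision == uint64(eventCount-1) {
			return nil
		}
	}
	return errWrongExpectedVersion
}

func main() {
	fmt.Println(check(expectation{kind: "noStream"}, 0))           // <nil>
	fmt.Println(check(expectation{kind: "exact", revision: 0}, 1)) // <nil>
	fmt.Println(check(expectation{kind: "noStream"}, 1))           // wrong expected version
}
```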

Example (WithAnyStreamRevisionWhenStreamExist)

Example of appending an event to an existing stream with WriteStreamRevisionAny.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"

	"github.com/google/uuid"

	"github.com/pivonroll/EventStore-Client-Go/core/connection"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamId := "some_stream"

	firstEvent := event_streams.ProposedEvent{
		EventId:      uuid.Must(uuid.NewRandom()),
		EventType:    "TestEvent",
		ContentType:  "application/octet-stream",
		UserMetadata: []byte{},
		Data:         []byte("some event data"),
	}

	// Create a stream by appending one event to it
	writeResult, err := client.AppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{},
		[]event_streams.ProposedEvent{firstEvent})
	if err != nil {
		log.Fatalln(err)
	}

	if writeResult.GetCurrentRevision() != 0 {
		log.Fatalln(writeResult.GetCurrentRevision())
	}

	// Append an event to an existing stream
	secondEvent := event_streams.ProposedEvent{
		EventId:      uuid.Must(uuid.NewRandom()),
		EventType:    "TestEvent",
		ContentType:  "application/octet-stream",
		UserMetadata: []byte{},
		Data:         []byte("some event data"),
	}
	writeResult, err = client.AppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionAny{},
		[]event_streams.ProposedEvent{secondEvent})
	if err != nil {
		log.Fatalln(err)
	}

	if writeResult.GetCurrentRevision() != 1 {
		log.Fatalln(writeResult.GetCurrentRevision())
	}
}
Output:

Example (WithAnyWhenStreamDoesNotExist)

Example of appending an event to a stream which does not exist with WriteStreamRevisionAny.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"

	"github.com/google/uuid"

	"github.com/pivonroll/EventStore-Client-Go/core/connection"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamId := "some_stream"
	proposedEvent := event_streams.ProposedEvent{
		EventId:      uuid.Must(uuid.NewRandom()),
		EventType:    "TestEvent",
		ContentType:  "application/octet-stream",
		UserMetadata: []byte{},
		Data:         []byte("some event data"),
	}

	writeResult, err := client.AppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionAny{},
		[]event_streams.ProposedEvent{proposedEvent})
	if err != nil {
		log.Fatalln(err)
	}

	if writeResult.GetCurrentRevision() != 0 {
		log.Fatalln(writeResult.GetCurrentRevision())
	}
}
Output:

Example (WithExactStreamRevisionWhenStreamExist)

Example of appending an event to an existing stream with exact expected stream revision.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"

	"github.com/google/uuid"

	"github.com/pivonroll/EventStore-Client-Go/core/connection"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamId := "some_stream"

	firstEvent := event_streams.ProposedEvent{
		EventId:      uuid.Must(uuid.NewRandom()),
		EventType:    "TestEvent",
		ContentType:  "application/octet-stream",
		UserMetadata: []byte{},
		Data:         []byte("some event data"),
	}

	// Create a stream by appending one event to it
	writeResult, err := client.AppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{},
		[]event_streams.ProposedEvent{firstEvent})
	if err != nil {
		log.Fatalln(err)
	}

	if writeResult.GetCurrentRevision() != 0 {
		log.Fatalln(writeResult.GetCurrentRevision())
	}

	// Append an event to an existing stream
	secondEvent := event_streams.ProposedEvent{
		EventId:      uuid.Must(uuid.NewRandom()),
		EventType:    "TestEvent",
		ContentType:  "application/octet-stream",
		UserMetadata: []byte{},
		Data:         []byte("some event data"),
	}
	writeResult, err = client.AppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevision{Revision: 0}, // 0 because stream has one event, like an index in a slice
		[]event_streams.ProposedEvent{secondEvent})
	if err != nil {
		log.Fatalln(err)
	}

	if writeResult.GetCurrentRevision() != 1 {
		log.Fatalln(writeResult.GetCurrentRevision())
	}
}
Output:

Example (WithNoStreamWhenStreamDoesNotExist)

Example of appending an event to a stream which does not exist with WriteStreamRevisionNoStream.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"

	"github.com/google/uuid"

	"github.com/pivonroll/EventStore-Client-Go/core/connection"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamId := "some_stream"
	proposedEvent := event_streams.ProposedEvent{
		EventId:      uuid.Must(uuid.NewRandom()),
		EventType:    "TestEvent",
		ContentType:  "application/octet-stream",
		UserMetadata: []byte{},
		Data:         []byte("some event data"),
	}

	writeResult, err := client.AppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{},
		[]event_streams.ProposedEvent{proposedEvent})
	if err != nil {
		log.Fatalln(err)
	}

	if writeResult.GetCurrentRevision() != 0 {
		log.Fatalln(writeResult.GetCurrentRevision())
	}
}
Output:

func (*Client) BatchAppendToStream added in v0.9.2

func (client *Client) BatchAppendToStream(ctx context.Context,
	streamId string,
	expectedStreamRevision stream_revision.IsWriteStreamRevision,
	events ProposedEventList,
	chunkSize uint64,
	deadline time.Time,
) (BatchAppendResponse, errors.Error)

BatchAppendToStream appends events to a stream in chunks.

Correlation ID for events will be auto-generated.

If batch append of one chunk fails, EventStoreDB will roll back the whole transaction.

If any error occurs, it is returned with the appropriate error code set.
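
To make the role of chunkSize concrete, the sketch below splits a slice into consecutive chunks of at most chunkSize items. It illustrates the general technique only; how this package chunks events internally is not shown in this documentation, and the chunk function, its use of strings instead of ProposedEvent, and its treatment of a zero size are assumptions.

```go
package main

import "fmt"

// chunk splits items into consecutive slices of at most size elements.
// A size of 0 is treated as "everything in one chunk", an assumption
// made for this sketch.
func chunk(items []string, size uint64) [][]string {
	total := uint64(len(items))
	if size == 0 {
		size = total
	}
	var chunks [][]string
	for start := uint64(0); start < total; start += size {
		end := start + size
		if end > total {
			end = total
		}
		chunks = append(chunks, items[start:end])
	}
	return chunks
}

func main() {
	events := []string{"a", "b", "c", "d", "e"}
	fmt.Println(chunk(events, 2)) // [[a b] [c d] [e]]
}
```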

Example (StreamDoesNotExist)

Example demonstrates how to do a batch append to a stream which does not exist. Correlation id for events will be auto-generated.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/google/uuid"
	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)
	streamId := "some_stream"

	firstEvent := event_streams.ProposedEvent{
		EventId:      uuid.Must(uuid.NewRandom()),
		EventType:    "TestEvent",
		ContentType:  "application/octet-stream",
		UserMetadata: []byte{},
		Data:         []byte("some event data"),
	}

	deadline := time.Now().Add(time.Second * 10)
	// batch append to a stream which does not exist
	writeResult, err := client.BatchAppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{},
		[]event_streams.ProposedEvent{firstEvent},
		1,
		deadline)
	if err != nil {
		log.Fatalln(err)
	}

	if writeResult.IsCurrentRevisionNoStream() {
		log.Fatalln("IsCurrentRevisionNoStream should return false")
	}

	if writeResult.GetCurrentRevision() != 0 {
		log.Fatalln("Current revision should be 0")
	}
}
Output:

Example (StreamExists)

Example demonstrates how to do a batch append to a stream which exists. Correlation id for events will be auto-generated.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/google/uuid"
	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)
	streamId := "some_stream"

	firstEvent := event_streams.ProposedEvent{
		EventId:      uuid.Must(uuid.NewRandom()),
		EventType:    "TestEvent",
		ContentType:  "application/octet-stream",
		UserMetadata: []byte{},
		Data:         []byte("some event data"),
	}

	// create a stream by appending one event to it
	writeResult, err := client.AppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{},
		[]event_streams.ProposedEvent{firstEvent})
	if err != nil {
		log.Fatalln(err)
	}

	if writeResult.GetCurrentRevision() != 0 {
		log.Fatalln("Current revision must be 0")
	}

	secondEvent := event_streams.ProposedEvent{
		EventId:      uuid.Must(uuid.NewRandom()),
		EventType:    "TestEvent",
		ContentType:  "application/octet-stream",
		UserMetadata: []byte{},
		Data:         []byte("some event data"),
	}

	deadline := time.Now().Add(time.Second * 10)
	// batch append to a stream which exists with expected revision
	batchWriteResult, err := client.BatchAppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevision{Revision: 0}, // there is already one event in the stream
		[]event_streams.ProposedEvent{secondEvent},
		1,
		deadline)
	if err != nil {
		log.Fatalln(err)
	}

	if batchWriteResult.IsCurrentRevisionNoStream() {
		log.Fatalln("IsCurrentRevisionNoStream should return false")
	}

	// the stream now holds two events, so its current revision is 1
	if batchWriteResult.GetCurrentRevision() != 1 {
		log.Fatalln("Current revision should be 1")
	}
}
Output:

func (*Client) BatchAppendToStreamWithCorrelationId added in v0.11.0

func (client *Client) BatchAppendToStreamWithCorrelationId(ctx context.Context,
	streamId string,
	expectedStreamRevision stream_revision.IsWriteStreamRevision,
	correlationId uuid.UUID,
	events ProposedEventList,
	chunkSize uint64,
	deadline time.Time,
) (BatchAppendResponse, errors.Error)

BatchAppendToStreamWithCorrelationId appends events to a stream in chunks.

CorrelationId for events must be provided.

If batch append of one chunk fails, EventStoreDB will roll back the whole transaction.

If any error occurs, it is returned with the appropriate error code set.

Example

Example demonstrates how to do a batch append to a stream with correlation id.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/google/uuid"
	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)
	streamId := "some_stream"

	firstEvent := event_streams.ProposedEvent{
		EventId:      uuid.Must(uuid.NewRandom()),
		EventType:    "TestEvent",
		ContentType:  "application/octet-stream",
		UserMetadata: []byte{},
		Data:         []byte("some event data"),
	}

	deadline := time.Now().Add(time.Second * 10)
	correlationId := uuid.New()
	// batch append to a stream with correlation id
	writeResult, err := client.BatchAppendToStreamWithCorrelationId(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{},
		correlationId,
		[]event_streams.ProposedEvent{firstEvent},
		1,
		deadline)
	if err != nil {
		log.Fatalln(err)
	}

	if writeResult.IsCurrentRevisionNoStream() {
		log.Fatalln("IsCurrentRevisionNoStream should return false")
	}

	if writeResult.GetCurrentRevision() != 0 {
		log.Fatalln("Current revision should be 0")
	}
}
Output:

func (*Client) DeleteStream

func (client *Client) DeleteStream(
	ctx context.Context,
	streamID string,
	revision stream_revision.IsWriteStreamRevision,
) (DeleteResponse, errors.Error)

DeleteStream performs a soft delete on a stream.

Appending events to a soft-deleted stream with WriteStreamRevisionStreamExists will fail with the error errors.StreamDeleted.

Events can still be appended to a soft-deleted stream using, for example, WriteStreamRevisionNoStream or WriteStreamRevisionAny.

Only events written after the soft-delete can be read from a soft-deleted stream. Events written before the soft-delete are out of reach.
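
The read behaviour of a soft-deleted stream can be pictured with a small in-memory model. In the sketch below, the stream type and its truncateBefore field are hypothetical and serve only to illustrate the rule that events written before a soft-delete become unreachable while later events remain readable. It does not describe how EventStoreDB stores streams.

```go
package main

import "fmt"

// stream is a hypothetical in-memory model of an event stream.
type stream struct {
	events         []string
	truncateBefore int // events below this index are no longer readable
}

func (s *stream) append(e string) { s.events = append(s.events, e) }

// softDelete hides everything written so far.
func (s *stream) softDelete() { s.truncateBefore = len(s.events) }

// read returns the events that are still reachable.
func (s *stream) read() []string { return s.events[s.truncateBefore:] }

func main() {
	s := &stream{}
	s.append("first")
	s.append("second")
	s.softDelete()
	s.append("third")
	fmt.Println(s.read()) // [third]
}
```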

Example (StreamDoesNotExist)

Example of soft-deleting a stream which does not exist.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamId := "some_stream"

	// delete a non-existing stream
	deleteResult, err := client.DeleteStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{})
	if err != nil {
		log.Fatalln(err)
	}
	deletePosition, isPosition := deleteResult.GetPosition()

	// result of a soft-delete must be a position
	if !isPosition {
		log.Fatalln("Must be a position")
	}

	// position returned by soft-delete must not be zero
	if deletePosition.CommitPosition == 0 || deletePosition.PreparePosition == 0 {
		log.Fatalln("Commit and Prepare position must not be zero")
	}
}
Output:

Example (StreamExists)

Example of soft-deleting a stream which exists.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/google/uuid"
	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamId := "some_stream"
	proposedEvent := event_streams.ProposedEvent{
		EventId:      uuid.Must(uuid.NewRandom()),
		EventType:    "TestEvent",
		ContentType:  "application/octet-stream",
		UserMetadata: []byte{},
		Data:         []byte("some event data"),
	}

	// create a stream with one event
	writeResult, err := client.AppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{},
		[]event_streams.ProposedEvent{proposedEvent})
	if err != nil {
		log.Fatalln(err)
	}

	// soft-delete a stream
	deleteResult, err := client.DeleteStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevision{Revision: writeResult.GetCurrentRevision()})
	if err != nil {
		log.Fatalln(err)
	}

	deletePosition, isPosition := deleteResult.GetPosition()

	if !isPosition {
		log.Fatalln("Must be a position")
	}

	writePosition, _ := writeResult.GetPosition()

	// position returned by soft-delete must be greater than the one of the last event
	if !deletePosition.GreaterThan(writePosition) {
		log.Fatalln("Delete position must be greater than last event's write position")
	}
}
Output:

func (*Client) GetStreamMetadata

func (client *Client) GetStreamMetadata(
	ctx context.Context,
	streamId string) (StreamMetadataResult, errors.Error)

GetStreamMetadata reads stream's latest metadata.

Example (IsEmptyIfStreamHasNoMetadata)

Example of reading metadata for a stream which has no metadata set.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/google/uuid"
	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamId := "some_stream"

	proposedEvent := event_streams.ProposedEvent{
		EventId:      uuid.Must(uuid.NewRandom()),
		EventType:    "TestEvent",
		ContentType:  "application/octet-stream",
		UserMetadata: []byte{},
		Data:         []byte("some event data"),
	}

	// create a stream with one event
	_, err = client.AppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{},
		[]event_streams.ProposedEvent{proposedEvent})
	if err != nil {
		log.Fatalln(err)
	}

	// read stream's metadata
	metaDataResponse, err := client.GetStreamMetadata(context.Background(), streamId)
	if err != nil {
		log.Fatalln(err)
	}

	// Stream's metadata stream must not contain any metadata
	if !metaDataResponse.IsEmpty() {
		log.Fatalln("Stream must have no metadata")
	}
	}
}
Output:

Example (StreamHasMetadata)

Example of reading metadata for a stream which has metadata.

package main

import (
	"context"
	"fmt"
	"log"
	"reflect"

	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/ptr"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamId := "some_stream"

	expectedStreamMetadata := event_streams.StreamMetadata{
		MaxCount:              ptr.Int(17),
		TruncateBefore:        ptr.UInt64(10),
		CacheControlInSeconds: ptr.UInt64(17),
		MaxAgeInSeconds:       ptr.UInt64(15),
	}

	// write metadata for a stream
	_, err = client.SetStreamMetadata(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{},
		expectedStreamMetadata,
	)
	if err != nil {
		log.Fatalln(err)
	}

	// read metadata for a stream
	metaData, err := client.GetStreamMetadata(context.Background(), streamId)
	if err != nil {
		log.Fatalln(err)
	}

	if metaData.IsEmpty() {
		log.Fatalln("Stream must have metadata")
	}

	if metaData.GetMetaStreamRevision() != 0 {
		log.Fatalln("Metadata must be at index 0 in stream's metadata stream")
	}

	if !reflect.DeepEqual(expectedStreamMetadata, metaData.GetStreamMetadata()) {
		log.Fatalln("Metadata received must be the same as the metadata written")
	}
}
Output:
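
The example above compares metadata with reflect.DeepEqual rather than ==. The sketch below shows why that matters when optional fields are pointers: == compares pointer addresses, while reflect.DeepEqual compares the values they point to. The metadata struct and the intPtr and uint64Ptr helpers are hypothetical stand-ins for StreamMetadata and the ptr package, written here so that the sketch depends only on the standard library.

```go
package main

import (
	"fmt"
	"reflect"
)

// metadata is a hypothetical struct with optional fields modelled as pointers.
type metadata struct {
	MaxCount       *int
	TruncateBefore *uint64
}

func intPtr(v int) *int          { return &v }
func uint64Ptr(v uint64) *uint64 { return &v }

func main() {
	written := metadata{MaxCount: intPtr(17), TruncateBefore: uint64Ptr(10)}
	read := metadata{MaxCount: intPtr(17), TruncateBefore: uint64Ptr(10)}

	// == compares the pointers themselves, which differ.
	fmt.Println(written == read) // false
	// DeepEqual follows the pointers and compares the values.
	fmt.Println(reflect.DeepEqual(written, read)) // true
	// A nil pointer marks an option that was never set.
	fmt.Println(metadata{}.MaxCount == nil) // true
}
```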

func (*Client) GetStreamReader

func (client *Client) GetStreamReader(
	ctx context.Context,
	streamID string,
	direction ReadDirection,
	revision stream_revision.IsReadStreamRevision,
	count uint64,
	resolveLinks bool,
) (StreamReader, errors.Error)

GetStreamReader returns a stream reader for a stream which will read events from a given revision towards a given direction.

For example, you can read events from the end towards the start of a stream by setting revision to ReadStreamRevisionEnd and direction to ReadDirectionBackward.

Use count to specify how many events you want to be able to read through a reader. Maximum number of events to read is ReadCountMax.
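
How direction, revision, and count interact can be sketched against a plain slice. The read function and the direction type below are hypothetical and run entirely in memory. They only illustrate the selection rule described above, with ^uint64(0) standing in for ReadCountMax.

```go
package main

import "fmt"

// direction is a hypothetical stand-in for ReadDirection.
type direction int

const (
	forward direction = iota
	backward
)

// read returns up to count events, starting at index start
// and moving through events in the given direction.
func read(events []string, dir direction, start int, count uint64) []string {
	var out []string
	for i := start; i >= 0 && i < len(events) && uint64(len(out)) < count; {
		out = append(out, events[i])
		if dir == forward {
			i++
		} else {
			i--
		}
	}
	return out
}

func main() {
	events := []string{"e0", "e1", "e2", "e3"}
	// Two events, read backwards from the end of the stream.
	fmt.Println(read(events, backward, len(events)-1, 2)) // [e3 e2]
	// Everything from revision 1 onwards, with no practical limit.
	fmt.Println(read(events, forward, 1, ^uint64(0))) // [e1 e2 e3]
}
```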

func (*Client) GetStreamReaderForStreamAll added in v0.11.0

func (client *Client) GetStreamReaderForStreamAll(
	ctx context.Context,
	direction ReadDirection,
	position stream_revision.IsReadPositionAll,
	count uint64,
	resolveLinks bool,
) (StreamReader, errors.Error)

GetStreamReaderForStreamAll returns a reader for a stream $all which will read events from a given position towards a given direction.

For example, you can read events from the end towards the start of a stream $all by setting position to ReadPositionAllEnd and direction to ReadDirectionBackward.

Use count to specify how many events you want to be able to read through a reader. Maximum number of events to read is ReadCountMax.

func (*Client) ReadEventsFromStreamAll added in v0.11.0

func (client *Client) ReadEventsFromStreamAll(
	ctx context.Context,
	direction ReadDirection,
	position stream_revision.IsReadPositionAll,
	count uint64,
	resolveLinks bool,
) (ResolvedEventList, errors.Error)

ReadEventsFromStreamAll reads events from stream $all.

Read is performed by starting from a position and reading all events towards a given direction.

For example, you can read events from the end towards the start of stream $all by setting position to ReadPositionAllEnd and direction to ReadDirectionBackward.

Use count to specify how many events you want to read. Maximum number of events read is ReadCountMax.

Example (ReadEventsBackwardsFromEnd)

Example of reading events backwards from the end of a stream $all. We will append some user defined events to demonstrate that user events will be read from the end of stream $all in reversed order.

Note that this is not guaranteed: if a system operation is triggered, system events may be appended to stream $all after the user events.

package main

import (
	"context"
	"fmt"
	"log"
	"reflect"

	"github.com/google/uuid"
	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamId := "some_stream"

	// create 10 events to write (EventId must be unique)
	eventsToWrite := make(event_streams.ProposedEventList, 10)
	for i := uint32(0); i < 10; i++ {
		eventsToWrite[i] = event_streams.ProposedEvent{
			EventId:      uuid.Must(uuid.NewRandom()),
			EventType:    "TestEvent",
			ContentType:  "application/octet-stream",
			UserMetadata: []byte{},
			Data:         []byte("some event data"),
		}
	}

	// create a stream with 10 events
	_, err = client.AppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{},
		eventsToWrite)
	if err != nil {
		log.Fatalln(err)
	}

	count := uint64(len(eventsToWrite))

	readEvents, err := client.ReadEventsFromStreamAll(context.Background(),
		event_streams.ReadDirectionBackward,
		stream_revision.ReadPositionAllEnd{},
		count,
		false)
	if err != nil {
		log.Fatalln(err)
	}

	// Number of events read must be equal to count
	if uint64(len(readEvents)) != count {
		log.Fatalln("Number of events read from stream $all must be equal to the number of user defined events")
	}

	// since events are read backwards from the end they are received in reversed order
	if !reflect.DeepEqual(eventsToWrite, readEvents.Reverse().ToProposedEvents()) {
		log.Fatalln("Events read from the end must match user defined events")
	}
}
Output:

Example (ReadEventsFromStart)

Example of reading events from start of stream $all. At the beginning of stream $all we can find system events.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	// read events from stream $all
	// at the beginning of stream $all are some system events
	// as admin user we can access them too
	readEvents, err := client.ReadEventsFromStreamAll(context.Background(),
		event_streams.ReadDirectionForward,
		stream_revision.ReadPositionAllStart{},
		5,
		false)
	if err != nil {
		log.Fatalln(err)
	}

	if len(readEvents) < 5 {
		log.Fatalln("Not enough system events read from stream $all")
	}
}
Output:

func (*Client) ReadStreamEvents

func (client *Client) ReadStreamEvents(
	ctx context.Context,
	streamID string,
	direction ReadDirection,
	revision stream_revision.IsReadStreamRevision,
	count uint64,
	resolveLinks bool,
) (ResolvedEventList, errors.Error)

ReadStreamEvents reads events from a given stream.

Read is performed by starting from a revision and reading all events towards a given direction.

For example, you can read events from the end towards the start of a stream by setting revision to ReadStreamRevisionEnd and direction to ReadDirectionBackward.

Use count to specify how many events you want to read. Maximum number of events read is ReadCountMax.

Example (ReadEventsBackwardsFromEnd)

Example of reading events backwards from the end of a stream.

package main

import (
	"context"
	"fmt"
	"log"
	"reflect"

	"github.com/google/uuid"
	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamId := "some_stream"

	// create 10 events to write (EventId must be unique)
	eventsToWrite := make(event_streams.ProposedEventList, 10)
	for i := uint32(0); i < 10; i++ {
		eventsToWrite[i] = event_streams.ProposedEvent{
			EventId:      uuid.Must(uuid.NewRandom()),
			EventType:    "TestEvent",
			ContentType:  "application/octet-stream",
			UserMetadata: []byte{},
			Data:         []byte("some event data"),
		}
	}

	// create a stream with 10 events
	_, err = client.AppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{},
		eventsToWrite)
	if err != nil {
		log.Fatalln(err)
	}

	// read events from existing stream
	readEvents, err := client.ReadStreamEvents(context.Background(),
		streamId,
		event_streams.ReadDirectionBackward,
		stream_revision.ReadStreamRevisionEnd{},
		10, // equal to the number of events written to the stream
		false)
	if err != nil {
		log.Fatalln(err)
	}

	// Event read must be in reversed order
	// since readEvents are of type ResolvedEvent we must convert them to slice of ProposedEvents
	if !reflect.DeepEqual(eventsToWrite, readEvents.Reverse().ToProposedEvents()) {
		log.Fatalln("Events read from a stream must match")
	}
}
Output:

Example (ReadEventsFromStart)

Example of reading events from the start of a stream.

package main

import (
	"context"
	"fmt"
	"log"
	"reflect"

	"github.com/google/uuid"
	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamId := "some_stream"
	proposedEvent := event_streams.ProposedEvent{
		EventId:      uuid.Must(uuid.NewRandom()),
		EventType:    "TestEvent",
		ContentType:  "application/octet-stream",
		UserMetadata: []byte{},
		Data:         []byte("some event data"),
	}

	eventsToWrite := []event_streams.ProposedEvent{proposedEvent}

	// create a stream with one event
	_, err = client.AppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{},
		eventsToWrite)
	if err != nil {
		log.Fatalln(err)
	}

	// read events from existing stream
	readEvents, err := client.ReadStreamEvents(context.Background(),
		streamId,
		event_streams.ReadDirectionForward,
		stream_revision.ReadStreamRevisionStart{},
		10, // set to be bigger than current number of events in a stream
		false)
	if err != nil {
		log.Fatalln(err)
	}

	// since readEvents are of type ResolvedEvent we must convert them to slice of ProposedEvents
	if !reflect.DeepEqual(eventsToWrite, readEvents.ToProposedEvents()) {
		log.Fatalln("Events read from a stream must match")
	}
}
Output:

Example (StreamDoesNotExist)

Example of trying to read events from a stream which does not exist.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/errors"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, stdErr := connection.ParseConnectionString(clientURI)
	if stdErr != nil {
		log.Fatalln(stdErr)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamId := "some_stream"

	_, err := client.ReadStreamEvents(context.Background(),
		streamId,
		event_streams.ReadDirectionBackward,
		stream_revision.ReadStreamRevisionEnd{},
		1,
		false)

	if err == nil || err.Code() != errors.StreamNotFoundErr {
		log.Fatalln("Stream must not exist")
	}
}
Output:

Example (StreamIsSoftDeleted)

Example of trying to read events from a stream which is soft-deleted.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/google/uuid"
	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/errors"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, stdErr := connection.ParseConnectionString(clientURI)
	if stdErr != nil {
		log.Fatalln(stdErr)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamId := "some_stream"

	proposedEvent := event_streams.ProposedEvent{
		EventId:      uuid.Must(uuid.NewRandom()),
		EventType:    "TestEvent",
		ContentType:  "application/octet-stream",
		UserMetadata: []byte{},
		Data:         []byte("some event data"),
	}

	// create a stream with one event
	writeResult, err := client.AppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{},
		[]event_streams.ProposedEvent{proposedEvent})
	if err != nil {
		log.Fatalln(err)
	}

	// soft-delete a stream
	_, err = client.DeleteStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevision{Revision: writeResult.GetCurrentRevision()})
	if err != nil {
		log.Fatalln(err)
	}

	// reading a soft-deleted stream fails with error code StreamNotFoundErr
	_, err = client.ReadStreamEvents(context.Background(),
		streamId,
		event_streams.ReadDirectionBackward,
		stream_revision.ReadStreamRevisionEnd{},
		event_streams.ReadCountMax,
		false)

	if err == nil || err.Code() != errors.StreamNotFoundErr {
		log.Fatalln("Stream must not exist")
	}
}
Output:

func (*Client) SetStreamMetadata

func (client *Client) SetStreamMetadata(
	ctx context.Context,
	streamID string,
	expectedStreamRevision stream_revision.IsWriteStreamRevision,
	metadata StreamMetadata) (AppendResponse, errors.Error)

SetStreamMetadata writes a stream's metadata. A stream's metadata is a series of events, each represented by StreamMetadata.

A stream's metadata is kept in a separate stream whose name is the stream's ID prefixed with $$.

For example, for stream my_card the metadata stream is $$my_card.
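With the $$ prefix described above, the metadata stream name is a plain concatenation. A minimal sketch follows; metaStreamName is a hypothetical helper written for illustration and is not a function exported by event_streams.

```go
package main

import "fmt"

// metaStreamPrefix is the prefix of metadata streams described above.
const metaStreamPrefix = "$$"

// metaStreamName is a hypothetical helper (not part of event_streams)
// which derives the metadata stream name from a stream ID.
func metaStreamName(streamID string) string {
	return metaStreamPrefix + streamID
}

func main() {
	fmt.Println(metaStreamName("my_card")) // prints $$my_card
}
```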

Example (OnNonExistingStream)

Example of setting metadata for a stream which does not exist.

package main

import (
	"context"
	"fmt"
	"log"
	"reflect"

	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/ptr"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamId := "some_stream"

	expectedStreamMetadata := event_streams.StreamMetadata{
		MaxCount:              ptr.Int(17),
		TruncateBefore:        ptr.UInt64(10),
		CacheControlInSeconds: ptr.UInt64(17),
		MaxAgeInSeconds:       ptr.UInt64(15),
	}

	// write metadata for a stream
	_, err = client.SetStreamMetadata(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{},
		expectedStreamMetadata,
	)
	if err != nil {
		log.Fatalln(err)
	}

	// read metadata for a stream
	metaData, err := client.GetStreamMetadata(context.Background(), streamId)
	if err != nil {
		log.Fatalln(err)
	}

	if metaData.IsEmpty() {
		log.Fatalln("Stream must have metadata")
	}

	if metaData.GetMetaStreamRevision() != 0 {
		log.Fatalln("Metadata must be at index 0 in stream's metadata stream")
	}

	if !reflect.DeepEqual(expectedStreamMetadata, metaData.GetStreamMetadata()) {
		log.Fatalln("Metadata received must be the same as the metadata written")
	}
}
Output:

Example (WhenStreamExists)

Example of setting metadata for an existing stream.

package main

import (
	"context"
	"fmt"
	"log"
	"reflect"

	"github.com/google/uuid"
	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/ptr"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamId := "some_stream"

	proposedEvent := event_streams.ProposedEvent{
		EventId:      uuid.Must(uuid.NewRandom()),
		EventType:    "TestEvent",
		ContentType:  "application/octet-stream",
		UserMetadata: []byte{},
		Data:         []byte("some event data"),
	}

	// create a stream with one event
	_, err = client.AppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{},
		[]event_streams.ProposedEvent{proposedEvent})
	if err != nil {
		log.Fatalln(err)
	}

	expectedStreamMetadata := event_streams.StreamMetadata{
		MaxCount:              ptr.Int(17),
		TruncateBefore:        ptr.UInt64(10),
		CacheControlInSeconds: ptr.UInt64(17),
		MaxAgeInSeconds:       ptr.UInt64(15),
	}

	// write metadata for a stream
	_, err = client.SetStreamMetadata(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{},
		expectedStreamMetadata,
	)
	if err != nil {
		log.Fatalln(err)
	}

	// read stream's metadata
	metaData, err := client.GetStreamMetadata(context.Background(), streamId)
	if err != nil {
		log.Fatalln(err)
	}

	if metaData.IsEmpty() {
		log.Fatalln("Stream must have metadata")
	}

	if metaData.GetMetaStreamRevision() != 0 {
		log.Fatalln("Metadata must be at index 0 in stream's metadata stream")
	}

	if !reflect.DeepEqual(expectedStreamMetadata, metaData.GetStreamMetadata()) {
		log.Fatalln("Metadata received must be the same as the metadata written")
	}
}
Output:

func (*Client) SubscribeToFilteredStreamAll added in v0.11.0

func (client *Client) SubscribeToFilteredStreamAll(
	ctx context.Context,
	position stream_revision.IsReadPositionAll,
	resolveLinks bool,
	filter Filter,
) (StreamReader, errors.Error)

SubscribeToFilteredStreamAll subscribes to stream $all using a filter and receives content from it.

Filter is used to filter by event's type or by a stream ID. Both can be filtered using a set of prefixes or by a regex.
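The difference between the two matching styles can be sketched locally with the standard library. This is an illustration only: the filtering itself is performed by EventStoreDB, matchesAnyPrefix is a hypothetical helper, and the regular expression syntax accepted by the server is assumed, not guaranteed, to agree with Go's regexp package for a simple pattern like the one below.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// matchesAnyPrefix reports whether value starts with at least one of the prefixes.
// It is a hypothetical helper, not part of event_streams.
func matchesAnyPrefix(value string, prefixes []string) bool {
	for _, p := range prefixes {
		if strings.HasPrefix(value, p) {
			return true
		}
	}
	return false
}

func main() {
	prefixes := []string{"my_first_prefix", "my_second_prefix"}
	// a regex which selects the same stream IDs as the two prefixes
	pattern := regexp.MustCompile(`^my_(first|second)_prefix`)

	streamIDs := []string{
		"my_first_prefix_stream",
		"my_second_prefix_stream",
		"other_stream",
	}
	for _, id := range streamIDs {
		fmt.Println(id, matchesAnyPrefix(id, prefixes), pattern.MatchString(id))
	}
}
```

A prefix list is usually easier to read when the set of stream families is known up front, while a regex can express patterns that a fixed set of prefixes cannot.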

Position indicates from which point in stream $all we want to receive content. Content can be received from the beginning of the stream, from the end of the stream, or from another specific position.

If we opt to receive content from start of the stream we will receive all content for the stream, eventually, unless we cancel our subscription.

If we opt to receive content from the end of a stream then we will receive only content written to a stream after our subscription was created.

If we set a specific point from which we want to start to receive content of the stream, then we will start to receive content only when that point (index) is reached.

If you only want to receive new content from stream $all, set position to ReadPositionAllEnd.

Example

Example demonstrates how to subscribe to stream $all with filter.

We create three streams and write events to them. We then subscribe to stream $all with a filter which passes only content from two of the three streams. Content is filtered by the prefix of the stream's ID.

package main

import (
	"context"
	"fmt"
	"log"
	"reflect"
	"sync"

	"github.com/google/uuid"
	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/core/systemmetadata"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, stdErr := connection.ParseConnectionString(clientURI)
	if stdErr != nil {
		log.Fatalln(stdErr)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	prefix1 := "my_first_prefix"
	prefix2 := "my_second_prefix"

	otherStream := "read_all_existing_and_new_ones_otherStream"
	prefixStream := prefix1 + "_stream"
	newPrefixStream := prefix2 + "_stream"

	createEvents := func(count uint32) event_streams.ProposedEventList {
		result := make(event_streams.ProposedEventList, count)
		for i := uint32(0); i < count; i++ {
			result[i] = event_streams.ProposedEvent{
				EventId:      uuid.Must(uuid.NewRandom()),
				EventType:    "TestEvent",
				ContentType:  "application/octet-stream",
				UserMetadata: []byte{},
				Data:         []byte("some event data"),
			}
		}
		return result
	}
	otherStreamEvents := createEvents(10)
	prefixStreamEvents := createEvents(10)
	newPrefixStreamEvents := createEvents(10)

	// create other stream with 10 events
	_, err := client.AppendToStream(context.Background(),
		otherStream,
		stream_revision.WriteStreamRevisionNoStream{},
		otherStreamEvents)
	if err != nil {
		log.Fatalln(err)
	}

	// create first stream which content we will read
	_, err = client.AppendToStream(context.Background(),
		prefixStream,
		stream_revision.WriteStreamRevisionNoStream{},
		prefixStreamEvents)
	if err != nil {
		log.Fatalln(err)
	}

	// subscribe to stream $all and filter only events written to
	// streams with prefix my_first_prefix and my_second_prefix
	streamReader, err := client.SubscribeToFilteredStreamAll(context.Background(),
		stream_revision.ReadPositionAllStart{},
		false,
		event_streams.Filter{
			FilterBy: event_streams.FilterByStreamId{
				Matcher: event_streams.PrefixFilterMatcher{
					PrefixList: []string{prefix1, prefix2},
				},
			},
			Window:                       event_streams.FilterNoWindow{},
			CheckpointIntervalMultiplier: 5,
		})
	if err != nil {
		log.Fatalln(err)
	}

	waitForReadingFirstEvents := sync.WaitGroup{}
	waitForReadingFirstEvents.Add(1)

	// read events written to a stream with prefix my_first_prefix
	go func() {
		defer waitForReadingFirstEvents.Done()

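		var result event_streams.ProposedEventList
		// keep reading until every event written to the first prefixed stream is received
		for len(result) < len(prefixStreamEvents) {
			readResult, err := streamReader.ReadOne()
			if err != nil {
				log.Fatalln(err)
			}

			// responses which do not carry an event are skipped
			if event, isEvent := readResult.GetEvent(); isEvent {
				result = append(result, event.ToProposedEvent())
			}
		}

		if !reflect.DeepEqual(prefixStreamEvents, result) {
			log.Fatalln("Events read must match events written to stream with prefix my_first_prefix")
		}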
	}()

	waitForNewEventsAppend := sync.WaitGroup{}
	waitForNewEventsAppend.Add(1)

	// after events from stream with prefix my_first_prefix are read
	// create stream with prefix my_second_prefix
	go func() {
		defer waitForNewEventsAppend.Done()
		waitForReadingFirstEvents.Wait() // wait until all events from stream with prefix my_first_prefix are read

		// create stream with prefix my_second_prefix with 10 events in it
		_, err = client.AppendToStream(context.Background(),
			newPrefixStream,
			stream_revision.WriteStreamRevisionNoStream{},
			newPrefixStreamEvents)
		if err != nil {
			log.Fatalln(err)
		}
	}()

	waitForReadingNewEvents := sync.WaitGroup{}
	waitForReadingNewEvents.Add(1)

	// read events written to a stream with prefix my_second_prefix
	go func() {
		defer waitForReadingNewEvents.Done()
		waitForNewEventsAppend.Wait() // wait until stream my_second_prefix created
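		var result event_streams.ProposedEventList

		// keep reading until every event written to the second prefixed stream is received
		for len(result) < len(newPrefixStreamEvents) {
			readResult, err := streamReader.ReadOne()
			if err != nil {
				log.Fatalln(err)
			}

			if event, isEvent := readResult.GetEvent(); isEvent {
				if !systemmetadata.IsSystemStream(event.Event.StreamId) {
					result = append(result, event.ToProposedEvent())
				}
			}
		}

		// we have finished reading
		if !reflect.DeepEqual(newPrefixStreamEvents, result) {
			log.Fatalln("Events read must match events written to stream with prefix my_second_prefix")
		}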
	}()
	// wait for reader to receive new events
	waitForReadingNewEvents.Wait()
}
Output:

func (*Client) SubscribeToStream

func (client *Client) SubscribeToStream(
	ctx context.Context,
	streamID string,
	revision stream_revision.IsReadStreamRevision,
	resolveLinks bool,
) (StreamReader, errors.Error)

SubscribeToStream subscribes to a stream in a form of a live subscription, starting from a given revision.

Revision indicates from which point in a stream we want to receive content. Content can be received from the beginning of a stream, from the end of a stream, or from another specific revision.

If we opt to receive content from start of the stream we will receive all content for the stream, eventually, unless we cancel our subscription.

If we opt to receive content from the end of a stream then we will receive only content written to a stream after our subscription was created.

If we set a specific point from which we want to start to receive content of the stream, then we will start to receive content only when that point (index) is reached. For example, if we subscribe from revision 5 and the stream currently contains only one event, our subscription will start to receive content only after 4 more events have been written to the stream. That means that our subscription will receive the 6th event written to the stream and all content written to the stream after it.

If you only want to receive new content, set revision to ReadStreamRevisionEnd.
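The counting in the revision example above can be checked with a small model. This sketch only reproduces the arithmetic of that description; startKind, firstDelivered and writesBeforeFirst are hypothetical names that do not exist in event_streams, and the exact boundary behaviour of a live subscription should be confirmed against EventStoreDB itself.

```go
package main

import "fmt"

// startKind is a hypothetical stand-in for the three ways to start a subscription.
type startKind int

const (
	fromStart startKind = iota
	fromEnd
	fromRevision
)

// firstDelivered returns the zero-based index of the first event a
// subscription would receive, following the description above.
// existing is the number of events already in the stream.
func firstDelivered(kind startKind, existing, revision int) int {
	switch kind {
	case fromStart:
		return 0
	case fromEnd:
		return existing
	default:
		return revision
	}
}

// writesBeforeFirst returns how many new events must be written
// before the first delivered event can arrive.
func writesBeforeFirst(first, existing int) int {
	if first <= existing {
		return 0
	}
	return first - existing
}

func main() {
	existing := 1 // the stream currently contains one event
	first := firstDelivered(fromRevision, existing, 5)

	// ordinal of the first delivered event and the number of writes before it
	fmt.Println(first+1, writesBeforeFirst(first, existing)) // prints 6 4
}
```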

Example (CatchesDeletion)

Example demonstrates that subscription will receive StreamDeleted error once stream has been deleted.

package main

import (
	"context"
	"fmt"
	"log"
	"sync"

	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/errors"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamId := "some_stream"

	wg := sync.WaitGroup{}
	wg.Add(1)

	ctx := context.Background()

	// create a subscription to a stream, from start of the stream
	streamReader, err := client.SubscribeToStream(ctx,
		streamId,
		stream_revision.ReadStreamRevisionStart{},
		false)
	if err != nil {
		log.Fatalln(err)
	}

	go func() {
		defer wg.Done()
		_, err := streamReader.ReadOne() // reads content of a stream until StreamDeletedErr is received

		if err == nil || err.Code() != errors.StreamDeletedErr {
			log.Fatalln("Unexpected error received")
		}
	}()

	_, err = client.TombstoneStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{})
	if err != nil {
		log.Fatalln(err)
	}

	// wait for reader to receive StreamDeleted
	wg.Wait()
}
Output:

Example (ReadOldAndNewContentFromStream)

Example shows that subscription from start of the stream will receive all old events written to it, as well as new events written to a stream after a subscription was created.

package main

import (
	"context"
	"fmt"
	"log"
	"reflect"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/errors"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, stdErr := connection.ParseConnectionString(clientURI)
	if stdErr != nil {
		log.Fatalln(stdErr)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamId := "some_stream"

	readerWait := sync.WaitGroup{}
	readerWait.Add(1)

	cancelWait := sync.WaitGroup{}
	cancelWait.Add(1)

	ctx := context.Background()
	ctx, cancelFunc := context.WithTimeout(ctx, 20*time.Second)

	createEvents := func(eventCount uint32) event_streams.ProposedEventList {
		// zero length with capacity eventCount, because events are appended below
		result := make(event_streams.ProposedEventList, 0, eventCount)

		for i := uint32(0); i < eventCount; i++ {
			result = append(result, event_streams.ProposedEvent{
				EventId:      uuid.Must(uuid.NewRandom()),
				EventType:    "TestEvent",
				ContentType:  "application/octet-stream",
				UserMetadata: []byte{},
				Data:         []byte("some event data"),
			})
		}
		return result
	}
	beforeEvents := createEvents(3)
	afterEvents := createEvents(2)

	totalEvents := append(beforeEvents, afterEvents...)

	// create a stream with 3 events in it
	_, err := client.AppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{},
		beforeEvents)
	if err != nil {
		log.Fatalln(err)
	}

	// create a stream subscription from start of the stream
	streamReader, err := client.SubscribeToStream(ctx,
		streamId,
		stream_revision.ReadStreamRevisionStart{},
		false)
	if err != nil {
		log.Fatalln(err)
	}

	go func() {
		defer readerWait.Done()

		var result event_streams.ProposedEventList

		for {
			response, err := streamReader.ReadOne() // read events one by one
			if err != nil {
				if err.Code() == errors.CanceledErr { // we have received cancellation of a subscription
					break
				}
				cancelWait.Done() // must never be reached, some other error occurred
				log.Fatalln("Unexpected error received")
			}

			event, isEvent := response.GetEvent()

			if !isEvent {
				log.Fatalln("Must have read an event")
			}
			result = append(result, event.ToProposedEvent())
			if len(result) == len(totalEvents) {
				cancelWait.Done() // we have read all events, signal to main thread that it can cancel subscription
			}
		}

		if !reflect.DeepEqual(totalEvents, result) {
			log.Fatalln("Not all events have been read from the stream")
		}
	}()

	_, err = client.AppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionAny{},
		afterEvents)
	if err != nil {
		log.Fatalln(err)
	}

	cancelWait.Wait() // wait until subscription receives all events
	cancelFunc()      // cancel subscription

	readerWait.Wait() // wait for subscription go routine to exit
}
Output:

Example (StreamDoesNotExist)

Example shows how to subscribe to a stream which does not exist and wait for an event from it. We can subscribe to a non-existing stream. The ReadOne method of StreamReader will block until a stream with content is created.

package main

import (
	"context"
	"fmt"
	"log"
	"reflect"
	"sync"

	"github.com/google/uuid"
	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamId := "some_stream"

	wg := sync.WaitGroup{}
	wg.Add(1)

	// subscribe to a stream
	// we can subscribe to a stream which does not exist
	// as soon as stream is created we will start to receive content from it.
	streamReader, err := client.SubscribeToStream(context.Background(),
		streamId,
		stream_revision.ReadStreamRevisionStart{},
		false)
	if err != nil {
		log.Fatalln(err)
	}

	proposedEvent := event_streams.ProposedEvent{
		EventId:      uuid.Must(uuid.NewRandom()),
		EventType:    "TestEvent",
		ContentType:  "application/octet-stream",
		UserMetadata: []byte{},
		Data:         []byte("some event data"),
	}

	// we will wait for events in a separate go routine
	go func() {
		defer wg.Done()

		response, err := streamReader.ReadOne() // read blocks until event is written to a stream
		if err != nil {
			log.Fatalln(err)
		}

		event, isEvent := response.GetEvent()
		if !isEvent {
			log.Fatalln("Must have received an event")
		}

		if !reflect.DeepEqual(proposedEvent, event.ToProposedEvent()) {
			log.Fatalln("Must receive an event we have written to a stream")
		}
	}()

	// create a stream with one event written to it
	_, err = client.AppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{},
		event_streams.ProposedEventList{proposedEvent})
	if err != nil {
		log.Fatalln(err)
	}

	wg.Wait()
}
Output:

Example (StreamExists)

Example shows how to subscribe to an existing stream from start.

package main

import (
	"context"
	"fmt"
	"log"
	"reflect"
	"sync"

	"github.com/google/uuid"
	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamId := "some_stream"

	proposedEvent := event_streams.ProposedEvent{
		EventId:      uuid.Must(uuid.NewRandom()),
		EventType:    "TestEvent",
		ContentType:  "application/octet-stream",
		UserMetadata: []byte{},
		Data:         []byte("some event data"),
	}
	_, err = client.AppendToStream(context.Background(),
		streamId,
		stream_revision.WriteStreamRevisionNoStream{},
		event_streams.ProposedEventList{proposedEvent})
	if err != nil {
		log.Fatalln(err)
	}

	// subscribe to a stream from start
	streamReader, err := client.SubscribeToStream(context.Background(),
		streamId,
		stream_revision.ReadStreamRevisionStart{},
		false)
	if err != nil {
		log.Fatalln(err)
	}

	wg := sync.WaitGroup{}
	wg.Add(1)
	// we will wait for an event in a separate go routine
	go func() {
		defer wg.Done()

		response, err := streamReader.ReadOne() // read an event written to a stream
		if err != nil {
			log.Fatalln(err)
		}

		event, isEvent := response.GetEvent()
		if !isEvent {
			log.Fatalln("Must have received an event")
		}

		if !reflect.DeepEqual(proposedEvent, event.ToProposedEvent()) {
			log.Fatalln("Must receive an event we have written to a stream")
		}
	}()

	wg.Wait()
}
Output:

func (*Client) SubscribeToStreamAll added in v0.11.0

func (client *Client) SubscribeToStreamAll(
	ctx context.Context,
	position stream_revision.IsReadPositionAll,
	resolveLinks bool,
) (StreamReader, errors.Error)

SubscribeToStreamAll subscribes to stream $all and receives content from it. Content is not filtered.

Position indicates the point in the stream from which we want to receive content. Content can be received from the beginning of the stream, from the end of the stream, or from another specific position.

If we opt to receive content from start of the stream we will receive all content for the stream, eventually, unless we cancel our subscription.

If we opt to receive content from the end of a stream then we will receive only content written to a stream after our subscription was created.

If we set a specific point from which we want to start to receive content of the stream, then we will start to receive content only when that point (index) is reached.

If you only want to receive new content, set position to ReadPositionAllEnd.

Example

Example demonstrates how to subscribe to stream $all without a filter.

We create two streams and write events to them. Subscription to stream $all must catch all events written to those two streams.

package main

import (
	"context"
	"fmt"
	"log"
	"reflect"
	"sync"

	"github.com/google/uuid"
	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/core/systemmetadata"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, stdErr := connection.ParseConnectionString(clientURI)
	if stdErr != nil {
		log.Fatalln(stdErr)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	createEvents := func(count uint32) event_streams.ProposedEventList {
		result := make(event_streams.ProposedEventList, count)
		for i := uint32(0); i < count; i++ {
			result[i] = event_streams.ProposedEvent{
				EventId:      uuid.Must(uuid.NewRandom()),
				EventType:    "TestEvent",
				ContentType:  "application/octet-stream",
				UserMetadata: []byte{},
				Data:         []byte("some event data"),
			}
		}
		return result
	}

	wg := sync.WaitGroup{}
	wg.Add(1)

	firstStream := "firstStream"
	secondStream := "secondStream"

	beforeEvents := createEvents(10)
	afterEvents := createEvents(10)

	allUserEvents := append(beforeEvents, afterEvents...)

	// create a stream with some events in it
	_, err := client.AppendToStream(context.Background(),
		firstStream,
		stream_revision.WriteStreamRevisionNoStream{},
		beforeEvents)
	if err != nil {
		log.Fatalln(err)
	}

	// create a subscription to stream $all
	streamReader, err := client.SubscribeToStreamAll(context.Background(),
		stream_revision.ReadPositionAllStart{},
		false)
	if err != nil {
		log.Fatalln(err)
	}

	go func() {
		defer wg.Done()

		var resultsRead event_streams.ProposedEventList
		// keep reading until every user defined event has been received
		for {
			readResult, err := streamReader.ReadOne()
			if err != nil {
				log.Fatalln(err)
			}

			if event, isEvent := readResult.GetEvent(); isEvent {
				if !systemmetadata.IsSystemStream(event.Event.StreamId) {
					resultsRead = append(resultsRead, event.ToProposedEvent())
				}
			}

			// if we have read all user defined events, stop listening for events and return from the go routine
			if reflect.DeepEqual(allUserEvents, resultsRead) {
				streamReader.Close()
				return
			}
		}
	}()

	// append some events to a stream after a listening go routine has started
	_, err = client.AppendToStream(context.Background(),
		secondStream,
		stream_revision.WriteStreamRevisionNoStream{},
		afterEvents)
	if err != nil {
		log.Fatalln(err)
	}

	// wait for subscription to receive all events
	wg.Wait()
}
Output:

func (*Client) TombstoneStream

func (client *Client) TombstoneStream(
	ctx context.Context,
	streamID string,
	revision stream_revision.IsWriteStreamRevision,
) (TombstoneResponse, errors.Error)

TombstoneStream performs a hard-delete on a stream.

After a hard-delete, events can no longer be written to or read from the stream.

Example (StreamDoesNotExist)

Example of trying to put a tombstone on a stream which does not exist.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamName := "some_stream"

	// tombstone a non-existing stream
	tombstoneResult, err := client.TombstoneStream(context.Background(),
		streamName,
		stream_revision.WriteStreamRevisionNoStream{})
	if err != nil {
		log.Fatalln(err)
	}
	tombstonePosition, isPosition := tombstoneResult.GetPosition()

	// result of a hard-delete must be a position
	if !isPosition {
		log.Fatalln("Must be a position")
	}

	// position returned by hard-delete must not be zero
	if tombstonePosition.CommitPosition == 0 || tombstonePosition.PreparePosition == 0 {
		log.Fatalln("Commit and Prepare position must not be zero")
	}
}
Output:

Example (StreamExists)

Example of putting a tombstone on an existing stream.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/google/uuid"
	"github.com/pivonroll/EventStore-Client-Go/core/connection"
	"github.com/pivonroll/EventStore-Client-Go/core/stream_revision"
	"github.com/pivonroll/EventStore-Client-Go/event_streams"
)

func main() {
	username := "admin"
	password := "changeit"
	eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113
	clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint)
	config, err := connection.ParseConnectionString(clientURI)
	if err != nil {
		log.Fatalln(err)
	}
	grpcClient := connection.NewGrpcClient(*config)
	client := event_streams.NewClient(grpcClient)

	streamName := "some_stream"
	proposedEvent := event_streams.ProposedEvent{
		EventId:      uuid.Must(uuid.NewRandom()),
		EventType:    "TestEvent",
		ContentType:  "application/octet-stream",
		UserMetadata: []byte{},
		Data:         []byte("some event data"),
	}

	// create a stream with one event
	writeResult, err := client.AppendToStream(context.Background(),
		streamName,
		stream_revision.WriteStreamRevisionNoStream{},
		[]event_streams.ProposedEvent{proposedEvent})
	if err != nil {
		log.Fatalln(err)
	}

	// put tombstone on a stream
	tombstoneResult, err := client.TombstoneStream(context.Background(),
		streamName,
		stream_revision.WriteStreamRevision{Revision: writeResult.GetCurrentRevision()})
	if err != nil {
		log.Fatalln(err)
	}

	tombstonePosition, isPosition := tombstoneResult.GetPosition()

	if !isPosition {
		log.Fatalln("Must be a position")
	}

	writePosition, _ := writeResult.GetPosition()

	// position returned after we put a tombstone on a stream must be greater
	// than the one of the last event
	if !tombstonePosition.GreaterThan(writePosition) {
		log.Fatalln("Delete position must be greater than last event's write position")
	}
}
Output:

type ContentType

type ContentType string

ContentType represents the content type of the events stored in EventStoreDB. EventStoreDB can store events in JSON format or in octet-stream format.

const (
	ContentTypeJson        ContentType = "application/json"
	ContentTypeOctetStream ContentType = "application/octet-stream"
)

type CustomMetadataType

type CustomMetadataType map[string]interface{}

CustomMetadataType is shorthand type for user defined metadata of a stream.

type DeleteResponse

type DeleteResponse struct {
	// contains filtered or unexported fields
}

DeleteResponse is the response received when a stream is soft-deleted with Client.DeleteStream.

func (DeleteResponse) GetPosition

func (response DeleteResponse) GetPosition() (Position, bool)

GetPosition returns the position at which the stream was soft-deleted. If a position was received, the second return value is true. If no position exists, a zero-initialized Position and false are returned. A position may not exist if an empty stream was soft-deleted.

type ErrorDetails added in v0.9.2

type ErrorDetails struct {
	TypeUrl string
	Value   []byte
}

ErrorDetails represents various details about an error EventStore might send to us.

type Filter added in v0.11.0

type Filter struct {
	// FilterByEventType
	// FilterByStreamId
	FilterBy isFilterBy
	// FilterWindowMax
	// FilterNoWindow
	Window                       isFilterWindow
	CheckpointIntervalMultiplier uint32
}

Filter is used to specify a filter when reading events from stream $all. Events can be filtered by event type or by stream ID. To cap the number of events to read, set Window to FilterWindowMax. If you do not want to specify a maximum number of events to read, set Window to FilterNoWindow. CheckpointIntervalMultiplier must be greater than zero. The default is 1, as specified in DefaultCheckpointIntervalMultiplier.

type FilterByEventType added in v0.11.0

type FilterByEventType struct {
	// PrefixFilterMatcher
	// RegexFilterMatcher
	Matcher filterMatcher
}

FilterByEventType sets a filter matcher for stream $all to match by event type. Matching can be done by a set of prefixes or a regex.

type FilterByStreamId added in v0.11.0

type FilterByStreamId struct {
	// PrefixFilterMatcher
	// RegexFilterMatcher
	Matcher filterMatcher
}

FilterByStreamId sets a filter matcher for stream $all to match by stream ID. Matching can be done by a set of prefixes or a regex.

type FilterNoWindow added in v0.11.0

type FilterNoWindow struct{}

FilterNoWindow is used when we do not want to set the maximum number of events to receive as a result.

type FilterWindowMax added in v0.11.0

type FilterWindowMax struct {
	Max uint32
}

FilterWindowMax is a maximum number of events that we want to receive as a result.

func DefaultFilterWindowMax added in v0.11.0

func DefaultFilterWindowMax() FilterWindowMax

DefaultFilterWindowMax returns a default value for the maximum number of events to read with a filter from stream $all.

type Position

type Position struct {
	CommitPosition  uint64
	PreparePosition uint64
}

Position represents event's position in stream $all.

func (Position) GreaterThan

func (position Position) GreaterThan(other Position) bool

GreaterThan returns true if the receiver's position is greater than the provided position. A position is greater if its commit position is greater than the argument's commit position or, when the commit positions are equal, if its prepare position is greater than the argument's prepare position.

func (Position) LessThan

func (position Position) LessThan(other Position) bool

LessThan returns true if the receiver's position is less than the provided position. A position is less if its commit position is less than the argument's commit position or, when the commit positions are equal, if its prepare position is less than the argument's prepare position.
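
The ordering described for GreaterThan and LessThan can be sketched with a local stand-in type. The Position type below only mirrors the two documented fields and is not the library's own implementation; it is meant to show that the prepare position acts purely as a tie-breaker.

```go
package main

import "fmt"

// Position is a local stand-in that mirrors the two documented fields.
type Position struct {
	CommitPosition  uint64
	PreparePosition uint64
}

// GreaterThan compares commit positions first and falls back to
// prepare positions only when the commit positions are equal.
func (p Position) GreaterThan(other Position) bool {
	if p.CommitPosition != other.CommitPosition {
		return p.CommitPosition > other.CommitPosition
	}
	return p.PreparePosition > other.PreparePosition
}

// LessThan is the mirror image of GreaterThan.
func (p Position) LessThan(other Position) bool {
	if p.CommitPosition != other.CommitPosition {
		return p.CommitPosition < other.CommitPosition
	}
	return p.PreparePosition < other.PreparePosition
}

func main() {
	a := Position{CommitPosition: 10, PreparePosition: 5}
	b := Position{CommitPosition: 10, PreparePosition: 7}
	c := Position{CommitPosition: 11, PreparePosition: 0}

	fmt.Println(b.GreaterThan(a)) // same commit position, larger prepare position
	fmt.Println(c.GreaterThan(b)) // larger commit position decides, prepare is ignored
	fmt.Println(a.LessThan(a))    // equal positions are neither less nor greater
}
```

Two equal positions are neither greater nor less than each other, so both methods return false for them.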

type PrefixFilterMatcher added in v0.11.0

type PrefixFilterMatcher struct {
	PrefixList []string
}

PrefixFilterMatcher represents a list of prefixes used to match against event type or a stream ID.
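
Filtering is evaluated by EventStoreDB, not by the client, so the following is only an illustration of what matching against a list of prefixes means. The helper matchesAnyPrefix is hypothetical and not part of this package; it assumes that a value matches when it starts with at least one of the prefixes.

```go
package main

import (
	"fmt"
	"strings"
)

// matchesAnyPrefix reports whether value starts with at least one prefix.
// It is a hypothetical helper used only to illustrate prefix matching.
func matchesAnyPrefix(value string, prefixes []string) bool {
	for _, prefix := range prefixes {
		if strings.HasPrefix(value, prefix) {
			return true
		}
	}
	return false
}

func main() {
	prefixes := []string{"order-", "invoice-"}
	for _, streamID := range []string{"order-42", "invoice-7", "customer-1"} {
		fmt.Println(streamID, matchesAnyPrefix(streamID, prefixes))
	}
}
```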

type ProposedEvent

type ProposedEvent struct {
	EventId      uuid.UUID
	EventType    string
	ContentType  ContentType
	Data         []byte
	UserMetadata []byte
}

ProposedEvent represents an event we want to append to a stream. EventId is the unique ID of an event. EventType is a user-defined event type. ContentType tells EventStoreDB how to store this event: as JSON or as octet-stream. Data holds the user-defined data to be stored in the event. UserMetadata holds user-defined metadata for the event.
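
Since Data is a plain byte slice, a JSON event is typically built by marshalling a domain value first. The sketch below uses a local stand-in for ProposedEvent (the EventId field is left out to avoid a uuid dependency), and orderPlaced and newJSONEvent are hypothetical names introduced for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// ContentType and ContentTypeJson mirror the documented declarations.
type ContentType string

const ContentTypeJson ContentType = "application/json"

// proposedEvent is a local stand-in for ProposedEvent without the EventId field.
type proposedEvent struct {
	EventType    string
	ContentType  ContentType
	Data         []byte
	UserMetadata []byte
}

// orderPlaced is a hypothetical domain event.
type orderPlaced struct {
	OrderID string `json:"orderId"`
	Total   int    `json:"total"`
}

// newJSONEvent marshals payload to JSON and wraps it in a proposed event.
func newJSONEvent(eventType string, payload interface{}) (proposedEvent, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return proposedEvent{}, err
	}
	return proposedEvent{
		EventType:    eventType,
		ContentType:  ContentTypeJson,
		Data:         data,
		UserMetadata: []byte{},
	}, nil
}

func main() {
	event, err := newJSONEvent("OrderPlaced", orderPlaced{OrderID: "o-1", Total: 250})
	if err != nil {
		log.Fatalln(err)
	}
	fmt.Println(event.ContentType, string(event.Data))
}
```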

type ProposedEventList added in v0.9.2

type ProposedEventList []ProposedEvent

ProposedEventList represents a slice of events.

type ReadDirection added in v0.11.0

type ReadDirection string

ReadDirection specifies a direction in which a stream will be read. Direction can be either forward or backward.

const (
	ReadDirectionForward  ReadDirection = "ReadDirectionForward"
	ReadDirectionBackward ReadDirection = "ReadDirectionBackward"
)

type ReadResponse

type ReadResponse struct {
	// contains filtered or unexported fields
}

ReadResponse represents a response received when reading a stream. When reading a stream we can receive either an event or a checkpoint. Use GetEvent and GetCheckpoint to determine the result stored in a response.

func (ReadResponse) GetCheckpoint

func (response ReadResponse) GetCheckpoint() (ReadResponseCheckpoint, bool)

GetCheckpoint returns a checkpoint and a boolean which indicates if received value was actually a checkpoint. If returned boolean is false then returned checkpoint is a zero initialized ReadResponseCheckpoint.

func (ReadResponse) GetEvent

func (response ReadResponse) GetEvent() (ResolvedEvent, bool)

GetEvent returns an event and a boolean which indicates if received value was actually an event. If returned boolean is false then returned event is a zero initialized ResolvedEvent.

type ReadResponseCheckpoint

type ReadResponseCheckpoint struct {
	CommitPosition  uint64
	PreparePosition uint64
}

ReadResponseCheckpoint represents a checkpoint stored in a stream. Checkpoints are used to mark certain positions of interest in a stream.

type RecordedEvent

type RecordedEvent struct {
	EventID         uuid.UUID         // ID of an event. Event's ID is provided by user when event is appended to a stream
	EventType       string            // user defined event type
	ContentType     ContentType       // content type used to store event in EventStoreDB. Supported types are ContentTypeJson and ContentTypeOctetStream
	StreamId        string            // stream identifier of a stream on which this event is stored
	EventNumber     uint64            // index number of an event in a stream
	Position        position.Position // event's position in stream $all
	CreatedDateTime time.Time         // a date and time when event was stored in a stream
	Data            []byte            // user data stored in an event
	SystemMetadata  map[string]string // EventStoreDB's metadata set for an event
	UserMetadata    []byte            // user defined metadata
}

RecordedEvent represents an event recorded in the EventStoreDB.

type RegexFilterMatcher added in v0.11.0

type RegexFilterMatcher struct {
	Regex string
}

RegexFilterMatcher represents a regex used to match event type or a stream identifier.

type ResolvedEvent

type ResolvedEvent struct {
	Link           *RecordedEvent
	Event          *RecordedEvent
	CommitPosition *uint64 // nil if no position
}

ResolvedEvent is an event received from a stream. Each event has either Event or Link set. If event has no commit position CommitPosition will be nil.

func (ResolvedEvent) GetOriginalEvent

func (resolved ResolvedEvent) GetOriginalEvent() *RecordedEvent

GetOriginalEvent returns an original event. It chooses between Link and Event fields. Link field has precedence over Event field.
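
The precedence rule can be sketched with reduced local types. RecordedEvent and ResolvedEvent below are trimmed-down stand-ins and originalEvent is a hypothetical free function, so this shows the documented behavior rather than the package's actual code.

```go
package main

import "fmt"

// RecordedEvent and ResolvedEvent are trimmed-down local stand-ins.
type RecordedEvent struct {
	StreamId    string
	EventNumber uint64
}

type ResolvedEvent struct {
	Link  *RecordedEvent
	Event *RecordedEvent
}

// originalEvent prefers Link and falls back to Event when Link is nil.
func originalEvent(resolved ResolvedEvent) *RecordedEvent {
	if resolved.Link != nil {
		return resolved.Link
	}
	return resolved.Event
}

func main() {
	event := &RecordedEvent{StreamId: "orders", EventNumber: 3}
	link := &RecordedEvent{StreamId: "orders-by-customer", EventNumber: 0}

	fmt.Println(originalEvent(ResolvedEvent{Event: event}).StreamId)
	fmt.Println(originalEvent(ResolvedEvent{Link: link, Event: event}).StreamId)
}
```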

func (ResolvedEvent) ToProposedEvent added in v0.11.0

func (this ResolvedEvent) ToProposedEvent() ProposedEvent

ToProposedEvent returns event converted to ProposedEvent.

type ResolvedEventList added in v0.11.0

type ResolvedEventList []ResolvedEvent

ResolvedEventList is a shorthand type for slice of events.

func (ResolvedEventList) Reverse added in v0.11.0

func (list ResolvedEventList) Reverse() ResolvedEventList

Reverse returns a reversed slice of events.
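
Reversing is handy, for example, to turn events that were read backward into chronological order. The sketch below uses a hypothetical local eventList type and assumes that a new slice is returned while the input is left untouched; the documentation does not state whether the library copies or reverses in place.

```go
package main

import "fmt"

// event and eventList are hypothetical local stand-ins.
type event struct{ Number uint64 }

type eventList []event

// reversed returns a new slice with the elements in opposite order.
// The receiver is not modified.
func (list eventList) reversed() eventList {
	result := make(eventList, len(list))
	for i, item := range list {
		result[len(list)-1-i] = item
	}
	return result
}

func main() {
	readBackward := eventList{{Number: 2}, {Number: 1}, {Number: 0}}
	fmt.Println(readBackward.reversed())
	fmt.Println(readBackward) // the input keeps its original order
}
```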

func (ResolvedEventList) ToProposedEvents added in v0.11.0

func (list ResolvedEventList) ToProposedEvents() ProposedEventList

ToProposedEvents returns a slice of events, where each event is converted to ProposedEvent.

type StreamAcl

type StreamAcl struct {
	// ReadRoles is a list of users which can read from a stream.
	// If ReadRoles is empty that means that any user can read from a stream.
	ReadRoles []string `json:"$r"`
	// WriteRoles is a list of users which can write events to a stream.
	// If WriteRoles is empty that means that any user can write events to a stream.
	WriteRoles []string `json:"$w"`
	// DeleteRoles is a list of users which can perform soft and hard delete of a stream.
	// If DeleteRoles is empty that means that any user can perform soft and hard delete of a stream.
	DeleteRoles []string `json:"$d"`
	// MetaReadRoles is a list of users which can read stream's metadata.
	// If MetaReadRoles is empty that means that any user can read stream's metadata.
	MetaReadRoles []string `json:"$mr"`
	// MetaWriteRoles is a list of users which can write stream's metadata.
	// If MetaWriteRoles is empty that means that any user can write stream's metadata.
	MetaWriteRoles []string `json:"$mw"`
}

StreamAcl represents an access control list for a stream. It is set through the stream's metadata with Client.SetStreamMetadata. A user must have write access to the stream's metadata stream in order to set the access control list. Read more about stream ACL at https://developers.eventstore.com/server/v21.6/security/acl.html#stream-acl.
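
To see which keys the documented struct tags produce, the sketch below marshals a local copy of StreamAcl with the standard encoding/json package. The helper encodeAcl and the role names are made up for illustration, and the way the library itself serializes the ACL inside stream metadata may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// StreamAcl is a local copy of the documented struct, including its JSON tags.
type StreamAcl struct {
	ReadRoles      []string `json:"$r"`
	WriteRoles     []string `json:"$w"`
	DeleteRoles    []string `json:"$d"`
	MetaReadRoles  []string `json:"$mr"`
	MetaWriteRoles []string `json:"$mw"`
}

// encodeAcl marshals an ACL with the standard encoding/json package.
func encodeAcl(acl StreamAcl) (string, error) {
	raw, err := json.Marshal(acl)
	if err != nil {
		return "", err
	}
	return string(raw), nil
}

func main() {
	acl := StreamAcl{
		ReadRoles:   []string{"admin", "reporting"},
		WriteRoles:  []string{"admin"},
		DeleteRoles: []string{"admin"},
	}
	encoded, err := encodeAcl(acl)
	if err != nil {
		log.Fatalln(err)
	}
	fmt.Println(encoded)
}
```

With plain encoding/json, role lists that are left nil are written as null because the tags do not use omitempty.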

type StreamMetadata

type StreamMetadata struct {
	// MaxAgeInSeconds Sets a sliding window based on dates.
	// When data reaches a certain age it disappears automatically from the stream and is
	// considered eligible for scavenging.
	// This value is set as an integer representing the number of seconds. This value must be >= 1.
	MaxAgeInSeconds *uint64 `json:"$maxAge"`
	// TruncateBefore indicates the stream revision before which all events are truncated from
	// stream read operations.
	// If TruncateBefore is 4, all events before revision 4 are truncated from stream
	// read operations.
	// Truncation naturally occurs when a soft-delete is performed on a stream with Client.DeleteStream.
	TruncateBefore *uint64 `json:"$tb"`
	// This controls the cache of the head of a stream.
	// Most URIs in a stream are infinitely cacheable but the head by default will not cache.
	// It may be preferable in some situations to set a small amount of caching on the head to
	// allow intermediaries to handle polls (say 10 seconds).
	// The argument is an integer representing the seconds to cache. This value must be >= 1.
	CacheControlInSeconds *uint64 `json:"$cacheControl"`
	// Access Control List for a stream.
	Acl *StreamAcl `json:"$acl"`
	// Sets a sliding window based on the number of items in the stream.
	// When data reaches a certain length it disappears automatically from the stream and is
	// considered eligible for scavenging.
	// This value is set as an integer representing the count of items. This value must be >= 1.
	MaxCount *int `json:"$maxCount"`
	// User defined metadata for a stream.
	CustomMetadata CustomMetadataType
}

StreamMetadata is the metadata of a stream. You can read more about stream metadata at https://developers.eventstore.com/server/v21.6/streams/metadata-and-reserved-names.html#reserved-names

func (StreamMetadata) MarshalJSON

func (b StreamMetadata) MarshalJSON() ([]byte, error)

MarshalJSON implements the json.Marshaler interface.

func (*StreamMetadata) UnmarshalJSON

func (b *StreamMetadata) UnmarshalJSON(data []byte) error

UnmarshalJSON implements the json.Unmarshaler interface.

type StreamMetadataResult

type StreamMetadataResult struct {
	// contains filtered or unexported fields
}

StreamMetadataResult is stream's metadata read by Client.GetStreamMetadata.

Stream metadata holds information about a stream. Part of that information is the access control list (ACL) for the stream. See StreamMetadata for more info.

A stream does not need to have metadata set. If a stream has no metadata set, then IsEmpty returns true.

func (StreamMetadataResult) GetMetaStreamRevision

func (result StreamMetadataResult) GetMetaStreamRevision() uint64

GetMetaStreamRevision returns a current revision of stream's metadata stream if stream has metadata set. If stream has no metadata set it returns 0. Use IsEmpty to determine if stream has any metadata set.

func (StreamMetadataResult) GetStreamId

func (result StreamMetadataResult) GetStreamId() string

GetStreamId returns the identifier of the stream to which the metadata relates.

func (StreamMetadataResult) GetStreamMetadata

func (result StreamMetadataResult) GetStreamMetadata() StreamMetadata

GetStreamMetadata returns stream's latest metadata. If stream has no metadata set it returns a zero initialized StreamMetadata.

func (StreamMetadataResult) IsEmpty added in v0.11.0

func (result StreamMetadataResult) IsEmpty() bool

IsEmpty returns true if stream's metadata stream has nothing stored in it.

type StreamNotFoundError added in v0.11.0

type StreamNotFoundError struct {
	// contains filtered or unexported fields
}

StreamNotFoundError is an error returned if we tried to read a stream which does not exist.

func (StreamNotFoundError) Code added in v0.11.0

func (streamNotFound StreamNotFoundError) Code() errors.ErrorCode

Code returns a code of an error. Use this to determine the error type.

func (StreamNotFoundError) Error added in v0.11.0

func (streamNotFound StreamNotFoundError) Error() string

Error returns a string representation of an error.

func (StreamNotFoundError) GetStreamId added in v0.11.0

func (streamNotFound StreamNotFoundError) GetStreamId() string

GetStreamId returns the identifier of the stream which does not exist. This identifier was set in the read request sent to EventStoreDB.

type StreamReader

type StreamReader interface {
	// ReadOne reads one message from a stream.
	// Message can be read after a successful read/subscription is
	// established with EventStoreDB's event stream.
	// Message must contain either an event or a checkpoint.
	// If message contains subscription confirmation or a stream-not-found this method must panic.
	ReadOne() (ReadResponse, errors.Error)
	// Close closes a protobuf stream used to read or subscribe to stream's events.
	Close()
}

StreamReader is an interface which represents a reader of a stream. Implementation of this interface should read a stream in a dedicated go routine to avoid issues which can occur when multiple go routines are trying to read data from protobuf stream.
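
A consumer of such a reader usually loops over ReadOne from a single go routine and tells events and checkpoints apart with the two getters. The sketch below uses simplified local stand-ins (Response, Reader, fakeReader) and a plain error instead of errors.Error; errDrained is a hypothetical end marker that exists only so that the in-memory fake can terminate.

```go
package main

import (
	"errors"
	"fmt"
)

// The types below are simplified local stand-ins, not the library's own.
type Checkpoint struct{ CommitPosition, PreparePosition uint64 }

type Response struct {
	event      *string
	checkpoint *Checkpoint
}

func (r Response) GetEvent() (string, bool) {
	if r.event == nil {
		return "", false
	}
	return *r.event, true
}

func (r Response) GetCheckpoint() (Checkpoint, bool) {
	if r.checkpoint == nil {
		return Checkpoint{}, false
	}
	return *r.checkpoint, true
}

type Reader interface {
	ReadOne() (Response, error)
	Close()
}

// errDrained is a hypothetical end marker returned by the fake reader.
var errDrained = errors.New("no more messages")

type fakeReader struct {
	messages []Response
	closed   bool
}

func (f *fakeReader) ReadOne() (Response, error) {
	if f.closed || len(f.messages) == 0 {
		return Response{}, errDrained
	}
	next := f.messages[0]
	f.messages = f.messages[1:]
	return next, nil
}

func (f *fakeReader) Close() { f.closed = true }

// drain reads until the reader reports an error, collecting events and
// counting checkpoints. The reader is closed when drain returns.
func drain(reader Reader) (events []string, checkpoints int) {
	defer reader.Close()
	for {
		response, err := reader.ReadOne()
		if err != nil {
			return events, checkpoints
		}
		if event, isEvent := response.GetEvent(); isEvent {
			events = append(events, event)
		} else if _, isCheckpoint := response.GetCheckpoint(); isCheckpoint {
			checkpoints++
		}
	}
}

func main() {
	first, second := "created", "renamed"
	reader := &fakeReader{messages: []Response{
		{event: &first},
		{checkpoint: &Checkpoint{CommitPosition: 100, PreparePosition: 100}},
		{event: &second},
	}}
	events, checkpoints := drain(reader)
	fmt.Println(events, checkpoints)
}
```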

type TombstoneResponse

type TombstoneResponse struct {
	// contains filtered or unexported fields
}

TombstoneResponse is the response received when a stream is hard-deleted with Client.TombstoneStream.

func (TombstoneResponse) GetPosition

func (response TombstoneResponse) GetPosition() (Position, bool)

GetPosition returns the position at which the stream was hard-deleted. If a position was received, the second return value is true. If no position exists, a zero-initialized Position and false are returned. A position may not exist if an empty stream was hard-deleted.

type WrongExpectedVersion added in v0.10.0

type WrongExpectedVersion struct {
	// contains filtered or unexported fields
}

WrongExpectedVersion is an error returned when writing events to a stream fails gracefully. That means that if grpc connection is broken, WrongExpectedVersion error will not be returned, but a more general error of type errors.Error will be returned. Writing of events can fail gracefully if client sends a wrong expected revision.

func (WrongExpectedVersion) Code added in v0.10.0

func (exception WrongExpectedVersion) Code() errors.ErrorCode

Code returns a code of an error received. Currently, only WrongExpectedVersionErr code will be returned.

func (WrongExpectedVersion) Error added in v0.10.0

func (exception WrongExpectedVersion) Error() string

Error returns a string representation of an error.

func (WrongExpectedVersion) GetCurrentRevision added in v0.10.0

func (exception WrongExpectedVersion) GetCurrentRevision() (uint64, bool)

GetCurrentRevision returns an actual current revision of a stream and a true value if current revision was received. If no current revision was received a zero is returned together with a false value.

func (WrongExpectedVersion) GetExpectedRevision added in v0.10.0

func (exception WrongExpectedVersion) GetExpectedRevision() uint64

GetExpectedRevision returns a finite expected revision of a stream which client has sent when it wanted to open a protobuf stream to write events. It panics if client did not send a finite expected revision. Use IsExpectedRevisionFinite to check if expected revision is finite.

func (WrongExpectedVersion) IsCurrentRevisionNoStream added in v0.10.0

func (exception WrongExpectedVersion) IsCurrentRevisionNoStream() bool

IsCurrentRevisionNoStream returns true if the current revision reported by EventStoreDB is "no stream", that is, if the stream did not exist when the client tried to write events to it.

func (WrongExpectedVersion) IsExpectedRevisionAny added in v0.10.0

func (exception WrongExpectedVersion) IsExpectedRevisionAny() bool

IsExpectedRevisionAny returns true if client has sent WriteStreamRevisionAny when it wanted to open a protobuf stream to write events to EventStoreDB's event stream.

func (WrongExpectedVersion) IsExpectedRevisionFinite added in v0.10.0

func (exception WrongExpectedVersion) IsExpectedRevisionFinite() bool

IsExpectedRevisionFinite returns true if client has sent a finite expected revision of a stream.

func (WrongExpectedVersion) IsExpectedRevisionNoStream added in v0.10.0

func (exception WrongExpectedVersion) IsExpectedRevisionNoStream() bool

IsExpectedRevisionNoStream returns true if client has sent WriteStreamRevisionNoStream when it wanted to open a protobuf stream to write events to EventStoreDB's event stream.

func (WrongExpectedVersion) IsExpectedRevisionStreamExists added in v0.10.0

func (exception WrongExpectedVersion) IsExpectedRevisionStreamExists() bool

IsExpectedRevisionStreamExists returns true if client has sent WriteStreamRevisionStreamExists when it wanted to open a protobuf stream to write events to EventStoreDB's event stream.
