persist

package
v2.0.5
Warning

This package is not in the latest version of its module.

Published: Mar 26, 2024 License: MIT Imports: 9 Imported by: 6

Documentation

Overview

Package persist provides abstract structures for checkpoint persistence.


Constants

const (
	// StartOfStream is a constant defined to represent the start of a partition stream in EventHub.
	StartOfStream = "-1"

	// EndOfStream is a constant defined to represent the current end of a partition stream in EventHub.
	// This can be used as an offset argument in receiver creation to start receiving from the latest
	// event, instead of a specific offset or point in time.
	EndOfStream = "@latest"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Checkpoint

type Checkpoint struct {
	Offset         string    `json:"offset"`
	SequenceNumber int64     `json:"sequenceNumber"`
	EnqueueTime    time.Time `json:"enqueueTime"`
}

Checkpoint is the information needed to determine the last message processed.

func NewCheckpoint

func NewCheckpoint(offset string, sequence int64, enqueueTime time.Time) Checkpoint

NewCheckpoint creates a Checkpoint containing the information needed to checkpoint Event Hub progress.

func NewCheckpointFromEndOfStream

func NewCheckpointFromEndOfStream() Checkpoint

NewCheckpointFromEndOfStream returns a checkpoint for the end of the stream.

func NewCheckpointFromStartOfStream

func NewCheckpointFromStartOfStream() Checkpoint

NewCheckpointFromStartOfStream returns a checkpoint for the start of the stream.
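The relationship between the two stream constants and the constructors can be sketched as follows. This is an illustration rather than the package source: fromStart and fromEnd are hypothetical stand-ins for NewCheckpointFromStartOfStream and NewCheckpointFromEndOfStream, and it is an assumption that they set only Offset and leave the other fields at their zero values. The helper describe is also hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the constants and struct documented above.
const (
	StartOfStream = "-1"
	EndOfStream   = "@latest"
)

type Checkpoint struct {
	Offset         string    `json:"offset"`
	SequenceNumber int64     `json:"sequenceNumber"`
	EnqueueTime    time.Time `json:"enqueueTime"`
}

// fromStart is a hypothetical stand-in for NewCheckpointFromStartOfStream.
// Assumption: only Offset is set; the other fields keep their zero values.
func fromStart() Checkpoint { return Checkpoint{Offset: StartOfStream} }

// fromEnd is a hypothetical stand-in for NewCheckpointFromEndOfStream,
// under the same assumption.
func fromEnd() Checkpoint { return Checkpoint{Offset: EndOfStream} }

// describe reports where a receiver would begin for a given checkpoint.
func describe(cp Checkpoint) string {
	switch cp.Offset {
	case StartOfStream:
		return "oldest retained event"
	case EndOfStream:
		return "next event to arrive"
	default:
		return "after offset " + cp.Offset
	}
}

func main() {
	fmt.Println(describe(fromStart()))
	fmt.Println(describe(fromEnd()))
	fmt.Println(describe(Checkpoint{Offset: "4096"})) // hypothetical offset
}
```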

type CheckpointPersister

type CheckpointPersister interface {
	Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error
	Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error)
}

CheckpointPersister persists the last received checkpoint for a given namespace, hub name, consumer group, and partition ID, so that if a receiver were to be interrupted, it could resume after the last consumed event.
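The resume flow described above can be sketched against the interface. The documentation does not state what Read returns when no checkpoint has been written, so this sketch assumes it returns an error and falls back to StartOfStream in that case. The names stubPersister and resumeOffset are hypothetical, as are the namespace, hub, and offset values.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local copies of the declarations documented above.
const StartOfStream = "-1"

type Checkpoint struct {
	Offset         string    `json:"offset"`
	SequenceNumber int64     `json:"sequenceNumber"`
	EnqueueTime    time.Time `json:"enqueueTime"`
}

type CheckpointPersister interface {
	Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error
	Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error)
}

// stubPersister is a hypothetical persister that remembers a single
// checkpoint and ignores the key; it exists only to drive the example.
type stubPersister struct {
	saved *Checkpoint
}

func (s *stubPersister) Write(_, _, _, _ string, cp Checkpoint) error {
	s.saved = &cp
	return nil
}

func (s *stubPersister) Read(_, _, _, _ string) (Checkpoint, error) {
	if s.saved == nil {
		// Assumption: a missing checkpoint is reported as an error.
		return Checkpoint{}, errors.New("no checkpoint stored")
	}
	return *s.saved, nil
}

// resumeOffset returns the offset a receiver should start from: the stored
// offset if one can be read, otherwise the start of the stream.
func resumeOffset(p CheckpointPersister, namespace, name, consumerGroup, partitionID string) string {
	cp, err := p.Read(namespace, name, consumerGroup, partitionID)
	if err != nil {
		return StartOfStream
	}
	return cp.Offset
}

func main() {
	p := &stubPersister{}
	fmt.Println(resumeOffset(p, "ns", "hub", "$Default", "0")) // prints -1

	// After processing an event, record its position.
	if err := p.Write("ns", "hub", "$Default", "0", Checkpoint{Offset: "4096", SequenceNumber: 42}); err != nil {
		panic(err)
	}
	fmt.Println(resumeOffset(p, "ns", "hub", "$Default", "0")) // prints 4096
}
```

Falling back to the start of the stream reprocesses old events rather than skipping them; a caller that prefers to skip could return EndOfStream instead.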

type FilePersister

type FilePersister struct {
	// contains filtered or unexported fields
}

FilePersister implements CheckpointPersister for saving to the file system.

func NewFilePersister

func NewFilePersister(directory string) (*FilePersister, error)

NewFilePersister creates a FilePersister for saving to a given directory.

func (*FilePersister) Read

func (fp *FilePersister) Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error)

func (*FilePersister) Write

func (fp *FilePersister) Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error

type MemoryPersister

type MemoryPersister struct {
	// contains filtered or unexported fields
}

MemoryPersister is a default implementation of a Hub CheckpointPersister, which will persist offset information in memory.

func NewMemoryPersister

func NewMemoryPersister() *MemoryPersister

NewMemoryPersister creates a new in-memory storage for checkpoints.

MemoryPersister is only intended to be shared with EventProcessorHosts within the same process. This implementation is a toy. You should probably use the Azure Storage implementation or any other that provides durable storage for checkpoints.

func (*MemoryPersister) Read

func (p *MemoryPersister) Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error)

func (*MemoryPersister) Write

func (p *MemoryPersister) Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error
