state_stores

package
Version: v2.0.1
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 9, 2023 | License: MIT | Imports: 11 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewChangeLogger

func NewChangeLogger(producer kafka.Producer, tp kafka.TopicPartition) topology.ChangeLogger

func NewStoreBuilder

func NewStoreBuilder(name string, keyEncoder encoding.Encoder, valEncoder encoding.Encoder, options ...StoreBuilderOption) topology.LoggableStoreBuilder

Types

type ChangelogBuilderOption

type ChangelogBuilderOption func(store *changelogBuilder)

func ChangelogWithSourceTopic

func ChangelogWithSourceTopic(topic string) ChangelogBuilderOption

func ChangelogWithTopicConfigs

func ChangelogWithTopicConfigs(config map[string]string) ChangelogBuilderOption

func ChangelogWithTopicReplicaCount

func ChangelogWithTopicReplicaCount(count int16) ChangelogBuilderOption

func ChangelogWithTopicTopicNameFormatter

func ChangelogWithTopicTopicNameFormatter(fn ChangelogTopicFormatter) ChangelogBuilderOption

type ChangelogLoggerBuilder

type ChangelogLoggerBuilder interface {
	Build(ctx topology.SubTopologyContext, store string) (topology.ChangeLogger, error)
}

type ChangelogStatus

type ChangelogStatus string

type ChangelogTopicFormatter

type ChangelogTopicFormatter func(storeName string) func(ctx topology.BuilderContext) string

type OffsetStore

type OffsetStore interface {
	Commit(ctx context.Context, tp kafka.TopicPartition, offset kafka.Offset) error
	Committed(tp kafka.TopicPartition) (kafka.Offset, error)
}

type StateStore

type StateStore struct {
	stores.Store
	topology.ChangelogSyncer
	// contains filtered or unexported fields
}

func (*StateStore) Delete

func (str *StateStore) Delete(ctx context.Context, key interface{}) error

func (*StateStore) Flush

func (str *StateStore) Flush() error

func (*StateStore) Get

func (str *StateStore) Get(ctx context.Context, key interface{}) (interface{}, error)

func (*StateStore) Iterator

func (str *StateStore) Iterator(ctx context.Context) (stores.Iterator, error)

func (*StateStore) PrefixedIterator

func (str *StateStore) PrefixedIterator(ctx context.Context, keyPrefix interface{}, prefixEncoder encoding.Encoder) (stores.Iterator, error)

func (*StateStore) ResetCache

func (str *StateStore) ResetCache()

func (*StateStore) Set

func (str *StateStore) Set(ctx context.Context, key, value interface{}, expiry time.Duration) error

type StoreBuilderOption

type StoreBuilderOption func(builder *stateStoreBuilder)

func ChangelogSyncEnabled

func ChangelogSyncEnabled() StoreBuilderOption

func LoggingDisabled

func LoggingDisabled() StoreBuilderOption

func StoreBuilderWithKeyEncoder

func StoreBuilderWithKeyEncoder(encoder encoding.Encoder) StoreBuilderOption

func StoreBuilderWithStoreOption

func StoreBuilderWithStoreOption(options ...stores.Option) StoreBuilderOption

func StoreBuilderWithValEncoder

func StoreBuilderWithValEncoder(encoder encoding.Encoder) StoreBuilderOption

func UseStoreBuilder

func UseStoreBuilder(nativeBuilder stores.StoreBuilder) StoreBuilderOption

func WithChangelogOptions

func WithChangelogOptions(options ...ChangelogBuilderOption) StoreBuilderOption

func WithChangelogSyncDisabled

func WithChangelogSyncDisabled() StoreBuilderOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL