pebble

package
v0.0.0-...-434faa1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 1, 2024 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrorTableNotInitalized = errors.New("pebble.DB not initalized")
	ErrorPipeClosed         = errors.New("pebble.Reader pipe closed")
	ErrorInvalidStartOffset = errors.New("pebble.Reader invalid start offset")
	ErrorOffsetNotFound     = errors.New("pebble.Reader could not find start offset")
	ErrorReicevedOldMessage = errors.New("pebble.Reader received old message from pebble.Client")
)
View Source
var Metrics = &Mtx{
	Reads: prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "kstore",
			Subsystem: "pebble",
			Name:      "reads_total",
			Help:      "Messages read by topic and status",
		},
		[]string{"topic", "status"},
	),
	Writes: prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "kstore",
			Subsystem: "pebble",
			Name:      "writes_total",
			Help:      "Messages written by topic",
		},
		[]string{"topic"},
	),
}

Functions

func Offset

func Offset(storageKey []byte) uint64

Offset extracts and returns the offset from a given storage key.

func OffsetBytes

func OffsetBytes(offset uint64) []byte

OffsetBytes converts an offset to a byte slice of 8 bytes.

func StorageKey

func StorageKey(msg api.Message) []byte

StorageKey returns a []byte key used for storing messages ordered by offset.

func StorageValue

func StorageValue(msg api.Message) []byte

Types

type AcquireMode

type AcquireMode int
const (
	AcquireModeRead AcquireMode = iota
	AcquireModeWrite
)

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(prefix string) *Client

func (*Client) AcquireDB

func (c *Client) AcquireDB(topic string, mode AcquireMode) (*pebble.DB, context.CancelFunc, error)

func (*Client) Close

func (c *Client) Close() error

func (*Client) CreateDB

func (c *Client) CreateDB(topic string) error

func (*Client) CreateTopics

func (c *Client) CreateTopics(ctx context.Context, topics ...string) (api.TopicErrors, error)

func (*Client) DeleteDB

func (c *Client) DeleteDB(topic string) error

func (*Client) DeleteTopics

func (c *Client) DeleteTopics(ctx context.Context, topics ...string) (api.TopicErrors, error)

func (*Client) FindFirst

func (c *Client) FindFirst(ctx context.Context, topic string) ([]byte, error)

func (*Client) FindLast

func (c *Client) FindLast(ctx context.Context, topic string) ([]byte, error)

func (*Client) Get

func (c *Client) Get(topic string, storageKey []byte) (api.Message, error)

func (*Client) GetLogger

func (c *Client) GetLogger() api.LoggerFunc

func (*Client) IsExistsError

func (c *Client) IsExistsError(err error) bool

func (*Client) NewReader

func (c *Client) NewReader(topic string) api.Reader

func (*Client) NewWriter

func (c *Client) NewWriter() api.Writer

func (*Client) Read

func (c *Client) Read(ctx context.Context, topic string, partition int, offset *uint64) (api.Message, error)

func (*Client) ReadNext

func (c *Client) ReadNext(ctx context.Context, topic string, currentKey []byte) (api.Message, error)

func (*Client) SetLogger

func (c *Client) SetLogger(fn api.LoggerFunc)

func (*Client) Subscribe

func (c *Client) Subscribe(ctx context.Context, topic string, fn HandleFunc) error

func (*Client) Write

func (c *Client) Write(ctx context.Context, topic string, msg ...api.Message) error

Write writes one or more messages to the given topic.

type HandleFunc

type HandleFunc func(ctx context.Context, message *Message) error

type Message

type Message struct{ kschema.Message }

type Mtx

type Mtx struct {
	Reads  *prometheus.CounterVec
	Writes *prometheus.CounterVec
}

func (*Mtx) GetReads

func (m *Mtx) GetReads(topic string, status ...OffsetStatus) map[OffsetStatus]int

func (*Mtx) GetWrites

func (m *Mtx) GetWrites(topic string) int

func (*Mtx) ObserveRead

func (m *Mtx) ObserveRead(msg api.Message, topic string, status OffsetStatus)

func (*Mtx) ObserveWrite

func (m *Mtx) ObserveWrite(topic string)

type OffsetStatus

type OffsetStatus int
const (
	OffsetStatusOlder OffsetStatus = iota
	OffsetStatusCurrent
	OffsetStatusNewer
)

func CompareOffsetByKey

func CompareOffsetByKey(currentKey, otherKey []byte) OffsetStatus

CompareOffsetByKey compares the offset of the message with the offset of the last message seen. NOTE: Must be protected by r.mu!

func (OffsetStatus) String

func (s OffsetStatus) String() string

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

func NewReader

func NewReader(client *Client, topic string, startOffset StartOffset) *Reader

func (*Reader) Close

func (r *Reader) Close() error

func (*Reader) Commit

func (r *Reader) Commit(ctx context.Context, msg api.Message) error

func (*Reader) Get

func (r *Reader) Get(storageKey []byte) (api.Message, error)

func (*Reader) Read

func (r *Reader) Read(ctx context.Context) (api.Message, error)

func (*Reader) Validate

func (r *Reader) Validate() error

type StartOffset

type StartOffset int
const (
	StartOffsetFirst StartOffset = iota
	StartOffsetLast
)

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter(c *Client) *Writer

func (*Writer) Close

func (w *Writer) Close() error

func (*Writer) Write

func (w *Writer) Write(ctx context.Context, topic string, messages ...api.Message) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL