fifo

package
v1.0.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 29, 2022 License: BSD-3-Clause Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const BatchSize = 200

Variables

View Source
var ErrBadPoolKey = errors.New("pool key must be 'kafka.offset' in ascending order")

Functions

func RunLocalQuery

func RunLocalQuery(ctx context.Context, zctx *zed.Context, batch *zbuf.Array, query string) (*zbuf.Array, error)

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(zctx *zed.Context, opts []kgo.Opt, reg *srclient.SchemaRegistryClient, format, topic string, startAt int64, meta bool) (*Consumer, error)

func (*Consumer) Close

func (c *Consumer) Close()

func (*Consumer) ReadValue

func (c *Consumer) ReadValue(ctx context.Context) (*zed.Value, error)

ReadValue returns the next value. Unlike zio.Reader.Read, the caller receives ownership of zed.Value.Bytes.

func (*Consumer) Run

func (c *Consumer) Run(ctx context.Context, w zio.Writer, timeout time.Duration) error

func (*Consumer) Watermarks

func (c *Consumer) Watermarks(ctx context.Context) (int64, int64, error)

type Lake

type Lake struct {
	// contains filtered or unexported fields
}

func NewLake

func NewLake(ctx context.Context, poolName, shaper string, server lakeapi.Interface) (*Lake, error)

func (*Lake) LoadBatch

func (l *Lake) LoadBatch(ctx context.Context, zctx *zed.Context, batch *zbuf.Array) (ksuid.KSUID, error)

func (*Lake) NextConsumerOffset

func (l *Lake) NextConsumerOffset(ctx context.Context, topic string) (int64, error)

func (*Lake) Query

func (l *Lake) Query(ctx context.Context, src string) (*zbuf.Array, error)

func (*Lake) ReadBatch

func (l *Lake) ReadBatch(ctx context.Context, topic string, offset int64, size int) (zbuf.Batch, error)

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(opts []kgo.Opt, reg *srclient.SchemaRegistryClient, format, topic, namespace string) (*Producer, error)

func (*Producer) HeadOffset

func (p *Producer) HeadOffset(ctx context.Context) (int64, error)

func (*Producer) Run

func (p *Producer) Run(ctx context.Context, reader zio.Reader) error

func (*Producer) Send

func (p *Producer) Send(ctx context.Context, batch zbuf.Batch) error

type To

type To struct {
	// contains filtered or unexported fields
}

To provides a way to sync data from a Zed data pool to a Kafka topic in a consistent and crash-recoverable fashion. Each record synced to the topic carries the same offset as the kafka.offset field of the corresponding record in the pool.

func NewTo

func NewTo(zctx *zed.Context, dst *Producer, src *Lake) *To

func (*To) Sync

func (t *To) Sync(ctx context.Context) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL