pkafka

v0.0.0-...-8490895
Published: Mar 4, 2023 License: BSD-3-Clause Imports: 14 Imported by: 0

README

pkafka

This package implements a persistent writer for Kafka. It writes to Kafka asynchronously and, if a write fails, retries until it succeeds. Messages are persisted to either local disk or GCS, and retries are served from that storage. On initialization it looks for any messages that were not written to Kafka and tries to write them again. It is designed to be used in a long-running process.

It uses the amazing franz-go package to do the heavy lifting.

Usage

You initialize the writer with the New() function. It takes a storage type, pkafka.LocalStorage or pkafka.GCSStorage, and a "bucketname". For GCS, the bucketname is the name of the bucket; for local storage, it is the path to the directory where the messages will be stored.

In addition, New takes a list of kgo.Opt values, which are passed to the franz-go client. New also adds the option that enables transactions, which the persistent writer needs in order to work. Without transactions, the message order could change.

Example:

package main

import (
	"context"
	"os"
	"os/signal"
	"sync"

	"github.com/perbu/pkafka"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Create a new writer that persists messages in the local directory "my-dir".
	writer, err := pkafka.New(pkafka.LocalStorage, "my-dir", kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	// NOTE: Close is not listed among the Client methods documented below;
	// drop this line if your version lacks it.
	defer writer.Close()

	// Cancel the context on Ctrl+C so that Run can return.
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Start the writer. The error is scoped to this goroutine so that
		// it does not race with the err variable used in main.
		if err := writer.Run(ctx); err != nil {
			panic(err)
		}
	}()

	// Create a message.
	msg := kgo.Record{
		Topic: "my-topic",
		Value: []byte("my-value"),
	}
	// Write a message.
	if err := writer.PersistentProduce(msg); err != nil {
		panic(err)
	}

	// Wait for the writer to finish (Ctrl+C).
	wg.Wait()
}

Documentation

Overview

Package pkafka provides a Kafka producer that tries really hard never to lose messages. It retries messages that fail to send and buffers messages to disk, so that they are not lost.
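
The retry-and-buffer behaviour described above can be illustrated with a small sketch. This is not pkafka's implementation: sendFunc, produceOrSpool, replay and demo are hypothetical names, the spool is a plain temporary directory with one file per message, and the Kafka call is replaced by a function value so the example runs without a broker.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// sendFunc is a hypothetical stand-in for the real Kafka produce call.
type sendFunc func(payload []byte) error

// produceOrSpool tries to send the payload. If sending fails, the payload
// is written to dir so that it survives a restart.
func produceOrSpool(dir, name string, payload []byte, send sendFunc) error {
	if err := send(payload); err == nil {
		return nil
	}
	return os.WriteFile(filepath.Join(dir, name), payload, 0o600)
}

// replay resends every spooled file and deletes the ones that succeed.
// os.ReadDir returns entries sorted by filename, so zero-padded names
// keep the original order. It returns the number of delivered messages.
func replay(dir string, send sendFunc) (int, error) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return 0, err
	}
	delivered := 0
	for _, e := range entries {
		path := filepath.Join(dir, e.Name())
		payload, err := os.ReadFile(path)
		if err != nil {
			return delivered, err
		}
		if err := send(payload); err != nil {
			continue // still failing: keep the file for the next attempt
		}
		if err := os.Remove(path); err != nil {
			return delivered, err
		}
		delivered++
	}
	return delivered, nil
}

// demo simulates an outage followed by a recovery. It returns how many
// messages were spooled and how many were replayed afterwards.
func demo() (int, int, error) {
	dir, err := os.MkdirTemp("", "spool")
	if err != nil {
		return 0, 0, err
	}
	defer os.RemoveAll(dir)

	down := func([]byte) error { return errors.New("kafka is down") }
	up := func([]byte) error { return nil }

	for i := 0; i < 3; i++ {
		name := fmt.Sprintf("msg-%04d", i)
		if err := produceOrSpool(dir, name, []byte("my-value"), down); err != nil {
			return 0, 0, err
		}
	}
	entries, err := os.ReadDir(dir)
	if err != nil {
		return 0, 0, err
	}
	replayed, err := replay(dir, up)
	return len(entries), replayed, err
}

func main() {
	fmt.Println(demo())
}
```

A design like this gives at-least-once delivery: if the process stops after a successful send but before the file is removed, the message is sent again on the next replay.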

Constants

This section is empty.

Variables

var (
	// ErrKafkaDown is returned when kafka is down.
	ErrKafkaDown = fmt.Errorf("kafka is down")
)
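
The documentation does not say which calls return ErrKafkaDown, or whether it is returned directly or wrapped. A cautious way to test for it is errors.Is, which handles both cases. The sketch below uses a local stand-in for the sentinel and a hypothetical connect helper so that it compiles without the module; with the real package you would compare against pkafka.ErrKafkaDown.

```go
package main

import (
	"errors"
	"fmt"
)

// errKafkaDown is a local stand-in for pkafka.ErrKafkaDown.
var errKafkaDown = fmt.Errorf("kafka is down")

// connect is a hypothetical helper that mimics a constructor whose initial
// ping fails. It wraps the sentinel with %w so callers can still match it.
func connect(reachable bool) error {
	if !reachable {
		return fmt.Errorf("initial ping: %w", errKafkaDown)
	}
	return nil
}

// isKafkaDown reports whether err is the sentinel or wraps it.
func isKafkaDown(err error) bool {
	return errors.Is(err, errKafkaDown)
}

func main() {
	if err := connect(false); isKafkaDown(err) {
		fmt.Println("broker unreachable, try again later:", err)
	}
}
```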

Functions

This section is empty.

Types

type AbstractStorage

type AbstractStorage interface {
	ListObjects(context.Context) ([]string, error)
	OpenObjectForWriting(context.Context, string) (io.WriteCloser, error)
	OpenObjectForReading(context.Context, string) (io.ReadCloser, error)
	DeleteObject(context.Context, string) error
}
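
As an illustration of what satisfying this interface involves, here is a minimal in-memory implementation, which might be handy in tests. memStorage, memWriter and roundTrip are hypothetical names and not part of pkafka. The interface is copied locally so the sketch compiles on its own, and whether a Client can be given a custom storage is not covered by this documentation.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"sort"
)

// AbstractStorage is copied from the package documentation so that the
// sketch compiles on its own.
type AbstractStorage interface {
	ListObjects(context.Context) ([]string, error)
	OpenObjectForWriting(context.Context, string) (io.WriteCloser, error)
	OpenObjectForReading(context.Context, string) (io.ReadCloser, error)
	DeleteObject(context.Context, string) error
}

// memStorage is a hypothetical in-memory implementation. It has no
// locking, so it is not safe for concurrent use.
type memStorage struct {
	objects map[string][]byte
}

// memWriter buffers writes (the embedded buffer supplies Write) and
// publishes the object when it is closed.
type memWriter struct {
	bytes.Buffer
	store *memStorage
	name  string
}

func (w *memWriter) Close() error {
	w.store.objects[w.name] = w.Bytes()
	return nil
}

func (s *memStorage) ListObjects(context.Context) ([]string, error) {
	names := make([]string, 0, len(s.objects))
	for name := range s.objects {
		names = append(names, name)
	}
	sort.Strings(names) // map iteration order is random; sort for stability
	return names, nil
}

func (s *memStorage) OpenObjectForWriting(_ context.Context, name string) (io.WriteCloser, error) {
	return &memWriter{store: s, name: name}, nil
}

func (s *memStorage) OpenObjectForReading(_ context.Context, name string) (io.ReadCloser, error) {
	data, ok := s.objects[name]
	if !ok {
		return nil, fmt.Errorf("object %q not found", name)
	}
	return io.NopCloser(bytes.NewReader(data)), nil
}

func (s *memStorage) DeleteObject(_ context.Context, name string) error {
	delete(s.objects, name)
	return nil
}

// roundTrip writes one object, reads it back, deletes it, and returns the
// payload that was read plus the number of objects left afterwards.
func roundTrip(payload string) (string, int, error) {
	ctx := context.Background()
	var store AbstractStorage = &memStorage{objects: map[string][]byte{}}
	w, err := store.OpenObjectForWriting(ctx, "msg-0001")
	if err != nil {
		return "", 0, err
	}
	if _, err := io.WriteString(w, payload); err != nil {
		return "", 0, err
	}
	if err := w.Close(); err != nil {
		return "", 0, err
	}
	r, err := store.OpenObjectForReading(ctx, "msg-0001")
	if err != nil {
		return "", 0, err
	}
	defer r.Close()
	data, err := io.ReadAll(r)
	if err != nil {
		return "", 0, err
	}
	if err := store.DeleteObject(ctx, "msg-0001"); err != nil {
		return "", 0, err
	}
	names, err := store.ListObjects(ctx)
	return string(data), len(names), err
}

func main() {
	fmt.Println(roundTrip("hello"))
}
```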

type Client

type Client struct {
	// contains filtered or unexported fields
}

func New

func New(sType StorageType, bucketName string, opts ...kgo.Opt) (*Client, error)

New creates a new kafka client. It will apply the options you pass to it. kgo.TransactionalID is set to "perbu" by default to make the transactions work. See kgo.NewClient for more info on the options. It will ping kafka to make sure it's up and running and fail if it's not.

func (*Client) GetMsgCount

func (c *Client) GetMsgCount() uint64

func (*Client) PersistentProduce

func (c *Client) PersistentProduce(msg kgo.Record) error

func (*Client) Run

func (c *Client) Run(ctx context.Context) error

Run starts the goroutine that will flush the buffer to Kafka or GCS. This reduces the need for locking and makes the producer asynchronous.
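
To show why a single flushing goroutine reduces the need for locking, here is a rough sketch of the general pattern, not the package's actual code. The flusher type and its produce, run and demo functions are hypothetical. Producers only send on a channel, and the state behind it is touched by one goroutine, so no mutex is required.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// flusher is a hypothetical stand-in for the client. Only the run
// goroutine appends to sent, so the slice needs no mutex.
type flusher struct {
	queue chan string
	sent  []string
}

// produce hands a message to the background goroutine without waiting for
// it to be delivered. It fails only when the buffer is full.
func (f *flusher) produce(msg string) error {
	select {
	case f.queue <- msg:
		return nil
	default:
		return fmt.Errorf("buffer full")
	}
}

// run drains the queue until ctx is cancelled, then flushes whatever is
// still buffered and returns.
func (f *flusher) run(ctx context.Context) error {
	for {
		select {
		case msg := <-f.queue:
			f.sent = append(f.sent, msg) // stand-in for the real send
		case <-ctx.Done():
			for {
				select {
				case msg := <-f.queue:
					f.sent = append(f.sent, msg)
				default:
					return nil
				}
			}
		}
	}
}

// demo produces n messages, stops the loop, and returns how many messages
// were flushed.
func demo(n int) int {
	f := &flusher{queue: make(chan string, n)}
	ctx, cancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		_ = f.run(ctx)
	}()

	for i := 0; i < n; i++ {
		if err := f.produce(fmt.Sprintf("msg-%d", i)); err != nil {
			panic(err)
		}
	}
	cancel()
	wg.Wait() // after this, reading f.sent is safe
	return len(f.sent)
}

func main() {
	fmt.Println(demo(5))
}
```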

func (*Client) Status

func (c *Client) Status() (string, error)

type PersistedMessage

type PersistedMessage struct {
	Key       []byte
	Value     []byte
	Timestamp time.Time
	Topic     string
}

PersistedMessage is the struct that is persisted to storage. It can be used to reconstruct the original kgo.Record, but has a form better suited for storage.
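
The conversion in both directions could look roughly like the sketch below. Several things here are assumptions: record is a local stand-in that mirrors only the four matching kgo.Record fields, toPersisted, toRecord, roundTrip and roundTripOK are hypothetical helpers, and JSON is used only for illustration since the on-disk encoding is not documented. Note that the struct has no fields for headers or partition, so those would presumably not be restored.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"time"
)

// record mirrors the four kgo.Record fields used here. It is a stand-in so
// that the sketch compiles without franz-go.
type record struct {
	Key       []byte
	Value     []byte
	Timestamp time.Time
	Topic     string
}

// PersistedMessage is copied from the package documentation.
type PersistedMessage struct {
	Key       []byte
	Value     []byte
	Timestamp time.Time
	Topic     string
}

// toPersisted copies the fields worth storing out of a record.
func toPersisted(r record) PersistedMessage {
	return PersistedMessage{Key: r.Key, Value: r.Value, Timestamp: r.Timestamp, Topic: r.Topic}
}

// toRecord rebuilds a record from what was stored.
func toRecord(p PersistedMessage) record {
	return record{Key: p.Key, Value: p.Value, Timestamp: p.Timestamp, Topic: p.Topic}
}

// roundTrip encodes a record as JSON (an assumed format) and decodes it
// back again.
func roundTrip(r record) (record, error) {
	data, err := json.Marshal(toPersisted(r))
	if err != nil {
		return record{}, err
	}
	var p PersistedMessage
	if err := json.Unmarshal(data, &p); err != nil {
		return record{}, err
	}
	return toRecord(p), nil
}

// roundTripOK reports whether a sample record survives the round trip.
func roundTripOK() bool {
	in := record{
		Key:       []byte("my-key"),
		Value:     []byte("my-value"),
		Timestamp: time.Date(2023, time.March, 4, 12, 0, 0, 0, time.UTC),
		Topic:     "my-topic",
	}
	out, err := roundTrip(in)
	return err == nil &&
		bytes.Equal(in.Key, out.Key) &&
		bytes.Equal(in.Value, out.Value) &&
		in.Timestamp.Equal(out.Timestamp) &&
		in.Topic == out.Topic
}

func main() {
	fmt.Println("round trip preserved all fields:", roundTripOK())
}
```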

type StorageType

type StorageType int
const (
	LocalStorage StorageType = iota + 1
	GCSStorage
)
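
Because the constants start at iota + 1, the zero value of StorageType matches neither of them, which makes an unset value easy to detect. The sketch below repeats the declarations locally and adds a hypothetical describe helper to show this; how New itself reacts to an unknown storage type is not documented here.

```go
package main

import "fmt"

// StorageType and its constants are copied from the package documentation.
type StorageType int

const (
	LocalStorage StorageType = iota + 1
	GCSStorage
)

// describe is a hypothetical helper that names a storage type and flags
// anything else, including the zero value, as invalid.
func describe(s StorageType) string {
	switch s {
	case LocalStorage:
		return "local"
	case GCSStorage:
		return "gcs"
	default:
		return "invalid"
	}
}

func main() {
	var unset StorageType // zero value, matches neither constant
	fmt.Println(describe(unset), describe(LocalStorage), describe(GCSStorage))
}
```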
