bbq

package
v1.2.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2022 License: BSD-3-Clause Imports: 14 Imported by: 0

README

BBQ

BBQ is a library that writes messages from a Kafka topic directly into BigQuery. To use it, define a list of TableOptions, each of which specifies the input topic to read from, the topic's object and its codec, and the expiration time of the table into which the messages will be written.

The table will be created automatically, and its name will be equal to the topic's name. If the structure of the object being written changes, the table's schema will be updated automatically by appending the fields that do not exist yet. However, due to a BigQuery limitation, if a field of the object is removed, it won't be removed from the table's schema.

Example


The following code is a working example of BBQ using Goka as our Kafka processing library.

package main

import (
	"context"
	"log"
	"time"

	"cloud.google.com/go/bigquery"
	"github.com/lovoo/goka"
	"github.com/lovoo/goka-tools/bbq"
	"github.com/lovoo/goka/codec"
)

// tableOptions is a named slice of *bbq.TableOptions. Giving the slice its own
// type lets us attach the edges method, which derives the list of goka edges
// needed to initialize the processor.
type tableOptions []*bbq.TableOptions

// edges builds one goka input edge per table option, pairing each option's
// input stream and codec with the given consumer callback. The result is nil
// when tt is empty.
func (tt tableOptions) edges(consumer goka.ProcessCallback) []goka.Edge {
	var inputs []goka.Edge
	for i := range tt {
		opt := tt[i]
		inputs = append(inputs, goka.Input(opt.Input, opt.Codec, consumer))
	}
	return inputs
}

// main wires a single Kafka topic into BigQuery using BBQ and Goka.
//
// It declares the table options, creates the Bbq writer, starts a Goka
// processor that hands every consumed message to Bbq.Consume, waits for the
// processor to return, and finally drains the pending uploads.
func main() {
	tables := []*bbq.TableOptions{
		{
			Input:            "topic_name",
			Obj:              []byte{},
			TimePartitioning: &bigquery.TimePartitioning{Expiration: 14 * 24 * time.Hour},
			Codec:            new(codec.Bytes),
		},
	}

	// NewBbq takes the metrics namespace as its fourth argument. The result is
	// named writer so that it does not shadow the bbq package.
	writer, err := bbq.NewBbq("gcp-project", "target-dataset", tables, "bbq")
	if err != nil {
		log.Fatalf("Unable to create new BBQ: %v", err)
	}

	proc, err := goka.NewProcessor([]string{"kafka", "brokers"}, goka.DefineGroup(
		"bbq-group",
		tableOptions(tables).edges(writer.Consume)...,
	), goka.WithClientID("bbq"))
	if err != nil {
		log.Fatalf("Unable to create Goka processor: %v", err)
	}

	done := make(chan struct{})
	go func() {
		defer close(done)
		// Use a goroutine-local err so this goroutine never writes to main's err.
		// NOTE(review): Run presumably blocks until the context is cancelled or
		// an error occurs; pass a cancellable context in real programs so the
		// processor can be shut down cleanly.
		if err := proc.Run(context.Background()); err != nil {
			log.Fatalf("error running processor: %v", err)
		}
	}()

	// Block until the processor has returned. Without this, Stop would be called
	// right after the goroutine is launched and main would return.
	<-done

	// Drain the batches that are still pending before exiting.
	writer.Stop(10 * time.Second)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Bbq

type Bbq struct {
	// contains filtered or unexported fields
}

Bbq writes the contents of Kafka topics to BigQuery.

func NewBbq

func NewBbq(gcpproject string, datesetName string, tables []*TableOptions, metricsNamespace string) (*Bbq, error)

NewBbq creates a new Bbq struct.

func (Bbq) Collect added in v0.1.1

func (m Bbq) Collect(ch chan<- prometheus.Metric)

func (*Bbq) Consume

func (b *Bbq) Consume(ctx goka.Context, msg interface{})

Consume consumes the streams

func (Bbq) Describe added in v0.1.1

func (m Bbq) Describe(ch chan<- *prometheus.Desc)

func (*Bbq) Stop

func (b *Bbq) Stop(timeout time.Duration)

Stop drains the batches in the bbq-uploaders and blocks until they're done

type BytesCodec added in v0.1.1

// BytesCodec is an empty struct that carries the Decode and Encode methods,
// which convert between []byte and interface{} values.
// NOTE(review): presumably intended to be used as a goka.Codec — confirm.
type BytesCodec struct{}

func (*BytesCodec) Decode added in v0.1.1

func (d *BytesCodec) Decode(data []byte) (interface{}, error)

func (*BytesCodec) Encode added in v0.1.1

func (d *BytesCodec) Encode(value interface{}) ([]byte, error)

type MessageA added in v0.1.1

// MessageA is a message struct holding a string, an int32, a pointer to a
// MessageB, and a slice of MessageB pointers.
// NOTE(review): the package documentation gives no purpose for this type; it
// looks like a sample message with nested and repeated fields — confirm.
type MessageA struct {
	A string
	B int32
	// C is a single nested MessageB.
	C *MessageB
	// D is a list of nested MessageB values.
	D []*MessageB
}

type MessageB added in v0.1.1

// MessageB is a message struct holding an int32 and a string-to-string map.
// It is the element type of the nested fields C and D of MessageA.
// NOTE(review): the package documentation gives no purpose for this type; it
// looks like a sample nested message — confirm.
type MessageB struct {
	A int32
	B map[string]string
}

type TableOptions

// TableOptions represents one Kafka topic and the connected codec, together
// with the settings of the BigQuery table its messages are written to.
type TableOptions struct {
	// Obj is the topic's object.
	// NOTE(review): presumably used to derive the table's schema — confirm.
	Obj              interface{}
	// TimePartitioning holds the table's time-partitioning settings, including
	// the expiration time.
	TimePartitioning *bigquery.TimePartitioning
	// Input is the input topic to read from; the table is named after it.
	Input            goka.Stream
	// Codec is the codec of the input topic.
	Codec            goka.Codec
	// CustomSchema allows us to specify the table's schema.
	CustomSchema func() (bigquery.Schema, error)
	// CustomObject allows us to modify the input value into another one.
	CustomObject func(interface{}) interface{}
}

TableOptions represents one kafka topic and the connected codec

func (*TableOptions) Name

func (to *TableOptions) Name() string

Name returns the name of the topic

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL