kafka

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 17, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

README

go-confluent-kafka

A library that provides a consumer and a producer for working with Kafka, Avro, and the Confluent Schema Registry.

Producer

package main

import (
	"flag"
	"fmt"
	"gitlab.com/ihsanul14/go-confluent-kafka"
	"time"
)

var kafkaServers = []string{"localhost:9092"}
var schemaRegistryServers = []string{"http://localhost:8081"}
var topic = "test"

// main publishes a sample Avro record to the configured topic n times, where
// n is taken from the -n command-line flag (default 1).
func main() {
	var n int
	// Parse flags first so an invalid invocation exits before the producer is created.
	flag.IntVar(&n, "n", 1, "number")
	flag.Parse()

	schema := `{
				"type": "record",
				"name": "Example",
				"fields": [
					{"name": "Id", "type": "string"},
					{"name": "Type", "type": "string"},
					{"name": "Data", "type": "string"}
				]
			}`

	// NewAvroProducer takes a single AvroProducerConfig value; SASL is left
	// unset in this example.
	producer, err := kafka.NewAvroProducer(kafka.AvroProducerConfig{
		KafkaServers:          kafkaServers,
		SchemaRegistryServers: schemaRegistryServers,
	})
	if err != nil {
		fmt.Printf("Could not create avro producer: %s", err)
		// Stop here: the producer is not usable when an error is returned.
		return
	}
	defer producer.Close()

	for i := 0; i < n; i++ {
		fmt.Println(i)
		addMsg(producer, schema)
	}
}

// addMsg sends one fixed JSON-encoded sample record to the package-level topic
// using the given Avro schema. It prints a timestamp after the send attempt and
// reports a failed send on standard output instead of returning it.
func addMsg(producer *kafka.AvroProducer, schema string) {
	const payload = `{
		"Id": "1",
		"Type": "example_type",
		"Data": "example_data"
	}`
	// The timestamp is only printed; Add has no key parameter.
	stamp := time.Now().String()
	sendErr := producer.Add(topic, schema, []byte(payload))
	fmt.Println(stamp)
	if sendErr == nil {
		return
	}
	fmt.Printf("Could not add a msg: %s", sendErr)
}

Producer with Plain SASL - SSL

package main

import (
	"crypto/tls"
	"flag"
	"fmt"
	"time"

	"gitlab.com/ihsanul14/go-confluent-kafka"
)

var kafkaServers = []string{"localhost:9092"}
var schemaRegistryServers = []string{"https://localhost:8081"}
var topic = "test"

func main() {
	var n int
	schema := `{
				"type": "record",
				"name": "Example",
				"fields": [
					{"name": "Id", "type": "string"},
					{"name": "Type", "type": "string"},
					{"name": "Data", "type": "string"}
				]
			}`

	var config = &AvroProducerConfig{
		KafkaServers : kafkaServers,
		SchemaRegistryServers: schemaRegistryServers,
		SASL: &SASLConfig{
			Username: "username",
			Password: "password",
			TLSConfig: &tls.Config{}
		}
	}
	producer, err := kafka.NewAvroProducer(config)
	if err != nil {
		fmt.Printf("Could not create avro producer: %s", err)
	}
	flag.IntVar(&n, "n", 1, "number")
	flag.Parse()
	for i := 0; i < n; i++ {
		fmt.Println(i)
		addMsg(producer, schema)
	}
}

// addMsg sends one fixed JSON-encoded sample record to the package-level topic
// using the given Avro schema. It prints a timestamp after the send attempt and
// reports a failed send on standard output instead of returning it.
func addMsg(producer *kafka.AvroProducer, schema string) {
	const payload = `{
		"Id": "1",
		"Type": "example_type",
		"Data": "example_data"
	}`
	// The timestamp is only printed; Add has no key parameter.
	stamp := time.Now().String()
	sendErr := producer.Add(topic, schema, []byte(payload))
	fmt.Println(stamp)
	if sendErr == nil {
		return
	}
	fmt.Printf("Could not add a msg: %s", sendErr)
}

Consumer

package main

import (
	"fmt"
	"gitlab.com/ihsanul14/go-confluent-kafka"
	"github.com/bsm/sarama-cluster"
)

var kafkaServers = []string{"localhost:9092"}
var schemaRegistryServers = []string{"http://localhost:8081"}
var topic = "test"

// main builds an Avro consumer for the configured topic and consumer group,
// wires up callbacks that print received messages, errors and notifications,
// and then starts consuming.
func main() {
	consumerCallbacks := kafka.ConsumerCallbacks{
		OnDataReceived: func(msg kafka.Message) {
			fmt.Println(msg)
		},
		OnError: func(err error) {
			fmt.Println("Consumer error", err)
		},
		OnNotification: func(notification *cluster.Notification) {
			fmt.Println(notification)
		},
	}

	// NewAvroConsumer takes a single AvroConsumerConfig value; SASL is left
	// unset in this example.
	consumer, err := kafka.NewAvroConsumer(kafka.AvroConsumerConfig{
		KafkaServers:          kafkaServers,
		SchemaRegistryServers: schemaRegistryServers,
		Topic:                 topic,
		GroupId:               "consumer-group",
		Callbacks:             consumerCallbacks,
	})
	if err != nil {
		fmt.Println(err)
		// Stop here: the consumer is not usable when an error is returned.
		return
	}
	consumer.Consume()
}
References

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewAvroConsumer

func NewAvroConsumer(cfg AvroConsumerConfig) (*avroConsumer, error)

NewAvroConsumer creates a basic consumer that interacts with the schema registry, Avro, and Kafka.

Types

type AvroConsumerConfig added in v1.0.1

// AvroConsumerConfig is the configuration value accepted by NewAvroConsumer.
type AvroConsumerConfig struct {
	// KafkaServers lists the Kafka addresses, e.g. "localhost:9092".
	KafkaServers          []string
	// SchemaRegistryServers lists the schema registry URLs, e.g. "http://localhost:8081".
	SchemaRegistryServers []string
	// Topic is presumably the topic to consume from — confirm against the implementation.
	Topic                 string
	// GroupId is presumably the consumer group identifier — confirm against the implementation.
	GroupId               string
	// Callbacks holds the functions used to report data, errors and notifications.
	Callbacks             ConsumerCallbacks
	// SASL carries the username, password and TLS settings.
	// NOTE(review): presumably nil means no SASL — confirm.
	SASL                  *SASLConfig
}

type AvroProducer

type AvroProducer struct {
	SASL *SASLConfig
	// contains filtered or unexported fields
}

func NewAvroProducer

func NewAvroProducer(cfg AvroProducerConfig) (*AvroProducer, error)

NewAvroProducer creates a basic producer that interacts with the schema registry, Avro, and Kafka.

func (*AvroProducer) Add

func (ap *AvroProducer) Add(topic string, schema string, value []byte) error

func (*AvroProducer) Close

func (ac *AvroProducer) Close()

func (*AvroProducer) GetSchemaId

func (ap *AvroProducer) GetSchemaId(topic string, avroCodec *goavro.Codec) (int, error)

GetSchemaId gets the schema ID from the schema-registry service.

type AvroProducerConfig added in v1.0.1

// AvroProducerConfig is the configuration value accepted by NewAvroProducer.
type AvroProducerConfig struct {
	// KafkaServers lists the Kafka addresses, e.g. "localhost:9092".
	KafkaServers          []string
	// SchemaRegistryServers lists the schema registry URLs, e.g. "http://localhost:8081".
	SchemaRegistryServers []string
	// SASL carries the username, password and TLS settings.
	// NOTE(review): presumably nil means no SASL — confirm.
	SASL                  *SASLConfig
}

type CachedSchemaRegistryClient

type CachedSchemaRegistryClient struct {
	SchemaRegistryClient *SchemaRegistryClient

	SASL *SASLConfig
	// contains filtered or unexported fields
}

CachedSchemaRegistryClient is a schema registry client that will cache some data to improve performance

func NewCachedSchemaRegistryClient

func NewCachedSchemaRegistryClient(connect []string, saslConfig *SASLConfig) *CachedSchemaRegistryClient

func NewCachedSchemaRegistryClientWithRetries

func NewCachedSchemaRegistryClientWithRetries(connect []string, retries int, saslConfig *SASLConfig) *CachedSchemaRegistryClient

func (*CachedSchemaRegistryClient) CreateSubject

func (client *CachedSchemaRegistryClient) CreateSubject(subject string, codec *goavro.Codec) (int, error)

CreateSubject will return and cache the id with the given codec

func (*CachedSchemaRegistryClient) DeleteSubject

func (client *CachedSchemaRegistryClient) DeleteSubject(subject string) error

DeleteSubject deletes the subject, should only be used in development

func (*CachedSchemaRegistryClient) DeleteVersion

func (client *CachedSchemaRegistryClient) DeleteVersion(subject string, version int) error

DeleteVersion deletes a specific version of a subject. It should only be used in development.

func (*CachedSchemaRegistryClient) GetLatestSchema

func (client *CachedSchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error)

GetLatestSchema returns the highest version schema for a subject

func (*CachedSchemaRegistryClient) GetSchema

func (client *CachedSchemaRegistryClient) GetSchema(id int) (*goavro.Codec, error)

GetSchema will return and cache the codec with the given id

func (*CachedSchemaRegistryClient) GetSchemaByVersion

func (client *CachedSchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*goavro.Codec, error)

GetSchemaByVersion returns the codec for a specific version of a subject

func (*CachedSchemaRegistryClient) GetSubjects

func (client *CachedSchemaRegistryClient) GetSubjects() ([]string, error)

GetSubjects returns a list of subjects

func (*CachedSchemaRegistryClient) GetVersions

func (client *CachedSchemaRegistryClient) GetVersions(subject string) ([]int, error)

GetVersions returns a list of all versions of a subject

func (*CachedSchemaRegistryClient) IsSchemaRegistered

func (client *CachedSchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (int, error)

IsSchemaRegistered checks if a specific codec is already registered to a subject

type ConsumerCallbacks

// ConsumerCallbacks groups the functions a consumer is configured with
// (see AvroConsumerConfig.Callbacks).
// NOTE(review): whether a nil callback is tolerated is not visible here — confirm.
type ConsumerCallbacks struct {
	// OnDataReceived receives a Message.
	OnDataReceived func(msg Message)
	// OnError receives an error.
	OnError        func(err error)
	// OnNotification receives a sarama-cluster notification.
	OnNotification func(notification *cluster.Notification)
}

type Error

// Error holds the detailed error information returned by the schema registry.
// *Error has an Error() string method.
type Error struct {
	// ErrorCode maps to the "error_code" JSON key.
	ErrorCode int    `json:"error_code"`
	// Message maps to the "message" JSON key.
	Message   string `json:"message"`
}

Error holds more detailed information about errors coming back from schema registry

func (*Error) Error

func (e *Error) Error() string

type Message

// Message is the value passed to ConsumerCallbacks.OnDataReceived.
type Message struct {
	// SchemaId is presumably the schema registry id of the schema used for Value — confirm.
	SchemaId  int
	// Topic, Partition and Offset presumably identify where the record was read
	// from in Kafka — confirm against the consumer implementation.
	Topic     string
	Partition int32
	Offset    int64
	// Key is the record key as a string.
	Key       string
	// Value is the record value as a string.
	// NOTE(review): the encoding (e.g. JSON text of the decoded Avro record) is not visible here.
	Value     string
}

type SASLConfig added in v1.0.1

// SASLConfig bundles the credentials and TLS settings that the producer,
// consumer and schema registry client constructors accept.
type SASLConfig struct {
	// Username is the SASL user name.
	Username  string
	// Password is the SASL password.
	Password  string
	// TLSConfig is the crypto/tls configuration.
	// NOTE(review): whether nil disables TLS is not visible here — confirm.
	TLSConfig *tls.Config
}

type SchemaRegistryClient

type SchemaRegistryClient struct {
	SchemaRegistryConnect []string

	SASL *SASLConfig
	// contains filtered or unexported fields
}

SchemaRegistryClient is a basic http client to interact with schema registry

func NewSchemaRegistryClient

func NewSchemaRegistryClient(connect []string, saslConfig *SASLConfig) *SchemaRegistryClient

NewSchemaRegistryClient creates a client to talk with the schema registry at the given connect addresses. By default it retries failed requests (5XX responses and HTTP errors) len(connect) times.

func NewSchemaRegistryClientWithRetries

func NewSchemaRegistryClientWithRetries(connect []string, retries int, saslConfig *SASLConfig) *SchemaRegistryClient

NewSchemaRegistryClientWithRetries creates an http client with a configurable amount of retries on 5XX responses

func (*SchemaRegistryClient) CreateSubject

func (client *SchemaRegistryClient) CreateSubject(subject string, codec *goavro.Codec) (int, error)

CreateSubject adds a schema to the subject

func (*SchemaRegistryClient) DeleteSubject

func (client *SchemaRegistryClient) DeleteSubject(subject string) error

DeleteSubject deletes a subject. It should only be used in development

func (*SchemaRegistryClient) DeleteVersion

func (client *SchemaRegistryClient) DeleteVersion(subject string, version int) error

DeleteVersion deletes a specific version of a subject. It should only be used in development.

func (*SchemaRegistryClient) GetLatestSchema

func (client *SchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error)

GetLatestSchema returns a goavro.Codec for the latest version of the subject

func (*SchemaRegistryClient) GetSchema

func (client *SchemaRegistryClient) GetSchema(id int) (*goavro.Codec, error)

GetSchema returns a goavro.Codec by unique id

func (*SchemaRegistryClient) GetSchemaByVersion

func (client *SchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*goavro.Codec, error)

GetSchemaByVersion returns a goavro.Codec for the version of the subject

func (*SchemaRegistryClient) GetSubjects

func (client *SchemaRegistryClient) GetSubjects() ([]string, error)

GetSubjects returns a list of all subjects in the schema registry

func (*SchemaRegistryClient) GetVersions

func (client *SchemaRegistryClient) GetVersions(subject string) ([]int, error)

GetVersions returns a list of the versions of a subject

func (*SchemaRegistryClient) IsSchemaRegistered

func (client *SchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (int, error)

IsSchemaRegistered tests if the schema is registered, if so it returns the unique id of that schema

type SchemaRegistryClientInterface

// SchemaRegistryClientInterface defines the API for all clients that talk to
// the schema registry.
type SchemaRegistryClientInterface interface {
	// GetSchema returns the codec for the schema with the given unique id.
	GetSchema(int) (*goavro.Codec, error)
	// GetSubjects returns the list of subjects in the schema registry.
	GetSubjects() ([]string, error)
	// GetVersions returns the versions of the given subject.
	GetVersions(string) ([]int, error)
	// GetSchemaByVersion returns the codec for the given version of the given subject.
	GetSchemaByVersion(string, int) (*goavro.Codec, error)
	// GetLatestSchema returns the codec for the latest version of the given subject.
	GetLatestSchema(string) (*goavro.Codec, error)
	// CreateSubject adds the schema to the given subject and returns an id.
	CreateSubject(string, *goavro.Codec) (int, error)
	// IsSchemaRegistered reports the unique id of the schema if it is
	// registered under the given subject.
	IsSchemaRegistered(string, *goavro.Codec) (int, error)
	// DeleteSubject deletes the given subject; intended for development only.
	DeleteSubject(string) error
	// DeleteVersion deletes the given version of the given subject; intended
	// for development only.
	DeleteVersion(string, int) error
}

SchemaRegistryClientInterface defines the api for all clients interfacing with schema registry

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL