kafka

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

README

KafkaPubsub

Moved from briansamuel/kafpubsub. A helper that uses concurrency to manage jobs; currently for testing.

Go Reference

Install
 go get github.com/briansamuel/kafpubsub
Usage

The library has three modules: publisher, consumer, and subscriber engine. For example, to set it up:

  • Create appContext Struct
// appCtx is the example application context; it holds the PubSub that
// GetKafka hands back to callers.
type appCtx struct {
	// kafkaPs is the PubSub supplied to NewAppContext.
	kafkaPs        PubSub
}

// NewAppContext stores the given PubSub in a freshly allocated appCtx and
// returns a pointer to it.
func NewAppContext(kafkaPs PubSub) *appCtx {
	app := new(appCtx)
	app.kafkaPs = kafkaPs
	return app
}

// GetKafka returns the PubSub held by the application context.
func (app *appCtx) GetKafka() PubSub {
	ps := app.kafkaPs
	return ps
}
  • Create Handle for each Topic Subscriber

// Topic1HandleFunction builds the consumer job titled "Topic1HandleFunction".
//
// The job's handler prints the message's data and channel to standard output
// and always returns nil. appCtx is accepted so a real handler can reach
// shared dependencies; this example does not use it.
func Topic1HandleFunction(appCtx AppContext) consumerJob {
	return consumerJob{
		Title: "Topic1HandleFunction",
		Hdl: func(ctx context.Context, msg *Message) error {
			// Printf, not Sprintf: Sprintf only builds a string, so the
			// original discarded its result and nothing was ever printed.
			// %v is used for the payload because Data() returns interface{}.
			fmt.Printf("Message %v of topic %s\n", msg.Data(), msg.Chanel())
			return nil
		},
	}
}

// Topic2HandleFunction builds the consumer job titled "Topic2HandleFunction".
//
// The job's handler prints the message's data and channel to standard output
// and always returns nil. appCtx is accepted so a real handler can reach
// shared dependencies; this example does not use it.
func Topic2HandleFunction(appCtx AppContext) consumerJob {
	return consumerJob{
		Title: "Topic2HandleFunction",
		Hdl: func(ctx context.Context, msg *Message) error {
			// Printf, not Sprintf: Sprintf only builds a string, so the
			// original discarded its result and nothing was ever printed.
			// %v is used for the payload because Data() returns interface{}.
			fmt.Printf("Message %v of topic %s\n", msg.Data(), msg.Chanel())
			return nil
		},
	}
}
  • Start subscriber engine

// Start initializes the consumer and, when that succeeds, registers the topic
// subscriptions through Setup.
//
// If InitConsumer returns an error it is logged and Start returns without
// calling Setup, so no subscription is attempted on a consumer that failed
// to initialize.
func (sb *subscriber) Start() {
	if err := sb.InitConsumer(); err != nil {
		log.Print(err)
		return
	}
	sb.Setup()
}

// Setup registers the example subscriptions: "topic-1" with the job from
// Topic1HandleFunction and "topic-2" with the job from Topic2HandleFunction.
// Both are started with the boolean argument set to true.
func (sb *subscriber) Setup() {
	// Same flag value for every subscription in this example.
	const flag = true

	topic1Job := Topic1HandleFunction(sb.appCtx)
	sb.startSubTopic("topic-1", flag, topic1Job)

	topic2Job := Topic2HandleFunction(sb.appCtx)
	sb.startSubTopic("topic-2", flag, topic2Job)
}
  • Test Init Kafka Client and Subscriber Start Example

func TestStartKafkaClient(t *testing.T) {
	brokers := strings.Split("0.0.0.0:9092", ",")
	client := NewClient(brokers)
	appCtx := NewAppContext(client)
	NewSubscriber(appCtx).Start()
	
	
}
  • Publish Message With Topic

// TestPublishMessage shows a call to Publish with a topic and a payload.
// NOTE(review): data and common.TopicUserDislikeRestaurant are not defined in
// this snippet — presumably they come from the surrounding project; confirm.
func TestPublishMessage(t *testing.T) {
	// Anything Publish returns is discarded here.
	Publish(common.TopicUserDislikeRestaurant, data)
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Package-level settings, declared as variables so they can be reassigned.
// NOTE(review): presumably applied when topics are created — confirm against
// the code that reads them.
var (
	// NUM_PARTITION is initialized to 3.
	NUM_PARTITION      = 3
	// REPLICATION_FACTOR is initialized to 1.
	REPLICATION_FACTOR = 1
)

Functions

func BodyParse

func BodyParse(bin []byte, p interface{}) error

func NewKafkaPubSub

func NewKafkaPubSub(brokerUrl ...string) *kafkaPubSub

func NewSubscriber

func NewSubscriber(appCtx AppContext) *subscriber

func ToInt32

func ToInt32(in *int32) int32

func ToPInt32

func ToPInt32(in int32) *int32

Types

type AppContext

// AppContext is the application context accepted by NewSubscriber; it gives
// access to the PubSub.
type AppContext interface {
	// GetKafka returns the PubSub.
	GetKafka() PubSub
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(brokerURLs []string) *Client

func (*Client) Close

func (ps *Client) Close() error

func (*Client) InitConsumer

func (ps *Client) InitConsumer(brokerURLs ...string) error

func (*Client) InitConsumerGroup

func (ps *Client) InitConsumerGroup(consumerGroup string) error

func (*Client) InitPublisher

func (ps *Client) InitPublisher(brokerURLs ...string)

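InitPublisher initializes the publisher with the given broker URLs. (The original comment described the address as the URL of lookupd, which looks like a leftover from an NSQ client.)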

func (*Client) ListTopics

func (ps *Client) ListTopics(brokers ...string) ([]string, error)

func (*Client) OnAsyncSubscribe

func (ps *Client) OnAsyncSubscribe(topics []*Topic, numberPuller int, buf chan *Message) error

func (*Client) OnScanMessages

func (ps *Client) OnScanMessages(topics []string, bufMessage chan Message) error

func (*Client) Publish

func (ps *Client) Publish(topic string, messages ...interface{}) error

Publish publishes the given messages to the topic synchronously.

type ConsumerGroupHandle

type ConsumerGroupHandle struct {
	// contains filtered or unexported fields
}

ConsumerGroupHandle represents a Sarama consumer group consumer

func (*ConsumerGroupHandle) Cleanup

func (consumer *ConsumerGroupHandle) Cleanup(ss sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*ConsumerGroupHandle) ConsumeClaim

func (consumer *ConsumerGroupHandle) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*ConsumerGroupHandle) Setup

type ConsumerJob

// ConsumerJob pairs a title with a message handler; the Subscriber interface's
// StartSubTopic accepts one or more of these.
type ConsumerJob struct {
	// Title is the job's name.
	Title string
	// Hdl is called with a context and a message and reports failure through
	// its returned error.
	Hdl   func(ctx context.Context, msg *Message) error
}

type GroupJob

// GroupJob is anything that can be run with a context and report an error.
// NOTE(review): presumably runs a set of jobs together — confirm against the
// implementation.
type GroupJob interface {
	// Run executes with ctx and returns an error on failure.
	Run(ctx context.Context) error
}

type Message

// Message is the package's message envelope.
//
// Offset, Partition, Topic, Body, Timestamp and ConsumerGroup are encoded
// under the lower-case JSON names in their tags and omitted when empty. Id
// and Headers carry no tag, so they are encoded under their Go field names.
// NOTE(review): Offset/Partition/Topic presumably mirror the Kafka record's
// position — confirm against the code that fills them in.
type Message struct {
	Id            string
	Offset        int64  `json:"offset,omitempty"`
	Partition     int    `json:"partition,omitempty"`
	Topic         string `json:"topic,omitempty"`
	Body          []byte `json:"body,omitempty"`
	Timestamp     int64  `json:"timestamp,omitempty"`
	ConsumerGroup string `json:"consumer_group,omitempty"`
	// Commit is excluded from JSON: encoding/json cannot encode function
	// values, so without the "-" tag json.Marshal fails for every Message,
	// even when Commit is nil.
	Commit  func() `json:"-"`
	Headers map[string]string
}

Message defines the message type used to encode and decode Sarama messages.

func NewMessage

func NewMessage(data interface{}) *Message

func (*Message) Chanel

func (evt *Message) Chanel() string

func (*Message) Data

func (evt *Message) Data() interface{}

func (*Message) SetChanel

func (evt *Message) SetChanel(chanel string)

func (*Message) String

func (evt *Message) String() string

type PubSub

// PubSub is the publish/subscribe interface exposed through AppContext.GetKafka.
type PubSub interface {
	// InitialClient takes zero or more topic names and returns an error.
	// NOTE(review): presumably prepares the client for those topics — confirm.
	InitialClient(topics ...string) error
	// Publish takes a context, a topic name and a payload, and reports
	// failure through the returned error.
	Publish(ctx context.Context, topic string, data interface{}) error
	// Subscribe returns a receive-only channel of messages for topic together
	// with a close function.
	// NOTE(review): presumably calling close ends the subscription — confirm.
	Subscribe(ctx context.Context, topic string) (ch <-chan *Message, close func())
}

type SenderConfig

// SenderConfig carries extra data for a publish call.
type SenderConfig struct {
	// Metadata is an arbitrary value.
	// NOTE(review): how it is consumed is not visible here — confirm.
	Metadata interface{}
	// Headers is a string-to-string map.
	// NOTE(review): presumably sent as message headers — confirm.
	Headers  map[string]string
}

SenderConfig holds additional configuration used when publishing a message.

type Subscriber

// Subscriber is the subscriber-engine interface.
type Subscriber interface {
	// Start starts the subscriber and returns an error on failure.
	Start() error
	// StartSubTopic takes a topic name, a concurrency flag and one or more
	// consumer jobs.
	// NOTE(review): presumably isConcurrency selects whether the jobs run
	// concurrently — confirm against the implementation.
	StartSubTopic(topic string, isConcurrency bool, consumerJobs ...ConsumerJob)
	// GetAppContext returns the AppContext.
	GetAppContext() AppContext
}

type Topic

// Topic describes a topic for Client.OnAsyncSubscribe, which takes a slice of
// *Topic.
type Topic struct {
	// Name is the topic name.
	Name                    string
	// AutoCommit is a boolean option.
	// NOTE(review): presumably enables automatic offset commits — confirm.
	AutoCommit              bool
	// Partition is an optional partition number; nil means it is unset.
	// NOTE(review): the behavior when nil is not visible here — confirm.
	Partition               *int32
	// IsNeedManualCreateTopic is a boolean option.
	// NOTE(review): presumably requests explicit topic creation — confirm.
	IsNeedManualCreateTopic bool
}

type User added in v1.0.1

// User is a plain struct with a name and an age.
// NOTE(review): its role in this package is not evident from this page —
// possibly a sample payload for tests; confirm.
type User struct {
	// Name is the user's name.
	Name string
	// Age is the user's age.
	Age  int
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL