dms

package
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 15, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

README

devcloud-go/dms

Feature
  1. support asynchronous consume kafka message and ensure message not lost.
  2. support consumption speed-limiting.
QuickStart
  1. First you need implement the OffsetPersist interface which is defined in offset_persist.go, the create_table sql see example/create_table.sql.
type OffsetPersist interface {
	Find(groupId, topic string, partition int) (int64, error)
	Save(groupId, topic string, partition int, offset int64) error
}
  1. Then you need implement the message Handler which is defined in method_info.go#L30
type BizHandler func(msg *sarama.ConsumerMessage) error
  1. Create a props for dms consumer, there are several modes of props, async and sync, you also can specify how to commit offset, interval or quantitative by set CommitInterval or CommitSize.
  • async: consume messages asynchronous
  • sync: consume messages synchronous
  1. Create a dms consumer to consume kafka messages.

See details in package example.

Note
  1. when using async mode, the pool size should be larger than topic*partition numbers.

Documentation

Overview

Package dms implements a kafka consumer based on sarama, user can consume messages asynchronous or synchronous with dms, and ensure message not lost.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BizHandler

type BizHandler func(msg *sarama.ConsumerMessage) error

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(ctx context.Context, methods []MethodInfo, propertiesMap map[string]*Properties, offsetPersist OffsetPersist) (*Consumer, error)

func (*Consumer) Close

func (c *Consumer) Close()

func (*Consumer) Consume

func (c *Consumer) Consume()

type DmsHandler

type DmsHandler struct {
	// contains filtered or unexported fields
}

func NewDmsHandler

func NewDmsHandler(
	ctx context.Context,
	methodInfo MethodInfo,
	pool *ants.Pool,
	limiter *rate.Limiter,
	properties *Properties,
	offsetPersist OffsetPersist) (*DmsHandler, error)

func (*DmsHandler) AddTopicToMethod

func (h *DmsHandler) AddTopicToMethod(method MethodInfo)

func (*DmsHandler) Cleanup

func (h *DmsHandler) Cleanup(sess sarama.ConsumerGroupSession) error

func (*DmsHandler) Close

func (h *DmsHandler) Close() error

func (*DmsHandler) ConsumeClaim

func (*DmsHandler) OnConsume

func (h *DmsHandler) OnConsume(msg *sarama.ConsumerMessage)

func (*DmsHandler) Setup

Setup implements ConsumerGroupHandler interface, when set disable auto commit, Setup will obtain a valid offset of groupId-topic-partition from kafka broker, db and the broker's beginning offset.

func (*DmsHandler) Start

func (h *DmsHandler) Start(wg *sync.WaitGroup)

type MethodInfo

type MethodInfo struct {
	GroupId  string
	Topics   []string
	BizGroup string
	Method   BizHandler
}

func (*MethodInfo) GetUniqueKey

func (m *MethodInfo) GetUniqueKey() string

type OffsetManager

type OffsetManager struct {
	// contains filtered or unexported fields
}

func NewOffsetManager

func NewOffsetManager(startOffset int64, blockCapacity, partition int, groupId, topic string, version byte) *OffsetManager

type OffsetNode

type OffsetNode struct {
	// contains filtered or unexported fields
}

func NewOffsetNode

func NewOffsetNode(capacity int) *OffsetNode

type OffsetPersist

type OffsetPersist interface {
	Find(groupId, topic string, partition int) (int64, error)
	Save(groupId, topic string, partition int, offset int64) error
}

type Properties

type Properties struct {
	Addrs           []string
	Async           bool
	OffsetBlockSize int
	BizRetryTimes   int
	LimitPerSecond  int

	// goroutine pool size
	PoolSize     int
	PoolTaskSize int

	SaramaConfig  *sarama.Config
	InitialOffset int64

	CommitSize     int // default partitionCount*OffsetBlockSize
	AutoCommit     bool
	CommitInterval time.Duration
}

func NewProperties

func NewProperties() *Properties

NewProperties return a default dms properties

func (*Properties) Clone

func (p *Properties) Clone() *Properties

Directories

Path Synopsis
Package example provides an example for user how to use dms.
Package example provides an example for user how to use dms.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL