kafkaconsumer

package
v1.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 22, 2020 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	//StatusOn 启用
	StatusOn = "on"
	//StatusOff 关闭
	StatusOff = "off"
)
View Source
const MIN_CONSUMER_COUNT = 3
View Source
const RETRY_CONSUMER_NUM = 3
View Source
const RETRY_PRODUCER_NUM = 3

Variables

This section is empty.

Functions

This section is empty.

Types

type Handler

type Handler struct {
}

func (*Handler) Deal

func (this *Handler) Deal(broker IBroker, partition int32, Offset int64, Key string, Value []byte, ext interface{}) bool

type HandlerFunc

type HandlerFunc struct {
	MessageKey string
	Func       func([]byte) error
}

type IBroker

type IBroker struct {
	Topic     string
	FailTopic string
	KafkaHost []string
}

type IConsumer

type IConsumer struct {
	Cnt     int
	GroupId string
}

type IKafka

type IKafka struct {
	// contains filtered or unexported fields
}

func NewIKafka

func NewIKafka(callback IKafkaCallback) (this *IKafka)

func (*IKafka) Close

func (this *IKafka) Close()

func (*IKafka) Run

func (this *IKafka) Run(brokers []IBroker, consumer *IConsumer)

type IKafkaCallback

type IKafkaCallback func(broker IBroker, partition int32, offset int64, Key string, Value []byte, ext interface{}) bool

type OptionFunc

type OptionFunc func(*Options)

func ConsumerCnt

func ConsumerCnt(c int) OptionFunc

func FailTopic

func FailTopic(f string) OptionFunc

func GroupName

func GroupName(n string) OptionFunc

func KafkaHost

func KafkaHost(h string) OptionFunc

func Topic

func Topic(t string) OptionFunc

type Options

type Options struct {
	KafkaHost   string `ini:"kafkahost"`
	Topic       string `ini:"topic"`
	FailTopic   string `ini:"failtopic"`
	ConsumerCnt int    `ini:"consumerCnt"`
	GroupName   string `ini:"groupName"`
}

Options server options

func DefaultOptions

func DefaultOptions() Options

DefaultOptions default config

type Server

type Server struct {
	Opts Options
	// contains filtered or unexported fields
}

Server struct

func NewServer

func NewServer(options ...OptionFunc) *Server

NewServer get server instance

func NewServerWithOptions

func NewServerWithOptions(opts Options) *Server

NewServerWithOptions with options

func (*Server) AddAfterServerStopFunc

func (srv *Server) AddAfterServerStopFunc(fns ...bootstrap.AfterServerStopFunc)

AddAfterServerStopFunc add after function

func (*Server) AddBeforeServerStartFunc

func (srv *Server) AddBeforeServerStartFunc(fns ...bootstrap.BeforeServerStartFunc)

AddBeforeServerStartFunc add before function

func (*Server) ConfigureOptions

func (srv *Server) ConfigureOptions(options ...OptionFunc)

ConfigureOptions 更新配置

func (*Server) InjectHandleFuncs

func (srv *Server) InjectHandleFuncs(funcs []HandlerFunc)

注入消息处理方法

func (*Server) Server

func (srv *Server) Server() *IKafka

Server 获取rpcx server

func (*Server) Start

func (srv *Server) Start() error

Start 初始化各种插件

func (*Server) Stop

func (srv *Server) Stop()

Stop 平滑关闭

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL