broker

package module
v1.3.8
Published: Sep 7, 2022 License: MIT Imports: 4 Imported by: 0

README

broker

Go broker interface; you can use Kafka, Redis, Pulsar, etc. as the backend.

pulsar in docker

run pulsar in docker
docker run -dit \
--name pulsar-server \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:2.7.4 \
bin/pulsar standalone

pulsar-go

https://pulsar.apache.org/docs/zh-CN/client-libraries-go/

usage

For specific usage, refer to the gpulsar/gredis tests.
Kafka consumer groups require Version >= V0_10_2_0; if your Kafka version is lower than V0_10_2_0, please use go-god/broker v1.1.2.

Documentation

Index

Constants

This section is empty.

Variables

var DummyLogger = LoggerFunc(func(string, ...interface{}) {})

DummyLogger is a dummy logger that writes nothing.

Functions

func ParseMessage

func ParseMessage(msg interface{}) ([]byte, error)

ParseMessage parses msg into a byte slice payload.

func Recovery

func Recovery(logger Logger)

Recovery catches a Go runtime panic.
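A minimal sketch of guarding a worker goroutine with Recovery, assuming it calls recover() internally and is meant to be deferred; the guard helper and import path below are illustrative, not part of the package:

package example

import "github.com/go-god/broker" // assumed import path

// guard runs fn in a goroutine; assuming Recovery calls recover() internally,
// a panic inside fn is logged instead of crashing the process.
func guard(logger broker.Logger, fn func()) {
	go func() {
		defer broker.Recovery(logger)
		fn()
	}()
}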

Types

type Broker

type Broker interface {
	// Publish pub message to topic
	Publish(ctx context.Context, topic string, msg interface{}, opts ...PubOption) error

	// Subscribe sub message from topic + channel
	Subscribe(ctx context.Context, topic string, channel string, subHandler SubHandler, opts ...SubOption) error

	// Shutdown graceful shutdown broker
	Shutdown(ctx context.Context) error
}

Broker is the message broker interface.
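A usage sketch against the Broker interface; newBroker is a placeholder for whichever concrete implementation (gpulsar, gredis, a Kafka adapter) you wire in, and the import path is assumed:

package main

import (
	"context"
	"log"
	"time"

	"github.com/go-god/broker" // assumed import path
)

// newBroker is a placeholder; use the constructor of a concrete implementation.
func newBroker(opts ...broker.Option) broker.Broker { panic("wire in gpulsar/gredis/kafka here") }

func main() {
	b := newBroker(
		broker.WithBrokerAddress("localhost:6650"),
		broker.WithLogger(broker.DummyLogger),
	)
	ctx := context.Background()

	// Subscribe with topic + channel (consumer group / subscription name) + handler.
	// Depending on the implementation, Subscribe may block; run it in a goroutine if so.
	err := b.Subscribe(ctx, "orders", "order-worker",
		func(ctx context.Context, value []byte) error {
			log.Printf("got message: %s", value)
			return nil
		})
	if err != nil {
		log.Fatal(err)
	}

	// Publish a message to the topic.
	if err := b.Publish(ctx, "orders", "hello"); err != nil {
		log.Fatal(err)
	}

	// Graceful shutdown with a deadline.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	_ = b.Shutdown(shutdownCtx)
}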

type Logger

type Logger interface {
	Printf(string, ...interface{})
}

Logger is the logger interface.

type LoggerFunc

type LoggerFunc func(string, ...interface{})

LoggerFunc is a bridge between Logger and any third party logger.

func (LoggerFunc) Printf

func (f LoggerFunc) Printf(msg string, args ...interface{})

Printf implements Logger interface.
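For example, the standard library logger's Printf matches the LoggerFunc signature, so it can be plugged in directly (import path assumed):

package main

import (
	"log"
	"os"

	"github.com/go-god/broker" // assumed import path
)

func main() {
	std := log.New(os.Stderr, "broker: ", log.LstdFlags)

	// log.Logger.Printf satisfies LoggerFunc, which in turn implements Logger.
	var logger broker.Logger = broker.LoggerFunc(std.Printf)
	logger.Printf("connected to %s", "localhost:6650")

	// Or silence broker logging entirely with the provided DummyLogger.
	_ = broker.WithLogger(broker.DummyLogger)
}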

type Option

type Option func(o *Options)

Option is a functional option that modifies Options.

func WithAuthToken

func WithAuthToken(token string) Option

WithAuthToken sets the broker auth token (e.g. for the Pulsar broker).

func WithBrokerAddress

func WithBrokerAddress(addrs ...string) Option

WithBrokerAddress sets the broker address list.

func WithBrokerPrefix

func WithBrokerPrefix(prefix string) Option

WithBrokerPrefix sets the broker prefix.

func WithConnectionTimeout

func WithConnectionTimeout(t time.Duration) Option

WithConnectionTimeout sets the broker connection timeout.

func WithConsumerAutoCommitInterval added in v1.3.8

func WithConsumerAutoCommitInterval(interval time.Duration) Option

WithConsumerAutoCommitInterval sets the consumer auto commit interval.

func WithGracefulWait

func WithGracefulWait(t time.Duration) Option

WithGracefulWait sets the subscriber graceful exit wait time.

func WithListenerName

func WithListenerName(name string) Option

WithListenerName sets the broker listener name.

func WithLogger

func WithLogger(logger Logger) Option

WithLogger sets the broker logger.

func WithMaxConnectionsPerBroker

func WithMaxConnectionsPerBroker(num int) Option

WithMaxConnectionsPerBroker sets the max number of connections per broker.

func WithNoDataWaitSec

func WithNoDataWaitSec(sec int) Option

WithNoDataWaitSec sets how many seconds to wait when there is no data.

func WithOperationTimeout

func WithOperationTimeout(t time.Duration) Option

WithOperationTimeout sets the broker operation timeout.

func WithPassword

func WithPassword(pwd string) Option

WithPassword sets the broker password.

func WithRedisConf

func WithRedisConf(conf *RedisConf) Option

WithRedisConf sets the Redis config.

func WithUser

func WithUser(user string) Option

WithUser sets the broker user.

type Options

type Options struct {
	Addrs    []string // client connection address list
	Prefix   string   // client mq prefix
	User     string   // user
	Password string   // password

	// ========pulsar mq===============
	// ListenerName configures the net model for VPC users to connect to the pulsar broker
	ListenerName string
	// AuthToken auth token
	AuthToken string
	// OperationTimeout operation timeout
	OperationTimeout time.Duration
	// ConnectionTimeout timeout for the establishment of a TCP connection (default: 10 seconds)
	ConnectionTimeout time.Duration

	// MaxConnectionsPerBroker the max number of connections to a single broker
	// that will be kept in the pool. (Default: 1 connection)
	// This param is for pulsar connections per broker.
	MaxConnectionsPerBroker int

	// =======redis mq================
	RedisConf *RedisConf

	// graceful exit time
	GracefulWait time.Duration

	// no data wait second
	NoDataWaitSec int

	// ConsumerAutoCommitInterval consumer auto commit interval (default: 1s)
	ConsumerAutoCommitInterval time.Duration

	// Logger logger
	Logger Logger
}

Options holds the broker options.
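A sketch of composing broker-level options for a Pulsar-backed broker; the options are applied by the concrete constructor (gpulsar etc.), and the address format and import path are assumptions:

package example

import (
	"time"

	"github.com/go-god/broker" // assumed import path
)

// pulsarOptions composes connection-level options; values are illustrative.
func pulsarOptions() []broker.Option {
	return []broker.Option{
		broker.WithBrokerAddress("pulsar://localhost:6650"), // address format depends on the backend
		broker.WithAuthToken("my-token"),
		broker.WithOperationTimeout(30 * time.Second),
		broker.WithConnectionTimeout(10 * time.Second),
		broker.WithMaxConnectionsPerBroker(1),
		broker.WithGracefulWait(5 * time.Second),
		broker.WithLogger(broker.DummyLogger),
	}
}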

type PubOption

type PubOption func(p *PublishOptions)

PubOption is a publish option.

func WithDisableBatching

func WithDisableBatching() PubOption

WithDisableBatching disables batch publishing.

func WithPublishDelay

func WithPublishDelay(t time.Duration) PubOption

WithPublishDelay sets the publish delay time.

func WithPublishName

func WithPublishName(name string) PubOption

WithPublishName sets the publish name (producer name for Pulsar, message key for Kafka).

func WithSendTimeout

func WithSendTimeout(t time.Duration) PubOption

WithSendTimeout sets the publish send message timeout.

type PublishOptions

type PublishOptions struct {
	// PublishDelay specifies the time period within which the messages sent will be batched (default: 10ms)
	// if batching is enabled. If set to a non-zero value, messages will be queued until this time
	// interval elapses or until the batch size is reached.
	PublishDelay time.Duration

	// Name specifies a name for the producer.
	// For Pulsar, if not assigned, the system will generate
	// a globally unique name which can be accessed with
	// Producer.ProducerName().
	//
	// For Kafka, this is the publish message key:
	// the partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Name string

	// DisableBatching controls whether automatic batching of messages is enabled for the producer.
	// By default batching is enabled.
	// When batching is enabled, multiple calls to Producer.SendAsync can result in a single batch being sent to the
	// broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
	// messages will be compressed at the batch level, leading to a much better compression ratio
	// for similar headers or contents.
	// When enabled, the default batch delay is 1 ms and the default batch size is 1000 messages.
	// Setting `DisableBatching: true` will make the producer send messages individually.
	DisableBatching bool

	// SendTimeout specifies the timeout for a message that has not been acknowledged by the server since it was sent.
	// Send and SendAsync return an error after the timeout.
	// Default is 30 seconds; set a negative value such as -1 to disable.
	SendTimeout time.Duration
}

PublishOptions holds the publish message options.
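A sketch of publishing with per-message options; b is any concrete Broker implementation and the import path is an assumption:

package example

import (
	"context"
	"time"

	"github.com/go-god/broker" // assumed import path
)

// publishOrder sends one message with per-call publish options.
func publishOrder(ctx context.Context, b broker.Broker, payload []byte) error {
	return b.Publish(ctx, "orders", payload,
		broker.WithPublishName("order-producer"), // producer name (Pulsar) / message key (Kafka)
		broker.WithSendTimeout(10*time.Second),   // fail sends not acknowledged within 10s
		broker.WithDisableBatching(),             // send each message individually
	)
}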

type RedisConf

type RedisConf struct {
	// host:port address.
	Address string

	// Optional password. Must match the password specified in the
	// requirepass server configuration option.
	Password string

	// Database to be selected after connecting to the server.
	DB int

	// Maximum number of retries before giving up.
	// Default is to not retry failed commands.
	MaxRetries int

	// Dial timeout for establishing new connections.
	// Default is 5 seconds.
	DialTimeout time.Duration

	// Timeout for socket reads. If reached, commands will fail
	// with a timeout instead of blocking. Use value -1 for no timeout and 0 for default.
	// Default is 3 seconds.
	ReadTimeout time.Duration

	// Timeout for socket writes. If reached, commands will fail
	// with a timeout instead of blocking.
	// Default is ReadTimeout.
	WriteTimeout time.Duration

	// Maximum number of socket connections.
	// Default is 10 connections per every CPU as reported by runtime.NumCPU.
	PoolSize int

	// Amount of time client waits for connection if all connections
	// are busy before returning an error.
	// Default is ReadTimeout + 1 second.
	PoolTimeout time.Duration

	// Minimum number of idle connections which is useful when establishing
	// new connection is slow.
	MinIdleConns int

	// Amount of time after which client closes idle connections.
	// Should be less than server's timeout.
	// Default is 5 minutes. -1 disables idle timeout check.
	IdleTimeout time.Duration

	// Connection age at which the client retires (closes) the connection.
	// The go-redis default is to not close aged connections,
	// but 1800s is recommended.
	MaxConnAge time.Duration
}

RedisConf is the Redis client config.
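A sketch of configuring the Redis-backed broker through WithRedisConf; the values are illustrative, not recommendations, and the import path is assumed:

package example

import (
	"time"

	"github.com/go-god/broker" // assumed import path
)

// redisOptions builds options for the Redis-backed broker.
func redisOptions() []broker.Option {
	return []broker.Option{
		broker.WithRedisConf(&broker.RedisConf{
			Address:      "127.0.0.1:6379",
			Password:     "",
			DB:           0,
			MaxRetries:   3,
			DialTimeout:  5 * time.Second,
			ReadTimeout:  3 * time.Second,
			WriteTimeout: 3 * time.Second,
			PoolSize:     20,
			MinIdleConns: 2,
			IdleTimeout:  5 * time.Minute,
		}),
		broker.WithNoDataWaitSec(3), // back off a few seconds when the queue is empty
	}
}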

type SubHandler

type SubHandler func(ctx context.Context, value []byte) error

SubHandler is the subscribe handler func.

type SubOption

type SubOption func(s *SubscribeOptions)

SubOption is a subscribe option.

func WithCommitOffsetBlock added in v1.3.7

func WithCommitOffsetBlock() SubOption

WithCommitOffsetBlock makes the offset commit block when a message is consumed.

func WithMessageChannel

func WithMessageChannel() SubOption

WithMessageChannel enables receiving subscribed messages from a channel.

func WithMessageChannelSize

func WithMessageChannelSize(size int) SubOption

WithMessageChannelSize sets the subscribe message channel size.

func WithSubConcurrencySize

func WithSubConcurrencySize(size int) SubOption

WithSubConcurrencySize sets the subscribe concurrency size.

func WithSubInterval

func WithSubInterval(t time.Duration) SubOption

WithSubInterval sets the subscribe interval.

func WithSubKeyHandlers added in v1.3.6

func WithSubKeyHandlers(keyHandlers map[string]SubHandler) SubOption

WithSubKeyHandlers sets the sub key => SubHandler map.

func WithSubName

func WithSubName(name string) SubOption

WithSubName sets the subscriber name.

func WithSubOffset

func WithSubOffset(offset int64) SubOption

WithSubOffset sets the subscribe offset.

func WithSubRetryEnable

func WithSubRetryEnable() SubOption

WithSubRetryEnable enables subscribe retry.

func WithSubType

func WithSubType(t SubscriptionType) SubOption

WithSubType sets the subscription type.

type SubscribeOptions

type SubscribeOptions struct {
	// specifies the consumer name
	Name string

	// KeyHandlers is the Kafka consumer message key handler map.
	// For Redis sub, you can specify different subscriber functions to handle the msg.
	KeyHandlers map[string]SubHandler

	// Receive messages from channel. The channel returns a struct which contains message and the consumer from where
	// the message was received. It's not necessary here since we have 1 single consumer, but the channel could be
	// shared across multiple consumers as well
	MessageChannel     bool // default:false
	MessageChannelSize int  // default:100

	// Subscribe concurrency count, default: 1.
	// Note: this param is for redis or pulsar message consumers.
	ConcurrencySize int

	Offset int64

	// Commit the offset to the backend for kafka
	// Note: calling Commit performs a blocking synchronous operation.
	CommitOffsetBlock bool

	// SubInterval subscribe interval, default: 0
	SubInterval time.Duration

	// ===========pulsar mq=======
	// SubType specifies the subscription type to be used when subscribing to a topic.
	// Default is `Shared` (1:N).
	//
	// Exclusive: there can be only 1 consumer on the same topic with the same subscription name.
	//
	// Shared (1:N): shared subscription mode, multiple consumers will be able to use the same subscription name
	// and the messages will be dispatched according to
	// a round-robin rotation between the connected consumers.
	//
	// Failover: multiple consumers will be able to use the same subscription name
	// but only 1 consumer will receive the messages.
	// If that consumer disconnects, one of the other connected consumers will start receiving messages.
	SubType SubscriptionType

	// ReceiverQueueSize sets the size of the consumer receive queue.
	// The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
	// application calls `Consumer.receive()`. Using a higher value could potentially increase the consumer
	// throughput at the expense of bigger memory utilization.
	// Default value is `1000` messages and should be good for most use cases.
	ReceiverQueueSize int

	// RetryEnable enables retry for the pulsar subscription.
	RetryEnable bool
}

SubscribeOptions holds the subscribe message options.
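A sketch of subscribing with options, including per-key handlers; b is any concrete Broker implementation, the handler names are made up for the example, and the import path is assumed:

package example

import (
	"context"

	"github.com/go-god/broker" // assumed import path
)

func onCreated(ctx context.Context, value []byte) error { return nil } // hypothetical handlers
func onPaid(ctx context.Context, value []byte) error    { return nil }

// consumeOrders subscribes with a default handler plus key-routed handlers.
func consumeOrders(ctx context.Context, b broker.Broker) error {
	return b.Subscribe(ctx, "orders", "order-workers", onCreated,
		broker.WithSubName("order-consumer"),
		broker.WithSubConcurrencySize(4), // redis/pulsar: consume with 4 workers
		broker.WithMessageChannel(),
		broker.WithMessageChannelSize(200),
		broker.WithSubKeyHandlers(map[string]broker.SubHandler{
			"created": onCreated, // route by kafka message key / sub key
			"paid":    onPaid,
		}),
	)
}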

type SubscriptionType

type SubscriptionType int

SubscriptionType is the type of subscription supported by Pulsar.

const (
	// Exclusive there can be only 1 consumer on the same topic with the same subscription name
	Exclusive SubscriptionType = iota

	// Shared subscription mode, multiple consumers will be able to use the same subscription name
	// and the messages will be dispatched according to
	// a round-robin rotation between the connected consumers
	Shared

	// Failover subscription mode, multiple consumers will be able to use the same subscription name
	// but only 1 consumer will receive the messages.
	// If that consumer disconnects, one of the other connected consumers will start receiving messages.
	Failover

	// KeyShared subscription mode, multiple consumers will be able to use the same
	// subscription and all messages with the same key will be dispatched to only one consumer
	KeyShared
)
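For example, to pin all messages with the same key to a single consumer on a Pulsar subscription instead of the default round-robin Shared mode (import path assumed):

package example

import "github.com/go-god/broker" // assumed import path

// keySharedSub picks KeyShared instead of the default Shared mode.
func keySharedSub() broker.SubOption {
	return broker.WithSubType(broker.KeyShared)
}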

Directories

Path Synopsis
backoff	Package backoff provides backoff functionality
