gonsq

package module
v0.0.0-...-e2ed998 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 20, 2020 License: MIT Imports: 7 Imported by: 0

README

Gonsq

Work In Progress

Gonsq is a wrapper of go-nsq library.

This library is inspired by how Lyft Building an Adaptive, Multi-Tenant Stream Bus with Kafka and Golang and Flow Control architecture in envoy proxy.

Nsqio

Gonsq is not using standard nsq.Consumer and nsq.Producer, instead the library provides NSQConsumer and NSQProducer object to communicate directly with nsq.Consumer and nsq.Producer.

Some properties also added to the NSQConsumer object, for example concurrency. The concurrency information is used to inform the Gonsq about how many concurrent consumers that a given NSQConsumer want to run.

Design

Gonsq implements its own flow control on top of go-nsq library by using buffered channel. The messages that delivered to buffered channels, then consumed by internal gonsq-handler` which run using goroutines. This mechanism gives gonsq a way to communicate with each concurrent handlers to be able to control the queue flow.

gonsq-design

Stats

The library is exposing some metrics for internal usage and might be useful for the user to send the metrics to some monitoring backend. The stats object is available through nsq.Message struct and passed to the message handler.

The exposed metrics are:

  • Total Message Count: The total count of messages consumed by particular worker of topic and channel.
  • Total Error Count: The total count of error happens in particular worker of topic and channel.
  • Total Message In Buffer Count: The total count of buffer used in particular worker of topic and channel. This stat is used to determine whether a throttling mechanism need to be triggered or not.
  • Total Buffer Length: The total length of buffer available for particular worker of topic and channel
  • Total Concurrency: The total number of concurrency/woker for particular worker of topic and channel
  • Total Worker Count: The current total number of worker for particular worker of topic and channel. This stat will be useful if we have a mechanism to reduce/increase the number of worker based on condition. For now, this is used to determine the number of worker on startup and shutdown.
  • Throttled: The status of particular topic and channel, is the consumer is being throttled or not.

How To Use The Library

To use this library, the consumer must be created using nsq/nsqio.

TODO

  • DNS: make it possible to specify a single addresss with host or single/multiple address with IP. If a single host is given, then resolve to host.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrInvalidConcurrencyConfiguration happens when concurrency configuration number is not
	// as expected. The configuration is checked when adding new consumer.
	ErrInvalidConcurrencyConfiguration = errors.New("gonsq: invalid concurrency configuration")
	// ErrLookupdsAddrEmpty happens when NSQ lookupd address is empty when wrapping consumers.
	ErrLookupdsAddrEmpty = errors.New("gonsq: lookupds addresses is empty")
	// ErrTopicWithChannelNotFound for error when channel and topic is not found.
	ErrTopicWithChannelNotFound = errors.New("gonsq: topic and channel not found")
	// ErrStopDeadlineExceeded heppens when stop time exceeds context deadline time.
	ErrStopDeadlineExceeded = errors.New("gonsq: stop deadline exceeded")
)

Functions

This section is empty.

Types

type CompressionConfig

type CompressionConfig struct {
	Deflate      bool `toml:"deflate" yaml:"deflate"`
	DeflateLevel int  `toml:"deflate_level" yaml:"deflate_level"`
	Snappy       bool `toml:"snappy" yaml:"snappy"`
}

CompressionConfig to support compression

func (*CompressionConfig) Validate

func (cm *CompressionConfig) Validate() error

type ConcurrencyConfig

type ConcurrencyConfig struct {
	// Concurrency is the number of worker/goroutines intended for handling incoming/consumed messages.
	Concurrency int
	// MaxInFlight is sort of comparable to the TCP "window size", it controls how many messages nsqd will send
	// to the consumer before hearing back about any of them.
	// This description is taken from https://github.com/nsqio/go-nsq/issues/221#issuecomment-352865779.
	// Note that MaxInFlight number is the number per concurrent job. At the end, the length of buffered channel is
	// MaxInflight * Concurrency.
	MaxInFlight int
}

ConcurrencyConfig control the concurrency flow in gonsq. Concurrency and MaxInFlight are combined to determine the number of buffered channel. This number then affect how the library throttle the message consumption.

func (ConcurrencyConfig) IsEmpty

func (cc ConcurrencyConfig) IsEmpty() bool

func (*ConcurrencyConfig) Validate

func (cc *ConcurrencyConfig) Validate() error

Validate the value of concurrency config, if some value is not exist then set the default value.

type Config

type Config struct {
	Hostname string
	// This must be less than Timeout.WriteTimeout
	HeartbeatInterval time.Duration
	Lookupd           LookupdConfig
	Timeout           TimeoutConfig
	Queue             QueueConfig
	Compression       CompressionConfig
}

Config of nsqio

func (*Config) Validate

func (c *Config) Validate() error

type ConsumerClient

type ConsumerClient interface {
	Topic() string
	Channel() string
	Stop()
	AddHandler(handler nsqio.Handler)
	AddConcurrentHandlers(handler nsqio.Handler, concurrency int)
	ConnectToNSQLookupds(addresses []string) error
	ChangeMaxInFlight(n int)
	Concurrency() int
	MaxInFlight() int
}

ConsumerClient is he consumer client of NSQ. This backend implements all communication protocol to lookupd and nsqd servers.

type ConsumerManager

type ConsumerManager struct {

	// Default functions for throttling.
	OpenThrottleFunc   func(*Stats) bool
	BreaKThrottleFunc  func(*Stats) bool
	LoosenThrottleFunc func(*Stats) bool
	// contains filtered or unexported fields
}

ConsumerManager manage the consumer flow control. The ConsumerManager manages multiple nsq consumers client, and expose apis for message handler to handle the incoming messages. The ConsumerManager also manage the lifecycle of the nsq consumers client and the concurrent handlers(start and stop).

func ManageConsumers

func ManageConsumers(clients ...ConsumerClient) (*ConsumerManager, error)

ManageConsumers creates a new ConsumerManager

func NewConsumerManager

func NewConsumerManager(options *ConsumerManagerOptions) (*ConsumerManager, error)

NewConsumerManager create a new manager for NSQs consumers. The function takes options for managing the consumers.

func (*ConsumerManager) AddConsumer

func (c *ConsumerManager) AddConsumer(topic, channel string, client ConsumerClient) error

AddConsumer add consumer to ConsumerManager

func (*ConsumerManager) AddConsumers

func (c *ConsumerManager) AddConsumers(clients ...ConsumerClient) error

AddConsumers add more consumers to the consumer object.

func (*ConsumerManager) Handle

func (c *ConsumerManager) Handle(topic, channel string, handler HandlerFunc)

Handle to register the message handler function. Only for reigstering the message handler into the consumer.

func (*ConsumerManager) Start

func (c *ConsumerManager) Start(lookupdsAddr []string) error

Start for start the consumer. This will trigger all workers to start.

func (*ConsumerManager) Stop

func (c *ConsumerManager) Stop(ctx context.Context) (err error)

Stop for stopping all the nsq consumer.

func (*ConsumerManager) Use

func (c *ConsumerManager) Use(middleware ...MiddlewareFunc)

Use middleware, this should be called before handle function this function will avoid to add the same middleware twice if the same middleware is used, it will skip the addition.

type ConsumerManagerOptions

type ConsumerManagerOptions struct {
	OpenThrottleFunc   func(*Stats) bool
	LoosenThrottleFunc func(*Stats) bool
	BreakThrottleFunc  func(*Stats) bool
}

ConsumerManagerOptions is a set of options for consumer manager.

type Handler

type Handler interface {
	// HandleMessage implements nsqio.Handler to directly consume
	// messages from nsqio client. HandleMessage then pass the message
	// into internal buffered channel, so the message can be consumed
	// by Consume method.
	HandleMessage(message *nsqio.Message)
	// Consume starts the worker goroutines, to consume messages
	// in the buffered channel.
	Consume() error
	// Stop stops all worker goroutines.
	Stop()
}

Handler handle messages from NSQD and pass the message into the message handler via channel.

Handler implements nsqio.Handler to consume the message from NSQD(via nsqio/go-nsq) and pass the message via Channel. The messages from channel is consumed via worker goroutines that runs when Consume method is called. The handler responsible to stop the worker goroutines via Stop method.

type HandlerFunc

type HandlerFunc func(ctx context.Context, message *Message) error

HandlerFunc for nsq

type LookupdConfig

type LookupdConfig struct {
	PoolInterval time.Duration `toml:"pool_interval" yaml:"pool_interval"`
	PollJitter   float64       `toml:"pool_jitter" yaml:"pool_jitter"`
}

LookupdConfig for lookupd configuration

func (*LookupdConfig) Validate

func (ld *LookupdConfig) Validate() error

type Message

type Message struct {
	*nsqio.Message
	Topic   string
	Channel string
	Stats   *Stats
}

Message for nsq

type MiddlewareFunc

type MiddlewareFunc func(handler HandlerFunc) HandlerFunc

MiddlewareFunc for nsq middleware

type NSQConsumer

type NSQConsumer struct {
	// contains filtered or unexported fields
}

NSQConsumer backend

func NewConsumer

func NewConsumer(ctx context.Context, config NSQConsumerConfig) (*NSQConsumer, error)

NewConsumer for nsq

func (*NSQConsumer) AddConcurrentHandlers

func (nc *NSQConsumer) AddConcurrentHandlers(handler nsqio.Handler, concurrency int)

AddConcurrentHandlers add concurrent handler to nsq

func (*NSQConsumer) AddHandler

func (nc *NSQConsumer) AddHandler(handler nsqio.Handler)

AddHandler to nsq

func (*NSQConsumer) ChangeMaxInFlight

func (nc *NSQConsumer) ChangeMaxInFlight(n int)

ChangeMaxInFlight will change max in flight number in nsq consumer

func (*NSQConsumer) Channel

func (nc *NSQConsumer) Channel() string

Channel return the channel of consumer

func (*NSQConsumer) Concurrency

func (nc *NSQConsumer) Concurrency() int

Concurrency return the concurrency number for a given consumer

func (*NSQConsumer) ConnectToNSQLookupds

func (nc *NSQConsumer) ConnectToNSQLookupds(addresses []string) error

ConnectToNSQLookupds connecting to several nsq lookupd

func (*NSQConsumer) MaxInFlight

func (nc *NSQConsumer) MaxInFlight() int

MaxInFlight return the max in flight number for a given consumer

func (*NSQConsumer) Stop

func (nc *NSQConsumer) Stop()

Stop nsq consumer

func (*NSQConsumer) Topic

func (nc *NSQConsumer) Topic() string

Topic return the topic of consumer

type NSQConsumerConfig

type NSQConsumerConfig struct {
	Hostname string
	Topic    string
	Channel  string
	// This must be less than Timeout.WriteTimeout
	HeartbeatInterval time.Duration
	Lookupd           LookupdConfig
	Timeout           TimeoutConfig
	Queue             QueueConfig
	Compression       CompressionConfig
	Concurrency       ConcurrencyConfig
}

NSQConsumerConfig for nsq consumer

func (*NSQConsumerConfig) Validate

func (cf *NSQConsumerConfig) Validate() error

Validate consumer configuration

type NSQProducer

type NSQProducer struct {
	// contains filtered or unexported fields
}

NSQProducer backend

func NewProducer

func NewProducer(ctx context.Context, config ProducerConfig) (*NSQProducer, error)

NewProducer return a new producer

func (*NSQProducer) MultiPublish

func (np *NSQProducer) MultiPublish(topic string, body [][]byte) error

MultiPublish to nsqd

func (*NSQProducer) Ping

func (np *NSQProducer) Ping() error

Ping the nsqd of producer

func (*NSQProducer) Publish

func (np *NSQProducer) Publish(topic string, body []byte) error

Publish to nsqd

func (*NSQProducer) Stop

func (np *NSQProducer) Stop()

Stop the nsq producer

type ProducerClient

type ProducerClient interface {
	Ping() error
	Publish(topic string, body []byte) error
	MultiPublish(topic string, body [][]byte) error
	Stop()
}

ProducerClient is the producer client of NSQ. This backend implements all communication protocol to nsqd servers.

type ProducerConfig

type ProducerConfig struct {
	Hostname    string
	Address     string
	Compression CompressionConfig
	Timeout     TimeoutConfig
}

ProducerConfig struct

type ProducerManager

type ProducerManager struct {
	// contains filtered or unexported fields
}

ProducerManager manage the producer flow. If a given topic is not available in the manager, the producer will return a failure message.

func ManageProducers

func ManageProducers(backend ProducerClient, topics ...string) (*ProducerManager, error)

ManageProducers is a function to wrap the nsq producer. The function receive topics parameters because in NSQ, we can publish message without registering any new topics. This sometimes can be problematic as we don't have a list of topics that we will publish the message to.

func (*ProducerManager) MultiPublish

func (p *ProducerManager) MultiPublish(topic string, body [][]byte) error

MultiPublish message to nsqd, ifa given topic does not exists, then return error.

func (*ProducerManager) Publish

func (p *ProducerManager) Publish(topic string, body []byte) error

Publish message to nsqd, if a given topic does not exists, then return error.

type QueueConfig

type QueueConfig struct {
	MsgTimeout          time.Duration `toml:"message_timeout" yaml:"message_timeout"`
	MaxRequeueDelay     time.Duration `toml:"max_requeue_delay" yaml:"max_requeue_delay"`
	DefaultRequeueDelay time.Duration `toml:"default_requeue_delay" yaml:"default_requeue_delay"`
}

QueueConfig for message configuration. In the queue config, MaxInFlight is excluded because the MaxInFlight configuration will depends on the buffer length.

type Stats

type Stats struct {
	// contains filtered or unexported fields
}

Stats object to be included in every nsq consumer worker to collect statuses of nsq consumers.

func (*Stats) BufferLength

func (s *Stats) BufferLength() int

BufferLength return length of the buffer used in a message handler

func (*Stats) Concurrency

func (s *Stats) Concurrency() int

Concurrency return the number of concurrency in a handler.

func (*Stats) ErrorCount

func (s *Stats) ErrorCount() uint64

ErrorCount return the total number of error when handle nsq message.

func (*Stats) MaxInFlight

func (s *Stats) MaxInFlight() int

MaxInFlight return the number of maxInFlight used to calculate buffer length

func (*Stats) MessageCount

func (s *Stats) MessageCount() uint64

MessageCount return the total number of messages retrieved from NSQ.

func (*Stats) MessageInBuffer

func (s *Stats) MessageInBuffer() int64

MessageInBuffer return the total number of messages in buffer

func (*Stats) Throttle

func (s *Stats) Throttle() ThrottleStats

Throttle return whether the consumer/producer is being throttled or not.

func (*Stats) ThrottleCount

func (s *Stats) ThrottleCount() int64

ThrottleCount return the total number of throttle happened.

func (*Stats) Worker

func (s *Stats) Worker() int64

Worker return the current number of worker in a message handler.

type ThrottleStats

type ThrottleStats int32

ThrottleStats is the indicator of throttling. 0 = no throttle. 1 = throttle loosen. 2 = throttled.

func (ThrottleStats) IsThrottleLoosen

func (t ThrottleStats) IsThrottleLoosen() bool

IsThrottleLoosen return true if throttle loosen.

func (ThrottleStats) IsThrottled

func (t ThrottleStats) IsThrottled() bool

IsThrottled return true if throttle is on.

type TimeoutConfig

type TimeoutConfig struct {
	Dial           time.Duration `toml:"dial" yaml:"dial"`
	Read           time.Duration `toml:"read" yaml:"read"`
	Write          time.Duration `toml:"write" yaml:"write"`
	MessageTimeout time.Duration `toml:"message" yaml:"message"`
}

TimeoutConfig for timeout configuration

func (*TimeoutConfig) Validate

func (tm *TimeoutConfig) Validate() error

Directories

Path Synopsis
example
middleware

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL