Documentation ¶
Index ¶
- Variables
- type CompressionConfig
- type ConcurrencyConfig
- type Config
- type ConsumerClient
- type ConsumerManager
- func (c *ConsumerManager) AddConsumer(topic, channel string, client ConsumerClient) error
- func (c *ConsumerManager) AddConsumers(clients ...ConsumerClient) error
- func (c *ConsumerManager) Handle(topic, channel string, handler HandlerFunc)
- func (c *ConsumerManager) Start(lookupdsAddr []string) error
- func (c *ConsumerManager) Stop(ctx context.Context) (err error)
- func (c *ConsumerManager) Use(middleware ...MiddlewareFunc)
- type ConsumerManagerOptions
- type Handler
- type HandlerFunc
- type LookupdConfig
- type Message
- type MiddlewareFunc
- type NSQConsumer
- func (nc *NSQConsumer) AddConcurrentHandlers(handler nsqio.Handler, concurrency int)
- func (nc *NSQConsumer) AddHandler(handler nsqio.Handler)
- func (nc *NSQConsumer) ChangeMaxInFlight(n int)
- func (nc *NSQConsumer) Channel() string
- func (nc *NSQConsumer) Concurrency() int
- func (nc *NSQConsumer) ConnectToNSQLookupds(addresses []string) error
- func (nc *NSQConsumer) MaxInFlight() int
- func (nc *NSQConsumer) Stop()
- func (nc *NSQConsumer) Topic() string
- type NSQConsumerConfig
- type NSQProducer
- type ProducerClient
- type ProducerConfig
- type ProducerManager
- type QueueConfig
- type Stats
- func (s *Stats) BufferLength() int
- func (s *Stats) Concurrency() int
- func (s *Stats) ErrorCount() uint64
- func (s *Stats) MaxInFlight() int
- func (s *Stats) MessageCount() uint64
- func (s *Stats) MessageInBuffer() int64
- func (s *Stats) Throttle() ThrottleStats
- func (s *Stats) ThrottleCount() int64
- func (s *Stats) Worker() int64
- type ThrottleStats
- type TimeoutConfig
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrInvalidConcurrencyConfiguration happens when the concurrency configuration number is not
	// as expected. The configuration is checked when adding a new consumer.
	ErrInvalidConcurrencyConfiguration = errors.New("gonsq: invalid concurrency configuration")
	// ErrLookupdsAddrEmpty happens when the NSQ lookupd address is empty when wrapping consumers.
	ErrLookupdsAddrEmpty = errors.New("gonsq: lookupds addresses is empty")
	// ErrTopicWithChannelNotFound is returned when the topic and channel are not found.
	ErrTopicWithChannelNotFound = errors.New("gonsq: topic and channel not found")
	// ErrStopDeadlineExceeded happens when the stop time exceeds the context deadline.
	ErrStopDeadlineExceeded = errors.New("gonsq: stop deadline exceeded")
)
Functions ¶
This section is empty.
Types ¶
type CompressionConfig ¶
type CompressionConfig struct {
	Deflate      bool `toml:"deflate" yaml:"deflate"`
	DeflateLevel int  `toml:"deflate_level" yaml:"deflate_level"`
	Snappy       bool `toml:"snappy" yaml:"snappy"`
}
CompressionConfig to support compression
func (*CompressionConfig) Validate ¶
func (cm *CompressionConfig) Validate() error
type ConcurrencyConfig ¶
type ConcurrencyConfig struct {
	// Concurrency is the number of workers/goroutines intended for handling incoming/consumed messages.
	Concurrency int
	// MaxInFlight is sort of comparable to the TCP "window size": it controls how many messages nsqd will send
	// to the consumer before hearing back about any of them.
	// This description is taken from https://github.com/nsqio/go-nsq/issues/221#issuecomment-352865779.
	// Note that the MaxInFlight number is the number per concurrent job. In the end, the length of the buffered
	// channel is MaxInFlight * Concurrency.
	MaxInFlight int
}
ConcurrencyConfig controls the concurrency flow in gonsq. Concurrency and MaxInFlight are combined to determine the length of the buffered channel. This number then affects how the library throttles message consumption.
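The sizing rule stated in the field comments (buffer length = MaxInFlight * Concurrency) can be sketched as follows. The struct is redeclared locally so the example compiles on its own, and bufferLength is a hypothetical helper, not a function of the package.

```go
package main

import "fmt"

// ConcurrencyConfig mirrors the two documented fields; it is redeclared
// here only so the example is self-contained.
type ConcurrencyConfig struct {
	Concurrency int
	MaxInFlight int
}

// bufferLength is a hypothetical helper applying the documented rule:
// MaxInFlight is per concurrent job, so the total is the product.
func bufferLength(cc ConcurrencyConfig) int {
	return cc.MaxInFlight * cc.Concurrency
}

func main() {
	cc := ConcurrencyConfig{Concurrency: 4, MaxInFlight: 50}
	fmt.Println(bufferLength(cc)) // 200
}
```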
func (ConcurrencyConfig) IsEmpty ¶
func (cc ConcurrencyConfig) IsEmpty() bool
func (*ConcurrencyConfig) Validate ¶
func (cc *ConcurrencyConfig) Validate() error
Validate checks the values of the concurrency config. If a value is not set, the default value is used.
type Config ¶
type Config struct {
	Hostname string
	// This must be less than Timeout.WriteTimeout
	HeartbeatInterval time.Duration
	Lookupd           LookupdConfig
	Timeout           TimeoutConfig
	Queue             QueueConfig
	Compression       CompressionConfig
}
Config of nsqio
type ConsumerClient ¶
type ConsumerClient interface {
	Topic() string
	Channel() string
	Stop()
	AddHandler(handler nsqio.Handler)
	AddConcurrentHandlers(handler nsqio.Handler, concurrency int)
	ConnectToNSQLookupds(addresses []string) error
	ChangeMaxInFlight(n int)
	Concurrency() int
	MaxInFlight() int
}
ConsumerClient is the consumer client of NSQ. This backend implements all communication protocols to the lookupd and nsqd servers.
type ConsumerManager ¶
type ConsumerManager struct {
	// Default functions for throttling.
	OpenThrottleFunc   func(*Stats) bool
	BreaKThrottleFunc  func(*Stats) bool
	LoosenThrottleFunc func(*Stats) bool
	// contains filtered or unexported fields
}
ConsumerManager manages consumer flow control. It manages multiple NSQ consumer clients and exposes APIs for message handlers to handle incoming messages. It also manages the lifecycle (start and stop) of the NSQ consumer clients and the concurrent handlers.
func ManageConsumers ¶
func ManageConsumers(clients ...ConsumerClient) (*ConsumerManager, error)
ManageConsumers creates a new ConsumerManager
func NewConsumerManager ¶
func NewConsumerManager(options *ConsumerManagerOptions) (*ConsumerManager, error)
NewConsumerManager creates a new manager for NSQ consumers. The function takes options for managing the consumers.
func (*ConsumerManager) AddConsumer ¶
func (c *ConsumerManager) AddConsumer(topic, channel string, client ConsumerClient) error
AddConsumer adds a consumer to the ConsumerManager.
func (*ConsumerManager) AddConsumers ¶
func (c *ConsumerManager) AddConsumers(clients ...ConsumerClient) error
AddConsumers adds more consumers to the ConsumerManager.
func (*ConsumerManager) Handle ¶
func (c *ConsumerManager) Handle(topic, channel string, handler HandlerFunc)
Handle registers a message handler function for the given topic and channel. It only registers the handler with the consumer.
func (*ConsumerManager) Start ¶
func (c *ConsumerManager) Start(lookupdsAddr []string) error
Start starts the consumers. This triggers all workers to start.
func (*ConsumerManager) Stop ¶
func (c *ConsumerManager) Stop(ctx context.Context) (err error)
Stop stops all the NSQ consumers.
func (*ConsumerManager) Use ¶
func (c *ConsumerManager) Use(middleware ...MiddlewareFunc)
Use adds middleware and should be called before Handle. The same middleware is not added twice: if it is already in use, the addition is skipped.
type ConsumerManagerOptions ¶
type ConsumerManagerOptions struct {
	OpenThrottleFunc   func(*Stats) bool
	LoosenThrottleFunc func(*Stats) bool
	BreakThrottleFunc  func(*Stats) bool
}
ConsumerManagerOptions is a set of options for consumer manager.
type Handler ¶
type Handler interface {
	// HandleMessage implements nsqio.Handler to directly consume
	// messages from the nsqio client. HandleMessage then passes the message
	// into an internal buffered channel, so the message can be consumed
	// by the Consume method.
	HandleMessage(message *nsqio.Message)
	// Consume starts the worker goroutines to consume messages
	// in the buffered channel.
	Consume() error
	// Stop stops all worker goroutines.
	Stop()
}
Handler handles messages from NSQD and passes them to the message handler via a channel.
Handler implements nsqio.Handler to consume messages from NSQD (via nsqio/go-nsq) and pass them through a channel. Messages in the channel are consumed by worker goroutines that run when the Consume method is called. The handler is responsible for stopping the worker goroutines via the Stop method.
type HandlerFunc ¶
HandlerFunc for nsq
type LookupdConfig ¶
type LookupdConfig struct {
	PoolInterval time.Duration `toml:"pool_interval" yaml:"pool_interval"`
	PollJitter   float64       `toml:"pool_jitter" yaml:"pool_jitter"`
}
LookupdConfig for lookupd configuration
func (*LookupdConfig) Validate ¶
func (ld *LookupdConfig) Validate() error
type MiddlewareFunc ¶
type MiddlewareFunc func(handler HandlerFunc) HandlerFunc
MiddlewareFunc for nsq middleware
type NSQConsumer ¶
type NSQConsumer struct {
// contains filtered or unexported fields
}
NSQConsumer backend
func NewConsumer ¶
func NewConsumer(ctx context.Context, config NSQConsumerConfig) (*NSQConsumer, error)
NewConsumer for nsq
func (*NSQConsumer) AddConcurrentHandlers ¶
func (nc *NSQConsumer) AddConcurrentHandlers(handler nsqio.Handler, concurrency int)
AddConcurrentHandlers adds concurrent handlers to nsq.
func (*NSQConsumer) AddHandler ¶
func (nc *NSQConsumer) AddHandler(handler nsqio.Handler)
AddHandler adds a handler to nsq.
func (*NSQConsumer) ChangeMaxInFlight ¶
func (nc *NSQConsumer) ChangeMaxInFlight(n int)
ChangeMaxInFlight changes the max in flight number of the nsq consumer.
func (*NSQConsumer) Channel ¶
func (nc *NSQConsumer) Channel() string
Channel returns the channel of the consumer.
func (*NSQConsumer) Concurrency ¶
func (nc *NSQConsumer) Concurrency() int
Concurrency returns the concurrency number for a given consumer.
func (*NSQConsumer) ConnectToNSQLookupds ¶
func (nc *NSQConsumer) ConnectToNSQLookupds(addresses []string) error
ConnectToNSQLookupds connects to several nsq lookupd addresses.
func (*NSQConsumer) MaxInFlight ¶
func (nc *NSQConsumer) MaxInFlight() int
MaxInFlight returns the max in flight number for a given consumer.
func (*NSQConsumer) Topic ¶
func (nc *NSQConsumer) Topic() string
Topic returns the topic of the consumer.
type NSQConsumerConfig ¶
type NSQConsumerConfig struct {
	Hostname string
	Topic    string
	Channel  string
	// This must be less than Timeout.WriteTimeout
	HeartbeatInterval time.Duration
	Lookupd           LookupdConfig
	Timeout           TimeoutConfig
	Queue             QueueConfig
	Compression       CompressionConfig
	Concurrency       ConcurrencyConfig
}
NSQConsumerConfig for nsq consumer
func (*NSQConsumerConfig) Validate ¶
func (cf *NSQConsumerConfig) Validate() error
Validate consumer configuration
type NSQProducer ¶
type NSQProducer struct {
// contains filtered or unexported fields
}
NSQProducer backend
func NewProducer ¶
func NewProducer(ctx context.Context, config ProducerConfig) (*NSQProducer, error)
NewProducer returns a new producer.
func (*NSQProducer) MultiPublish ¶
func (np *NSQProducer) MultiPublish(topic string, body [][]byte) error
MultiPublish to nsqd
type ProducerClient ¶
type ProducerClient interface {
	Ping() error
	Publish(topic string, body []byte) error
	MultiPublish(topic string, body [][]byte) error
	Stop()
}
ProducerClient is the producer client of NSQ. This backend implements all communication protocols to the nsqd servers.
type ProducerConfig ¶
type ProducerConfig struct {
	Hostname    string
	Address     string
	Compression CompressionConfig
	Timeout     TimeoutConfig
}
ProducerConfig struct
type ProducerManager ¶
type ProducerManager struct {
// contains filtered or unexported fields
}
ProducerManager manages the producer flow. If a given topic is not available in the manager, the producer returns a failure.
func ManageProducers ¶
func ManageProducers(backend ProducerClient, topics ...string) (*ProducerManager, error)
ManageProducers wraps the NSQ producer. The function receives the topics parameter because in NSQ we can publish a message without registering any new topics. This can sometimes be problematic, as we don't have a list of the topics we will publish messages to.
func (*ProducerManager) MultiPublish ¶
func (p *ProducerManager) MultiPublish(topic string, body [][]byte) error
MultiPublish publishes messages to nsqd. If the given topic does not exist, it returns an error.
type QueueConfig ¶
type QueueConfig struct {
	MsgTimeout          time.Duration `toml:"message_timeout" yaml:"message_timeout"`
	MaxRequeueDelay     time.Duration `toml:"max_requeue_delay" yaml:"max_requeue_delay"`
	DefaultRequeueDelay time.Duration `toml:"default_requeue_delay" yaml:"default_requeue_delay"`
}
QueueConfig for message configuration. MaxInFlight is excluded from the queue config because the MaxInFlight configuration depends on the buffer length.
type Stats ¶
type Stats struct {
// contains filtered or unexported fields
}
Stats is included in every NSQ consumer worker to collect the status of the NSQ consumers.
func (*Stats) BufferLength ¶
func (s *Stats) BufferLength() int
BufferLength returns the length of the buffer used in a message handler.
func (*Stats) Concurrency ¶
func (s *Stats) Concurrency() int
Concurrency returns the concurrency number of a handler.
func (*Stats) ErrorCount ¶
func (s *Stats) ErrorCount() uint64
ErrorCount returns the total number of errors that occurred while handling NSQ messages.
func (*Stats) MaxInFlight ¶
func (s *Stats) MaxInFlight() int
MaxInFlight returns the maxInFlight number used to calculate the buffer length.
func (*Stats) MessageCount ¶
func (s *Stats) MessageCount() uint64
MessageCount returns the total number of messages retrieved from NSQ.
func (*Stats) MessageInBuffer ¶
func (s *Stats) MessageInBuffer() int64
MessageInBuffer returns the total number of messages in the buffer.
func (*Stats) Throttle ¶
func (s *Stats) Throttle() ThrottleStats
Throttle returns whether the consumer/producer is being throttled.
func (*Stats) ThrottleCount ¶
func (s *Stats) ThrottleCount() int64
ThrottleCount returns the total number of times throttling has happened.
type ThrottleStats ¶
type ThrottleStats int32
ThrottleStats is the indicator of throttling. 0 = no throttle. 1 = throttle loosened. 2 = throttled.
func (ThrottleStats) IsThrottleLoosen ¶
func (t ThrottleStats) IsThrottleLoosen() bool
IsThrottleLoosen returns true if the throttle is loosened.
func (ThrottleStats) IsThrottled ¶
func (t ThrottleStats) IsThrottled() bool
IsThrottled returns true if the throttle is on.
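The three documented values can be illustrated with a small mapping. The type is redeclared locally so the example compiles on its own, and describe is a hypothetical helper; the exact values for which IsThrottled and IsThrottleLoosen return true are not spelled out in this reference, so those methods are not reproduced here.

```go
package main

import "fmt"

// ThrottleStats is redeclared locally only to keep the example self-contained.
type ThrottleStats int32

// describe is a hypothetical helper mapping the documented values to labels.
func describe(t ThrottleStats) string {
	switch t {
	case 0:
		return "no throttle"
	case 1:
		return "throttle loosened"
	case 2:
		return "throttled"
	default:
		return "unknown"
	}
}

func main() {
	for _, t := range []ThrottleStats{0, 1, 2} {
		fmt.Printf("%d = %s\n", t, describe(t))
	}
}
```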
type TimeoutConfig ¶
type TimeoutConfig struct {
	Dial           time.Duration `toml:"dial" yaml:"dial"`
	Read           time.Duration `toml:"read" yaml:"read"`
	Write          time.Duration `toml:"write" yaml:"write"`
	MessageTimeout time.Duration `toml:"message" yaml:"message"`
}
TimeoutConfig for timeout configuration
func (*TimeoutConfig) Validate ¶
func (tm *TimeoutConfig) Validate() error