nsqio

package
v0.0.0-...-d22e7c3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2020 License: GPL-3.0 Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CompressionConfig

// CompressionConfig groups the compression options. Every field carries
// toml and yaml struct tags naming its configuration key.
type CompressionConfig struct {
	// Deflate and DeflateLevel are the deflate settings.
	// NOTE(review): the valid range of DeflateLevel is not visible here — confirm.
	Deflate      bool `toml:"deflate" yaml:"deflate"`
	DeflateLevel int  `toml:"deflate_level" yaml:"deflate_level"`
	// Snappy is the snappy setting.
	Snappy       bool `toml:"snappy" yaml:"snappy"`
}

CompressionConfig to support compression

type Config

// Config bundles a hostname with the lookupd, timeout, queue and compression
// settings. Unlike the nested config types, its own fields carry no
// toml/yaml struct tags.
type Config struct {
	Hostname    string
	Lookupd     LookupdConfig
	Timeout     TimeoutConfig
	Queue       QueueConfig
	Compression CompressionConfig
}

Config of nsqio

type ConsumerConfig

// ConsumerConfig is the configuration accepted by NewConsumer. It names the
// topic and channel and embeds the lookupd, timeout, queue and compression
// settings alongside the worker concurrency and buffer sizing.
type ConsumerConfig struct {
	Hostname    string
	Topic       string
	Channel     string
	Lookupd     LookupdConfig
	Timeout     TimeoutConfig
	Queue       QueueConfig
	Compression CompressionConfig
	Concurrency int
	// BufferMultiplier is the buffer length allotted per concurrent worker:
	// it is the factor applied to Concurrency to size the buffer used when
	// consuming messages.
	// The multiplier determines how many messages can be consumed before the
	// buffer becomes half full. For example, a multiplier of 20 (the default
	// value) means a worker is able to consume more than 10 messages from
	// nsqd before the buffer is half full.
	// To choose this value correctly, observe the consumption rate of the
	// messages and the handling rate of the worker.
	BufferMultiplier int
}

ConsumerConfig for nsq consumer

func (*ConsumerConfig) Validate

func (cf *ConsumerConfig) Validate() error

Validate consumer configuration

type LookupdConfig

// LookupdConfig holds the lookupd settings; both fields carry toml and yaml
// struct tags.
//
// NOTE(review): the spelling is inconsistent — the first field is
// PoolInterval while the second is PollJitter, and both tags use "pool_".
// Presumably "poll" was intended throughout (go-nsq names the matching
// options LookupdPollInterval / LookupdPollJitter) — confirm. Renaming the
// field or the tags would break existing callers and config files.
type LookupdConfig struct {
	PoolInterval time.Duration `toml:"pool_interval" yaml:"pool_interval"`
	PollJitter   float64       `toml:"pool_jitter" yaml:"pool_jitter"`
}

LookupdConfig for lookupd configuration

type NSQConsumer

// NSQConsumer is the consumer type returned by NewConsumer. All of its
// fields are unexported.
type NSQConsumer struct {
	// contains filtered or unexported fields
}

NSQConsumer backend

func NewConsumer

func NewConsumer(ctx context.Context, config ConsumerConfig) (*NSQConsumer, error)

NewConsumer for nsq

func (*NSQConsumer) AddConcurrentHandlers

func (c *NSQConsumer) AddConcurrentHandlers(handler nsqio.Handler, concurrency int)

AddConcurrentHandlers adds concurrent handlers to the nsq consumer

func (*NSQConsumer) AddHandler

func (c *NSQConsumer) AddHandler(handler nsqio.Handler)

AddHandler adds a handler to the nsq consumer

func (*NSQConsumer) BufferMultiplier

func (c *NSQConsumer) BufferMultiplier() int

BufferMultiplier returns the buffer multiplier number for a given consumer

func (*NSQConsumer) ChangeMaxInFlight

func (c *NSQConsumer) ChangeMaxInFlight(n int)

ChangeMaxInFlight changes the max-in-flight number of the nsq consumer

func (*NSQConsumer) Channel

func (c *NSQConsumer) Channel() string

Channel returns the channel of the consumer

func (*NSQConsumer) Concurrency

func (c *NSQConsumer) Concurrency() int

Concurrency returns the concurrency number for a given consumer

func (*NSQConsumer) ConnectToNSQLookupds

func (c *NSQConsumer) ConnectToNSQLookupds(addresses []string) error

ConnectToNSQLookupds connects the consumer to several nsqlookupd addresses

func (*NSQConsumer) Stop

func (c *NSQConsumer) Stop()

Stop nsq consumer

func (*NSQConsumer) Topic

func (c *NSQConsumer) Topic() string

Topic returns the topic of the consumer

type NSQProducer

// NSQProducer is the producer type returned by NewProducer. All of its
// fields are unexported.
type NSQProducer struct {
	// contains filtered or unexported fields
}

NSQProducer backend

func NewProducer

func NewProducer(ctx context.Context, config ProducerConfig) (*NSQProducer, error)

NewProducer returns a new producer

func (*NSQProducer) MultiPublish

func (np *NSQProducer) MultiPublish(topic string, body [][]byte) error

MultiPublish to nsqd

func (*NSQProducer) Ping

func (np *NSQProducer) Ping() error

Ping the nsqd of producer

func (*NSQProducer) Publish

func (np *NSQProducer) Publish(topic string, body []byte) error

Publish to nsqd

func (*NSQProducer) Stop

func (np *NSQProducer) Stop()

Stop the nsq producer

type ProducerConfig

// ProducerConfig is the configuration accepted by NewProducer: a hostname,
// an address, and the compression and timeout settings.
type ProducerConfig struct {
	Hostname    string
	// NOTE(review): presumably the nsqd address the producer connects to — confirm.
	Address     string
	Compression CompressionConfig
	Timeout     TimeoutConfig
}

ProducerConfig struct

type QueueConfig

// QueueConfig holds the message-level settings: the max-in-flight count, the
// message timeout and the requeue delays. Every field carries toml and yaml
// struct tags.
type QueueConfig struct {
	MaxInFlight         int           `toml:"max_in_flight" yaml:"max_in_flight"`
	// NOTE(review): TimeoutConfig also declares a MessageTimeout field; which
	// of the two takes effect is not visible here — confirm.
	MsgTimeout          time.Duration `toml:"message_timeout" yaml:"message_timeout"`
	MaxRequeueDelay     time.Duration `toml:"max_requeue_delay" yaml:"max_requeue_delay"`
	DefaultRequeueDelay time.Duration `toml:"default_requeue_delay" yaml:"default_requeue_delay"`
}

QueueConfig for message configuration

type TimeoutConfig

// TimeoutConfig holds the dial, read, write and message timeouts as
// time.Duration values. Every field carries toml and yaml struct tags.
type TimeoutConfig struct {
	Dial           time.Duration `toml:"dial" yaml:"dial"`
	Read           time.Duration `toml:"read" yaml:"read"`
	Write          time.Duration `toml:"write" yaml:"write"`
	// NOTE(review): QueueConfig also declares a MsgTimeout field; which of
	// the two takes effect is not visible here — confirm.
	MessageTimeout time.Duration `toml:"message" yaml:"message"`
}

TimeoutConfig for timeout configuration

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL