nsq

package
v1.7.0
Published: May 13, 2022 | License: MIT | Imports: 10 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WaitTermSig added in v1.4.1

func WaitTermSig(handler func(context.Context) error) <-chan struct{}

WaitTermSig waits for a termination signal.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer instance

func NewConsumer

func NewConsumer(cfg ConsumerConfig) *Consumer

NewConsumer instantiates the NSQ consumer.

func (*Consumer) RegisterHandler

func (c *Consumer) RegisterHandler(handler ConsumerHandler)

RegisterHandler registers a consumer handler.

func (*Consumer) Run

func (c *Consumer) Run() error

Run connects all registered consumer handlers to the nsqlookupd address.

func (*Consumer) RunDirect

func (c *Consumer) RunDirect() error

RunDirect connects all registered consumer handlers directly to the nsqd address.

func (*Consumer) Wait added in v1.4.1

func (c *Consumer) Wait()

Wait waits for the stop/restart signal and shuts down the NSQ consumers gracefully.

type ConsumerConfig

type ConsumerConfig struct {
	ListenAddress []string
	Prefix        string
	StopTimeout   time.Duration
}

ConsumerConfig is the configuration for the consumer instance.

type ConsumerHandler

type ConsumerHandler struct {
	Topic       string
	Channel     string
	Concurrent  int
	MaxAttempts uint16
	MaxInFlight int
	Enable      bool

	Handler func(message IMessage) error
}

ConsumerHandler describes a handler for the consumer.

type IMessage

type IMessage interface {
	// Finish sends a FIN command to the nsqd which
	// sent this message
	Finish()
	// RequeueWithoutBackoff sends a REQ command to the nsqd which
	// sent this message, using the supplied delay.
	//
	// Notably, using this method to respond does not trigger a backoff
	// event on the configured Delegate.
	RequeueWithoutBackoff(delay time.Duration)
	// Requeue sends a REQ command to the nsqd which
	// sent this message, using the supplied delay.
	Requeue(delay time.Duration)
	// GetAttempts will get the current attempts
	GetAttempts() uint16
	// GetBody will get the body value
	GetBody() []byte
}

IMessage defines the contract for an NSQ message passed to a handler.
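Because handlers receive the IMessage interface rather than a concrete type, they can be exercised without a running nsqd. The sketch below copies the interface from this page so that it compiles on its own; fakeMessage, process, and handle are hypothetical names. Whether the consumer also responds automatically based on the returned error is not stated here, so the explicit Finish and Requeue calls, and the attempt-based delay, are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// IMessage is copied from the interface documented above so that this
// sketch compiles without importing the package.
type IMessage interface {
	Finish()
	RequeueWithoutBackoff(delay time.Duration)
	Requeue(delay time.Duration)
	GetAttempts() uint16
	GetBody() []byte
}

// fakeMessage is a hypothetical test double that records the last reply.
type fakeMessage struct {
	body     []byte
	attempts uint16
	reply    string
}

func (f *fakeMessage) Finish() {
	f.reply = "FIN"
}

func (f *fakeMessage) RequeueWithoutBackoff(delay time.Duration) {
	f.reply = "REQ without backoff"
}

func (f *fakeMessage) Requeue(delay time.Duration) {
	f.reply = "REQ"
}

func (f *fakeMessage) GetAttempts() uint16 {
	return f.attempts
}

func (f *fakeMessage) GetBody() []byte {
	return f.body
}

// process is a hypothetical piece of business logic that rejects "bad".
func process(body []byte) error {
	if string(body) == "bad" {
		return errors.New("cannot process message")
	}
	return nil
}

// handle has the shape func(message IMessage) error expected by
// ConsumerHandler.Handler. It finishes on success and requeues on failure
// with a delay that grows with the attempt count (an assumed policy).
func handle(message IMessage) error {
	if err := process(message.GetBody()); err != nil {
		message.Requeue(time.Duration(message.GetAttempts()) * time.Second)
		return err
	}
	message.Finish()
	return nil
}

func main() {
	good := &fakeMessage{body: []byte("hello"), attempts: 1}
	fmt.Println(handle(good), good.reply)

	bad := &fakeMessage{body: []byte("bad"), attempts: 2}
	fmt.Println(handle(bad), bad.reply)
}
```

Keeping the handler dependent only on the interface means the same function can be assigned to ConsumerHandler.Handler in production and called directly with a fake in unit tests.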

type Message

type Message struct {
	*nsq.Message
}

Message wraps the built-in nsq.Message.

func (*Message) GetAttempts

func (m *Message) GetAttempts() uint16

GetAttempts returns the number of times this message has entered the consumer.

func (*Message) GetBody

func (m *Message) GetBody() []byte

GetBody returns the body of the message.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher is the struct for the publisher.

func NewPublisher

func NewPublisher(publishAddress, prefix string) (*Publisher, error)

NewPublisher creates a new publisher instance. Leave the prefix empty if no topic prefix is needed.

func (*Publisher) Publish

func (p *Publisher) Publish(topic string, data interface{}) error

Publish publishes the data in JSON format. By default it always applies the prefix to the topic.

func (*Publisher) PublishWithoutPrefix

func (p *Publisher) PublishWithoutPrefix(topic string, data interface{}) error

PublishWithoutPrefix publishes the data in JSON format without applying the prefix to the topic.
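To illustrate what the two publish methods would send, the sketch below encodes a value with encoding/json and derives a topic name with and without a prefix. It does not use the package itself. The orderCreated type and the helper names are hypothetical, and joining prefix and topic by plain concatenation is an assumption, since this page does not say how the prefix is applied.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// orderCreated is a hypothetical payload type used only for this sketch.
type orderCreated struct {
	ID    int    `json:"id"`
	Email string `json:"email"`
}

// topicName joins prefix and topic by plain concatenation. This is an
// assumption; the package may apply the prefix differently.
func topicName(prefix, topic string) string {
	return prefix + topic
}

// encode produces the JSON body that would be sent for data.
func encode(data interface{}) ([]byte, error) {
	return json.Marshal(data)
}

func main() {
	body, err := encode(orderCreated{ID: 42, Email: "a@example.com"})
	if err != nil {
		panic(err)
	}

	// Roughly what Publish would send when the prefix is "dev_".
	fmt.Println(topicName("dev_", "order_created"), string(body))

	// Roughly what PublishWithoutPrefix would send.
	fmt.Println(topicName("", "order_created"), string(body))
}
```

Since the payload goes through JSON encoding, only exported struct fields are included, and struct tags control the key names that consumers will see in GetBody.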
