rnsq

package module
v0.2.6 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 10, 2023 | License: GPL-3.0 | Imports: 8 | Imported by: 0

README

rose-nsq

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseConsumer

// BaseConsumer is the consumer-side interface of this package. It is the
// type returned by NewConsumerNSQD and NewConsumerNSQLookUpD.
//
// Most operations come in two forms: a short form that takes no topic or
// channel, and a ...WithTopic form that names the topic and channel
// explicitly.
// NOTE(review): the short forms presumably act on the topic/channel passed to
// the constructor — confirm against the implementation.
//
// NOTE(review): the Set* methods return the unexported *defBaseConsumer.
// Code outside this package cannot name that type, so it cannot declare
// methods with these signatures and therefore cannot implement this
// interface (e.g. for test doubles).
type BaseConsumer interface {
	// Consume and ConsumeWithTopic take the Handler that receives messages.
	// ConsumeWithTopic additionally accepts Options.
	Consume(handler Handler) error
	ConsumeWithTopic(topic, channel string, handler Handler, opts ...Option) error
	// ConsumeMany and ConsumeManyWithTopic are the same with an extra
	// concurrency argument.
	// NOTE(review): presumably the number of concurrent handler goroutines —
	// confirm.
	ConsumeMany(handler Handler, concurrency int) error
	ConsumeManyWithTopic(topic, channel string, handler Handler, concurrency int, opts ...Option) error
	// ChangeMaxInFlight* take a new max-in-flight value. They return nothing,
	// unlike SetMaxInFlight below.
	// NOTE(review): presumably these adjust an already running consumer —
	// confirm.
	ChangeMaxInFlight(maxInFlight int)
	ChangeMaxInFlightWithTopic(topic, channel string, maxInFlight int)
	// SetLogLevel* take the log level as a string.
	// NOTE(review): the accepted level names are not visible here.
	SetLogLevel(level string)
	SetLogLevelWithTopic(topic, channel, level string)
	// Stats* return a *ConsumerStates.
	// NOTE(review): whether nil can be returned (e.g. for an unknown
	// topic/channel) is not visible here — callers should check.
	Stats() *ConsumerStates
	StatsWithTopic(topic, channel string) *ConsumerStates
	// Stop takes no arguments and returns nothing.
	// NOTE(review): whether it blocks until shutdown completes is not visible
	// here.
	Stop()

	// The Set* methods return *defBaseConsumer, so calls can be chained.
	// NOTE(review): presumably configuration to apply before Consume* is
	// called — confirm.
	SetMaxInFlight(maxInFlight int) *defBaseConsumer
	SetMaxAttempts(maxAttempts uint16) *defBaseConsumer
	SetSecret(secret string) *defBaseConsumer
}

func NewConsumerNSQD

func NewConsumerNSQD(addr []string, topic, channel string) BaseConsumer

func NewConsumerNSQLookUpD

func NewConsumerNSQLookUpD(addr []string, topic, channel string) BaseConsumer

type BaseProducer

// BaseProducer is the producer-side interface of this package. It is the
// type returned by NewProducer.
//
// Method names are built from a small set of parts, each visible in the
// signatures below:
//
//   - "B"         the payload is []byte instead of string
//   - "WithTopic" the topic is passed explicitly as the first argument
//   - "Async"     asynchronous variant; the "WithChan" forms additionally
//     take a doneChan of *nsq.ProducerTransaction and variadic args
//   - "Delay"     takes an extra time.Duration delay
//   - "Multi"     takes a batch of messages as [][]byte
//
// NOTE(review): methods without "WithTopic" presumably publish to the topic
// given to NewProducer, and the variants presumably map onto go-nsq's
// Publish / PublishAsync / DeferredPublish / MultiPublish families — confirm
// against the implementation.
//
// NOTE(review): the combinations are not complete. There is no
// PublishDelayAsyncWithTopic or PublishBDelayAsyncWithTopic, although the
// corresponding ...WithChanWithTopic forms exist.
type BaseProducer interface {
	// Single message.
	Publish(data string) error
	PublishB(data []byte) error
	PublishWithTopic(topic, data string) error
	PublishBWithTopic(topic string, data []byte) error
	// Single message, asynchronous, without a caller-supplied done channel.
	// NOTE(review): how completion or failure is reported for these is not
	// visible here.
	PublishAsync(data string) error
	PublishBAsync(data []byte) error
	PublishAsyncWithTopic(topic, data string) error
	PublishBAsyncWithTopic(topic string, data []byte) error
	// Single message, asynchronous, with a caller-supplied done channel.
	PublishAsyncWithChan(data string, doneChan chan *nsq.ProducerTransaction, args ...interface{}) error
	PublishBAsyncWithChan(data []byte, doneChan chan *nsq.ProducerTransaction, args ...interface{}) error
	PublishAsyncWithChanWithTopic(topic, data string, doneChan chan *nsq.ProducerTransaction, args ...interface{}) error
	PublishBAsyncWithChanWithTopic(topic string, data []byte, doneChan chan *nsq.ProducerTransaction, args ...interface{}) error
	// Single message with a delay.
	PublishDelay(data string, delay time.Duration) error
	PublishBDelay(data []byte, delay time.Duration) error
	PublishDelayWithTopic(topic, data string, delay time.Duration) error
	PublishBDelayWithTopic(topic string, data []byte, delay time.Duration) error
	// Single message with a delay, asynchronous.
	PublishDelayAsync(data string, delay time.Duration) error
	PublishBDelayAsync(data []byte, delay time.Duration) error
	PublishDelayAsyncWithChan(data string, delay time.Duration, doneChan chan *nsq.ProducerTransaction,
		args ...interface{}) error
	PublishBDelayAsyncWithChan(data []byte, delay time.Duration, doneChan chan *nsq.ProducerTransaction,
		args ...interface{}) error
	PublishDelayAsyncWithChanWithTopic(topic, data string, delay time.Duration, doneChan chan *nsq.ProducerTransaction,
		args ...interface{}) error
	PublishBDelayAsyncWithChanWithTopic(topic string, data []byte, delay time.Duration, doneChan chan *nsq.ProducerTransaction,
		args ...interface{}) error
	// Batches of messages; only a [][]byte form exists (no string variant).
	PublishMulti(messages [][]byte) error
	PublishMultiWithTopic(topic string, messages [][]byte) error
	PublishMultiAsync(messages [][]byte) error
	PublishMultiAsyncWithTopic(topic string, messages [][]byte) error
	PublishMultiAsyncWithChan(messages [][]byte, doneChan chan *nsq.ProducerTransaction,
		args ...interface{}) error
	PublishMultiAsyncWithChanWithTopic(topic string, messages [][]byte, doneChan chan *nsq.ProducerTransaction,
		args ...interface{}) error
	// SetLogLevel takes the log level as a string.
	// NOTE(review): the accepted level names are not visible here.
	SetLogLevel(level string)
	// Stop takes no arguments and returns nothing.
	// NOTE(review): whether it waits for in-flight publishes is not visible
	// here.
	Stop()
}

func NewProducer

func NewProducer(addr string, topic string, opts ...Option) (BaseProducer, error)

type ConnType

// ConnType is an integer-backed enumeration with two values, NSQD and
// NSQLookupD.
// NOTE(review): judging by the names it selects between connecting to nsqd
// directly and discovering nsqd through nsqlookupd — confirm where it is
// consumed.
type ConnType int

// The numeric values are part of the public API: NSQD is 0 and NSQLookupD
// is 1. Keep the order when adding new values.
const (
	NSQD ConnType = iota
	NSQLookupD
)

type ConsumerStates added in v0.2.4

// ConsumerStates is the value returned (by pointer) from
// BaseConsumer.Stats and BaseConsumer.StatsWithTopic.
// NOTE(review): the fields presumably mirror go-nsq's ConsumerStats
// (messages received / finished / requeued and the connection count) —
// confirm against the implementation.
type ConsumerStates struct {
	// Message counters.
	Received, Finished, Requeued uint64
	// Connections is a plain int, unlike the uint64 counters above.
	Connections int
}

type Handler

// Handler is the callback type accepted by the BaseConsumer Consume*
// methods. It receives an *XMessage and returns an error.
// NOTE(review): how a non-nil error is treated (requeue, fail, or only
// logged) is not visible here — confirm against XHandler.HandleMessage.
type Handler func(*XMessage) error

type Option added in v0.2.2

// Option is a functional option: a function that is handed a pointer to the
// package's unexported options value. SetSecret returns an Option.
// NewProducer, BaseConsumer.ConsumeWithTopic and
// BaseConsumer.ConsumeManyWithTopic accept Options variadically.
type Option func(opt *options)

func SetSecret added in v0.2.2

func SetSecret(secret string) Option

type XHandler

// XHandler has a single exported method, HandleMessage(*nsq.Message) error.
// Its fields are all unexported.
// NOTE(review): the method signature matches go-nsq's Handler interface, so
// this is presumably the adapter that wraps a package-level Handler for a
// go-nsq consumer — confirm against the implementation.
type XHandler struct {
	// contains filtered or unexported fields
}

func (*XHandler) HandleMessage

func (q *XHandler) HandleMessage(message *nsq.Message) error

type XMessage

// XMessage is the message type passed to a Handler. It embeds *nsq.Message,
// so the embedded message's exported fields and methods are promoted and can
// be used directly on an XMessage. The package adds its own helpers on top
// (Success, Fail, Finish, DelayReSend, ToByte, ToString, ToJson, ...).
//
// The embedded pointer is nil in a zero-value XMessage; promoted fields and
// methods must not be used until it has been set.
type XMessage struct {
	*nsq.Message
}

func (*XMessage) DelayReSend

func (m *XMessage) DelayReSend(delay time.Duration)

func (*XMessage) DelayReSendWithoutBackoff added in v0.2.3

func (m *XMessage) DelayReSendWithoutBackoff(delay time.Duration)

func (*XMessage) Fail

func (m *XMessage) Fail()

func (*XMessage) Finish

func (m *XMessage) Finish(success bool)

func (*XMessage) Success

func (m *XMessage) Success()

func (*XMessage) ToByte added in v0.1.1

func (m *XMessage) ToByte() []byte

func (*XMessage) ToJson added in v0.1.1

func (m *XMessage) ToJson(v interface{}) error

func (*XMessage) ToString added in v0.1.1

func (m *XMessage) ToString() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL