nsq

package
v0.0.13 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 20, 2023 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Client

func Client(name ...string) (pool.Pool, bool)

func Connect

func Connect(configs map[string]Config)

func MockMessage

func MockMessage(body []byte) *nsq.Message

func NewDebugError

func NewDebugError(msg interface{}) error

func NewMockProducer

func NewMockProducer(name string) (*producerMock, error)

func NewProducer

func NewProducer(name string) (*producer, error)

func Register

func Register(group string, h ...*NsqHandler)

func Run

func Run(group string, ctx context.Context)

func RunMock

func RunMock(ctx context.Context, h *NsqHandler, msg *nsq.Message) error

Types

type Config

// Config describes one NSQ connection entry. Connect accepts a map of these
// values keyed by name. Every field carries matching toml and json tags, so
// the same struct can be decoded from either format.
type Config struct {
	// Host is the host part of the endpoint address.
	// NOTE(review): whether this targets nsqd or nsqlookupd is not visible here.
	Host     string `toml:"host" json:"host"`
	// Port is the port part of the endpoint address, kept as a string.
	Port     string `toml:"port" json:"port"`
	// InitSize is presumably the initial number of pooled connections
	// (see the internal pool package) — TODO confirm.
	InitSize int    `toml:"init_size" json:"init_size"`
	// MaxSize is presumably the upper bound on pooled connections — TODO confirm.
	MaxSize  int    `toml:"max_size" json:"max_size"`
}

type HandleFunc

type HandleFunc func(log *logger.Logger, message *nsq.Message) error

type NoMessageDelegate

// NoMessageDelegate is a stateless type exposing OnFinish, OnRequeue and
// OnTouch methods that each take an *nsq.Message.
// NOTE(review): the method set looks like go-nsq's MessageDelegate, presumably
// so messages can be built without a live connection — confirm.
type NoMessageDelegate struct{}

func (*NoMessageDelegate) OnFinish

func (d *NoMessageDelegate) OnFinish(m *nsq.Message)

func (*NoMessageDelegate) OnRequeue

func (d *NoMessageDelegate) OnRequeue(m *nsq.Message, delay time.Duration, backoff bool)

func (*NoMessageDelegate) OnTouch

func (d *NoMessageDelegate) OnTouch(m *nsq.Message)

type NsqConsumer

// NsqConsumer is an opaque consumer handle. It is created by NewNsqConsumer
// for a topic/channel pair, receives handlers through AddHandler, and has a
// Run method that takes a *Config and a concurrency value.
type NsqConsumer struct {
	// contains filtered or unexported fields
}

func NewNsqConsumer

func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error)

func (*NsqConsumer) AddHandler

func (n *NsqConsumer) AddHandler(handler nsq.Handler)

func (*NsqConsumer) Run

func (n *NsqConsumer) Run(conf *Config, concurrency int)

type NsqHandler

// NsqHandler describes one consumer handler: the connection, topic and channel
// it is bound to, plus its concurrency, retry and touch settings. Values are
// built with NewNsqHandler and passed to Register.
type NsqHandler struct {
	Connect          string        // name of the nsq connection to use; defaults to "default"
	Topic            string        // nsq topic
	Channel          string        // topic channel
	Size             int           // concurrency (MaxInFlight)
	MaxAttempts      uint16        // maximum number of attempts; default is 100
	OpenChannelTopic bool          // whether to enable a dedicated topic [Topic.Channel]
	TouchDuration    time.Duration // interval at which the current message is touched to keep it alive; no touch by default
	// Logger is the logger attached to this handler.
	// NOTE(review): presumably the one handed to HandleFunc — confirm.
	Logger           *logger.Logger
	// contains filtered or unexported fields
}

func NewNsqHandler

func NewNsqHandler(options ...func(*NsqHandler)) *NsqHandler

func (*NsqHandler) Init

func (h *NsqHandler) Init(fn func(ctx context.Context))

func (*NsqHandler) SetHandle

func (h *NsqHandler) SetHandle(fn HandleFunc)

func (*NsqHandler) SetShouldRequeue

func (h *NsqHandler) SetShouldRequeue(fn func(message *nsq.Message) (bool, time.Duration))

type Producer

// Producer is the publishing interface of this package. Every method takes
// the destination topic first and reports failure through its error result.
// NOTE(review): NewProducer and NewMockProducer return unexported types that
// presumably both satisfy this interface — confirm.
type Producer interface {
	// Publish takes a single message body for topic.
	Publish(topic string, body []byte) error
	// MultiPublish takes several message bodies for topic in one call.
	MultiPublish(topic string, body [][]byte) error
	// DeferredPublish takes a single message body for topic together with a
	// delay; presumably the message is delivered after the delay — confirm.
	DeferredPublish(topic string, delay time.Duration, body []byte) error
}

Directories

Path Synopsis
internal
color
Package color provides facility to make normal text into ansi-colored text so as to output on console.
pool
Package pool implements a pool of net.Conn interfaces to manage and reuse them.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL