nq

package
v1.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2022 License: GPL-3.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MustNewQueue

func MustNewQueue(c NqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue

func NewQueue

func NewQueue(c NqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error)

Types

type ConsumeHandle

type ConsumeHandle func(key, value string) error

type ConsumeHandler

// ConsumeHandler is the handler type accepted by NewQueue and MustNewQueue.
// Its single method, Consume, receives a message's key and value as strings
// and returns an error.
type ConsumeHandler interface {
	Consume(key, value string) error
}

func WithHandle

func WithHandle(handle ConsumeHandle) ConsumeHandler

type MessageHandler

// MessageHandler is a struct with no fields. Its pointer type declares a
// Consume(key, val string) error method.
type MessageHandler struct {
}

func (*MessageHandler) Consume

func (mh *MessageHandler) Consume(key, val string) error

type NqConf

// NqConf is the configuration value passed to NewQueue and MustNewQueue.
// The json tags carry option constraints and default values for several fields.
type NqConf struct {
	// service.ServiceConf
	Brokers    []string // nsqlookupd addresses to connect to
	Group      string   // may default to the partition ID; no effect on standard nsq, used by the Youzan version (an int there, so it needs converting)
	Topic      string
	Offset     string `json:",options=first|last,default=last"` // no effect on standard nsq, used by the Youzan version (position to start reading from)
	Conns      int    `json:",default=1"`                       // number of connections to establish
	Consumers  int    `json:",default=1"`                       // goroutines reading per connection; fixed at 1 for nsq
	Processors int    `json:",default=8"`                       // goroutines consuming per connection
	Channel    string `json:",default=default"`                 // channel to consume from; defaults to "default"
}

type PushOption

type PushOption func(options *chunkOptions)

func WithChunkSize

func WithChunkSize(chunkSize int) PushOption

func WithFlushInterval

func WithFlushInterval(interval time.Duration) PushOption

type Pusher

// Pusher is the type returned by NewPusher, which takes an address string and
// a topic. It declares Close, Name, Ping and Push methods.
type Pusher struct {
	// contains filtered or unexported fields
}

func NewPusher

func NewPusher(addrs string, topic string, opts ...PushOption) *Pusher

func (*Pusher) Close

func (p *Pusher) Close() error

func (*Pusher) Name

func (p *Pusher) Name() string

func (*Pusher) Ping added in v1.0.2

func (p *Pusher) Ping() error

func (*Pusher) Push

func (p *Pusher) Push(v string) error

type QueueOption

type QueueOption func(*queueOptions)

func WithMetrics

func WithMetrics(metrics *stat.Metrics) QueueOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL