nsqclient

package module
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 31, 2020 License: MIT Imports: 15 Imported by: 0

README

nsqclient

nsq client for golang

Build Status codecov Go Report Card
GoDoc

Connection

nsqclient.Connect(map[string]Config{
    "default": Config{
        Host:     "10.64.146.231",
        Port:     "4150",
        InitSize: 10,
        MaxSize:  10,
    },
    "other": Config{
        Host:     "10.64.146.22",
        Port:     "4150",
        InitSize: 10,
        MaxSize:  10,
    },
})

Publish

写入nsq为了方便进行单元测试,可以注入nsqclient.Producer接口,然后在单元测试使用mock的实现

// default client
nsq := nsqclient.NewProducer("default")
nsq.Publish("topic","body")

//mock
nsq := nsqclient.NewMockProducer("default")
nsq.Publish("topic","body")

Consumer

// default connect
consumer := &nsqclient.NsqHandler{
    Topic:   "log",
    Channel: "default",
    Size:   10,
}

consumer.SetHandle(func(lg logger.ILogger, message *nsq.Message) error {
    // something
    return nil
})

nsqclient.Register(consumer, "access_log")

nsqclient.Run("access_log", ctx)

// other connect
consumer := &nsqclient.NsqHandler{
    Connect: "other",
    Topic:   "log",
    Channel: "default",
    Size:   10,
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Client

func Client(name ...string) (pool.Pool, bool)

func Connect

func Connect(configs map[string]Config)

func MockMessage added in v1.1.0

func MockMessage(body []byte) *nsq.Message

func NewDebugError

func NewDebugError(msg interface{}) error

func NewMockProducer added in v1.1.0

func NewMockProducer(name string) (*producerMock, error)

func NewProducer

func NewProducer(name string) (*producer, error)

func Register

func Register(group string, h ...*NsqHandler)

func Run

func Run(group string, ctx context.Context)

func RunMock added in v1.1.0

func RunMock(ctx context.Context, h *NsqHandler, msg *nsq.Message) error

Types

type Config

type Config struct {
	Host     string `toml:"host" json:"host"`
	Port     string `toml:"port" json:"port"`
	InitSize int    `toml:"init_size" json:"init_size"`
	MaxSize  int    `toml:"max_size" json:"max_size"`
}

type HandleFunc

type HandleFunc func(log logger.ILogger, message *nsq.Message) error

type NoMessageDelegate added in v1.1.0

type NoMessageDelegate struct{}

func (*NoMessageDelegate) OnFinish added in v1.1.0

func (d *NoMessageDelegate) OnFinish(m *nsq.Message)

func (*NoMessageDelegate) OnRequeue added in v1.1.0

func (d *NoMessageDelegate) OnRequeue(m *nsq.Message, delay time.Duration, backoff bool)

func (*NoMessageDelegate) OnTouch added in v1.1.0

func (d *NoMessageDelegate) OnTouch(m *nsq.Message)

type NsqConsumer

type NsqConsumer struct {
	// contains filtered or unexported fields
}

func NewNsqConsumer

func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error)

func (*NsqConsumer) AddHandler

func (n *NsqConsumer) AddHandler(handler nsq.Handler)

func (*NsqConsumer) Run

func (n *NsqConsumer) Run(conf *Config, concurrency int)

type NsqHandler

type NsqHandler struct {
	Connect          string        // 连接的nsq 默认是default
	Topic            string        // nsq topic
	Channel          string        // topic channel
	Size             int           // 并发数MaxInFlight
	MaxAttempts      uint16        // 最大执行次数,默认是100
	OpenChannelTopic bool          // 是否开启独立的topic [Topic.Channel]
	TouchDuration    time.Duration // 多久之后touch一次当前message,保持消息存活,默认不Touch
	Logger           logger.ILogger
	// contains filtered or unexported fields
}

func NewNsqHandler

func NewNsqHandler(options ...func(*NsqHandler)) *NsqHandler

func (*NsqHandler) Init

func (h *NsqHandler) Init(fn func(ctx context.Context))

func (*NsqHandler) SetHandle

func (h *NsqHandler) SetHandle(fn HandleFunc)

func (*NsqHandler) SetShouldRequeue

func (h *NsqHandler) SetShouldRequeue(fn func(message *nsq.Message) (bool, time.Duration))

type Producer added in v1.1.0

type Producer interface {
	Publish(topic string, body []byte) error
	MultiPublish(topic string, body [][]byte) error
	DeferredPublish(topic string, delay time.Duration, body []byte) error
}

Directories

Path Synopsis
internal
color
Package color provides facility to make normal text into ansi-colored text so as to output on console.
Package color provides facility to make normal text into ansi-colored text so as to output on console.
pool
Package pool implements a pool of net.Conn interfaces to manage and reuse them.
Package pool implements a pool of net.Conn interfaces to manage and reuse them.
Package pool implements a pool of net.Conn interfaces to manage and reuse them.
Package pool implements a pool of net.Conn interfaces to manage and reuse them.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL