producer

package module
v0.0.0-...-1112d63 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 4, 2018 License: MIT Imports: 6 Imported by: 0

README

balancer-nsqd-producer

Build Status Go Report Card codecov

解决 nsq 消息队列,官方 go-nsq 生产者没有负载均衡的问题。

一般 nsqd 都会以集群的方式部署多台,通过负载可以将信息分散的写入 nsqd,有如下考虑:

  1. 减少因为 nsqd 异常退出,产生内存数据未及时落盘,导致数据的丢失。
  2. 平滑权重可有效解决 nsqd 机器性能不一致的缺点,提高机器使用率。
  3. 生产数据 io 吞吐会更高,充分利用集群整体性能。

特性

  1. 实现了 轮询、随机、平滑权重 3种常用负载算法,抽象为接口,可快速添加其他(有状态的负载)算法。
  2. 算法性能较高,4 core cpu 30-50ns/op 不会存在瓶颈。
  3. 链接失效,摘除负载队列,进入 ping 队列,恢复后自动加入负载。
注意

因为 go-nsq 包的特性,在下不才,只实现了同步生产消息,支持方法 MultiPublish Publish,一般情况够用,balancer 包支持方法可再扩充, 同时通过多个 goroutine 写入也可解决异步并发写入问题。

使用

go get github.com/hopingtop/balancer-nsqd-producer
func main() {
	
	var addrs = map[string]int{
		"192.168.1.104:4150": 2,
		"192.168.1.109:4150": 8,
	}

	var opt = producer.Options{
		Addrs:        addrs,
		Retry:        2,                    // 如果遇到当前链接发送失败,重试次数,建议与 链接地址数量一致
		Mode:         producer.PollingMode, // 算法方式 PollingMode RandomMode SmoothWeightMode
		PingInterval: 1,                    // 暂时失效链接进入 ping 队列, ping 的间隔时间
		PingTimeout:  1,                    // ping 此时间后,balancer 通过 ErrorsChan   chan error  返回 nsqd 链接错误, 使用者应该消费 ErrorsChan
	}
	// nsq.NewConfig() go-nsq 官方包 生产者配置
	bl, err := producer.NewBalancer(opt, nsq.NewConfig())
	if err != nil { // 初始化链接时,出错
		fmt.Println(err)
		os.Exit(1)
	}
	count := 100
	for count > 0 {
		err := bl.Publish("test_polling", []byte(" ")) // 使用方法与 go-nsq 官方包一致
		if err != nil {
			fmt.Println(err)
			os.Exit(1)
		}
		count--
	}

	bl.CloseAll() // 关闭所有 nsqd 链接

}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoAvailableConns = errors.New("no available conns")
)

error 定义

Functions

func Validate

func Validate(opt Options) error

Validate Options 参数验证

Types

type BalanceMode

type BalanceMode int

BalanceMode 算法模式

const (
	PollingMode BalanceMode = iota
	RandomMode
	SmoothWeightMode
)

支持如下算法

type Balancer

type Balancer struct {
	ErrorsChan chan error // nsqd 链接错误
	// contains filtered or unexported fields
}

Balancer 负载均衡生成器

func NewBalancer

func NewBalancer(opt Options, config *nsq.Config) (*Balancer, error)

NewBalancer 生成负载均衡器 opt.Addrs 非权重 int = 0 config = nsq.NewConfig()

func (*Balancer) CloseAll

func (bl *Balancer) CloseAll()

CloseAll 关闭所有链接

func (*Balancer) MultiPublish

func (bl *Balancer) MultiPublish(topic string, body [][]byte) error

MultiPublish 生产多条消息一次写入 nsqd

func (*Balancer) Publish

func (bl *Balancer) Publish(topic string, body []byte) error

Publish 生产消息写入 nsqd

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn nsqd 链接

type Options

type Options struct {
	Addrs        map[string]int
	Retry        int
	Mode         BalanceMode
	PingInterval int // s
	PingTimeout  int
}

Options 负载均衡器配置文件

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL