ami

package module
v0.1.15
Published: Apr 2, 2020 License: MIT Imports: 7 Imported by: 4

README

Ami

Go client for reliable queues based on Redis Cluster Streams.

Consume/produce performance

Performance depends on:

  • Redis Cluster nodes count;
  • ping RTT from client to Redis Cluster master nodes;
  • network speed between nodes;
  • message sizes;
  • Ami configuration.

For example, a 10-node Redis Cluster (1 master/1 slave per shard, half of the nodes in another datacenter with 50 ms ping), sending the message "{}", gives:

$ go run examples/performance/main.go
Produced 1000000 in 3.423883 sec, rps 292066.022156
Consumed 151000 in 1.049238 sec, rps 143913.931722
Acked 151000 in 0.973587 sec, rps 155096.612263

Producer example

	// errorLogger implements the ami.ErrorNotifier interface and simply
	// prints errors.
	type errorLogger struct{}

	func (l *errorLogger) AmiError(err error) {
		println("Got error from Ami:", err.Error())
	}

	pr, err := ami.NewProducer(
		ami.ProducerOptions{
			ErrorNotifier:     &errorLogger{},
			Name:              "ruthie",
			PendingBufferSize: 10000000,
			PipeBufferSize:    50000,
			PipePeriod:        time.Microsecond * 1000,
			ShardsCount:       10,
		},
		&redis.ClusterOptions{
			Addrs:        []string{"172.17.0.1:7001", "172.17.0.1:7002"},
			ReadTimeout:  time.Second * 60,
			WriteTimeout: time.Second * 60,
		},
	)
	if err != nil {
		panic(err)
	}

	// Send is asynchronous: messages are buffered and sent to Redis from
	// another goroutine.
	for i := 0; i < 10000; i++ {
		pr.Send("{}")
	}

	// Close blocks until all produced messages have been sent to Redis.
	pr.Close()

Consumer example

	type errorLogger struct{}

	func (l *errorLogger) AmiError(err error) {
		println("Got error from Ami:", err.Error())
	}

	cn, err := ami.NewConsumer(
		ami.ConsumerOptions{
			Consumer:          "alice",
			ErrorNotifier:     &errorLogger{},
			Name:              "ruthie",
			PendingBufferSize: 10000000,
			PipeBufferSize:    50000,
			PipePeriod:        time.Microsecond * 1000,
			PrefetchCount:     100,
			ShardsCount:       10,
		},
		&redis.ClusterOptions{
			Addrs:        []string{"172.17.0.1:7001", "172.17.0.1:7002"},
			ReadTimeout:  time.Second * 60,
			WriteTimeout: time.Second * 60,
		},
	)
	if err != nil {
		panic(err)
	}

	// Start begins reading from the Redis streams and returns a channel.
	c := cn.Start()

	wg := sync.WaitGroup{}
	wg.Add(1)

	go func() {
		for {
			m, more := <-c
			if !more {
				// The channel is closed once the consumer is stopped.
				break
			}
			println("Got", m.Body, "ID", m.ID)
			// Ack is buffered; XACK/XDEL calls are sent to Redis asynchronously.
			cn.Ack(m)
		}
		wg.Done()
	}()

	time.Sleep(time.Second)

	// Stop reading and wait until all read messages are processed.
	cn.Stop()
	wg.Wait()

	// Close flushes the remaining ACKs to Redis.
	cn.Close()

Documentation

Overview

Package ami - Go client for reliable queues based on Redis Cluster Streams (https://redis.io/topics/streams-intro).


Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer added in v0.1.6

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer client for Ami.

Consumer lifecycle is:

1. Get a Consumer object.

2. Start() - start reading messages from the Redis streams and return a channel.

3. The application reads messages from the channel and calls Ack() on them.

4. Stop() - stop reading messages from the Redis streams and block until all already-read messages have been processed.

5. Close() - block until all ACKs have been sent to Redis.
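
A minimal sketch of this lifecycle, in the same fragment style as the examples above (error handling omitted; cn is assumed to be a *Consumer returned by NewConsumer):

c := cn.Start() // 2: start reading from the Redis streams

done := make(chan struct{})
go func() {
	defer close(done)
	for m := range c { // 3: the channel is closed when the consumer stops
		// ... process m.Body ...
		cn.Ack(m)
	}
}()

// ... run until shutdown is requested ...

cn.Stop() // 4: stop reading and wait for read messages to be processed
<-done
cn.Close() // 5: flush the remaining ACKs to Redis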

func NewConsumer added in v0.1.6

func NewConsumer(opt ConsumerOptions, ropt *redis.ClusterOptions) (*Consumer, error)

NewConsumer creates a new consumer client for Ami.

Note that you MUST set ReadTimeout and WriteTimeout in ClusterOptions to at least 30-60 seconds if you use a large PipeBufferSize (50000 in the examples) and produce large messages. The reason is that one full buffer is sent to Redis as a single large query; if that query does not complete within the timeout, it is retransmitted, possibly forever. This is especially important for the Producer, because it sends large queries containing large messages. It matters less for the Consumer, whose large queries contain only the IDs of ACKed messages.
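
For instance, the cluster options used in the examples above already follow this advice (a sketch; the addresses are placeholders):

ropt := &redis.ClusterOptions{
	Addrs:        []string{"172.17.0.1:7001", "172.17.0.1:7002"},
	ReadTimeout:  time.Second * 60, // generous timeouts for large pipelined queries
	WriteTimeout: time.Second * 60,
}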

func (*Consumer) Ack added in v0.1.6

func (c *Consumer) Ack(m Message)

Ack acknowledges a message.

In addition to the XACK call, it also deletes the message from the stream with XDEL. The acknowledgement is not performed immediately; it is pushed to a send buffer and sent to Redis from another goroutine.

func (*Consumer) Close added in v0.1.6

func (c *Consumer) Close()

Close closes the queue client.

It blocks until all ACKs have been sent to Redis.

func (*Consumer) Start added in v0.1.6

func (c *Consumer) Start() chan Message

Start starts consuming from the queue.

It begins reading messages from the Redis streams and returns a channel of messages.

func (*Consumer) Stop added in v0.1.6

func (c *Consumer) Stop()

Stop stops the queue client.

It stops reading messages from the Redis streams and blocks until all already-read messages have been processed.

type ConsumerOptions added in v0.1.6

type ConsumerOptions struct {
	// Queue name
	Name string

	// Unique consumer name per queue in the Redis Cluster.
	//
	// Note that if a consumer receives some messages, does not fully process
	// and ACK them, and then goes away and never starts again, those messages
	// stay in the stream forever.
	// If a new consumer starts with the same name, it receives the unprocessed
	// messages.
	//
	// Also, if you start two consumers with the same name at the same time,
	// they receive the same messages and both process them, but only one can
	// ACK a message; the other will retry ACKing it forever.
	//
	// There is currently no mechanism in Ami to move messages from one
	// consumer to another; I am still thinking about how to do it.
	Consumer string

	// Number of shards the queue is split into across Redis Cluster nodes.
	// Default 10.
	//
	// Ami spreads queues across the cluster using the standard Redis Cluster
	// sharding mechanism. Each queue consists of the configured number of
	// streams with the same name but different shard numbers, so the streams
	// are placed on different Redis Cluster nodes.
	// A bigger value spreads the queue better across the cluster, but a huge
	// value is not a good idea - it increases memory usage. A normal value
	// for a cluster with 5 masters and 5 slaves is from 5 to 10.
	// An auto-sharding option that places the queue on every Redis Cluster
	// node may be added later.
	// ShardsCount must be identical in all producers and consumers of the
	// same queue.
	ShardsCount int8

	// Maximum number of messages that can be read from the queue at the same
	// time. Default 100.
	//
	// This is not the exact number of messages fetched from Redis, because
	// Ami preloads some messages from every shard. The value:
	// - limits the length of the consumer channel;
	// - limits the maximum number of messages read from one shard in a single
	//   read operation.
	// A bigger PrefetchCount uses more memory but can give better performance.
	PrefetchCount int64

	// BLOCK option of the XREADGROUP Redis command
	// https://redis.io/topics/streams-intro
	// Set it to a value other than 0 only if you know what you are doing.
	//
	// If the value is greater than 0, Ami blocks XREADGROUP for that period.
	// If it is 0, Ami uses the default of 1 second.
	// If it is less than 0, blocking is not used.
	Block time.Duration

	// Limits the maximum number of ACKed messages buffered before being sent
	// to Redis. Default 10000000.
	//
	// A bigger value gives better ACK performance but uses more memory.
	// If your process dies with many messages ACKed but not yet sent to
	// Redis, those ACKs are lost and the messages will be processed again.
	PendingBufferSize int64

	// Requests to Redis are sent in pipeline mode with this many requests per
	// batch. Default 50000.
	//
	// A bigger value gives better performance.
	PipeBufferSize int64

	// If a full batch has not been collected, the pipeline is flushed every
	// PipePeriod. Default time.Microsecond * 1000.
	PipePeriod time.Duration

	// Optional. If ErrorNotifier is set, you will receive error notifications
	// via its interface method.
	ErrorNotifier ErrorNotifier
}

ConsumerOptions - options for the Ami consumer client.

Optimal values for me are:

ShardsCount:       10,
PrefetchCount:     100,
PendingBufferSize: 10000000,
PipeBufferSize:    50000,
PipePeriod:        time.Microsecond * 1000,
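
For illustration only, a sketch that combines these values with the Block field described above (shown just to illustrate the field; per its documentation, leave Block at 0 unless you know you need it):

opt := ami.ConsumerOptions{
	Name:          "ruthie",
	Consumer:      "alice",
	ShardsCount:   10,
	PrefetchCount: 100,
	// Block > 0 sets a custom XREADGROUP block period, 0 keeps the default
	// of 1 second, and a negative value disables blocking.
	Block:             5 * time.Second,
	PendingBufferSize: 10000000,
	PipeBufferSize:    50000,
	PipePeriod:        time.Microsecond * 1000,
}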

type ErrorNotifier added in v0.1.9

type ErrorNotifier interface {
	// AmiError is called for every error
	AmiError(error)
}

ErrorNotifier is the interface for receiving error notifications.
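
Any type with an AmiError(error) method satisfies this interface. For example, a minimal sketch that forwards errors to the standard log package (logNotifier is a hypothetical name, not part of the package):

type logNotifier struct{}

func (logNotifier) AmiError(err error) {
	log.Println("ami error:", err)
}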

type Message

type Message struct {
	Body   string // Message content you are interested in
	ID     string // ID of the message in the Redis stream
	Stream string // Redis stream name
	Group  string // Redis stream group name
}

Message from queue

type Producer added in v0.1.6

type Producer struct {
	// contains filtered or unexported fields
}

Producer client for Ami.

Producer lifecycle is:

1. Get a Producer object.

2. The application sends messages with the Producer object.

3. Close() - block until all produced messages have been sent to Redis.
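
A minimal sketch of this lifecycle (opt and ropt stand for the ProducerOptions and redis.ClusterOptions shown in the example above; error handling kept minimal):

pr, err := ami.NewProducer(opt, ropt) // 1: get a Producer
if err != nil {
	panic(err)
}

pr.Send(`{"hello":"world"}`) // 2: Send is buffered and asynchronous

pr.Close() // 3: block until everything buffered has been sent to Redis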

func NewProducer added in v0.1.6

func NewProducer(opt ProducerOptions, ropt *redis.ClusterOptions) (*Producer, error)

NewProducer creates a new producer client for Ami.

Note that you MUST set ReadTimeout and WriteTimeout in ClusterOptions to at least 30-60 seconds if you use a large PipeBufferSize (50000 in the examples) and produce large messages. The reason is that one full buffer is sent to Redis as a single large query; if that query does not complete within the timeout, it is retransmitted, possibly forever. This is especially important for the Producer, because it sends large queries containing large messages. It matters less for the Consumer, whose large queries contain only the IDs of ACKed messages.

func (*Producer) Close added in v0.1.6

func (p *Producer) Close()

Close closes the queue client.

It blocks until all produced messages have been sent to Redis. If PendingBufferSize is set to a huge value, Close can take a long time.

func (*Producer) Send added in v0.1.6

func (p *Producer) Send(m string)

Send sends a message.

The message is not sent immediately; it is pushed to a send buffer and sent to Redis from another goroutine.

type ProducerOptions added in v0.1.6

type ProducerOptions struct {
	// Queue name
	Name string

	// Number of shards the queue is split into across Redis Cluster nodes.
	// Default 10.
	//
	// Ami spreads queues across the cluster using the standard Redis Cluster
	// sharding mechanism. Each queue consists of the configured number of
	// streams with the same name but different shard numbers, so the streams
	// are placed on different Redis Cluster nodes.
	// A bigger value spreads the queue better across the cluster, but a huge
	// value is not a good idea - it increases memory usage. A normal value
	// for a cluster with 5 masters and 5 slaves is from 5 to 10.
	// An auto-sharding option that places the queue on every Redis Cluster
	// node may be added later.
	// ShardsCount must be identical in all producers and consumers of the
	// same queue.
	ShardsCount int8

	// Limits the maximum number of produced messages buffered before being
	// sent to Redis. Default 10000000.
	//
	// A bigger value gives better performance but uses more memory.
	// If your process dies while messages are still buffered and not yet
	// sent to Redis, those messages are lost.
	PendingBufferSize int64

	// Requests to Redis are sent in pipeline mode with this many requests per
	// batch. Default 50000.
	//
	// A bigger value gives better performance.
	PipeBufferSize int64

	// If a full batch has not been collected, the pipeline is flushed every
	// PipePeriod. Default time.Microsecond * 1000.
	PipePeriod time.Duration

	// Optional. If ErrorNotifier is set, you will receive error notifications
	// via its interface method.
	ErrorNotifier ErrorNotifier
}

ProducerOptions - options for the Ami producer client.

Optimal values for me are:

ShardsCount:       10,
PendingBufferSize: 10000000,
PipeBufferSize:    50000,
PipePeriod:        time.Microsecond * 1000,

Directories

Path Synopsis
examples
