ami: github.com/kak-tus/ami

package ami

import "github.com/kak-tus/ami"

Package ami - a Go client for reliable queues based on Redis Cluster Streams: https://redis.io/topics/streams-intro.

Producer example

type errorLogger struct{}

func (l *errorLogger) AmiError(err error) {
	println("Got error from Ami:", err.Error())
}

pr, err := ami.NewProducer(
	ami.ProducerOptions{
		ErrorNotifier:     &errorLogger{},
		Name:              "ruthie",
		PendingBufferSize: 10000000,
		PipeBufferSize:    50000,
		PipePeriod:        time.Microsecond * 1000,
		ShardsCount:       10,
	},
	&redis.ClusterOptions{
		Addrs:        []string{"172.17.0.1:7001", "172.17.0.1:7002"},
		ReadTimeout:  time.Second * 60,
		WriteTimeout: time.Second * 60,
	},
)
if err != nil {
	panic(err)
}

for i := 0; i < 10000; i++ {
	pr.Send("{}")
}

pr.Close()

Consumer example

type errorLogger struct{}

func (l *errorLogger) AmiError(err error) {
	println("Got error from Ami:", err.Error())
}

cn, err := ami.NewConsumer(
	ami.ConsumerOptions{
		Consumer:          "alice",
		ErrorNotifier:     &errorLogger{},
		Name:              "ruthie",
		PendingBufferSize: 10000000,
		PipeBufferSize:    50000,
		PipePeriod:        time.Microsecond * 1000,
		PrefetchCount:     100,
		ShardsCount:       10,
	},
	&redis.ClusterOptions{
		Addrs:        []string{"172.17.0.1:7001", "172.17.0.1:7002"},
		ReadTimeout:  time.Second * 60,
		WriteTimeout: time.Second * 60,
	},
)
if err != nil {
	panic(err)
}

c := cn.Start()

wg := sync.WaitGroup{}
wg.Add(1)

go func() {
	for {
		m, more := <-c
		if !more {
			break
		}
		println("Got", m.Body, "ID", m.ID)
		cn.Ack(m)
	}
	wg.Done()
}()

time.Sleep(time.Second)

cn.Stop()
wg.Wait()

cn.Close()

Index

Package Files

ami.go consumer.go doc.go producer.go types.go

type Consumer Uses

type Consumer struct {
    // contains filtered or unexported fields
}

Consumer client for Ami.

Consumer lifecycle is:

1. Get a Consumer object.

2. Start() - starts reading messages from Redis streams and returns a channel.

3. The application reads messages from the channel and calls Ack() on them.

4. Stop() - stops reading messages from Redis streams and blocks until all already-read messages have been processed.

5. Close() - blocks until all ACKs have been sent to Redis.

func NewConsumer Uses

func NewConsumer(opt ConsumerOptions, ropt *redis.ClusterOptions) (*Consumer, error)

NewConsumer creates a new consumer client for Ami.

Note that you MUST set ReadTimeout and WriteTimeout in ClusterOptions to at least 30-60 seconds if you use a big PipeBufferSize (50000 in the examples) and produce big messages. The reason: one full buffer is sent to Redis as a single large query. Such a query may not complete within the timeout and will be retransmitted, possibly forever. This matters most for the Producer, because it sends large queries containing large messages; it is less critical for the Consumer, whose large queries contain only the IDs of ACKed messages.

func (*Consumer) Ack Uses

func (c *Consumer) Ack(m Message)

Ack acknowledges a message.

The function not only issues an XACK call; it also deletes the message from the stream with XDEL. The acknowledgement is not performed immediately: it is pushed to a send buffer and sent to Redis in a separate goroutine.
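This buffered pattern can be sketched in isolation. The following is a simplified illustration of the idea, not ami's internal code: a channel stands in for the pending buffer, and collecting a batch stands in for the pipelined XACK/XDEL calls.

```go
package main

import "fmt"

// drainBatch models the sender side of the pattern: it empties the
// pending-ACK buffer into one batch that ami would then pipeline to
// Redis as XACK and XDEL commands.
func drainBatch(pending chan string) []string {
	batch := make([]string, 0, cap(pending))
	for id := range pending {
		batch = append(batch, id)
	}
	return batch
}

func main() {
	// Ack side: enqueue message IDs; nothing touches Redis here.
	pending := make(chan string, 8)
	for _, id := range []string{"1-0", "1-1", "1-2"} {
		pending <- id // conceptually what Ack does
	}
	close(pending)

	// Sender side: a separate goroutine would drain the buffer and
	// pipeline the calls to Redis.
	fmt.Println("would XACK+XDEL:", drainBatch(pending))
}
```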

func (*Consumer) Close Uses

func (c *Consumer) Close()

Close the queue client.

Blocks until all pending ACKs have been sent to Redis.

func (*Consumer) Start Uses

func (c *Consumer) Start() chan Message

Start consuming from the queue.

Starts reading messages from Redis streams and returns a channel.

func (*Consumer) Stop Uses

func (c *Consumer) Stop()

Stop the queue client.

Stops reading messages from Redis streams and blocks until all already-read messages have been processed.

type ConsumerOptions Uses

type ConsumerOptions struct {
    // Queue name
    Name string

    // Unique consumer name per queue in the Redis Cluster.
    //
    // Pay attention: if a consumer receives some messages, does not fully
    // process and ACK them, and then goes away and never starts again,
    // those messages will stay in the stream forever.
    // If a new consumer starts with the same name, it receives the
    // unprocessed messages.
    //
    // Also, if you start two consumers with the same name at the same
    // time, they will receive the same messages and process them, but
    // only one can ACK a message; the second will retry ACKing it forever.
    //
    // There is currently no mechanism in Ami that can move messages from
    // one consumer to another; I am thinking about how to do it.
    Consumer string

    // Shard the queue across different Redis Cluster nodes. Default 10.
    //
    // Ami queues are spread across the cluster using the standard Redis
    // Cluster mechanism - shards. Every queue consists of the configured
    // number of streams with the same name but different shard numbers,
    // so the streams are placed on different Redis Cluster nodes.
    // A bigger value spreads the queue better across the cluster, but a
    // huge value is not a good idea - it increases memory usage. A normal
    // value for a cluster with 5 masters and 5 slaves is from 5 to 10.
    // An auto-sharding option that places the queue on every Redis
    // Cluster node may be added later.
    // ShardsCount must be identical in all producers and consumers of a
    // queue.
    ShardsCount int8

    // Maximum number of messages that can be read from the queue at the
    // same time. Default 100.
    //
    // This is not the exact number of messages fetched from Redis: Ami
    // preloads some messages from all shards. This value:
    // - limits the consumer channel length;
    // - limits the maximum number of messages read from one shard in a
    // single read operation.
    // A bigger PrefetchCount means bigger memory usage but can give
    // better performance.
    PrefetchCount int64

    // BLOCK option of the XREADGROUP Redis command
    // https://redis.io/topics/streams-intro
    // Set it to a non-zero value only if you know what you are doing.
    //
    // If you set this value greater than 0, Ami blocks XREADGROUP for
    // this period.
    // Otherwise Ami uses the default value of 1 second.
    // If you set this value lower than 0, blocking is not used.
    Block time.Duration

    // Limits the maximum length of the ACK buffer. Default 10000000.
    //
    // A bigger value gives better ACK performance and bigger memory
    // usage.
    // If your process dies with many messages ACKed but not yet sent to
    // Redis, those ACKs are lost and the messages will be processed
    // again.
    PendingBufferSize int64

    // Requests to Redis are sent in pipeline mode with the configured
    // number of requests per batch. Default 50000.
    //
    // A bigger value gives better performance.
    PipeBufferSize int64

    // If no full batch has been collected, the pipeline is flushed every
    // configured period. Default time.Microsecond * 1000.
    PipePeriod time.Duration

    // If you set the optional ErrorNotifier, you will receive error
    // notifications through its interface function.
    ErrorNotifier ErrorNotifier
}

ConsumerOptions - options for the consumer client for Ami.

Optimal values for me are:

ShardsCount:       10,
PrefetchCount:     100,
PendingBufferSize: 10000000,
PipeBufferSize:    50000,
PipePeriod:        time.Microsecond * 1000,

type ErrorNotifier Uses

type ErrorNotifier interface {
    // Function is called for every error
    AmiError(error)
}

ErrorNotifier is the interface for receiving error notifications.

type Message Uses

type Message struct {
    Body   string // Message content you are interested in
    ID     string // ID of message in Redis stream
    Stream string // Redis stream name
    Group  string // Redis stream group name
}

Message from queue
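Since Body is a plain string, a JSON payload can be decoded directly from it. A sketch using a local mirror of the Message struct above; the event payload shape is hypothetical:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message mirrors ami's Message struct from this documentation.
type Message struct {
	Body   string
	ID     string
	Stream string
	Group  string
}

// event is a hypothetical payload shape; ami itself treats Body as an
// opaque string.
type event struct {
	Name string `json:"name"`
}

// decodePayload unmarshals the JSON carried in Body.
func decodePayload(m Message) (event, error) {
	var e event
	err := json.Unmarshal([]byte(m.Body), &e)
	return e, err
}

func main() {
	m := Message{Body: `{"name":"signup"}`, ID: "1-0"}
	e, err := decodePayload(m)
	if err != nil {
		panic(err)
	}
	fmt.Println(e.Name) // signup
}
```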

type Producer Uses

type Producer struct {
    // contains filtered or unexported fields
}

Producer client for Ami.

Producer lifecycle is:

1. Get a Producer object.

2. The application sends messages with the Producer object.

3. Close() - blocks until all produced messages have been sent to Redis.

func NewProducer Uses

func NewProducer(opt ProducerOptions, ropt *redis.ClusterOptions) (*Producer, error)

NewProducer creates a new producer client for Ami.

Note that you MUST set ReadTimeout and WriteTimeout in ClusterOptions to at least 30-60 seconds if you use a big PipeBufferSize (50000 in the examples) and produce big messages. The reason: one full buffer is sent to Redis as a single large query. Such a query may not complete within the timeout and will be retransmitted, possibly forever. This matters most for the Producer, because it sends large queries containing large messages; it is less critical for the Consumer, whose large queries contain only the IDs of ACKed messages.

func (*Producer) Close Uses

func (p *Producer) Close()

Close the queue client.

Blocks until all produced messages have been sent to Redis. If PendingBufferSize is huge, Close can take a long time.

func (*Producer) Send Uses

func (p *Producer) Send(m string)

Send a message.

The message is not sent immediately: it is pushed to a send buffer and sent to Redis in a separate goroutine.
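Because Send takes a plain string, structured payloads are usually marshaled first. A sketch; the event type is hypothetical, and the final pr.Send call is shown only as a comment since it needs a live Producer:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a hypothetical payload type; ami only ever sees strings.
type event struct {
	Name string `json:"name"`
	TS   int64  `json:"ts"`
}

// encodeEvent marshals the payload into the string that would be handed
// to Producer.Send.
func encodeEvent(e event) (string, error) {
	b, err := json.Marshal(e)
	return string(b), err
}

func main() {
	msg, err := encodeEvent(event{Name: "signup", TS: 1586000000})
	if err != nil {
		panic(err)
	}
	fmt.Println(msg) // {"name":"signup","ts":1586000000}
	// With a live producer this would be followed by: pr.Send(msg)
}
```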

type ProducerOptions Uses

type ProducerOptions struct {
    // Queue name
    Name string

    // Shard the queue across different Redis Cluster nodes. Default 10.
    //
    // Ami queues are spread across the cluster using the standard Redis
    // Cluster mechanism - shards. Every queue consists of the configured
    // number of streams with the same name but different shard numbers,
    // so the streams are placed on different Redis Cluster nodes.
    // A bigger value spreads the queue better across the cluster, but a
    // huge value is not a good idea - it increases memory usage. A normal
    // value for a cluster with 5 masters and 5 slaves is from 5 to 10.
    // An auto-sharding option that places the queue on every Redis
    // Cluster node may be added later.
    // ShardsCount must be identical in all producers and consumers of a
    // queue.
    ShardsCount int8

    // Limits the maximum length of the send buffer. Default 10000000.
    //
    // A bigger value gives better performance and bigger memory usage.
    // If your process dies with messages buffered but not yet sent to
    // Redis, those messages are lost.
    PendingBufferSize int64

    // Requests to Redis are sent in pipeline mode with the configured
    // number of requests per batch. Default 50000.
    //
    // A bigger value gives better performance.
    PipeBufferSize int64

    // If no full batch has been collected, the pipeline is flushed every
    // configured period. Default time.Microsecond * 1000.
    PipePeriod time.Duration

    // If you set the optional ErrorNotifier, you will receive error
    // notifications through its interface function.
    ErrorNotifier ErrorNotifier
}

ProducerOptions - options for the producer client for Ami.

Optimal values for me are:

ShardsCount:       10,
PendingBufferSize: 10000000,
PipeBufferSize:    50000,
PipePeriod:        time.Microsecond * 1000,
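The shard spreading described in ShardsCount can be illustrated with a round-robin selector. This is purely conceptual: the "name + shard index" naming scheme below is hypothetical, and ami's real stream names are internal to the library.

```go
package main

import "fmt"

// shardStream picks a stream for the n-th message by round-robin over
// ShardsCount shards. The naming scheme is illustrative only.
func shardStream(name string, n uint64, shardsCount int8) string {
	return fmt.Sprintf("%s%d", name, n%uint64(shardsCount))
}

func main() {
	// With ShardsCount=10, consecutive messages land on streams that
	// Redis Cluster then places on different nodes.
	for i := uint64(0); i < 4; i++ {
		fmt.Println(shardStream("ruthie", i, 10))
	}
}
```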

Package ami imports 7 packages and is imported by 3 packages. Updated 2020-04-09.