redis

package module
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 5, 2023 License: MIT Imports: 14 Imported by: 1

Documentation

Overview

Example
var (
	EVN_REDIS_SERVERS = strings.Split(os.Getenv("REDIS_SERVERS"), ",")
)
if len(EVN_REDIS_SERVERS) == 0 {
	EVN_REDIS_SERVERS = []string{"127.0.0.1:6379"}
}

// register consumer group
{
	admin, err := redis.NewAdminClient(&redis.UniversalOptions{
		Addrs: __TEST_REDIS_SERVERS,
		DB:    0,
	})
	if err != nil {
		panic(err)
	}
	defer func() {
		defer admin.Close()

		/*
			XGROUP DESTROY gotestStream gotestGroup
		*/*/
		_, err = admin.DeleteConsumerGroup("gotestStream", "gotestGroup")
		if err != nil {
			panic(err)
		}

		/*
			DEL gotestStream
		*/*/
		_, err = admin.Handle().Del("gotestStream").Result()
		if err != nil {
			panic(err)
		}
	}()

	/*
		XGROUP CREATE gotestStream gotestGroup $ MKSTREAM
	*/*/
	_, err = admin.CreateConsumerGroupAndStream("gotestStream", "gotestGroup", redis.StreamLastDeliveredID)
	if err != nil {
		panic(err)
	}
}

// publish
{
	conf := redis.ProducerConfig{
		UniversalOptions: &redis.UniversalOptions{
			Addrs: __TEST_REDIS_SERVERS,
			DB:    0,
		},
	}
	p, err := redis.NewProducer(&conf)
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// produce message
	{
		publishMessages := []map[string]interface{}{
			{"name": "luffy", "age": 19},
			{"name": "nami", "age": 21},
			{"name": "zoro", "age": 21},
		}

		for _, message := range publishMessages {
			reply, err := p.Write("gotestStream", message)
			if err != nil {
				panic(err)
			}
			_ = reply
			fmt.Printf("ID: %s\n", reply)
		}
	}
}

// subscribe
{
	// the config only for test use !!
	opt := redis.UniversalOptions{
		Addrs: __TEST_REDIS_SERVERS,
		DB:    0,
	}

	c := &redis.Consumer{
		Group:               "gotestGroup",
		Name:                "gotestConsumer",
		RedisOption:         &opt,
		MaxInFlight:         1,
		MaxPollingTimeout:   10 * time.Millisecond,
		ClaimMinIdleTime:    30 * time.Millisecond,
		IdlingTimeout:       2000 * time.Millisecond,
		ClaimSensitivity:    2,
		ClaimOccurrenceRate: 2,
		MessageHandler: func(message *redis.Message) {
			fmt.Printf("Message on %s: %v\n", message.Stream, message.XMessage)
			message.Ack()
			message.Del()
		},
		RedisErrorHandler: func(err error) (disposed bool) {
			fmt.Println(err)
			return true
		},
	}

	err := c.Subscribe(
		redis.Stream("gotestStream"),
	)
	if err != nil {
		panic(err)
	}

	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)

	select {
	case <-ctx.Done():
		c.Close()
		break
	}
}
Output:

Index

Examples

Constants

View Source
// Special stream ID/offset tokens, a re-exported Nil value, the logger prefix,
// and sizing constants for pending-message fetching.
// NOTE(review): the call sites of most of these constants are not visible
// here; the per-constant notes below are hedged accordingly.
const (
	// "*" token. NOTE(review): presumably the "let the server assign the
	// entry ID" marker used when adding to a stream — confirm at call sites.
	StreamAsteriskID           string = "*"
	// "$" token. This is the offset the package example passes when creating
	// a consumer group (XGROUP CREATE <stream> <group> $ MKSTREAM).
	StreamLastDeliveredID      string = "$"
	// "0" used as an ID. NOTE(review): presumably the beginning of a stream — confirm.
	StreamZeroID               string = "0"
	// "0" used as a read offset. Same literal value as StreamZeroID.
	StreamZeroOffset           string = "0"
	// ">" token. NOTE(review): presumably requests entries never delivered to
	// any consumer of the group — confirm against the read path.
	StreamNeverDeliveredOffset string = ">"
	// Empty string: no offset specified.
	StreamUnspecifiedOffset    string = ""

	// Nil re-exports redis.Nil from the underlying redis client package.
	Nil = redis.Nil

	// Text prefix for this library's log output, judging by the name.
	LOGGER_PREFIX string = "[lib-redis-stream] "

	// NOTE(review): the three values below look like an upper bound, a lower
	// bound, and a multiplier for the number of pending entries fetched at
	// once; how they are combined is not visible here — confirm.
	MAX_PENDING_FETCHING_SIZE         int64 = 4096
	MIN_PENDING_FETCHING_SIZE         int64 = 16
	PENDING_FETCHING_SIZE_COEFFICIENT int64 = 3
)
View Source
// Limits for message state names and values, judging by the identifiers.
// NOTE(review): where these limits are enforced, and whether they count bytes
// or characters, is not visible here — confirm against the MessageState code.
const (
	// Maximum length of a state name.
	MESSAGE_STATE_NAME_MAX_LENGTH = 255
	// Maximum size of a state value; 0x0fff is 4095 in decimal.
	MESSAGE_STATE_VALUE_MAX_SIZE  = 0x0fff
)

Variables

This section is empty.

Functions

func DefaultLogger

func DefaultLogger() *log.Logger

Types

type AdminClient

type AdminClient struct {
	// contains filtered or unexported fields
}

func NewAdminClient

func NewAdminClient(opt *UniversalOptions) (*AdminClient, error)

func (*AdminClient) Close

func (c *AdminClient) Close() error

func (*AdminClient) CreateConsumerGroup

func (c *AdminClient) CreateConsumerGroup(stream, group, offset string) (string, error)

func (*AdminClient) CreateConsumerGroupAndStream

func (c *AdminClient) CreateConsumerGroupAndStream(stream, group, offset string) (string, error)

func (*AdminClient) DeleteConsumer

func (c *AdminClient) DeleteConsumer(stream, group, consumer string) (int64, error)

func (*AdminClient) DeleteConsumerGroup

func (c *AdminClient) DeleteConsumerGroup(stream, group string) (int64, error)

func (*AdminClient) Handle

func (c *AdminClient) Handle() redis.UniversalClient

func (*AdminClient) SetConsumerGroupOffset

func (c *AdminClient) SetConsumerGroupOffset(stream, group, offset string) (string, error)

type Consumer

// Consumer describes a stream consumer: the consumer group and consumer name
// it uses, the Redis connection options, polling and claim tuning values, and
// the callbacks for handling messages and Redis errors.
type Consumer struct {
	Group               string
	Name                string
	RedisOption         *redis.UniversalOptions
	MaxInFlight         int64
	MaxPollingTimeout   time.Duration
	ClaimMinIdleTime    time.Duration
	IdlingTimeout       time.Duration // how long to wait when there are no messages
	ClaimSensitivity    int           // when a Read returns fewer than n messages, perform a Claim
	ClaimOccurrenceRate int32         // perform one Claim after every n Reads
	MessageHandler      MessageHandleProc
	RedisErrorHandler   RedisErrorHandleProc
	Logger              *log.Logger
	// contains filtered or unexported fields
}

func (*Consumer) Close

func (c *Consumer) Close()

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(streams ...StreamOffsetInfo) error

type ConsumerClient

type ConsumerClient struct {
	Group       string
	Name        string
	RedisOption *redis.UniversalOptions
	// contains filtered or unexported fields
}

type ConsumerError

type ConsumerError struct {
	// contains filtered or unexported fields
}

func (*ConsumerError) Error

func (e *ConsumerError) Error() string

func (*ConsumerError) IsRedisError

func (e *ConsumerError) IsRedisError() bool

func (*ConsumerError) Unwrap

func (e *ConsumerError) Unwrap() error

type CyclicCounter

type CyclicCounter struct {
	// contains filtered or unexported fields
}

type DecodeMessageContentOption added in v0.2.1

type DecodeMessageContentOption interface {
	// contains filtered or unexported methods
}

func WithMessageStateKeyPrefix added in v0.2.1

func WithMessageStateKeyPrefix(prefix string) DecodeMessageContentOption

------------------------------

type DecodeMessageContentOptionFunc added in v0.2.1

type DecodeMessageContentOptionFunc func(setting *DecodeMessageContentSetting)

type DecodeMessageContentSetting added in v0.2.1

type DecodeMessageContentSetting struct {
	MessageStateKeyPrefix string
}

type Forwarder

type Forwarder struct {
	*Producer
}

func NewForwarder

func NewForwarder(config *ProducerConfig) (*Forwarder, error)

func (*Forwarder) Runner

func (f *Forwarder) Runner() *ForwarderRunner

type ForwarderRunner

type ForwarderRunner struct {
	// contains filtered or unexported fields
}

func (*ForwarderRunner) Start

func (r *ForwarderRunner) Start()

func (*ForwarderRunner) Stop

func (r *ForwarderRunner) Stop()

type Message

type Message struct {
	*XMessage

	ConsumerGroup string
	Stream        string
	Delegate      MessageDelegate
	// contains filtered or unexported fields
}

func (*Message) Ack

func (m *Message) Ack()

func (*Message) Clone

func (m *Message) Clone() *Message

func (*Message) Content

func (m *Message) Content(opts ...DecodeMessageContentOption) *MessageContent

func (*Message) Del

func (m *Message) Del()

func (*Message) HasResponded

func (m *Message) HasResponded() bool

type MessageContent

type MessageContent struct {
	State  MessageState
	Values map[string]interface{}
}

func DecodeMessageContent

func DecodeMessageContent(container map[string]interface{}, opts ...DecodeMessageContentOption) *MessageContent

func NewMessageContent

func NewMessageContent() *MessageContent

func (*MessageContent) WriteTo

func (c *MessageContent) WriteTo(container map[string]interface{})

type MessageDelegate

type MessageDelegate interface {
	OnAck(msg *Message)
	OnDel(msg *Message)
}

type MessageHandleProc

type MessageHandleProc func(message *Message)

func

type MessageState

type MessageState struct {
	// contains filtered or unexported fields
}

func (*MessageState) Del

func (s *MessageState) Del(name string) interface{}

func (*MessageState) Has

func (s *MessageState) Has(name string) bool

func (*MessageState) Len

func (s *MessageState) Len() int

func (*MessageState) Set

func (s *MessageState) Set(name string, value interface{}) (old interface{}, err error)

func (*MessageState) SetString

func (s *MessageState) SetString(name string, value interface{}) (old interface{}, err error)

func (*MessageState) Value

func (s *MessageState) Value(name string) interface{}

func (*MessageState) Visit

func (s *MessageState) Visit(visit func(name string, value interface{}))

type ProduceMessageContentOption

type ProduceMessageContentOption func(msg *MessageContent) error

type ProduceMessageIDOption

type ProduceMessageIDOption func(id string) string

func WithMessageID

func WithMessageID(id string) ProduceMessageIDOption

type ProduceMessageOption

type ProduceMessageOption interface {
	// contains filtered or unexported methods
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(config *ProducerConfig) (*Producer, error)

func (*Producer) Close

func (p *Producer) Close()

func (*Producer) Handle

func (p *Producer) Handle() redis.UniversalClient

func (*Producer) Write

func (p *Producer) Write(stream string, values map[string]interface{}, opts ...ProduceMessageOption) (string, error)

func (*Producer) WriteContent

func (p *Producer) WriteContent(stream string, msg *MessageContent, opts ...ProduceMessageOption) (string, error)

type ProducerConfig

type ProducerConfig struct {
	*UniversalOptions

	Logger *log.Logger
}

type RedisError

type RedisError interface {
	RedisError()
}

type RedisErrorHandleProc

type RedisErrorHandleProc func(err error) (disposed bool)

func

type Stream

type Stream string

func (Stream) NeverDeliveredOffset

func (s Stream) NeverDeliveredOffset() StreamOffset

func (Stream) Offset

func (s Stream) Offset(offset string) StreamOffset

func (Stream) Zero

func (s Stream) Zero() StreamOffset

type StreamOffset

type StreamOffset struct {
	Stream string
	Offset string
}

type StreamOffsetInfo

type StreamOffsetInfo interface {
	// contains filtered or unexported methods
}

type UniversalClient

type UniversalClient = redis.UniversalClient

type UniversalOptions

type UniversalOptions = redis.UniversalOptions

type XMessage

type XMessage = redis.XMessage

type XStream

type XStream = redis.XStream

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL