Documentation ¶
Overview ¶
Example ¶
var ( EVN_REDIS_SERVERS = strings.Split(os.Getenv("REDIS_SERVERS"), ",") ) if len(EVN_REDIS_SERVERS) == 0 { EVN_REDIS_SERVERS = []string{"127.0.0.1:6379"} } // register consumer group { admin, err := redis.NewAdminClient(&redis.UniversalOptions{ Addrs: __TEST_REDIS_SERVERS, DB: 0, }) if err != nil { panic(err) } defer func() { defer admin.Close() /* XGROUP DESTROY gotestStream gotestGroup */*/ _, err = admin.DeleteConsumerGroup("gotestStream", "gotestGroup") if err != nil { panic(err) } /* DEL gotestStream */*/ _, err = admin.Handle().Del("gotestStream").Result() if err != nil { panic(err) } }() /* XGROUP CREATE gotestStream gotestGroup $ MKSTREAM */*/ _, err = admin.CreateConsumerGroupAndStream("gotestStream", "gotestGroup", redis.StreamLastDeliveredID) if err != nil { panic(err) } } // publish { conf := redis.ProducerConfig{ UniversalOptions: &redis.UniversalOptions{ Addrs: __TEST_REDIS_SERVERS, DB: 0, }, } p, err := redis.NewProducer(&conf) if err != nil { panic(err) } defer p.Close() // produce message { publishMessages := []map[string]interface{}{ {"name": "luffy", "age": 19}, {"name": "nami", "age": 21}, {"name": "zoro", "age": 21}, } for _, message := range publishMessages { reply, err := p.Write("gotestStream", message) if err != nil { panic(err) } _ = reply fmt.Printf("ID: %s\n", reply) } } } // subscribe { // the config only for test use !! 
opt := redis.UniversalOptions{ Addrs: __TEST_REDIS_SERVERS, DB: 0, } c := &redis.Consumer{ Group: "gotestGroup", Name: "gotestConsumer", RedisOption: &opt, MaxInFlight: 1, MaxPollingTimeout: 10 * time.Millisecond, ClaimMinIdleTime: 30 * time.Millisecond, IdlingTimeout: 2000 * time.Millisecond, ClaimSensitivity: 2, ClaimOccurrenceRate: 2, MessageHandler: func(message *redis.Message) { fmt.Printf("Message on %s: %v\n", message.Stream, message.XMessage) message.Ack() message.Del() }, RedisErrorHandler: func(err error) (disposed bool) { fmt.Println(err) return true }, } err := c.Subscribe( redis.Stream("gotestStream"), ) if err != nil { panic(err) } ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) select { case <-ctx.Done(): c.Close() break } }
Output:
Index ¶
- Constants
- func DefaultLogger() *log.Logger
- type AdminClient
- func (c *AdminClient) Close() error
- func (c *AdminClient) CreateConsumerGroup(stream, group, offset string) (string, error)
- func (c *AdminClient) CreateConsumerGroupAndStream(stream, group, offset string) (string, error)
- func (c *AdminClient) DeleteConsumer(stream, group, consumer string) (int64, error)
- func (c *AdminClient) DeleteConsumerGroup(stream, group string) (int64, error)
- func (c *AdminClient) Handle() redis.UniversalClient
- func (c *AdminClient) SetConsumerGroupOffset(stream, group, offset string) (string, error)
- type Consumer
- type ConsumerClient
- type ConsumerError
- type CyclicCounter
- type DecodeMessageContentOption
- type DecodeMessageContentOptionFunc
- type DecodeMessageContentSetting
- type Forwarder
- type ForwarderRunner
- type Message
- type MessageContent
- type MessageDelegate
- type MessageHandleProc
- type MessageState
- func (s *MessageState) Del(name string) interface{}
- func (s *MessageState) Has(name string) bool
- func (s *MessageState) Len() int
- func (s *MessageState) Set(name string, value interface{}) (old interface{}, err error)
- func (s *MessageState) SetString(name string, value interface{}) (old interface{}, err error)
- func (s *MessageState) Value(name string) interface{}
- func (s *MessageState) Visit(visit func(name string, value interface{}))
- type ProduceMessageContentOption
- type ProduceMessageIDOption
- type ProduceMessageOption
- type Producer
- func (p *Producer) Close()
- func (p *Producer) Handle() redis.UniversalClient
- func (p *Producer) Write(stream string, values map[string]interface{}, opts ...ProduceMessageOption) (string, error)
- func (p *Producer) WriteContent(stream string, msg *MessageContent, opts ...ProduceMessageOption) (string, error)
- type ProducerConfig
- type RedisError
- type RedisErrorHandleProc
- type Stream
- type StreamOffset
- type StreamOffsetInfo
- type UniversalClient
- type UniversalOptions
- type XMessage
- type XStream
Examples ¶
Constants ¶
View Source
// Stream ID/offset markers and package-level tuning constants.
const (
	// Stream ID and offset markers.
	// NOTE(review): the values match the special IDs of the Redis stream
	// commands ("*" auto-generated ID, "$" last ID, ">" never-delivered
	// entries) — confirm against the call sites, which are not visible here.
	StreamAsteriskID           string = "*"
	StreamLastDeliveredID      string = "$"
	StreamZeroID               string = "0"
	StreamZeroOffset           string = "0"
	StreamNeverDeliveredOffset string = ">"
	StreamUnspecifiedOffset    string = ""

	// Nil re-exports redis.Nil under this package's name.
	Nil = redis.Nil

	// LOGGER_PREFIX is presumably the prefix used by the logger returned from
	// DefaultLogger — TODO confirm.
	LOGGER_PREFIX string = "[lib-redis-stream] "

	// Upper bound, lower bound and multiplier for a pending-fetch size.
	// NOTE(review): meaning inferred from the names only; the code that
	// consumes these values is not visible here.
	MAX_PENDING_FETCHING_SIZE         int64 = 4096
	MIN_PENDING_FETCHING_SIZE         int64 = 16
	PENDING_FETCHING_SIZE_COEFFICIENT int64 = 3
)
View Source
// Size limits for message-state entries.
// NOTE(review): presumably enforced by MessageState.Set / SetString, which
// return an error — confirm; the enforcement code is not visible here.
const (
	MESSAGE_STATE_NAME_MAX_LENGTH = 255
	MESSAGE_STATE_VALUE_MAX_SIZE  = 0x0fff // 4095
)
Variables ¶
This section is empty.
Functions ¶
func DefaultLogger ¶
Types ¶
type AdminClient ¶
type AdminClient struct {
// contains filtered or unexported fields
}
func NewAdminClient ¶
func NewAdminClient(opt *UniversalOptions) (*AdminClient, error)
func (*AdminClient) Close ¶
func (c *AdminClient) Close() error
func (*AdminClient) CreateConsumerGroup ¶
func (c *AdminClient) CreateConsumerGroup(stream, group, offset string) (string, error)
func (*AdminClient) CreateConsumerGroupAndStream ¶
func (c *AdminClient) CreateConsumerGroupAndStream(stream, group, offset string) (string, error)
func (*AdminClient) DeleteConsumer ¶
func (c *AdminClient) DeleteConsumer(stream, group, consumer string) (int64, error)
func (*AdminClient) DeleteConsumerGroup ¶
func (c *AdminClient) DeleteConsumerGroup(stream, group string) (int64, error)
func (*AdminClient) Handle ¶
func (c *AdminClient) Handle() redis.UniversalClient
func (*AdminClient) SetConsumerGroupOffset ¶
func (c *AdminClient) SetConsumerGroupOffset(stream, group, offset string) (string, error)
type Consumer ¶
// Consumer is configured through its exported fields and started with
// Subscribe. Group and Name identify the consumer group and the consumer;
// RedisOption carries the connection options.
type Consumer struct {
	Group               string
	Name                string
	RedisOption         *redis.UniversalOptions
	MaxInFlight         int64
	MaxPollingTimeout   time.Duration
	ClaimMinIdleTime    time.Duration
	IdlingTimeout       time.Duration // how long to wait when there are no messages
	ClaimSensitivity    int           // run Claim when a Read returns fewer than n messages
	ClaimOccurrenceRate int32         // run Claim once after every n Reads
	MessageHandler      MessageHandleProc
	RedisErrorHandler   RedisErrorHandleProc
	Logger              *log.Logger
	// contains filtered or unexported fields
}
func (*Consumer) Subscribe ¶
func (c *Consumer) Subscribe(streams ...StreamOffsetInfo) error
type ConsumerClient ¶
// ConsumerClient carries a consumer group name, a consumer name and the Redis
// connection options.
// NOTE(review): its methods are not shown in this listing.
type ConsumerClient struct {
	Group       string
	Name        string
	RedisOption *redis.UniversalOptions
	// contains filtered or unexported fields
}
type ConsumerError ¶
type ConsumerError struct {
// contains filtered or unexported fields
}
func (*ConsumerError) Error ¶
func (e *ConsumerError) Error() string
func (*ConsumerError) IsRedisError ¶
func (e *ConsumerError) IsRedisError() bool
func (*ConsumerError) Unwrap ¶
func (e *ConsumerError) Unwrap() error
type CyclicCounter ¶
type CyclicCounter struct {
// contains filtered or unexported fields
}
type DecodeMessageContentOption ¶ added in v0.2.1
type DecodeMessageContentOption interface {
// contains filtered or unexported methods
}
func WithMessageStateKeyPrefix ¶ added in v0.2.1
func WithMessageStateKeyPrefix(prefix string) DecodeMessageContentOption
------------------------------
type DecodeMessageContentOptionFunc ¶ added in v0.2.1
type DecodeMessageContentOptionFunc func(setting *DecodeMessageContentSetting)
type DecodeMessageContentSetting ¶ added in v0.2.1
// DecodeMessageContentSetting is the settings value that a
// DecodeMessageContentOptionFunc receives a pointer to.
type DecodeMessageContentSetting struct {
	// MessageStateKeyPrefix is presumably what WithMessageStateKeyPrefix
	// sets — TODO confirm; the option's body is not visible here.
	MessageStateKeyPrefix string
}
type Forwarder ¶
// Forwarder embeds *Producer, so the Producer's exported methods are promoted
// to it. It is created with NewForwarder from a ProducerConfig.
type Forwarder struct {
	*Producer
}
func NewForwarder ¶
func NewForwarder(config *ProducerConfig) (*Forwarder, error)
func (*Forwarder) Runner ¶
func (f *Forwarder) Runner() *ForwarderRunner
type ForwarderRunner ¶
type ForwarderRunner struct {
// contains filtered or unexported fields
}
func (*ForwarderRunner) Start ¶
func (r *ForwarderRunner) Start()
func (*ForwarderRunner) Stop ¶
func (r *ForwarderRunner) Stop()
type Message ¶
// Message is the value handed to a Consumer's MessageHandler. It embeds
// *XMessage and records the consumer group and stream it is associated with.
type Message struct {
	*XMessage
	ConsumerGroup string
	Stream        string
	// Delegate's role is not visible in this listing.
	// NOTE(review): presumably it backs responses such as Ack/Del — confirm.
	Delegate MessageDelegate
	// contains filtered or unexported fields
}
func (*Message) Content ¶
func (m *Message) Content(opts ...DecodeMessageContentOption) *MessageContent
func (*Message) HasResponded ¶
type MessageContent ¶
// MessageContent pairs a MessageState with a map of message values. It is
// created by NewMessageContent or DecodeMessageContent and can be written
// into a container map with WriteTo.
type MessageContent struct {
	State  MessageState
	Values map[string]interface{}
}
func DecodeMessageContent ¶
func DecodeMessageContent(container map[string]interface{}, opts ...DecodeMessageContentOption) *MessageContent
func NewMessageContent ¶
func NewMessageContent() *MessageContent
func (*MessageContent) WriteTo ¶
func (c *MessageContent) WriteTo(container map[string]interface{})
type MessageDelegate ¶
type MessageState ¶
type MessageState struct {
// contains filtered or unexported fields
}
func (*MessageState) Del ¶
func (s *MessageState) Del(name string) interface{}
func (*MessageState) Has ¶
func (s *MessageState) Has(name string) bool
func (*MessageState) Len ¶
func (s *MessageState) Len() int
func (*MessageState) Set ¶
func (s *MessageState) Set(name string, value interface{}) (old interface{}, err error)
func (*MessageState) SetString ¶
func (s *MessageState) SetString(name string, value interface{}) (old interface{}, err error)
func (*MessageState) Value ¶
func (s *MessageState) Value(name string) interface{}
func (*MessageState) Visit ¶
func (s *MessageState) Visit(visit func(name string, value interface{}))
type ProduceMessageContentOption ¶
type ProduceMessageContentOption func(msg *MessageContent) error
func WithTracePropagation ¶
func WithTracePropagation(ctx context.Context, propagator propagation.TextMapPropagator) ProduceMessageContentOption
type ProduceMessageIDOption ¶
func WithMessageID ¶
func WithMessageID(id string) ProduceMessageIDOption
type ProduceMessageOption ¶
type ProduceMessageOption interface {
// contains filtered or unexported methods
}
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
func NewProducer(config *ProducerConfig) (*Producer, error)
func (*Producer) Handle ¶
func (p *Producer) Handle() redis.UniversalClient
func (*Producer) WriteContent ¶
func (p *Producer) WriteContent(stream string, msg *MessageContent, opts ...ProduceMessageOption) (string, error)
type ProducerConfig ¶
// ProducerConfig is the configuration accepted by NewProducer and
// NewForwarder. It embeds *UniversalOptions (the Redis connection options)
// and adds a Logger.
type ProducerConfig struct {
	*UniversalOptions
	Logger *log.Logger
}
type RedisError ¶
// RedisError is satisfied by any type that has a RedisError() method.
// NOTE(review): this looks like a marker interface for errors that come from
// Redis (compare ConsumerError.IsRedisError) — confirm.
type RedisError interface {
	RedisError()
}
type Stream ¶
type Stream string
func (Stream) NeverDeliveredOffset ¶
func (s Stream) NeverDeliveredOffset() StreamOffset
func (Stream) Offset ¶
func (s Stream) Offset(offset string) StreamOffset
func (Stream) Zero ¶
func (s Stream) Zero() StreamOffset
type StreamOffset ¶
type StreamOffsetInfo ¶
type StreamOffsetInfo interface {
// contains filtered or unexported methods
}
type UniversalClient ¶
type UniversalClient = redis.UniversalClient
type UniversalOptions ¶
type UniversalOptions = redis.UniversalOptions
Source Files ¶
- adminClient.go
- clientMessageDelegate.go
- consumer.go
- consumerClient.go
- consumerError.go
- cyclicCounter.go
- decodeMessageContentOption.go
- def.go
- forwarder.go
- forwarderRunner.go
- message.go
- messageContent.go
- messageState.go
- noCopy.go
- produceMessageContentOptions.go
- produceMessageIDOptions.go
- producer.go
- producerConfig.go
- stream.go
- streamOffset.go
- util.go
Click to show internal directories.
Click to hide internal directories.