rs

package module
v0.0.0-...-c75f188 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 23, 2023 License: MIT Imports: 18 Imported by: 1

README

RS

RS Is a message queue for redis streams

⚙ Installation

go get -u github.com/eininst/rs

⚡ Quickstart

cli := rs.New(rcli *redis.Client)

You can customize it all you want:

cli := rs.New(examples.GetRedis(), rs.Config{
    Sender: rs.SenderConfig{
    	//Evicts entries as long as the stream's length exceeds the specified threshold
        MaxLen: rs.Int64(100),
    },
})

Send a message

cli.Send("simple", rs.H{
    "title": "this a simple message",
})

cli.Send("test", rs.H{
    "something": "hello word",
})

cli.Send("order_status_change", rs.H{
    "order_id": 100,
})

Send a delay message

cli.SendWithDelay("simple", rs.H{
    "title": "this a delay message",
}, time.Second*10)

cli.SendWithTime("simple", rs.H{
    "title": "this a timing message",
}, time.Now().Add(time.Minute * 5))

Receive message

package main

import (
	"encoding/json"
	"github.com/eininst/flog"
	"github.com/eininst/rs"
	examples "github.com/eininst/rs/examples/redis"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
    cli := rs.New(examples.GetRedis(), rs.Config{
        //default configuration for receiving messages
        Receive: rs.ReceiveConfig{
            Work:       rs.Int(10),       //Per stream goroutine number,
            Timeout:    time.Second * 20, //Retry after timeout
            MaxRetries: rs.Int64(3),      //Max retries
            ReadCount:  rs.Int64(50),     //XReadGroup Count
            BlockTime:  time.Second * 20, //XReadGroup Block Time
        },
    })

    cli.Handler("simple", func(ctx *rs.Context) {
        defer ctx.Ack()
        flog.Info(ctx.JSON.Raw)
    })

    cli.Receive(rs.Rctx{
        Stream: "simple",
        Handler: func(ctx *rs.Context) {
            defer ctx.Ack()
            jstr, _ := json.Marshal(ctx.Msg.Values)
            flog.Info("received simple msg:", string(jstr))
        },
    })

    cli.Receive(rs.Rctx{
        Stream:     "test",
        Group:      "group1",
        MaxRetries: nil, //no retries
        Handler: func(ctx *rs.Context) {
            defer ctx.Ack()
            jstr, _ := json.Marshal(ctx.Msg.Values)
            flog.Info("received test msg:", string(jstr))
        },
    })

    cli.Receive(rs.Rctx{
        Stream:     "test",
        Group:      "group2",
        MaxRetries: nil, //no retries
        Handler: func(ctx *rs.Context) {
            defer ctx.Ack()
            jstr, _ := json.Marshal(ctx.Msg.Values)
            flog.Info("received test msg:", string(jstr))
        },
    })

    cli.Receive(rs.Rctx{
        Stream:     "order_status_change",
        Work:       rs.Int(20),
        Timeout:    time.Second * 120,
        MaxRetries: rs.Int64(6),
        Handler: func(ctx *rs.Context) {
	    defer ctx.Ack()
            orderId := ctx.Msg.Values["order_id"]
            flog.Info("received order_status_change msg:", orderId)
        },
    })

    cli.Listen()
}
2022/08/29 21:10:04 [RS] Stream "simple" working... # BlockTime=20s MaxRetries=3 ReadCount=50 Timeout=20s Work=10
2022/08/29 21:10:04 [RS] Stream "test:group1" working... # BlockTime=20s MaxRetries=3 ReadCount=50 Timeout=20s Work=10
2022/08/29 21:10:04 [RS] Stream "test:group2" working... # BlockTime=20s MaxRetries=3 ReadCount=50 Timeout=20s Work=10
2022/08/29 21:10:04 [RS] Stream "order_status_change" working... # BlockTime=20s MaxRetries=6 ReadCount=50 Timeout=2m0s Work=20
2022/08/29 21:10:20 [INFO] receive.go:31 received simple msg: {"title":"this a simple message"}
2022/08/29 21:10:20 [INFO] receive.go:53 received test msg: {"something":"hello word"}
2022/08/29 21:10:20 [INFO] receive.go:42 received test msg: {"something":"hello word"}
2022/08/29 21:10:20 [INFO] receive.go:65 received order_status_change msg: 100

Graceful Shutdown

go func () {
    quit := make(chan os.Signal)
    signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
    <-quit
    
    cli.Shutdown()
    flog.Info("Graceful Shutdown")
}()

cli.Listen()

See examples

License

MIT

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	DefaultPrefix     = "RS_"
	DefaultReceiveCfg = ReceiveConfig{
		Work:           Int(10),
		ReadCount:      Int64(20),
		BlockTime:      time.Second * 15,
		MaxRetries:     Int64(3),
		Timeout:        time.Second * 300,
		ZRangeInterval: time.Millisecond * 500,
	}
	DefaultSenderConfig = SenderConfig{MaxLen: Int64(2048)}
)

Functions

func Bool

func Bool(b bool) *bool

Bool 复制 bool 对象,并返回复制体的指针

func CronSend

func CronSend(spec string, stream string)

func Float32

func Float32(f float32) *float32

Float32 复制 float32 对象,并返回复制体的指针

func Float64

func Float64(f float64) *float64

Float64 复制 float64 对象,并返回复制体的指针

func Handler

func Handler(stream string, call Call, opts ...Option)

func Int

func Int(i int) *int

func Int32

func Int32(i int32) *int32

Int32 复制 int64 对象,并返回复制体的指针

func Int64

func Int64(i int64) *int64

Int64 复制 int64 对象,并返回复制体的指针

func Listen

func Listen()

func Receive

func Receive(rctx Rctx)

func Send

func Send(stream string, msg map[string]interface{}) error

func SendWithDelay

func SendWithDelay(stream string, msg map[string]interface{}, delay time.Duration) error

func SendWithTime

func SendWithTime(stream string, msg map[string]interface{}, datetime time.Time) error

func SetDefault

func SetDefault(rcli *redis.Client, configs ...Config)

func SetDefaultByClient

func SetDefaultByClient(client Client)

func Shutdown

func Shutdown()

func Start

func Start()

func String

func String(s string) *string

String 复制 string 对象,并返回复制体的指针

func Time

func Time(t time.Time) *time.Time

Time 复制 time.Time 对象,并返回复制体的指针

func ToMap

func ToMap(data interface{}) map[string]interface{}

Types

type Call

type Call func(ctx *Context)

type Client

type Client interface {
	Send(stream string, msg map[string]interface{}) error
	SendWithDelay(stream string, msg map[string]interface{}, delay time.Duration) error
	SendWithTime(stream string, msg map[string]interface{}, datetime time.Time) error
	CronSend(spec string, stream string)

	Handler(stream string, call Call, opts ...Option)
	Receive(rctx Rctx)
	Listen()
	Shutdown()
	Start()
}

func New

func New(rcli *redis.Client, configs ...Config) Client

type Config

type Config struct {
	Prefix  string        `json:"prefix"`
	Sender  SenderConfig  `json:"sender"`
	Receive ReceiveConfig `json:"receive"`
}

type Context

type Context struct {
	context.Context
	Stream     string
	Group      string
	ConsumerId string
	Msg        redis.XMessage
	JSON       gjson.Result
	Client     *redis.Client
	Delay      time.Duration
	Ack        func()
}

type H

type H map[string]interface{}

type Msg

type Msg struct {
	Stream string
	Body   map[string]interface{}
	MaxLen *int64
}

type Option

type Option struct {
	F func(o *Options)
}

func WithBlockTime

func WithBlockTime(blockTime time.Duration) Option

func WithGroup

func WithGroup(group string) Option

func WithMaxRetries

func WithMaxRetries(maxRetries int64) Option

func WithReadCount

func WithReadCount(readCount int64) Option

func WithTimeout

func WithTimeout(timeout time.Duration) Option

func WithWork

func WithWork(work int) Option

type Options

type Options struct {
	Group      string
	Work       *int
	ReadCount  *int64
	BlockTime  time.Duration
	MaxRetries *int64
	Timeout    time.Duration
}

func (*Options) Apply

func (o *Options) Apply(opts []Option)

type Rctx

type Rctx struct {
	Stream     string
	Group      string
	Work       *int
	ReadCount  *int64
	BlockTime  time.Duration
	MaxRetries *int64
	Timeout    time.Duration
	Handler    Call
}

type ReceiveConfig

type ReceiveConfig struct {
	Work           *int          `json:"work"`
	ReadCount      *int64        `json:"readCount"`
	BlockTime      time.Duration `json:"blockTime"`
	MaxRetries     *int64        `json:"maxRetries"`
	Timeout        time.Duration `json:"timeout"`
	ZRangeInterval time.Duration `json:"zRangeInterval"`
}

type SenderConfig

type SenderConfig struct {
	MaxLen *int64 `json:"maxLen"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL