redis

package module
v0.0.0-...-a4910e0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2023 License: MIT Imports: 5 Imported by: 6

README

A redis integrated gadget

Depends on redis 5.0.3

Examples

Store long-term data
r, err := redis.NewRediStore(10, "tcp", "127.0.0.1:6379", "")
if err != nil {
	t.Fatal(err)
}
defer r.Close()

id := 1
if err := r.PutJSON("testing", id, 0); err != nil {
	t.Fatal(err)
}
defer func(){
    if err := r.Delete("testing"); err != nil {
    	t.Fatal(err)
    }
}()

outId := 0
if err := r.ScanJSON("testing", &outId); err != nil {
	t.Fatal(err)
}
Store timed data
r, err := redis.NewRediStore(10, "tcp", "127.0.0.1:6379", "")
if err != nil {
	t.Fatal(err)
}
defer r.Close()

id := 1
timeout := 60 // seconds
if err := r.PutJSON("testing", id, timeout); err != nil {
	t.Fatal(err)
}

outId := 0
if err := r.ScanJSON("testing", &outId); err != nil {
	t.Fatal(err)
}
Distributed lock
r, err := redis.NewRediStore(10, "tcp", "127.0.0.1:6379", "")
if err != nil {
	t.Fatal(err)
}
defer r.Close()

key := fmt.Sprintf("%d", time.Now().UnixNano())
owner := "testing"
if err := r.Lock(key, owner, 60); err != nil{
    t.Fatal(err)
}
defer r.Unlock(key, owner)
Message queue
	streamName := "logs-stream"
	r, err := redis.NewRediStore(10, "tcp", "127.0.0.1:6379", "")
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()

	p := NewMsqProducer(r, streamName)
	if err := p.Put("msg title", []byte("msg body")); err != nil {
		log.Fatal(err)
	}

	delayOverdue := 5 * time.Minute
	consumer, err := NewMsqConsumer(context.TODO(),
		r,
		streamName,
        "0", // if you have multiple readers, each one needs a different client id.
		delayOverdue,
	)
	if err != nil {
		log.Fatal(err)
	}

	handle := func(m *redis.MessageEntry) bool {
		// TODO: handle something
		//return false
		return true
	}

	// consume
    // for testing, you can limit the loop to a fixed number of iterations.
    for {
        if err := consumer.Next(handle); err != nil{
            log.Warn(errors.As(err))
            time.Sleep(time.Second)
        }
    }

License

The lock and msq parts are under the MIT License. For the other parts, please refer to:

https://github.com/gomodule/redigo/redis
https://github.com/garyburd/redigo
https://github.com/boj/redistore

Documentation

Overview

see PR: https://github.com/gomodule/redigo/pull/557/files

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNil       = redis.ErrNil
	ErrDataExist = errors.New("Data already exist")
)
View Source
var (
	ErrLocked = errors.New("Another client has locked")
)

Functions

This section is empty.

Types

type FieldEntry

type FieldEntry struct {
	Key   string
	Value []byte
}

func FieldEntries

func FieldEntries(replay interface{}, err error) ([]FieldEntry, error)

type MessageEntry

type MessageEntry struct {
	ID     string
	Fields []FieldEntry
}

func MessageEntries

func MessageEntries(reply interface{}, err error) ([]MessageEntry, error)

The reply must be in the message entry format.

type RediStore

type RediStore struct {
	Pool *redis.Pool
}

RediStore stores sessions in a redis backend.

func NewRediStore

func NewRediStore(size int, network, address, password string) (*RediStore, error)

NewRediStore returns a new RediStore. size: maximum number of idle connections.

func NewRediStoreWithDB

func NewRediStoreWithDB(size int, network, address, password, DB string) (*RediStore, error)

NewRediStoreWithDB - like NewRediStore, but accepts a `DB` parameter to select the redis DB instead of using the default one ("0").

func NewRediStoreWithPool

func NewRediStoreWithPool(pool *redis.Pool) (*RediStore, error)

NewRediStoreWithPool instantiates a RediStore with a *redis.Pool passed in.

func (*RediStore) Close

func (s *RediStore) Close() error

Close closes the underlying *redis.Pool

func (*RediStore) Delete

func (s *RediStore) Delete(key string) error

Delete removes the session from redis.

func (*RediStore) Do

func (s *RediStore) Do(cmd string, args ...interface{}) (interface{}, error)

func (*RediStore) Lock

func (s *RediStore) Lock(key, owner string, age time.Duration) error

func (*RediStore) PutJSON

func (s *RediStore) PutJSON(key string, val interface{}, age time.Duration) error

PutJSON stores the session in redis. The data is stored in JSON format. age: 0 means no expiration.

func (*RediStore) ScanJSON

func (s *RediStore) ScanJSON(key string, val interface{}) error

scan the data to a json interface.

func (*RediStore) ScanKey

func (s *RediStore) ScanKey(cursor int64, pattern string, limit int) (int64, [][]byte, error)

When the returned next cursor is 0, there is no more data.

func (*RediStore) TryLock

func (s *RediStore) TryLock(key, owner string, age time.Duration) error

func (*RediStore) Unlock

func (s *RediStore) Unlock(key, owner string) error

func (*RediStore) XACK

func (s *RediStore) XACK(streamName, groupName, entryId string) (int, error)

func (*RediStore) XAdd

func (s *RediStore) XAdd(streamName string, key string, val interface{}, kv ...interface{}) (string, error)

For stream XADD; returns the system-generated ID.

func (*RediStore) XClaim

func (s *RediStore) XClaim(streamName, groupName, entryId, toConsumerName string, overDuration time.Duration) error

Transfers a timed-out task to another consumer. XAUTOCLAIM is available since Redis 6.2.0.

func (*RediStore) XCreateGroup

func (s *RediStore) XCreateGroup(streamName, groupName string) error

TODO: confirm in which version MKSTREAM was introduced.

func (*RediStore) XDel

func (s *RediStore) XDel(streamName, msgId string, otherIds ...interface{}) (int, error)

func (*RediStore) XDeleteGroupUser

func (s *RediStore) XDeleteGroupUser(streamName, groupName, consumerName string) error

func (*RediStore) XDestroyGroup

func (s *RediStore) XDestroyGroup(streamName, groupName string) error

func (*RediStore) XLen

func (s *RediStore) XLen(streamName string) (int, error)

func (*RediStore) XPending

func (s *RediStore) XPending(streamName, groupName string, limit int64) ([]interface{}, error)

Gets the pending tasks. Since version 6.2 it is possible to filter entries by their idle time.

func (*RediStore) XReadGroup

func (s *RediStore) XReadGroup(streamName, groupName, consumerName string, limit int, timeout time.Duration) ([]StreamEntry, error)

For multiple consumers, the group name is set to the stream name. The group must be created before use.

func (*RediStore) XTrim

func (s *RediStore) XTrim(streamName string, maxLen int) (int, error)

Caps the stream size.

type StreamEntry

type StreamEntry struct {
	StreamName string
	Messages   []MessageEntry
}

func StreamEntries

func StreamEntries(reply interface{}, err error) ([]StreamEntry, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL