redis

package
v0.0.0-...-f3f7f80
Published: Oct 9, 2019 License: AGPL-3.0 Imports: 8 Imported by: 0

Documentation

Overview

Package redis wraps github.com/gomodule/redigo/redis to provide convenience functions.

Functions

func TimeFromMillisecond

func TimeFromMillisecond(t int64) time.Time

func TimeToMillisecond

func TimeToMillisecond(t time.Time) int64
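The two signatures above suggest a round trip between time.Time and Unix time in milliseconds. The following is a minimal sketch of such a conversion using only the standard library; the names timeToMillis and timeFromMillis are hypothetical stand-ins, and the package's actual implementation may differ.

```go
package main

import (
	"fmt"
	"time"
)

// timeToMillis converts t to Unix time in milliseconds.
// Hypothetical stand-in for TimeToMillisecond.
func timeToMillis(t time.Time) int64 {
	return t.UnixNano() / int64(time.Millisecond)
}

// timeFromMillis converts Unix milliseconds back to a time.Time.
// Hypothetical stand-in for TimeFromMillisecond.
func timeFromMillis(ms int64) time.Time {
	return time.Unix(0, ms*int64(time.Millisecond))
}

func main() {
	ms := int64(1570579200000) // 2019-10-09T00:00:00Z
	t := timeFromMillis(ms)
	fmt.Println(t.UTC().Format(time.RFC3339), timeToMillis(t))
}
```

Sub-millisecond precision is dropped by the conversion, so a round trip is exact only for values that are whole milliseconds.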

Types

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline wraps a Redis connection to provide pipelining utilities. It supports multiple concurrent callers.

func (*Pipeline) Close

func (p *Pipeline) Close() error

func (*Pipeline) Do

func (p *Pipeline) Do(callback func(interface{}, error), cmdName string, args ...interface{})

Do adds a command to the pipeline. The callback receives only Redis errors, not connection errors. The callback may be nil, in which case the reply is ignored. Callbacks are called in order in a dedicated goroutine.
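To illustrate what "called in order in a dedicated goroutine" can look like, here is a minimal standard-library sketch. The dispatcher type and its methods are hypothetical and are not part of this package; they model only the callback side, not the Redis connection.

```go
package main

import (
	"fmt"
	"sync"
)

// reply pairs a callback with the result it should receive.
type reply struct {
	callback func(interface{}, error)
	value    interface{}
	err      error
}

// dispatcher is a hypothetical stand-in for the callback side of a pipeline:
// one goroutine drains the queue, so callbacks run one at a time, in order.
type dispatcher struct {
	queue chan reply
	done  sync.WaitGroup
}

func newDispatcher() *dispatcher {
	d := &dispatcher{queue: make(chan reply, 16)}
	d.done.Add(1)
	go func() {
		defer d.done.Done()
		for r := range d.queue {
			if r.callback != nil { // nil callbacks are skipped
				r.callback(r.value, r.err)
			}
		}
	}()
	return d
}

// deliver enqueues a result; it may be called from several goroutines.
func (d *dispatcher) deliver(cb func(interface{}, error), v interface{}, err error) {
	d.queue <- reply{cb, v, err}
}

// close stops accepting results and waits for pending callbacks to finish.
func (d *dispatcher) close() {
	close(d.queue)
	d.done.Wait()
}

// collectInOrder delivers n values and returns them in the order
// in which the callbacks observed them.
func collectInOrder(n int) []int {
	d := newDispatcher()
	var seen []int
	for i := 0; i < n; i++ {
		d.deliver(func(v interface{}, err error) { seen = append(seen, v.(int)) }, i, nil)
		d.deliver(nil, i, nil) // a nil callback is allowed and ignored
	}
	d.close()
	return seen
}

func main() {
	fmt.Println(collectInOrder(5))
}
```

Because every callback runs on the same goroutine, a slow callback delays all later ones; keeping callbacks short is a reasonable design choice under this model.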

func (*Pipeline) Flush

func (p *Pipeline) Flush(ctx context.Context) error

Flush flushes the pipeline and waits until all previous commands have returned.

type RedisResult

type RedisResult struct {
	Result interface{}
	Err    error
}

RedisResult is a helper type whose Save method can be used as a callback that collects a command's result.

func (*RedisResult) Save

func (r *RedisResult) Save(a interface{}, e error)
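Save has the same signature as the callback parameter of Pipeline.Do, so a method value can be passed directly. The sketch below uses a local copy of the type and a hypothetical fakeDo function in place of a real pipeline; it only shows the wiring.

```go
package main

import (
	"errors"
	"fmt"
)

// result mirrors the documented RedisResult fields.
type result struct {
	Result interface{}
	Err    error
}

// save stores the reply; its signature matches the callback taken by Pipeline.Do.
func (r *result) save(a interface{}, e error) {
	r.Result, r.Err = a, e
}

// fakeDo is a hypothetical stand-in for Pipeline.Do that replies immediately.
func fakeDo(callback func(interface{}, error), cmdName string, args ...interface{}) {
	if callback == nil {
		return
	}
	if cmdName == "PING" {
		callback("PONG", nil)
		return
	}
	callback(nil, errors.New("ERR unknown command"))
}

// pingResult passes the method value r.save as the callback.
func pingResult() (interface{}, error) {
	var r result
	fakeDo(r.save, "PING")
	return r.Result, r.Err
}

func main() {
	fmt.Println(pingResult())
}
```

With a real pipeline the callback runs asynchronously, so the fields should presumably be read only after Flush has returned.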

type RedisService

type RedisService struct {
	*redis.Pool

	// For redis keyspace scanning
	BatchSize    int
	Ratio        float64
	ErrorHandler func(error)
	// contains filtered or unexported fields
}

Redis service.

func DefaultRedisService

func DefaultRedisService(p *redis.Pool) *RedisService

DefaultRedisService creates a RedisService with default settings.

func (*RedisService) DefaultTimer

func (r *RedisService) DefaultTimer(key string, handler func(context.Context, string) error) *RedisTimer

func (*RedisService) Do

func (s *RedisService) Do(cmdName string, args ...interface{}) (interface{}, error)

Do takes a connection from the pool and does the command.

func (*RedisService) DoContext

func (s *RedisService) DoContext(ctx context.Context, cmdName string, args ...interface{}) (interface{}, error)

DoContext takes a connection from the pool and does the command. Context deadline is respected.

func (*RedisService) EvalContext

func (r *RedisService) EvalContext(ctx context.Context, sc *Script, keys []string, data []interface{}) (interface{}, error)

EvalContext evaluates a Lua script.

func (*RedisService) NewPipeline

func (r *RedisService) NewPipeline(ctx context.Context) (*Pipeline, error)

NewPipeline creates a new pipeline. The pipeline must be closed after use.

func (*RedisService) ReadStream

func (r *RedisService) ReadStream(ctx context.Context, key string, lastId string) (<-chan *XREADMsg, <-chan error)

ReadStream continuously reads from a single Redis 5 stream until the context is done. When the operation completes, an error is sent on the error channel and both channels are closed. Be sure to read from the returned channels, or the stream will block. Set lastId to "0" to read from the beginning, or to "$" to read only new entries (racy).
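Since both returned channels must be read, a consumer typically selects on them until both are closed. The following is a minimal sketch of that loop; fakeReadStream and xreadMsg are hypothetical stand-ins so the example runs without a Redis server.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// xreadMsg is a local copy of the documented XREADMsg shape.
type xreadMsg struct {
	ID   string
	Data map[string]string
}

// fakeReadStream is a hypothetical stand-in for ReadStream: it sends n
// messages, then reports an error and closes both channels.
func fakeReadStream(ctx context.Context, n int) (<-chan *xreadMsg, <-chan error) {
	msgs := make(chan *xreadMsg)
	errs := make(chan error, 1)
	go func() {
		defer close(msgs)
		defer close(errs)
		for i := 1; i <= n; i++ {
			select {
			case msgs <- &xreadMsg{ID: fmt.Sprintf("%d-0", i)}:
			case <-ctx.Done():
				errs <- ctx.Err()
				return
			}
		}
		errs <- errors.New("stream ended")
	}()
	return msgs, errs
}

// drain reads both channels until each has been closed.
func drain(msgs <-chan *xreadMsg, errs <-chan error) ([]string, error) {
	var ids []string
	var lastErr error
	for msgs != nil || errs != nil {
		select {
		case m, ok := <-msgs:
			if !ok {
				msgs = nil // closed: stop selecting on it
				continue
			}
			ids = append(ids, m.ID)
		case err, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			lastErr = err
		}
	}
	return ids, lastErr
}

// readAll wires the fake reader to the drain loop.
func readAll(n int) ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	return drain(fakeReadStream(ctx, n))
}

func main() {
	fmt.Println(readAll(3))
}
```

Setting a closed channel variable to nil removes it from the select, which avoids spinning on a channel that would otherwise be ready forever.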

func (*RedisService) ReadStreamGroup

func (r *RedisService) ReadStreamGroup(ctx context.Context, key string, group string, consumer string, sema <-chan struct{}) (<-chan *XREADMsg, <-chan error)

ReadStreamGroup reads from a consumer group in a stream, using sema as a semaphore. Each struct{}{} received from sema acts as a token, and one message is read per token.
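One way to use a token channel like sema is to bound the number of messages in flight: seed the channel with a fixed number of tokens and return a token when a message has been handled. The sketch below demonstrates that pattern with a hypothetical fakeGroupReader; it does not talk to Redis.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeGroupReader is a hypothetical stand-in for ReadStreamGroup:
// it emits one message for every token received from sema.
func fakeGroupReader(total int, sema <-chan struct{}) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < total; i++ {
			<-sema // wait for a token before reading the next message
			out <- i
		}
	}()
	return out
}

// process handles total messages with at most limit in flight and
// reports how many were handled and the peak number in flight.
func process(total, limit int) (handled, maxInFlight int) {
	sema := make(chan struct{}, limit)
	for i := 0; i < limit; i++ {
		sema <- struct{}{} // seed the initial tokens
	}
	var mu sync.Mutex
	var wg sync.WaitGroup
	inFlight := 0
	for range fakeGroupReader(total, sema) {
		mu.Lock()
		inFlight++
		if inFlight > maxInFlight {
			maxInFlight = inFlight
		}
		mu.Unlock()
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			inFlight--
			handled++
			mu.Unlock()
			sema <- struct{}{} // return the token once the work is done
		}()
	}
	wg.Wait()
	return handled, maxInFlight
}

func main() {
	fmt.Println(process(20, 3))
}
```

The channel's capacity equals the token count, so returning a token never blocks a worker.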

func (*RedisService) ReadStreamOnce

func (r *RedisService) ReadStreamOnce(ctx context.Context, key string, lastId string, count int) ([]*XREADMsg, error)

ReadStreamOnce reads up to count entries from a Redis 5 stream until the context is done. If count == 0, it reads as many entries as possible.

func (*RedisService) RunScanner

func (r *RedisService) RunScanner(ctx context.Context) error

func (*RedisService) ScanPrefix

func (s *RedisService) ScanPrefix(prefix string, handler func(string))

Handles a Redis prefix. The prefix must either be empty or end in a colon.

func (*RedisService) WithCache

func (s *RedisService) WithCache(ctx context.Context, key string, f func() ([]byte, time.Duration, error)) ([]byte, error)

WithCache queries Redis for cached content before calling f.
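This is the familiar cache-aside pattern. Below is a minimal sketch in which an in-memory map stands in for Redis; the memCache type is hypothetical, and treating the time.Duration returned by f as the entry's time to live is an assumption, since the documentation does not say what it means.

```go
package main

import (
	"fmt"
	"time"
)

// entry is a cached value with its expiry time.
type entry struct {
	data    []byte
	expires time.Time
}

// memCache is an in-memory stand-in for Redis (not safe for concurrent use).
type memCache map[string]entry

// withCache returns the cached value for key if it is still fresh.
// Otherwise it calls f, stores the result, and returns it.
// Assumption: the duration returned by f is the entry's time to live.
func (c memCache) withCache(key string, f func() ([]byte, time.Duration, error)) ([]byte, error) {
	if e, ok := c[key]; ok && time.Now().Before(e.expires) {
		return e.data, nil
	}
	data, ttl, err := f()
	if err != nil {
		return nil, err // errors are not cached in this sketch
	}
	c[key] = entry{data: data, expires: time.Now().Add(ttl)}
	return data, nil
}

// demo calls withCache twice and reports how often the loader ran.
func demo() (string, int) {
	c := memCache{}
	calls := 0
	load := func() ([]byte, time.Duration, error) {
		calls++
		return []byte("hello"), time.Minute, nil
	}
	c.withCache("greeting", load)
	v, _ := c.withCache("greeting", load)
	return string(v), calls
}

func main() {
	fmt.Println(demo())
}
```

Letting f choose the duration per call allows the caller to cache different results for different lengths of time.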

type RedisTimer

type RedisTimer struct {
	Key       string
	Handler   func(context.Context, string) error
	Timeout   time.Duration
	Interval  time.Duration // Maximum interval between polls
	BatchSize int
	// contains filtered or unexported fields
}

RedisTimer is designed around a Redis ZSET whose scores are Unix times in milliseconds. It polls the ZSET to fetch the latest messages. Instead of popping items out of the ZSET, it changes each item's score to some time in the future, so items cannot be lost. An item is deleted automatically after the associated work is done.
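The claim-then-delete scheme described above can be sketched with an in-memory map standing in for the ZSET. The zset type, claimDue, and poll are hypothetical names, and the exact polling logic of RedisTimer is not documented here; this only illustrates why bumping the score keeps an item from being lost when its handler fails.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// zset is an in-memory stand-in for a Redis ZSET: member -> score (Unix ms).
type zset map[string]int64

// claimDue returns up to batch members whose score is <= now, oldest first,
// and moves each claimed score to now+timeout so the item comes due again
// if the work never completes.
func (z zset) claimDue(now, timeout int64, batch int) []string {
	var due []string
	for m, s := range z {
		if s <= now {
			due = append(due, m)
		}
	}
	sort.Slice(due, func(i, j int) bool {
		if z[due[i]] != z[due[j]] {
			return z[due[i]] < z[due[j]]
		}
		return due[i] < due[j]
	})
	if len(due) > batch {
		due = due[:batch]
	}
	for _, m := range due {
		z[m] = now + timeout
	}
	return due
}

// poll claims due items, runs handler on each, and deletes those that succeed.
func (z zset) poll(now, timeout int64, batch int, handler func(string) error) {
	for _, m := range z.claimDue(now, timeout, batch) {
		if handler(m) == nil {
			delete(z, m)
		}
	}
}

// demo runs one poll in which the handler fails for item "b".
func demo() (remaining int, retryAt int64) {
	z := zset{"a": 1000, "b": 2000, "c": 9000}
	z.poll(5000, 30000, 10, func(m string) error {
		if m == "b" {
			return errors.New("handler failed")
		}
		return nil
	})
	return len(z), z["b"]
}

func main() {
	fmt.Println(demo())
}
```

Against a real Redis server the claim step would need to be atomic, for example inside a Lua script; whether this package does so is not stated in this documentation.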

func (*RedisTimer) Delete

func (t *RedisTimer) Delete(ctx context.Context, key string) error

Delete key.

func (*RedisTimer) Run

func (t *RedisTimer) Run(ctx context.Context) error

func (*RedisTimer) Schedule

func (t *RedisTimer) Schedule(ctx context.Context, key string, at time.Time) error

Schedule key at the specified time. If key already exists, it is rescheduled.

type Script

type Script struct {
	Script []byte
	SHA1   [20]byte
	Hash   string
}

Script wraps a Lua script to be used in Redis.

func NewScript

func NewScript(sc []byte) *Script
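Redis identifies a cached script by the hexadecimal SHA-1 digest of its source, which is what EVALSHA expects. The Script fields suggest that NewScript precomputes this digest; the sketch below shows one way to do so, assuming that Hash is the hex encoding of SHA1. The lowercase names are local stand-ins, not the package's code.

```go
package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
)

// script mirrors the documented Script fields.
type script struct {
	Script []byte
	SHA1   [20]byte
	Hash   string
}

// newScript computes the SHA-1 digest of the source once.
// Assumption: Hash is the hex form of SHA1, as used by EVALSHA.
func newScript(sc []byte) *script {
	sum := sha1.Sum(sc)
	return &script{Script: sc, SHA1: sum, Hash: hex.EncodeToString(sum[:])}
}

func main() {
	s := newScript([]byte("return 1"))
	fmt.Println(len(s.Hash), s.Hash)
}
```

Computing the digest at construction time means each evaluation can try EVALSHA first without hashing the script again.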

type XREAD

type XREAD struct {
	Key  string
	Msgs []*XREADMsg
}

func ParseXREAD

func ParseXREAD(data interface{}, err error) ([]*XREAD, error)

ParseXREAD parses the result of an XREAD or XREADGROUP command.
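An XREAD reply is a nested array: one element per stream, each holding the key and a list of entries, where every entry is an ID followed by a flat list of field/value pairs. The following is a rough sketch of a parser for that shape over generic interface{} values; the lowercase names are hypothetical, and the package's own ParseXREAD may handle more cases.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented XREADMsg and XREAD shapes.
type xreadMsg struct {
	ID   string
	Data map[string]string
}

type xread struct {
	Key  string
	Msgs []*xreadMsg
}

var errShape = errors.New("unexpected XREAD reply shape")

// str accepts the two forms a bulk string commonly takes in a decoded reply.
func str(v interface{}) (string, error) {
	switch s := v.(type) {
	case []byte:
		return string(s), nil
	case string:
		return s, nil
	}
	return "", errShape
}

// pair splits a two-element array into its head and its nested array.
func pair(v interface{}) (interface{}, []interface{}, error) {
	p, ok := v.([]interface{})
	if !ok || len(p) != 2 {
		return nil, nil, errShape
	}
	rest, ok := p[1].([]interface{})
	if !ok {
		return nil, nil, errShape
	}
	return p[0], rest, nil
}

// parseXREAD takes (reply, err) so it can wrap a command call directly.
func parseXREAD(data interface{}, err error) ([]*xread, error) {
	if err != nil {
		return nil, err
	}
	if data == nil {
		return nil, nil // nil reply: no entries were available
	}
	streams, ok := data.([]interface{})
	if !ok {
		return nil, errShape
	}
	var out []*xread
	for _, s := range streams {
		k, entries, err := pair(s)
		if err != nil {
			return nil, err
		}
		x := &xread{}
		if x.Key, err = str(k); err != nil {
			return nil, err
		}
		for _, e := range entries {
			i, fields, err := pair(e)
			if err != nil || len(fields)%2 != 0 {
				return nil, errShape
			}
			m := &xreadMsg{Data: make(map[string]string, len(fields)/2)}
			if m.ID, err = str(i); err != nil {
				return nil, err
			}
			for j := 0; j < len(fields); j += 2 {
				f, ferr := str(fields[j])
				v, verr := str(fields[j+1])
				if ferr != nil || verr != nil {
					return nil, errShape
				}
				m.Data[f] = v
			}
			x.Msgs = append(x.Msgs, m)
		}
		out = append(out, x)
	}
	return out, nil
}

// sampleReply builds a reply for one stream holding one entry.
func sampleReply() interface{} {
	b := func(s string) interface{} { return []byte(s) }
	entry := []interface{}{b("1-0"), []interface{}{b("temp"), b("21")}}
	return []interface{}{[]interface{}{b("mystream"), []interface{}{entry}}}
}

func main() {
	xs, err := parseXREAD(sampleReply(), nil)
	fmt.Println(xs[0].Key, xs[0].Msgs[0].ID, xs[0].Msgs[0].Data, err)
}
```

Accepting the (reply, error) pair as arguments follows the signature documented for ParseXREAD and lets the parser be composed with a command call in a single expression.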

type XREADMsg

type XREADMsg struct {
	ID   string
	Data map[string]string
}
