rbs

Version: v0.0.0-...-c17f2a6
Published: Dec 6, 2016 License: Apache-2.0 Imports: 8 Imported by: 0

README

go-redis-bytestream

Store and Retrieve streaming data in Redis via io.Reader and io.Writer utilities

Building and Testing

Pre-reqs

$ go get github.com/garyburd/redigo/redis
$ go get github.com/rafaeljusto/redigomock

Integration Tests

To run the integration tests against an actual Redis server, set its host:port in the environment variable RBS_TEST_REDIS:

$ RBS_TEST_REDIS=localhost:6379 go test -v
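One way a test helper could decide whether to dial a real server is sketched below. This is a minimal illustration, not the package's own test code: the helper name integrationAddr and its lookup parameter are assumptions made for the example; only the variable name RBS_TEST_REDIS comes from the README.

```go
package main

import (
	"fmt"
	"os"
)

// integrationAddr reports the Redis address to test against, if one is set.
// The lookup parameter has the same shape as os.LookupEnv, so a fake can be
// substituted when exercising the helper itself. (Hypothetical helper.)
func integrationAddr(lookup func(string) (string, bool)) (string, bool) {
	addr, ok := lookup("RBS_TEST_REDIS")
	if !ok || addr == "" {
		return "", false
	}
	return addr, true
}

func main() {
	if addr, ok := integrationAddr(os.LookupEnv); ok {
		fmt.Println("integration tests would dial", addr)
	} else {
		fmt.Println("RBS_TEST_REDIS not set; mocks only")
	}
}
```

Treating an empty value the same as an unset one is a design choice of this sketch.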

Usage

Look in rbs_test.go to see more specifics. Outlined here is the basic intended usage.

Writing

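name := "example_stream_name"
tx, err := redis.Dial(...)
if err != nil {...}

// WriteStdPub publishes a message on every pipeline flush.
writer, err := rbs.NewWriter(tx, name, rbs.WriteStdPub())
if err != nil {...}
defer writer.Close()  // flushes leftover bytes, writes the end-of-stream marker, closes tx

var src io.Reader = ...

if _, err := io.Copy(writer, src); err != nil {...}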

Reading

ctx := context.Background()
name := "example_stream_name"
rx, err := redis.Dial(...)  // Transactional connection
if err != nil {...}
ps, err := redis.Dial(...)  // PubSub connection; must be a different connection from rx
if err != nil {...}

reader, err := rbs.NewSyncReader(ctx, rbs.NewReader(rx, name), ps, rbs.SyncStdSub())
if err != nil {...}
defer reader.Close()  // also closes ps and the wrapped reader

var dst io.Writer = ...

if _, err := io.Copy(dst, reader); err != nil {...}

Credits

(c) 2016 Orion Labs, Inc.

See AUTHORS.md for more details.

Documentation

Constants

const DefaultMaxChunkSize uint16 = 2 ^ 10 // 1024

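DefaultMaxChunkSize is documented as 1 KB (1024 bytes). Note that Go's ^ operator is bitwise XOR rather than exponentiation, so the expression 2 ^ 10 evaluates to 8; 1 << 10 is the expression that yields 1024.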
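Because ^ between two integers is bitwise XOR in Go, the value produced by the declaration above is worth checking. A minimal standalone sketch follows; the constant names xorValue and shiftValue are illustrative and are not part of this package.

```go
package main

import "fmt"

// xorValue uses the same expression as the declaration above. In binary,
// 2 is 0010 and 10 is 1010, so their XOR is 1000.
const xorValue uint16 = 2 ^ 10

// shiftValue is the usual Go spelling of 2 to the power of 10.
const shiftValue uint16 = 1 << 10

func main() {
	fmt.Println(xorValue, shiftValue) // prints "8 1024"
}
```

Clients that rely on a 1024-byte chunk size may therefore prefer to set it explicitly with WriteMaxChunk(1024).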

Variables

var ErrStarveEOF = errors.New("data not yet available")

ErrStarveEOF is returned along with a zero-length read when no data is available yet: either the full size of the stream is not yet known, or it is known but the remaining data has not been received.

Functions

func NewReader

func NewReader(conn redis.Conn, name string, options ...func(*RedReader)) io.ReadCloser

NewReader creates an io.ReadCloser that assembles a byte stream as stored in complete chunks on a Redis hash. The underlying implementation is a *RedReader, and is the intended counterpart to the *RedWriter.

The name parameter is used as the key for the Redis hash.

Configuration is by functional options, as inspired by this blog post: http://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis

See: ReadLookahead
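The functional-options style used by the constructors in this package can be sketched as follows. This is a generic illustration under stated assumptions: the type reader, its fields, and the names withLookahead and newReader are hypothetical stand-ins, since the real RedReader fields are unexported.

```go
package main

import "fmt"

// reader is a hypothetical stand-in for a type such as RedReader.
type reader struct {
	name      string
	lookahead uint8
}

// withLookahead returns an option with the same shape as ReadLookahead:
// a function that mutates the value under construction.
func withLookahead(peek uint8) func(*reader) {
	return func(r *reader) { r.lookahead = peek }
}

// newReader starts from defaults and applies each option in order.
func newReader(name string, options ...func(*reader)) *reader {
	r := &reader{name: name} // lookahead defaults to 0 in this sketch
	for _, opt := range options {
		opt(r)
	}
	return r
}

func main() {
	fmt.Println(newReader("example_stream_name").lookahead)
	fmt.Println(newReader("example_stream_name", withLookahead(3)).lookahead)
}
```

The appeal of the pattern is that callers who are happy with the defaults pass nothing, and new options can be added later without changing the constructor signature.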

func NewSyncReader

func NewSyncReader(
	parent context.Context,
	r io.Reader,
	subs redis.Conn,
	options ...func(*SyncReader) error,
) (io.ReadCloser, error)

NewSyncReader creates an io.ReadCloser that reads bytes from an underlying io.Reader. However, if the underlying io.Reader returned an ErrStarveEOF, this implementation will block until receiving a Redis PubSub stimulus, whereupon it will attempt to read from the underlying io.Reader again.

The intended usage for SyncReader is with a RedReader (or any io.Reader that returns the same ErrStarveEOF), but it functions as a simple pass-through for any other io.Reader.

This accepts a Context object, so a client can set a timeout, or cancel reading mid-stream.

The passed-in Redis connection *MUST* be different from the one used in the underlying RedReader, as this pub/sub subscription cannot also handle concurrent transactional needs.

The client is expected to provide (at least) one pub/sub channel upon which this implementation will listen for synchronization events.

Configuration is by functional options, as inspired by this blog post: http://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis

See: SyncSub, SyncStdSub, SyncStarve
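The block-and-retry behaviour described above can be sketched with the standard library alone. Everything here is an assumption made for illustration: errStarve stands in for ErrStarveEOF, the feed type stands in for a RedReader, a Go channel stands in for the Redis PubSub subscription, and waitingReader is not the real SyncReader.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
	"time"
)

// errStarve is a local stand-in for rbs.ErrStarveEOF.
var errStarve = errors.New("data not yet available")

// feed is a hypothetical in-memory stream. Read returns errStarve while the
// feed is empty but still open, and io.EOF once it is empty and closed.
type feed struct {
	mu     sync.Mutex
	buf    []byte
	closed bool
}

func (f *feed) Read(p []byte) (int, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if len(f.buf) == 0 {
		if f.closed {
			return 0, io.EOF
		}
		return 0, errStarve
	}
	n := copy(p, f.buf)
	f.buf = f.buf[n:]
	return n, nil
}

// put appends data, or marks end-of-stream when data is nil, then signals.
func (f *feed) put(data []byte, stimulus chan<- struct{}) {
	f.mu.Lock()
	f.buf = append(f.buf, data...)
	f.closed = data == nil
	f.mu.Unlock()
	stimulus <- struct{}{}
}

// waitingReader passes reads through, except that on errStarve it blocks
// until a stimulus, the starve timeout, or context cancellation.
type waitingReader struct {
	ctx      context.Context
	r        io.Reader
	stimulus <-chan struct{}
	starve   time.Duration
}

func (w *waitingReader) Read(p []byte) (int, error) {
	for {
		n, err := w.r.Read(p)
		if err != errStarve {
			return n, err
		}
		select {
		case <-w.stimulus:
			// New data may be available: loop and read again.
		case <-time.After(w.starve):
			return 0, io.ErrUnexpectedEOF
		case <-w.ctx.Done():
			return 0, w.ctx.Err()
		}
	}
}

// demo feeds two pieces from another goroutine and returns what was read.
func demo() (string, error) {
	stimulus := make(chan struct{}, 3)
	f := &feed{}
	go func() {
		f.put([]byte("hello, "), stimulus)
		f.put([]byte("stream"), stimulus)
		f.put(nil, stimulus)
	}()
	w := &waitingReader{ctx: context.Background(), r: f, stimulus: stimulus, starve: time.Second}
	out, err := io.ReadAll(w)
	return string(out), err
}

func main() {
	fmt.Println(demo())
}
```

Any error other than the starve sentinel, including io.EOF, is returned unchanged, which is why such a wrapper behaves as a plain pass-through for ordinary readers.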

func NewWriter

func NewWriter(conn redis.Conn, name string, options ...func(*RedWriter) error) (io.WriteCloser, error)

NewWriter creates an io.WriteCloser that will write byte chunks to Redis. The underlying implementation is a *RedWriter.

The intended usage for this is a growing stream of bytes where the full length is not known until the stream is closed. (This does not preclude its use for streams of known or fixed length.)

A single Redis hash is the sole underlying data structure. The name parameter is used as the Redis key, and each chunk of bytes is stored as a field+value pair on the hash. This implementation reserves all field names starting with "c:" (for chunk), but is open to clients using any arbitrary other field names on this hash.

Network hits are minimized by using Redis pipelining. Per Write invocation, the buffer is apportioned into chunks of configurable size, any trailers (arbitrary Redis commands configured by the client) are assembled, and all of it is written out to Redis at once.
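The apportioning step can be pictured with a short sketch. The exact rule for how the minimum and maximum chunk sizes interact is an assumption here, as is the function name apportion; the real RedWriter may differ in detail.

```go
package main

import "fmt"

// apportion splits pending bytes into chunks of at most maxSize bytes.
// A final short chunk is emitted only if it holds at least minSize bytes;
// otherwise it is returned as leftover to be carried into a later call.
// maxSize must be positive. (Hypothetical helper, assumed behaviour.)
func apportion(pending []byte, minSize, maxSize int) (chunks [][]byte, leftover []byte) {
	for len(pending) >= maxSize {
		chunks = append(chunks, pending[:maxSize])
		pending = pending[maxSize:]
	}
	if len(pending) > 0 && len(pending) >= minSize {
		chunks = append(chunks, pending)
		pending = nil
	}
	return chunks, pending
}

func main() {
	chunks, leftover := apportion(make([]byte, 2500), 512, 1024)
	fmt.Println(len(chunks), len(leftover))
}
```

With a minimum of 0 every Write would flush all of its bytes; a larger minimum trades latency for fewer, fuller chunks.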

Upon Close, if >0 bytes had been written, a last pipeline is constructed, writing out any buffered data, an end-of-stream marker, and any configured trailers.

The default configuration has a 1024-byte maximum chunk size, and a 0-byte minimum chunk size, and no trailers. Clients will want to adjust these parameters to their use cases. (Some factors that may be considered in this tuning: desired ingress/egress data rates; Redis tuning; etc.)

Configuration is by functional options, as inspired by this blog post: http://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis

See: WriteExpire, WriteMaxChunk, WriteMinChunk, WriteStdPub, WriteTrailer

func ReadLookahead

func ReadLookahead(peek uint8) func(*RedReader)

ReadLookahead sets the number of chunks to optimistically look for in a single network fetch from Redis.

func SyncStarve

func SyncStarve(dur time.Duration) func(*SyncReader) error

SyncStarve sets the maximum amount of time to wait between stimulus/Read attempts. If this timeout is tripped, the reader will return io.ErrUnexpectedEOF

func SyncStdSub

func SyncStdSub() func(*SyncReader) error

SyncStdSub sets up the "standard" channel subscription. This is the Read-side counterpart to WriteStdPub

func SyncSub

func SyncSub(pubSubChan interface{}) func(*SyncReader) error

SyncSub allows a client to add an arbitrary Redis PubSub channel on which to listen for stimulus.

func WriteExpire

func WriteExpire(sec uint16) func(*RedWriter) error

WriteExpire sends a Redis EXPIRE command for the underlying hash upon each pipeline flush

func WriteMaxChunk

func WriteMaxChunk(max uint16) func(*RedWriter) error

WriteMaxChunk overrides the default maximum size per chunk written to Redis

func WriteMinChunk

func WriteMinChunk(min uint16) func(*RedWriter) error

WriteMinChunk sets the minimum size per chunk written to Redis

func WriteStdPub

func WriteStdPub() func(*RedWriter) error

WriteStdPub sends a standard Redis PUBLISH message for this hash on every pipeline flush. This is intended to be used in concert with a standard read-side subscription, so that readers learn when there are new bytes to be read.

func WriteTrailer

func WriteTrailer(c CmdBuilder) func(*RedWriter) error

WriteTrailer provides clients a way to configure arbitrary Redis commands to be included per pipeline flush

Types

type CmdBuilder

type CmdBuilder interface {
	Build() (string, []interface{}, error)
}

A CmdBuilder spits out the parameters that are used for invocation of a Redis command

type CmdBuilderFunc

type CmdBuilderFunc func() (string, []interface{}, error)

CmdBuilderFunc is a function adapter for the CmdBuilder interface

func (CmdBuilderFunc) Build

func (f CmdBuilderFunc) Build() (string, []interface{}, error)

Build conforms to the CmdBuilder interface
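As an illustration of the adapter, the sketch below builds a trailer from a plain function. The two type declarations are local copies of the ones documented above so that the example compiles on its own; the body of Build (simply calling f) and the helper expireTrailer are assumptions, not code taken from this package.

```go
package main

import "fmt"

// CmdBuilder and CmdBuilderFunc are local copies of the documented types.
type CmdBuilder interface {
	Build() (string, []interface{}, error)
}

type CmdBuilderFunc func() (string, []interface{}, error)

// Build calls the wrapped function (assumed adapter behaviour).
func (f CmdBuilderFunc) Build() (string, []interface{}, error) { return f() }

// expireTrailer is a hypothetical helper yielding "EXPIRE key seconds".
func expireTrailer(key string, seconds int) CmdBuilder {
	return CmdBuilderFunc(func() (string, []interface{}, error) {
		return "EXPIRE", []interface{}{key, seconds}, nil
	})
}

func main() {
	cmd, args, err := expireTrailer("example_stream_name", 60).Build()
	fmt.Println(cmd, args, err)
}
```

Going by the signature of WriteTrailer, a value built this way could be passed to NewWriter as rbs.WriteTrailer(...), with the command name and arguments then sent as part of each pipeline flush.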

type RedReader

type RedReader struct {
	// contains filtered or unexported fields
}

RedReader is the implementation that retrieves a variable-length stream of bytes from a Redis hash, as written by its RedWriter counterpart

func (*RedReader) Close

func (rr *RedReader) Close() error

Close conforms to the io.Closer interface

This will also close the underlying Redis connection

func (*RedReader) Read

func (rr *RedReader) Read(p []byte) (int, error)

Read conforms to the io.Reader interface

It is recommended that the buffer p be at least ((lookahead + 1) × maxChunkSize) bytes long; otherwise you run the risk of receiving an io.ErrShortBuffer error.

(Receiving an io.ErrShortBuffer is not necessarily a fatal condition: the Read will try to "keep its place" in the stream and deliver what bytes it can per invocation.)
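The sizing recommendation above is simple arithmetic, sketched here with a hypothetical helper named readBufferSize. The lookahead of 2 is an arbitrary example value, and 1024 is the documented default maximum chunk size.

```go
package main

import "fmt"

// readBufferSize returns (lookahead + 1) * maxChunkSize, widening to int
// first so the multiplication cannot overflow the narrower input types.
func readBufferSize(lookahead uint8, maxChunkSize uint16) int {
	return (int(lookahead) + 1) * int(maxChunkSize)
}

func main() {
	buf := make([]byte, readBufferSize(2, 1024))
	fmt.Println(len(buf)) // prints "3072"
}
```

Note that io.Copy allocates its own buffer when neither side implements io.WriterTo or io.ReaderFrom, so a caller who needs a specific size can use io.CopyBuffer instead.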

This implementation has one very specific behavior: if the end-of-stream marker has not yet been written, but there are currently no bytes to deliver, the error ErrStarveEOF is returned.

If the byte stream is already fully written, or will be by the time Read reaches the end-of-stream marker, this acts just like any other io.Reader.

type RedWriter

type RedWriter struct {
	// contains filtered or unexported fields
}

A RedWriter is the implementation that stores a variable-length stream of bytes into a Redis hash.

func (*RedWriter) Close

func (rw *RedWriter) Close() error

Close conforms to the io.Closer interface.

Calling Close is necessary in order to write out any buffered (leftover) bytes and to add the end-of-stream marker.

This finishes by calling Close on the underlying Redis connection.

func (*RedWriter) Write

func (rw *RedWriter) Write(p []byte) (int, error)

Write conforms to the io.Writer interface.

This is where the incoming buffer is apportioned into chunks, and written out to Redis via a pipeline.

There should be at most one hit to Redis per Write invocation. (There is no hit only when the pending chunk is smaller than the configured minimum chunk size.)

type SyncReader

type SyncReader struct {
	// contains filtered or unexported fields
}

A SyncReader is the implementation that reads from an underlying io.Reader, and will block then retry upon stimulus if the underlying Reader returned ErrStarveEOF.

func (*SyncReader) Close

func (sr *SyncReader) Close() error

Close conforms to the io.Closer interface

This will close the passed-in Redis connection AND the passed-in Reader, if the latter also implements io.Closer.

func (*SyncReader) Read

func (sr *SyncReader) Read(p []byte) (n int, err error)

Read conforms to the io.Reader interface

This will pass through any results from the underlying Read, unless it received an ErrStarveEOF, whereupon it will block and re-attempt the Read when it receives pub/sub stimulus.
