redbytes

v0.0.0-...-855f4fb Latest
Published: Feb 19, 2019 License: MIT Imports: 7 Imported by: 0

README

redbytes

A Go-based utility for using Redis as a streaming file server. (Here, "streaming" means a reader can retrieve the partially uploaded sequence of bytes before the writer has finished writing the full set.)

History

While working at Orion Labs, I did an initial open-source implementation of a project similar to this one. After a third-party contractor accidentally made some licensed software public for a short while, we un-public-ed many of our repos in an overabundance of caution.

So, I thought I'd take another crack at it. See what I've learned in the meantime. To borrow a concept from martial arts, this project has sort of become my kata.

Only after starting on this implementation did I find that someone had forked the original open-source implementation, which is found here: https://github.com/adnaan/go-redis-bytestream. Since it was open and available, I've forked that into my own GitHub space and frozen the repo for posterity: https://github.com/nelz9999/go-redis-bytestream.

Documentation

Constants

const (

	// DefaultMaxChunkSize is the base defined width of each data field
	DefaultMaxChunkSize = 1024

	// DefaultStarveInterval is the default maximum amount of time
	// that a Read will wait between receipts of stream data before
	// giving up and returning ErrIncomplete.
	DefaultStarveInterval = time.Minute
)
const (
	// ErrKeyExists is returned when we are trying to write a key
	// that collides with existing data in Redis
	ErrKeyExists = ErrorString("redbytes: key already exists")

	// ErrNoStimulus is returned from NewRedisByteStreamReader when
	// neither PollInterval(...) nor Subscribe(...) were configured.
	ErrNoStimulus = ErrorString("redbytes: no read stimulus defined")
)

Functions

func NewRedisByteStreamReader

func NewRedisByteStreamReader(parent context.Context, doer Doer, key string, opts ...ReadOption) ([]byte, io.ReadCloser, error)

NewRedisByteStreamReader is a blocking call that waits for a positive sign that a stream with the given key exists, or times out waiting for one. On success, it returns any metadata that the write-side defined, along with an io.ReadCloser. Individual Read(...) calls also block while waiting for new data from the write-side, or until a timeout. If a timeout occurs while waiting for the end of the stream, Read(...) returns io.ErrUnexpectedEOF. Retrying the lookup for updated stream data requires some kind of stimulus, so at least one of Subscribe(...) or PollInterval(...) must be defined.

func NewRedisByteStreamWriter

func NewRedisByteStreamWriter(ctx context.Context, client Doer, key string, opts ...WriteOption) (io.WriteCloser, error)

NewRedisByteStreamWriter returns an io.WriteCloser that stores the written byte stream as a hash in Redis at the given key. This allows a receiver on the other end to access the data before the whole stream has been written, even if the write-side takes a while to finish. The Redis client is not closed when Close() is called, so it should be safe to re-use the client afterwards, as well as concurrently (as long as no other operations are blocking on the client, such as a Subscribe/PSubscribe).

Types

type Doer

type Doer interface {
	Do(commandName string, args ...interface{}) (reply interface{}, err error)
}

Doer is a lightweight subset of the redigo/redis.Conn interface
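Because Doer has a single method, a test double is easy to write. The sketch below is an illustration only: the Doer interface is copied from the definition above, while recordingDoer and its call type are hypothetical names that are not part of this package. It records commands instead of talking to Redis.

```go
package main

import "fmt"

// Doer is copied from the interface documented above.
type Doer interface {
	Do(commandName string, args ...interface{}) (reply interface{}, err error)
}

// call captures one recorded command (hypothetical helper type).
type call struct {
	name string
	args []interface{}
}

// recordingDoer is a hypothetical test double: it stores every command it
// receives and always replies "OK" without contacting a server.
type recordingDoer struct {
	calls []call
}

func (r *recordingDoer) Do(commandName string, args ...interface{}) (interface{}, error) {
	r.calls = append(r.calls, call{name: commandName, args: args})
	return "OK", nil
}

func main() {
	rec := &recordingDoer{}
	var d Doer = rec // compile-time proof that recordingDoer satisfies Doer

	reply, err := d.Do("HSET", "some-key", "0", []byte("abc"))
	fmt.Println(reply, err, len(rec.calls))
}
```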

type ErrorString

type ErrorString string

ErrorString is an adapter that allows a string to serve as an error

func (ErrorString) Error

func (es ErrorString) Error() string

Error conforms to the stdlib error interface

type ReadOption

type ReadOption func(ro *ReadOptions) error

ReadOption sets up some of the conditional behavior associated with reading a stream of bytes from Redis

func Lookahead

func Lookahead(count int) ReadOption

Lookahead sets the number of optimistic forward-looking chunks to be requested upon a fetch from Redis. This can reduce the overall number of round-trips to Redis over the life of a stream. If not set, no lookahead is used.

func PollInterval

func PollInterval(d time.Duration) ReadOption

PollInterval sets a maximum period between requests for more data when waiting on the given stream. At least one of Subscribe(...) or PollInterval(...) MUST be configured as read stimulus; both are acceptable and can coexist.

func StarveInterval

func StarveInterval(d time.Duration) ReadOption

StarveInterval sets the maximum amount of time to wait between successful receipts of data on the stream, after which it gives up and returns ErrIncomplete. If not set, the DefaultStarveInterval will be used. If you are only using PollInterval as read stimulus, this value should be greater than that interval.

func Subscribe

func Subscribe(client redis.Conn, channel string) ReadOption

Subscribe sets up a Redis PubSub subscription to the given channel, and listens on this channel for notifications that more information may be available for our given stream. The given client MUST be a different instance than the client used to read information from the stream, as a SUBSCRIBE puts the client into a long-lived connection to the server. At least one of Subscribe(...) or PollInterval(...) MUST be configured as read stimulus; both are acceptable and can coexist.

type ReadOptions

type ReadOptions struct {
	// contains filtered or unexported fields
}

ReadOptions holds the conditional aspects of setup for NewRedisByteStreamReader

type WriteOption

type WriteOption func(wo *WriteOptions) error

WriteOption sets up some of the conditional behavior associated with writing a stream of bytes to Redis

func Expires

func Expires(d time.Duration) WriteOption

Expires sets the given stream of data to expire out of Redis after the given amount of time (rounded to the nearest second). If not set, the stream will be written without an expiry time.

func MaxChunkSize

func MaxChunkSize(max int) WriteOption

MaxChunkSize sets the upper bound of how "wide" a single chunk can be when written to Redis as a hash field. This has implications on the read-side: if the read buffers are smaller than what is stored in a single field, they may receive an io.ErrShortBuffer. If not set, the value used is DefaultMaxChunkSize.
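The effect of the chunk bound can be sketched as a simple split of the written bytes. This only illustrates the idea; chunk is a hypothetical helper and does not reflect how the package buffers writes internally.

```go
package main

import "fmt"

// chunk splits data into consecutive pieces of at most max bytes each.
// The pieces share the backing array of data; nothing is copied.
func chunk(data []byte, max int) [][]byte {
	if max <= 0 {
		return nil
	}
	var chunks [][]byte
	for len(data) > 0 {
		n := max
		if len(data) < n {
			n = len(data)
		}
		chunks = append(chunks, data[:n])
		data = data[n:]
	}
	return chunks
}

func main() {
	// 2500 bytes at the default width of 1024 yields three fields.
	for i, c := range chunk(make([]byte, 2500), 1024) {
		fmt.Println(i, len(c))
	}
}
```

On the read-side, sizing the buffer passed to Read(...) to at least the writer's maximum chunk size avoids the io.ErrShortBuffer case described above.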

func Metadata

func Metadata(info []byte) WriteOption

Metadata stores arbitrary data with the stream of bytes being stored. For example, if you are using this in an HTTP context, you might want to record the MIME type of the file. If not set, an empty value is used.

func PublishChannel

func PublishChannel(channel string) WriteOption

PublishChannel sets the Redis PubSub channel where notifications (with bodies of zero length) are sent to signify that new data is available in the stream. If not set, no PubSub notifications are emitted.

func PublishClient

func PublishClient(client Doer) WriteOption

PublishClient allows the caller to set a different Redis client for sending PubSub messages. This is probably only needed for Redis Cluster users, when the PublishChannel might get mapped to a different node than the base hash key. If not set, the initial Redis client will be used by default. Setting PublishClient(...) alone is insufficient; PublishChannel(...) must also be set for it to take effect.

type WriteOptions

type WriteOptions struct {
	// contains filtered or unexported fields
}

WriteOptions holds the conditional aspects of setup for NewRedisByteStreamWriter
