core

package
v0.0.0-...-4075349 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 5, 2016 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package core is responsible for actually communicating with redis and providing an abstraction for the data stored in it

This package is not stable! At present it is only intended to be used by other components in bananaq

Constants

This section is empty.

Variables

var (
	ErrNotFound = errors.New("not found")
)

Various errors this package may return

Functions

This section is empty.

Types

type Core

type Core struct {
	// contains filtered or unexported fields
}

Core contains all the information needed to interact with the underlying redis instances for bananaq. All methods on Core are thread-safe, except Run which should only be run by a single goroutine at any time.

func New

func New(cmder util.Cmder, o *Opts) *Core

New initializes a new Core instance based on the given Cmder (which may be a *pool.Pool or *cluster.Cluster) and extra options (which may be nil). Run must be called in order to actually use the Core

func (*Core) GetEvent

func (c *Core) GetEvent(id ID) (Event, error)

GetEvent returns the event identified by the given ID, or ErrNotFound if it's expired or never existed
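Callers typically need to tell "expired or never existed" apart from other failures. The following is a minimal sketch of that check, assuming a hypothetical `lookup` function backed by a map in place of Core.GetEvent and redis; only the `ErrNotFound` sentinel mirrors the documented API.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotFound mirrors the sentinel error documented for this package.
var ErrNotFound = errors.New("not found")

// lookup is a hypothetical stand-in for Core.GetEvent. It reads from a map
// instead of redis and returns ErrNotFound for unknown ids.
func lookup(store map[string]string, id string) (string, error) {
	contents, ok := store[id]
	if !ok {
		return "", ErrNotFound
	}
	return contents, nil
}

func main() {
	store := map[string]string{"a": "hello"}
	for _, id := range []string{"a", "b"} {
		contents, err := lookup(store, id)
		switch {
		case errors.Is(err, ErrNotFound):
			// Not a failure as such: the event expired or never existed.
			fmt.Printf("%s: expired or never existed\n", id)
		case err != nil:
			fmt.Printf("%s: unexpected error: %v\n", id, err)
		default:
			fmt.Printf("%s: %s\n", id, contents)
		}
	}
}
```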

func (*Core) KeyNotify

func (c *Core) KeyNotify(k Key)

KeyNotify will notify all processes currently waiting on the given Key using KeyWait

func (*Core) KeyScan

func (c *Core) KeyScan(k Key) ([]Key, error)

KeyScan returns all the Keys matching the given Key pattern. At least one field in the given Key should be a "*"

func (*Core) KeyWait

func (c *Core) KeyWait(k Key, stopCh <-chan struct{}) <-chan struct{}

KeyWait returns a channel which will be closed when the given Key is notified by some other process. stopCh can be closed to stop waiting and immediately close the returned channel
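The wait/notify contract described for KeyWait and KeyNotify can be sketched in a single process as follows. The `notifier` type and its methods are hypothetical; the real package notifies across processes and its mechanism is not described here. Keys are reduced to plain strings for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// notifier is a hypothetical in-process stand-in for KeyWait/KeyNotify.
type notifier struct {
	mu      sync.Mutex
	waiters map[string][]chan struct{}
}

func newNotifier() *notifier {
	return &notifier{waiters: map[string][]chan struct{}{}}
}

// wait returns a channel which is closed when key is notified or when
// stopCh is closed, whichever happens first. A nil stopCh never fires.
func (n *notifier) wait(key string, stopCh <-chan struct{}) <-chan struct{} {
	inner := make(chan struct{})
	n.mu.Lock()
	n.waiters[key] = append(n.waiters[key], inner)
	n.mu.Unlock()

	out := make(chan struct{})
	go func() {
		defer close(out)
		select {
		case <-inner:
		case <-stopCh:
		}
	}()
	return out
}

// notify wakes every waiter currently registered on key.
func (n *notifier) notify(key string) {
	n.mu.Lock()
	ws := n.waiters[key]
	delete(n.waiters, key)
	n.mu.Unlock()
	for _, w := range ws {
		close(w)
	}
}

func main() {
	n := newNotifier()

	ch := n.wait("jobs", nil)
	n.notify("jobs")
	<-ch
	fmt.Println("notified")

	stopCh := make(chan struct{})
	ch = n.wait("jobs", stopCh)
	close(stopCh)
	<-ch
	fmt.Println("stopped")
}
```

In this sketch a waiter that was stopped stays registered until the next notify on its key; a production version would likely deregister it.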

func (Core) MonoTS

func (c Core) MonoTS(t TS) (TS, error)

MonoTS returns a new, unique TS corresponding to the given timestamp. All returned values are monotonically increasing across the entire cluster. Consequently, the time represented by the returned TS may differ from the given TS by a very small amount (or by a large amount, if the given time is far in the past).
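To illustrate why the returned TS can drift from the given one, here is a minimal single-process sketch of a monotonic timestamp source. The `monoClock` type is hypothetical and uses a mutex where the real package coordinates across the cluster; how it does so is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// TS mirrors the package's microsecond timestamp type.
type TS uint64

// monoClock is a hypothetical, single-process stand-in for Core.MonoTS.
type monoClock struct {
	mu   sync.Mutex
	last TS
}

// next returns t when t is newer than everything handed out so far.
// Otherwise it returns the last value plus one, which keeps results unique
// and increasing at the cost of drifting away from the requested time.
func (m *monoClock) next(t TS) TS {
	m.mu.Lock()
	defer m.mu.Unlock()
	if t <= m.last {
		t = m.last + 1
	}
	m.last = t
	return t
}

func main() {
	var c monoClock
	fmt.Println(c.next(100)) // 100
	fmt.Println(c.next(100)) // 101: same instant requested twice
	fmt.Println(c.next(50))  // 102: a time in the past is pushed forward
	fmt.Println(c.next(500)) // 500
}
```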

func (*Core) NewEvent

func (c *Core) NewEvent(now, expire TS, contents string) (Event, error)

NewEvent creates an event struct with the given information. The returned Event will have the given contents, and its ID will be a unique identifier based on the passed in now and expire.

func (*Core) Query

func (c *Core) Query(qas QueryActions) (QueryRes, error)

Query performs the given QueryActions pipeline and returns the final output of that pipeline.

func (*Core) Run

func (c *Core) Run(stopCh chan struct{}) chan error

Run performs all the background work needed to support Core. It spawns a background goroutine which does the actual work. If Run encounters an error, the error is written to the returned channel and the background goroutine stops. Run must then be called again to keep using the Core.

The returned channel is buffered by 1, and will only ever be written to once, so it's not strictly necessary to read from it.

stopCh is optional and may be used to prematurely stop execution of Run. nil will be written to the returned channel in this case.

func (*Core) SetEvent

func (c *Core) SetEvent(e Event, expireBuffer time.Duration) error

SetEvent stores the given event under its ID. The event expires at the time indicated by its ID field (truncated to an integer) plus the given buffer.

type Event

type Event struct {
	ID       ID
	Contents string
}

Event describes all the information related to a single event. An event is immutable, nothing in this struct will ever change

type ID

type ID struct {
	T      TS
	Expire TS
}

ID identifies a single event across the entire cluster, and is unique for all time. It also represents the point in time at which an event will expire

func IDFromString

func IDFromString(idstr string) (ID, error)

IDFromString takes a string previously returned by an ID's String method and returns the corresponding ID for it

func (ID) String

func (id ID) String() string

String returns the string form of the ID

type Key

type Key struct {
	Base string
	Subs []string
}

Key describes a location in redis where data can be stored. Keys with the same Base will be stored together and can be interacted with transactionally. Subs is used as a further set of identifiers for the Key.

The only disallowed character in the strings making up key is ':'
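Since KeyFromString may panic on bad input, it can help to validate key components up front. This is a minimal sketch of such a check; `validKey` is a hypothetical helper, not part of the package, and the `Key` struct is re-declared locally so the example stands alone.

```go
package main

import (
	"fmt"
	"strings"
)

// Key mirrors the documented struct.
type Key struct {
	Base string
	Subs []string
}

// validKey is a hypothetical helper which reports whether every component
// of k is free of the one disallowed character, ':'.
func validKey(k Key) bool {
	if strings.Contains(k.Base, ":") {
		return false
	}
	for _, s := range k.Subs {
		if strings.Contains(s, ":") {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(validKey(Key{Base: "queue", Subs: []string{"inprogress"}})) // true
	fmt.Println(validKey(Key{Base: "queue", Subs: []string{"in:progress"}})) // false
}
```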

func KeyFromString

func KeyFromString(key string) Key

KeyFromString takes the string form of a Key and returns the associated Key. Will probably panic if the given string is not valid, so check yourself before you wreck yourself.

func (Key) Copy

func (k Key) Copy() Key

Copy returns a deep copy of the Key
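A deep copy matters here because Subs is a slice: plain assignment of a Key shares the underlying array. The sketch below shows the difference using a hypothetical `copyKey` function on a locally re-declared `Key`; it is an assumption about what a deep copy of this struct involves, not the package's source.

```go
package main

import "fmt"

// Key mirrors the documented struct.
type Key struct {
	Base string
	Subs []string
}

// copyKey is a hypothetical equivalent of Key.Copy: it duplicates the Subs
// slice so the copy no longer shares memory with the original.
func copyKey(k Key) Key {
	subs := make([]string, len(k.Subs))
	copy(subs, k.Subs)
	return Key{Base: k.Base, Subs: subs}
}

func main() {
	orig := Key{Base: "queue", Subs: []string{"inprogress"}}
	shallow := orig       // shares the Subs backing array with orig
	deep := copyKey(orig) // owns its own Subs

	orig.Subs[0] = "done"
	fmt.Println(shallow.Subs[0], deep.Subs[0]) // done inprogress
}
```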

func (Key) String

func (k Key) String(prefix string) string

String returns the string form of the Key, as it will appear in redis

type Opts

type Opts struct {
	// Default 10. Number of threads which may publish simultaneously.
	NumPublishers int

	// Default "bananaq". String to prefix all redis keys with
	RedisPrefix string
}

Opts are extra configuration fields which may be set on Core

type QueryAction

type QueryAction struct {
	// Selects a set of IDs using various types of logic. See the doc on
	// QuerySelector for more. By default, a QuerySelector causes this
	// QueryAction to simply discard its input and output the QuerySelector's
	// output.
	*QuerySelector

	// Counts elements in a Key. See its doc string for more info
	*QueryCount

	// If true will count the number of IDs in the input to this action, append
	// that count to the result, and pass that input through as the output.
	CountInput bool

	// Adds the input IDs to the given Keys. See its doc string for more info
	*QueryAddTo

	// Removes IDs from Keys by score. See its doc string for more info
	*QueryRemoveByScore

	// Removes the input IDs from the given Keys
	RemoveFrom []Key

	// Adds an ID to a key. See its doc string for more info
	*QuerySingleSet

	// Retrieve an ID that was set at the given Key using SingleSet. The
	// output will be a set containing only this ID, or empty output if the
	// key isn't set
	SingleGet *Key

	// Filters Events out of the input. See its doc string for more info
	*QueryFilter

	// Deletes whatever the Key's contents are. The input to this action becomes
	// the output
	Delete *Key

	// Stops the pipeline of QueryActions and returns the previous output
	Break bool

	// May be set alongside any of the other fields on this struct. See its doc
	// string for more info
	QueryConditional

	// Must be set alongside another field in this struct. Indicates that
	// instead of discarding the output of the previous QueryAction, its output
	// and the output from this action should be merged as a union. That union
	// then becomes the output of this action.
	Union bool
}

QueryAction describes a single action to take on a set of IDs. Every action has an input and an output, which are both always sorted chronologically (by ID.T). Only a single field, apart from QueryConditional or Union, should be set on a QueryAction.

type QueryActions

type QueryActions struct {
	// This must match the Base field on all Keys being used in this pipeline
	KeyBase      string
	QueryActions []QueryAction

	// Optional, may be passed in if there is a previous notion of "current
	// time", to maintain consistency
	Now TS `msg:"-"`
}

QueryActions are a set of actions to take sequentially. A set of QueryActions effectively amounts to a pipeline of IDs. Each QueryAction has the potential to output some set of IDs, which become the input for the next QueryAction. The initial set of IDs is empty, so the first QueryAction should always have a QuerySelector to start things off. The final QueryAction's output will be the output set of IDs from the Query method, as well as the set of results from all Count operations which occurred during the query.
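The pipeline idea (empty initial input, each output feeding the next action, Union merging instead of discarding, Break ending early) can be modelled in a few lines. The `action` type below is a hypothetical, heavily simplified stand-in for QueryAction whose `Select` field plays the role of a QuerySelector's output; nothing here talks to redis.

```go
package main

import (
	"fmt"
	"sort"
)

type TS uint64

// ID mirrors the documented struct.
type ID struct {
	T      TS
	Expire TS
}

// action is a hypothetical, simplified QueryAction.
type action struct {
	Select []ID // stands in for the output of a QuerySelector
	Union  bool // merge with the previous output instead of discarding it
	Break  bool // stop the pipeline and return the previous output
}

// merge returns the deduplicated IDs of a and b, sorted by ID.T.
func merge(a, b []ID) []ID {
	seen := map[ID]bool{}
	out := []ID{}
	for _, ids := range [][]ID{a, b} {
		for _, id := range ids {
			if !seen[id] {
				seen[id] = true
				out = append(out, id)
			}
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].T < out[j].T })
	return out
}

// runPipeline feeds each action's output into the next action, starting
// from an empty input.
func runPipeline(actions []action) []ID {
	out := []ID{}
	for _, a := range actions {
		if a.Break {
			break
		}
		if a.Union {
			out = merge(out, a.Select)
		} else {
			out = merge(nil, a.Select) // previous output is discarded
		}
	}
	return out
}

func main() {
	res := runPipeline([]action{
		{Select: []ID{{T: 3}, {T: 1}}},
		{Select: []ID{{T: 2}, {T: 3}}, Union: true},
		{Break: true},
		{Select: []ID{{T: 9}}}, // never reached
	})
	fmt.Println(res) // [{1 0} {2 0} {3 0}]
}
```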

type QueryAddTo

type QueryAddTo struct {
	Keys          []Key
	ExpireAsScore bool
	Score         TS
}

QueryAddTo adds its input IDs to the given Keys. If ExpireAsScore is set to true, then each ID's expire time will be used as its score. If Score is given it will be used as the score for all IDs being added, otherwise the T of each individual ID will be used
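The score selection rule above can be written out as a small function. This is a sketch under two assumptions that the documentation does not state: that a zero Score means "not given", and that ExpireAsScore takes precedence when both are set. The Keys field is omitted from the local struct for brevity.

```go
package main

import "fmt"

type TS uint64

// ID mirrors the documented struct.
type ID struct {
	T      TS
	Expire TS
}

// QueryAddTo mirrors the documented struct, minus its Keys field.
type QueryAddTo struct {
	ExpireAsScore bool
	Score         TS
}

// scoreFor is a hypothetical helper which picks the score an ID would be
// added with. Assumptions: Score == 0 means "not given", and ExpireAsScore
// wins over Score.
func scoreFor(q QueryAddTo, id ID) TS {
	switch {
	case q.ExpireAsScore:
		return id.Expire
	case q.Score != 0:
		return q.Score
	default:
		return id.T
	}
}

func main() {
	id := ID{T: 100, Expire: 200}
	fmt.Println(scoreFor(QueryAddTo{}, id))                    // 100
	fmt.Println(scoreFor(QueryAddTo{Score: 42}, id))           // 42
	fmt.Println(scoreFor(QueryAddTo{ExpireAsScore: true}, id)) // 200
}
```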

type QueryConditional

type QueryConditional struct {
	// Only do the QueryAction if all of the conditionals in this list return
	// true
	And []QueryConditional

	// Only do the QueryAction if there is an empty input. Otherwise do no
	// action and pass the non-empty input through
	IfNoInput bool

	// Only do the QueryAction if there is non-empty input. Otherwise pass the
	// empty input through
	IfInput bool

	// Only do the QueryAction if the given Key has no data in it
	IfEmpty *Key

	// Only do the QueryAction if the given Key has data in it
	IfNotEmpty *Key
}

QueryConditional is a field on QueryAction which can affect what the QueryAction does. More than one field may be set on this to have multiple conditionals.
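How several conditionals combine can be sketched as a recursive check. The `conditional` type below is a hypothetical subset of QueryConditional covering only And, IfNoInput and IfInput; IfEmpty and IfNotEmpty are left out because they require a lookup in redis. Treating multiple set fields as a logical AND is an assumption.

```go
package main

import "fmt"

// conditional is a hypothetical subset of QueryConditional.
type conditional struct {
	And       []conditional
	IfNoInput bool
	IfInput   bool
}

// holds reports whether an action guarded by c should run, given the number
// of IDs in its input. Every set field must be satisfied (an assumption).
func (c conditional) holds(inputLen int) bool {
	for _, sub := range c.And {
		if !sub.holds(inputLen) {
			return false
		}
	}
	if c.IfNoInput && inputLen != 0 {
		return false
	}
	if c.IfInput && inputLen == 0 {
		return false
	}
	return true
}

func main() {
	fmt.Println(conditional{}.holds(0))                // true
	fmt.Println(conditional{IfInput: true}.holds(0))   // false
	fmt.Println(conditional{IfNoInput: true}.holds(0)) // true

	// Contradictory conditions can never both hold.
	both := conditional{And: []conditional{{IfInput: true}, {IfNoInput: true}}}
	fmt.Println(both.holds(2)) // false
}
```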

type QueryCount

type QueryCount struct {
	Key
	QueryScoreRange
}

QueryCount will count the number of elements within the Key which fall into the given QueryScoreRange. The input to this action is passed straight into the output. The count is appended to the Counts field in the QueryRes

type QueryFilter

type QueryFilter struct {
	// If set, only IDs which have not expired will be allowed through
	Expired bool

	// May be set alongside any other filter field. Will invert the filter, so
	// that whatever IDs would have been allowed through will not be, and
	// vice-versa
	Invert bool
}

QueryFilter will apply a filter to its input, only outputting the IDs which don't match the filter. Only one filter field should be set per QueryAction
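The interaction of Expired and Invert can be sketched as follows. `applyFilter` is a hypothetical helper operating on an in-memory slice; it assumes an ID counts as expired once its Expire is at or before the current time, and that Invert has no effect unless a filter field is set. Neither detail is stated in this documentation.

```go
package main

import "fmt"

type TS uint64

// ID mirrors the documented struct.
type ID struct {
	T      TS
	Expire TS
}

// QueryFilter mirrors the documented struct.
type QueryFilter struct {
	Expired bool
	Invert  bool
}

// applyFilter is a hypothetical helper. With Expired set, only IDs which
// have not expired pass; Invert flips that decision.
func applyFilter(f QueryFilter, in []ID, now TS) []ID {
	out := []ID{}
	for _, id := range in {
		pass := true
		if f.Expired {
			pass = id.Expire > now // assumption: Expire <= now means expired
			if f.Invert {
				pass = !pass
			}
		}
		if pass {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	ids := []ID{{T: 1, Expire: 10}, {T: 2, Expire: 30}}
	now := TS(20)
	fmt.Println(applyFilter(QueryFilter{Expired: true}, ids, now))               // [{2 30}]
	fmt.Println(applyFilter(QueryFilter{Expired: true, Invert: true}, ids, now)) // [{1 10}]
}
```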

type QueryIDScoreSelect

type QueryIDScoreSelect struct {
	ID    ID
	Min   TS
	Max   TS
	Equal TS
}

QueryIDScoreSelect pulls the given ID from a Key. If the ID is not in the Key there is no output. If the ID is in the Key but its score does not match whatever conditions are set by Min/Max/Equal there is no output. Otherwise the output is the single ID.

type QueryRangeSelect

type QueryRangeSelect struct {
	QueryScoreRange

	// Optional modifiers. If Offset is nonzero, Limit must be nonzero too (it
	// can be -1 to indicate no limit)
	Limit, Offset int64

	// If true, reverses the order that IDs in the set are processed and
	// returned. The meanings of Min/Max stay the same though.
	Reverse bool
}

QueryRangeSelect is used to select all elements within the given range from a Key.

type QueryRemoveByScore

type QueryRemoveByScore struct {
	Keys []Key
	QueryScoreRange
}

QueryRemoveByScore is used to remove IDs from Keys based on a range of scores. This action does not change the input in any way, it simply passes the input through as its output.

type QueryRes

type QueryRes struct {
	IDs    []ID
	Counts []uint64
}

QueryRes contains all the return values from a Query

type QueryScoreRange

type QueryScoreRange struct {
	Min              TS
	Max              TS
	MinExcl, MaxExcl bool

	// May be set instead of Min. The newest ID from the input to this
	// QueryAction will be used as the Min. If the input is empty, Min will be 0
	MinFromInput bool

	// May be set instead of Max. The oldest ID from the input to this
	// QueryAction will be used as the Max. If the input is empty, Max will be 0
	MaxFromInput bool
}

QueryScoreRange is used by multiple selectors to describe a range of elements by their scores. If MinExcl is true, Min itself will be excluded from the return if it's in the set (and similarly for MaxExcl/Max). If Min or Max are 0 that indicates -infinity or +infinity, respectively
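The range semantics (zero as an open bound, optional exclusion of the endpoints) can be expressed as a membership test. `inRange` is a hypothetical helper on a locally re-declared struct; the MinFromInput and MaxFromInput fields are omitted because they depend on the action's input.

```go
package main

import "fmt"

type TS uint64

// QueryScoreRange mirrors the documented struct, minus the FromInput fields.
type QueryScoreRange struct {
	Min, Max         TS
	MinExcl, MaxExcl bool
}

// inRange is a hypothetical helper which reports whether score falls within
// r. A zero Min means no lower bound and a zero Max means no upper bound.
func inRange(r QueryScoreRange, score TS) bool {
	if r.Min != 0 {
		if score < r.Min || (r.MinExcl && score == r.Min) {
			return false
		}
	}
	if r.Max != 0 {
		if score > r.Max || (r.MaxExcl && score == r.Max) {
			return false
		}
	}
	return true
}

func main() {
	r := QueryScoreRange{Min: 10, Max: 20, MinExcl: true}
	for _, s := range []TS{10, 11, 20, 21} {
		fmt.Println(s, inRange(r, s))
	}
	// An empty range is unbounded on both sides.
	fmt.Println(inRange(QueryScoreRange{}, 1234))
}
```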

type QuerySelector

type QuerySelector struct {
	Key

	// See QueryRangeSelect doc string
	*QueryRangeSelect

	// See QueryIDScoreSelect doc string
	*QueryIDScoreSelect

	// Select IDs by their position within the Key, using a two element
	// slice. 0 is the oldest id, 1 is the second oldest, etc... -1 is the
	// youngest, -2 the second youngest, etc...
	PosRangeSelect []int64

	// Doesn't actually do a query, the output from this selector will simply be
	// these IDs.
	IDs []ID
}

QuerySelector describes a set of criteria for selecting a set of IDs from a Key. Key is a required field; only one field apart from it should be set in this selector (unless otherwise noted).
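The negative positions accepted by PosRangeSelect can be resolved to concrete slice bounds as sketched below. `posRange` is a hypothetical helper; it assumes the two elements are an inclusive start and stop position, in the manner of redis's ZRANGE, which this documentation does not confirm.

```go
package main

import "fmt"

// posRange is a hypothetical helper which converts an inclusive start/stop
// pair, where negative values count back from the youngest element, into
// half-open slice bounds for a collection of n elements. ok is false when
// the range selects nothing.
func posRange(n int, start, stop int64) (lo, hi int, ok bool) {
	s, e := int(start), int(stop)
	if s < 0 {
		s += n
	}
	if e < 0 {
		e += n
	}
	if s < 0 {
		s = 0
	}
	if e >= n {
		e = n - 1
	}
	if s > e {
		return 0, 0, false
	}
	return s, e + 1, true
}

func main() {
	ids := []string{"oldest", "b", "c", "d", "youngest"}
	for _, r := range [][2]int64{{0, -1}, {-2, -1}, {1, 2}, {3, 1}} {
		if lo, hi, ok := posRange(len(ids), r[0], r[1]); ok {
			fmt.Println(r, ids[lo:hi])
		} else {
			fmt.Println(r, "selects nothing")
		}
	}
}
```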

type QuerySingleSet

type QuerySingleSet struct {
	Key
	IfNewer bool
}

QuerySingleSet will set the given Key to the first ID in the input. If the input to this action has no IDs then nothing happens. The output from this action will be the input.

If IfNewer is set, the key will only be set if its ID is newer than the ID already in the Key. This does not change the output in any way
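The IfNewer rule can be sketched against an in-memory map. `singleSet` is a hypothetical stand-in for the action, keys are plain strings, and "newer" is assumed to mean a larger ID.T; the documentation does not say which field is compared.

```go
package main

import "fmt"

type TS uint64

// ID mirrors the documented struct.
type ID struct {
	T      TS
	Expire TS
}

// singleSet is a hypothetical stand-in for QuerySingleSet. It stores the
// first input ID at key. With ifNewer set, an existing ID is only replaced
// by one with a larger T (an assumption about what "newer" means).
func singleSet(store map[string]ID, key string, in []ID, ifNewer bool) {
	if len(in) == 0 {
		return // nothing happens on empty input
	}
	candidate := in[0]
	if cur, ok := store[key]; ok && ifNewer && candidate.T <= cur.T {
		return
	}
	store[key] = candidate
}

func main() {
	store := map[string]ID{}
	singleSet(store, "latest", []ID{{T: 5}}, true)
	singleSet(store, "latest", []ID{{T: 3}}, true) // older, ignored
	fmt.Println(store["latest"].T)                 // 5

	singleSet(store, "latest", []ID{{T: 3}}, false) // unconditional
	fmt.Println(store["latest"].T)                  // 3
}
```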

type TS

type TS uint64

TS identifies a single point in time as an integer number of microseconds

func NewTS

func NewTS(t time.Time) TS

NewTS returns a TS corresponding to the given Time (though it may be truncated in precision by some small amount).

func (TS) String

func (ts TS) String() string

String returns the string form of a TS

func (TS) Time

func (ts TS) Time() time.Time

Time returns the Time object this TS corresponds to
