Documentation ¶
Overview ¶
Package core is responsible for actually communicating with redis and providing an abstraction for the data stored in it
This package is not stable! At present it is only intended to be used by other components in bananaq
Index ¶
- Variables
- type Core
- func (c *Core) GetEvent(id ID) (Event, error)
- func (c *Core) KeyNotify(k Key)
- func (c *Core) KeyScan(k Key) ([]Key, error)
- func (c *Core) KeyWait(k Key, stopCh <-chan struct{}) <-chan struct{}
- func (c Core) MonoTS(t TS) (TS, error)
- func (c *Core) NewEvent(now, expire TS, contents string) (Event, error)
- func (c *Core) Query(qas QueryActions) (QueryRes, error)
- func (c *Core) Run(stopCh chan struct{}) chan error
- func (c *Core) SetEvent(e Event, expireBuffer time.Duration) error
- type Event
- type ID
- type Key
- type Opts
- type QueryAction
- type QueryActions
- type QueryAddTo
- type QueryConditional
- type QueryCount
- type QueryFilter
- type QueryIDScoreSelect
- type QueryRangeSelect
- type QueryRemoveByScore
- type QueryRes
- type QueryScoreRange
- type QuerySelector
- type QuerySingleSet
- type TS
Constants ¶
This section is empty.
Variables ¶
var (
ErrNotFound = errors.New("not found")
)
Various errors this package may return
Functions ¶
This section is empty.
Types ¶
type Core ¶
type Core struct {
// contains filtered or unexported fields
}
Core contains all the information needed to interact with the underlying redis instances for bananaq. All methods on Core are thread-safe, except Run which should only be run by a single goroutine at any time.
func New ¶
New initializes a new Core instance based on the given Cmder (which may be a *pool.Pool or *cluster.Cluster) and extra options (which may be nil). Run must be called in order to actually use the Core
func (*Core) GetEvent ¶
GetEvent returns the event identified by the given ID, or ErrNotFound if it's expired or never existed
func (*Core) KeyNotify ¶
KeyNotify will notify all processes currently waiting on the given Key using KeyWait
func (*Core) KeyScan ¶
KeyScan returns all the Keys matching the given Key pattern. At least one field in the given Key should be a "*"
func (*Core) KeyWait ¶
KeyWait returns a channel which will be closed when the given Key is notified by some other process. stopCh can be closed to stop waiting and immediately close the returned channel
func (Core) MonoTS ¶
MonoTS returns a new, unique TS corresponding to the given timestamp All returns are monotonically increasing across the entire cluster. Consequently, the TS returned might differ in the time it represents from the given TS by a very small amount (or a big amount, if the given time is way in the past).
func (*Core) NewEvent ¶
NewEvent creates an event struct with the given information. The returned Event will have the given contents, and its ID will a unique identifier based on the passed in now and expire.
func (*Core) Query ¶
func (c *Core) Query(qas QueryActions) (QueryRes, error)
Query performs the given QueryActions pipeline. Whatever the final output from the pipeline is is returned.
func (*Core) Run ¶
Run performs all the background work needed to support Core. It spawns a background go-routine which does the actual work. If Run encounters an error then the error will be written to the returned channel and the background routine will stop. Run must be called again to keep using the Core.
The returned channel is buffered by 1, and will only ever be written to once, so it's not strictly necessary to read from it.
stopCh is optional and may be used to prematurely stop execution of Run. nil will be written to the returned channel in this case.
type Event ¶
Event describes all the information related to a single event. An event is immutable, nothing in this struct will ever change
type ID ¶
ID identifies a single event across the entire cluster, and is unique for all time. It also represents the point in time at which an event will expire
func IDFromString ¶
IDFromString takes a string previously returned by an ID's String method and returns the corresponding ID for it
type Key ¶
Key describes a location some data can be stored in in redis. Keys with the same Base will be stored together and can be interacted with transactionally. Subs is used a further set of identifiers for the Key.
The only disallowed character in the strings making up key is ':'
func KeyFromString ¶
KeyFromString takes the string form of a Key and returns the associated String. Will probably panic if the given string is not valid, so check yourself before you wreck yourself.
type Opts ¶
type Opts struct { // Default 10. Number of threads which may publish simultaneously. NumPublishers int // Default "bananaq". String to prefix all redis keys with RedisPrefix string }
Opts are extra configuration fields which may be set on Core
type QueryAction ¶
type QueryAction struct { // Selects a set of IDs using various types of logic. See the doc on // QuerySelector for more. By default, a QuerySelector causes this // QueryAction to simply discard its input and output the QuerySelector's // output. *QuerySelector // Counts elements in a Key. See its doc string for more info *QueryCount // If true will count the number of IDs in the input to this action, append // that count to the result, and pass that input through as the output. CountInput bool // Adds the input IDs to the given Keys. See its doc string for more info *QueryAddTo // Removes IDs from Keys by score. See its doc string for more info *QueryRemoveByScore // Removes the input IDs from the given Keys RemoveFrom []Key // Adds an ID to a key. See its doc string for more info *QuerySingleSet // Retrieve an ID that was set at the given Key using SingleSet. The // output will be a set continaing only this ID, or empty output if the // key isn't set SingleGet *Key // Filters Events out of the input. See its doc string for more info *QueryFilter // Deletes whatever the Key's contents are. The input to this action becomes // the output Delete *Key // Stops the pipeline of QueryActions and returns the previous output Break bool // May be set alongside any of the other fields on this struct. See its doc // string for more info QueryConditional // Must be set alongside another field in this Selector. Indicates that // instead of discarding the output of the previous QueryAction, its output // and the output from this action should be merged as a union. That union // then becomes the output of this action. Union bool }
QueryAction describes a single action to take on a set of IDs. Every action has an input and an output, which are both always sorted chronologically (by ID.T). Only one single field, apart from QueryConditional or Union, should be set on a QueryAction.
type QueryActions ¶
type QueryActions struct { // This must match the Base field on all Keys being used in this pipeline KeyBase string QueryActions []QueryAction // Optional, may be passed in if there is a previous notion of "current // time", to maintain consistency Now TS `msg:"-"` }
QueryActions are a set of actions to take sequentially. A set of QueryActions effectively amounts to a pipeline of IDs. Each QueryAction has the potential to output some set of IDs, which become the input for the next QueryAction. The initial set of IDs is empty, so the first QueryAction should always have a QuerySelector to start things off. The final QueryAction's output will be the output set of IDs from the Query method, as well as the set of results from all Count operations which occurred during the query.
type QueryAddTo ¶
QueryAddTo adds its input IDs to the given Keys. If ExpireAsScore is set to true, then each ID's expire time will be used as its score. If Score is given it will be used as the score for all IDs being added, otherwise the T of each individual ID will be used
type QueryConditional ¶
type QueryConditional struct { // Only do the QueryAction if all of the conditionals in this list return // true And []QueryConditional // Only do the QueryAction if there is an empty input. Otherwise do no // action and pass the non-empty input through IfNoInput bool // Only do the QueryAction if there is non-empty input. Otherwise pass the // empty input through IfInput bool // Only do the QueryAction if the given Key has no data in it IfEmpty *Key // Only do the QueryAction if the given Key has data in it IfNotEmpty *Key }
QueryConditional is a field on QueryAction which can affect what the QueryAction does. More than one field may be set on this to have multiple conditionals.
type QueryCount ¶
type QueryCount struct { Key QueryScoreRange }
QueryCount will count the number of elements within the Key which fall into the given QueryScoreRange. The input to this action is passed straight into the output. The count is appended to the Counts field in the QueryRes
type QueryFilter ¶
type QueryFilter struct { // If set, only IDs which have not expired will be allowed through Expired bool // May be set alongside any other filter field. Will invert the filter, so // that whatever IDs would have been allowed through will not be, and // vice-versa Invert bool }
QueryFilter will apply a filter to its input, only outputting the IDs which don't match the filter. Only one filter field should be set per QueryAction
type QueryIDScoreSelect ¶
QueryIDScoreSelect pulls the given ID from a Key. If the ID is not in the Key there is no output. If the ID is in the Key but its score does not match whatever conditions are set by Min/Max/Equal there is no output. Otherwise the output is the single ID.
type QueryRangeSelect ¶
type QueryRangeSelect struct { QueryScoreRange // Optional modifiers. If Offset is nonzero, Limit must be nonzero too (it // can be -1 to indicate no limit) Limit, Offset int64 // If true, reverses the order that IDs in the set are processed and // returned. The meanings of Min/Max stay the same though. Reverse bool }
QueryRangeSelect is used to select all elements within the given range from a Key.
type QueryRemoveByScore ¶
type QueryRemoveByScore struct { Keys []Key QueryScoreRange }
QueryRemoveByScore is used to remove IDs from Keys based on a range of scores. This action does not change the input in anyway, it simply passes the input through as its output.
type QueryScoreRange ¶
type QueryScoreRange struct { Min TS Max TS MinExcl, MaxExcl bool // May be set instead of Min. The newest ID from the input to this // QueryAction will be used as the Min. If the input is empty, Min will be 0 MinFromInput bool // May be set instead of Max. The oldest ID from the input to this // QueryAction will be used as the Max. If the input is empty, Max will be 0 MaxFromInput bool }
QueryScoreRange is used by multiple selectors to describe a range of elements by their scores. If MinExcl is true, Min itself will be excluded from the return if it's in the set (and similarly for MaxExcl/Max). If Min or Max are 0 that indicates -infinity or +infinity, respectively
type QuerySelector ¶
type QuerySelector struct { Key // See QueryRangeSelect doc string *QueryRangeSelect // See QueryIDScoreSelect doc string *QueryIDScoreSelect // Select IDs by their position within the Key, using a two element // slice. 0 is the oldest id, 1 is the second oldest, etc... -1 is the // youngest, -2 the second youngest, etc... PosRangeSelect []int64 // Doesn't actually do a query, the output from this selector will simply be // these IDs. IDs []ID }
QuerySelector describes a set of criteria for selecting a set of IDs from a Key. Key is a required field, only one field apart from it should be set in this selector (unless otherwise noted)
type QuerySingleSet ¶
QuerySingleSet will set the given Key to the first ID in the input. If the input to this action has no IDs then nothing happens. The output from this action will be the input.
If IfNewer is set, the key will only be set if its ID is newer than the ID already in the Key. This does not change the output in any way