Documentation ¶
Index ¶
- Constants
- Variables
- func AwaitHangup(r Responder) error
- func BytesFor(t Topic, content []byte) []byte
- func CheckMissing(in ...store.Bucket) func(*bolt.Tx) error
- func CheckRiverNotExists(id, streamID string) func(*bolt.Tx) error
- func ClearRivers(tx *bolt.Tx) error
- func DeleteBus(id, streamID string, seq uint64) func(*bolt.Tx) error
- func DeletePub(id, streamID string, tx *bolt.Tx) error
- func DeleteResp(tx *bolt.Tx, id uint64, buckets ...store.Bucket) error
- func IsExists(err error) bool
- func IsMissing(err error) bool
- func IsStreamExists(err error) bool
- func IsStreamMissing(err error) bool
- func IsUnknownSurvey(err error) bool
- func MakeSurvey(s Surveyor, what Survey, expect Response) error
- func Wait(r Responder, surv Survey, rsp, or Response) error
- type Bus
- type GlobalTopic
- type Missing
- type Pub
- type Responder
- type Response
- type River
- type Sub
- type Survey
- type Surveyor
- type Topic
Constants ¶
const ( DefaultTimeout = 30 * time.Millisecond MaxTimeout = 2 * time.Second )
Timeout constants for Surveys.
Variables ¶
var ( // Byte request / response constants. HUP = Survey("HUP") OK = Response("OK") UNKNOWN = Response("IDK") // RiverBucket stores Rivers and their users. Buckets in RiverBucket // correspond to Streams from StreamBucket by ID, and every River ID in // the bucket corresponds to a connected River. RiverBucket = store.Bucket("rivers") // ResponderBucket is where Responders are kept, for hangup. ResponderBucket = store.Bucket("responders") // HangupBucket is where Respondents are stored. Pass this to // NewSurvey as the first Bucket argument for most Surveys. HangupBucket = store.Bucket("hangups") )
var Global = GlobalTopic{}
Global is a package constant for GlobalTopic.
Functions ¶
func AwaitHangup ¶
AwaitHangup waits for HUP, and replies with OK or UNKNOWN suffixed with ID, returning any error.
func CheckMissing ¶
CheckMissing returns a bolt View function which checks the IDs of m, and if any remain, returns a new Missing with the remaining IDs.
func CheckRiverNotExists ¶
CheckRiverNotExists returns an error if the given River exists.
func ClearRivers ¶
ClearRivers eliminates all databased Rivers. Use this on startup.
func DeleteBus ¶
DeleteBus deletes the Bus River for the given streamID and id from the database. It should be used within a transaction where the Bus River is also closed.
func IsStreamExists ¶
func IsStreamMissing ¶
func IsUnknownSurvey ¶
IsUnknownSurvey returns true if the error is errUnknownSurvey.
func MakeSurvey ¶
MakeSurvey sends the given Survey to all connected Responders and awaits the expected Response, suffixed with binary.LittleEndian uint64 ID. It retries as necessary up to three times, keeping track of the seen IDs of respondents.
If some Responders don't respond, the error returned will be Missing.
Types ¶
type Bus ¶
Bus is a River implemented by the mangos BUS protocol.
type GlobalTopic ¶
type GlobalTopic struct{}
GlobalTopic is a Subscribe Topic which messages not belonging to any other Topic should be sent on.
func (GlobalTopic) Code ¶
func (GlobalTopic) Code() []byte
Code implements Topic.Code on GlobalTopic.
func (GlobalTopic) Len ¶
func (GlobalTopic) Len() int
Len implements Topic.Len on GlobalTopic, returning 1.
func (GlobalTopic) Name ¶
func (GlobalTopic) Name() string
Name implements Topic.Name on GlobalTopic, returning "all".
func (GlobalTopic) Prefix ¶
func (GlobalTopic) Prefix() byte
Prefix returns the GlobalTopic Prefix, 0.
type Missing ¶
Missing is an interface which can be implemented by an error value to show that some IDs were missing in a survey.
type Pub ¶
Pub is a Publisher River which can only Send. To send on a Topic, prefix the Send with the desired Topic code and Prefix byte. Only Subs which are created on the given Topic, connected to the PubRiver, will receive messages on that Topic.
NOTE: The go-mangos implementation of Pub (returned by NewPub) sends
messages to all connected SubRivers, but they are filtered before Recv. This is not a technique which guarantees unauthorized hosts will not receive the sent bytes; they will. But their Sub Recv method will not behave as though it received the message.
type Responder ¶
Responder is a River which implements the RESPONDENT end of the SURVEYOR/RESPONDENT scalable protocol.
func NewResponder ¶
NewResponder creates a new SURVEYOR/RESPONDENT respondent nested bucket for the given bucket names, if it does not exist. It assigns a new address to a new Responder River based on its path and stores it in the innermost bucket. It then starts listening and returns the Responder.
type Response ¶
type Response []byte
Response is a response request constant (may be suffixed with an ID.)
type Sub ¶
Sub is a Subscriber River which can only Recv. It can be implemented on a byte prefix Topic, which is to be removed before Recv returns.
type Surveyor ¶
type Surveyor interface { Recv() ([]byte, error) Send([]byte) error Close() error Expect() map[uint64]bool }
Surveyor is a River which implements the Surveyor end of the SURVEYOR/RESPONDENT scalable protocol.
func NewSurvey ¶
NewSurvey is a View function which returns a Surveyor to run the given Survey against all Responders found in the given bucket in RiverBucket, with the contents of req. and the given prefix []byte. Rivers/responders/:user_id/{1,2,3}
Timeout sets the retry timeout for the underlying mangos.Socket.
type Topic ¶
type Topic interface { // Prefix must be prepended to Code. This permits a quick match // on message prefix. Prefix() byte Code() []byte Name() string Len() int }
Topic is a Pub/Sub topic. Package users should export and manage their own Topic constants. Note that this means collisions can happen if the package user does not take care to review imported Topics.