river

package
v0.0.0-...-a17a3e0 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 26, 2017 License: AGPL-3.0 Imports: 16 Imported by: 0

Documentation

Constants

const (
	DefaultTimeout = 30 * time.Millisecond
	MaxTimeout     = 2 * time.Second
)

Timeout constants for Surveys.

Variables

var (
	// Byte request / response constants.
	HUP     = Survey("HUP")
	OK      = Response("OK")
	UNKNOWN = Response("IDK")

	// RiverBucket stores Rivers and their users.  Buckets in RiverBucket
	// correspond to Streams from StreamBucket by ID, and every River ID in
	// the bucket corresponds to a connected River.
	RiverBucket = store.Bucket("rivers")

	// ResponderBucket is where Responders are kept, for hangup.
	ResponderBucket = store.Bucket("responders")

	// HangupBucket is where Respondents are stored.  Pass this to
	// NewSurvey as the first Bucket argument for most Surveys.
	HangupBucket = store.Bucket("hangups")
)
var Global = GlobalTopic{}

Global is a package-level GlobalTopic value, for use wherever a GlobalTopic is needed.

Functions

func AwaitHangup

func AwaitHangup(r Responder) error

AwaitHangup waits for HUP, and replies with OK or UNKNOWN suffixed with ID, returning any error.

func BytesFor

func BytesFor(t Topic, content []byte) []byte

BytesFor returns the bytes to send for the given content on the given Topic.
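The following is a minimal sketch of how content might be framed for a Topic. It is not the package's implementation: the helper frame and the type exampleTopic are hypothetical, and the wire layout (Prefix byte, then Code, then content) is an assumption based on the Topic interface's comment that Prefix must be prepended to Code.

```go
package main

import "fmt"

// Topic mirrors the Topic interface documented in this package.
type Topic interface {
	Prefix() byte
	Code() []byte
	Name() string
	Len() int
}

// exampleTopic is a hypothetical Topic used only for illustration.
type exampleTopic struct{}

func (exampleTopic) Prefix() byte { return 1 }
func (exampleTopic) Code() []byte { return []byte("chat") }
func (exampleTopic) Name() string { return "chat" }
func (exampleTopic) Len() int     { return 5 } // assumed: Prefix byte plus Code

// frame is a hypothetical stand-in for BytesFor.  It assumes the
// layout is the Prefix byte, then the Code, then the content.
func frame(t Topic, content []byte) []byte {
	code := t.Code()
	out := make([]byte, 0, 1+len(code)+len(content))
	out = append(out, t.Prefix())
	out = append(out, code...)
	return append(out, content...)
}

func main() {
	fmt.Printf("%q\n", frame(exampleTopic{}, []byte("hello")))
}
```
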

func CheckMissing

func CheckMissing(in ...store.Bucket) func(*bolt.Tx) error

CheckMissing returns a bolt View function which checks the IDs of m, and if any remain, returns a new Missing with the remaining IDs.

func CheckRiverNotExists

func CheckRiverNotExists(id, streamID string) func(*bolt.Tx) error

CheckRiverNotExists returns a bolt transaction function which returns an error if the given River exists.

func ClearRivers

func ClearRivers(tx *bolt.Tx) error

ClearRivers eliminates all databased Rivers. Use this on startup.

func DeleteBus

func DeleteBus(id, streamID string, seq uint64) func(*bolt.Tx) error

DeleteBus deletes the Bus River for the given streamID and id from the database. It should be used within a transaction where the Bus River is also closed.

func DeletePub

func DeletePub(id, streamID string, tx *bolt.Tx) error

DeletePub deletes the Pub's entry for the given id in the given stream.

func DeleteResp

func DeleteResp(tx *bolt.Tx, id uint64, buckets ...store.Bucket) error

func IsExists

func IsExists(err error) bool

IsExists returns true if the error indicates that a River exists where none was expected.

func IsMissing

func IsMissing(err error) bool

IsMissing returns true if the given error is a non-nil Missing error.

func IsStreamExists

func IsStreamExists(err error) bool

func IsStreamMissing

func IsStreamMissing(err error) bool

func IsUnknownSurvey

func IsUnknownSurvey(err error) bool

IsUnknownSurvey returns true if the error is errUnknownSurvey.

func MakeSurvey

func MakeSurvey(s Surveyor, what Survey, expect Response) error

MakeSurvey sends the given Survey to all connected Responders and awaits the expected Response from each, suffixed with the Responder's uint64 ID in binary.LittleEndian encoding. It retries as necessary up to three times, keeping track of the IDs of the respondents seen so far.

If some Responders don't respond, the error returned will be Missing.
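The ID suffix and the bookkeeping of seen IDs can be sketched as below. This is an illustration under stated assumptions, not the package's code: withID, splitID, and remaining are hypothetical helpers, and the suffix is assumed to be exactly 8 bytes appended directly after the Response.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"sort"
)

// withID returns resp followed by id as 8 little-endian bytes.
func withID(resp []byte, id uint64) []byte {
	out := make([]byte, len(resp)+8)
	copy(out, resp)
	binary.LittleEndian.PutUint64(out[len(resp):], id)
	return out
}

// splitID checks that msg is expect followed by exactly 8 bytes, and
// decodes those bytes as a little-endian uint64 ID.
func splitID(msg, expect []byte) (uint64, bool) {
	n := len(expect)
	if len(msg) != n+8 || string(msg[:n]) != string(expect) {
		return 0, false
	}
	return binary.LittleEndian.Uint64(msg[n:]), true
}

// remaining returns, in ascending order, the IDs in expect which are
// not in seen.
func remaining(expect, seen map[uint64]bool) []uint64 {
	var ids []uint64
	for id := range expect {
		if !seen[id] {
			ids = append(ids, id)
		}
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	return ids
}

func main() {
	expect := map[uint64]bool{1: true, 2: true, 3: true}
	seen := map[uint64]bool{}
	replies := [][]byte{withID([]byte("OK"), 1), withID([]byte("OK"), 3)}
	for _, msg := range replies {
		if id, ok := splitID(msg, []byte("OK")); ok {
			seen[id] = true
		}
	}
	fmt.Println("still missing:", remaining(expect, seen))
}
```

The IDs left over after the final retry are the kind of value a Missing error would report through IDs.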

func Wait

func Wait(r Responder, surv Survey, rsp, or Response) error

Wait waits on the given Survey and responds with the given Response. If the Survey is not recognized, it responds with "or".
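The behaviour described for Wait can be sketched with an in-memory stand-in for a Responder. The names responder, fakeResponder, and wait are hypothetical, and the sketch leaves out details the real function may have, such as ID suffixes or returning an error for an unrecognized Survey.

```go
package main

import (
	"bytes"
	"fmt"
)

// responder is the subset of the documented Responder interface
// which this sketch needs.
type responder interface {
	Send([]byte) error
	Recv() ([]byte, error)
}

// fakeResponder is a hypothetical in-memory responder: Recv returns
// in, and Send records its argument in out.
type fakeResponder struct {
	in, out []byte
}

func (f *fakeResponder) Recv() ([]byte, error) { return f.in, nil }
func (f *fakeResponder) Send(b []byte) error   { f.out = b; return nil }

// wait receives one message.  If it equals surv, wait replies with
// rsp; otherwise it replies with or.
func wait(r responder, surv, rsp, or []byte) error {
	got, err := r.Recv()
	if err != nil {
		return err
	}
	if !bytes.Equal(got, surv) {
		return r.Send(or)
	}
	return r.Send(rsp)
}

func main() {
	for _, in := range []string{"HUP", "???"} {
		f := &fakeResponder{in: []byte(in)}
		if err := wait(f, []byte("HUP"), []byte("OK"), []byte("IDK")); err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s -> %s\n", in, f.out)
	}
}
```
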

Types

type Bus

type Bus interface {
	River
	ID() uint64
}

Bus is a River implemented by the mangos BUS protocol.

func NewBus

func NewBus(id, streamID string, tx *bolt.Tx) (r Bus, e error)

NewBus creates a new Bus River, registers it in the DB, connects it to any Rivers in the streamID bucket in RiverBucket, and returns it or any error. It will be created with the given string ID, which is its address.

Buses are created sequentially in the /rivers/{streamID}/{id}/ bucket.

type GlobalTopic

type GlobalTopic struct{}

GlobalTopic is a Subscribe Topic on which messages not belonging to any other Topic should be sent.

func (GlobalTopic) Code

func (GlobalTopic) Code() []byte

Code implements Topic.Code on GlobalTopic.

func (GlobalTopic) Len

func (GlobalTopic) Len() int

Len implements Topic.Len on GlobalTopic, returning 1.

func (GlobalTopic) Name

func (GlobalTopic) Name() string

Name implements Topic.Name on GlobalTopic, returning "all".

func (GlobalTopic) Prefix

func (GlobalTopic) Prefix() byte

Prefix returns the GlobalTopic Prefix, 0.

type Missing

type Missing interface {
	error
	IDs() []uint64
}

Missing is an interface which can be implemented by an error value to show that some IDs were missing in a survey.
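As an illustration, an error type satisfying Missing could look like the sketch below. The type missingIDs and the helper asMissing are hypothetical, and whether the package's own IsMissing looks through wrapped errors is not stated here; errors.As is used only as one reasonable way to do so.

```go
package main

import (
	"errors"
	"fmt"
)

// Missing mirrors the interface documented in this package.
type Missing interface {
	error
	IDs() []uint64
}

// missingIDs is a hypothetical error type which satisfies Missing.
type missingIDs []uint64

func (m missingIDs) Error() string {
	return fmt.Sprintf("no response from IDs %v", []uint64(m))
}

func (m missingIDs) IDs() []uint64 { return m }

// asMissing reports whether err, or any error it wraps, is a Missing,
// and returns it if so.
func asMissing(err error) (Missing, bool) {
	var m Missing
	if errors.As(err, &m) {
		return m, true
	}
	return nil, false
}

func main() {
	err := fmt.Errorf("survey failed: %w", missingIDs{2, 5})
	if m, ok := asMissing(err); ok {
		fmt.Println("retry these IDs:", m.IDs())
	}
}
```
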

type Pub

type Pub interface {
	Close() error
	Send([]byte) error
}

Pub is a Publisher River which can only Send. To send on a Topic, prefix the Send with the desired Topic code and Prefix byte. Only Subs which are created on the given Topic, connected to the PubRiver, will receive messages on that Topic.

NOTE: The go-mangos implementation of Pub (returned by NewPub) sends messages to all connected SubRivers, but they are filtered before Recv. This is not a technique which guarantees unauthorized hosts will not receive the sent bytes; they will. But their Sub Recv method will not behave as though it received the message.

func NewPub

func NewPub(id, streamID string, tx *bolt.Tx) (r Pub, e error)

NewPub creates an inproc publisher River in the given Stream bucket in RiverBucket, with address id.

type Responder

type Responder interface {
	Send([]byte) error
	Recv() ([]byte, error)
	Close() error

	ID() uint64
}

Responder is a River which implements the RESPONDENT end of the SURVEYOR/RESPONDENT scalable protocol.

func NewResponder

func NewResponder(
	tx *bolt.Tx,
	buckets ...store.Bucket,
) (rsp Responder, e error)

NewResponder creates a new SURVEYOR/RESPONDENT respondent nested bucket for the given bucket names, if it does not exist. It assigns a new address to a new Responder River based on its path and stores it in the innermost bucket. It then starts listening and returns the Responder.

type Response

type Response []byte

Response is a response constant (it may be suffixed with an ID).

type River

type River interface {
	Close() error
	Send([]byte) error
	Recv() ([]byte, error)
}

River is a simplified sender and receiver which can be implemented by mangos.Socket.

type Sub

type Sub interface {
	Close() error
	Recv() ([]byte, error)
}

Sub is a Subscriber River which can only Recv. It can be implemented on a byte prefix Topic, which is to be removed before Recv returns.
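The prefix handling described for Sub can be sketched as a plain filter over received messages. This only illustrates the idea, since in the go-mangos implementation the filtering happens inside the socket; strip and deliver are hypothetical helpers, and the prefix bytes used are made up.

```go
package main

import (
	"bytes"
	"fmt"
)

// strip reports whether msg begins with prefix and, if so, returns
// the rest of msg with the prefix removed.
func strip(msg, prefix []byte) ([]byte, bool) {
	if !bytes.HasPrefix(msg, prefix) {
		return nil, false
	}
	return msg[len(prefix):], true
}

// deliver returns the payloads of those msgs which match prefix.
// Messages on other topics still arrive, but are dropped here.
func deliver(prefix []byte, msgs ...[]byte) [][]byte {
	var out [][]byte
	for _, m := range msgs {
		if payload, ok := strip(m, prefix); ok {
			out = append(out, payload)
		}
	}
	return out
}

func main() {
	chat := []byte("\x01chat") // hypothetical Prefix byte and Code
	got := deliver(chat,
		[]byte("\x01chathello"),
		[]byte("\x02newsignored"),
		[]byte("\x01chatworld"),
	)
	for _, p := range got {
		fmt.Printf("%s\n", p)
	}
}
```
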

func NewSub

func NewSub(
	streamID string,
	tx *bolt.Tx,
	topics ...Topic,
) (r Sub, e error)

NewSub creates an inproc subscriber River which connects to all publisher Rivers in the given streamID bucket in RiverBucket, subscribing on the given Topics, or all topics if no Topic is given.

type Survey

type Survey []byte

Survey is a survey request constant.

type Surveyor

type Surveyor interface {
	Recv() ([]byte, error)
	Send([]byte) error
	Close() error

	Expect() map[uint64]bool
}

Surveyor is a River which implements the Surveyor end of the SURVEYOR/RESPONDENT scalable protocol.

func NewSurvey

func NewSurvey(
	tx *bolt.Tx,
	timeout time.Duration,
	buckets ...store.Bucket,
) (Surveyor, error)

NewSurvey is meant to be called within a bolt View transaction. It returns a Surveyor for running a Survey against all Responders found in the given nested buckets, for example rivers/responders/:user_id/{1,2,3}.

The timeout argument sets the retry timeout for the underlying mangos.Socket.

type Topic

type Topic interface {
	// Prefix must be prepended to Code.  This permits a quick match
	// on message prefix.
	Prefix() byte
	Code() []byte
	Name() string
	Len() int
}

Topic is a Pub/Sub topic. Package users should export and manage their own Topic constants. Note that this means collisions can happen if the package user does not take care to review imported Topics.
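One way a package user might guard against such collisions is to check their Topic values at startup. The sketch below is only a suggestion: the topic type, key, and collisions are hypothetical, and it assumes a Topic is identified on the wire by its Prefix byte followed by its Code.

```go
package main

import "fmt"

// Topic mirrors the Topic interface documented in this package.
type Topic interface {
	Prefix() byte
	Code() []byte
	Name() string
	Len() int
}

// topic is a hypothetical user-defined Topic.
type topic struct {
	prefix byte
	code   string
	name   string
}

func (t topic) Prefix() byte { return t.prefix }
func (t topic) Code() []byte { return []byte(t.code) }
func (t topic) Name() string { return t.name }
func (t topic) Len() int     { return 1 + len(t.code) } // assumed meaning

// key returns the bytes assumed to identify t on the wire.
func key(t Topic) string {
	return string(append([]byte{t.Prefix()}, t.Code()...))
}

// collisions describes every Topic whose key repeats an earlier one.
func collisions(ts ...Topic) []string {
	seen := map[string]string{}
	var out []string
	for _, t := range ts {
		k := key(t)
		if prev, ok := seen[k]; ok {
			out = append(out, fmt.Sprintf("%s collides with %s", t.Name(), prev))
			continue
		}
		seen[k] = t.Name()
	}
	return out
}

func main() {
	chat := topic{prefix: 1, code: "c", name: "chat"}
	news := topic{prefix: 2, code: "c", name: "news"}
	imported := topic{prefix: 1, code: "c", name: "imported"}
	for _, msg := range collisions(chat, news, imported) {
		fmt.Println(msg)
	}
}
```

Because matching is done on a byte prefix, a stricter check might also reject any key which is itself a prefix of another key.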
