hub

package
v0.10.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 16, 2020 License: AGPL-3.0 Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
const EarliestLastEventID = "earliest"

EarliestLastEventID is the reserved value representing the earliest available event id.

Variables

View Source
var (
	// ErrInvalidAuthorizationHeader is returned when the Authorization header is invalid.
	ErrInvalidAuthorizationHeader = errors.New(`invalid "Authorization" HTTP header`)
	// ErrNoOrigin is returned when the cookie authorization mechanism is used and neither an Origin nor a Referer header is present.
	ErrNoOrigin = errors.New(`an "Origin" or a "Referer" HTTP header must be present to use the cookie-based authorization mechanism`)
	// ErrOriginNotAllowed is returned when the Origin is not allowed to post updates.
	ErrOriginNotAllowed = errors.New("origin not allowed to post updates")
	// ErrUnexpectedSigningMethod is returned when the signing JWT method is not supported.
	ErrUnexpectedSigningMethod = errors.New("unexpected signing method")
	// ErrInvalidJWT is returned when the JWT is invalid.
	ErrInvalidJWT = errors.New("invalid JWT")
	// ErrPublicKey is returned when there is an error with the public key.
	ErrPublicKey = errors.New("public key error")
)
View Source
var (
	// ErrInvalidTransportDSN is returned when the Transport's DSN is invalid.
	ErrInvalidTransportDSN = errors.New("invalid transport DSN")
	// ErrClosedTransport is returned by the Transport's Dispatch and AddSubscriber methods after a call to Close.
	ErrClosedTransport = errors.New("hub: read/write on closed Transport")
)
View Source
var ErrInvalidConfig = errors.New("invalid config")

ErrInvalidConfig is returned when the configuration is invalid.

Functions

func AssignUUID added in v0.10.1

func AssignUUID(u *Update)

AssignUUID generates a new UUID and assigns it to the given update if no ID is already set.

func Demo added in v0.8.0

func Demo(w http.ResponseWriter, r *http.Request)

Demo exposes INSECURE Demo endpoints to test discovery and authorization mechanisms. Add a query parameter named "body" to define the content to return in the response's body. Add a query parameter named "jwt" to set a "mercureAuthorization" cookie containing this token. The Content-Type header will automatically be set according to the URL's extension.

func InitConfig added in v0.8.0

func InitConfig(v *viper.Viper)

InitConfig reads in config file and ENV variables if set.

func InitLogrus added in v0.8.0

func InitLogrus()

InitLogrus configures the global logger.

func SetConfigDefaults added in v0.8.0

func SetConfigDefaults(v *viper.Viper)

SetConfigDefaults sets defaults on a Viper instance.

func SetFlags added in v0.8.0

func SetFlags(fs *pflag.FlagSet, v *viper.Viper)

SetFlags creates flags and binds them to Viper.

func Start added in v0.8.0

func Start()

Start is a helper method to start the Mercure Hub.

func ValidateConfig added in v0.8.0

func ValidateConfig(v *viper.Viper) error

ValidateConfig validates a Viper instance.

Types

type BoltTransport added in v0.8.0

type BoltTransport struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

BoltTransport implements the Transport interface using the Bolt database.

func NewBoltTransport added in v0.8.0

func NewBoltTransport(u *url.URL) (*BoltTransport, error)

NewBoltTransport creates a new BoltTransport.

func (*BoltTransport) AddSubscriber added in v0.10.0

func (t *BoltTransport) AddSubscriber(s *Subscriber) error

AddSubscriber adds a new subscriber to the transport.

func (*BoltTransport) Close added in v0.8.0

func (t *BoltTransport) Close() (err error)

Close closes the Transport.

func (*BoltTransport) Dispatch added in v0.10.0

func (t *BoltTransport) Dispatch(update *Update) error

Dispatch dispatches an update to all subscribers and persists it in BoltDB.

func (*BoltTransport) GetSubscribers added in v0.10.0

func (t *BoltTransport) GetSubscribers() (lastEventID string, subscribers []*Subscriber)

GetSubscribers gets the list of active subscribers.

type Event

type Event struct {
	// The update's data, encoded in the server-sent event format: every line starts with the string "data: "
	// https://www.w3.org/TR/eventsource/#dispatchMessage
	Data string

	// The globally unique identifier corresponding to update
	ID string

	// The event type, will be attached to the "event" field
	Type string

	// The reconnection time
	Retry uint64
}

Event is the actual Server Sent Event that will be dispatched.

func (*Event) String

func (e *Event) String() string

String serializes the event in a "text/event-stream" representation.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub stores channels with clients currently subscribed and allows dispatching updates.

func NewHub

func NewHub(v *viper.Viper) (*Hub, error)

NewHub creates a hub using the Viper configuration.

func NewHubWithTransport added in v0.8.0

func NewHubWithTransport(v *viper.Viper, t Transport, tss *TopicSelectorStore) *Hub

NewHubWithTransport creates a hub.

func (*Hub) PublishHandler

func (h *Hub) PublishHandler(w http.ResponseWriter, r *http.Request)

PublishHandler allows publishers to broadcast updates to all subscribers.

func (*Hub) Serve

func (h *Hub) Serve()

Serve starts the HTTP server.

func (*Hub) Stop

func (h *Hub) Stop() error

Stop disconnects all connected clients.

func (*Hub) SubscribeHandler

func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request)

SubscribeHandler creates a keep alive connection and sends the events to the subscribers.

func (*Hub) SubscriptionHandler added in v0.10.0

func (h *Hub) SubscriptionHandler(w http.ResponseWriter, r *http.Request)

func (*Hub) SubscriptionsHandler added in v0.10.0

func (h *Hub) SubscriptionsHandler(w http.ResponseWriter, r *http.Request)

type LocalTransport added in v0.8.0

type LocalTransport struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

LocalTransport implements the Transport interface without a database and simply broadcasts the live Updates.

func NewLocalTransport added in v0.8.0

func NewLocalTransport() *LocalTransport

NewLocalTransport creates a new LocalTransport.

func (*LocalTransport) AddSubscriber added in v0.10.0

func (t *LocalTransport) AddSubscriber(s *Subscriber) error

AddSubscriber adds a new subscriber to the transport.

func (*LocalTransport) Close added in v0.8.0

func (t *LocalTransport) Close() (err error)

Close closes the Transport.

func (*LocalTransport) Dispatch added in v0.10.0

func (t *LocalTransport) Dispatch(update *Update) error

Dispatch dispatches an update to all subscribers.

func (*LocalTransport) GetSubscribers added in v0.10.0

func (t *LocalTransport) GetSubscribers() (lastEventID string, subscribers []*Subscriber)

GetSubscribers gets the list of active subscribers.

type Metrics added in v0.10.0

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics stores the metrics collected by the Hub.

func NewMetrics added in v0.10.0

func NewMetrics() *Metrics

NewMetrics creates a Prometheus metrics collector.

func (*Metrics) NewSubscriber added in v0.10.0

func (m *Metrics) NewSubscriber(s *Subscriber)

NewSubscriber collects metrics about new subscriber events.

func (*Metrics) NewUpdate added in v0.10.0

func (m *Metrics) NewUpdate(u *Update)

NewUpdate collects metrics about new update events.

func (*Metrics) Register added in v0.10.0

func (m *Metrics) Register(r *mux.Router)

Register configures the Prometheus registry with all collected metrics.

func (*Metrics) SubscriberDisconnect added in v0.10.0

func (m *Metrics) SubscriberDisconnect(s *Subscriber)

SubscriberDisconnect collects metrics about subscriber disconnection events.

type Subscriber

type Subscriber struct {
	ID                 string
	EscapedID          string
	Claims             *claims
	Topics             []string
	EscapedTopics      []string
	RequestLastEventID string
	LogFields          log.Fields
	Debug              bool
	// contains filtered or unexported fields
}

Subscriber represents a client subscribed to a list of topics.

func NewSubscriber added in v0.4.1

func NewSubscriber(lastEventID string, tss *TopicSelectorStore) *Subscriber

NewSubscriber creates a new subscriber.

func (*Subscriber) CanDispatch added in v0.10.0

func (s *Subscriber) CanDispatch(u *Update) bool

CanDispatch checks if an update can be dispatched to this subscriber.

func (*Subscriber) Disconnect added in v0.10.0

func (s *Subscriber) Disconnect()

Disconnect disconnects the subscriber.

func (*Subscriber) Dispatch added in v0.10.0

func (s *Subscriber) Dispatch(u *Update, fromHistory bool) bool

Dispatch an update to the subscriber.

func (*Subscriber) HistoryDispatched added in v0.10.0

func (s *Subscriber) HistoryDispatched(responseLastEventID string)

HistoryDispatched must be called when all messages coming from the history have been dispatched.

func (*Subscriber) Receive added in v0.10.0

func (s *Subscriber) Receive() <-chan *Update

Receive returns a channel on which incoming updates are dispatched.

type TopicSelectorStore added in v0.10.2

type TopicSelectorStore struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

TopicSelectorStore caches compiled templates to improve memory and CPU usage.

func NewTopicSelectorStore added in v0.10.2

func NewTopicSelectorStore() *TopicSelectorStore

NewTopicSelectorStore creates a new topic selector store.

type Transport added in v0.8.0

type Transport interface {
	// Dispatch dispatches an update to all subscribers.
	Dispatch(update *Update) error

	// AddSubscriber adds a new subscriber to the transport.
	AddSubscriber(s *Subscriber) error

	// Close closes the Transport.
	Close() error
}

Transport provides methods to dispatch and persist updates.

func NewTransport added in v0.8.0

func NewTransport(config *viper.Viper) (Transport, error)

NewTransport creates a transport using the backend matching the given TransportURL.

type TransportSubscribers added in v0.10.1

type TransportSubscribers interface {
	// GetSubscribers gets the last event ID and the list of active subscribers at this time.
	GetSubscribers() (string, []*Subscriber)
}

TransportSubscribers provides a method to retrieve the list of active subscribers.

type Update

type Update struct {
	// The topics' Internationalized Resource Identifier (RFC3987) (will most likely be URLs).
	// The first one is the canonical IRI, while next ones are alternate IRIs.
	Topics []string

	// Private updates can only be dispatched to subscribers authorized to receive them.
	Private bool

	// The Server-Sent Event to send.
	Event
}

Update represents an update to send to subscribers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL