
package unifrost

import "github.com/rajveermalviya/gochan"

Package unifrost is a Go module for relaying pub/sub messages to the web using SSE (EventSource). It is based on Twitter's implementation of real-time event streaming in their new web app.
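
For readers unfamiliar with the wire format: a Server-Sent Events stream is plain text in which each event is a group of "field: value" lines ended by a blank line. The sketch below is not taken from unifrost; formatSSE is a hypothetical helper that only illustrates the framing an EventSource client expects.

```go
package main

import (
	"fmt"
	"strings"
)

// formatSSE is a hypothetical helper (not part of unifrost) that frames one
// Server-Sent Events message. Each line of data gets its own "data:" field,
// and a blank line terminates the event.
func formatSSE(event, data string) string {
	var b strings.Builder
	if event != "" {
		fmt.Fprintf(&b, "event: %s\n", event)
	}
	for _, line := range strings.Split(data, "\n") {
		fmt.Fprintf(&b, "data: %s\n", line)
	}
	b.WriteString("\n")
	return b.String()
}

func main() {
	fmt.Print(formatSSE("message", "hello\nworld"))
}
```

Splitting the payload on newlines matters because a raw newline inside a single "data:" field would end that field early.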

Supported brokers

Google Cloud Pub/Sub
Amazon Simple Queue Service
Azure Service Bus (Pending)
In-memory (Only for testing)

For examples, see https://github.com/unifrost/unifrost/tree/master/examples/


Package Files

doc.go unifrost.go


const (
    // ERROR .
    ERROR = iota
    // EVENT .


var (
    // ErrConsumerNotFound is returned if the consumer-id is not registered in the StreamHandler.
    ErrConsumerNotFound = errors.New("stream handler: consumer doesn't exists")

type Consumer

type Consumer struct {
    ID string
    // contains filtered or unexported fields
}

Consumer manages all the topic subscriptions.

type Option

type Option func(*StreamHandler) error

Option is a self-referential function for setting configuration parameters.

func ConsumerTTL

func ConsumerTTL(t time.Duration) Option

ConsumerTTL is an option that sets the consumer's TTL. The default TTL is 1 minute.

type StreamHandler

type StreamHandler struct {
    // contains filtered or unexported fields
}

StreamHandler handles all the consumers and subscriptions. It implements the http.Handler interface for easy embedding with any API server.

func NewStreamHandler

func NewStreamHandler(ctx context.Context, subClient drivers.SubscriberClient, options ...Option) (*StreamHandler, error)

NewStreamHandler returns a *unifrost.StreamHandler, which handles all the consumers and subscriptions.

Additional configuration options can be added with unifrost.Option functions.

func (*StreamHandler) Close

func (s *StreamHandler) Close(ctx context.Context) error

Close closes the StreamHandler and also closes all the connected consumers.

func (*StreamHandler) CloseConsumer

func (s *StreamHandler) CloseConsumer(ctx context.Context, consumerID string) error

CloseConsumer closes the specified consumer and removes it.

func (*StreamHandler) GetConsumerByID

func (s *StreamHandler) GetConsumerByID(consumerID string) (*Consumer, error)

GetConsumerByID returns a pointer to the Consumer struct.

If the specified consumer ID is invalid or doesn't exist, the error unifrost.ErrConsumerNotFound is returned.

func (*StreamHandler) GetConsumerTopics

func (s *StreamHandler) GetConsumerTopics(ctx context.Context, c *Consumer) []string

GetConsumerTopics returns a slice of all the topics the consumer is subscribed to.

func (*StreamHandler) IsConsumerConnected

func (s *StreamHandler) IsConsumerConnected(c *Consumer) bool

IsConsumerConnected reports whether the consumer is connected to the server.

func (*StreamHandler) NewConsumer

func (s *StreamHandler) NewConsumer(ctx context.Context) (*Consumer, error)

NewConsumer creates a new consumer with an autogenerated consumer id.

func (*StreamHandler) NewCustomConsumer

func (s *StreamHandler) NewCustomConsumer(ctx context.Context, consumerID string) (*Consumer, error)

NewCustomConsumer creates a new consumer with the specified consumer id.
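
The difference between NewConsumer and NewCustomConsumer is only where the ID comes from. The sketch below illustrates that split with hypothetical helpers newID and resolveID; how unifrost actually generates IDs is not stated here, so the random hex scheme is an assumption.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newID returns a random 128-bit identifier encoded as 32 hex characters.
// This scheme is an assumption; it is not taken from unifrost.
func newID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// resolveID keeps a caller-supplied ID and falls back to a generated one
// when the caller passes an empty string.
func resolveID(custom string) (string, error) {
	if custom != "" {
		return custom, nil
	}
	return newID()
}

func main() {
	auto, _ := resolveID("")
	fixed, _ := resolveID("user-42")
	fmt.Println(len(auto), fixed)
}
```

A caller-supplied ID is useful when the consumer should map to an existing identity, such as a user or session ID.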

func (*StreamHandler) ServeHTTP

func (s *StreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP is the HTTP handler for EventSource. To connect, the query parameter 'id' (i.e. the consumer ID) is required.

func (*StreamHandler) Subscribe

func (s *StreamHandler) Subscribe(ctx context.Context, consumerID string, topic string) error

Subscribe subscribes the specified consumer to the specified topic. If the specified consumer doesn't exist, ErrConsumerNotFound is returned.

func (*StreamHandler) TotalConsumerTopics

func (s *StreamHandler) TotalConsumerTopics(ctx context.Context, c *Consumer) int

TotalConsumerTopics returns the number of topics the consumer is subscribed to.

func (*StreamHandler) TotalConsumers

func (s *StreamHandler) TotalConsumers(ctx context.Context) int

TotalConsumers returns the number of consumers connected to the stream handler.

func (*StreamHandler) Unsubscribe

func (s *StreamHandler) Unsubscribe(ctx context.Context, consumerID string, topic string) error

Unsubscribe unsubscribes the specified consumer from the specified topic and shuts down the subscription. If the specified consumer doesn't exist, ErrConsumerNotFound is returned.


Directories

Path                    Synopsis
drivers                 Package drivers contains all the drivers required to connect to different brokers, under a single easy to use interface.
drivers/gcpdriver       Package gcpdriver contains Google Cloud Pub/Sub driver for unifrost.StreamHandler
drivers/kafkadriver     Package kafkadriver contains Apache Kafka message bus driver for unifrost.StreamHandler
drivers/memdriver       Package memdriver contains In-memory testing driver for unifrost.StreamHandler
drivers/natsdriver      Package natsdriver contains NATS driver for unifrost.StreamHandler
drivers/rabbitdriver    Package rabbitdriver contains RabbitMQ driver for unifrost.StreamHandler
drivers/sqsdriver       Package sqsdriver contains Amazon SQS driver for unifrost.StreamHandler

Package unifrost imports 10 packages. Updated 2020-11-23.