broadcast

v1.1.2
Published: Nov 7, 2019 License: MIT Imports: 14 Imported by: 1

README

RoadRunner: Event Broadcasting

Features:

  • Produce and consume events
  • Subscribe to topic from Golang or PHP
  • Multi-Topic subscription
  • WebSocket support
    • Server and topic access verification via PHP
  • Redis and In-Memory brokers
  • Custom topic command handlers
  • Automatic reconnects
  • Prometheus metrics

License:

The MIT License (MIT). Please see LICENSE for more information. Maintained by Spiral Scout.

Documentation

Index

Constants

const (
	// EventWebsocketConnect is fired when a new client connects; the context is *websocket.Conn.
	EventWebsocketConnect = iota + 2500

	// EventWebsocketDisconnect is fired when a websocket disconnects; the context is empty.
	EventWebsocketDisconnect

	// EventWebsocketJoin is fired when topics start being consumed; the context is *TopicEvent.
	EventWebsocketJoin

	// EventWebsocketLeave is fired when topic consumption stops; the context is *TopicEvent.
	EventWebsocketLeave

	// EventWebsocketError is fired when any broadcast error occurs; the context is *ErrorEvent.
	EventWebsocketError

	// EventBrokerError is fired on a broker error; the context is error.
	EventBrokerError
)
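The event codes above start at 2500 and increase by one per constant. As a minimal sketch, the block below copies the constants locally (so it compiles without the package) and shows a function with the same shape that Service.AddListener accepts. The eventName helper and the printed labels are hypothetical and not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copy of the documented constants; the real ones live in the broadcast package.
const (
	EventWebsocketConnect = iota + 2500
	EventWebsocketDisconnect
	EventWebsocketJoin
	EventWebsocketLeave
	EventWebsocketError
	EventBrokerError
)

// eventName is a hypothetical helper that maps an event code to a short label.
func eventName(event int) string {
	switch event {
	case EventWebsocketConnect:
		return "connect"
	case EventWebsocketDisconnect:
		return "disconnect"
	case EventWebsocketJoin:
		return "join"
	case EventWebsocketLeave:
		return "leave"
	case EventWebsocketError:
		return "error"
	case EventBrokerError:
		return "broker-error"
	default:
		return "unknown"
	}
}

// listener matches the func(event int, ctx interface{}) shape taken by AddListener.
// For EventBrokerError the documentation states that the context is an error.
func listener(event int, ctx interface{}) {
	if err, ok := ctx.(error); ok && event == EventBrokerError {
		fmt.Println("broker error:", err)
		return
	}
	fmt.Printf("event %d (%s)\n", event, eventName(event))
}

func main() {
	listener(EventWebsocketJoin, nil)
	listener(EventBrokerError, errors.New("connection lost"))
}
```

With the real package, a function like listener would be passed to Service.AddListener instead of being called directly.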
const ID = "broadcast"

ID defines the public service name.

Variables

This section is empty.

Functions

This section is empty.

Types

type Broker

type Broker interface {
	// Serve serves broker.
	Serve() error

	// Stop closes the consumption and disconnects the broker.
	Stop()

	// Subscribe broker to one or multiple topics.
	Subscribe(upstream chan *Message, topics ...string) error

	// Unsubscribe broker from one or multiple topics.
	Unsubscribe(upstream chan *Message, topics ...string)

	// Broadcast one or multiple Messages.
	Broadcast(messages ...*Message) error
}

Broker defines the ability to operate as a message-passing broker.
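To make the contract concrete, here is a minimal sketch of a type that satisfies the Broker interface by fanning messages out to subscribed channels. The fanout type, its constructor, and the local copies of Message and Broker are stand-ins written for this example; this is not the package's Memory or Redis implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// Message and Broker are local copies of the documented declarations.
type Message struct {
	Topic   string          `json:"topic"`
	Payload json.RawMessage `json:"payload"`
}

type Broker interface {
	Serve() error
	Stop()
	Subscribe(upstream chan *Message, topics ...string) error
	Unsubscribe(upstream chan *Message, topics ...string)
	Broadcast(messages ...*Message) error
}

// fanout is a hypothetical broker that keeps, per topic, the set of upstream channels.
type fanout struct {
	mu     sync.Mutex
	routes map[string]map[chan *Message]struct{}
}

// Compile-time check that fanout satisfies the interface.
var _ Broker = (*fanout)(nil)

func newFanout() *fanout {
	return &fanout{routes: make(map[string]map[chan *Message]struct{})}
}

// Serve and Stop are no-ops in this sketch; a real broker would run a loop here.
func (f *fanout) Serve() error { return nil }
func (f *fanout) Stop()        {}

func (f *fanout) Subscribe(upstream chan *Message, topics ...string) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, t := range topics {
		if f.routes[t] == nil {
			f.routes[t] = make(map[chan *Message]struct{})
		}
		f.routes[t][upstream] = struct{}{}
	}
	return nil
}

func (f *fanout) Unsubscribe(upstream chan *Message, topics ...string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, t := range topics {
		delete(f.routes[t], upstream)
	}
}

// Broadcast sends each message to every channel subscribed to its topic.
// The send blocks when a channel is full, so callers should buffer or drain it.
func (f *fanout) Broadcast(messages ...*Message) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, msg := range messages {
		for upstream := range f.routes[msg.Topic] {
			upstream <- msg
		}
	}
	return nil
}

func main() {
	b := newFanout()
	upstream := make(chan *Message, 1)
	_ = b.Subscribe(upstream, "news")
	_ = b.Broadcast(&Message{Topic: "news", Payload: json.RawMessage(`"hello"`)})
	msg := <-upstream
	fmt.Println(msg.Topic, string(msg.Payload))
}
```

Holding the mutex while sending keeps the sketch short but couples publishers to slow consumers; a production broker would more likely queue messages and deliver them from its Serve loop.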

type Client added in v1.1.2

type Client struct {
	// contains filtered or unexported fields
}

Client subscribes to the given topics and consumes messages from them or publishes messages to them. A Client also receives the messages it publishes.

func (*Client) Close added in v1.1.2

func (c *Client) Close()

Close the client and consumption.

func (*Client) Publish added in v1.1.2

func (c *Client) Publish(msg ...*Message) error

Publish message into associated topic or topics.

func (*Client) Subscribe added in v1.1.2

func (c *Client) Subscribe(topics ...string) error

Subscribe subscribes the client to specific topics.

func (*Client) Topics added in v1.1.2

func (c *Client) Topics() []string

Topics returns all the topics the client is subscribed to.

func (*Client) Unsubscribe added in v1.1.2

func (c *Client) Unsubscribe(topics ...string)

Unsubscribe unsubscribes the client from specific topics.

type Command

type Command struct {
	// Cmd type.
	Cmd string `json:"cmd"`

	// Args contains command specific payload.
	Args json.RawMessage `json:"args"`
}

Command contains information sent by the user.

func (*Command) Unmarshal

func (cmd *Command) Unmarshal(v interface{}) error

Unmarshal unmarshals the command data into v.
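The two-step decoding that Command enables can be sketched as follows: the outer frame is decoded first, and Args stays raw until the command name is known. The Unmarshal body below is an assumption (a plain json.Unmarshal of Args), the parseTopics helper is hypothetical, and the sample frame is illustrative rather than taken from the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Command is a local copy of the documented struct.
type Command struct {
	Cmd  string          `json:"cmd"`
	Args json.RawMessage `json:"args"`
}

// Unmarshal decodes Args into v. Assumed behaviour: the package's actual body is not shown.
func (cmd *Command) Unmarshal(v interface{}) error {
	return json.Unmarshal(cmd.Args, v)
}

// parseTopics is a hypothetical helper: it decodes a raw frame, then decodes
// the deferred Args payload as a list of topic names.
func parseTopics(raw []byte) (string, []string, error) {
	cmd := &Command{}
	if err := json.Unmarshal(raw, cmd); err != nil {
		return "", nil, err
	}
	var topics []string
	if err := cmd.Unmarshal(&topics); err != nil {
		return "", nil, err
	}
	return cmd.Cmd, topics, nil
}

func main() {
	name, topics, err := parseTopics([]byte(`{"cmd":"join","args":["news","chat"]}`))
	fmt.Println(name, topics, err)
}
```

Keeping Args as json.RawMessage lets each command define its own argument shape without the outer decoder knowing about it.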

type CommandHandler

type CommandHandler func(ctx *ConnContext, cmd []byte)

CommandHandler handles custom commands.
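A handler with this signature can be looked up by command name and invoked with the connection context. The sketch below is one possible way to do that, not the package's internals: the registry type, the "ping" handler, and the simplified Message and ConnContext stand-ins (which omit the Conn field to avoid a third-party import) are all assumptions made for illustration.

```go
package main

import "fmt"

// Message is a simplified stand-in for the package's Message type.
type Message struct {
	Topic   string
	Payload []byte
}

// ConnContext is a stand-in that omits the Conn *websocket.Conn field
// so the sketch has no third-party dependency.
type ConnContext struct {
	Upstream chan *Message
	Topics   []string
}

// CommandHandler has the documented signature.
type CommandHandler func(ctx *ConnContext, cmd []byte)

// registry is a hypothetical name-to-handler table.
type registry map[string]CommandHandler

// dispatch runs the handler registered under name and reports whether one existed.
func (r registry) dispatch(name string, ctx *ConnContext, cmd []byte) bool {
	h, ok := r[name]
	if !ok {
		return false
	}
	h(ctx, cmd)
	return true
}

// newRegistry registers one illustrative handler, "ping", which pushes the
// raw command bytes to the connection's upstream under a "pong" topic.
func newRegistry() registry {
	return registry{
		"ping": func(ctx *ConnContext, cmd []byte) {
			ctx.Upstream <- &Message{Topic: "pong", Payload: cmd}
		},
	}
}

func main() {
	ctx := &ConnContext{Upstream: make(chan *Message, 1)}
	r := newRegistry()
	fmt.Println(r.dispatch("ping", ctx, []byte(`{"id":1}`)))
	fmt.Println(r.dispatch("unknown", ctx, nil))
	msg := <-ctx.Upstream
	fmt.Println(msg.Topic, string(msg.Payload))
}
```

With the real package, the handler function would be registered through Service.AddCommand rather than stored in a local map.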

type Config

type Config struct {
	// Path defines the URL on which the middleware must be activated. The same path must be handled
	// by the underlying application kernel to authorize the consumption. Optional.
	Path string

	// Redis configures the Redis broker.
	Redis *RedisConfig
}

Config configures the broadcast extension.

func (*Config) Hydrate

func (c *Config) Hydrate(cfg service.Config) error

Hydrate reads the configuration values from the source configuration.

func (*Config) InitDefaults added in v1.1.2

func (c *Config) InitDefaults() error

InitDefaults enables the in-memory broadcast configuration.

type ConnContext

type ConnContext struct {
	// Upstream to push Messages into.
	Upstream chan *Message

	// Conn to the client.
	Conn *websocket.Conn

	// Topics contains the list of currently subscribed topics.
	Topics []string
}

ConnContext represents the connection and its state.

type ErrorEvent

type ErrorEvent struct {
	// Conn specific to the error.
	Conn *websocket.Conn

	// Caused contains the underlying error.
	Caused error
}

ErrorEvent represents a single broadcast error event.

type Memory

type Memory struct {
	// contains filtered or unexported fields
}

Memory manages broadcasting in memory.

func (*Memory) Broadcast

func (m *Memory) Broadcast(messages ...*Message) error

Broadcast one or multiple Messages.

func (*Memory) Serve

func (m *Memory) Serve() error

Serve serves broker.

func (*Memory) Stop

func (m *Memory) Stop()

Stop closes the consumption and disconnects the broker.

func (*Memory) Subscribe

func (m *Memory) Subscribe(upstream chan *Message, topics ...string) error

Subscribe broker to one or multiple topics.

func (*Memory) Unsubscribe

func (m *Memory) Unsubscribe(upstream chan *Message, topics ...string)

Unsubscribe broker from one or multiple topics.

type Message

type Message struct {
	// Topic the message has been pushed into.
	Topic string `json:"topic"`

	// Payload to be broadcast. Must be valid JSON.
	Payload json.RawMessage `json:"payload"`
}

Message represents a single message.

func NewMessage

func NewMessage(topic string, payload interface{}) *Message

NewMessage creates a new message with a JSON payload.
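Because Payload must be valid JSON, a constructor of this kind presumably marshals the Go value it is given. The sketch below illustrates that idea with a hypothetical lowercase newMessage that, unlike the documented signature, also returns the marshalling error; the encode helper is likewise an assumption added for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is a local copy of the documented struct.
type Message struct {
	Topic   string          `json:"topic"`
	Payload json.RawMessage `json:"payload"`
}

// newMessage is a hypothetical stand-in for NewMessage. It marshals payload
// with encoding/json and surfaces the error instead of hiding it.
func newMessage(topic string, payload interface{}) (*Message, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	return &Message{Topic: topic, Payload: data}, nil
}

// encode is a hypothetical helper that renders the message as it would
// appear on the wire, given the struct's JSON tags.
func encode(msg *Message) string {
	out, err := json.Marshal(msg)
	if err != nil {
		return ""
	}
	return string(out)
}

func main() {
	msg, err := newMessage("news", map[string]string{"title": "hi"})
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(encode(msg)) // prints {"topic":"news","payload":{"title":"hi"}}
}
```

Since Payload is a json.RawMessage, the already-encoded bytes are embedded as-is when the whole message is marshalled, so the payload is not double-encoded.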

type Redis

type Redis struct {
	// contains filtered or unexported fields
}

Redis is a Redis-based broadcast router.

func (*Redis) Broadcast

func (r *Redis) Broadcast(messages ...*Message) error

Broadcast one or multiple Messages.

func (*Redis) Serve

func (r *Redis) Serve() error

Serve serves broker.

func (*Redis) Stop

func (r *Redis) Stop()

Stop closes the consumption and disconnects the broker.

func (*Redis) Subscribe

func (r *Redis) Subscribe(upstream chan *Message, topics ...string) error

Subscribe broker to one or multiple topics.

func (*Redis) Unsubscribe

func (r *Redis) Unsubscribe(upstream chan *Message, topics ...string)

Unsubscribe broker from one or multiple topics.

type RedisConfig

type RedisConfig struct {
	Addr     string
	Password string
	DB       int
}

RedisConfig configures the Redis broker.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service manages event broadcasting and the websocket interface.

func (*Service) AddCommand

func (s *Service) AddCommand(name string, cmd CommandHandler)

AddCommand attaches a custom client command handler, for websocket only.

func (*Service) AddListener

func (s *Service) AddListener(l func(event int, ctx interface{}))

AddListener attaches server event controller.

func (*Service) Broadcast added in v1.1.1

func (s *Service) Broadcast(msg ...*Message) error

Broadcast one or multiple Messages.

func (*Service) Broker

func (s *Service) Broker() Broker

Broker returns associated broker.

func (*Service) Init

func (s *Service) Init(cfg *Config, r *rpc.Service, h *rhttp.Service, e env.Environment) (bool, error)

Init service.

func (*Service) NewClient added in v1.1.2

func (s *Service) NewClient(upstream chan *Message) *Client

NewClient returns a single connected client that can consume from or produce into topics.

func (*Service) Serve

func (s *Service) Serve() (err error)

Serve broadcast broker.

func (*Service) Stop

func (s *Service) Stop()

Stop closes the broadcast broker.

func (*Service) Subscribe added in v1.1.0

func (s *Service) Subscribe(upstream chan *Message, topics ...string) error

Subscribe broker to one or multiple topics.

func (*Service) Unsubscribe added in v1.1.0

func (s *Service) Unsubscribe(upstream chan *Message, topics ...string)

Unsubscribe broker from one or multiple topics.

type TopicEvent

type TopicEvent struct {
	// Conn associated with topics.
	Conn *websocket.Conn

	// Topics specific to event.
	Topics []string
}

TopicEvent is fired when a topic is joined or left.

Directories

Path Synopsis
cmd
