internal

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 26, 2022 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
// Numeric error codes, numbered consecutively starting at 1. The values are
// written out explicitly so that each code's number is visible at a glance.
// NOTE(review): presumably these are the values carried in Error.Code and
// passed to OnError handlers — confirm against callers.
const (
	CodeNotAuthorized        = 1
	CodeTopicNotAvailable    = 2
	CodeFailedToCreateStream = 3
	CodeFailedToSendOffer    = 4
	CodeUnknown              = 5
)
View Source
// String identifiers for the two kinds of work.
// NOTE(review): presumably the names under which event distribution and
// client acceptance are registered with Worker.Pond — confirm.
const (
	DistributeEvent = "Distribute"
	AcceptClient    = "Accept"
)
View Source
// DELIMITER is the delimiter used to separate messages in streams: a single
// newline character, declared as an untyped rune constant.
const DELIMITER = '\n'

DELIMITER is the delimiter used to separate messages in streams.

View Source
// ErrorTopic is the topic name "error".
// NOTE(review): presumably the topic whose events are routed to the OnError
// handler rather than to OnEvent/OnMessage — confirm.
const ErrorTopic = "error"

Variables

View Source
// Sentinel errors of the package. Each is a fixed-message error created with
// errors.New, so callers can compare against these values.
var (
	// ErrNotAuthorized: the caller is not authorized.
	ErrNotAuthorized        = errors.New("not authorized")
	// ErrFailedToCreateStream: a send/receive stream to the client could not be created.
	ErrFailedToCreateStream = errors.New("failed to create send/receive stream to client")
	// ErrFailedToReadOffer: the offer could not be read from the client.
	ErrFailedToReadOffer    = errors.New("failed to read offer from client")
	// ErrFailedToSendOffer: the offer could not be sent to the server.
	ErrFailedToSendOffer    = errors.New("failed to send offer to server")
	// ErrFailedToMarshal: data could not be marshalled or unmarshalled.
	ErrFailedToMarshal      = errors.New("failed to marshal/unmarshal data")
)
View Source
// DefaultOnError is the default error handler: it writes the error code and
// its accompanying data to the standard logger and does nothing else.
var DefaultOnError = func(code int, data map[string]any) {
	const layout = "code: %d, data: %v"
	log.Printf(layout, code, data)
}

DefaultOnError is the default handler for processing errors. It listens to the "error" topic.

View Source
// DefaultOnMessage is the default message handler: it writes the topic and
// the raw message to the standard logger and does nothing else.
var DefaultOnMessage = func(topic string, message []byte) {
	// %s renders a []byte as its text, so no explicit string conversion is needed.
	log.Printf("topic: %s, message: %s\n", topic, message)
}

DefaultOnMessage is the default handler for processing incoming events that have no dedicated handler.

Functions

func AppendIfMissing added in v1.1.2

func AppendIfMissing(topics []string, topic string) []string

AppendIfMissing appends the item to the list if it is not already present.

func CloseClientConnection added in v1.1.7

func CloseClientConnection(connection quic.Connection, code int, err error) error

func DefaultAuthenticationFunc

func DefaultAuthenticationFunc(token string) bool

DefaultAuthenticationFunc is the default authentication function. It accepts all clients.

func DefaultAuthorizationFunc

func DefaultAuthorizationFunc(token, topic string) bool

DefaultAuthorizationFunc is the default authorization function. It accepts all clients.

func IsSubscribeTopicValid added in v1.1.2

func IsSubscribeTopicValid(topic string, topics []string) bool

IsSubscribeTopicValid checks whether the subscribed topic exists in, or matches one of, the client topics.

func NewLogger added in v1.2.0

func NewLogger() *zap.Logger

func SendError added in v1.1.7

func SendError(sendStream quic.SendStream, e *Error) error

SendError sends the given error to the client.

func TopicHasWildcard

func TopicHasWildcard(topic string) bool

TopicHasWildcard checks if the topic is a wildcard.

func WriteData

func WriteData(data any, sendStream quic.SendStream) error

WriteData writes data to stream.

Types

type Client

// Client groups a QUIC connection with the token, topic list, logger, topic
// finder and the handlers that AcceptEvents dispatches incoming events to.
type Client struct {
	Connection quic.Connection
	Token      string
	Topics     []string
	Logger     *zap.Logger
	Finder     Finder

	// OnEvent holds per-topic handlers, keyed by topic name.
	OnEvent   map[string]func(event []byte)
	// OnMessage receives the topic and raw message of events that were not
	// handled by OnError or by an OnEvent entry.
	OnMessage func(topic string, message []byte)
	// OnError handles events on the "error" topic.
	OnError   func(code int, data map[string]any)
}

func (*Client) AcceptEvents

func (c *Client) AcceptEvents(reader *bufio.Reader)

AcceptEvents reads events from the stream and calls the proper handler. Handlers are tried in the following order: (1) OnError, if the topic is "error"; (2) OnEvent[topic]; (3) OnMessage.

func (*Client) SetErrorHandler

func (c *Client) SetErrorHandler(handler func(code int, data map[string]any))

SetErrorHandler sets the handler for "error" topic.

func (*Client) SetEventHandler

func (c *Client) SetEventHandler(topic string, handler func([]byte))

SetEventHandler sets the handler for the given topic.

func (*Client) SetMessageHandler

func (c *Client) SetMessageHandler(handler func(topic string, message []byte))

SetMessageHandler sets the handler for all topics without a handler and for the "error" topic.

type DistributeWork added in v1.2.0

// DistributeWork pairs one event payload with an EventSource. It is the
// argument type of Worker.AddDistributeWork.
type DistributeWork struct {
	// Event is the raw event payload.
	Event       []byte
	// EventSource is the event source associated with the event.
	// NOTE(review): presumably the source whose subscribers receive Event — confirm.
	EventSource *EventSource
}

func NewDistributeWork added in v1.2.0

func NewDistributeWork(event []byte, eventSource *EventSource) *DistributeWork

type Error

// Error is a JSON-taggable error payload: a numeric code plus free-form data.
// Both fields carry `omitempty`, so a zero Code or an empty Data map is left
// out of the JSON encoding.
type Error struct {
	Code int            `json:"code,omitempty"`
	Data map[string]any `json:"data,omitempty"`
}

func NewErr

func NewErr(code int, data map[string]any) *Error

func UnmarshalError

func UnmarshalError(bytes []byte) (Error, error)

type Event

// Event is a JSON-taggable message: the topic it belongs to and its raw
// payload. Both fields carry `omitempty` and are left out of the JSON
// encoding when empty.
type Event struct {
	Topic string `json:"topic,omitempty"`
	Data  []byte `json:"data,omitempty"`
}

func NewEvent

func NewEvent(topic string, data []byte) *Event

type EventSource

// EventSource holds the channel of one topic together with its subscribers.
type EventSource struct {
	// Topic is the name of the topic this source belongs to.
	Topic                 string
	// DataChannel carries raw event payloads.
	// NOTE(review): presumably the channel DistributeEvents reads from — confirm.
	DataChannel           chan []byte
	// Subscribers is the list of subscribers of the topic.
	Subscribers           []Subscriber
	// IncomingSubscribers carries newly arriving subscribers.
	// NOTE(review): presumably consumed by HandleNewSubscriber — confirm.
	IncomingSubscribers   chan Subscriber
	// NOTE(review): presumably subscribers parked while Cleaning is set — confirm.
	SubscriberWaitingList []Subscriber
	Metrics               Metrics
	// NOTE(review): presumably true while CleanCorruptSubscribers is running — confirm.
	Cleaning              *atomic.Bool
	// NOTE(review): presumably the period between cleaning runs — confirm.
	CleaningInterval      time.Duration
}

EventSource is a struct for topic channel and its subscribers.

func NewEventSource

func NewEventSource(
	topic string,
	dataChannel chan []byte,
	subscribers []Subscriber,
	metric Metrics,
	cleaningInterval time.Duration,
) *EventSource

func (*EventSource) CleanCorruptSubscribers added in v1.2.0

func (e *EventSource) CleanCorruptSubscribers()

func (*EventSource) DistributeEvents added in v1.2.0

func (e *EventSource) DistributeEvents(worker Worker)

DistributeEvents distributes events from the channel among the subscribers.

func (*EventSource) HandleNewSubscriber added in v1.2.0

func (e *EventSource) HandleNewSubscriber()

type Finder added in v1.2.0

// Finder looks up topics; its only state is the logger below.
type Finder struct {
	// Logger is the zap logger held by the finder.
	Logger *zap.Logger
}

Finder for topics.

func (*Finder) FindRelatedWildcardTopics added in v1.2.0

func (f *Finder) FindRelatedWildcardTopics(topic string, topics []string) []string

FindRelatedWildcardTopics finds the topic patterns that are applicable to the given topic.

func (*Finder) FindTopicsList added in v1.2.0

func (f *Finder) FindTopicsList(topics []string, pattern string) []string

FindTopicsList finds the topics that match the given topic pattern.

type Metrics added in v1.1.4

// Metrics holds the Prometheus collectors of the package. Despite the
// "Counter" names, both fields are gauge vectors.
// NOTE(review): presumably labelled by topic, since the Inc*/Dec* methods
// take a topic — confirm.
type Metrics struct {
	// EventCounter is the gauge vector for events.
	EventCounter      *prometheus.GaugeVec
	// SubscriberCounter is the gauge vector for subscribers.
	SubscriberCounter *prometheus.GaugeVec
}

func NewMetrics added in v1.1.4

func NewMetrics(namespace, subSystem string) Metrics

func (Metrics) DecEvent added in v1.1.7

func (m Metrics) DecEvent(topic string)

func (Metrics) DecSubscriber added in v1.1.7

func (m Metrics) DecSubscriber(topic string)

func (Metrics) IncEvent added in v1.1.7

func (m Metrics) IncEvent(topic string)

func (Metrics) IncSubscriber added in v1.1.7

func (m Metrics) IncSubscriber(topic string)

type Offer

// Offer is a JSON-taggable pair of a token and a list of topics. Both fields
// carry `omitempty` and are left out of the JSON encoding when empty.
// NOTE(review): presumably the first message a client sends on a connection,
// read on the server by AcceptOffer — confirm.
type Offer struct {
	Token  string   `json:"token,omitempty"`
	Topics []string `json:"topics,omitempty"`
}

func AcceptOffer added in v1.2.0

func AcceptOffer(connection quic.Connection) (*Offer, error)

func NewOffer

func NewOffer(token string, topics []string) Offer

type Server

// Server is the main struct for the server.
type Server struct {
	Worker       Worker
	Listener     quic.Listener
	// EventSources maps a string key to its EventSource.
	// NOTE(review): presumably keyed by topic name and filled by GenerateEventSources — confirm.
	EventSources map[string]*EventSource
	Topics       []string
	Logger       *zap.Logger
	Finder       Finder

	// NOTE(review): presumably replaced by SetAuthenticator / SetAuthenticatorFunc — confirm.
	Authenticator auth.Authenticator
	// NOTE(review): presumably replaced by SetAuthorizer / SetAuthorizerFunc — confirm.
	Authorizer    auth.Authorizer
	Metrics       Metrics

	// NOTE(review): presumably handed to each EventSource as its CleaningInterval — confirm.
	CleaningInterval time.Duration
}

Server is the main struct for the server.

func (*Server) GenerateEventSources

func (s *Server) GenerateEventSources(topics []string)

GenerateEventSources generates eventSources for each topic.

func (*Server) MetricHandler added in v1.1.8

func (s *Server) MetricHandler() http.Handler

func (*Server) Publish

func (s *Server) Publish(topic string, event []byte)

Publish publishes an event to all the subscribers of the given topic.

func (*Server) SetAuthenticator

func (s *Server) SetAuthenticator(authenticator auth.Authenticator)

SetAuthenticator replaces the authentication function.

func (*Server) SetAuthenticatorFunc

func (s *Server) SetAuthenticatorFunc(authenticator auth.AuthenticatorFunc)

SetAuthenticatorFunc replaces the authentication function.

func (*Server) SetAuthorizer

func (s *Server) SetAuthorizer(authorizer auth.Authorizer)

SetAuthorizer replaces the authorization function.

func (*Server) SetAuthorizerFunc

func (s *Server) SetAuthorizerFunc(authorizer auth.AuthorizerFunc)

SetAuthorizerFunc replaces the authorization function.

type Subscriber

// Subscriber pairs a QUIC send stream with a flag marking it as corrupt.
type Subscriber struct {
	// Stream is the QUIC send stream of the subscriber.
	Stream  quic.SendStream
	// Corrupt is an atomic flag.
	// NOTE(review): presumably set once writing to Stream fails, so that
	// EventSource.CleanCorruptSubscribers can drop the subscriber — confirm.
	Corrupt *atomic.Bool
}

func NewSubscriber

func NewSubscriber(stream quic.SendStream) Subscriber

type Worker

// Worker wraps a koi pond together with a logger.
// NOTE(review): presumably AddAcceptClientWork and AddDistributeWork submit
// their jobs to Pond — confirm.
type Worker struct {
	Pond   *koi.Pond
	Logger *zap.Logger
}

func NewWorker

func NewWorker(cfg WorkerConfig, l *zap.Logger) Worker

func (*Worker) AddAcceptClientWork added in v1.2.0

func (w *Worker) AddAcceptClientWork(server *Server, count int)

func (*Worker) AddDistributeWork added in v1.2.0

func (w *Worker) AddDistributeWork(work *DistributeWork)

type WorkerConfig added in v1.2.0

// WorkerConfig is the configuration accepted by NewWorker. Note that the
// *Count fields are int64 while the *QueueSize fields are int.
// NOTE(review): presumably each Count is a number of concurrent workers and
// each QueueSize the capacity of the matching job queue — confirm.
type WorkerConfig struct {
	ClientAcceptorCount       int64
	ClientAcceptorQueueSize   int
	EventDistributorCount     int64
	EventDistributorQueueSize int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL