topics

package
v0.0.0-...-73c317a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2024 License: MIT Imports: 12 Imported by: 2

Documentation

Index

Constants

View Source
// MessageExpiry is 500 milliseconds.
// NOTE(review): presumably a default for Message.Expiry — confirm against usage.
const MessageExpiry = time.Millisecond * 500
View Source
// MessageTimeout is 100 milliseconds.
// NOTE(review): which operation this timeout bounds is not visible here — confirm against usage.
const MessageTimeout = time.Millisecond * 100

TODO: revisit the MessageTimeout value; should it be configurable?

Variables

This section is empty.

Functions

This section is empty.

Types

type LateJoinerMessagesRequest

// LateJoinerMessagesRequest lists the messages a late joiner already has.
// NOTE(review): presumably carried as the payload of a message whose type is
// LateJoinerMessagesRequestType — confirm against the sender.
type LateJoinerMessagesRequest struct {
	// messages the late joiner already knows about, given as MessageIdentifier
	// values; encoded under the "known_messages" key
	KnownMessages []MessageIdentifier `json:"known_messages"`
}

type LateJoinerMessagesResponse

// LateJoinerMessagesResponse returns full Message values to a late joiner.
// NOTE(review): presumably carried as the payload of a message whose type is
// LateJoinerMessagesResponseType — confirm against the sender.
type LateJoinerMessagesResponse struct {
	// messages that the endpoint is holding that the late joiner doesn't have already;
	// encoded under the "held_messages" key
	HeldMessages []Message `json:"held_messages"`
}

type Manager

// Manager exposes Publish, Subscribe and Unsubscribe (addressed by topic
// name), HandleReceive (takes a *types.Container), and Start/Stop.
// NewManager constructs one.
// NOTE(review): presumably a facade over Publisher and Subscriber — confirm.
type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager(
	endpointID ksuid.KSUID,
	endpointName string,
	transportManager *transport.Manager,
) *Manager

func (*Manager) HandleReceive

func (m *Manager) HandleReceive(container *types.Container)

func (*Manager) Publish

func (m *Manager) Publish(
	topicName string,
	topicType string,
	expiry time.Duration,
	payload []byte,
) error

func (*Manager) Start

func (m *Manager) Start()

func (*Manager) Stop

func (m *Manager) Stop()

func (*Manager) Subscribe

func (m *Manager) Subscribe(
	topicName string,
	topicType string,
	onReceive func(*Message),
) error

func (*Manager) Unsubscribe

func (m *Manager) Unsubscribe(
	topicName string,
) error

type Message

// Message is the JSON-tagged envelope for one topic message: send time,
// expiry, originating endpoint, sequence number, topic, message type and
// payload.
type Message struct {
	// trust the sent timestamp as it's the source of truth for the state in this message
	Timestamp time.Time `json:"timestamp"`

	// discard the message once it is older than this duration
	// NOTE(review): presumably measured from Timestamp — confirm
	Expiry time.Duration `json:"expiry"`

	// original source
	EndpointID   ksuid.KSUID `json:"endpoint_id"`
	EndpointName string      `json:"endpoint_name"`

	// incremented by sending endpoint
	SequenceNumber int64 `json:"sequence_number"`

	// topic
	TopicName string `json:"topic_name"`
	TopicType string `json:"topic_type"`

	// to identify and route the payload; one of the MessageType constants
	MessageType MessageType `json:"message_type"`

	// the actual user payload / control message content
	// NOTE(review): if marshalled with encoding/json, a []byte is emitted as a
	// base64 string — confirm consumers expect that
	Payload []byte `json:"payload"`
}

type MessageIdentifier

// MessageIdentifier pairs an endpoint ID with a sequence number; Message
// carries fields with the same names and types.
// NOTE(review): unlike the other structs here, these fields have no json tags,
// so encoding/json would use the Go field names ("EndpointID",
// "SequenceNumber") as keys. Confirm that is intended; adding tags now would
// change the encoded form.
type MessageIdentifier struct {
	EndpointID     ksuid.KSUID
	SequenceNumber int64
}

type MessageType

// MessageType is the integer tag stored in Message.MessageType. The values
// below are written out explicitly rather than generated with iota.
type MessageType int
const (
	// a normal, directly delivered message
	StandardMessageType MessageType = 1

	// a message that came via the forwarded / late joiner path
	ForwardedMessageType MessageType = 2

	// a late joiner sends this when a new endpoint is discovered
	LateJoinerMessagesRequestType MessageType = 3

	// the new endpoint sends this in response
	LateJoinerMessagesResponseType MessageType = 4
)

The values are assigned explicitly rather than with iota.

type Publication

// Publication exposes Publish (expiry and payload only), TopicType, and
// Start/Stop. NewPublication takes the topic name and type up front.
// NOTE(review): presumably one Publication is bound to a single topic — confirm.
type Publication struct {
	// contains filtered or unexported fields
}

func NewPublication

func NewPublication(
	endpointID ksuid.KSUID,
	endpointName string,
	topicName string,
	topicType string,
	transportManager *transport.Manager,
	subscriber **Subscriber,
) *Publication

func (*Publication) Publish

func (p *Publication) Publish(
	expiry time.Duration,
	payload []byte,
) error

func (*Publication) Start

func (p *Publication) Start()

func (*Publication) Stop

func (p *Publication) Stop()

func (*Publication) TopicType

func (p *Publication) TopicType() string

type Publisher

// Publisher exposes Publish, which takes the topic name and type on every
// call, plus Start/Stop. NewPublisher constructs one.
// NOTE(review): presumably manages one Publication per topic — confirm.
type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(
	endpointID ksuid.KSUID,
	endpointName string,
	transportManager *transport.Manager,
	subscriber **Subscriber,
) *Publisher

func (*Publisher) Publish

func (p *Publisher) Publish(
	topicName string,
	topicType string,
	expiry time.Duration,
	payload []byte,
) error

func (*Publisher) Start

func (p *Publisher) Start()

func (*Publisher) Stop

func (p *Publisher) Stop()

type Subscriber

// Subscriber exposes Subscribe and Unsubscribe (addressed by topic name),
// HandleReceive (takes a *types.Container), and Start/Stop. NewSubscriber
// constructs one.
// NOTE(review): presumably manages one Subscription per topic — confirm.
type Subscriber struct {
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(
	endpointID ksuid.KSUID,
	endpointName string,
	transportManager *transport.Manager,
	publisher **Publisher,
) *Subscriber

func (*Subscriber) HandleReceive

func (s *Subscriber) HandleReceive(container *types.Container)

func (*Subscriber) Start

func (s *Subscriber) Start()

func (*Subscriber) Stop

func (s *Subscriber) Stop()

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(
	topicName string,
	topicType string,
	onReceive func(*Message),
) error

func (*Subscriber) Unsubscribe

func (s *Subscriber) Unsubscribe(
	topicName string,
) error

type Subscription

// Subscription exposes HandleReceive (takes a *Message), OnReceive (takes a
// func(*Message) callback), and Start/Stop. NewSubscription takes the topic
// name, topic type and an initial onReceive callback.
type Subscription struct {
	// contains filtered or unexported fields
}

func NewSubscription

func NewSubscription(
	endpointID ksuid.KSUID,
	endpointName string,
	topicName string,
	topicType string,
	transportManager *transport.Manager,
	onReceive func(*Message),
) *Subscription

func (*Subscription) HandleReceive

func (s *Subscription) HandleReceive(message *Message)

func (*Subscription) OnReceive

func (s *Subscription) OnReceive(onReceive func(*Message))

func (*Subscription) Start

func (s *Subscription) Start()

func (*Subscription) Stop

func (s *Subscription) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL