internal

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 1, 2021 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Coordinator

type Coordinator interface {
	io.Closer

	// Watch for changes on the partition.
	// After called, this method will start a new goroutine that only
	// returns when the Coordinator context is done.
	Watch(received chan<- Event) error

	// Issues an Event.
	// This will have the same effect as broadcasting a message
	// for every participant on the destination.
	Write(ctx context.Context, event Event) error
}

Coordinator interface that should be implemented by the atomic broadcast handler. Commands should be issued through the coordinator to be delivered to other peers.

func NewCoordinator

func NewCoordinator(configuration CoordinatorConfiguration) (Coordinator, error)

Create a new Coordinator using the given configuration. The current implementation is the EtcdCoordinator, backed by etcd.

type CoordinatorConfiguration

type CoordinatorConfiguration struct {
	// Each Coordinator will handle only a single partition.
	// This will avoid peers with overlapping partitions.
	Partition string

	// Address for etcd server.
	Server string

	// Parent context that the Coordinator will derive it's own context.
	Ctx context.Context

	// Handler for managing goroutines.
	Handler *GoRoutineHandler
}

Configuration for the coordinator.

type Core

type Core interface {
	io.Closer

	// Start listening for new messages.
	// This will receive messages from the atomic broadcast protocol
	// and parse to an object the client can handle.
	Listen() (<-chan Message, error)

	// Send a message asynchronously for the given partition.
	Send(ctx context.Context, dest string, data []byte) <-chan error
}

Core is the interface that will create the link between Relt requests and the Coordinator. Every command issued will be parsed here, and every command received should be handled here before going back to the client. Everything after this stage should care only about the atomic broadcast protocol and everything before should be abstracted as a simple communication primitive. This means that any parsing or state handling for the client should be done here.

func NewCore

func NewCore(configuration CoreConfiguration) (Core, error)

Create a new ReltCore using the given configuration. As an effect, this will instantiate a Coordinator a failure can happen while handling connection to the atomic broadcast server.

type CoreConfiguration

type CoreConfiguration struct {
	// Partition the Coordinator will work with.
	Partition string

	// Server address for the Coordinator.
	Server string

	// Default timeout to be applied when handling channels and
	// asynchronous operations.
	DefaultTimeout time.Duration
}

Configuration for the Core interface.

type CoreFlags

type CoreFlags struct {
	// contains filtered or unexported fields
}

Holds all flags used to manage the Core state. This is the same as an AtomicBoolean and is used internally to manage the core states.

type EtcdCoordinator

type EtcdCoordinator struct {
	// contains filtered or unexported fields
}

EtcdCoordinator will use etcd for atomic broadcast.

func (*EtcdCoordinator) Close

func (e *EtcdCoordinator) Close() error

Stop the etcd client connection and stop the goroutines.

func (*EtcdCoordinator) Watch

func (e *EtcdCoordinator) Watch(received chan<- Event) error

Starts a new coroutine for watching the Coordinator partition. All received information will be published back through the channel received as parameter.

After calling a routine will run bounded to the application lifetime.

func (*EtcdCoordinator) Write

func (e *EtcdCoordinator) Write(ctx context.Context, event Event) error

Write the given event issuing a PUT request through the client.

type Event

type Event struct {
	// Key affected by the event.
	Key string

	// Value that should be applied if is sending the event or
	// applied value if the event was received.
	Value []byte

	// Only used when is a received event from the atomic broadcast,
	// this transport any error that happened.
	Error error
}

Event is a structure handled by the Coordinator. Events are received and issued through the atomic broadcast.

func (Event) IsError

func (e Event) IsError() bool

Verify if the event is an error event.

type Flag

type Flag struct {
	// contains filtered or unexported fields
}

An atomic boolean implementation, to act specifically as a flag.

func (*Flag) Inactivate

func (f *Flag) Inactivate() bool

Transition the flag from `active` to `inactive`.

func (*Flag) IsActive

func (f *Flag) IsActive() bool

Verify if the flag still on `active` state.

func (*Flag) IsInactive

func (f *Flag) IsInactive() bool

Verify if the flag is on `inactive` state.

type GoRoutineHandler

type GoRoutineHandler struct {
	// contains filtered or unexported fields
}

GoRoutineHandler is responsible for handling goroutines. This is used so go routines do not leak and are spawned without any control. Using the handler to spawn new routines will guarantee that any routine that is not controller careful will be known when the application finishes.

func NewRoutineHandler

func NewRoutineHandler() *GoRoutineHandler

Create a new instance of the GoRoutineHandler.

func (*GoRoutineHandler) Close

func (h *GoRoutineHandler) Close()

Blocks while waiting for go routines to stop. This will set the working mode to off, so after this is called any spawned go routine will panic.

func (*GoRoutineHandler) Spawn

func (h *GoRoutineHandler) Spawn(f func())

This method will increase the size of the group count and spawn the new go routine. After the routine is done, the group will be decreased.

If the handler was already closed, the handler will silently ignore the request and will *not* spawn.

type Message

type Message struct {
	// Where the message was originated.
	// If a client `A` sends a messages to client `B`,
	// this value will be `B`.
	Origin string

	// Data transported.
	Data []byte

	// If an error happened.
	Error error
}

Message is the structure handled by the Core. Messages are the available data sent to the client.

type ReltCore

type ReltCore struct {
	// contains filtered or unexported fields
}

Implements the Core interface. Holds all needed configuration for receiving commands from external world and sending through the Coordinator.

This structure will be alive throughout the whole lifetime, opening some routines for handling requests and responses.

func (*ReltCore) Close

func (r *ReltCore) Close() error

The Close method can be called only once. This will cancel the application context and start shutting down all goroutines.

*This method will block until everything is finished.*

func (*ReltCore) Listen

func (r *ReltCore) Listen() (<-chan Message, error)

The Listen method can be called only once. This will start a new goroutine that receives updates from the Coordinator and parse the information to a Message object.

This goroutine will run while the application is not closed.

func (*ReltCore) Send

func (r *ReltCore) Send(ctx context.Context, dest string, data []byte) <-chan error

Send the given data to the peers that listen to the given partition. This is a broadcast message, which means that if _N_ nodes are subscribed for a partition, every node will receive the message.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL