arrebato

package
v0.1.0-alpha4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 4, 2022 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Overview

Package arrebato provides the arrebato client implementation used to interact with a cluster. This includes topic management, message consumption and production.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrSigningKeyExists is the error given when attempting to create a singing key pair for a client that already
	// has them.
	ErrSigningKeyExists = errors.New("signing key exists")

	// ErrNoPublicKey is the error given when querying a public key that does not exist.
	ErrNoPublicKey = errors.New("no public key")
)
View Source
var (
	// ErrNoTopic is the error given when attempting to perform an operation against a topic that does not exist.
	ErrNoTopic = errors.New("no topic")

	// ErrTopicExists is the error given when attempting to create a topic that already exists.
	ErrTopicExists = errors.New("topic exists")
)
View Source
var ErrNoACL = errors.New("no acl")

ErrNoACL is the error given when querying the server ACL before one has been created.

View Source
var ErrNoNode = errors.New("no node")

ErrNoNode is the error given when querying a node that does not exist.

Functions

This section is empty.

Types

type ACL

type ACL struct {
	Entries []ACLEntry `json:"entries"`
}

The ACL type represents the server's access-control list. It describes which clients are able to produce/consume on desired topics.

type ACLEntry

type ACLEntry struct {
	// The name of the topic.
	Topic string `json:"topic"`
	// The client identifier the entry refers to. In an insecure environment, this can be an arbitrary string that the
	// client will use to identify itself in the request metadata. When using mutual TLS, this will be a SPIFFE ID that
	// the client will include in its TLS certificate.
	Client string `json:"client"`
	// The permissions the client has on the topic.
	Permissions []ACLPermission `json:"permissions"`
}

The ACLEntry type represents the relationship between a single client and topic.

type ACLPermission

type ACLPermission uint

The ACLPermission type is an enumeration that denotes an action a client can make on a topic.

const (
	ACLPermissionUnspecified ACLPermission = iota
	ACLPermissionProduce
	ACLPermissionConsume
)

Constants for ACL permissions.

func (ACLPermission) MarshalJSON

func (p ACLPermission) MarshalJSON() ([]byte, error)

MarshalJSON is called when an ACLPermission type is marshalled to its JSON representation. It marshals as a string that matches the protobuf value.

func (ACLPermission) String

func (p ACLPermission) String() string

func (*ACLPermission) UnmarshalJSON

func (p *ACLPermission) UnmarshalJSON(b []byte) error

UnmarshalJSON is called when an ACLPermission type is unmarshalled from JSON. It assumes the JSON representation is a string that matches the protobuf name.

type Client

type Client struct {
	// contains filtered or unexported fields
}

The Client type is used to interact with an arrebato cluster.

func Dial

func Dial(ctx context.Context, config Config) (*Client, error)

Dial an arrebato cluster, returning a Client that can be used to perform requests against it.

func (*Client) ACL

func (c *Client) ACL(ctx context.Context) (ACL, error)

ACL returns the server's ACL. Returns ErrNoACL if an ACL has not been created.

func (*Client) Backup

func (c *Client) Backup(ctx context.Context) (io.ReadCloser, error)

Backup performs a backup of the server state. It returns an io.ReadCloser implementation that, when read, contains the contents of the backup.

func (*Client) Close

func (c *Client) Close() error

Close the connection to the cluster.

func (*Client) CreateSigningKeyPair

func (c *Client) CreateSigningKeyPair(ctx context.Context) (KeyPair, error)

CreateSigningKeyPair attempts to create a new KeyPair for a client.

func (*Client) CreateTopic

func (c *Client) CreateTopic(ctx context.Context, t Topic) error

CreateTopic creates a new topic described by the provided Topic. Returns ErrTopicExists if the topic already exists.

func (*Client) DeleteTopic

func (c *Client) DeleteTopic(ctx context.Context, name string) error

DeleteTopic removes a named Topic from the cluster. Returns ErrNoTopic if the topic does not exist.

func (*Client) NewConsumer

func (c *Client) NewConsumer(ctx context.Context, config ConsumerConfig) (*Consumer, error)

NewConsumer returns a new instance of the Consumer type configured to read from a desired topic as a desired consumer identifier.

func (*Client) NewProducer

func (c *Client) NewProducer(config ProducerConfig) *Producer

NewProducer returns a new instance of the Producer type that is configured to publish messages for a single topic.

func (*Client) Node

func (c *Client) Node(ctx context.Context, name string) (Node, error)

Node returns information on a named node. Returns ErrNoNode if the node is not known to the client.

func (*Client) Nodes

func (c *Client) Nodes(ctx context.Context) ([]Node, error)

Nodes returns a slice of all nodes known to the client.

func (*Client) PublicKeys

func (c *Client) PublicKeys(ctx context.Context) ([]PublicKey, error)

PublicKeys attempts to return all public keys stored in the server.

func (*Client) SetACL

func (c *Client) SetACL(ctx context.Context, acl ACL) error

SetACL updates the server's ACL.

func (*Client) SigningPublicKey

func (c *Client) SigningPublicKey(ctx context.Context, clientID string) ([]byte, error)

SigningPublicKey attempts to return the public signing key for the specified client identifier.

func (*Client) Topic

func (c *Client) Topic(ctx context.Context, name string) (Topic, error)

Topic returns a named Topic. Returns ErrNoTopic if the topic does not exist.

func (*Client) Topics

func (c *Client) Topics(ctx context.Context) ([]Topic, error)

Topics lists all topics stored in the cluster.

type Config

type Config struct {
	// Addresses of the running servers, multiple addresses are expected here so that the client can correctly
	// route write operations to the leader.
	Addresses []string

	// Configuration for connecting to the server via TLS. When using TLS, the server will expect clients to be
	// issued a SPIFFE ID for identification.
	TLS *tls.Config

	// The identifier for the client, this is only required when running the server in an insecure mode, when using
	// TLS, it is expected that the client certificate will contain a SPIFFE ID that the client will use to
	// identify itself.
	ClientID string
}

The Config type describes configuration values used by a Client.

func DefaultConfig

func DefaultConfig(addrs []string) Config

DefaultConfig returns a Config instance with sane values for a Client's connection.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

The Consumer type is used to read messages for a single topic.

func (*Consumer) Close

func (c *Consumer) Close() error

Close the stream of messages. This should be called regardless of Consume returning an error.

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context, fn ConsumerFunc) error

Consume messages from a topic as a consumer. The last known consumed index is sent to the server on a periodic basis so that the consumer can restart from their last known index. Each message consumed will invoke the ConsumerFunc. This method blocks until the context is cancelled, the server returns an error or the ConsumerFunc returns an error. Close should be called regardless of Consume returning an error.

type ConsumerConfig

type ConsumerConfig struct {
	Topic      string
	ConsumerID string
}

The ConsumerConfig type describes configuration values for the Consumer type.

type ConsumerFunc

type ConsumerFunc func(ctx context.Context, m Message) error

The ConsumerFunc is a function invoked for each message consumed by a Consumer.

type KeyPair

type KeyPair struct {
	PublicKey  []byte `json:"publicKey"`
	PrivateKey []byte `json:"privateKey"`
}

The KeyPair type contains the public and private signing keys for the client to use when producing messages.

type Message

type Message struct {
	Value  proto.Message
	Key    proto.Message
	Sender Sender
}

The Message type describes a message that is produced/consumed by arrebato clients.

type Node

type Node struct {
	// The name of the node.
	Name string `json:"name"`
	// Whether this node is the leader.
	Leader bool `json:"leader"`
	// The version of the node.
	Version string `json:"version"`
	// Peers known to the node
	Peers []string `json:"peers"`
	// Topics assigned to the node
	Topics []string `json:"topics"`
}

The Node type describes a single node in the cluster.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

The Producer type is responsible for publishing messages onto a single topic.

func (*Producer) Produce

func (p *Producer) Produce(ctx context.Context, m Message) error

Produce a message onto the configured topic.

type ProducerConfig

type ProducerConfig struct {
	// An optional signing key used for messages. When producing messages, if both a message key and this signing
	// key are present, a signature is sent to the server along with the message to verify the message was produced
	// by this client.
	SigningKey []byte

	// The topic to produce messages on.
	Topic string
}

The ProducerConfig type contains configuration values for a Producer.

type PublicKey

type PublicKey struct {
	ClientID  string `json:"clientId"`
	PublicKey []byte `json:"publicKey"`
}

The PublicKey type contains the public key used for verifying signatures for a single client.

type Sender

type Sender struct {
	ID           string
	Verified     bool
	KeySignature []byte
}

The Sender type describes the client that produced a message.

type Topic

type Topic struct {
	// The Name of the Topic.
	Name string `json:"name"`

	// The amount of time messages on the Topic will be stored.
	MessageRetentionPeriod time.Duration `json:"messageRetentionPeriod"`

	// The maximum age of a consumer index on a Topic before it is reset to zero.
	ConsumerRetentionPeriod time.Duration `json:"consumerRetentionPeriod"`

	// If true, any attempts to publish an unverified message onto this topic will fail.
	RequireVerifiedMessages bool `json:"requireVerifiedMessages"`
}

The Topic type describes a topic stored within the cluster.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL