server

package
v1.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 9, 2022 License: Apache-2.0 Imports: 50 Imported by: 2

Documentation

Index

Constants

View Source
const (
	// DefaultNamespace is the default cluster namespace to use if one is not
	// specified.
	DefaultNamespace = "liftbridge-default"

	// DefaultPort is the port to bind to if one is not specified.
	DefaultPort = 9292
)
View Source
const Version = "v1.9.0"

Version of the Liftbridge server.

Variables

View Source
var (
	// ErrStreamExists is returned by CreateStream when attempting to create a
	// stream that already exists.
	ErrStreamExists = errors.New("stream already exists")

	// ErrStreamNotFound is returned by DeleteStream/PauseStream when
	// attempting to delete/pause a stream that does not exist.
	ErrStreamNotFound = errors.New("stream does not exist")

	// ErrPartitionNotFound is returned by PauseStream when attempting to pause
	// a stream partition that does not exist.
	ErrPartitionNotFound = errors.New("partition does not exist")

	// ErrConsumerGroupExists is returned by createConsumerGroup when
	// attempting to create a group that already exists.
	ErrConsumerGroupExists = errors.New("consumer group already exists")

	// ErrConsumerGroupNotFound is returned by JoinConsumerGroup when
	// attempting to join a group that does not exist.
	ErrConsumerGroupNotFound = errors.New("consumer group does not exist")

	// ErrConsumerAlreadyMember is returned by JoinConsumerGroup when the
	// consumer is already a member of the group.
	ErrConsumerAlreadyMember = errors.New("consumer is already a member of the consumer group")

	// ErrConsumerNotMember is returned by LeaveConsumerGroup when the
	// consumer if not a member of the group.
	ErrConsumerNotMember = errors.New("consumer is not a member of the consumer group")

	// ErrBrokerNotCoordinator is returned by GetConsumerGroupAssignments when
	// this server is not the coordinator for the requested consumer group.
	ErrBrokerNotCoordinator = errors.New("broker is not the consumer group coordinator")

	// ErrGroupEpoch is returned by GetConsumerGroupAssignments when the
	// client-provided group epoch differs from the server-side group epoch.
	ErrGroupEpoch = errors.New("client-provided group epoch differs from broker group epoch")
)

Functions

func AuthzStreamInterceptor added in v1.9.0

func AuthzStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error

AuthzStreamInterceptor gets user from TLS-authenticated stream request and add user to ctx

func AuthzUnaryInterceptor added in v1.9.0

func AuthzUnaryInterceptor(
	ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (interface{}, error)

AuthzUnaryInterceptor gets user from TLS-authenticated request and add user to ctx

func GetLogLevel

func GetLogLevel(level string) (uint32, error)

GetLogLevel converts the level string to its corresponding int value. It returns an error if the level is invalid.

Types

type ActivityStreamConfig

type ActivityStreamConfig struct {
	Enabled          bool
	PublishTimeout   time.Duration
	PublishAckPolicy client.AckPolicy
}

ActivityStreamConfig contains settings for controlling activity stream behavior.

type ClusteringConfig

type ClusteringConfig struct {
	ServerID                string
	Namespace               string
	RaftSnapshots           int
	RaftSnapshotThreshold   uint64
	RaftCacheSize           int
	RaftBootstrapSeed       bool
	RaftBootstrapPeers      []string
	RaftMaxQuorumSize       uint
	ReplicaMaxLagTime       time.Duration
	ReplicaMaxLeaderTimeout time.Duration
	ReplicaFetchTimeout     time.Duration
	ReplicaMaxIdleWait      time.Duration
	MinISR                  int
	ReplicationMaxBytes     int64
}

ClusteringConfig contains settings for controlling cluster behavior.

type Config

type Config struct {
	Listen               HostPort
	Host                 string
	Port                 int
	LogLevel             uint32
	LogRecovery          bool
	LogRaft              bool
	LogNATS              bool
	LogSilent            bool
	DataDir              string
	BatchMaxMessages     int
	BatchMaxTime         time.Duration
	MetadataCacheMaxAge  time.Duration
	TLSKey               string
	TLSCert              string
	TLSClientAuth        bool
	TLSClientAuthCA      string
	TLSClientAuthz       bool
	TLSClientAuthzModel  string
	TLSClientAuthzPolicy string
	NATS                 nats.Options
	EmbeddedNATS         bool
	EmbeddedNATSConfig   string
	Streams              StreamsConfig
	Clustering           ClusteringConfig
	ActivityStream       ActivityStreamConfig
	CursorsStream        CursorsStreamConfig
	Groups               GroupsConfig
}

Config contains all settings for a Liftbridge Server.

func NewConfig

func NewConfig(configFile string) (*Config, error)

NewConfig creates a new Config with default settings and applies any settings from the given configuration file.

func NewDefaultConfig

func NewDefaultConfig() *Config

NewDefaultConfig creates a new Config with default settings.

func (Config) GetConnectionAddress

func (c Config) GetConnectionAddress() HostPort

GetConnectionAddress returns the host if specified and listen otherwise.

func (Config) GetListenAddress

func (c Config) GetListenAddress() HostPort

GetListenAddress returns the address and port to listen to.

func (Config) NATSServersString added in v1.5.0

func (c Config) NATSServersString() string

NATSServersString returns a human-readable string representation of the list of NATS servers.

type CursorsStreamConfig added in v1.3.0

type CursorsStreamConfig struct {
	Partitions        int32
	ReplicationFactor int32
	AutoPauseTime     time.Duration
}

CursorsStreamConfig contains settings for controlling cursors stream behavior.

type EventTimestamps added in v1.4.0

type EventTimestamps struct {
	// contains filtered or unexported fields
}

EventTimestamps contains the first and latest times when an event has occurred.

type GroupsConfig added in v1.8.0

type GroupsConfig struct {
	ConsumerTimeout    time.Duration
	CoordinatorTimeout time.Duration
}

GroupsConfig contains settings for controlling consumer group behavior.

type HostPort

type HostPort struct {
	Host string
	Port int
}

HostPort is simple struct to hold parsed listen/addr strings.

type RaftLog added in v1.6.0

type RaftLog struct {
	*raft.Log
}

RaftLog represents an entry into the Raft log.

type RaftLogListener added in v1.6.0

type RaftLogListener interface {
	Receive(*RaftLog)
}

RaftLogListener is a listener for Raft logs.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the main Liftbridge object. Create it by calling New or RunServerWithConfig.

func New

func New(config *Config) *Server

New creates a new Server with the given configuration. Call Start to run the Server.

func RunServerWithConfig

func RunServerWithConfig(config *Config) (*Server, error)

RunServerWithConfig creates and starts a new Server with the given configuration. It returns an error if the Server failed to start.

func (*Server) AddRaftLogListener added in v1.6.0

func (s *Server) AddRaftLogListener(listener RaftLogListener)

AddRaftLogListener adds a Raft log listener.

func (*Server) Apply

func (s *Server) Apply(l *raft.Log) interface{}

Apply applies a Raft log entry to the controller FSM. This is invoked by Raft once a log entry is committed. It returns a value which will be made available on the ApplyFuture returned by Raft.Apply if that method was called on the same Raft node as the FSM.

Note that, on restart, this can be called for entries that have already been committed to Raft as part of the recovery process. As such, this should be an idempotent call.

func (*Server) GetListenPort added in v1.3.0

func (s *Server) GetListenPort() int

GetListenPort returns the port the server is listening to. Returns 0 if the server is not listening.

func (*Server) IsLeader

func (s *Server) IsLeader() bool

IsLeader indicates if the server is currently the metadata leader or not. If consistency is required for an operation, it should be threaded through the Raft cluster since that is the single source of truth. If a server thinks it's leader when it's not, the operation it proposes to the Raft cluster will fail.

func (*Server) IsRunning

func (s *Server) IsRunning() bool

IsRunning indicates if the server is currently running or has been stopped.

func (*Server) Restore

func (s *Server) Restore(snapshot io.ReadCloser) error

Restore is used to restore an FSM from a snapshot. It is not called concurrently with any other command. The FSM must discard all previous state.

func (*Server) Snapshot

func (s *Server) Snapshot() (raft.FSMSnapshot, error)

Snapshot is used to support log compaction. This call should return an FSMSnapshot which can be used to save a point-in-time snapshot of the FSM. Apply and Snapshot are not called in multiple threads, but Apply will be called concurrently with Persist. This means the FSM should be implemented in a fashion that allows for concurrent updates while a snapshot is happening.

func (*Server) Start

func (s *Server) Start() (err error)

Start the Server. This is not a blocking call. It will return an error if the Server cannot start properly.

func (*Server) Stop

func (s *Server) Stop() error

Stop will attempt to gracefully shut the Server down by signaling the stop and waiting for all goroutines to return.

type StreamsConfig

type StreamsConfig struct {
	RetentionMaxBytes             int64
	RetentionMaxMessages          int64
	RetentionMaxAge               time.Duration
	CleanerInterval               time.Duration
	SegmentMaxBytes               int64
	SegmentMaxAge                 time.Duration
	Compact                       bool
	CompactMaxGoroutines          int
	AutoPauseTime                 time.Duration
	AutoPauseDisableIfSubscribers bool
	MinISR                        int
	ConcurrencyControl            bool
	Encryption                    bool
}

StreamsConfig contains settings for controlling the message log for streams.

func (*StreamsConfig) ApplyOverrides added in v1.2.0

func (l *StreamsConfig) ApplyOverrides(c *proto.StreamConfig)

ApplyOverrides applies the values from the StreamConfig protobuf to the StreamsConfig struct. If the value is present in the request's config section, it will be set in StreamsConfig.

func (StreamsConfig) AutoPauseString added in v1.3.0

func (l StreamsConfig) AutoPauseString() string

AutoPauseString returns a human-readable string representation of the auto pause setting.

func (StreamsConfig) RetentionString

func (l StreamsConfig) RetentionString() string

RetentionString returns a human-readable string representation of the retention policy.

Directories

Path Synopsis
Package commitlog provides an implementation for a file-backed write-ahead log.
Package commitlog provides an implementation for a file-backed write-ahead log.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL