liftbridge: github.com/liftbridge-io/liftbridge/server

package server

import "github.com/liftbridge-io/liftbridge/server"

Package Files

activity.go api.go config.go fsm.go metadata.go partition.go raft.go replicator.go server.go signal.go stream.go version.go

Constants

const (
    // DefaultNamespace is the default cluster namespace to use if one is not
    // specified.
    DefaultNamespace = "liftbridge-default"

    // DefaultPort is the port to bind to if one is not specified.
    DefaultPort = 9292
)
const Version = "v1.2.0"

Version of the Liftbridge server.

Variables

var (
    // ErrStreamExists is returned by CreateStream when attempting to create a
    // stream that already exists.
    ErrStreamExists = errors.New("stream already exists")

    // ErrStreamNotFound is returned by DeleteStream/PauseStream when
    // attempting to delete/pause a stream that does not exist.
    ErrStreamNotFound = errors.New("stream does not exist")

    // ErrPartitionNotFound is returned by PauseStream when attempting to pause
    // a stream partition that does not exist.
    ErrPartitionNotFound = errors.New("partition does not exist")
)

func GetLogLevel

func GetLogLevel(level string) (uint32, error)

GetLogLevel converts the level string to its corresponding numeric (uint32) value. It returns an error if the level is invalid.

type ActivityStreamConfig

type ActivityStreamConfig struct {
    Enabled          bool
    PublishTimeout   time.Duration
    PublishAckPolicy client.AckPolicy
}

ActivityStreamConfig contains settings for controlling activity stream behavior.

type ClusteringConfig

type ClusteringConfig struct {
    ServerID                string
    Namespace               string
    RaftSnapshots           int
    RaftSnapshotThreshold   uint64
    RaftCacheSize           int
    RaftBootstrapSeed       bool
    RaftBootstrapPeers      []string
    ReplicaMaxLagTime       time.Duration
    ReplicaMaxLeaderTimeout time.Duration
    ReplicaFetchTimeout     time.Duration
    ReplicaMaxIdleWait      time.Duration
    MinISR                  int
}

ClusteringConfig contains settings for controlling cluster behavior.

type Config

type Config struct {
    Listen              HostPort
    Host                string
    Port                int
    LogLevel            uint32
    LogRecovery         bool
    LogRaft             bool
    LogSilent           bool
    DataDir             string
    BatchMaxMessages    int
    BatchMaxTime        time.Duration
    MetadataCacheMaxAge time.Duration
    TLSKey              string
    TLSCert             string
    TLSClientAuth       bool
    TLSClientAuthCA     string
    NATS                nats.Options
    Streams             StreamsConfig
    Clustering          ClusteringConfig
    ActivityStream      ActivityStreamConfig
}

Config contains all settings for a Liftbridge Server.

func NewConfig

func NewConfig(configFile string) (*Config, error)

NewConfig creates a new Config with default settings and applies any settings from the given configuration file.

func NewDefaultConfig

func NewDefaultConfig() *Config

NewDefaultConfig creates a new Config with default settings.

func (Config) GetConnectionAddress

func (c Config) GetConnectionAddress() HostPort

GetConnectionAddress returns the host if specified and listen otherwise.
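
The fallback described above can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: HostPort and Config are trimmed local mirrors of the documented types, connectionAddress is a hypothetical free function, and treating an empty Host string as "not specified" is an assumption.

```go
package main

import "fmt"

// HostPort and Config are trimmed local mirrors of the documented types.
type HostPort struct {
	Host string
	Port int
}

type Config struct {
	Listen HostPort
	Host   string
	Port   int
}

// connectionAddress sketches the documented fallback: prefer the explicit
// Host/Port pair, otherwise fall back to the listen address. Treating an
// empty Host as "not specified" is an assumption of this sketch.
func connectionAddress(c Config) HostPort {
	if c.Host != "" {
		return HostPort{Host: c.Host, Port: c.Port}
	}
	return c.Listen
}

func main() {
	onlyListen := Config{Listen: HostPort{Host: "0.0.0.0", Port: 9292}}
	withHost := Config{
		Listen: HostPort{Host: "0.0.0.0", Port: 9292},
		Host:   "broker-1.internal", // hypothetical advertised host
		Port:   9292,
	}

	fmt.Printf("%+v\n", connectionAddress(onlyListen))
	fmt.Printf("%+v\n", connectionAddress(withHost))
}
```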

func (Config) GetListenAddress

func (c Config) GetListenAddress() HostPort

GetListenAddress returns the address and port to listen on.

type HostPort

type HostPort struct {
    Host string
    Port int
}

HostPort is a simple struct to hold parsed listen/addr strings.
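
To illustrate how a "host:port" string might end up in such a struct, here is a minimal sketch using only the standard library. HostPort is a local mirror of the documented type and parseHostPort is a hypothetical helper; the package's own parsing code may differ.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// HostPort is a local mirror of the documented struct.
type HostPort struct {
	Host string
	Port int
}

// parseHostPort is a hypothetical helper, not part of the server package.
// It splits "host:port" with net.SplitHostPort and converts the port to int.
func parseHostPort(addr string) (HostPort, error) {
	host, portStr, err := net.SplitHostPort(addr)
	if err != nil {
		return HostPort{}, err
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		return HostPort{}, fmt.Errorf("invalid port %q: %w", portStr, err)
	}
	return HostPort{Host: host, Port: port}, nil
}

func main() {
	hp, err := parseHostPort("localhost:9292")
	fmt.Println(hp.Host, hp.Port, err)

	// An address without a port is rejected by net.SplitHostPort.
	_, err = parseHostPort("localhost")
	fmt.Println(err != nil)
}
```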

type Server

type Server struct {
    // contains filtered or unexported fields
}

Server is the main Liftbridge object. Create it by calling New or RunServerWithConfig.

func New

func New(config *Config) *Server

New creates a new Server with the given configuration. Call Start to run the Server.

func RunServerWithConfig

func RunServerWithConfig(config *Config) (*Server, error)

RunServerWithConfig creates and starts a new Server with the given configuration. It returns an error if the Server failed to start.

func (*Server) Apply

func (s *Server) Apply(l *raft.Log) interface{}

Apply applies a Raft log entry to the controller FSM. This is invoked by Raft once a log entry is committed. It returns a value which will be made available on the ApplyFuture returned by Raft.Apply if that method was called on the same Raft node as the FSM.

Note that, on restart, this can be called for entries that have already been committed to Raft as part of the recovery process. As such, this should be an idempotent call.
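
One common way to make replayed entries harmless is to remember the index of the last applied entry and skip anything at or below it. The sketch below shows that idea only; it is not how the source says Liftbridge achieves idempotence. logEntry is a local stand-in rather than raft.Log, and fsm is a hypothetical type.

```go
package main

import "fmt"

// logEntry is a local stand-in for a committed log entry; it is not the
// raft.Log type.
type logEntry struct {
	Index uint64
	Name  string // name of the stream the entry creates
}

// fsm is a hypothetical state machine used only for this sketch.
type fsm struct {
	lastApplied uint64
	created     map[string]int // stream name -> times the create took effect
}

// apply skips entries it has already seen, so replaying the log during
// recovery leaves the state unchanged. It reports whether the entry took
// effect.
func (f *fsm) apply(e logEntry) bool {
	if e.Index <= f.lastApplied {
		return false // already applied; replay is a no-op
	}
	f.created[e.Name]++
	f.lastApplied = e.Index
	return true
}

func main() {
	f := &fsm{created: map[string]int{}}
	log := []logEntry{{Index: 1, Name: "orders"}, {Index: 2, Name: "audit"}}

	// Apply the log, then replay it as a restart would.
	for _, e := range log {
		fmt.Println(e.Index, f.apply(e))
	}
	for _, e := range log {
		fmt.Println(e.Index, f.apply(e))
	}
	fmt.Println(f.created["orders"], f.created["audit"], f.lastApplied)
}
```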

func (*Server) IsLeader

func (s *Server) IsLeader() bool

IsLeader indicates whether the server is currently the metadata leader. If consistency is required for an operation, it should be threaded through the Raft cluster, since that is the single source of truth. If a server believes it is the leader when it is not, the operation it proposes to the Raft cluster will fail.

func (*Server) IsRunning

func (s *Server) IsRunning() bool

IsRunning indicates if the server is currently running or has been stopped.

func (*Server) Restore

func (s *Server) Restore(snapshot io.ReadCloser) error

Restore is used to restore an FSM from a snapshot. It is not called concurrently with any other command. The FSM must discard all previous state.

func (*Server) Snapshot

func (s *Server) Snapshot() (raft.FSMSnapshot, error)

Snapshot is used to support log compaction. This call should return an FSMSnapshot which can be used to save a point-in-time snapshot of the FSM. Apply and Snapshot are never called concurrently with each other, but Apply can be called concurrently with Persist, the method on the returned FSMSnapshot. This means the FSM should be implemented in a fashion that allows for concurrent updates while a snapshot is being persisted.

func (*Server) Start

func (s *Server) Start() (err error)

Start starts the Server. It does not block, and it returns an error if the Server cannot start properly.

func (*Server) Stop

func (s *Server) Stop() error

Stop will attempt to gracefully shut the Server down by signaling the stop and waiting for all goroutines to return.
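
The non-blocking start and the signal-then-wait shutdown described above follow a familiar Go pattern, sketched below with a quit channel and a sync.WaitGroup. The worker type is hypothetical and stands in for the Server only to show the lifecycle; it makes no claim about the Server's internals.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// worker is a hypothetical stand-in used to show the lifecycle pattern.
type worker struct {
	mu      sync.Mutex
	running bool
	quit    chan struct{}
	wg      sync.WaitGroup
}

// Start launches the background goroutine and returns immediately.
func (w *worker) Start() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.running {
		return errors.New("already running")
	}
	w.running = true
	w.quit = make(chan struct{})
	w.wg.Add(1)
	go w.loop(w.quit)
	return nil
}

// loop does periodic work until the quit channel is closed.
func (w *worker) loop(quit <-chan struct{}) {
	defer w.wg.Done()
	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-quit:
			return
		case <-ticker.C:
			// periodic work would go here
		}
	}
}

// Stop signals the goroutine and waits for it to return. Calling Stop on a
// worker that is not running is a no-op.
func (w *worker) Stop() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.running {
		return nil
	}
	w.running = false
	close(w.quit)
	w.wg.Wait()
	return nil
}

// IsRunning reports whether Start has been called without a matching Stop.
func (w *worker) IsRunning() bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.running
}

func main() {
	w := &worker{}
	fmt.Println(w.Start(), w.IsRunning())
	fmt.Println(w.Stop(), w.IsRunning())
}
```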

type StreamsConfig

type StreamsConfig struct {
    RetentionMaxBytes             int64
    RetentionMaxMessages          int64
    RetentionMaxAge               time.Duration
    CleanerInterval               time.Duration
    SegmentMaxBytes               int64
    SegmentMaxAge                 time.Duration
    Compact                       bool
    CompactMaxGoroutines          int
    AutoPauseTime                 time.Duration
    AutoPauseDisableIfSubscribers bool
}

StreamsConfig contains settings for controlling the message log for streams.

func (*StreamsConfig) ApplyOverrides

func (l *StreamsConfig) ApplyOverrides(c *proto.StreamConfig)

ApplyOverrides applies the values from the StreamConfig protobuf to the StreamsConfig struct. If the value is present in the request's config section, it will be set in StreamsConfig.

func (StreamsConfig) AutoPauseString

func (l StreamsConfig) AutoPauseString() string

AutoPauseString returns a human-readable string representation of the auto pause setting.

func (StreamsConfig) RetentionString

func (l StreamsConfig) RetentionString() string

RetentionString returns a human-readable string representation of the retention policy.

Directories

Path         Synopsis
commitlog    Package commitlog provides an implementation for a file-backed write-ahead log.
health
logger
protocol

Package server imports 42 packages and is imported by 2 packages. Updated 2020-09-23.