server

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 27, 2017 License: MIT Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
// Routes for the monitoring pages. Every route is derived from RootPath,
// so the four endpoint paths always share the same prefix.
const (
	// RootPath is the common prefix of the monitoring routes below.
	RootPath     = "/streaming"
	// ServerPath evaluates to "/streaming/serverz".
	ServerPath   = RootPath + "/serverz"
	// StorePath evaluates to "/streaming/storez".
	StorePath    = RootPath + "/storez"
	// ClientsPath evaluates to "/streaming/clientsz".
	ClientsPath  = RootPath + "/clientsz"
	// ChannelsPath evaluates to "/streaming/channelsz".
	ChannelsPath = RootPath + "/channelsz"
)

Routes for the monitoring pages

View Source
// Server defaults.
const (
	// VERSION is the current version for the NATS Streaming server.
	VERSION = "0.5.0"

	// Default cluster ID and the default `_STAN.`-prefixed values for
	// discover, pub, sub, subclose, unsub and close.
	DefaultClusterID      = "test-cluster"
	DefaultDiscoverPrefix = "_STAN.discover"
	DefaultPubPrefix      = "_STAN.pub"
	DefaultSubPrefix      = "_STAN.sub"
	DefaultSubClosePrefix = "_STAN.subclose"
	DefaultUnSubPrefix    = "_STAN.unsub"
	DefaultClosePrefix    = "_STAN.close"
	// DefaultStoreType is taken from the stores package (stores.TypeMemory).
	DefaultStoreType      = stores.TypeMemory

	// DefaultHeartBeatInterval is the interval at which server sends heartbeat to a client
	DefaultHeartBeatInterval = 30 * time.Second
	// DefaultClientHBTimeout is how long server waits for a heartbeat response
	DefaultClientHBTimeout = 10 * time.Second
	// DefaultMaxFailedHeartBeats is the number of failed heartbeats before server closes
	// the client connection (total = (heartbeat interval + heartbeat timeout) * (fail count + 1)).
	// With the 30s DefaultHeartBeatInterval above, the expression evaluates to 10.
	DefaultMaxFailedHeartBeats = int((5 * time.Minute) / DefaultHeartBeatInterval)

	// DefaultIOBatchSize is the maximum number of messages to accumulate before flushing a store.
	DefaultIOBatchSize = 1024

	// DefaultIOSleepTime is the duration (in micro-seconds) the server waits for more messages
	// before starting processing. Set to 0 (or negative) to disable the wait.
	// The default value of 0 therefore means the wait is disabled.
	DefaultIOSleepTime = int64(0)
)

Server defaults.

View Source
// LogPrefix is prefixed to all NATS Streaming log messages.
// The value ends with a space, so a message can be appended directly after it.
const LogPrefix = "STREAM: "

LogPrefix is prefixed to all NATS Streaming log messages

Variables

View Source
// Errors exported by the package. Each one is created with errors.New and
// every message starts with the "stan: " prefix.
var (
	ErrInvalidSubject     = errors.New("stan: invalid subject")
	ErrInvalidStart       = errors.New("stan: invalid start position")
	ErrInvalidSub         = errors.New("stan: invalid subscription")
	// NOTE(review): despite its name, ErrInvalidClient's message reports a
	// clientID that is already registered; an invalid clientID format is
	// reported by ErrInvalidClientID below.
	ErrInvalidClient      = errors.New("stan: clientID already registered")
	ErrMissingClient      = errors.New("stan: clientID missing")
	ErrInvalidClientID    = errors.New("stan: invalid clientID: only alphanumeric and `-` or `_` characters allowed")
	ErrInvalidAckWait     = errors.New("stan: invalid ack wait time, should be >= 1s")
	ErrInvalidMaxInflight = errors.New("stan: invalid MaxInflight, should be >= 1")
	ErrInvalidConnReq     = errors.New("stan: invalid connection request")
	ErrInvalidPubReq      = errors.New("stan: invalid publish request")
	ErrInvalidSubReq      = errors.New("stan: invalid subscription request")
	ErrInvalidUnsubReq    = errors.New("stan: invalid unsubscribe request")
	ErrInvalidCloseReq    = errors.New("stan: invalid close request")
	ErrDupDurable         = errors.New("stan: duplicate durable registration")
	ErrInvalidDurName     = errors.New("stan: durable name of a durable queue subscriber can't contain the character ':'")
	ErrUnknownClient      = errors.New("stan: unknown clientID")
	ErrNoChannel          = errors.New("stan: no configured channel")
)

Errors.

View Source
// DefaultNatsServerOptions are default options for the NATS server:
// it is addressed at localhost:4222 with the NoLog and NoSigs flags set.
// NOTE(review): NoLog/NoSigs presumably turn off the NATS server's own logging
// and signal handling — confirm against the NATS server Options documentation.
// This is a package-level variable, so it is mutable and shared by all users
// of the package.
var DefaultNatsServerOptions = server.Options{
	Host:   "localhost",
	Port:   4222,
	NoLog:  true,
	NoSigs: true,
}

DefaultNatsServerOptions are default options for the NATS server

Functions

func ConfigureLogger

func ConfigureLogger(stanOpts *Options, natsOpts *natsd.Options)

ConfigureLogger configures logging for STAN and the embedded NATS server based on options passed.

func Debugf

func Debugf(format string, v ...interface{})

Debugf logs a debug statement

func Errorf

func Errorf(format string, v ...interface{})

Errorf logs an error

func Fatalf

func Fatalf(format string, v ...interface{})

Fatalf logs a fatal error

func Noticef

func Noticef(format string, v ...interface{})

Noticef logs a notice statement

func ProcessConfigFile added in v0.3.0

func ProcessConfigFile(configFile string, opts *Options) error

ProcessConfigFile parses the configuration file `configFile` and updates the given Streaming options `opts`.

func RemoveLogger

func RemoveLogger()

RemoveLogger clears the logger instance and debug/trace flags. Used for testing.

func Tracef

func Tracef(format string, v ...interface{})

Tracef logs a trace statement

Types

type Channelsz added in v0.5.0

// Channelsz lists the names of all NATS Streaming channels.
// NOTE(review): Offset, Limit, Count and Total look like pagination
// bookkeeping for the listing — confirm against the code that fills them in.
type Channelsz struct {
	ClusterID string      `json:"cluster_id"`
	ServerID  string      `json:"server_id"`
	Now       time.Time   `json:"now"`
	Offset    int         `json:"offset"`
	Limit     int         `json:"limit"`
	Count     int         `json:"count"`
	Total     int         `json:"total"`
	// Names and Channels carry omitempty, so whichever of the two is empty is
	// left out of the JSON output.
	Names     []string    `json:"names,omitempty"`
	Channels  []*Channelz `json:"channels,omitempty"`
}

Channelsz lists the names of all NATS Streaming channels

type Channelz added in v0.5.0

// Channelz describes a NATS Streaming Channel.
type Channelz struct {
	Name          string           `json:"name"`
	// Msgs is a plain int, whereas Bytes and the sequence numbers are uint64.
	Msgs          int              `json:"msgs"`
	Bytes         uint64           `json:"bytes"`
	FirstSeq      uint64           `json:"first_seq"`
	LastSeq       uint64           `json:"last_seq"`
	// Subscriptions is left out of the JSON output when empty (omitempty).
	Subscriptions []*Subscriptionz `json:"subscriptions,omitempty"`
}

Channelz describes a NATS Streaming Channel

type Clientsz added in v0.5.0

// Clientsz lists the client connections.
// NOTE(review): Offset, Limit, Count and Total look like pagination
// bookkeeping for the listing — confirm against the code that fills them in.
type Clientsz struct {
	ClusterID string     `json:"cluster_id"`
	ServerID  string     `json:"server_id"`
	Now       time.Time  `json:"now"`
	Offset    int        `json:"offset"`
	Limit     int        `json:"limit"`
	Count     int        `json:"count"`
	Total     int        `json:"total"`
	// Clients has no omitempty: the "clients" key is always emitted, and a nil
	// slice is encoded as JSON null.
	Clients   []*Clientz `json:"clients"`
}

Clientsz lists the client connections

type Clientz added in v0.5.0

// Clientz describes a NATS Streaming Client connection.
type Clientz struct {
	ID            string                      `json:"id"`
	HBInbox       string                      `json:"hb_inbox"`
	// Subscriptions is left out of the JSON output when empty (omitempty).
	// NOTE(review): the map key is presumably the channel name — confirm
	// against the code that populates it.
	Subscriptions map[string][]*Subscriptionz `json:"subscriptions,omitempty"`
}

Clientz describes a NATS Streaming Client connection

type Options

// Options for STAN Server.
// stores.StoreLimits is embedded, so its fields are promoted onto Options.
// NOTE(review): ID is presumably the cluster ID (cf. DefaultClusterID) — confirm.
// NOTE(review): FilestoreDir and FileStoreOpts spell "FileStore" with different
// casing; the names are part of the exported API and cannot be changed here.
type Options struct {
	ID                 string
	DiscoverPrefix     string
	StoreType          string
	FilestoreDir       string
	FileStoreOpts      stores.FileStoreOptions
	stores.StoreLimits               // Store limits (MaxChannels, etc..)
	Trace              bool          // Verbose trace
	Debug              bool          // Debug trace
	HandleSignals      bool          // Should the server set up a signal handler (for Ctrl+C, etc...)
	Secure             bool          // Create a TLS enabled connection w/o server verification
	ClientCert         string        // Client Certificate for TLS
	ClientKey          string        // Client Key for TLS
	ClientCA           string        // Client CAs for TLS
	IOBatchSize        int           // Maximum number of messages collected from clients before starting their processing.
	IOSleepTime        int64         // Duration (in micro-seconds) the server waits for more messages to fill up a batch.
	NATSServerURL      string        // URL for external NATS Server to connect to. If empty, NATS Server is embedded.
	ClientHBInterval   time.Duration // Interval at which server sends heartbeat to a client.
	ClientHBTimeout    time.Duration // How long server waits for a heartbeat response.
	ClientHBFailCount  int           // Number of failed heartbeats before server closes client connection.
	AckSubsPoolSize    int           // Number of internal subscriptions handling incoming ACKs (0 means one per client's subscription).
	FTGroupName        string        // Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore.
	Partitioning       bool          // Specify if server only accepts messages/subscriptions on channels defined in StoreLimits.
}

Options for STAN Server

func GetDefaultOptions

func GetDefaultOptions() (o *Options)

GetDefaultOptions returns default options for the STAN server

type Serverz added in v0.5.0

// Serverz describes the NATS Streaming Server.
// Every field is always present in the JSON output (no omitempty tags).
type Serverz struct {
	ClusterID     string    `json:"cluster_id"`
	ServerID      string    `json:"server_id"`
	Version       string    `json:"version"`
	// GoVersion is serialized under the short key "go".
	GoVersion     string    `json:"go"`
	State         string    `json:"state"`
	Now           time.Time `json:"now"`
	// Start is serialized as "start_time"; Uptime is a string, not a duration.
	Start         time.Time `json:"start_time"`
	Uptime        string    `json:"uptime"`
	Clients       int       `json:"clients"`
	Subscriptions int       `json:"subscriptions"`
	Channels      int       `json:"channels"`
	TotalMsgs     int       `json:"total_msgs"`
	TotalBytes    uint64    `json:"total_bytes"`
}

Serverz describes the NATS Streaming Server

type StanServer

// StanServer structure represents the STAN server.
// All of its fields are unexported; it is obtained from RunServer or
// RunServerWithOpts and used through its methods.
type StanServer struct {
	// contains filtered or unexported fields
}

StanServer structure represents the STAN server

func RunServer

func RunServer(ID string) (*StanServer, error)

RunServer will start up an embedded STAN server and a nats-server to support it.

func RunServerWithOpts

func RunServerWithOpts(stanOpts *Options, natsOpts *server.Options) (newServer *StanServer, returnedError error)

RunServerWithOpts will start up an embedded STAN server and a nats-server to support it, using the given STAN and NATS server options.

func (*StanServer) ClusterID

func (s *StanServer) ClusterID() string

ClusterID returns the STAN Server's cluster ID.

func (*StanServer) LastError added in v0.4.0

func (s *StanServer) LastError() error

LastError returns the last fatal error the server experienced.

func (*StanServer) Shutdown

func (s *StanServer) Shutdown()

Shutdown will close our NATS connection and shut down any embedded NATS server.

func (*StanServer) State added in v0.4.0

func (s *StanServer) State() State

State returns the state of this server.

type State added in v0.4.0

// State represents the possible server states.
// It is stored as an int8.
type State int8

State represents the possible server states

// Possible server states. Values are assigned by iota in declaration order,
// from 0 (Standalone) to 4 (Shutdown), so reordering the list changes the
// numeric value of every state after the moved entry.
// NOTE(review): FTActive and FTStandby presumably refer to the FT group
// configured via Options.FTGroupName — confirm.
const (
	Standalone State = iota
	FTActive
	FTStandby
	Failed
	Shutdown
)

Possible server states

func (State) String added in v0.4.0

func (state State) String() string

type Storez added in v0.5.0

// Storez describes the NATS Streaming Store.
// Every field is always present in the JSON output (no omitempty tags).
type Storez struct {
	ClusterID  string             `json:"cluster_id"`
	ServerID   string             `json:"server_id"`
	Now        time.Time          `json:"now"`
	Type       string             `json:"type"`
	// Limits embeds the whole stores.StoreLimits value under the "limits" key.
	Limits     stores.StoreLimits `json:"limits"`
	TotalMsgs  int                `json:"total_msgs"`
	TotalBytes uint64             `json:"total_bytes"`
}

Storez describes the NATS Streaming Store

type Subscriptionz added in v0.5.0

// Subscriptionz describes a NATS Streaming Subscription.
type Subscriptionz struct {
	Inbox        string `json:"inbox"`
	AckInbox     string `json:"ack_inbox"`
	// DurableName and QueueName are left out of the JSON output when empty
	// (omitempty); IsDurable is always emitted.
	DurableName  string `json:"durable_name,omitempty"`
	QueueName    string `json:"queue_name,omitempty"`
	IsDurable    bool   `json:"is_durable"`
	MaxInflight  int    `json:"max_inflight"`
	// NOTE(review): AckWait is a plain int, so its unit is not visible here —
	// confirm against the code that fills it in.
	AckWait      int    `json:"ack_wait"`
	LastSent     uint64 `json:"last_sent"`
	PendingCount int    `json:"pending_count"`
	IsStalled    bool   `json:"is_stalled"`
}

Subscriptionz describes a NATS Streaming Subscription

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL