gonatsd

package
v0.0.0-...-8b836c9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2013 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
// Default size limits.
// NOTE(review): presumably byte counts that back the LimitsConfig
// ControlLine / Payload / Pending settings when they are left unset —
// confirm against the config parser.
const (
	// 1024.
	DEFAULT_MAX_CONTROL = 1024
	// 1024 * 1024 = 1,048,576.
	DEFAULT_MAX_PAYLOAD = 1024 * 1024
	// 10 * 1024 * 1024 = 10,485,760.
	DEFAULT_MAX_PENDING = 10 * 1024 * 1024
)
View Source
// Protocol operation names. Each name declared here also appears in the
// REQUESTS slice.
// NOTE(review): presumably matched against the leading token of a client
// control line — confirm in the request parser.
const (
	INFO    = "INFO"
	PUB     = "PUB"
	SUB     = "SUB"
	UNSUB   = "UNSUB"
	PING    = "PING"
	PONG    = "PONG"
	CONNECT = "CONNECT"
)
View Source
// Fixed buffering limits.
// NOTE(review): the names suggest a per-connection channel backlog (16) and
// an outbound queue bound (32); whether the units are items or bytes is not
// visible here — confirm at the use sites.
const (
	MAX_CONN_CHAN_BACKLOG   = 16
	MAX_OUTBOUND_QUEUE_SIZE = 32
)
View Source
// VERSION is the version string "0.0.1"; DEFAULT_SERVER_BACKLOG is 1024.
// NOTE(review): VERSION is presumably what Info.Version reports, and
// DEFAULT_SERVER_BACKLOG presumably sizes the server command channel —
// confirm both at the use sites.
const (
	VERSION                = "0.0.1"
	DEFAULT_SERVER_BACKLOG = 1024
)
View Source
// DEFAULT_RATE_UPDATE_INTERVAL is a five-second time.Duration.
// NOTE(review): presumably the value passed to NewRegistry as
// rateUpdateInterval by default — confirm at the call site.
const (
	DEFAULT_RATE_UPDATE_INTERVAL = 5 * time.Second
)

Variables

View Source
// Sentinel errors for queue operations.
// NOTE(review): presumably returned by BoundedQueue.Enqueue / Dequeue when
// the queue is full or has been closed — confirm in the queue implementation.
var (
	ErrQueueFull   = errors.New("Full Queue")
	ErrQueueClosed = errors.New("Closed Queue")
)
View Source
var (
	// BUF_IO_SIZE is 64 * 1024 (65,536).
	// not a const so we can change it for testing
	BUF_IO_SIZE = 64 * 1024

	// REQUESTS lists the protocol operation names INFO, PUB, SUB, UNSUB,
	// PING, PONG and CONNECT, in that order.
	REQUESTS = []string{INFO, PUB, SUB, UNSUB, PING, PONG, CONNECT}
)
View Source
// Predefined protocol errors. Each literal fills NATSError positionally: the
// first value is Message and the second is Close.
// NOTE(review): Close presumably tells the connection to shut down after the
// error has been sent — confirm where NATSError.Close is read.
var (
	ErrPayloadTooBig     = &NATSError{"-ERR 'Payload size exceeded'", true}
	ErrProtocolOpTooBig  = &NATSError{"-ERR 'Protocol Operation size exceeded'", true}
	ErrInvalidSubject    = &NATSError{"-ERR 'Invalid Subject'", false}
	ErrInvalidSidTaken   = &NATSError{"-ERR 'Invalid Subject Identifier (sid), already taken'", false}
	ErrInvalidSidNoexist = &NATSError{"-ERR 'Invalid Subject-Identifier (sid), no subscriber registered'", false}
	ErrInvalidConfig     = &NATSError{"-ERR 'Invalid config, valid JSON required for connection configuration'", false}
	ErrAuthRequired      = &NATSError{"-ERR 'Authorization is required'", true}
	ErrAuthFailed        = &NATSError{"-ERR 'Authorization failed'", true}
	ErrUnknownOp         = &NATSError{"-ERR 'Unknown Protocol Operation'", false}
	ErrSlowConsumer      = &NATSError{"-ERR 'Slow consumer detected, connection dropped'", true}
	ErrUnresponsive      = &NATSError{"-ERR 'Unresponsive client detected, connection dropped'", true}
	ErrMaxConnsExceeded  = &NATSError{"-ERR 'Maximum client connections exceeded, connection dropped'", true}
)
View Source
// Package-level instances of the field-less request structs PingRequest,
// PongRequest and InfoRequest.
// NOTE(review): presumably handed out by the matching Parse*Request
// functions instead of allocating a new value per request — confirm.
var (
	PING_REQUEST = &PingRequest{}
	PONG_REQUEST = &PongRequest{}
	INFO_REQUEST = &InfoRequest{}
)
View Source
// BasicMatcher does an exact lookup of token among node's children. When a
// child is registered under token it is returned as a one-element slice;
// otherwise emptyNodeSlice is returned. The second result is always nil.
var BasicMatcher = func(node *trieNode, token string) ([]*trieNode, []*trieNode) {
	child := node.Children[token]
	if child == nil {
		return emptyNodeSlice, nil
	}
	return []*trieNode{child}, nil
}
View Source
// CLOSE_CMD is a package-level instance of the field-less CloseCmd struct.
var (
	CLOSE_CMD = &CloseCmd{}
)
View Source
// INFO_PRELUDE is the INFO operation name followed by a single space.
// NOTE(review): presumably prepended to the serialized server info when it is
// sent to a client — confirm at the use site.
var INFO_PRELUDE = "INFO "
View Source
// NewHeartbeatHelper is a variable holding the unexported constructor
// newHeartbeatHelper.
// NOTE(review): presumably a variable (not a func) so tests can substitute
// their own constructor — confirm.
var NewHeartbeatHelper = newHeartbeatHelper
View Source
// OK is the string "OK".
// NOTE(review): presumably used when building acknowledgement responses —
// confirm at the use sites.
var OK = "OK"

Map of all request parsers.

View Source
// WildcardMatcher looks up token among node's children with wildcard support.
//
// The first result collects, in order, the child registered under token and
// the child registered under "*", for each that exists. The second result is
// a one-element slice holding the child registered under ">" when there is
// one, and nil otherwise.
var WildcardMatcher = func(node *trieNode, token string) ([]*trieNode, []*trieNode) {
	found := make([]*trieNode, 0, 3)

	// Exact token first, then the single-token wildcard.
	for _, key := range [...]string{token, "*"} {
		if child := node.Children[key]; child != nil {
			found = append(found, child)
		}
	}

	if tail := node.Children[">"]; tail != nil {
		return found, []*trieNode{tail}
	}
	return found, nil
}

Functions

func IsAllSpace

func IsAllSpace(s string) bool

func ReplaceLogger

func ReplaceLogger(logger Logger)

Types

type AuthConfig

// AuthConfig is the "auth" section of the server configuration.
//
// Users and Timeout are read from the "users" and "timeout" YAML keys.
// TimeoutDuration carries no YAML tag.
// NOTE(review): TimeoutDuration is presumably derived from the Timeout string
// by ParseConfig, and Users presumably maps user name to password — confirm.
type AuthConfig struct {
	Users           map[string]string `yaml:"users"`
	Timeout         string            `yaml:"timeout"`
	TimeoutDuration time.Duration
}

type AuthHelper

type AuthHelper struct {
	// contains filtered or unexported fields
}

func NewAuthHelper

func NewAuthHelper(conn Conn, users map[string]string, timeout time.Duration) *AuthHelper

Create a new AuthHelper for the connection with the specified user map and auth timeout.

func (*AuthHelper) Auth

func (h *AuthHelper) Auth(req Request) (bool, *NATSError)

Authenticate and authorize the request. Returns true iff the request is authed, otherwise false with an error.

func (*AuthHelper) Stop

func (h *AuthHelper) Stop()

Stop the AuthHelper. Must be called to clean up the internal timers.

func (*AuthHelper) Timeout

func (h *AuthHelper) Timeout()

Initiates auth timeout when the Timer() fires.

func (*AuthHelper) Timer

func (h *AuthHelper) Timer() <-chan time.Time

Returns the internal timer channel; when it fires, the caller should call Timeout(). The channel is exposed so that it can be handled from a single select statement rather than requiring an additional goroutine.

type BadRequest

type BadRequest struct {
	Error *NATSError
}

A BadRequest represents a "Request" that had a problem for which an error will be returned to the client.

func (*BadRequest) Dispatch

func (r *BadRequest) Dispatch(c Conn)

func (*BadRequest) Serve

func (r *BadRequest) Serve(c Conn) *Response

type BasicAuthHandler

type BasicAuthHandler struct {
	// contains filtered or unexported fields
}

func NewBasicAuthHandler

func NewBasicAuthHandler(users map[string]string, handlerFunc http.HandlerFunc) *BasicAuthHandler

func (*BasicAuthHandler) ServeHTTP

func (h *BasicAuthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)

type BoundedQueue

type BoundedQueue struct {
	// contains filtered or unexported fields
}

func NewBoundedQueue

func NewBoundedQueue(maxSize int32) *BoundedQueue

Create a new BoundedQueue with the specified max size.

func (*BoundedQueue) Close

func (q *BoundedQueue) Close()

Close the queue. Must be called to clean up the internal goroutine.

func (*BoundedQueue) Dequeue

func (q *BoundedQueue) Dequeue() (HasSize, error)

Dequeue element. Will block until there is something to dequeue.

func (*BoundedQueue) Enqueue

func (q *BoundedQueue) Enqueue(o HasSize) error

Enqueue element.

func (*BoundedQueue) HasMore

func (q *BoundedQueue) HasMore() bool

Returns true if the queue has more elements to dequeue without blocking.

type ClientCmd

type ClientCmd interface {
	// Process the command in context of the client dispatch loop.
	Process(Conn)
}

Simple interface for commands that need to be handled by the client dispatch loop.

type CloseCmd

type CloseCmd struct {
}

Close the connection.

func (*CloseCmd) Process

func (c *CloseCmd) Process(conn Conn)

type Config

// Config is the top-level server configuration, populated from YAML.
//
// Note that two YAML keys differ from their field names: Profile is read from
// "pprof" and Log is read from "logging".
type Config struct {
	BindAddress string        `yaml:"bind_address"`
	Ping        PingConfig    `yaml:"ping"`
	Profile     ProfileConfig `yaml:"pprof"`
	Varz        VarzConfig    `yaml:"varz"`
	Auth        AuthConfig    `yaml:"auth"`
	Log         LogConfig     `yaml:"logging"`
	Limits      LimitsConfig  `yaml:"limits"`
}

func ParseConfig

func ParseConfig(filename string) (*Config, error)

Parse the server configuration.

type Conn

type Conn interface {
	// Serve a subscribed message on the dispatch loop.
	ServeMessage(*SubscribedMessage)

	// Serve a client request on the dispatch loop.
	ServeRequest(Request)

	// Serve a client command on the dispatch loop.
	ServeCommand(ClientCmd)

	// Read from the client.
	Read([]byte) (int, error)

	// Read a single NATS control line from the client.
	ReadControlLine() (string, error)

	// Write a response to the client.
	Write(*Response)

	// Send a command to the server.
	SendServerCmd(ServerCmd)

	// Start the connection handling.
	Start()

	// Close the connection.
	Close()

	// Returns true iff the connection is closing/closed.
	Closed() bool

	// Close the connection and send the error to the client.
	CloseWithError(*NATSError)

	// Return a map<SID, subscription>.
	Subscriptions() map[int]*Subscription

	// Server reference.
	Server() Server

	// Return the connection heartbeat helper.
	HeartbeatHelper() HeartbeatHelper

	// Returns the connection options.
	Options() *ConnOptions

	// Returns the client remote address.
	RemoteAddr() net.Addr
}

Client connection.

func NewConn

func NewConn(server Server, tc TCPConn) Conn

Creates a new connection given a server and an underlying TCP connection.

type ConnOptions

type ConnOptions struct {
	Verbose  bool
	Pedantic bool
}

type ConnectRequest

// ConnectRequest holds the JSON options of a CONNECT operation. Password is
// read from the JSON key "pass"; the other keys match the lower-cased field
// names.
// NOTE(review): the fields are pointers, presumably so an omitted key (nil)
// can be told apart from an explicit zero value — confirm in Serve.
type ConnectRequest struct {
	Verbose  *bool   `json:"verbose"`
	Pedantic *bool   `json:"pedantic"`
	User     *string `json:"user"`
	Password *string `json:"pass"`
}

A ConnectRequest represents a Request sent to authenticate (if needed) and negotiate any connection options.

func (*ConnectRequest) Dispatch

func (r *ConnectRequest) Dispatch(c Conn)

func (*ConnectRequest) Serve

func (r *ConnectRequest) Serve(c Conn) *Response

type Counter

type Counter struct {
	// contains filtered or unexported fields
}

func (*Counter) String

func (c *Counter) String() string

type ErrorCmd

type ErrorCmd struct {
	Error error
}

Close the connection due to a low level error.

func (*ErrorCmd) Process

func (c *ErrorCmd) Process(conn Conn)

type Gauge

type Gauge struct {
	// contains filtered or unexported fields
}

func (*Gauge) String

func (g *Gauge) String() string

type HasSize

type HasSize interface {
	Size() int32
}

type HeartbeatHelper

type HeartbeatHelper interface {
	Ticker() <-chan time.Time
	Ping()
	Pong()
	Stop()
}

type Info

// Info describes the server and is serialized to JSON using the snake_case
// keys given in the field tags.
// NOTE(review): presumably the payload behind Server.Info() and the INFO
// response — confirm.
type Info struct {
	ServerId     string `json:"server_id"`
	Host         string `json:"host"`
	Port         int    `json:"port"`
	Version      string `json:"version"`
	AuthRequired bool   `json:"auth_required"`
	SslRequired  bool   `json:"ssl_required"`
	MaxPayload   int    `json:"max_payload"`
}

type InfoRequest

type InfoRequest struct {
}

An InfoRequest represents a Request sent to return the server configuration.

func (*InfoRequest) Dispatch

func (r *InfoRequest) Dispatch(c Conn)

func (*InfoRequest) Serve

func (r *InfoRequest) Serve(c Conn) *Response

type LimitsConfig

// LimitsConfig is the "limits" section of the server configuration. Note that
// ControlLine is read from the YAML key "control".
// NOTE(review): Payload, Pending and ControlLine presumably fall back to
// DEFAULT_MAX_PAYLOAD, DEFAULT_MAX_PENDING and DEFAULT_MAX_CONTROL when unset;
// units are presumably bytes — confirm in ParseConfig.
type LimitsConfig struct {
	Payload     int `yaml:"payload"`
	Pending     int `yaml:"pending"`
	ControlLine int `yaml:"control"`
	Connections int `yaml:"connections"`
}

type LogConfig

type LogConfig struct {
	MinLevel string `yaml:"level"`
	Out      string `yaml:"file"`
}

type Logger

// Logger embeds steno.Logger and adds no methods of its own.
type Logger interface {
	steno.Logger
}

// NOTE(review): Log is presumably the value swapped by ReplaceLogger —
// confirm.
var (
	Log Logger // Global singleton for current logger
)

func NewLogger

func NewLogger(file *os.File, level string) (Logger, error)

type Matcher

type Matcher func(*trieNode, string) ([]*trieNode, []*trieNode)

type Message

type Message struct {
	Subject string
	ReplyTo string
	Content []byte
}

type NATSError

// NATSError is a protocol-level error.
//
// Message is the error text; the predefined errors in this package all use
// the form "-ERR '<reason>'". Close is a flag set on a subset of those
// predefined errors (e.g. the size, auth and "connection dropped" errors).
// NOTE(review): Close presumably means the connection is closed once the
// error has been reported — confirm where it is read.
type NATSError struct {
	Message string
	Close   bool
}

func (*NATSError) Error

func (e *NATSError) Error() string

type PingConfig

// PingConfig is the "ping" section of the server configuration.
//
// Interval and MaxOutstanding are read from the "interval" and
// "max_outstanding" YAML keys. IntervalDuration carries no YAML tag.
// NOTE(review): IntervalDuration is presumably derived from the Interval
// string by ParseConfig — confirm.
type PingConfig struct {
	Interval         string `yaml:"interval"`
	IntervalDuration time.Duration
	MaxOutstanding   int `yaml:"max_outstanding"`
}

type PingRequest

type PingRequest struct {
}

A PingRequest represents a Request sent to verify that the server is alive.

func (*PingRequest) Dispatch

func (r *PingRequest) Dispatch(c Conn)

func (*PingRequest) Serve

func (r *PingRequest) Serve(c Conn) *Response

type PongRequest

type PongRequest struct {
}

A PongRequest represents a Request sent as a response to the server's ping request. The terminology is reversed because the server is the one making the request.

func (*PongRequest) Dispatch

func (r *PongRequest) Dispatch(c Conn)

func (*PongRequest) Serve

func (r *PongRequest) Serve(c Conn) *Response

type ProfileConfig

type ProfileConfig struct {
	BindAddress string `yaml:"bind_address"`
}

type PublishCmd

type PublishCmd struct {
	Message *Message
}

func (*PublishCmd) Process

func (cmd *PublishCmd) Process(s Server)

type PublishRequest

type PublishRequest struct {
	Message *Message
}

A PublishRequest represents a Request sent to publish a message.

func (*PublishRequest) Dispatch

func (r *PublishRequest) Dispatch(c Conn)

func (*PublishRequest) Serve

func (r *PublishRequest) Serve(c Conn) *Response

type Rate

type Rate struct {
	// contains filtered or unexported fields
}

func (*Rate) Snapshot

func (r *Rate) Snapshot(ts int64)

func (*Rate) String

func (r *Rate) String() string

type RateSample

type RateSample struct {
	// contains filtered or unexported fields
}

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

func NewRegistry

func NewRegistry(rateUpdateInterval time.Duration) *Registry

func (*Registry) Loop

func (r *Registry) Loop()

func (*Registry) Metrics

func (r *Registry) Metrics(fn func(map[string]fmt.Stringer))

func (*Registry) NewCounter

func (r *Registry) NewCounter(name string, addr *int64)

func (*Registry) NewGauge

func (r *Registry) NewGauge(name string, fn func() string)

func (*Registry) NewRate

func (r *Registry) NewRate(name string, addr *int64, interval time.Duration)

func (*Registry) NewRates

func (r *Registry) NewRates(name string, addr *int64, durations ...string)

func (*Registry) NewStringVal

func (r *Registry) NewStringVal(name string, val string)

func (*Registry) NewStringerGauge

func (r *Registry) NewStringerGauge(name string, stringer fmt.Stringer)

type Request

type Request interface {

	// Serves the request by returning an optional payload to the client.
	Serve(Conn) *Response

	// Dispatches the request to the proper processing channel(s).
	Dispatch(Conn)
}

A Request represents an incoming client request.

func ParseConnectRequest

func ParseConnectRequest(c Conn, args string) (Request, error)

func ParseInfoRequest

func ParseInfoRequest(c Conn, args string) (Request, error)

func ParsePingRequest

func ParsePingRequest(c Conn, args string) (Request, error)

func ParsePongRequest

func ParsePongRequest(c Conn, args string) (Request, error)

func ParsePublishRequest

func ParsePublishRequest(c Conn, args string) (Request, error)

func ParseSubscriptionRequest

func ParseSubscriptionRequest(c Conn, args string) (Request, error)

func ParseUnsubscriptionRequest

func ParseUnsubscriptionRequest(c Conn, args string) (Request, error)

type Response

type Response struct {
	Value *string
	Bytes *[]byte
}

NATS response payload. Rather than holding a single string or byte array, it lets us pass the original message through to the client without copying it.

func NewByteResponse

func NewByteResponse(bytes []byte) *Response

func NewResponse

func NewResponse(value string, bytes []byte) *Response

func NewStringResponse

func NewStringResponse(value string) *Response

func (*Response) Size

func (r *Response) Size() (result int32)

func (*Response) Write

func (r *Response) Write(writer io.Writer) (err error)

type Server

type Server interface {
	Start()
	DeliverMessage(subscription *Subscription, message *Message)
	Commands() chan<- ServerCmd
	Subscriptions() *Trie
	Info() *[]byte
	Stats() *Stats
	Config() *Config
}

func NewServer

func NewServer(config *Config) (Server, error)

type ServerCmd

type ServerCmd interface {
	Process(Server)
}

type Stats

type Stats struct {
	// contains filtered or unexported fields
}

func NewStats

func NewStats() *Stats

type StringVal

type StringVal struct {
	// contains filtered or unexported fields
}

func (*StringVal) String

func (v *StringVal) String() string

type SubscribeCmd

type SubscribeCmd struct {
	Subscription *Subscription
	Done         chan bool
}

func (*SubscribeCmd) Process

func (cmd *SubscribeCmd) Process(s Server)

type SubscribedMessage

type SubscribedMessage struct {
	Subscription *Subscription
	Message      *Message
	Last         bool
}

type Subscription

type Subscription struct {
	Id           int
	Subject      string
	Queue        *string
	Conn         Conn
	MaxResponses int
	Responses    uint64
}

type SubscriptionRequest

type SubscriptionRequest struct {
	Subscription *Subscription
	Done         chan bool
}

A SubscriptionRequest represents a Request sent to register a subscription.

func (*SubscriptionRequest) Dispatch

func (r *SubscriptionRequest) Dispatch(c Conn)

func (*SubscriptionRequest) Serve

func (r *SubscriptionRequest) Serve(c Conn) *Response

type TCPConn

// TCPConn is net.Conn extended with CloseRead. The standard library's
// *net.TCPConn has a CloseRead method with this signature.
type TCPConn interface {
	net.Conn
	CloseRead() error
}

TCP connection interface for testing.

type Trie

type Trie struct {
	// contains filtered or unexported fields
}

Trie - prefix tree.

func NewTrie

func NewTrie(sep string) *Trie

func (*Trie) Delete

func (t *Trie) Delete(key string, value interface{}) bool

func (*Trie) Insert

func (t *Trie) Insert(key string, value interface{})

func (*Trie) Match

func (t *Trie) Match(key string, matcher Matcher) []interface{}

func (*Trie) Nodes

func (t *Trie) Nodes() int

func (*Trie) Values

func (t *Trie) Values() int

type UnregisterConnCmd

type UnregisterConnCmd struct {
	Conn Conn
	Done chan bool
}

func (*UnregisterConnCmd) Process

func (cmd *UnregisterConnCmd) Process(s Server)

type UnsubscribeCmd

type UnsubscribeCmd struct {
	Subscription *Subscription
	MaxResponses int
	Unsubscribed chan bool
}

func (*UnsubscribeCmd) Process

func (cmd *UnsubscribeCmd) Process(s Server)

type UnsubscriptionRequest

type UnsubscriptionRequest struct {
	SubscriptionId int
	MaxResponses   int
}

An UnsubscriptionRequest represents a Request sent to remove a subscription.

func (*UnsubscriptionRequest) Dispatch

func (r *UnsubscriptionRequest) Dispatch(c Conn)

func (*UnsubscriptionRequest) Serve

func (r *UnsubscriptionRequest) Serve(c Conn) *Response

type VarzConfig

type VarzConfig struct {
	BindAddress string            `yaml:"bind_address"`
	Users       map[string]string `yaml:"users"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL