Documentation ¶
Index ¶
- Constants
- Variables
- func IsAllSpace(s string) bool
- func ReplaceLogger(logger Logger)
- type AuthConfig
- type AuthHelper
- type BadRequest
- type BasicAuthHandler
- type BoundedQueue
- type ClientCmd
- type CloseCmd
- type Config
- type Conn
- type ConnOptions
- type ConnectRequest
- type Counter
- type ErrorCmd
- type Gauge
- type HasSize
- type HeartbeatHelper
- type Info
- type InfoRequest
- type LimitsConfig
- type LogConfig
- type Logger
- type Matcher
- type Message
- type NATSError
- type PingConfig
- type PingRequest
- type PongRequest
- type ProfileConfig
- type PublishCmd
- type PublishRequest
- type Rate
- type RateSample
- type Registry
- func (r *Registry) Loop()
- func (r *Registry) Metrics(fn func(map[string]fmt.Stringer))
- func (r *Registry) NewCounter(name string, addr *int64)
- func (r *Registry) NewGauge(name string, fn func() string)
- func (r *Registry) NewRate(name string, addr *int64, interval time.Duration)
- func (r *Registry) NewRates(name string, addr *int64, durations ...string)
- func (r *Registry) NewStringVal(name string, val string)
- func (r *Registry) NewStringerGauge(name string, stringer fmt.Stringer)
- type Request
- func ParseConnectRequest(c Conn, args string) (Request, error)
- func ParseInfoRequest(c Conn, args string) (Request, error)
- func ParsePingRequest(c Conn, args string) (Request, error)
- func ParsePongRequest(c Conn, args string) (Request, error)
- func ParsePublishRequest(c Conn, args string) (Request, error)
- func ParseSubscriptionRequest(c Conn, args string) (Request, error)
- func ParseUnsubscriptionRequest(c Conn, args string) (Request, error)
- type Response
- type Server
- type ServerCmd
- type Stats
- type StringVal
- type SubscribeCmd
- type SubscribedMessage
- type Subscription
- type SubscriptionRequest
- type TCPConn
- type Trie
- type UnregisterConnCmd
- type UnsubscribeCmd
- type UnsubscriptionRequest
- type VarzConfig
Constants ¶
const ( DEFAULT_MAX_CONTROL = 1024 DEFAULT_MAX_PAYLOAD = 1024 * 1024 DEFAULT_MAX_PENDING = 10 * 1024 * 1024 )
const ( INFO = "INFO" PUB = "PUB" SUB = "SUB" UNSUB = "UNSUB" PING = "PING" PONG = "PONG" CONNECT = "CONNECT" )
const ( MAX_CONN_CHAN_BACKLOG = 16 MAX_OUTBOUND_QUEUE_SIZE = 32 )
const ( VERSION = "0.0.1" DEFAULT_SERVER_BACKLOG = 1024 )
const (
DEFAULT_RATE_UPDATE_INTERVAL = 5 * time.Second
)
Variables ¶
var ( ErrQueueFull = errors.New("Full Queue") ErrQueueClosed = errors.New("Closed Queue") )
var ( // not a const so we can change it for testing BUF_IO_SIZE = 64 * 1024 REQUESTS = []string{INFO, PUB, SUB, UNSUB, PING, PONG, CONNECT} )
var ( ErrPayloadTooBig = &NATSError{"-ERR 'Payload size exceeded'", true} ErrProtocolOpTooBig = &NATSError{"-ERR 'Protocol Operation size exceeded'", true} ErrInvalidSubject = &NATSError{"-ERR 'Invalid Subject'", false} ErrInvalidSidTaken = &NATSError{"-ERR 'Invalid Subject Identifier (sid), already taken'", false} ErrInvalidSidNoexist = &NATSError{"-ERR 'Invalid Subject-Identifier (sid), no subscriber registered'", false} ErrInvalidConfig = &NATSError{"-ERR 'Invalid config, valid JSON required for connection configuration'", false} ErrAuthRequired = &NATSError{"-ERR 'Authorization is required'", true} ErrAuthFailed = &NATSError{"-ERR 'Authorization failed'", true} ErrUnknownOp = &NATSError{"-ERR 'Unknown Protocol Operation'", false} ErrSlowConsumer = &NATSError{"-ERR 'Slow consumer detected, connection dropped'", true} ErrUnresponsive = &NATSError{"-ERR 'Unresponsive client detected, connection dropped'", true} ErrMaxConnsExceeded = &NATSError{"-ERR 'Maximum client connections exceeded, connection dropped'", true} )
var ( PING_REQUEST = &PingRequest{} PONG_REQUEST = &PongRequest{} INFO_REQUEST = &InfoRequest{} )
var BasicMatcher = func(node *trieNode, token string) ([]*trieNode, []*trieNode) { match := node.Children[token] if match != nil { return []*trieNode{match}, nil } return emptyNodeSlice, nil }
var (
CLOSE_CMD = &CloseCmd{}
)
var INFO_PRELUDE = "INFO "
var NewHeartbeatHelper = newHeartbeatHelper
var OK = "OK"
var REQUEST_PARSERS = map[string]func(Conn, string) (Request, error){ PUB: ParsePublishRequest, SUB: ParseSubscriptionRequest, UNSUB: ParseUnsubscriptionRequest, PING: ParsePingRequest, PONG: ParsePongRequest, CONNECT: ParseConnectRequest, INFO: ParseInfoRequest, }
Map of all request parsers.
var WildcardMatcher = func(node *trieNode, token string) ([]*trieNode, []*trieNode) { matches := make([]*trieNode, 0, 3) match := node.Children[token] if match != nil { matches = append(matches, match) } match = node.Children["*"] if match != nil { matches = append(matches, match) } match = node.Children[">"] if match != nil { return matches, []*trieNode{match} } return matches, nil }
Functions ¶
func IsAllSpace ¶
func ReplaceLogger ¶
func ReplaceLogger(logger Logger)
Types ¶
type AuthConfig ¶
type AuthHelper ¶
type AuthHelper struct {
// contains filtered or unexported fields
}
func NewAuthHelper ¶
Create a new AuthHelper for the connection with the specified user map and auth timeout.
func (*AuthHelper) Auth ¶
func (h *AuthHelper) Auth(req Request) (bool, *NATSError)
Authenticate and authorize the request. Returns true iff the request is authed, otherwise false with an error.
func (*AuthHelper) Stop ¶
func (h *AuthHelper) Stop()
Stop the AuthHelper. Must be called to cleanup the internal timers.
func (*AuthHelper) Timeout ¶
func (h *AuthHelper) Timeout()
Initiates the auth timeout. It should be called when the channel returned by Timer() fires.
func (*AuthHelper) Timer ¶
func (h *AuthHelper) Timer() <-chan time.Time
Returns the internal timer channel, which should be used to call Timeout(). It is exposed to avoid an additional goroutine; instead, it can be handled from a single select statement.
type BadRequest ¶
type BadRequest struct {
Error *NATSError
}
A BadRequest represents a "Request" that had a problem for which an error will be returned to the client.
func (*BadRequest) Dispatch ¶
func (r *BadRequest) Dispatch(c Conn)
func (*BadRequest) Serve ¶
func (r *BadRequest) Serve(c Conn) *Response
type BasicAuthHandler ¶
type BasicAuthHandler struct {
// contains filtered or unexported fields
}
func NewBasicAuthHandler ¶
func NewBasicAuthHandler(users map[string]string, handlerFunc http.HandlerFunc) *BasicAuthHandler
func (*BasicAuthHandler) ServeHTTP ¶
func (h *BasicAuthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
type BoundedQueue ¶
type BoundedQueue struct {
// contains filtered or unexported fields
}
func NewBoundedQueue ¶
func NewBoundedQueue(maxSize int32) *BoundedQueue
Create a new BoundedQueue with the specified max size.
func (*BoundedQueue) Close ¶
func (q *BoundedQueue) Close()
Close the queue. Must be called to cleanup the internal goroutine.
func (*BoundedQueue) Dequeue ¶
func (q *BoundedQueue) Dequeue() (HasSize, error)
Dequeue element. Will block until there is something to dequeue.
func (*BoundedQueue) HasMore ¶
func (q *BoundedQueue) HasMore() bool
Returns true if the queue has more elements to dequeue without blocking.
type ClientCmd ¶
type ClientCmd interface { // Process the command in context of the client dispatch loop. Process(Conn) }
Simple interface for commands that need to be handled by the client dispatch loop.
type Config ¶
type Config struct { BindAddress string `yaml:"bind_address"` Ping PingConfig `yaml:"ping"` Profile ProfileConfig `yaml:"pprof"` Varz VarzConfig `yaml:"varz"` Auth AuthConfig `yaml:"auth"` Log LogConfig `yaml:"logging"` Limits LimitsConfig `yaml:"limits"` }
func ParseConfig ¶
Parse the server configuration.
type Conn ¶
type Conn interface { // Serve a subscribed message on the dispatch loop. ServeMessage(*SubscribedMessage) // Serve a client request on the dispatch loop. ServeRequest(Request) // Serve a client command on the dispatch loop. ServeCommand(ClientCmd) // Read from the client. Read([]byte) (int, error) // Read a single NATS control line from the client. ReadControlLine() (string, error) // Write a response to the client. Write(*Response) // Send a command to the server. SendServerCmd(ServerCmd) // Start the connection handling. Start() // Close the connection. Close() // Returns true iff the connection is closing/closed. Closed() bool // Close the connection and send the error to the client. CloseWithError(*NATSError) // Return a map<SID, subscription>. Subscriptions() map[int]*Subscription // Server reference. Server() Server // Return the connection heartbeat helper. HeartbeatHelper() HeartbeatHelper // Returns the connection options. Options() *ConnOptions // Returns the client remote address. RemoteAddr() net.Addr }
Client connection.
type ConnOptions ¶
type ConnectRequest ¶
type ConnectRequest struct { Verbose *bool `json:"verbose"` Pedantic *bool `json:"pedantic"` User *string `json:"user"` Password *string `json:"pass"` }
A ConnectRequest represents a Request sent to authenticate (if needed) and negotiate any connection options.
func (*ConnectRequest) Dispatch ¶
func (r *ConnectRequest) Dispatch(c Conn)
func (*ConnectRequest) Serve ¶
func (r *ConnectRequest) Serve(c Conn) *Response
type HeartbeatHelper ¶
type InfoRequest ¶
type InfoRequest struct { }
An InfoRequest represents a Request sent to return the server configuration.
func (*InfoRequest) Dispatch ¶
func (r *InfoRequest) Dispatch(c Conn)
func (*InfoRequest) Serve ¶
func (r *InfoRequest) Serve(c Conn) *Response
type LimitsConfig ¶
type PingConfig ¶
type PingRequest ¶
type PingRequest struct { }
A PingRequest represents a Request sent to verify that the server is alive.
func (*PingRequest) Dispatch ¶
func (r *PingRequest) Dispatch(c Conn)
func (*PingRequest) Serve ¶
func (r *PingRequest) Serve(c Conn) *Response
type PongRequest ¶
type PongRequest struct { }
A PongRequest represents a Request sent as a response to the server's ping request. The terminology is reversed because the server is the one making the request.
func (*PongRequest) Dispatch ¶
func (r *PongRequest) Dispatch(c Conn)
func (*PongRequest) Serve ¶
func (r *PongRequest) Serve(c Conn) *Response
type ProfileConfig ¶
type ProfileConfig struct {
BindAddress string `yaml:"bind_address"`
}
type PublishCmd ¶
type PublishCmd struct {
Message *Message
}
func (*PublishCmd) Process ¶
func (cmd *PublishCmd) Process(s Server)
type PublishRequest ¶
type PublishRequest struct {
Message *Message
}
A PublishRequest represents a Request sent to publish a message.
func (*PublishRequest) Dispatch ¶
func (r *PublishRequest) Dispatch(c Conn)
func (*PublishRequest) Serve ¶
func (r *PublishRequest) Serve(c Conn) *Response
type RateSample ¶
type RateSample struct {
// contains filtered or unexported fields
}
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
var DefaultRegistry *Registry = NewRegistry(DEFAULT_RATE_UPDATE_INTERVAL)
func NewRegistry ¶
func (*Registry) NewCounter ¶
func (*Registry) NewStringVal ¶
type Request ¶
type Request interface { // Serves the request by returning an optional payload to the client. Serve(Conn) *Response // Dispatches the request to the proper processing channel(s). Dispatch(Conn) }
A Request represents an incoming client request.
type Response ¶
NATS response payload. Rather than holding only a single string or byte array, a Response can carry the original message, so it can be passed to the client without being copied.
func NewByteResponse ¶
func NewResponse ¶
func NewStringResponse ¶
type Server ¶
type SubscribeCmd ¶
type SubscribeCmd struct { Subscription *Subscription Done chan bool }
func (*SubscribeCmd) Process ¶
func (cmd *SubscribeCmd) Process(s Server)
type SubscribedMessage ¶
type SubscribedMessage struct { Subscription *Subscription Message *Message Last bool }
type Subscription ¶
type SubscriptionRequest ¶
type SubscriptionRequest struct { Subscription *Subscription Done chan bool }
A SubscriptionRequest represents a Request sent to register a subscription (the SUB operation).
func (*SubscriptionRequest) Dispatch ¶
func (r *SubscriptionRequest) Dispatch(c Conn)
func (*SubscriptionRequest) Serve ¶
func (r *SubscriptionRequest) Serve(c Conn) *Response
type UnregisterConnCmd ¶
func (*UnregisterConnCmd) Process ¶
func (cmd *UnregisterConnCmd) Process(s Server)
type UnsubscribeCmd ¶
type UnsubscribeCmd struct { Subscription *Subscription MaxResponses int Unsubscribed chan bool }
func (*UnsubscribeCmd) Process ¶
func (cmd *UnsubscribeCmd) Process(s Server)
type UnsubscriptionRequest ¶
An UnsubscriptionRequest represents a Request sent to unsubscribe (the UNSUB operation).
func (*UnsubscriptionRequest) Dispatch ¶
func (r *UnsubscriptionRequest) Dispatch(c Conn)
func (*UnsubscriptionRequest) Serve ¶
func (r *UnsubscriptionRequest) Serve(c Conn) *Response