server

package
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 17, 2020 License: Apache-2.0 Imports: 60 Imported by: 3

Documentation

Index

Constants

View Source
// Kinds of client connection. CLIENT is 0 and each following constant is
// the next integer, assigned by iota.
const (
	// CLIENT is an end user.
	CLIENT = iota
	// ROUTER represents another server in the cluster.
	ROUTER
	// GATEWAY is a link between 2 clusters.
	GATEWAY
	// SYSTEM is an internal system client.
	SYSTEM
	// LEAF is for leaf node connections.
	LEAF
	// JETSTREAM is an internal jetstream client.
	JETSTREAM
	// ACCOUNT is for the internal client for accounts.
	ACCOUNT
)

Type of client connection.

View Source
// Client protocol versions, numbered from 0 by iota.
const (
	// ClientProtoZero is the original Client protocol from 2009.
	// http://nats.io/documentation/internals/nats-protocol/
	ClientProtoZero = iota
	// ClientProtoInfo signals a client can receive more than the original INFO block.
	// This can be used to update clients on other cluster members, etc.
	ClientProtoInfo
)
View Source
// ClosedState values. Judging by their names, each one identifies the reason
// a connection was closed.
const (
	// ClientClosed is 1 (iota + 1), so the zero ClosedState matches none of
	// these constants. Every following constant is the next integer.
	ClientClosed = ClosedState(iota + 1)
	AuthenticationTimeout
	AuthenticationViolation
	TLSHandshakeError
	SlowConsumerPendingBytes
	SlowConsumerWriteDeadline
	WriteError
	ReadError
	ParseError
	StaleConnection
	ProtocolViolation
	BadClientProtocolVersion
	WrongPort
	MaxAccountConnectionsExceeded
	MaxConnectionsExceeded
	MaxPayloadExceeded
	MaxControlLineExceeded
	MaxSubscriptionsExceeded
	DuplicateRoute
	RouteRemoved
	ServerShutdown
	AuthenticationExpired
	WrongGateway
	MissingAccount
	Revocation
	InternalClient
	MsgHeaderViolation
	NoRespondersRequiresHeaders
	ClusterNameConflict
)
View Source
// Valid Command values: the strings "stop", "quit", "reopen" and "reload".
const (
	CommandStop   = Command("stop")
	CommandQuit   = Command("quit")
	CommandReopen = Command("reopen")
	CommandReload = Command("reload")
)

Valid Command values.

View Source
// Server-wide defaults and protocol limits.
const (
	// VERSION is the current version for the server.
	VERSION = "2.2.0-beta.23"

	// PROTO is the currently supported protocol.
	// 0 was the original
	// 1 maintains proto 0, adds echo abilities for CONNECT from the client. Clients
	// should not send echo unless proto in INFO is >= 1.
	PROTO = 1

	// DEFAULT_PORT is the default port for client connections.
	DEFAULT_PORT = 4222

	// RANDOM_PORT is the value for port that, when supplied, will cause the
	// server to listen on a randomly-chosen available port. The resolved port
	// is available via the Addr() method.
	RANDOM_PORT = -1

	// DEFAULT_HOST defaults to all interfaces.
	DEFAULT_HOST = "0.0.0.0"

	// MAX_CONTROL_LINE_SIZE is the maximum allowed protocol control line size.
	// 4k should be plenty since payloads sans connect/info string are separate.
	MAX_CONTROL_LINE_SIZE = 4096

	// MAX_PAYLOAD_SIZE is the maximum allowed payload size. Should be using
	// something different if > 1MB payloads are needed.
	MAX_PAYLOAD_SIZE = (1024 * 1024)

	// MAX_PENDING_SIZE is the maximum outbound pending bytes per client.
	MAX_PENDING_SIZE = (64 * 1024 * 1024)

	// DEFAULT_MAX_CONNECTIONS is the default maximum connections allowed.
	DEFAULT_MAX_CONNECTIONS = (64 * 1024)

	// TLS_TIMEOUT is the TLS wait time.
	TLS_TIMEOUT = 500 * time.Millisecond

	// AUTH_TIMEOUT is the authorization wait time: twice TLS_TIMEOUT, i.e. 1 second.
	AUTH_TIMEOUT = 2 * TLS_TIMEOUT

	// DEFAULT_PING_INTERVAL is how often pings are sent to clients and routes.
	DEFAULT_PING_INTERVAL = 2 * time.Minute

	// DEFAULT_SCTP_PING_INTERVAL is a 10 second ping interval.
	// NOTE(review): presumably the SCTP counterpart of DEFAULT_PING_INTERVAL;
	// confirm against its users.
	DEFAULT_SCTP_PING_INTERVAL = 10 * time.Second

	// DEFAULT_PING_MAX_OUT is maximum allowed pings outstanding before disconnect.
	DEFAULT_PING_MAX_OUT = 2

	// CR_LF is the carriage-return/line-feed string.
	CR_LF = "\r\n"

	// LEN_CR_LF holds onto the computed size of CR_LF.
	LEN_CR_LF = len(CR_LF)

	// DEFAULT_FLUSH_DEADLINE is the write/flush deadline.
	DEFAULT_FLUSH_DEADLINE = 10 * time.Second

	// DEFAULT_HTTP_PORT is the default monitoring port.
	DEFAULT_HTTP_PORT = 8222

	// DEFAULT_HTTP_BASE_PATH is the default base path for monitoring.
	DEFAULT_HTTP_BASE_PATH = "/"

	// ACCEPT_MIN_SLEEP is the minimum acceptable sleep time on temporary errors.
	ACCEPT_MIN_SLEEP = 10 * time.Millisecond

	// ACCEPT_MAX_SLEEP is the maximum acceptable sleep time on temporary errors.
	ACCEPT_MAX_SLEEP = 1 * time.Second

	// DEFAULT_ROUTE_CONNECT is the route solicitation interval.
	DEFAULT_ROUTE_CONNECT = 1 * time.Second

	// DEFAULT_ROUTE_RECONNECT is the route reconnect interval.
	DEFAULT_ROUTE_RECONNECT = 1 * time.Second

	// DEFAULT_ROUTE_DIAL is the route dial timeout.
	DEFAULT_ROUTE_DIAL = 1 * time.Second

	// DEFAULT_LEAF_NODE_RECONNECT is the LeafNode reconnect interval.
	DEFAULT_LEAF_NODE_RECONNECT = time.Second

	// DEFAULT_LEAF_TLS_TIMEOUT is the TLS timeout for LeafNodes.
	DEFAULT_LEAF_TLS_TIMEOUT = 2 * time.Second

	// PROTO_SNIPPET_SIZE is the default size of proto to print on parse errors.
	PROTO_SNIPPET_SIZE = 32

	// MAX_MSG_ARGS is the maximum possible number of arguments from MSG proto.
	MAX_MSG_ARGS = 4

	// MAX_RMSG_ARGS is the maximum possible number of arguments from RMSG proto.
	MAX_RMSG_ARGS = 6

	// MAX_HMSG_ARGS is the maximum possible number of arguments from HMSG proto.
	MAX_HMSG_ARGS = 7

	// MAX_PUB_ARGS is the maximum possible number of arguments from PUB proto.
	MAX_PUB_ARGS = 3

	// MAX_HPUB_ARGS is the maximum possible number of arguments from HPUB proto.
	MAX_HPUB_ARGS = 4

	// DEFAULT_MAX_CLOSED_CLIENTS is the maximum number of closed connections we hold onto.
	DEFAULT_MAX_CLOSED_CLIENTS = 10000

	// DEFAULT_LAME_DUCK_DURATION is the time in which the server spreads
	// the closing of clients when signaled to go in lame duck mode.
	DEFAULT_LAME_DUCK_DURATION = 2 * time.Minute

	// DEFAULT_LAME_DUCK_GRACE_PERIOD is the duration the server waits, after entering
	// lame duck mode, before starting closing client connections.
	DEFAULT_LAME_DUCK_GRACE_PERIOD = 10 * time.Second

	// DEFAULT_LEAFNODE_INFO_WAIT is a 1 second wait used for leafnode connections.
	// NOTE(review): the previous comment ("Route dial timeout") looked copied from
	// DEFAULT_ROUTE_DIAL; presumably this bounds the wait for the remote INFO
	// protocol. Confirm against its users.
	DEFAULT_LEAFNODE_INFO_WAIT = 1 * time.Second

	// DEFAULT_LEAFNODE_PORT is the default port for remote leafnode connections.
	DEFAULT_LEAFNODE_PORT = 7422

	// DEFAULT_CONNECT_ERROR_REPORTS is the number of attempts at which a
	// repeated failed route, gateway or leaf node connection is reported.
	// This is used for initial connection, that is, when the server has
	// never had a connection to the given endpoint. Once connected, and
	// if a disconnect occurs, DEFAULT_RECONNECT_ERROR_REPORTS is used
	// instead.
	// The default is to report every 3600 attempts (roughly every hour).
	DEFAULT_CONNECT_ERROR_REPORTS = 3600

	// DEFAULT_RECONNECT_ERROR_REPORTS is the default number of failed
	// attempts to reconnect a route, gateway or leaf node connection
	// at which the failure is reported.
	// The default is to report every attempt.
	DEFAULT_RECONNECT_ERROR_REPORTS = 1

	// DEFAULT_RTT_MEASUREMENT_INTERVAL is how often we want to measure RTT from
	// this server to clients, routes, gateways or leafnode connections.
	DEFAULT_RTT_MEASUREMENT_INTERVAL = time.Hour

	// DEFAULT_ALLOW_RESPONSE_MAX_MSGS is the default number of responses allowed
	// for a reply subject.
	DEFAULT_ALLOW_RESPONSE_MAX_MSGS = 1

	// DEFAULT_ALLOW_RESPONSE_EXPIRATION is the default time allowed for a given
	// dynamic response permission.
	DEFAULT_ALLOW_RESPONSE_EXPIRATION = 2 * time.Minute

	// DEFAULT_SERVICE_EXPORT_RESPONSE_THRESHOLD is the default time that the system will
	// expect a service export response to be delivered. This is used in corner cases for
	// time based cleanup of reverse mapping structures.
	DEFAULT_SERVICE_EXPORT_RESPONSE_THRESHOLD = 2 * time.Minute

	// DEFAULT_SERVICE_LATENCY_SAMPLING is the default sampling rate for service
	// latency metrics.
	DEFAULT_SERVICE_LATENCY_SAMPLING = 100

	// DEFAULT_SYSTEM_ACCOUNT is the default system account name, "$SYS".
	DEFAULT_SYSTEM_ACCOUNT = "$SYS"

	// DEFAULT_GLOBAL_ACCOUNT is the default global account name, "$G".
	DEFAULT_GLOBAL_ACCOUNT = "$G"
)
View Source
// JetStream timing defaults.
const (
	// JsAckWaitDefault is the default AckWait (30 seconds), only applicable on
	// explicit ack policy observables.
	JsAckWaitDefault = 30 * time.Second
	// JsDeleteWaitTimeDefault is the default amount of time (5 seconds) we will wait
	// for non-durable observables to be in an inactive state before deleting them.
	JsDeleteWaitTimeDefault = 5 * time.Second
)
View Source
const (

	// Metafiles for streams and consumers: JetStreamMetaFile is the name
	// "meta.inf" and JetStreamMetaFileSum is the name "meta.sum".
	// NOTE(review): presumably the .sum file holds a checksum of the .inf
	// file; confirm against the code that writes them.
	JetStreamMetaFile    = "meta.inf"
	JetStreamMetaFileSum = "meta.sum"
)
View Source
// JetStream storage defaults.
const (
	// JetStreamStoreDir is the prefix we use, "jetstream".
	JetStreamStoreDir = "jetstream"
	// JetStreamMaxStoreDefault is the default disk storage limit: 1TB,
	// computed as 1024^4 bytes.
	JetStreamMaxStoreDefault = 1024 * 1024 * 1024 * 1024
	// JetStreamMaxMemDefault is only used when we can't determine system memory:
	// 256MB, computed as 256 * 1024^2 bytes.
	JetStreamMaxMemDefault = 1024 * 1024 * 256
)
View Source
// Request API subjects for JetStream. Where a constant has a T-suffixed
// sibling, the plain name is the subject with "*" wildcard tokens and the
// T variant is the same subject with "%s" format verbs in their place.
const (
	// JSApiAccountInfo is for obtaining general information about JetStream for this account.
	// Will return JSON response.
	JSApiAccountInfo = "$JS.API.INFO"

	// JSApiTemplateCreate is the endpoint to create new stream templates.
	// Will return JSON response.
	JSApiTemplateCreate  = "$JS.API.STREAM.TEMPLATE.CREATE.*"
	JSApiTemplateCreateT = "$JS.API.STREAM.TEMPLATE.CREATE.%s"

	// JSApiTemplates is the endpoint to list all stream template names for this account.
	// Will return JSON response.
	JSApiTemplates = "$JS.API.STREAM.TEMPLATE.NAMES"

	// JSApiTemplateInfo is for obtaining general information about a named stream template.
	// Will return JSON response.
	JSApiTemplateInfo  = "$JS.API.STREAM.TEMPLATE.INFO.*"
	JSApiTemplateInfoT = "$JS.API.STREAM.TEMPLATE.INFO.%s"

	// JSApiTemplateDelete is the endpoint to delete stream templates.
	// Will return JSON response.
	JSApiTemplateDelete  = "$JS.API.STREAM.TEMPLATE.DELETE.*"
	JSApiTemplateDeleteT = "$JS.API.STREAM.TEMPLATE.DELETE.%s"

	// JSApiStreamCreate is the endpoint to create new streams.
	// Will return JSON response.
	JSApiStreamCreate  = "$JS.API.STREAM.CREATE.*"
	JSApiStreamCreateT = "$JS.API.STREAM.CREATE.%s"

	// JSApiStreamUpdate is the endpoint to update existing streams.
	// Will return JSON response.
	JSApiStreamUpdate  = "$JS.API.STREAM.UPDATE.*"
	JSApiStreamUpdateT = "$JS.API.STREAM.UPDATE.%s"

	// JSApiStreams is the endpoint to list all stream names for this account.
	// Will return JSON response.
	JSApiStreams = "$JS.API.STREAM.NAMES"
	// JSApiStreamList is the endpoint that will return all detailed stream information.
	JSApiStreamList = "$JS.API.STREAM.LIST"

	// JSApiStreamInfo is for obtaining general information about a named stream.
	// Will return JSON response.
	JSApiStreamInfo  = "$JS.API.STREAM.INFO.*"
	JSApiStreamInfoT = "$JS.API.STREAM.INFO.%s"

	// JSApiStreamDelete is the endpoint to delete streams.
	// Will return JSON response.
	JSApiStreamDelete  = "$JS.API.STREAM.DELETE.*"
	JSApiStreamDeleteT = "$JS.API.STREAM.DELETE.%s"

	// JSApiStreamPurge is the endpoint to purge streams.
	// Will return JSON response.
	JSApiStreamPurge  = "$JS.API.STREAM.PURGE.*"
	JSApiStreamPurgeT = "$JS.API.STREAM.PURGE.%s"

	// JSApiStreamSnapshot is the endpoint to snapshot streams.
	// Will return a stream of chunks with a nil chunk as EOF to
	// the deliver subject. Caller should respond to each chunk
	// with a nil body response for ack flow.
	JSApiStreamSnapshot  = "$JS.API.STREAM.SNAPSHOT.*"
	JSApiStreamSnapshotT = "$JS.API.STREAM.SNAPSHOT.%s"

	// JSApiStreamRestore is the endpoint to restore a stream from a snapshot.
	// Caller should respond to each chunk with a nil body response.
	JSApiStreamRestore  = "$JS.API.STREAM.RESTORE.*"
	JSApiStreamRestoreT = "$JS.API.STREAM.RESTORE.%s"

	// JSApiMsgDelete is the endpoint to delete messages from a stream.
	// Will return JSON response.
	JSApiMsgDelete  = "$JS.API.STREAM.MSG.DELETE.*"
	JSApiMsgDeleteT = "$JS.API.STREAM.MSG.DELETE.%s"

	// JSApiMsgGet is the template for direct requests for a message by its stream sequence number.
	// Will return JSON response.
	JSApiMsgGet  = "$JS.API.STREAM.MSG.GET.*"
	JSApiMsgGetT = "$JS.API.STREAM.MSG.GET.%s"

	// JSApiConsumerCreate is the endpoint to create ephemeral consumers for streams.
	// Will return JSON response.
	JSApiConsumerCreate  = "$JS.API.CONSUMER.CREATE.*"
	JSApiConsumerCreateT = "$JS.API.CONSUMER.CREATE.%s"

	// JSApiDurableCreate is the endpoint to create durable consumers for streams.
	// You need to include the stream and consumer name in the subject.
	JSApiDurableCreate  = "$JS.API.CONSUMER.DURABLE.CREATE.*.*"
	JSApiDurableCreateT = "$JS.API.CONSUMER.DURABLE.CREATE.%s.%s"

	// JSApiConsumers is the endpoint to list all consumer names for the stream.
	// Will return JSON response.
	JSApiConsumers  = "$JS.API.CONSUMER.NAMES.*"
	JSApiConsumersT = "$JS.API.CONSUMER.NAMES.%s"

	// JSApiConsumerList is the endpoint that will return all detailed consumer information.
	JSApiConsumerList  = "$JS.API.CONSUMER.LIST.*"
	JSApiConsumerListT = "$JS.API.CONSUMER.LIST.%s"

	// JSApiConsumerInfo is for obtaining general information about a consumer.
	// Will return JSON response.
	JSApiConsumerInfo  = "$JS.API.CONSUMER.INFO.*.*"
	JSApiConsumerInfoT = "$JS.API.CONSUMER.INFO.%s.%s"

	// JSApiConsumerDelete is the endpoint to delete consumers.
	// Will return JSON response.
	JSApiConsumerDelete  = "$JS.API.CONSUMER.DELETE.*.*"
	JSApiConsumerDeleteT = "$JS.API.CONSUMER.DELETE.%s.%s"

	// JSApiRequestNextT is the subject template (two "%s" tokens) for the request
	// next message(s) for a consumer in worker/pull mode.
	JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s"

	// JSAdvisoryPrefix is a prefix for all JetStream advisories.
	JSAdvisoryPrefix = "$JS.EVENT.ADVISORY"

	// JSMetricPrefix is a prefix for all JetStream metrics.
	JSMetricPrefix = "$JS.EVENT.METRIC"

	// JSMetricConsumerAckPre is a metric containing ack latency.
	JSMetricConsumerAckPre = "$JS.EVENT.METRIC.CONSUMER.ACK"

	// JSAdvisoryConsumerMaxDeliveryExceedPre is a notification published when a message exceeds its delivery threshold.
	JSAdvisoryConsumerMaxDeliveryExceedPre = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES"

	// JSAdvisoryConsumerMsgTerminatedPre is a notification published when a message has been terminated.
	JSAdvisoryConsumerMsgTerminatedPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED"

	// JSAdvisoryStreamCreatedPre is a notification that a stream was created.
	JSAdvisoryStreamCreatedPre = "$JS.EVENT.ADVISORY.STREAM.CREATED"

	// JSAdvisoryStreamDeletedPre is a notification that a stream was deleted.
	JSAdvisoryStreamDeletedPre = "$JS.EVENT.ADVISORY.STREAM.DELETED"

	// JSAdvisoryStreamUpdatedPre is a notification that a stream was updated.
	JSAdvisoryStreamUpdatedPre = "$JS.EVENT.ADVISORY.STREAM.UPDATED"

	// JSAdvisoryConsumerCreatedPre is a notification that a consumer was created.
	JSAdvisoryConsumerCreatedPre = "$JS.EVENT.ADVISORY.CONSUMER.CREATED"

	// JSAdvisoryConsumerDeletedPre is a notification that a consumer was deleted.
	JSAdvisoryConsumerDeletedPre = "$JS.EVENT.ADVISORY.CONSUMER.DELETED"

	// JSAdvisoryStreamSnapshotCreatePre is a notification that a snapshot was created.
	JSAdvisoryStreamSnapshotCreatePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE"

	// JSAdvisoryStreamSnapshotCompletePre is a notification that a snapshot was completed.
	JSAdvisoryStreamSnapshotCompletePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE"

	// JSAdvisoryStreamRestoreCreatePre is a notification that a restore was started.
	JSAdvisoryStreamRestoreCreatePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE"

	// JSAdvisoryStreamRestoreCompletePre is a notification that a restore was completed.
	JSAdvisoryStreamRestoreCompletePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE"

	// JSAuditAdvisory is a notification about JetStream API access.
	// FIXME - Add in details about who..
	JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
)

Request API subjects for JetStream.

View Source
// ConnState filter values, numbered from 0 by iota.
const (
	// ConnOpen filters on open clients.
	ConnOpen = ConnState(iota)
	// ConnClosed filters on closed clients.
	ConnClosed
	// ConnAll returns all clients.
	ConnAll
)
View Source
// Parser constants: the parserState values. They are numbered by iota in
// declaration order, so reordering or inserting entries changes every
// following value.
const (
	// OP_START is the zero value of parserState.
	OP_START parserState = iota
	OP_PLUS
	OP_PLUS_O
	OP_PLUS_OK
	OP_MINUS
	OP_MINUS_E
	OP_MINUS_ER
	OP_MINUS_ERR
	OP_MINUS_ERR_SPC
	MINUS_ERR_ARG
	OP_C
	OP_CO
	OP_CON
	OP_CONN
	OP_CONNE
	OP_CONNEC
	OP_CONNECT
	CONNECT_ARG
	OP_H
	OP_HP
	OP_HPU
	OP_HPUB
	OP_HPUB_SPC
	HPUB_ARG
	OP_HM
	OP_HMS
	OP_HMSG
	OP_HMSG_SPC
	HMSG_ARG
	OP_P
	OP_PU
	OP_PUB
	OP_PUB_SPC
	PUB_ARG
	OP_PI
	OP_PIN
	OP_PING
	OP_PO
	OP_PON
	OP_PONG
	MSG_PAYLOAD
	MSG_END_R
	MSG_END_N
	OP_S
	OP_SU
	OP_SUB
	OP_SUB_SPC
	SUB_ARG
	OP_A
	OP_ASUB
	OP_ASUB_SPC
	ASUB_ARG
	OP_AUSUB
	OP_AUSUB_SPC
	AUSUB_ARG
	OP_L
	OP_LS
	OP_R
	OP_RS
	OP_U
	OP_UN
	OP_UNS
	OP_UNSU
	OP_UNSUB
	OP_UNSUB_SPC
	UNSUB_ARG
	OP_M
	OP_MS
	OP_MSG
	OP_MSG_SPC
	MSG_ARG
	OP_I
	OP_IN
	OP_INF
	OP_INFO
	INFO_ARG
)

Parser constants

View Source
// Route protocol versions, numbered from 0 by iota.
const (
	// RouteProtoZero is the original Route protocol from 2009.
	// http://nats.io/documentation/internals/nats-protocol/
	RouteProtoZero = iota
	// RouteProtoInfo signals a route can receive more than the original INFO block.
	// This can be used to update remote cluster permissions, etc...
	RouteProtoInfo
	// RouteProtoV2 is the new route/cluster protocol that provides account support.
	RouteProtoV2
)
View Source
// Route protocol constants: templates for the CONNECT and INFO protocol
// lines. Each takes one "%s" argument and ends with _CRLF_.
const (
	ConProto  = "CONNECT %s" + _CRLF_
	InfoProto = "INFO %s" + _CRLF_
)

Route protocol constants

View Source
// HTTP endpoint paths.
const (
	RootPath     = "/"
	VarzPath     = "/varz"
	ConnzPath    = "/connz"
	RoutezPath   = "/routez"
	GatewayzPath = "/gatewayz"
	LeafzPath    = "/leafz"
	SubszPath    = "/subsz"
	StackszPath  = "/stacksz"
)

HTTP endpoints

View Source
// Discard policies, numbered from 0 by iota.
const (
	// DiscardOld will remove older messages to return to the limits.
	DiscardOld = iota
	// DiscardNew will error on a StoreMsg call.
	DiscardNew
)
View Source
// Replicas range: StreamDefaultReplicas (1) is the default and
// StreamMaxReplicas (7) is the maximum.
const (
	StreamDefaultReplicas = 1
	StreamMaxReplicas     = 7
)

Replicas Range

View Source
const ConnectEventMsgType = "io.nats.server.advisory.v1.client_connect"

ConnectEventMsgType is the schema type for ConnectEventMsg

View Source
const DefaultConnListSize = 1024

DefaultConnListSize is the default size of the connection list.

View Source
const DefaultSubListSize = 1024

DefaultSubListSize is the default size of the subscriptions list.

View Source
const DisconnectEventMsgType = "io.nats.server.advisory.v1.client_disconnect"

DisconnectEventMsgType is the schema type for DisconnectEventMsg

View Source
const (
	// InboxPrefix is the subject prefix "$SYS._INBOX.".
	InboxPrefix = "$SYS._INBOX."
)

Copied from go client. We could use serviceReply here instead to save some code. I prefer these semantics for the moment, when tracing you know what this is.

View Source
// JSAPIAuditType is the schema type string for JetStream API audit advisories.
const JSAPIAuditType = "io.nats.jetstream.advisory.v1.api_audit"
View Source
// JSApiAccountInfoResponseType is the schema type string for account info API responses.
const JSApiAccountInfoResponseType = "io.nats.jetstream.api.v1.account_info_response"
View Source
// JSApiConsumerCreateResponseType is the schema type string for consumer create API responses.
const JSApiConsumerCreateResponseType = "io.nats.jetstream.api.v1.consumer_create_response"
View Source
// JSApiConsumerDeleteResponseType is the schema type string for consumer delete API responses.
const JSApiConsumerDeleteResponseType = "io.nats.jetstream.api.v1.consumer_delete_response"
View Source
// JSApiConsumerInfoResponseType is the schema type string for consumer info API responses.
const JSApiConsumerInfoResponseType = "io.nats.jetstream.api.v1.consumer_info_response"
View Source
// JSApiConsumerListResponseType is the schema type string for consumer list API responses.
const JSApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_response"
View Source
// JSApiConsumerNamesResponseType is the schema type string for consumer names API responses.
const JSApiConsumerNamesResponseType = "io.nats.jetstream.api.v1.consumer_names_response"
View Source
// JSApiListLimit is 256.
// NOTE(review): presumably the maximum number of entries returned by the
// detailed stream/consumer LIST endpoints; confirm against its users.
const JSApiListLimit = 256
View Source
// JSApiMsgDeleteResponseType is the schema type string for stream message delete API responses.
const JSApiMsgDeleteResponseType = "io.nats.jetstream.api.v1.stream_msg_delete_response"
View Source
// JSApiMsgGetResponseType is the schema type string for stream message get API responses.
const JSApiMsgGetResponseType = "io.nats.jetstream.api.v1.stream_msg_get_response"
View Source
const JSApiNamesLimit = 1024

Maximum entries we will return for streams or consumers lists. TODO(dlc) - with header or request support could request chunked response.

View Source
// JSApiStreamCreateResponseType is the schema type string for stream create API responses.
const JSApiStreamCreateResponseType = "io.nats.jetstream.api.v1.stream_create_response"
View Source
// JSApiStreamDeleteResponseType is the schema type string for stream delete API responses.
const JSApiStreamDeleteResponseType = "io.nats.jetstream.api.v1.stream_delete_response"
View Source
// JSApiStreamInfoResponseType is the schema type string for stream info API responses.
const JSApiStreamInfoResponseType = "io.nats.jetstream.api.v1.stream_info_response"
View Source
// JSApiStreamListResponseType is the schema type string for stream list API responses.
const JSApiStreamListResponseType = "io.nats.jetstream.api.v1.stream_list_response"
View Source
// JSApiStreamNamesResponseType is the schema type string for stream names API responses.
const JSApiStreamNamesResponseType = "io.nats.jetstream.api.v1.stream_names_response"
View Source
// JSApiStreamPurgeResponseType is the schema type string for stream purge API responses.
const JSApiStreamPurgeResponseType = "io.nats.jetstream.api.v1.stream_purge_response"
View Source
// JSApiStreamRestoreResponseType is the schema type string for stream restore API responses.
const JSApiStreamRestoreResponseType = "io.nats.jetstream.api.v1.stream_restore_response"
View Source
// JSApiStreamSnapshotResponseType is the schema type string for stream snapshot API responses.
const JSApiStreamSnapshotResponseType = "io.nats.jetstream.api.v1.stream_snapshot_response"
View Source
// JSApiStreamTemplateCreateResponseType is the schema type string for stream template create API responses.
const JSApiStreamTemplateCreateResponseType = "io.nats.jetstream.api.v1.stream_template_create_response"
View Source
// JSApiStreamTemplateDeleteResponseType is the schema type string for stream template delete API responses.
const JSApiStreamTemplateDeleteResponseType = "io.nats.jetstream.api.v1.stream_template_delete_response"
View Source
// JSApiStreamTemplateInfoResponseType is the schema type string for stream template info API responses.
const JSApiStreamTemplateInfoResponseType = "io.nats.jetstream.api.v1.stream_template_info_response"
View Source
// JSApiStreamTemplateNamesResponseType is the schema type string for stream template names API responses.
const JSApiStreamTemplateNamesResponseType = "io.nats.jetstream.api.v1.stream_template_names_response"
View Source
// JSApiStreamUpdateResponseType is the schema type string for stream update API responses.
const JSApiStreamUpdateResponseType = "io.nats.jetstream.api.v1.stream_update_response"
View Source
const JSConsumerAckMetricType = "io.nats.jetstream.metric.v1.consumer_ack"

JSConsumerAckMetricType is the schema type for JSConsumerAckMetric

View Source
// JSConsumerActionAdvisoryType is the schema type string for consumer action advisories.
const JSConsumerActionAdvisoryType = "io.nats.jetstream.advisory.v1.consumer_action"
View Source
const JSConsumerDeliveryExceededAdvisoryType = "io.nats.jetstream.advisory.v1.max_deliver"

JSConsumerDeliveryExceededAdvisoryType is the schema type for JSConsumerDeliveryExceededAdvisory

View Source
const JSConsumerDeliveryTerminatedAdvisoryType = "io.nats.jetstream.advisory.v1.terminated"

JSConsumerDeliveryTerminatedAdvisoryType is the schema type for JSConsumerDeliveryTerminatedAdvisory

View Source
const JSMaxNameLen = 256

Maximum name lengths for streams, consumers and templates.

View Source
const JSPubId = "Msg-Id"

JSPubId is used for identifying published messages and performing de-duplication.

View Source
const JSRestoreCompleteAdvisoryType = "io.nats.jetstream.advisory.v1.restore_complete"

JSRestoreCompleteAdvisoryType is the schema type for JSRestoreCompleteAdvisory

View Source
const JSRestoreCreateAdvisoryType = "io.nats.jetstream.advisory.v1.restore_create"

JSRestoreCreateAdvisoryType is the schema type for JSRestoreCreateAdvisory

View Source
const JSSnapshotCompleteAdvisoryType = "io.nats.jetstream.advisory.v1.snapshot_complete"

JSSnapshotCompleteAdvisoryType is the schema type for JSSnapshotCompleteAdvisory

View Source
const JSSnapshotCreatedAdvisoryType = "io.nats.jetstream.advisory.v1.snapshot_create"

JSSnapshotCreatedAdvisoryType is the schema type for JSSnapshotCreateAdvisory

View Source
// JSStreamActionAdvisoryType is the schema type string for stream action advisories.
const JSStreamActionAdvisoryType = "io.nats.jetstream.advisory.v1.stream_action"
View Source
const OK = "+OK"

OK is the "+OK" response string.

View Source
const ServiceLatencyType = "io.nats.server.metric.v1.service_latency"

ServiceLatencyType is the NATS Event Type for ServiceLatency

View Source
// StreamDefaultDuplicatesWindow is the default duplicates window, 2 minutes.
// NOTE(review): presumably the period over which published messages are
// de-duplicated; confirm against the stream configuration code.
const StreamDefaultDuplicatesWindow = 2 * time.Minute

Variables

View Source
// Ack responses. Note that a nil or no payload is the same as AckAck.
var (
	// AckAck and AckOK both acknowledge a message.
	AckAck = []byte("+ACK") // nil or no payload to ack subject also means ACK
	AckOK  = []byte(OK)     // deprecated, but +OK meant ack as well.

	// AckNak is the negative acknowledgement (nack), "-NAK".
	AckNak = []byte("-NAK")
	// AckProgress is the progress indicator, "+WPI".
	AckProgress = []byte("+WPI")
	// AckNext acks and asks to deliver the next message(s), "+NXT".
	AckNext = []byte("+NXT")
	// AckTerm terminates delivery of the message, "+TERM".
	AckTerm = []byte("+TERM")
)

Ack responses. Note that a nil or no payload is same as AckAck

View Source
// Sentinel errors returned by the server. Compare against them by identity
// rather than by message text.
var (
	// ErrConnectionClosed represents an error condition on a closed connection.
	ErrConnectionClosed = errors.New("connection closed")

	// ErrAuthentication represents an error condition on failed authentication.
	ErrAuthentication = errors.New("authentication error")

	// ErrAuthTimeout represents an error condition on failed authorization due to timeout.
	ErrAuthTimeout = errors.New("authentication timeout")

	// ErrAuthExpired represents an expired authorization due to timeout.
	ErrAuthExpired = errors.New("authentication expired")

	// ErrMaxPayload represents an error condition when the payload is too big.
	ErrMaxPayload = errors.New("maximum payload exceeded")

	// ErrMaxControlLine represents an error condition when the control line is too big.
	ErrMaxControlLine = errors.New("maximum control line exceeded")

	// ErrReservedPublishSubject represents an error condition when sending to a reserved subject, e.g. _SYS.>
	ErrReservedPublishSubject = errors.New("reserved internal subject")

	// ErrBadPublishSubject represents an error condition for an invalid publish subject.
	ErrBadPublishSubject = errors.New("invalid publish subject")

	// ErrBadClientProtocol signals a client requested an invalid client protocol.
	ErrBadClientProtocol = errors.New("invalid client protocol")

	// ErrTooManyConnections signals a client that the maximum number of connections supported by the
	// server has been reached.
	ErrTooManyConnections = errors.New("maximum connections exceeded")

	// ErrTooManyAccountConnections signals that an account has reached its maximum number of active
	// connections.
	ErrTooManyAccountConnections = errors.New("maximum account active connections exceeded")

	// ErrTooManySubs signals a client that the maximum number of subscriptions per connection
	// has been reached.
	ErrTooManySubs = errors.New("maximum subscriptions exceeded")

	// ErrClientConnectedToRoutePort represents an error condition when a client
	// attempted to connect to the route listen port.
	ErrClientConnectedToRoutePort = errors.New("attempted to connect to route port")

	// ErrClientConnectedToLeafNodePort represents an error condition when a client
	// attempted to connect to the leaf node listen port.
	ErrClientConnectedToLeafNodePort = errors.New("attempted to connect to leaf node port")

	// ErrConnectedToWrongPort represents an error condition when a connection is attempted
	// to the wrong listen port (for instance a LeafNode to a client port, etc...)
	ErrConnectedToWrongPort = errors.New("attempted to connect to wrong port")

	// ErrAccountExists is returned when an account is attempted to be registered
	// but already exists.
	ErrAccountExists = errors.New("account exists")

	// ErrBadAccount represents a malformed or incorrect account.
	ErrBadAccount = errors.New("bad account")

	// ErrReservedAccount represents a reserved account that can not be created.
	ErrReservedAccount = errors.New("reserved account")

	// ErrMissingAccount is returned when an account does not exist.
	ErrMissingAccount = errors.New("account missing")

	// ErrMissingService is returned when an account does not have an exported service.
	ErrMissingService = errors.New("service missing")

	// ErrBadServiceType is returned when latency tracking is being applied to non-singleton response types.
	ErrBadServiceType = errors.New("bad service response type")

	// ErrBadSampling is returned when the sampling for latency tracking is not in the range 1 <= sample <= 100.
	ErrBadSampling = errors.New("bad sampling percentage, should be 1-100")

	// ErrAccountValidation is returned when an account has failed validation.
	ErrAccountValidation = errors.New("account validation failed")

	// ErrAccountExpired is returned when an account has expired.
	ErrAccountExpired = errors.New("account expired")

	// ErrNoAccountResolver is returned when we attempt an update but do not have an account resolver.
	ErrNoAccountResolver = errors.New("account resolver missing")

	// ErrAccountResolverUpdateTooSoon is returned when we attempt an update too soon to last request.
	ErrAccountResolverUpdateTooSoon = errors.New("account resolver update too soon")

	// ErrAccountResolverSameClaims is returned when same claims have been fetched.
	ErrAccountResolverSameClaims = errors.New("account resolver no new claims")

	// ErrStreamImportAuthorization is returned when a stream import is not authorized.
	ErrStreamImportAuthorization = errors.New("stream import not authorized")

	// ErrStreamImportBadPrefix is returned when a stream import prefix contains wildcards.
	ErrStreamImportBadPrefix = errors.New("stream import prefix can not contain wildcard tokens")

	// ErrStreamImportDuplicate is returned when a stream import is a duplicate of one that already exists.
	ErrStreamImportDuplicate = errors.New("stream import already exists")

	// ErrServiceImportAuthorization is returned when a service import is not authorized.
	ErrServiceImportAuthorization = errors.New("service import not authorized")

	// ErrClientOrRouteConnectedToGatewayPort represents an error condition when
	// a client or route attempted to connect to the Gateway port.
	ErrClientOrRouteConnectedToGatewayPort = errors.New("attempted to connect to gateway port")

	// ErrWrongGateway represents an error condition when a server receives a connect
	// request from a remote Gateway with a destination name that does not match the server's
	// Gateway's name.
	ErrWrongGateway = errors.New("wrong gateway")

	// ErrNoSysAccount is returned when an attempt to publish or subscribe is made
	// when there is no internal system account defined.
	ErrNoSysAccount = errors.New("system account not setup")

	// ErrRevocation is returned when a credential has been revoked.
	ErrRevocation = errors.New("credentials have been revoked")

	// ErrServerNotRunning is used to signal an error that a server is not running.
	ErrServerNotRunning = errors.New("server is not running")

	// ErrBadMsgHeader signals the parser detected a bad message header.
	ErrBadMsgHeader = errors.New("bad message header detected")

	// ErrMsgHeadersNotSupported signals the parser detected a message header
	// but they are not supported on this server.
	ErrMsgHeadersNotSupported = errors.New("message headers not supported")

	// ErrNoRespondersRequiresHeaders signals that a client needs to have headers
	// on if they want no responders behavior.
	ErrNoRespondersRequiresHeaders = errors.New("no responders requires headers support")

	// ErrClusterNameConfigConflict signals that the options for cluster name in cluster and gateway are in conflict.
	ErrClusterNameConfigConflict = errors.New("cluster name conflicts between cluster and gateway definitions")

	// ErrClusterNameRemoteConflict signals that a remote server has a different cluster name.
	ErrClusterNameRemoteConflict = errors.New("cluster name from remote server conflicts")

	// ErrMalformedSubject is returned when a subscription is made with a subject that does not conform to subject rules.
	ErrMalformedSubject = errors.New("malformed subject")

	// ErrSubscribePermissionViolation is returned when processing of a subscription fails due to permissions.
	ErrSubscribePermissionViolation = errors.New("subscribe permission violation")
)
View Source
// Sentinel errors returned by message stores. Compare against them by
// identity rather than by message text.
var (
	// ErrStoreClosed is returned when the store has been closed.
	ErrStoreClosed = errors.New("store is closed")
	// ErrStoreMsgNotFound is returned when a message was not found but was expected to be.
	ErrStoreMsgNotFound = errors.New("no message found")
	// ErrStoreEOF is returned when message seq is greater than the last sequence.
	ErrStoreEOF = errors.New("stream EOF")
	// ErrMaxMsgs is returned when we have discard new as a policy and we reached
	// the message limit.
	ErrMaxMsgs = errors.New("maximum messages exceeded")
	// ErrMaxBytes is returned when we have discard new as a policy and we reached
	// the bytes limit.
	ErrMaxBytes = errors.New("maximum bytes exceeded")
	// ErrStoreSnapshotInProgress is returned when RemoveMsg or EraseMsg is called
	// while a snapshot is in progress.
	ErrStoreSnapshotInProgress = errors.New("snapshot in progress")
	// ErrMsgTooLarge is returned when a message is considered too large.
	ErrMsgTooLarge = errors.New("message too large")
)
View Source
// Sublist related errors. Every message carries the "sublist: " prefix.
var (
	ErrInvalidSubject    = errors.New("sublist: invalid subject")
	ErrNotFound          = errors.New("sublist: no matches found")
	ErrNilChan           = errors.New("sublist: nil channel")
	ErrAlreadyRegistered = errors.New("sublist: notification already registered")
)

Sublist related errors

View Source
var IsPublicExport = []*Account(nil)

IsPublicExport is a placeholder to denote a public export.

Functions

func CanonicalName

func CanonicalName(name string) string

CanonicalName will replace all token separators '.' with '_'. This can be used when naming streams or consumers with multi-token subjects.

func ErrorIs

func ErrorIs(err, target error) bool

ErrorIs implements Go 1.13's errors.Is(err, target error) bool. TODO: replace with the native function once go1.12 is no longer supported.

func FriendlyBytes

func FriendlyBytes(bytes int64) string

FriendlyBytes returns a string with the given bytes int64 represented as a size, such as 1KB, 10MB, etc...

func GenTLSConfig

func GenTLSConfig(tc *TLSConfigOpts) (*tls.Config, error)

GenTLSConfig loads TLS related configuration parameters.

func IsValidLiteralSubject

func IsValidLiteralSubject(subject string) bool

IsValidLiteralSubject returns true if a subject is valid and literal (no wildcards), false otherwise

func IsValidPublishSubject

func IsValidPublishSubject(subject string) bool

IsValidPublishSubject returns true if a subject is valid and a literal, false otherwise

func IsValidSubject

func IsValidSubject(subject string) bool

IsValidSubject returns true if a subject is valid, false otherwise

func NewErrorCtx

func NewErrorCtx(err error, format string, args ...interface{}) error

func NoErrOnUnknownFields

func NoErrOnUnknownFields(noError bool)

NoErrOnUnknownFields can be used to change the behavior of the processing of a configuration file. By default, an error is reported if unknown fields are found. If `noError` is set to true, no error will be reported if top-level unknown fields are found.

func PrintAndDie

func PrintAndDie(msg string)

PrintAndDie is exported for access in other packages.

func PrintServerAndExit

func PrintServerAndExit()

PrintServerAndExit will print our version and exit.

func PrintTLSHelpAndDie

func PrintTLSHelpAndDie()

PrintTLSHelpAndDie prints TLS usage and exits.

func ProcessCommandLineArgs

func ProcessCommandLineArgs(cmd *flag.FlagSet) (showVersion bool, showHelp bool, err error)

ProcessCommandLineArgs takes the command line arguments validating and setting flags for handling in case any sub command was present.

func ProcessSignal

func ProcessSignal(command Command, pidStr string) error

ProcessSignal sends the given signal command to the given process. If pidStr is empty, this will send the signal to the single running instance of nats-server. If multiple instances are running, it returns an error. This returns an error if the given process is not running or the command is invalid.

func ReadOperatorJWT

func ReadOperatorJWT(jwtfile string) (*jwt.OperatorClaims, error)

ReadOperatorJWT will read a jwt file for an operator claim. This can be a decorated file.

func RemoveSelfReference

func RemoveSelfReference(clusterPort int, routes []*url.URL) ([]*url.URL, error)

RemoveSelfReference removes this server from an array of routes

func ResetGatewaysSolicitDelay

func ResetGatewaysSolicitDelay()

ResetGatewaysSolicitDelay resets the initial delay before gateways connections are initiated to its default values. Used by tests.

func ResponseHandler

func ResponseHandler(w http.ResponseWriter, r *http.Request, data []byte)

ResponseHandler handles responses for monitoring routes

func RoutesFromStr

func RoutesFromStr(routesStr string) []*url.URL

RoutesFromStr parses route URLs from a string

func Run

func Run(server *Server) error

Run starts the NATS server. This wrapper function allows Windows to add a hook for running NATS as a service.

func SetGatewaysSolicitDelay

func SetGatewaysSolicitDelay(delay time.Duration)

SetGatewaysSolicitDelay sets the initial delay before gateways connections are initiated. Used by tests.

func SetProcessName

func SetProcessName(name string)

SetProcessName allows to change the expected name of the process.

func SubjectsCollide

func SubjectsCollide(subj1, subj2 string) bool

SubjectsCollide will determine if two subjects could both match a single literal subject.

func UnpackIfErrorCtx

func UnpackIfErrorCtx(err error) string

UnpackIfErrorCtx returns the error string or, if the error is of the right type, the error and its context.

Types

type Account

type Account struct {
	Name   string
	Nkey   string
	Issuer string
	// contains filtered or unexported fields
}

Accounts are subject namespace definitions. By default no messages are shared between accounts. You can share via Exports and Imports of Streams and Services.

func NewAccount

func NewAccount(name string) *Account

NewAccount creates a new unlimited account with the given name.

func (*Account) AddServiceExport

func (a *Account) AddServiceExport(subject string, accounts []*Account) error

AddServiceExport will configure the account with the defined export.

func (*Account) AddServiceExportWithResponse

func (a *Account) AddServiceExportWithResponse(subject string, respType ServiceRespType, accounts []*Account) error

AddServiceExportWithResponse will configure the account with the defined export and response type.

func (*Account) AddServiceImport

func (a *Account) AddServiceImport(destination *Account, from, to string) error

AddServiceImport will add a route to an account to send published messages / requests to the destination account. From is the local subject to map, To is the subject that will appear on the destination account. Destination will need to have an import rule to allow access via addService.

func (*Account) AddServiceImportWithClaim

func (a *Account) AddServiceImportWithClaim(destination *Account, from, to string, imClaim *jwt.Import) error

AddServiceImportWithClaim will add in the service import via the jwt claim.

func (*Account) AddStream

func (a *Account) AddStream(config *StreamConfig) (*Stream, error)

AddStream adds a stream for the given account.

func (*Account) AddStreamExport

func (a *Account) AddStreamExport(subject string, accounts []*Account) error

AddStreamExport will add an export to the account. If accounts is nil it will signify a public export, meaning anyone can import.

func (*Account) AddStreamImport

func (a *Account) AddStreamImport(account *Account, from, prefix string) error

AddStreamImport will add in the stream import from a specific account.

func (*Account) AddStreamImportWithClaim

func (a *Account) AddStreamImportWithClaim(account *Account, from, prefix string, imClaim *jwt.Import) error

AddStreamImportWithClaim will add in the stream import from a specific account with optional token.

func (*Account) AddStreamTemplate

func (a *Account) AddStreamTemplate(tc *StreamTemplateConfig) (*StreamTemplate, error)

AddStreamTemplate will add a stream template to this account that allows auto-creation of streams.

func (*Account) AddStreamWithStore

func (a *Account) AddStreamWithStore(config *StreamConfig, fsConfig *FileStoreConfig) (*Stream, error)

AddStreamWithStore adds a stream for the given account with custom store config options.

func (*Account) DeleteStreamTemplate

func (a *Account) DeleteStreamTemplate(name string) error

func (*Account) DisableJetStream

func (a *Account) DisableJetStream() error

DisableJetStream will disable JetStream for this account.

func (*Account) EnableJetStream

func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error

EnableJetStream will enable JetStream on this account with the defined limits. This is a helper for JetStreamEnableAccount.

func (*Account) GetName

func (a *Account) GetName() string

GetName will return the account's name.

func (*Account) Interest

func (a *Account) Interest(subject string) int

Interest returns the number of subscriptions for a given subject that match.

func (*Account) IsExpired

func (a *Account) IsExpired() bool

IsExpired returns expiration status.

func (*Account) IsExportService

func (a *Account) IsExportService(service string) bool

IsExportService will indicate if this service exists. Will check wildcard scenarios.

func (*Account) IsExportServiceTracking

func (a *Account) IsExportServiceTracking(service string) bool

IsExportServiceTracking will indicate if given publish subject is an export service with tracking enabled.

func (*Account) JetStreamEnabled

func (a *Account) JetStreamEnabled() bool

JetStreamEnabled is a helper to determine if jetstream is enabled for an account.

func (*Account) JetStreamUsage

func (a *Account) JetStreamUsage() JetStreamAccountStats

JetStreamUsage reports on JetStream usage and limits for an account.

func (*Account) LookupStream

func (a *Account) LookupStream(name string) (*Stream, error)

func (*Account) LookupStreamTemplate

func (a *Account) LookupStreamTemplate(name string) (*StreamTemplate, error)

LookupStreamTemplate looks up the named stream template.

func (*Account) MaxActiveConnections

func (a *Account) MaxActiveConnections() int

MaxActiveConnections returns the set limit for the account system wide for total number of active connections.

func (*Account) MaxActiveLeafNodes

func (a *Account) MaxActiveLeafNodes() int

MaxActiveLeafNodes returns the set limit for the account system wide for total number of leafnode connections. NOTE: these are tracked separately.

func (*Account) MaxTotalConnectionsReached

func (a *Account) MaxTotalConnectionsReached() bool

MaxTotalConnectionsReached returns if we have reached our limit for number of connections.

func (*Account) MaxTotalLeafNodesReached

func (a *Account) MaxTotalLeafNodesReached() bool

MaxTotalLeafNodesReached returns if we have reached our limit for number of leafnodes.

func (*Account) NumConnections

func (a *Account) NumConnections() int

NumConnections returns active number of clients for this account for all known servers.

func (*Account) NumLeafNodes

func (a *Account) NumLeafNodes() int

NumLeafNodes returns the active number of local and remote leaf node connections.

func (*Account) NumLocalConnections

func (a *Account) NumLocalConnections() int

NumLocalConnections returns active number of clients for this account on this server.

func (*Account) NumPendingAllResponses

func (a *Account) NumPendingAllResponses() int

NumPendingAllResponses returns the number of all responses outstanding for service exports.

func (*Account) NumPendingResponses

func (a *Account) NumPendingResponses(filter string) int

NumPendingResponses returns the number of responses outstanding for service exports on this account. An empty filter string returns all responses regardless of which export. If you specify the filter we will only return ones that are for that export. NOTE: this is only for what this server is tracking.

func (*Account) NumPendingReverseResponses

func (a *Account) NumPendingReverseResponses() int

NumPendingReverseResponses returns the number of response mappings we have for all outstanding requests for service imports.

func (*Account) NumRemoteConnections

func (a *Account) NumRemoteConnections() int

NumRemoteConnections returns the number of client or leaf connections that are not on this server.

func (*Account) NumRemoteLeafNodes

func (a *Account) NumRemoteLeafNodes() int

NumRemoteLeafNodes returns the active number of remote leaf node connections.

func (*Account) NumServiceImports

func (a *Account) NumServiceImports() int

NumServiceImports returns the number of service imports we have configured.

func (*Account) NumStreams

func (a *Account) NumStreams() int

NumStreams will return how many streams we have.

func (*Account) RestoreStream

func (a *Account) RestoreStream(stream string, r io.Reader) (*Stream, error)

RestoreStream will restore a stream from a snapshot.

func (*Account) RoutedSubs

func (a *Account) RoutedSubs() int

RoutedSubs returns how many subjects we would send across a route when first connected or expressing interest. Local client subs.

func (*Account) ServiceExportResponseThreshold

func (a *Account) ServiceExportResponseThreshold(export string) (time.Duration, error)

ServiceExportResponseThreshold returns the current threshold.

func (*Account) SetServiceExportResponseThreshold

func (a *Account) SetServiceExportResponseThreshold(export string, maxTime time.Duration) error

SetServiceExportResponseThreshold sets the maximum time the system will wait for a response to be delivered from a service export responder.

func (*Account) SetServiceImportSharing

func (a *Account) SetServiceImportSharing(destination *Account, to string, allow bool) error

SetServiceImportSharing will allow sharing of information about requests with the export account. Used for service latency tracking at the moment.

func (*Account) Streams

func (a *Account) Streams() []*Stream

Streams will return all known streams.

func (*Account) SubscriptionInterest

func (a *Account) SubscriptionInterest(subject string) bool

SubscriptionInterest returns true if this account has a matching subscription for the given `subject`. Works only for literal subjects. TODO: Add support for wildcards

func (*Account) Templates

func (a *Account) Templates() []*StreamTemplate

func (*Account) TotalSubs

func (a *Account) TotalSubs() int

TotalSubs returns total number of Subscriptions for this account.

func (*Account) TrackServiceExport

func (a *Account) TrackServiceExport(service, results string) error

TrackServiceExport will enable latency tracking of the named service. Results will be published in this account to the given results subject.

func (*Account) TrackServiceExportWithSampling

func (a *Account) TrackServiceExportWithSampling(service, results string, sampling int) error

TrackServiceExportWithSampling will enable latency tracking of the named service for the given sampling rate (1-100). Results will be published in this account to the given results subject.

func (*Account) UnTrackServiceExport

func (a *Account) UnTrackServiceExport(service string)

UnTrackServiceExport will disable latency tracking of the named service.

func (*Account) UpdateJetStreamLimits

func (a *Account) UpdateJetStreamLimits(limits *JetStreamAccountLimits) error

UpdateJetStreamLimits will update the account limits for a JetStream enabled account.

type AccountGatewayz

type AccountGatewayz struct {
	Name                  string `json:"name"`
	InterestMode          string `json:"interest_mode"`
	NoInterestCount       int    `json:"no_interest_count,omitempty"`
	InterestOnlyThreshold int    `json:"interest_only_threshold,omitempty"`
	TotalSubscriptions    int    `json:"num_subs,omitempty"`
	NumQueueSubscriptions int    `json:"num_queue_subs,omitempty"`
}

AccountGatewayz represents interest mode for this account

type AccountNumConns

type AccountNumConns struct {
	Server     ServerInfo `json:"server"`
	Account    string     `json:"acc"`
	Conns      int        `json:"conns"`
	LeafNodes  int        `json:"leafnodes"`
	TotalConns int        `json:"total_conns"`
}

AccountNumConns is an event that will be sent from a server that is tracking a given account when the number of connections changes. It will also send HB updates in the absence of any changes.

type AccountResolver

type AccountResolver interface {
	Fetch(name string) (string, error)
	Store(name, jwt string) error
	IsReadOnly() bool
	Start(server *Server) error
	IsTrackingUpdate() bool
	Reload() error
	Close()
}

AccountResolver interface. This is to fetch Account JWTs by public nkeys

type AckPolicy

type AckPolicy int

AckPolicy determines how the consumer should acknowledge delivered messages.

const (
	// AckNone requires no acks for delivered messages.
	AckNone AckPolicy = iota
	// AckAll when acking a sequence number, this implicitly acks all sequences below this one as well.
	AckAll
	// AckExplicit requires ack or nack for all messages.
	AckExplicit
)

func (AckPolicy) MarshalJSON

func (ap AckPolicy) MarshalJSON() ([]byte, error)

func (AckPolicy) String

func (a AckPolicy) String() string

func (*AckPolicy) UnmarshalJSON

func (ap *AckPolicy) UnmarshalJSON(data []byte) error

type ActionAdvisoryType

type ActionAdvisoryType string

ActionAdvisoryType indicates which action against a stream, consumer or template triggered an advisory

const (
	CreateEvent ActionAdvisoryType = "create"
	DeleteEvent ActionAdvisoryType = "delete"
	ModifyEvent ActionAdvisoryType = "modify"
)

type ApiError

type ApiError struct {
	Code        int    `json:"code"`
	Description string `json:"description,omitempty"`
}

ApiError is included in all responses if there was an error. TODO(dlc) - Move to more generic location.

type ApiPaged

type ApiPaged struct {
	Total  int `json:"total"`
	Offset int `json:"offset"`
	Limit  int `json:"limit"`
}

ApiPaged includes variables used to create paged responses from the JSON API

type ApiPagedRequest

type ApiPagedRequest struct {
	Offset int `json:"offset"`
}

ApiPagedRequest includes parameters allowing specific pages to be requested from APIs responding with ApiPaged

type ApiResponse

type ApiResponse struct {
	Type  string    `json:"type"`
	Error *ApiError `json:"error,omitempty"`
}

ApiResponse is a standard response from the JetStream JSON API

type Authentication

type Authentication interface {
	// Check if a client is authorized to connect
	Check(c ClientAuthentication) bool
}

Authentication is an interface for implementing authentication

type CacheDirAccResolver

type CacheDirAccResolver struct {
	DirAccResolver
	*Server
	// contains filtered or unexported fields
}

Caching resolver using nats for lookups and making use of a directory for storage

func NewCacheDirAccResolver

func NewCacheDirAccResolver(path string, limit int64, ttl time.Duration) (*CacheDirAccResolver, error)

func (*CacheDirAccResolver) Fetch

func (dr *CacheDirAccResolver) Fetch(name string) (string, error)

func (*CacheDirAccResolver) Reload

func (dr *CacheDirAccResolver) Reload() error

func (*CacheDirAccResolver) Start

func (dr *CacheDirAccResolver) Start(s *Server) error

type ClientAPIAudit

type ClientAPIAudit struct {
	Host     string `json:"host"`
	Port     int    `json:"port"`
	CID      uint64 `json:"cid"`
	Account  string `json:"account"`
	User     string `json:"user,omitempty"`
	Name     string `json:"name,omitempty"`
	Language string `json:"lang,omitempty"`
	Version  string `json:"version,omitempty"`
}

ClientAPIAudit is for identifying a client who initiated an API call to the system.

type ClientAuthentication

type ClientAuthentication interface {
	// Get options associated with a client
	GetOpts() *clientOpts
	// If TLS is enabled, TLS ConnectionState, nil otherwise
	GetTLSConnectionState() *tls.ConnectionState
	// Optionally map a user after auth.
	RegisterUser(*User)
	// RemoteAddress expose the connection information of the client
	RemoteAddress() net.Addr
}

ClientAuthentication is an interface for client authentication

type ClientInfo

type ClientInfo struct {
	Start   time.Time  `json:"start,omitempty"`
	Host    string     `json:"host,omitempty"`
	ID      uint64     `json:"id"`
	Account string     `json:"acc"`
	User    string     `json:"user,omitempty"`
	Name    string     `json:"name,omitempty"`
	Lang    string     `json:"lang,omitempty"`
	Version string     `json:"ver,omitempty"`
	RTT     string     `json:"rtt,omitempty"`
	Server  string     `json:"server,omitempty"`
	Stop    *time.Time `json:"stop,omitempty"`
}

ClientInfo is detailed information about the client forming a connection.

type ClosedState

type ClosedState int

ClosedState is the reason client was closed. This will be passed into calls to clearConnection, but will only be stored in ConnInfo for monitoring.

func (ClosedState) String

func (reason ClosedState) String() string

type ClusterOpts

type ClusterOpts struct {
	Name           string            `json:"-"`
	Host           string            `json:"addr,omitempty"`
	Port           int               `json:"cluster_port,omitempty"`
	Username       string            `json:"-"`
	Password       string            `json:"-"`
	AuthTimeout    float64           `json:"auth_timeout,omitempty"`
	Permissions    *RoutePermissions `json:"-"`
	TLSTimeout     float64           `json:"-"`
	TLSConfig      *tls.Config       `json:"-"`
	TLSMap         bool              `json:"-"`
	ListenStr      string            `json:"-"`
	Advertise      string            `json:"-"`
	NoAdvertise    bool              `json:"-"`
	ConnectRetries int               `json:"-"`
}

ClusterOpts are options for clusters. NOTE: This structure is no longer used for monitoring endpoints and json tags are deprecated and may be removed in the future.

type ClusterOptsVarz

type ClusterOptsVarz struct {
	Name        string   `json:"name,omitempty"`
	Host        string   `json:"addr,omitempty"`
	Port        int      `json:"cluster_port,omitempty"`
	AuthTimeout float64  `json:"auth_timeout,omitempty"`
	URLs        []string `json:"urls,omitempty"`
}

ClusterOptsVarz contains monitoring cluster information

type Command

type Command string

Command is a signal used to control a running nats-server process.

type ConnInfo

type ConnInfo struct {
	Cid            uint64      `json:"cid"`
	IP             string      `json:"ip"`
	Port           int         `json:"port"`
	Start          time.Time   `json:"start"`
	LastActivity   time.Time   `json:"last_activity"`
	Stop           *time.Time  `json:"stop,omitempty"`
	Reason         string      `json:"reason,omitempty"`
	RTT            string      `json:"rtt,omitempty"`
	Uptime         string      `json:"uptime"`
	Idle           string      `json:"idle"`
	Pending        int         `json:"pending_bytes"`
	InMsgs         int64       `json:"in_msgs"`
	OutMsgs        int64       `json:"out_msgs"`
	InBytes        int64       `json:"in_bytes"`
	OutBytes       int64       `json:"out_bytes"`
	NumSubs        uint32      `json:"subscriptions"`
	Name           string      `json:"name,omitempty"`
	Lang           string      `json:"lang,omitempty"`
	Version        string      `json:"version,omitempty"`
	TLSVersion     string      `json:"tls_version,omitempty"`
	TLSCipher      string      `json:"tls_cipher_suite,omitempty"`
	AuthorizedUser string      `json:"authorized_user,omitempty"`
	Account        string      `json:"account,omitempty"`
	Subs           []string    `json:"subscriptions_list,omitempty"`
	SubsDetail     []SubDetail `json:"subscriptions_list_detail,omitempty"`
}

ConnInfo has detailed information on a per connection basis.

type ConnInfos

type ConnInfos []*ConnInfo

ConnInfos represents a connection info list. We use pointers since it will be sorted.

func (ConnInfos) Len

func (cl ConnInfos) Len() int

Len returns the length of the list, for sorting.

func (ConnInfos) Swap

func (cl ConnInfos) Swap(i, j int)

Swap will swap the elements.

type ConnState

type ConnState int

ConnState is for filtering states of connections. We will only have two, open and closed.

type ConnectEventMsg

type ConnectEventMsg struct {
	TypedEvent
	Server ServerInfo `json:"server"`
	Client ClientInfo `json:"client"`
}

ConnectEventMsg is sent when a new connection is made that is part of an account.

type Connz

type Connz struct {
	ID       string      `json:"server_id"`
	Now      time.Time   `json:"now"`
	NumConns int         `json:"num_connections"`
	Total    int         `json:"total"`
	Offset   int         `json:"offset"`
	Limit    int         `json:"limit"`
	Conns    []*ConnInfo `json:"connections"`
}

Connz represents detailed information on current client connections.

type ConnzEventOptions

type ConnzEventOptions struct {
	ConnzOptions
	EventFilterOptions
}

In the context of system events, ConnzEventOptions are options passed to Connz

type ConnzOptions

type ConnzOptions struct {
	// Sort indicates how the results will be sorted. Check SortOpt for possible values.
	// Only the sort by connection ID (ByCid) is ascending, all others are descending.
	Sort SortOpt `json:"sort"`

	// Username indicates if user names should be included in the results.
	Username bool `json:"auth"`

	// Subscriptions indicates if subscriptions should be included in the results.
	Subscriptions bool `json:"subscriptions"`

	// SubscriptionsDetail indicates if subscription details should be included in the results
	SubscriptionsDetail bool `json:"subscriptions_detail"`

	// Offset is used for pagination. Connz() only returns connections starting at this
	// offset from the global results.
	Offset int `json:"offset"`

	// Limit is the maximum number of connections that should be returned by Connz().
	Limit int `json:"limit"`

	// Filter for this explicit client connection.
	CID uint64 `json:"cid"`

	// Filter by connection state.
	State ConnState `json:"state"`

	// Filter by username.
	User string `json:"user"`

	// Filter by account.
	Account string `json:"acc"`
}

ConnzOptions are the options passed to Connz()

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a jetstream consumer.

func (*Consumer) Active

func (o *Consumer) Active() bool

Active indicates if this consumer is still active.

func (*Consumer) Config

func (o *Consumer) Config() ConsumerConfig

Config returns the consumer's configuration.

func (*Consumer) Created

func (o *Consumer) Created() time.Time

Created returns created time.

func (*Consumer) Delete

func (o *Consumer) Delete() error

Delete will delete the consumer for the associated stream and send advisories.

func (*Consumer) Info

func (o *Consumer) Info() *ConsumerInfo

Info returns our current consumer state.

func (*Consumer) Name

func (o *Consumer) Name() string

Name returns the name of this observable.

func (*Consumer) NextSeq

func (o *Consumer) NextSeq() uint64

NextSeq returns the next delivered sequence number for this observable.

func (*Consumer) ReplyInfo

func (o *Consumer) ReplyInfo(reply string) (sseq, dseq, dcount uint64, ts int64)

Grab encoded information in the reply subject for a delivered message.

func (*Consumer) RequestNextMsgSubject

func (o *Consumer) RequestNextMsgSubject() string

RequestNextMsgSubject returns the subject to request the next message when in pull or worker mode. Returns empty otherwise.

func (*Consumer) SeqFromReply

func (o *Consumer) SeqFromReply(reply string) uint64

SeqFromReply will extract a sequence number from a reply subject.

func (*Consumer) SetInActiveDeleteThreshold

func (o *Consumer) SetInActiveDeleteThreshold(dthresh time.Duration) error

SetInActiveDeleteThreshold sets the delete threshold for how long to wait before deleting an inactive ephemeral observable.

func (*Consumer) Stop

func (o *Consumer) Stop() error

Stop will shutdown the consumer for the associated stream.

func (*Consumer) StreamSeqFromReply

func (o *Consumer) StreamSeqFromReply(reply string) uint64

StreamSeqFromReply will extract the stream sequence from the reply subject.

type ConsumerConfig

type ConsumerConfig struct {
	Durable         string        `json:"durable_name,omitempty"`
	DeliverSubject  string        `json:"deliver_subject,omitempty"`
	DeliverPolicy   DeliverPolicy `json:"deliver_policy"`
	OptStartSeq     uint64        `json:"opt_start_seq,omitempty"`
	OptStartTime    *time.Time    `json:"opt_start_time,omitempty"`
	AckPolicy       AckPolicy     `json:"ack_policy"`
	AckWait         time.Duration `json:"ack_wait,omitempty"`
	MaxDeliver      int           `json:"max_deliver,omitempty"`
	FilterSubject   string        `json:"filter_subject,omitempty"`
	ReplayPolicy    ReplayPolicy  `json:"replay_policy"`
	RateLimit       uint64        `json:"rate_limit_bps,omitempty"` // Bits per sec
	SampleFrequency string        `json:"sample_freq,omitempty"`
}

type ConsumerInfo

type ConsumerInfo struct {
	Stream         string         `json:"stream_name"`
	Name           string         `json:"name"`
	Created        time.Time      `json:"created"`
	Config         ConsumerConfig `json:"config"`
	Delivered      SequencePair   `json:"delivered"`
	AckFloor       SequencePair   `json:"ack_floor"`
	NumPending     int            `json:"num_pending"`
	NumRedelivered int            `json:"num_redelivered"`
}

type ConsumerState

type ConsumerState struct {
	// Delivered keeps track of last delivered sequence numbers for both the stream and the consumer.
	Delivered SequencePair `json:"delivered"`
	// AckFloor keeps track of the ack floors for both the stream and the consumer.
	AckFloor SequencePair `json:"ack_floor"`
	// These are both in stream sequence context.
	// Pending is for all messages pending and the timestamp for the delivered time.
	// This will only be present when the AckPolicy is AckExplicit.
	Pending map[uint64]int64 `json:"pending"`
	// This is for messages that have been redelivered, so count > 1.
	Redelivered map[uint64]uint64 `json:"redelivered"`
}

ConsumerState represents a stored state for a consumer.

type ConsumerStore

type ConsumerStore interface {
	State() (*ConsumerState, error)
	Update(*ConsumerState) error
	Stop() error
	Delete() error
}

ConsumerStore stores state on consumers for streams.

type CreateConsumerRequest

type CreateConsumerRequest struct {
	Stream string         `json:"stream_name"`
	Config ConsumerConfig `json:"config"`
}

type DataStats

type DataStats struct {
	Msgs  int64 `json:"msgs"`
	Bytes int64 `json:"bytes"`
}

DataStats reports how many msgs and bytes. Applicable for both sent and received.

type DeliverPolicy

type DeliverPolicy int

DeliverPolicy determines how the consumer should select the first message to deliver.

const (
	// DeliverAll will be the default so can be omitted from the request.
	DeliverAll DeliverPolicy = iota
	// DeliverLast will start the consumer with the last sequence received.
	DeliverLast
	// DeliverNew will only deliver new messages that are sent after the consumer is created.
	DeliverNew
	// DeliverByStartSequence will look for a defined starting sequence to start.
	DeliverByStartSequence
	// DeliverByStartTime will select the first message with a timestamp >= to StartTime
	DeliverByStartTime
)

func (DeliverPolicy) MarshalJSON

func (p DeliverPolicy) MarshalJSON() ([]byte, error)

func (DeliverPolicy) String

func (dp DeliverPolicy) String() string

func (*DeliverPolicy) UnmarshalJSON

func (p *DeliverPolicy) UnmarshalJSON(data []byte) error

type DirAccResolver

type DirAccResolver struct {
	*DirJWTStore
	// contains filtered or unexported fields
}

Resolver based on nats for synchronization and backing directory for storage.

func NewDirAccResolver

func NewDirAccResolver(path string, limit int64, syncInterval time.Duration) (*DirAccResolver, error)

func (*DirAccResolver) Fetch

func (dr *DirAccResolver) Fetch(name string) (string, error)

func (*DirAccResolver) IsTrackingUpdate

func (dr *DirAccResolver) IsTrackingUpdate() bool

func (*DirAccResolver) Start

func (dr *DirAccResolver) Start(s *Server) error

func (*DirAccResolver) Store

func (dr *DirAccResolver) Store(name, jwt string) error

type DirJWTStore

type DirJWTStore struct {
	sync.Mutex
	// contains filtered or unexported fields
}

DirJWTStore implements the JWT Store interface, keeping JWTs in an optionally sharded directory structure

func NewDirJWTStore

func NewDirJWTStore(dirPath string, shard bool, create bool) (*DirJWTStore, error)

Creates a directory based jwt store. Operates on files only, does NOT watch directories and files.

func NewExpiringDirJWTStore

func NewExpiringDirJWTStore(dirPath string, shard bool, create bool, expireCheck time.Duration, limit int64,
	evictOnLimit bool, ttl time.Duration, changeNotification JWTChanged) (*DirJWTStore, error)

Creates a directory based jwt store.

When ttl is set, deletion of a file is based on it and not on the jwt expiration. To completely disable expiration (including expiration in jwt), set ttl to max duration time.Duration(math.MaxInt64).

limit defines how many files are allowed at any given time. Set to math.MaxInt64 to disable. evictOnLimit determines the behavior once limit is reached.

true - Evict based on lru strategy
false - return an error

func NewImmutableDirJWTStore

func NewImmutableDirJWTStore(dirPath string, shard bool) (*DirJWTStore, error)

Creates a directory based jwt store. Reads files only, does NOT watch directories and files.

func (*DirJWTStore) Close

func (store *DirJWTStore) Close()

func (*DirJWTStore) Hash

func (store *DirJWTStore) Hash() [sha256.Size]byte

returns a hash representing all indexed jwt

func (*DirJWTStore) IsReadOnly

func (store *DirJWTStore) IsReadOnly() bool

func (*DirJWTStore) LoadAcc

func (store *DirJWTStore) LoadAcc(publicKey string) (string, error)

func (*DirJWTStore) LoadAct

func (store *DirJWTStore) LoadAct(hash string) (string, error)

func (*DirJWTStore) Merge

func (store *DirJWTStore) Merge(pack string) error

Merge takes the JWTs from the package and adds them to the store. Merge is destructive in the sense that it doesn't check whether a JWT is newer or anything like that.

func (*DirJWTStore) Pack

func (store *DirJWTStore) Pack(maxJWTs int) (string, error)

Pack up to maxJWTs into a package

func (*DirJWTStore) PackWalk

func (store *DirJWTStore) PackWalk(maxJWTs int, cb func(partialPackMsg string)) error

Pack up to maxJWTs into a message and invoke callback with it

func (*DirJWTStore) Reload

func (store *DirJWTStore) Reload() error

func (*DirJWTStore) SaveAcc

func (store *DirJWTStore) SaveAcc(publicKey string, theJWT string) error

func (*DirJWTStore) SaveAct

func (store *DirJWTStore) SaveAct(hash string, theJWT string) error

type DiscardPolicy

type DiscardPolicy int

DiscardPolicy determines how we proceed when limits of messages or bytes are hit. The default, DiscardOld, will remove older messages. DiscardNew will fail to store the new message.

func (DiscardPolicy) MarshalJSON

func (dp DiscardPolicy) MarshalJSON() ([]byte, error)

func (DiscardPolicy) String

func (dp DiscardPolicy) String() string

func (*DiscardPolicy) UnmarshalJSON

func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error

type DisconnectEventMsg

type DisconnectEventMsg struct {
	TypedEvent
	Server   ServerInfo `json:"server"`
	Client   ClientInfo `json:"client"`
	Sent     DataStats  `json:"sent"`
	Received DataStats  `json:"received"`
	Reason   string     `json:"reason"`
}

DisconnectEventMsg is sent when a connection previously announced by a ConnectEventMsg is closed.

type EventFilterOptions

type EventFilterOptions struct {
	Name    string `json:"server_name,omitempty"` // filter by server name
	Cluster string `json:"cluster,omitempty"`     // filter by cluster name
	Host    string `json:"host,omitempty"`        // filter by host name
}

Common filter options for system requests STATSZ VARZ SUBSZ CONNZ ROUTEZ GATEWAYZ LEAFZ

type FileConsumerInfo

type FileConsumerInfo struct {
	Created time.Time
	Name    string
	ConsumerConfig
}

FileConsumerInfo is used for creating consumer stores.

type FileStoreConfig

type FileStoreConfig struct {
	// Where the parent directory for all storage will be located.
	StoreDir string
	// BlockSize is the file block size. This also represents the maximum overhead size.
	BlockSize uint64
	// ReadCacheExpire is how long with no activity until we expire the read cache.
	ReadCacheExpire time.Duration
	// SyncInterval is how often we sync to disk in the background.
	SyncInterval time.Duration
}

type FileStreamInfo

type FileStreamInfo struct {
	Created time.Time
	StreamConfig
}

FileStreamInfo allows us to remember created time.

type GatewayInterestMode

type GatewayInterestMode byte

GatewayInterestMode represents an account interest mode for a gateway connection

const (
	// optimistic is the default mode where a cluster will send
	// to a gateway unless it has been told that there is no interest
	// (this is for plain subscribers only).
	Optimistic GatewayInterestMode = iota
	// transitioning is when a gateway has to send too many
	// no interest on subjects to the remote and decides that it is
	// now time to move to modeInterestOnly (this is on a per account
	// basis).
	Transitioning
	// interestOnly means that a cluster sends all its subscriptions
	// interest to the gateway, which in return does not send a message
	// unless it knows that there is explicit interest.
	InterestOnly
)

GatewayInterestMode values

func (GatewayInterestMode) String

func (im GatewayInterestMode) String() string

type GatewayOpts

type GatewayOpts struct {
	Name           string               `json:"name"`
	Host           string               `json:"addr,omitempty"`
	Port           int                  `json:"port,omitempty"`
	Username       string               `json:"-"`
	Password       string               `json:"-"`
	AuthTimeout    float64              `json:"auth_timeout,omitempty"`
	TLSConfig      *tls.Config          `json:"-"`
	TLSTimeout     float64              `json:"tls_timeout,omitempty"`
	TLSMap         bool                 `json:"-"`
	Advertise      string               `json:"advertise,omitempty"`
	ConnectRetries int                  `json:"connect_retries,omitempty"`
	Gateways       []*RemoteGatewayOpts `json:"gateways,omitempty"`
	RejectUnknown  bool                 `json:"reject_unknown,omitempty"`
	// contains filtered or unexported fields
}

GatewayOpts are options for gateways. NOTE: This structure is no longer used for monitoring endpoints and json tags are deprecated and may be removed in the future.

type GatewayOptsVarz

type GatewayOptsVarz struct {
	Name           string                  `json:"name,omitempty"`
	Host           string                  `json:"host,omitempty"`
	Port           int                     `json:"port,omitempty"`
	AuthTimeout    float64                 `json:"auth_timeout,omitempty"`
	TLSTimeout     float64                 `json:"tls_timeout,omitempty"`
	Advertise      string                  `json:"advertise,omitempty"`
	ConnectRetries int                     `json:"connect_retries,omitempty"`
	Gateways       []RemoteGatewayOptsVarz `json:"gateways,omitempty"`
	RejectUnknown  bool                    `json:"reject_unknown,omitempty"`
}

GatewayOptsVarz contains monitoring gateway information

type GatewayStat

type GatewayStat struct {
	ID         uint64    `json:"gwid"`
	Name       string    `json:"name"`
	Sent       DataStats `json:"sent"`
	Received   DataStats `json:"received"`
	NumInbound int       `json:"inbound_connections"`
}

GatewayStat holds gateway statistics.

type Gatewayz

type Gatewayz struct {
	ID               string                       `json:"server_id"`
	Now              time.Time                    `json:"now"`
	Name             string                       `json:"name,omitempty"`
	Host             string                       `json:"host,omitempty"`
	Port             int                          `json:"port,omitempty"`
	OutboundGateways map[string]*RemoteGatewayz   `json:"outbound_gateways"`
	InboundGateways  map[string][]*RemoteGatewayz `json:"inbound_gateways"`
}

Gatewayz represents detailed information on Gateways

type GatewayzEventOptions

type GatewayzEventOptions struct {
	GatewayzOptions
	EventFilterOptions
}

In the context of system events, GatewayzEventOptions are options passed to Gatewayz

type GatewayzOptions

type GatewayzOptions struct {
	// Name will output only remote gateways with this name
	Name string `json:"name"`

	// Accounts indicates if accounts with its interest should be included in the results.
	Accounts bool `json:"accounts"`

	// AccountName will limit the list of accounts to that account name (makes Accounts implicit)
	AccountName string `json:"account_name"`
}

GatewayzOptions are the options passed to Gatewayz()

type Info

type Info struct {
	ID                string   `json:"server_id"`
	Name              string   `json:"server_name"`
	Version           string   `json:"version"`
	Proto             int      `json:"proto"`
	GitCommit         string   `json:"git_commit,omitempty"`
	GoVersion         string   `json:"go"`
	Host              string   `json:"host"`
	Port              int      `json:"port"`
	Headers           bool     `json:"headers"`
	AuthRequired      bool     `json:"auth_required,omitempty"`
	TLSRequired       bool     `json:"tls_required,omitempty"`
	TLSVerify         bool     `json:"tls_verify,omitempty"`
	TLSAvailable      bool     `json:"tls_available,omitempty"`
	MaxPayload        int32    `json:"max_payload"`
	JetStream         bool     `json:"jetstream,omitempty"`
	IP                string   `json:"ip,omitempty"`
	CID               uint64   `json:"client_id,omitempty"`
	ClientIP          string   `json:"client_ip,omitempty"`
	Nonce             string   `json:"nonce,omitempty"`
	Cluster           string   `json:"cluster,omitempty"`
	Dynamic           bool     `json:"cluster_dynamic,omitempty"`
	ClientConnectURLs []string `json:"connect_urls,omitempty"`    // Contains URLs a client can connect to.
	WSConnectURLs     []string `json:"ws_connect_urls,omitempty"` // Contains URLs a ws client can connect to.
	LameDuckMode      bool     `json:"ldm,omitempty"`

	// Route Specific
	Import *SubjectPermission `json:"import,omitempty"`
	Export *SubjectPermission `json:"export,omitempty"`
	LNOC   bool               `json:"lnoc,omitempty"`

	// Gateways Specific
	Gateway           string   `json:"gateway,omitempty"`             // Name of the origin Gateway (sent by gateway's INFO)
	GatewayURLs       []string `json:"gateway_urls,omitempty"`        // Gateway URLs in the originating cluster (sent by gateway's INFO)
	GatewayURL        string   `json:"gateway_url,omitempty"`         // Gateway URL on that server (sent by route's INFO)
	GatewayCmd        byte     `json:"gateway_cmd,omitempty"`         // Command code for the receiving server to know what to do
	GatewayCmdPayload []byte   `json:"gateway_cmd_payload,omitempty"` // Command payload when needed
	GatewayNRP        bool     `json:"gateway_nrp,omitempty"`         // Uses new $GNR. prefix for mapped replies

	// LeafNode Specific
	LeafNodeURLs []string `json:"leafnode_urls,omitempty"` // LeafNode URLs that the server can reconnect to.
}

Info is the information sent to clients, routes, gateways, and leaf nodes, to help them understand information about this server.

type JSAPIAudit

type JSAPIAudit struct {
	TypedEvent
	Server   string         `json:"server"`
	Client   ClientAPIAudit `json:"client"`
	Subject  string         `json:"subject"`
	Request  string         `json:"request,omitempty"`
	Response string         `json:"response"`
}

JSAPIAudit is an advisory about administrative actions taken on JetStream

type JSApiAccountInfoResponse

type JSApiAccountInfoResponse struct {
	ApiResponse
	*JetStreamAccountStats
}

JSApiAccountInfoResponse reports back information on jetstream for this account.

type JSApiConsumerCreateResponse

type JSApiConsumerCreateResponse struct {
	ApiResponse
	*ConsumerInfo
}

JSApiConsumerCreateResponse.

type JSApiConsumerDeleteResponse

type JSApiConsumerDeleteResponse struct {
	ApiResponse
	Success bool `json:"success,omitempty"`
}

JSApiConsumerDeleteResponse.

type JSApiConsumerInfoResponse

type JSApiConsumerInfoResponse struct {
	ApiResponse
	*ConsumerInfo
}

JSApiConsumerInfoResponse.

type JSApiConsumerListResponse

type JSApiConsumerListResponse struct {
	ApiResponse
	ApiPaged
	Consumers []*ConsumerInfo `json:"consumers"`
}

JSApiConsumerListResponse.

type JSApiConsumerNamesResponse

type JSApiConsumerNamesResponse struct {
	ApiResponse
	ApiPaged
	Consumers []string `json:"consumers"`
}

JSApiConsumerNamesResponse.

type JSApiConsumersRequest

type JSApiConsumersRequest struct {
	ApiPagedRequest
}

JSApiConsumersRequest

type JSApiMsgDeleteRequest

type JSApiMsgDeleteRequest struct {
	Seq uint64 `json:"seq"`
}

JSApiMsgDeleteRequest delete message request.

type JSApiMsgDeleteResponse

type JSApiMsgDeleteResponse struct {
	ApiResponse
	Success bool `json:"success,omitempty"`
}

JSApiMsgDeleteResponse.

type JSApiMsgGetRequest

type JSApiMsgGetRequest struct {
	Seq uint64 `json:"seq"`
}

JSApiMsgGetRequest get a message request.

type JSApiMsgGetResponse

type JSApiMsgGetResponse struct {
	ApiResponse
	Message *StoredMsg `json:"message,omitempty"`
}

JSApiMsgGetResponse.

type JSApiStreamCreateResponse

type JSApiStreamCreateResponse struct {
	ApiResponse
	*StreamInfo
}

JSApiStreamCreateResponse stream creation.

type JSApiStreamDeleteResponse

type JSApiStreamDeleteResponse struct {
	ApiResponse
	Success bool `json:"success,omitempty"`
}

JSApiStreamDeleteResponse stream removal.

type JSApiStreamInfoResponse

type JSApiStreamInfoResponse struct {
	ApiResponse
	*StreamInfo
}

JSApiStreamInfoResponse.

type JSApiStreamListResponse

type JSApiStreamListResponse struct {
	ApiResponse
	ApiPaged
	Streams []*StreamInfo `json:"streams"`
}

JSApiStreamListResponse list of detailed stream information. A nil request is valid and means all streams.

type JSApiStreamNamesRequest

type JSApiStreamNamesRequest struct {
	ApiPagedRequest
}

type JSApiStreamNamesResponse

type JSApiStreamNamesResponse struct {
	ApiResponse
	ApiPaged
	Streams []string `json:"streams"`
}

JSApiStreamNamesResponse list of streams. A nil request is valid and means all streams.

type JSApiStreamPurgeResponse

type JSApiStreamPurgeResponse struct {
	ApiResponse
	Success bool   `json:"success,omitempty"`
	Purged  uint64 `json:"purged,omitempty"`
}

JSApiStreamPurgeResponse.

type JSApiStreamRestoreResponse

type JSApiStreamRestoreResponse struct {
	ApiResponse
	// Subject to deliver the chunks to for the snapshot restore.
	DeliverSubject string `json:"deliver_subject"`
}

JSApiStreamRestoreResponse is the direct response to the restore request.

type JSApiStreamSnapshotRequest

type JSApiStreamSnapshotRequest struct {
	// Subject to deliver the chunks to for the snapshot.
	DeliverSubject string `json:"deliver_subject"`
	// Do not include consumers in the snapshot.
	NoConsumers bool `json:"no_consumers,omitempty"`
	// Optional chunk size preference.
	// Best to just let server select.
	ChunkSize int `json:"chunk_size,omitempty"`
	// Check all message's checksums prior to snapshot.
	CheckMsgs bool `json:"jsck,omitempty"`
}

type JSApiStreamSnapshotResponse

type JSApiStreamSnapshotResponse struct {
	ApiResponse
	// Estimate of number of blocks for the messages.
	NumBlks int `json:"num_blks"`
	// Block size limit as specified by the stream.
	BlkSize int `json:"blk_size"`
}

JSApiStreamSnapshotResponse is the direct response to the snapshot request.

type JSApiStreamTemplateCreateResponse

type JSApiStreamTemplateCreateResponse struct {
	ApiResponse
	*StreamTemplateInfo
}

JSApiStreamTemplateCreateResponse for creating templates.

type JSApiStreamTemplateDeleteResponse

type JSApiStreamTemplateDeleteResponse struct {
	ApiResponse
	Success bool `json:"success,omitempty"`
}

JSApiStreamTemplateDeleteResponse

type JSApiStreamTemplateInfoResponse

type JSApiStreamTemplateInfoResponse struct {
	ApiResponse
	*StreamTemplateInfo
}

JSApiStreamTemplateInfoResponse for information about stream templates.

type JSApiStreamTemplateNamesResponse

type JSApiStreamTemplateNamesResponse struct {
	ApiResponse
	ApiPaged
	Templates []string `json:"streams"`
}

JSApiStreamTemplateNamesResponse list of templates

type JSApiStreamTemplatesRequest

type JSApiStreamTemplatesRequest struct {
	ApiPagedRequest
}

JSApiStreamTemplatesRequest

type JSApiStreamUpdateResponse

type JSApiStreamUpdateResponse struct {
	ApiResponse
	*StreamInfo
}

JSApiStreamUpdateResponse for updating a stream.

type JSConsumerAckMetric

type JSConsumerAckMetric struct {
	TypedEvent
	Stream      string `json:"stream"`
	Consumer    string `json:"consumer"`
	ConsumerSeq uint64 `json:"consumer_seq"`
	StreamSeq   uint64 `json:"stream_seq"`
	Delay       int64  `json:"ack_time"`
	Deliveries  uint64 `json:"deliveries"`
}

JSConsumerAckMetric is a metric published when a user acknowledges a message; the number of these that will be published is dependent on SampleFrequency.

type JSConsumerActionAdvisory

type JSConsumerActionAdvisory struct {
	TypedEvent
	Stream   string             `json:"stream"`
	Consumer string             `json:"consumer"`
	Action   ActionAdvisoryType `json:"action"`
}

JSConsumerActionAdvisory indicates that a consumer was created or deleted

type JSConsumerDeliveryExceededAdvisory

type JSConsumerDeliveryExceededAdvisory struct {
	TypedEvent
	Stream     string `json:"stream"`
	Consumer   string `json:"consumer"`
	StreamSeq  uint64 `json:"stream_seq"`
	Deliveries uint64 `json:"deliveries"`
}

JSConsumerDeliveryExceededAdvisory is an advisory informing that a message hit its MaxDeliver threshold and so might be a candidate for DLQ handling

type JSConsumerDeliveryTerminatedAdvisory

type JSConsumerDeliveryTerminatedAdvisory struct {
	TypedEvent
	Stream      string `json:"stream"`
	Consumer    string `json:"consumer"`
	ConsumerSeq uint64 `json:"consumer_seq"`
	StreamSeq   uint64 `json:"stream_seq"`
	Deliveries  uint64 `json:"deliveries"`
}

JSConsumerDeliveryTerminatedAdvisory is an advisory informing that a message was terminated by the consumer, so might be a candidate for DLQ handling

type JSRestoreCompleteAdvisory

type JSRestoreCompleteAdvisory struct {
	TypedEvent
	Stream string          `json:"stream"`
	Start  time.Time       `json:"start"`
	End    time.Time       `json:"end"`
	Bytes  int64           `json:"bytes"`
	Client *ClientAPIAudit `json:"client"`
}

JSRestoreCompleteAdvisory is an advisory sent after a restore is successfully completed

type JSRestoreCreateAdvisory

type JSRestoreCreateAdvisory struct {
	TypedEvent
	Stream string          `json:"stream"`
	Client *ClientAPIAudit `json:"client"`
}

JSRestoreCreateAdvisory is an advisory sent after a restore is successfully started

type JSSnapshotCompleteAdvisory

type JSSnapshotCompleteAdvisory struct {
	TypedEvent
	Stream string          `json:"stream"`
	Start  time.Time       `json:"start"`
	End    time.Time       `json:"end"`
	Client *ClientAPIAudit `json:"client"`
}

JSSnapshotCompleteAdvisory is an advisory sent after a snapshot is successfully completed

type JSSnapshotCreateAdvisory

type JSSnapshotCreateAdvisory struct {
	TypedEvent
	Stream  string          `json:"stream"`
	NumBlks int             `json:"blocks"`
	BlkSize int             `json:"block_size"`
	Client  *ClientAPIAudit `json:"client"`
}

JSSnapshotCreateAdvisory is an advisory sent after a snapshot is successfully started

type JSStreamActionAdvisory

type JSStreamActionAdvisory struct {
	TypedEvent
	Stream   string             `json:"stream"`
	Action   ActionAdvisoryType `json:"action"`
	Template string             `json:"template,omitempty"`
}

JSStreamActionAdvisory indicates that a stream was created, edited or deleted

type JWTChanged

type JWTChanged func(publicKey string)

JWTChanged functions are called when the store file watcher notices a JWT changed

type JetStreamAccountLimits

type JetStreamAccountLimits struct {
	MaxMemory    int64 `json:"max_memory"`
	MaxStore     int64 `json:"max_storage"`
	MaxStreams   int   `json:"max_streams"`
	MaxConsumers int   `json:"max_consumers"`
}

TODO(dlc) - need to track and rollup against server limits, etc.

type JetStreamAccountStats

type JetStreamAccountStats struct {
	Memory  uint64                 `json:"memory"`
	Store   uint64                 `json:"storage"`
	Streams int                    `json:"streams"`
	Limits  JetStreamAccountLimits `json:"limits"`
}

JetStreamAccountStats returns current statistics about the account's JetStream usage.

type JetStreamConfig

type JetStreamConfig struct {
	MaxMemory int64
	MaxStore  int64
	StoreDir  string
}

JetStreamConfig determines this server's configuration. MaxMemory and MaxStore are in bytes.

type JetStreamVarz

type JetStreamVarz struct {
	MaxMemory int64  `json:"max_memory,omitempty"`
	MaxStore  int64  `json:"max_store,omitempty"`
	StoreDir  string `json:"store_dir,omitempty"`
	Accounts  int    `json:"accounts,omitempty"`
}

JetStreamVarz contains basic runtime information about jetstream

type LatencyClient

type LatencyClient struct {
	Account string        `json:"acc"`
	RTT     time.Duration `json:"rtt"`
	Start   time.Time     `json:"start,omitempty"`
	User    string        `json:"user,omitempty"`
	Name    string        `json:"name,omitempty"`
	Lang    string        `json:"lang,omitempty"`
	Version string        `json:"ver,omitempty"`
	IP      string        `json:"ip,omitempty"`
	CID     uint64        `json:"cid,omitempty"`
	Server  string        `json:"server,omitempty"`
}

LatencyClient is the JSON message structure assigned to requestors and responders. Note that for a requestor, the only information shared by default is the RTT used to calculate the total latency. The requestor's account can designate to share the additional information in the service import.

type LeafInfo

type LeafInfo struct {
	Account  string   `json:"account"`
	IP       string   `json:"ip"`
	Port     int      `json:"port"`
	RTT      string   `json:"rtt,omitempty"`
	InMsgs   int64    `json:"in_msgs"`
	OutMsgs  int64    `json:"out_msgs"`
	InBytes  int64    `json:"in_bytes"`
	OutBytes int64    `json:"out_bytes"`
	NumSubs  uint32   `json:"subscriptions"`
	Subs     []string `json:"subscriptions_list,omitempty"`
}

LeafInfo has detailed information on each remote leafnode connection.

type LeafNodeOpts

type LeafNodeOpts struct {
	Host              string        `json:"addr,omitempty"`
	Port              int           `json:"port,omitempty"`
	Username          string        `json:"-"`
	Password          string        `json:"-"`
	Account           string        `json:"-"`
	Users             []*User       `json:"-"`
	AuthTimeout       float64       `json:"auth_timeout,omitempty"`
	TLSConfig         *tls.Config   `json:"-"`
	TLSTimeout        float64       `json:"tls_timeout,omitempty"`
	TLSMap            bool          `json:"-"`
	Advertise         string        `json:"-"`
	NoAdvertise       bool          `json:"-"`
	ReconnectInterval time.Duration `json:"-"`

	// For solicited connections to other clusters/superclusters.
	Remotes []*RemoteLeafOpts `json:"remotes,omitempty"`
	// contains filtered or unexported fields
}

LeafNodeOpts are options for a given server to accept leaf node connections and/or connect to a remote cluster.

type LeafNodeOptsVarz

type LeafNodeOptsVarz struct {
	Host        string               `json:"host,omitempty"`
	Port        int                  `json:"port,omitempty"`
	AuthTimeout float64              `json:"auth_timeout,omitempty"`
	TLSTimeout  float64              `json:"tls_timeout,omitempty"`
	Remotes     []RemoteLeafOptsVarz `json:"remotes,omitempty"`
}

LeafNodeOptsVarz contains monitoring leaf node information

type Leafz

type Leafz struct {
	ID       string      `json:"server_id"`
	Now      time.Time   `json:"now"`
	NumLeafs int         `json:"leafnodes"`
	Leafs    []*LeafInfo `json:"leafs"`
}

Leafz represents detailed information on Leafnodes.

type LeafzEventOptions

type LeafzEventOptions struct {
	LeafzOptions
	EventFilterOptions
}

In the context of system events, LeafzEventOptions are options passed to Leafz

type LeafzOptions

type LeafzOptions struct {
	// Subscriptions indicates that Leafz will return a leafnode's subscriptions
	Subscriptions bool `json:"subscriptions"`
}

LeafzOptions are options passed to Leafz

type Logger

type Logger interface {

	// Log a notice statement
	Noticef(format string, v ...interface{})

	// Log a warning statement
	Warnf(format string, v ...interface{})

	// Log a fatal error
	Fatalf(format string, v ...interface{})

	// Log an error
	Errorf(format string, v ...interface{})

	// Log a debug statement
	Debugf(format string, v ...interface{})

	// Log a trace statement
	Tracef(format string, v ...interface{})
}

Logger interface of the NATS Server

type MemAccResolver

type MemAccResolver struct {
	// contains filtered or unexported fields
}

MemAccResolver is a memory only resolver. Mostly for testing.

func (*MemAccResolver) Close

func (*MemAccResolver) Close()

func (*MemAccResolver) Fetch

func (m *MemAccResolver) Fetch(name string) (string, error)

Fetch will fetch the account jwt claims from the internal sync.Map.

func (*MemAccResolver) IsReadOnly

func (ur *MemAccResolver) IsReadOnly() bool

func (*MemAccResolver) IsTrackingUpdate

func (*MemAccResolver) IsTrackingUpdate() bool

func (*MemAccResolver) Reload

func (*MemAccResolver) Reload() error

func (*MemAccResolver) Start

func (*MemAccResolver) Start(*Server) error

func (*MemAccResolver) Store

func (m *MemAccResolver) Store(name, jwt string) error

Store will store the account jwt claims in the internal sync.Map.

type NkeyUser

type NkeyUser struct {
	Nkey        string       `json:"user"`
	Permissions *Permissions `json:"permissions,omitempty"`
	Account     *Account     `json:"account,omitempty"`
	SigningKey  string       `json:"signing_key,omitempty"`
}

NkeyUser is for multiple nkey based users

type Options

type Options struct {
	ConfigFile            string        `json:"-"`
	ServerName            string        `json:"server_name"`
	Host                  string        `json:"addr"`
	Port                  int           `json:"port"`
	ClientAdvertise       string        `json:"-"`
	Trace                 bool          `json:"-"`
	Debug                 bool          `json:"-"`
	TraceVerbose          bool          `json:"-"`
	NoLog                 bool          `json:"-"`
	NoSigs                bool          `json:"-"`
	NoSublistCache        bool          `json:"-"`
	NoHeaderSupport       bool          `json:"-"`
	DisableShortFirstPing bool          `json:"-"`
	Logtime               bool          `json:"-"`
	MaxConn               int           `json:"max_connections"`
	MaxSubs               int           `json:"max_subscriptions,omitempty"`
	Nkeys                 []*NkeyUser   `json:"-"`
	Users                 []*User       `json:"-"`
	Accounts              []*Account    `json:"-"`
	NoAuthUser            string        `json:"-"`
	SystemAccount         string        `json:"-"`
	NoSystemAccount       bool          `json:"-"`
	AllowNewAccounts      bool          `json:"-"`
	Username              string        `json:"-"`
	Password              string        `json:"-"`
	Authorization         string        `json:"-"`
	PingInterval          time.Duration `json:"ping_interval"`
	MaxPingsOut           int           `json:"ping_max"`
	HTTPHost              string        `json:"http_host"`
	HTTPPort              int           `json:"http_port"`
	HTTPBasePath          string        `json:"http_base_path"`
	HTTPSPort             int           `json:"https_port"`
	AuthTimeout           float64       `json:"auth_timeout"`
	MaxControlLine        int32         `json:"max_control_line"`
	MaxPayload            int32         `json:"max_payload"`
	MaxPending            int64         `json:"max_pending"`
	Cluster               ClusterOpts   `json:"cluster,omitempty"`
	Gateway               GatewayOpts   `json:"gateway,omitempty"`
	LeafNode              LeafNodeOpts  `json:"leaf,omitempty"`
	JetStream             bool          `json:"jetstream"`
	JetStreamMaxMemory    int64         `json:"-"`
	JetStreamMaxStore     int64         `json:"-"`
	StoreDir              string        `json:"-"`
	Websocket             WebsocketOpts `json:"-"`
	ProfPort              int           `json:"-"`
	PidFile               string        `json:"-"`
	PortsFileDir          string        `json:"-"`
	LogFile               string        `json:"-"`
	LogSizeLimit          int64         `json:"-"`
	Syslog                bool          `json:"-"`
	RemoteSyslog          string        `json:"-"`
	Routes                []*url.URL    `json:"-"`
	RoutesStr             string        `json:"-"`
	TLSTimeout            float64       `json:"tls_timeout"`
	TLS                   bool          `json:"-"`
	TLSVerify             bool          `json:"-"`
	TLSMap                bool          `json:"-"`
	TLSCert               string        `json:"-"`
	TLSKey                string        `json:"-"`
	TLSCaCert             string        `json:"-"`
	TLSConfig             *tls.Config   `json:"-"`
	AllowNonTLS           bool          `json:"-"`
	WriteDeadline         time.Duration `json:"-"`
	MaxClosedClients      int           `json:"-"`
	LameDuckDuration      time.Duration `json:"-"`
	LameDuckGracePeriod   time.Duration `json:"-"`

	// MaxTracedMsgLen is the maximum printable length for traced messages.
	MaxTracedMsgLen int `json:"-"`

	// Operating a trusted NATS server
	TrustedKeys              []string              `json:"-"`
	TrustedOperators         []*jwt.OperatorClaims `json:"-"`
	AccountResolver          AccountResolver       `json:"-"`
	AccountResolverTLSConfig *tls.Config           `json:"-"`

	CustomClientAuthentication Authentication `json:"-"`
	CustomRouterAuthentication Authentication `json:"-"`

	// CheckConfig configuration file syntax test was successful and exit.
	CheckConfig bool `json:"-"`

	// ConnectErrorReports specifies the number of failed attempts
	// at which point server should report the failure of an initial
	// connection to a route, gateway or leaf node.
	// See DEFAULT_CONNECT_ERROR_REPORTS for default value.
	ConnectErrorReports int

	// ReconnectErrorReports is similar to ConnectErrorReports except
	// that this applies to reconnect events.
	ReconnectErrorReports int

	// SCTP switch
	Sctp bool
	// contains filtered or unexported fields
}

Options block for nats-server. NOTE: This structure is no longer used for monitoring endpoints and json tags are deprecated and may be removed in the future.

var FlagSnapshot *Options

FlagSnapshot captures the server options as specified by CLI flags at startup. This should not be modified once the server has started.

func ConfigureOptions

func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp, printTLSHelp func()) (*Options, error)

ConfigureOptions accepts a flag set and augments it with NATS Server specific flags. On success, an options structure is returned configured based on the selected flags and/or configuration file. The command line options take precedence over the ones in the configuration file.

func MergeOptions

func MergeOptions(fileOpts, flagOpts *Options) *Options

MergeOptions will merge two options giving preference to the flagOpts if the item is present.

func ProcessConfigFile

func ProcessConfigFile(configFile string) (*Options, error)

ProcessConfigFile processes a configuration file. FIXME(dlc): A bit hacky

func (*Options) Clone

func (o *Options) Clone() *Options

Clone performs a deep copy of the Options struct, returning a new clone with all values copied.

func (*Options) ProcessConfigFile

func (o *Options) ProcessConfigFile(configFile string) error

ProcessConfigFile updates the Options structure with options present in the given configuration file. This version is convenient if one wants to set some default options and then override them with what is in the config file. For instance, this version allows you to do something such as:

opts := &Options{Debug: true}
opts.ProcessConfigFile(myConfigFile)

If the config file contains "debug: false", after this call, opts.Debug would really be false. It would be impossible to achieve that with the non receiver ProcessConfigFile() version, since one would not know after the call if "debug" was not present or was present but set to false.

type Permissions

type Permissions struct {
	Publish   *SubjectPermission  `json:"publish"`
	Subscribe *SubjectPermission  `json:"subscribe"`
	Response  *ResponsePermission `json:"responses,omitempty"`
}

Permissions are the allowed subjects on a per publish or subscribe basis.

type Ports

type Ports struct {
	Nats       []string `json:"nats,omitempty"`
	Monitoring []string `json:"monitoring,omitempty"`
	Cluster    []string `json:"cluster,omitempty"`
	Profile    []string `json:"profile,omitempty"`
	WebSocket  []string `json:"websocket,omitempty"`
}

Ports describes the URLs at which the server can be contacted.

type PubAck

type PubAck struct {
	Stream    string `json:"stream"`
	Seq       uint64 `json:"seq"`
	Duplicate bool   `json:"duplicate,omitempty"`
}

PubAck is the detail you get back from a publish to a stream that was successful. e.g. +OK {"stream": "Orders", "seq": 22}

type RemoteGatewayOpts

type RemoteGatewayOpts struct {
	Name       string      `json:"name"`
	TLSConfig  *tls.Config `json:"-"`
	TLSTimeout float64     `json:"tls_timeout,omitempty"`
	URLs       []*url.URL  `json:"urls,omitempty"`
}

RemoteGatewayOpts are options for connecting to a remote gateway. NOTE: This structure is no longer used for monitoring endpoints and json tags are deprecated and may be removed in the future.

type RemoteGatewayOptsVarz

type RemoteGatewayOptsVarz struct {
	Name       string   `json:"name"`
	TLSTimeout float64  `json:"tls_timeout,omitempty"`
	URLs       []string `json:"urls,omitempty"`
}

RemoteGatewayOptsVarz contains monitoring remote gateway information

type RemoteGatewayz

type RemoteGatewayz struct {
	IsConfigured bool               `json:"configured"`
	Connection   *ConnInfo          `json:"connection,omitempty"`
	Accounts     []*AccountGatewayz `json:"accounts,omitempty"`
}

RemoteGatewayz represents information about an outbound connection to a gateway

type RemoteLeafOpts

type RemoteLeafOpts struct {
	LocalAccount string      `json:"local_account,omitempty"`
	URLs         []*url.URL  `json:"urls,omitempty"`
	Credentials  string      `json:"-"`
	TLS          bool        `json:"-"`
	TLSConfig    *tls.Config `json:"-"`
	TLSTimeout   float64     `json:"tls_timeout,omitempty"`
	Hub          bool        `json:"hub,omitempty"`
	DenyImports  []string    `json:"-"`
	DenyExports  []string    `json:"-"`
}

RemoteLeafOpts are options for connecting to a remote server as a leaf node.

type RemoteLeafOptsVarz

type RemoteLeafOptsVarz struct {
	LocalAccount string   `json:"local_account,omitempty"`
	TLSTimeout   float64  `json:"tls_timeout,omitempty"`
	URLs         []string `json:"urls,omitempty"`
}

RemoteLeafOptsVarz contains monitoring remote leaf node information

type ReplayPolicy

type ReplayPolicy int

ReplayPolicy determines how the consumer should replay messages it already has queued in the stream.

const (
	// ReplayInstant will replay messages as fast as possible.
	ReplayInstant ReplayPolicy = iota
	// ReplayOriginal will maintain the same timing as the messages were received.
	ReplayOriginal
)

func (ReplayPolicy) MarshalJSON

func (rp ReplayPolicy) MarshalJSON() ([]byte, error)

func (ReplayPolicy) String

func (r ReplayPolicy) String() string

func (*ReplayPolicy) UnmarshalJSON

func (rp *ReplayPolicy) UnmarshalJSON(data []byte) error

type ResponsePermission

type ResponsePermission struct {
	MaxMsgs int           `json:"max"`
	Expires time.Duration `json:"ttl"`
}

ResponsePermission can be used to allow responses to any reply subject that is received on a valid subscription.

type RetentionPolicy

type RetentionPolicy int

RetentionPolicy determines how messages in a stream are retained.

const (
	// LimitsPolicy (default) means that messages are retained until any given limit is reached.
	// This could be one of MaxMsgs, MaxBytes, or MaxAge.
	LimitsPolicy RetentionPolicy = iota
	// InterestPolicy specifies that when all known observables have acknowledged a message it can be removed.
	InterestPolicy
	// WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed.
	WorkQueuePolicy
)

func (RetentionPolicy) MarshalJSON

func (rp RetentionPolicy) MarshalJSON() ([]byte, error)

func (RetentionPolicy) String

func (rp RetentionPolicy) String() string

func (*RetentionPolicy) UnmarshalJSON

func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error

type RouteInfo

type RouteInfo struct {
	Rid          uint64             `json:"rid"`
	RemoteID     string             `json:"remote_id"`
	DidSolicit   bool               `json:"did_solicit"`
	IsConfigured bool               `json:"is_configured"`
	IP           string             `json:"ip"`
	Port         int                `json:"port"`
	Import       *SubjectPermission `json:"import,omitempty"`
	Export       *SubjectPermission `json:"export,omitempty"`
	Pending      int                `json:"pending_size"`
	RTT          string             `json:"rtt,omitempty"`
	InMsgs       int64              `json:"in_msgs"`
	OutMsgs      int64              `json:"out_msgs"`
	InBytes      int64              `json:"in_bytes"`
	OutBytes     int64              `json:"out_bytes"`
	NumSubs      uint32             `json:"subscriptions"`
	Subs         []string           `json:"subscriptions_list,omitempty"`
	SubsDetail   []SubDetail        `json:"subscriptions_list_detail,omitempty"`
}

RouteInfo has detailed information on a per connection basis.

type RoutePermissions

type RoutePermissions struct {
	Import *SubjectPermission `json:"import"`
	Export *SubjectPermission `json:"export"`
}

RoutePermissions are similar to user permissions but describe what a server can import/export from and to another server.

type RouteStat

type RouteStat struct {
	ID       uint64    `json:"rid"`
	Name     string    `json:"name,omitempty"`
	Sent     DataStats `json:"sent"`
	Received DataStats `json:"received"`
	Pending  int       `json:"pending"`
}

RouteStat holds route statistics.

type RouteType

type RouteType int

RouteType designates the router type

const (
	// This route we learned from speaking to other routes.
	Implicit RouteType = iota
	// This route was explicitly configured.
	Explicit
)

Type of Route

type Routez

type Routez struct {
	ID        string             `json:"server_id"`
	Now       time.Time          `json:"now"`
	Import    *SubjectPermission `json:"import,omitempty"`
	Export    *SubjectPermission `json:"export,omitempty"`
	NumRoutes int                `json:"num_routes"`
	Routes    []*RouteInfo       `json:"routes"`
}

Routez represents detailed information on current route connections.

type RoutezEventOptions

type RoutezEventOptions struct {
	RoutezOptions
	EventFilterOptions
}

In the context of system events, RoutezEventOptions are options passed to Routez

type RoutezOptions

type RoutezOptions struct {
	// Subscriptions indicates that Routez will return a route's subscriptions
	Subscriptions bool `json:"subscriptions"`
	// SubscriptionsDetail indicates if subscription details should be included in the results
	SubscriptionsDetail bool `json:"subscriptions_detail"`
}

RoutezOptions are options passed to Routez

type SequencePair

type SequencePair struct {
	ConsumerSeq uint64 `json:"consumer_seq"`
	StreamSeq   uint64 `json:"stream_seq"`
}

SequencePair has both the consumer and the stream sequence. They point to the same message.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is our main struct.

func New

func New(opts *Options) *Server

New will set up a new server struct after parsing the options. DEPRECATED: Use NewServer(opts).

func NewServer

func NewServer(opts *Options) (*Server, error)

NewServer will set up a new server struct after parsing the options. Could return an error if options cannot be validated.

func (*Server) AcceptLoop

func (s *Server) AcceptLoop(clr chan struct{})

AcceptLoop is exported for easier testing.

func (*Server) AccountResolver

func (s *Server) AccountResolver() AccountResolver

AccountResolver returns the registered account resolver.

func (*Server) Addr

func (s *Server) Addr() net.Addr

Addr will return the net.Addr object for the current listener.

func (*Server) ClientURL

func (s *Server) ClientURL() string

ClientURL returns the URL used to connect clients. Helpful in testing when we designate a random client port (-1).

func (*Server) ClusterAddr

func (s *Server) ClusterAddr() *net.TCPAddr

ClusterAddr returns the net.Addr object for the route listener.

func (*Server) ClusterName

func (s *Server) ClusterName() string

ClusterName returns our cluster name, which could be dynamic.

func (*Server) ConfigTime

func (s *Server) ConfigTime() time.Time

ConfigTime will report the last time the server configuration was loaded.

func (*Server) ConfigureLogger

func (s *Server) ConfigureLogger()

ConfigureLogger configures and sets the logger for the server.

func (*Server) Connz

func (s *Server) Connz(opts *ConnzOptions) (*Connz, error)

Connz returns a Connz struct containing information about connections.

func (*Server) Debugf

func (s *Server) Debugf(format string, v ...interface{})

Debugf logs a debug statement

func (*Server) EnableJetStream

func (s *Server) EnableJetStream(config *JetStreamConfig) error

EnableJetStream will enable JetStream support on this server with the given configuration. A nil configuration will dynamically choose the limits and temporary file storage directory. If this server is part of a cluster, a system account will need to be defined.

func (*Server) Errorc

func (s *Server) Errorc(ctx string, e error)

Errorc logs an error with a context.

func (*Server) Errorf

func (s *Server) Errorf(format string, v ...interface{})

Errorf logs an error

func (*Server) Errors

func (s *Server) Errors(scope interface{}, e error)

Errors logs an error with a scope.

func (*Server) Errorsc

func (s *Server) Errorsc(scope interface{}, ctx string, e error)

Errorsc logs an error with a scope and context.

func (*Server) EventsEnabled

func (s *Server) EventsEnabled() bool

EventsEnabled will report if the server has internal events enabled via a defined system account.

func (*Server) Fatalf

func (s *Server) Fatalf(format string, v ...interface{})

Fatalf logs a fatal error

func (*Server) GatewayAddr

func (s *Server) GatewayAddr() *net.TCPAddr

GatewayAddr returns the net.Addr object for the gateway listener.

func (*Server) Gatewayz

func (s *Server) Gatewayz(opts *GatewayzOptions) (*Gatewayz, error)

Gatewayz returns a Gatewayz struct containing information about gateways.

func (*Server) GetClient

func (s *Server) GetClient(cid uint64) *client

GetClient will return the client associated with cid.

func (*Server) GetLeafNode

func (s *Server) GetLeafNode(cid uint64) *client

GetLeafNode returns the leafnode associated with the cid.

func (*Server) GlobalAccount

func (s *Server) GlobalAccount() *Account

GlobalAccount returns the global account. Default clients will use the global account.

func (*Server) HTTPHandler

func (s *Server) HTTPHandler() http.Handler

HTTPHandler returns the http.Handler object used to handle monitoring endpoints. It will return nil if the server is not configured for monitoring, or if the server has not been started yet (Server.Start()).

func (*Server) HandleConnz

func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request)

HandleConnz processes HTTP requests for connection information.

func (*Server) HandleGatewayz

func (s *Server) HandleGatewayz(w http.ResponseWriter, r *http.Request)

HandleGatewayz processes HTTP requests for gateway information.

func (*Server) HandleLeafz

func (s *Server) HandleLeafz(w http.ResponseWriter, r *http.Request)

HandleLeafz processes HTTP requests for leafnode information.

func (*Server) HandleRoot

func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request)

HandleRoot will show basic info and links to other handlers.

func (*Server) HandleRoutez

func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request)

HandleRoutez processes HTTP requests for route information.

func (*Server) HandleStacksz

func (s *Server) HandleStacksz(w http.ResponseWriter, r *http.Request)

HandleStacksz processes HTTP requests for getting stacks

func (*Server) HandleSubsz

func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request)

HandleSubsz processes HTTP requests for subjects stats.

func (*Server) HandleVarz

func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request)

HandleVarz will process HTTP requests for server information.

func (*Server) ID

func (s *Server) ID() string

ID returns the server's ID

func (*Server) JetStreamConfig

func (s *Server) JetStreamConfig() *JetStreamConfig

JetStreamConfig will return the current config. Useful if the system created a dynamic configuration. A copy is returned.

func (*Server) JetStreamEnabled

func (s *Server) JetStreamEnabled() bool

JetStreamEnabled reports if jetstream is enabled.

func (*Server) JetStreamNumAccounts

func (s *Server) JetStreamNumAccounts() int

JetStreamNumAccounts returns the number of enabled accounts this server is tracking.

func (*Server) JetStreamReservedResources

func (s *Server) JetStreamReservedResources() (int64, int64, error)

JetStreamReservedResources returns the reserved resources if JetStream is enabled.

func (*Server) Leafz

func (s *Server) Leafz(opts *LeafzOptions) (*Leafz, error)

Leafz returns a Leafz structure containing information about leafnodes.

func (*Server) LookupAccount

func (s *Server) LookupAccount(name string) (*Account, error)

LookupAccount is a public function to return the account structure associated with name.

func (*Server) LookupOrRegisterAccount

func (s *Server) LookupOrRegisterAccount(name string) (account *Account, isNew bool)

LookupOrRegisterAccount will return the given account if known or create a new entry.

func (*Server) MonitorAddr

func (s *Server) MonitorAddr() *net.TCPAddr

MonitorAddr will return the net.Addr object for the monitoring listener.

func (*Server) Name

func (s *Server) Name() string

Name returns the server's name. This will be the same as the ID if it was not set.

func (*Server) NewAccountsAllowed

func (s *Server) NewAccountsAllowed() bool

NewAccountsAllowed returns whether or not new accounts can be created on the fly.

func (*Server) NonceRequired

func (s *Server) NonceRequired() bool

NonceRequired tells us if we should send a nonce.

func (*Server) Noticef

func (s *Server) Noticef(format string, v ...interface{})

Noticef logs a notice statement

func (*Server) NumActiveAccounts

func (s *Server) NumActiveAccounts() int32

NumActiveAccounts reports number of active accounts on this server.

func (*Server) NumClients

func (s *Server) NumClients() int

NumClients will report the number of registered clients.

func (*Server) NumLeafNodes

func (s *Server) NumLeafNodes() int

NumLeafNodes will report number of leaf node connections.

func (*Server) NumLoadedAccounts

func (s *Server) NumLoadedAccounts() int

NumLoadedAccounts returns the number of loaded accounts.

func (*Server) NumOutboundGateways

func (s *Server) NumOutboundGateways() int

NumOutboundGateways is public here mostly for testing.

func (*Server) NumRemotes

func (s *Server) NumRemotes() int

NumRemotes will report number of registered remotes.

func (*Server) NumRoutes

func (s *Server) NumRoutes() int

NumRoutes will report the number of registered routes.

func (*Server) NumSlowConsumers

func (s *Server) NumSlowConsumers() int64

NumSlowConsumers will report the number of slow consumers.

func (*Server) NumSubscriptions

func (s *Server) NumSubscriptions() uint32

NumSubscriptions will report how many subscriptions are active.

func (*Server) PortsInfo

func (s *Server) PortsInfo(maxWait time.Duration) *Ports

PortsInfo attempts to resolve all the ports. If after maxWait the ports are not resolved, it returns nil. Otherwise it returns a Ports struct describing the ports where the server can be contacted.

func (*Server) ProfilerAddr

func (s *Server) ProfilerAddr() *net.TCPAddr

ProfilerAddr returns the net.Addr object for the profiler listener.

func (*Server) ReOpenLogFile

func (s *Server) ReOpenLogFile()

ReOpenLogFile closes and re-opens the log file if the logger is a file based logger. This allows for file rotation by 'mv'ing the file then signaling the process to trigger this function.

func (*Server) ReadyForConnections

func (s *Server) ReadyForConnections(dur time.Duration) bool

ReadyForConnections returns `true` if the server is ready to accept clients and, if routing is enabled, route connections. If after the duration `dur` the server is still not ready, returns `false`.

func (*Server) RegisterAccount

func (s *Server) RegisterAccount(name string) (*Account, error)

RegisterAccount will register an account. The account must be new or this call will fail.

func (*Server) Reload

func (s *Server) Reload() error

Reload reads the current configuration file and applies any supported changes. This returns an error if the server was not started with a config file or an option which doesn't support hot-swapping was changed.

func (*Server) Routez

func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error)

Routez returns a Routez struct containing information about routes.

func (*Server) SetAccountResolver

func (s *Server) SetAccountResolver(ar AccountResolver)

SetAccountResolver will assign the account resolver.

func (*Server) SetDefaultSystemAccount

func (s *Server) SetDefaultSystemAccount() error

SetDefaultSystemAccount will create a default system account if one is not present.

func (*Server) SetLogger

func (s *Server) SetLogger(logger Logger, debugFlag, traceFlag bool)

SetLogger sets the logger of the server

func (*Server) SetLoggerV2

func (s *Server) SetLoggerV2(logger Logger, debugFlag, traceFlag, sysTrace bool)

SetLoggerV2 sets the logger of the server, additionally taking a sysTrace flag.

func (*Server) SetSystemAccount

func (s *Server) SetSystemAccount(accName string) error

SetSystemAccount will set the internal system account. If root operators are present it will also check validity.

func (*Server) Shutdown

func (s *Server) Shutdown()

Shutdown will shutdown the server instance by kicking out the AcceptLoop and closing all associated clients.

func (*Server) Start

func (s *Server) Start()

Start starts up the server; this will block. Start via a goroutine if needed.

func (*Server) StartHTTPMonitoring

func (s *Server) StartHTTPMonitoring()

StartHTTPMonitoring will enable the HTTP monitoring port. DEPRECATED: Should use StartMonitoring.

func (*Server) StartHTTPSMonitoring

func (s *Server) StartHTTPSMonitoring()

StartHTTPSMonitoring will enable the HTTPS monitoring port. DEPRECATED: Should use StartMonitoring.

func (*Server) StartMonitoring

func (s *Server) StartMonitoring() error

StartMonitoring starts the HTTP or HTTPS server if needed.

func (*Server) StartProfiler

func (s *Server) StartProfiler()

StartProfiler is called to enable dynamic profiling.

func (*Server) StartRouting

func (s *Server) StartRouting(clientListenReady chan struct{})

StartRouting will start the accept loop on the cluster host:port and will actively try to connect to listed routes.

func (*Server) Subsz

func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error)

Subsz returns a Subsz struct containing subjects statistics

func (*Server) SystemAccount

func (s *Server) SystemAccount() *Account

SystemAccount returns the system account if set.

func (*Server) Tracef

func (s *Server) Tracef(format string, v ...interface{})

Tracef logs a trace statement

func (*Server) TrackedRemoteServers

func (s *Server) TrackedRemoteServers() int

TrackedRemoteServers returns how many remote servers we are tracking from a system events perspective.

func (*Server) UpdateAccountClaims

func (s *Server) UpdateAccountClaims(a *Account, ac *jwt.AccountClaims)

UpdateAccountClaims will update an existing account with new claims. This will replace any exports or imports previously defined. Lock MUST NOT be held upon entry.

func (*Server) Varz

func (s *Server) Varz(varzOpts *VarzOptions) (*Varz, error)

Varz returns a Varz struct containing the server information.

func (*Server) WaitForShutdown

func (s *Server) WaitForShutdown()

WaitForShutdown will block until the server has been fully shutdown.

func (*Server) Warnf

func (s *Server) Warnf(format string, v ...interface{})

Warnf logs a warning statement

type ServerInfo

type ServerInfo struct {
	Name      string    `json:"name"`
	Host      string    `json:"host"`
	ID        string    `json:"id"`
	Cluster   string    `json:"cluster,omitempty"`
	Version   string    `json:"ver"`
	Seq       uint64    `json:"seq"`
	JetStream bool      `json:"jetstream"`
	Time      time.Time `json:"time"`
}

ServerInfo identifies remote servers.

type ServerStats

type ServerStats struct {
	Start            time.Time      `json:"start"`
	Mem              int64          `json:"mem"`
	Cores            int            `json:"cores"`
	CPU              float64        `json:"cpu"`
	Connections      int            `json:"connections"`
	TotalConnections uint64         `json:"total_connections"`
	ActiveAccounts   int            `json:"active_accounts"`
	NumSubs          uint32         `json:"subscriptions"`
	Sent             DataStats      `json:"sent"`
	Received         DataStats      `json:"received"`
	SlowConsumers    int64          `json:"slow_consumers"`
	Routes           []*RouteStat   `json:"routes,omitempty"`
	Gateways         []*GatewayStat `json:"gateways,omitempty"`
}

ServerStats holds various statistics that we will periodically send out.

type ServerStatsMsg

type ServerStatsMsg struct {
	Server ServerInfo  `json:"server"`
	Stats  ServerStats `json:"statsz"`
}

ServerStatsMsg is sent periodically with stats updates.

type ServiceLatency

type ServiceLatency struct {
	TypedEvent

	Status         int           `json:"status"`
	Error          string        `json:"description,omitempty"`
	Requestor      LatencyClient `json:"requestor,omitempty"`
	Responder      LatencyClient `json:"responder,omitempty"`
	RequestHeader  http.Header   `json:"header,omitempty"` // only contains header(s) triggering the measurement
	RequestStart   time.Time     `json:"start"`
	ServiceLatency time.Duration `json:"service"`
	SystemLatency  time.Duration `json:"system"`
	TotalLatency   time.Duration `json:"total"`
}

ServiceLatency is the JSON message sent out in response to latency tracking for an account's exported services. Additional client info is available in requestor and responder. Note that for a requestor, the only information shared by default is the RTT used to calculate the total latency. The requestor's account can designate to share the additional information in the service import.

func (*ServiceLatency) NATSTotalTime

func (nl *ServiceLatency) NATSTotalTime() time.Duration

NATSTotalTime is a helper function that totals the NATS latencies.

type ServiceRespType

type ServiceRespType uint8

ServiceRespType represents the type of response a service request can produce.

const (
	Singleton ServiceRespType = iota
	Streamed
	Chunked
)

Service response types. Defaults to a singleton.

func (ServiceRespType) String

func (rt ServiceRespType) String() string

String helper.

type SnapshotResult

type SnapshotResult struct {
	Reader  io.ReadCloser
	BlkSize int
	NumBlks int
}

SnapshotResult contains information about the snapshot.

type SortOpt

type SortOpt string

SortOpt is a helper type to sort clients

const (
	ByCid      SortOpt = "cid"        // By connection ID
	ByStart    SortOpt = "start"      // By connection start time, same as CID
	BySubs     SortOpt = "subs"       // By number of subscriptions
	ByPending  SortOpt = "pending"    // By amount of data in bytes waiting to be sent to client
	ByOutMsgs  SortOpt = "msgs_to"    // By number of messages sent
	ByInMsgs   SortOpt = "msgs_from"  // By number of messages received
	ByOutBytes SortOpt = "bytes_to"   // By amount of bytes sent
	ByInBytes  SortOpt = "bytes_from" // By amount of bytes received
	ByLast     SortOpt = "last"       // By the last activity
	ByIdle     SortOpt = "idle"       // By the amount of inactivity
	ByUptime   SortOpt = "uptime"     // By the amount of time connections exist
	ByStop     SortOpt = "stop"       // By the stop time for a closed connection
	ByReason   SortOpt = "reason"     // By the reason for a closed connection

)

Possible sort options

func (SortOpt) IsValid

func (s SortOpt) IsValid() bool

IsValid determines if a sort option is valid

type StatszEventOptions

type StatszEventOptions struct {
	EventFilterOptions
}

StatszEventOptions are options passed to Statsz

type StorageType

type StorageType int

StorageType determines how messages are stored for retention.

const (
	// MemoryStorage specifies in memory only.
	MemoryStorage StorageType = iota
	// FileStorage specifies on disk, designated by the JetStream config StoreDir.
	FileStorage
)

func (StorageType) MarshalJSON

func (st StorageType) MarshalJSON() ([]byte, error)

func (StorageType) String

func (st StorageType) String() string

func (*StorageType) UnmarshalJSON

func (st *StorageType) UnmarshalJSON(data []byte) error

type StoredMsg

type StoredMsg struct {
	Subject  string    `json:"subject"`
	Sequence uint64    `json:"seq"`
	Header   []byte    `json:"hdrs,omitempty"`
	Data     []byte    `json:"data,omitempty"`
	Time     time.Time `json:"time"`
}

StoredMsg is for raw access to messages in a stream.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is a jetstream stream of messages. When we receive a message internally destined for a Stream we will direct link from the client to this Stream structure.

func (*Stream) AddConsumer

func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error)

func (*Stream) Config

func (mset *Stream) Config() StreamConfig

Config returns the stream's configuration.

func (*Stream) Consumers

func (mset *Stream) Consumers() []*Consumer

Consumers will return all the current consumers for this stream.

func (*Stream) Created

func (mset *Stream) Created() time.Time

Created returns created time.

func (*Stream) Delete

func (mset *Stream) Delete() error

Delete deletes a stream from the owning account.

func (*Stream) DeleteConsumer

func (mset *Stream) DeleteConsumer(o *Consumer) error

DeleteConsumer will delete the consumer from this stream.

func (*Stream) DeleteMsg

func (mset *Stream) DeleteMsg(seq uint64) (bool, error)

DeleteMsg will remove a message from a stream.

func (*Stream) EraseMsg

func (mset *Stream) EraseMsg(seq uint64) (bool, error)

EraseMsg will securely remove a message and rewrite the data with random data.

func (*Stream) GetMsg

func (mset *Stream) GetMsg(seq uint64) (*StoredMsg, error)

func (*Stream) LookupConsumer

func (mset *Stream) LookupConsumer(name string) *Consumer

LookupConsumer will retrieve a consumer by name.

func (*Stream) Name

func (mset *Stream) Name() string

Name returns the stream name.

func (*Stream) NumConsumers

func (mset *Stream) NumConsumers() int

NumConsumers reports on the number of active consumers for this stream.

func (*Stream) NumMsgIds

func (mset *Stream) NumMsgIds() int

NumMsgIds returns the number of message ids being tracked for duplicate suppression.

func (*Stream) Purge

func (mset *Stream) Purge() uint64

Purge will remove all messages from the stream and underlying store.

func (*Stream) RemoveMsg

func (mset *Stream) RemoveMsg(seq uint64) (bool, error)

RemoveMsg will remove a message from a stream. FIXME(dlc) - Should pick one and be consistent.

func (*Stream) Snapshot

func (mset *Stream) Snapshot(deadline time.Duration, checkMsgs, includeConsumers bool) (*SnapshotResult, error)

Snapshot creates a snapshot for the stream and possibly consumers.

func (*Stream) State

func (mset *Stream) State() StreamState

State will return the current state for this stream.

func (*Stream) Update

func (mset *Stream) Update(config *StreamConfig) error

Update will allow certain configuration properties of an existing stream to be updated.

type StreamConfig

type StreamConfig struct {
	Name         string          `json:"name"`
	Subjects     []string        `json:"subjects,omitempty"`
	Retention    RetentionPolicy `json:"retention"`
	MaxConsumers int             `json:"max_consumers"`
	MaxMsgs      int64           `json:"max_msgs"`
	MaxBytes     int64           `json:"max_bytes"`
	Discard      DiscardPolicy   `json:"discard"`
	MaxAge       time.Duration   `json:"max_age"`
	MaxMsgSize   int32           `json:"max_msg_size,omitempty"`
	Storage      StorageType     `json:"storage"`
	Replicas     int             `json:"num_replicas"`
	NoAck        bool            `json:"no_ack,omitempty"`
	Template     string          `json:"template_owner,omitempty"`
	Duplicates   time.Duration   `json:"duplicate_window,omitempty"`
}

StreamConfig will determine the name, subjects and retention policy for a given stream. If subjects is empty the name will be used.

type StreamInfo

type StreamInfo struct {
	Config  StreamConfig `json:"config"`
	Created time.Time    `json:"created"`
	State   StreamState  `json:"state"`
}

StreamInfo shows config and current state for this stream.

type StreamState

type StreamState struct {
	Msgs      uint64    `json:"messages"`
	Bytes     uint64    `json:"bytes"`
	FirstSeq  uint64    `json:"first_seq"`
	FirstTime time.Time `json:"first_ts"`
	LastSeq   uint64    `json:"last_seq"`
	LastTime  time.Time `json:"last_ts"`
	Consumers int       `json:"consumer_count"`
}

StreamState is information about the given stream.

type StreamStore

type StreamStore interface {
	StoreMsg(subj string, hdr, msg []byte) (uint64, int64, error)
	LoadMsg(seq uint64) (subj string, hdr, msg []byte, ts int64, err error)
	RemoveMsg(seq uint64) (bool, error)
	EraseMsg(seq uint64) (bool, error)
	Purge() uint64
	GetSeqFromTime(t time.Time) uint64
	State() StreamState
	StorageBytesUpdate(func(int64))
	UpdateConfig(cfg *StreamConfig) error
	Delete() error
	Stop() error
	ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerStore, error)
	Snapshot(deadline time.Duration, includeConsumers, checkMsgs bool) (*SnapshotResult, error)
}

type StreamTemplate

type StreamTemplate struct {
	*StreamTemplateConfig
	// contains filtered or unexported fields
}

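StreamTemplate is a stream template; it embeds the StreamTemplateConfig that describes how streams are auto-created from it.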

func (*StreamTemplate) Delete

func (t *StreamTemplate) Delete() error

type StreamTemplateConfig

type StreamTemplateConfig struct {
	Name       string        `json:"name"`
	Config     *StreamConfig `json:"config"`
	MaxStreams uint32        `json:"max_streams"`
}

StreamTemplateConfig allows a configuration to auto-create streams based on this template when a message is received that matches. Each new stream will use the config as the template config to create them.

type StreamTemplateInfo

type StreamTemplateInfo struct {
	Config  *StreamTemplateConfig `json:"config"`
	Streams []string              `json:"streams"`
}

StreamTemplateInfo holds a stream template's configuration together with the list of its streams.

type SubDetail

type SubDetail struct {
	Account string `json:"account,omitempty"`
	Subject string `json:"subject"`
	Queue   string `json:"qgroup,omitempty"`
	Sid     string `json:"sid"`
	Msgs    int64  `json:"msgs"`
	Max     int64  `json:"max,omitempty"`
	Cid     uint64 `json:"cid"`
}

SubDetail is for verbose information for subscriptions.

type SubjectPermission

type SubjectPermission struct {
	Allow []string `json:"allow,omitempty"`
	Deny  []string `json:"deny,omitempty"`
}

SubjectPermission is an individual allow and deny struct for publish and subscribe authorizations.

type Sublist

type Sublist struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

A Sublist stores and efficiently retrieves subscriptions.

func NewSublist

func NewSublist(enableCache bool) *Sublist

NewSublist will create a default sublist with caching enabled per the flag.

func NewSublistNoCache

func NewSublistNoCache() *Sublist

NewSublistNoCache will create a default sublist with caching disabled.

func NewSublistWithCache

func NewSublistWithCache() *Sublist

NewSublistWithCache will create a default sublist with caching enabled.

func (*Sublist) All

func (s *Sublist) All(subs *[]*subscription)

All is used to collect all subscriptions.

func (*Sublist) CacheCount

func (s *Sublist) CacheCount() int

CacheCount returns the number of result sets in the cache.

func (*Sublist) CacheEnabled

func (s *Sublist) CacheEnabled() bool

CacheEnabled returns whether or not caching is enabled for this sublist.

func (*Sublist) ClearNotification

func (s *Sublist) ClearNotification(subject string, notify chan<- bool) bool

func (*Sublist) Count

func (s *Sublist) Count() uint32

Count returns the number of subscriptions.

func (*Sublist) Insert

func (s *Sublist) Insert(sub *subscription) error

Insert adds a subscription into the sublist

func (*Sublist) Match

func (s *Sublist) Match(subject string) *SublistResult

Match will match all entries to the literal subject. It will return a set of results for both normal and queue subscribers.

func (*Sublist) RegisterNotification

func (s *Sublist) RegisterNotification(subject string, notify chan<- bool) error

RegisterNotification will register for notifications when interest for the given subject changes. The subject must be a literal publish type subject. The notification is true for when the first interest for a subject is inserted, and false when all interest in the subject is removed. The sublist will not block when trying to send the notification. It's up to the caller to make sure the channel send will not block.

func (*Sublist) Remove

func (s *Sublist) Remove(sub *subscription) error

Remove will remove a subscription.

func (*Sublist) RemoveBatch

func (s *Sublist) RemoveBatch(subs []*subscription) error

RemoveBatch will remove a list of subscriptions.

func (*Sublist) Stats

func (s *Sublist) Stats() *SublistStats

Stats will return a stats structure for the current state.

func (*Sublist) UpdateRemoteQSub

func (s *Sublist) UpdateRemoteQSub(sub *subscription)

UpdateRemoteQSub should be called when we update the weight of an existing remote queue sub.

type SublistResult

type SublistResult struct {
	// contains filtered or unexported fields
}

SublistResult is a result structure better optimized for queue subs.

type SublistStats

type SublistStats struct {
	NumSubs      uint32  `json:"num_subscriptions"`
	NumCache     uint32  `json:"num_cache"`
	NumInserts   uint64  `json:"num_inserts"`
	NumRemoves   uint64  `json:"num_removes"`
	NumMatches   uint64  `json:"num_matches"`
	CacheHitRate float64 `json:"cache_hit_rate"`
	MaxFanout    uint32  `json:"max_fanout"`
	AvgFanout    float64 `json:"avg_fanout"`
	// contains filtered or unexported fields
}

SublistStats are public stats for the sublist.

type Subsz

type Subsz struct {
	ID  string    `json:"server_id"`
	Now time.Time `json:"now"`
	*SublistStats
	Total  int         `json:"total"`
	Offset int         `json:"offset"`
	Limit  int         `json:"limit"`
	Subs   []SubDetail `json:"subscriptions_list,omitempty"`
}

Subsz represents detailed information on current subscriptions.

type SubszEventOptions

type SubszEventOptions struct {
	SubszOptions
	EventFilterOptions
}

In the context of system events, SubszEventOptions are options passed to Subsz.

type SubszOptions

type SubszOptions struct {
	// Offset is used for pagination. Subsz() only returns subscriptions starting at this
	// offset from the global results.
	Offset int `json:"offset"`

	// Limit is the maximum number of subscriptions that should be returned by Subsz().
	Limit int `json:"limit"`

	// Subscriptions indicates if subscription details should be included in the results.
	Subscriptions bool `json:"subscriptions"`

	// Filter based on this account name.
	Account string `json:"account,omitempty"`

	// Test the list against this subject. Needs to be literal since it signifies a publish subject.
	// We will only return subscriptions that would match if a message was sent to this subject.
	Test string `json:"test,omitempty"`
}

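SubszOptions are the options passed to Subsz. They control pagination (Offset, Limit), whether subscription details are included (Subscriptions), and optional filtering by account name (Account) or by a literal test subject (Test).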

type TLSConfigOpts

type TLSConfigOpts struct {
	CertFile         string
	KeyFile          string
	CaFile           string
	Verify           bool
	Insecure         bool
	Map              bool
	Timeout          float64
	Ciphers          []uint16
	CurvePreferences []tls.CurveID
}

TLSConfigOpts holds the parsed TLS config information, used with flag parsing.

type TemplateStore

type TemplateStore interface {
	Store(*StreamTemplate) error
	Delete(*StreamTemplate) error
}

TemplateStore stores templates.

type TypedEvent

type TypedEvent struct {
	Type string    `json:"type"`
	ID   string    `json:"id"`
	Time time.Time `json:"timestamp"`
}

TypedEvent is an event or advisory sent by the server that has NATS type hints, typically used for events that might be consumed by 3rd party event systems.

type URLAccResolver

type URLAccResolver struct {
	// contains filtered or unexported fields
}

URLAccResolver implements an http fetcher.

func NewURLAccResolver

func NewURLAccResolver(url string) (*URLAccResolver, error)

NewURLAccResolver returns a new resolver for the given base URL.

func (*URLAccResolver) Close

func (*URLAccResolver) Close()

func (*URLAccResolver) Fetch

func (ur *URLAccResolver) Fetch(name string) (string, error)

Fetch will fetch the account JWT claims from the base URL, appending the account name onto the end.

func (*URLAccResolver) IsReadOnly

func (*URLAccResolver) IsReadOnly() bool

func (*URLAccResolver) IsTrackingUpdate

func (*URLAccResolver) IsTrackingUpdate() bool

func (*URLAccResolver) Reload

func (*URLAccResolver) Reload() error

func (*URLAccResolver) Start

func (*URLAccResolver) Start(*Server) error

func (*URLAccResolver) Store

func (*URLAccResolver) Store(_, _ string) error

type User

type User struct {
	Username    string       `json:"user"`
	Password    string       `json:"password"`
	Permissions *Permissions `json:"permissions,omitempty"`
	Account     *Account     `json:"account,omitempty"`
}

User is for multiple accounts/users.

type Varz

type Varz struct {
	ID                string            `json:"server_id"`
	Name              string            `json:"server_name"`
	Version           string            `json:"version"`
	Proto             int               `json:"proto"`
	GitCommit         string            `json:"git_commit,omitempty"`
	GoVersion         string            `json:"go"`
	Host              string            `json:"host"`
	Port              int               `json:"port"`
	AuthRequired      bool              `json:"auth_required,omitempty"`
	TLSRequired       bool              `json:"tls_required,omitempty"`
	TLSVerify         bool              `json:"tls_verify,omitempty"`
	IP                string            `json:"ip,omitempty"`
	ClientConnectURLs []string          `json:"connect_urls,omitempty"`
	WSConnectURLs     []string          `json:"ws_connect_urls,omitempty"`
	MaxConn           int               `json:"max_connections"`
	MaxSubs           int               `json:"max_subscriptions,omitempty"`
	PingInterval      time.Duration     `json:"ping_interval"`
	MaxPingsOut       int               `json:"ping_max"`
	HTTPHost          string            `json:"http_host"`
	HTTPPort          int               `json:"http_port"`
	HTTPBasePath      string            `json:"http_base_path"`
	HTTPSPort         int               `json:"https_port"`
	AuthTimeout       float64           `json:"auth_timeout"`
	MaxControlLine    int32             `json:"max_control_line"`
	MaxPayload        int               `json:"max_payload"`
	MaxPending        int64             `json:"max_pending"`
	Cluster           ClusterOptsVarz   `json:"cluster,omitempty"`
	Gateway           GatewayOptsVarz   `json:"gateway,omitempty"`
	LeafNode          LeafNodeOptsVarz  `json:"leaf,omitempty"`
	JetStream         JetStreamVarz     `json:"jetstream,omitempty"`
	TLSTimeout        float64           `json:"tls_timeout"`
	WriteDeadline     time.Duration     `json:"write_deadline"`
	Start             time.Time         `json:"start"`
	Now               time.Time         `json:"now"`
	Uptime            string            `json:"uptime"`
	Mem               int64             `json:"mem"`
	Cores             int               `json:"cores"`
	MaxProcs          int               `json:"gomaxprocs"`
	CPU               float64           `json:"cpu"`
	Connections       int               `json:"connections"`
	TotalConnections  uint64            `json:"total_connections"`
	Routes            int               `json:"routes"`
	Remotes           int               `json:"remotes"`
	Leafs             int               `json:"leafnodes"`
	InMsgs            int64             `json:"in_msgs"`
	OutMsgs           int64             `json:"out_msgs"`
	InBytes           int64             `json:"in_bytes"`
	OutBytes          int64             `json:"out_bytes"`
	SlowConsumers     int64             `json:"slow_consumers"`
	Subscriptions     uint32            `json:"subscriptions"`
	HTTPReqStats      map[string]uint64 `json:"http_req_stats"`
	ConfigLoadTime    time.Time         `json:"config_load_time"`
}

Varz will output server information on the monitoring port at /varz.

type VarzEventOptions

type VarzEventOptions struct {
	VarzOptions
	EventFilterOptions
}

In the context of system events, VarzEventOptions are options passed to Varz.

type VarzOptions

type VarzOptions struct{}

VarzOptions are the options passed to Varz(). Currently, there are no options defined.

type WebsocketOpts

type WebsocketOpts struct {
	// The server will accept websocket client connections on this hostname/IP.
	Host string
	// The server will accept websocket client connections on this port.
	Port int
	// The host:port to advertise to websocket clients in the cluster.
	Advertise string

	// If no user is provided when a client connects, will default to this
	// user and associated account. This user has to exist either in the
	// Users defined here or in the global options.
	NoAuthUser string

	// Name of the cookie, which if present in WebSocket upgrade headers,
	// will be treated as JWT during CONNECT phase as long as
	// "jwt" specified in the CONNECT options is missing or empty.
	JWTCookie string

	// Authentication section. If anything is configured in this section,
	// it will override the authorization configuration for regular clients.
	Username string
	Password string
	Token    string
	Users    []*User
	Nkeys    []*NkeyUser

	// Timeout for the authentication process.
	AuthTimeout float64

	// By default the server will enforce the use of TLS. If no TLS configuration
	// is provided, you need to explicitly set NoTLS to true to allow the server
	// to start without TLS configuration. Note that if a TLS configuration is
	// present, this boolean is ignored and the server will run the Websocket
	// server with that TLS configuration.
	// Running without TLS is less secure since Websocket clients that use bearer
	// tokens will send them in the clear. So this should not be used in production.
	NoTLS bool

	// TLS configuration is required unless NoTLS is set to true.
	TLSConfig *tls.Config
	// If true, map certificate values for authentication purposes.
	TLSMap bool

	// If true, the Origin header must match the request's host.
	SameOrigin bool

	// Only origins in this list will be accepted. If empty and
	// SameOrigin is false, any origin is accepted.
	AllowedOrigins []string

	// If set to true, the server will negotiate with clients
	// whether compression can be used. If this is false, no compression
	// will be used (both in server and clients) since it has to
	// be negotiated between both endpoints.
	Compression bool

	// Total time allowed for the server to read the client request
	// and write the response back to the client. This includes the
	// time needed for the TLS Handshake.
	HandshakeTimeout time.Duration
}

WebsocketOpts holds the options for accepting websocket client connections.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL