nsqd

package
v0.0.0-...-94081c8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 29, 2019 License: MIT Imports: 50 Imported by: 0

README

nsqd

nsqd is the daemon that receives, queues, and delivers messages to clients.

Read the docs

Documentation

Overview

Package skiplist implements skip list based maps and sets.

Skip lists are a data structure that can be used in place of balanced trees. Skip lists use probabilistic balancing rather than strictly enforced balancing and as a result the algorithms for insertion and deletion in skip lists are much simpler and significantly faster than equivalent algorithms for balanced trees.

Skip lists were first described in Pugh, William (June 1990). "Skip lists: a probabilistic alternative to balanced trees". Communications of the ACM 33 (6): 668–676

Index

Constants

View Source
const (
	LOG_DEBUG = lg.DEBUG
	LOG_INFO  = lg.INFO
	LOG_WARN  = lg.WARN
	LOG_ERROR = lg.ERROR
	LOG_FATAL = lg.FATAL
)
View Source
const (
	UNKONW_STATUS = MessageDtType(0)
	PRE_STATUS    = MessageDtType(1)
	CANCEL_STATUS = MessageDtType(2)
	COMMIT_STATUS = MessageDtType(3)
)
View Source
const (
	TLSNotRequired = iota
	TLSRequiredExceptHTTP
	TLSRequired
)
View Source
const (
	FrameTypeResponse int32 = 0
	FrameTypeError    int32 = 1
	FrameTypeMessage  int32 = 2
	FrameTypeUnknown  int32 = -1
)

frame types

View Source
const DefaultMaxLevel = 32
View Source
const (
	MAX_POSSIBLE_MSG_SIZE = 1 << 28
)
View Source
const (
	MAX_QUEUE_OFFSET_META_DATA_KEEP = 100
)
View Source
const (
	MsgIDLength = 16
)

Variables

View Source
var (
	ErrInvalidOffset     = errors.New("invalid offset")
	ErrNeedFixQueueStart = errors.New("init queue start should be fixed")
)
View Source
var DataInconsistentError = errors.New("DataInconsistentError")
View Source
var EndOfMsgVirOffsetError = errors.New("EndOfMsgVirOffsetError")
View Source
var ErrIDBackwards = errors.New("ID went backward")
View Source
var ErrSequenceExpired = errors.New("sequence expired")
View Source
var ErrTimeBackwards = errors.New("time has gone backwards")
View Source
var FlagInconsistentError = errors.New("FlagInconsistentError")
View Source
var MagicDT = []byte("  DT")

MagicDT is the initial identifier sent when connecting for DT clients

View Source
var MagicV1 = []byte("  V1")

MagicV1 is the initial identifier sent when connecting for V1 clients

View Source
var MagicV2 = []byte("  V2")

MagicV2 is the initial identifier sent when connecting for V2 clients

View Source
var StepInconsistentError = errors.New("StepInconsistentError")

Functions

func GetQueueFileName

func GetQueueFileName(dataRoot string, base string, fileNum int64) string

func GetTopicFullName

func GetTopicFullName(topic string, part int) string

func IsValidChannelName

func IsValidChannelName(name string) bool

IsValidChannelName checks a channel name for correctness

func IsValidTopicName

func IsValidTopicName(name string) bool

IsValidTopicName checks a topic name for correctness

func NewGUIDFactory

func NewGUIDFactory(nodeID int64) *guidFactory

func ReadResponse

func ReadResponse(r io.Reader) ([]byte, error)

ReadResponse is a client-side utility function to read from the supplied Reader according to the NSQ protocol spec:

[x][x][x][x][x][x][x][x]...
|  (int32) || (binary)
|  4-byte  || N-byte
------------------------...
    size       data

func ReadUnpackedResponse

func ReadUnpackedResponse(r io.Reader) (int32, []byte, error)

ReadUnpackedResponse reads and parses data from the underlying TCP connection according to the NSQ TCP protocol spec and returns the frameType, data or error

func UnpackResponse

func UnpackResponse(response []byte) (int32, []byte, error)

UnpackResponse is a client-side utility function that unpacks serialized data according to NSQ protocol spec:

[x][x][x][x][x][x][x][x]...
|  (int32) || (binary)
|  4-byte  || N-byte
------------------------...
  frame ID     data

Returns a triplicate of: frame type, data ([]byte), error

Types

type BackendQueueEnd

type BackendQueueEnd interface {
	Offset() int64
	TotalMsgCnt() int64
	IsSame(BackendQueueEnd) bool
}

type BackendQueueReader

type BackendQueueReader interface {
	Put([]byte) (BackendQueueEnd, error)
	Close() error
	Delete() error
	Empty() error
	Depth() int64
	ReaderFlush() error
	GetQueueReadEnd() BackendQueueEnd
	GetQueueCurMemRead() BackendQueueEnd
	UpdateBackendQueueEnd(BackendQueueEnd)
	TryReadOne() (*ReadResult, bool)
	Confirm(start int64, end int64, endCnt int64) bool
}

BackendQueueReader represents reader for current topic's consumer

type BackendQueueWriter

type BackendQueueWriter interface {
	Put(data []byte) (BackendQueueEnd, error)
	Close() error
	Delete() error
	Empty() error
	WriterFlush() (bool, bool, error)
	EndInfo()
	GetQueueReadEnd() BackendQueueEnd
	GetQueueCurWriterEnd() BackendQueueEnd
}

BackendQueue represents the behavior for the secondary message storage system

func NewDiskQueueWriter

func NewDiskQueueWriter(name string, dataPath string, maxBytesPerFile int64,
	minMsgSize int32, maxMsgSize int32, ctx *context) (BackendQueueWriter, error)

type Channel

type Channel struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Channel represents the concrete type for a NSQ channel (and also implements the Queue interface)

There can be multiple channels per topic, each with there own unique set of subscribers (clients).

Channels maintain all client and message metadata, orchestrating in-flight messages, timeouts, requeuing, etc.

func NewChannel

func NewChannel(topicName string, channelName string, chEnd BackendQueueEnd,
	ctx *context, maxWin int64, deleteCallback func(*Channel)) *Channel

NewChannel creates a new instance of the Channel type and returns a pointer

func (*Channel) AddClient

func (c *Channel) AddClient(clientID int64, client Consumer) error

AddClient adds a client to the Channel's client list

func (*Channel) CheckBack

func (c *Channel) CheckBack(m *Message)

func (*Channel) Close

func (c *Channel) Close() error

Close cleanly closes the Channel

func (*Channel) CommitDtPreMsg

func (c *Channel) CommitDtPreMsg(msgId MessageID) error

func (*Channel) Delete

func (c *Channel) Delete() error

Delete empties the channel and closes

func (*Channel) Depth

func (c *Channel) Depth() int64

func (*Channel) Empty

func (c *Channel) Empty() error

func (*Channel) Exiting

func (c *Channel) Exiting() bool

Exiting returns a boolean indicating if this channel is closed/exiting

func (*Channel) FinishMessage

func (c *Channel) FinishMessage(clientID int64, id MessageID) error

FinishMessage successfully discards an in-flight message

func (*Channel) GetDtPreMsgByCmtMsg

func (c *Channel) GetDtPreMsgByCmtMsg(msgId MessageID) (*Message, error)

func (*Channel) HandleSyncChannelFromSlave

func (c *Channel) HandleSyncChannelFromSlave() ([]byte, error)

func (*Channel) IsPaused

func (c *Channel) IsPaused() bool

func (*Channel) ListMost10Item

func (c *Channel) ListMost10Item() []*Message

func (*Channel) Pause

func (c *Channel) Pause() error

func (*Channel) PutMessageDeferred

func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration)

func (*Channel) RemoveClient

func (c *Channel) RemoveClient(clientID int64)

RemoveClient removes a client from the Channel's client list

func (*Channel) RequeueMessage

func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error

RequeueMessage requeues a message based on `time.Duration`, ie:

`timeoutMs` == 0 - requeue a message immediately `timeoutMs` > 0 - asynchronously wait for the specified timeout

and requeue a message (aka "deferred requeue")

func (*Channel) RestartPreDtMsgTimeout

func (c *Channel) RestartPreDtMsgTimeout(msg *Message, timeout time.Duration)

func (*Channel) StartDeferredTimeout

func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error

func (*Channel) StartInFlightTimeout

func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error

func (*Channel) StartPreDtMsgTimeout

func (c *Channel) StartPreDtMsgTimeout(msg *Message, timeout time.Duration) error

func (*Channel) TouchMessage

func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout time.Duration) error

TouchMessage resets the timeout for an in-flight message

func (*Channel) UnPause

func (c *Channel) UnPause() error

func (*Channel) UpdateBackendQueueEnd

func (c *Channel) UpdateBackendQueueEnd(backendQueueEnd BackendQueueEnd)

type ChannelStats

type ChannelStats struct {
	ChannelName   string        `json:"channel_name"`
	Depth         int64         `json:"depth"`
	BackendDepth  int64         `json:"backend_depth"`
	InFlightCount int           `json:"in_flight_count"`
	DeferredCount int           `json:"deferred_count"`
	MessageCount  uint64        `json:"message_count"`
	RequeueCount  uint64        `json:"requeue_count"`
	TimeoutCount  uint64        `json:"timeout_count"`
	ClientCount   int           `json:"client_count"`
	Clients       []ClientStats `json:"clients"`
	Paused        bool          `json:"paused"`

	E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`
}

func NewChannelStats

func NewChannelStats(c *Channel, clients []ClientStats, clientCount int) ChannelStats

type Channels

type Channels []*Channel

func (Channels) Len

func (c Channels) Len() int

func (Channels) Swap

func (c Channels) Swap(i, j int)

type ChannelsByName

type ChannelsByName struct {
	Channels
}

func (ChannelsByName) Less

func (c ChannelsByName) Less(i, j int) bool

type Client

type Client interface {
	Stats() ClientStats
	IsProducer() bool
}

type ClientStats

type ClientStats struct {
	ClientID        string `json:"client_id"`
	Hostname        string `json:"hostname"`
	Version         string `json:"version"`
	RemoteAddress   string `json:"remote_address"`
	State           int32  `json:"state"`
	ReadyCount      int64  `json:"ready_count"`
	InFlightCount   int64  `json:"in_flight_count"`
	MessageCount    uint64 `json:"message_count"`
	FinishCount     uint64 `json:"finish_count"`
	RequeueCount    uint64 `json:"requeue_count"`
	ConnectTime     int64  `json:"connect_ts"`
	SampleRate      int32  `json:"sample_rate"`
	Deflate         bool   `json:"deflate"`
	Snappy          bool   `json:"snappy"`
	UserAgent       string `json:"user_agent"`
	Authed          bool   `json:"authed,omitempty"`
	AuthIdentity    string `json:"auth_identity,omitempty"`
	AuthIdentityURL string `json:"auth_identity_url,omitempty"`

	PubCounts []PubCount `json:"pub_counts,omitempty"`

	TLS                           bool   `json:"tls"`
	CipherSuite                   string `json:"tls_cipher_suite"`
	TLSVersion                    string `json:"tls_version"`
	TLSNegotiatedProtocol         string `json:"tls_negotiated_protocol"`
	TLSNegotiatedProtocolIsMutual bool   `json:"tls_negotiated_protocol_is_mutual"`
}

type Consumer

type Consumer interface {
	UnPause()
	Pause()
	Close() error
	TimedOutMessage()
	Stats() ClientStats
	Empty()
}

type IntervalTree

type IntervalTree struct {
	// contains filtered or unexported fields
}

func NewIntervalTree

func NewIntervalTree() *IntervalTree

func (*IntervalTree) AddOrMerge

func (IT *IntervalTree) AddOrMerge(inter *QueueInterval) *QueueInterval

func (*IntervalTree) DeleteInterval

func (self *IntervalTree) DeleteInterval(inter *QueueInterval)

func (*IntervalTree) Len

func (IT *IntervalTree) Len() int64

type Iterator

type Iterator interface {
	// Next returns true if the iterator contains subsequent elements
	// and advances its state to the next element if that is possible.
	Next() (ok bool)
	// Previous returns true if the iterator contains previous elements
	// and rewinds its state to the previous element if that is possible.
	Previous() (ok bool)
	// Key returns the current key.
	Key() interface{}
	// Value returns the current value.
	Value() interface{}
	// Seek reduces iterative seek costs for searching forward into the Skip List
	// by remarking the range of keys over which it has scanned before.  If the
	// requested key occurs prior to the point, the Skip List will start searching
	// as a safeguard.  It returns true if the key is within the known range of
	// the list.
	Seek(key interface{}) (ok bool)
	// Close this iterator to reap resources associated with it.  While not
	// strictly required, it will provide extra hints for the garbage collector.
	Close()
}

Iterator is an interface that you can use to iterate through the skip list (in its entirety or fragments). For an use example, see the documentation of SkipList.

Key and Value return the key and the value of the current node.

type Logger

type Logger lg.Logger

type Message

type Message struct {
	ID        MessageID
	Body      []byte
	Timestamp int64
	Attempts  uint16

	MovedSize int64

	//for backend queue
	BackendQueueEnd
	// contains filtered or unexported fields
}

func NewMessage

func NewMessage(id MessageID, body []byte) *Message

func (*Message) WriteTo

func (m *Message) WriteTo(w io.Writer) (int64, error)

type MessageDtType

type MessageDtType int

type MessageID

type MessageID [MsgIDLength]byte

type NSQD

type NSQD struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func New

func New(opts *Options) (*NSQD, error)

func (*NSQD) AddClient

func (n *NSQD) AddClient(clientID int64, client Client)

func (*NSQD) ChannelList

func (n *NSQD) ChannelList(topicName string) []string

func (*NSQD) DeleteExistingTopic

func (n *NSQD) DeleteExistingTopic(topicName string) error

DeleteExistingTopic removes a topic only if it exists

func (*NSQD) Exit

func (n *NSQD) Exit()

func (*NSQD) GetError

func (n *NSQD) GetError() error

func (*NSQD) GetExistingTopic

func (n *NSQD) GetExistingTopic(topicName string) (*Topic, error)

GetExistingTopic gets a topic only if it exists

func (*NSQD) GetHealth

func (n *NSQD) GetHealth() string

func (*NSQD) GetProducerStats

func (n *NSQD) GetProducerStats() []ClientStats

func (*NSQD) GetStartTime

func (n *NSQD) GetStartTime() time.Time

func (*NSQD) GetStats

func (n *NSQD) GetStats(topic string, channel string, includeClients bool) []TopicStats

func (*NSQD) GetTopic

func (n *NSQD) GetTopic(topicName string) *Topic

GetTopic performs a thread safe operation to return a pointer to a Topic object (potentially new)

func (*NSQD) GetTopicMapCopy

func (n *NSQD) GetTopicMapCopy() []*Topic

func (*NSQD) GetTopicsAndChannelsBytes

func (n *NSQD) GetTopicsAndChannelsBytes() ([]byte, error)

func (*NSQD) HandleSyncTopicFromSlave

func (n *NSQD) HandleSyncTopicFromSlave(topicName string, totalMsgCnt, filenum, fileoffset, virtutaloffset, maxnum int64) ([]byte, error)

func (*NSQD) IsAuthEnabled

func (n *NSQD) IsAuthEnabled() bool

func (*NSQD) IsHealthy

func (n *NSQD) IsHealthy() bool

func (*NSQD) LoadMetadata

func (n *NSQD) LoadMetadata() error

func (*NSQD) Main

func (n *NSQD) Main() error

func (*NSQD) Notify

func (n *NSQD) Notify(v interface{})

func (*NSQD) PersistMetadata

func (n *NSQD) PersistMetadata() error

func (*NSQD) RealHTTPAddr

func (n *NSQD) RealHTTPAddr() *net.TCPAddr

func (*NSQD) RealHTTPSAddr

func (n *NSQD) RealHTTPSAddr() *net.TCPAddr

func (*NSQD) RealTCPAddr

func (n *NSQD) RealTCPAddr() *net.TCPAddr

func (*NSQD) RemoveClient

func (n *NSQD) RemoveClient(clientID int64)

func (*NSQD) SetHealth

func (n *NSQD) SetHealth(err error)

func (*NSQD) SlaveSyncChannel

func (n *NSQD) SlaveSyncChannel(topicName, channelName string) ([]byte, error)

func (*NSQD) SlaveSyncLoop

func (n *NSQD) SlaveSyncLoop()

func (*NSQD) TopicList

func (n *NSQD) TopicList() []string

type Options

type Options struct {
	// basic options
	ID        int64       `flag:"node-id" cfg:"id"`
	LogLevel  lg.LogLevel `flag:"log-level"`
	LogPrefix string      `flag:"log-prefix"`
	Logger    Logger

	TCPAddress               string        `flag:"tcp-address"`
	HTTPAddress              string        `flag:"http-address"`
	HTTPSAddress             string        `flag:"https-address"`
	BroadcastAddress         string        `flag:"broadcast-address"`
	NSQLookupdTCPAddresses   []string      `flag:"lookupd-tcp-address" cfg:"nsqlookupd_tcp_addresses"`
	AuthHTTPAddresses        []string      `flag:"auth-http-address" cfg:"auth_http_addresses"`
	HTTPClientConnectTimeout time.Duration `flag:"http-client-connect-timeout" cfg:"http_client_connect_timeout"`
	HTTPClientRequestTimeout time.Duration `flag:"http-client-request-timeout" cfg:"http_client_request_timeout"`
	NsqdMasterAddr           string        `flag:"nsqd-master-addr"`

	// diskqueue options
	DataPath           string        `flag:"data-path"`
	MemQueueSize       int64         `flag:"mem-queue-size"`
	MaxBytesPerFile    int64         `flag:"max-bytes-per-file"`
	SyncEvery          int64         `flag:"sync-every"`
	DtCheckBackTimeout time.Duration `flag:"dt-checkback-timeout"`
	SyncTimeout        time.Duration `flag:"sync-timeout"`
	LoopReadTimeout    time.Duration `flag:"loop-read-timeout"`

	QueueScanInterval        time.Duration
	QueueScanRefreshInterval time.Duration
	QueueScanSelectionCount  int
	QueueScanWorkerPoolMax   int
	QueueScanDirtyPercent    float64

	// msg and command options
	MsgTimeout    time.Duration `flag:"msg-timeout"`
	MaxMsgTimeout time.Duration `flag:"max-msg-timeout"`
	MaxMsgSize    int64         `flag:"max-msg-size"`
	MaxBodySize   int64         `flag:"max-body-size"`
	MaxReqTimeout time.Duration `flag:"max-req-timeout"`
	ClientTimeout time.Duration

	// client overridable configuration options
	MaxHeartbeatInterval   time.Duration `flag:"max-heartbeat-interval"`
	MaxRdyCount            int64         `flag:"max-rdy-count"`
	MaxOutputBufferSize    int64         `flag:"max-output-buffer-size"`
	MaxOutputBufferTimeout time.Duration `flag:"max-output-buffer-timeout"`
	MinOutputBufferTimeout time.Duration `flag:"min-output-buffer-timeout"`
	MaxConfirmWin          int64         `flag:"max-confirm-win"`
	OutputBufferTimeout    time.Duration `flag:"output-buffer-timeout"`
	MaxChannelConsumers    int           `flag:"max-channel-consumers"`

	// statsd integration
	StatsdAddress       string        `flag:"statsd-address"`
	StatsdPrefix        string        `flag:"statsd-prefix"`
	StatsdInterval      time.Duration `flag:"statsd-interval"`
	StatsdMemStats      bool          `flag:"statsd-mem-stats"`
	StatsdUDPPacketSize int           `flag:"statsd-udp-packet-size"`

	// e2e message latency
	E2EProcessingLatencyWindowTime  time.Duration `flag:"e2e-processing-latency-window-time"`
	E2EProcessingLatencyPercentiles []float64     `flag:"e2e-processing-latency-percentile" cfg:"e2e_processing_latency_percentiles"`

	// TLS config
	TLSCert             string `flag:"tls-cert"`
	TLSKey              string `flag:"tls-key"`
	TLSClientAuthPolicy string `flag:"tls-client-auth-policy"`
	TLSRootCAFile       string `flag:"tls-root-ca-file"`
	TLSRequired         int    `flag:"tls-required"`
	TLSMinVersion       uint16 `flag:"tls-min-version"`

	// compression
	DeflateEnabled  bool `flag:"deflate"`
	MaxDeflateLevel int  `flag:"max-deflate-level"`
	SnappyEnabled   bool `flag:"snappy"`
}

func NewOptions

func NewOptions() *Options

type Ordered

type Ordered interface {
	LessThan(other Ordered) bool
}

Ordered is an interface which can be linearly ordered by the LessThan method, whereby this instance is deemed to be less than other. Additionally, Ordered instances should behave properly when compared using == and !=.

type PubCount

type PubCount struct {
	Topic string `json:"topic"`
	Count uint64 `json:"count"`
}

type QueueInterval

type QueueInterval struct {
	// contains filtered or unexported fields
}

func (*QueueInterval) End

func (QI *QueueInterval) End() int64

func (*QueueInterval) EndCnt

func (QI *QueueInterval) EndCnt() int64

func (*QueueInterval) HighAtDimension

func (QI *QueueInterval) HighAtDimension(dim uint64) int64

func (*QueueInterval) ID

func (QI *QueueInterval) ID() uint64

the augmentedtree use the low and the id to determin if the interval is the duplicate so here we use the end as the id of segment

func (*QueueInterval) LowAtDimension

func (QI *QueueInterval) LowAtDimension(dim uint64) int64

func (*QueueInterval) OverlapsAtDimension

func (QI *QueueInterval) OverlapsAtDimension(inter augmentedtree.Interval, dim uint64) bool

func (*QueueInterval) Start

func (QI *QueueInterval) Start() int64

type ReadResult

type ReadResult struct {
	Data []byte
	Err  error
	// contains filtered or unexported fields
}

ReadResult represents the result for TryReadOne()

type Response

type Response struct {
	// contains filtered or unexported fields
}

type Set

type Set struct {
	// contains filtered or unexported fields
}

Set is an ordered set data structure.

Its elements must implement the Ordered interface. It uses a SkipList for storage, and it gives you similar performance guarantees.

To iterate over a set (where s is a *Set):

for i := s.Iterator(); i.Next(); {
	// do something with i.Key().
	// i.Value() will be nil.
}

func NewCustomSet

func NewCustomSet(lessThan func(l, r interface{}) bool) *Set

NewCustomSet returns a new Set that will use lessThan as the comparison function. lessThan should define a linear order on elements you intend to use with the Set.

func NewIntSet

func NewIntSet() *Set

NewIntSet returns a new Set that accepts int elements.

func NewSet

func NewSet() *Set

NewSet returns a new Set.

func NewStringSet

func NewStringSet() *Set

NewStringSet returns a new Set that accepts string elements.

func (*Set) Add

func (s *Set) Add(key interface{})

Add adds key to s.

func (*Set) Contains

func (s *Set) Contains(key interface{}) bool

Contains returns true if key is present in s.

func (*Set) GetMaxLevel

func (s *Set) GetMaxLevel() int

GetMaxLevel returns MaxLevel fo the underlying skip list.

func (*Set) Iterator

func (s *Set) Iterator() Iterator

func (*Set) Len

func (s *Set) Len() int

Len returns the length of the set.

func (*Set) Range

func (s *Set) Range(from, to interface{}) Iterator

Range returns an iterator that will go through all the elements of the set that are greater or equal than from, but less than to.

func (*Set) Remove

func (s *Set) Remove(key interface{}) (ok bool)

Remove tries to remove key from the set. It returns true if key was present.

func (*Set) SetMaxLevel

func (s *Set) SetMaxLevel(newMaxLevel int)

SetMaxLevel sets MaxLevel in the underlying skip list.

type SkipList

type SkipList struct {

	// MaxLevel determines how many items the SkipList can store
	// efficiently (2^MaxLevel).
	//
	// It is safe to increase MaxLevel to accomodate more
	// elements. If you decrease MaxLevel and the skip list
	// already contains nodes on higer levels, the effective
	// MaxLevel will be the greater of the new MaxLevel and the
	// level of the highest node.
	//
	// A SkipList with MaxLevel equal to 0 is equivalent to a
	// standard linked list and will not have any of the nice
	// properties of skip lists (probably not what you want).
	MaxLevel int
	// contains filtered or unexported fields
}

A SkipList is a map-like data structure that maintains an ordered collection of key-value pairs. Insertion, lookup, and deletion are all O(log n) operations. A SkipList can efficiently store up to 2^MaxLevel items.

To iterate over a skip list (where s is a *SkipList):

for i := s.Iterator(); i.Next(); {
	// do something with i.Key() and i.Value()
}

func NewCustomMap

func NewCustomMap(lessThan func(l, r interface{}) bool) *SkipList

NewCustomMap returns a new SkipList that will use lessThan as the comparison function. lessThan should define a linear order on keys you intend to use with the SkipList.

func NewIntMap

func NewIntMap() *SkipList

NewIntKey returns a SkipList that accepts int keys.

func NewSkipList

func NewSkipList() *SkipList

New returns a new SkipList.

Its keys must implement the Ordered interface.

func NewStringMap

func NewStringMap() *SkipList

NewStringMap returns a SkipList that accepts string keys.

func (*SkipList) Delete

func (s *SkipList) Delete(key interface{}) (value interface{}, ok bool)

Delete removes the node with the given key.

It returns the old value and whether the node was present.

func (*SkipList) Get

func (s *SkipList) Get(key interface{}) (value interface{}, ok bool)

Get returns the value associated with key from s (nil if the key is not present in s). The second return value is true when the key is present.

func (*SkipList) GetGreaterOrEqual

func (s *SkipList) GetGreaterOrEqual(min interface{}) (actualKey, value interface{}, ok bool)

GetGreaterOrEqual finds the node whose key is greater than or equal to min. It returns its value, its actual key, and whether such a node is present in the skip list.

func (*SkipList) Iterator

func (s *SkipList) Iterator() Iterator

Iterator returns an Iterator that will go through all elements s.

func (*SkipList) Len

func (s *SkipList) Len() int

Len returns the length of s.

func (*SkipList) Range

func (s *SkipList) Range(from, to interface{}) Iterator

Range returns an iterator that will go through all the elements of the skip list that are greater or equal than from, but less than to.

func (*SkipList) Seek

func (s *SkipList) Seek(key interface{}) Iterator

Seek returns a bidirectional iterator starting with the first element whose key is greater or equal to key; otherwise, a nil iterator is returned.

func (*SkipList) SeekToFirst

func (s *SkipList) SeekToFirst() Iterator

SeekToFirst returns a bidirectional iterator starting from the first element in the list if the list is populated; otherwise, a nil iterator is returned.

func (*SkipList) SeekToLast

func (s *SkipList) SeekToLast() Iterator

SeekToLast returns a bidirectional iterator starting from the last element in the list if the list is populated; otherwise, a nil iterator is returned.

func (*SkipList) Set

func (s *SkipList) Set(key, value interface{})

Sets set the value associated with key in s.

type Slave

type Slave struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewSlave

func NewSlave(masterAddr string, slaveToMasterEvent chan struct{}, ctx *context) *Slave

func (*Slave) CheckConn

func (s *Slave) CheckConn() error

func (*Slave) Close

func (s *Slave) Close()

func (*Slave) ConnToMaster

func (s *Slave) ConnToMaster() error

func (*Slave) FilterResponse

func (s *Slave) FilterResponse()

func (*Slave) HandleResponse

func (s *Slave) HandleResponse()

func (*Slave) Sync

func (s *Slave) Sync() error

type Topic

type Topic struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewTopic

func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic

Topic constructor

func (*Topic) AggregateChannelE2eProcessingLatency

func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile

func (*Topic) Close

func (t *Topic) Close() error

Close persists all outstanding topic data and closes all its channels

func (*Topic) CommitDtPreMsg

func (t *Topic) CommitDtPreMsg(msgId MessageID)

func (*Topic) Delete

func (t *Topic) Delete() error

Delete empties the topic and all its channels and closes

func (*Topic) DeleteExistingChannel

func (t *Topic) DeleteExistingChannel(channelName string) error

DeleteExistingChannel removes a channel from the topic only if it exists

func (*Topic) Empty

func (t *Topic) Empty() error

func (*Topic) Exiting

func (t *Topic) Exiting() bool

Exiting returns a boolean indicating if this topic is closed/exiting

func (*Topic) FlushTopicAndChannels

func (t *Topic) FlushTopicAndChannels() error

func (*Topic) GenerateID

func (t *Topic) GenerateID() MessageID

func (*Topic) GetChannel

func (t *Topic) GetChannel(channelName string) *Channel

GetChannel performs a thread safe operation to return a pointer to a Channel object (potentially new) for the given Topic

func (*Topic) GetChannelMapCopy

func (t *Topic) GetChannelMapCopy() map[string]*Channel

func (*Topic) GetDtPreMsgByCmtMsg

func (t *Topic) GetDtPreMsgByCmtMsg(msgId MessageID) (*Message, error)

func (*Topic) GetExistingChannel

func (t *Topic) GetExistingChannel(channelName string) (*Channel, error)

func (*Topic) HandleSyncTopicFromSlave

func (t *Topic) HandleSyncTopicFromSlave(totalMsgCnt, filenum, fileoffset, virtutaloffset, maxnum int64) ([]byte, error)

func (*Topic) IsPaused

func (t *Topic) IsPaused() bool

func (*Topic) Pause

func (t *Topic) Pause() error

func (*Topic) PutMessage

func (t *Topic) PutMessage(m *Message) error

func (*Topic) PutMessages

func (t *Topic) PutMessages(msgs []*Message) error

PutMessages writes multiple Messages to the queue

func (*Topic) Start

func (t *Topic) Start()

func (*Topic) UnPause

func (t *Topic) UnPause() error

func (*Topic) UpdatedBackendQueueEndCallback

func (t *Topic) UpdatedBackendQueueEndCallback()

type TopicStats

type TopicStats struct {
	TopicName    string         `json:"topic_name"`
	Channels     []ChannelStats `json:"channels"`
	Depth        int64          `json:"depth"`
	BackendDepth int64          `json:"backend_depth"`
	MessageCount uint64         `json:"message_count"`
	MessageBytes uint64         `json:"message_bytes"`
	Paused       bool           `json:"paused"`

	E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`
}

func NewTopicStats

func NewTopicStats(t *Topic, channels []ChannelStats) TopicStats

type Topics

type Topics []*Topic

func (Topics) Len

func (t Topics) Len() int

func (Topics) Swap

func (t Topics) Swap(i, j int)

type TopicsByName

type TopicsByName struct {
	Topics
}

func (TopicsByName) Less

func (t TopicsByName) Less(i, j int) bool

type Uint64Slice

type Uint64Slice []uint64

func (Uint64Slice) Len

func (s Uint64Slice) Len() int

func (Uint64Slice) Less

func (s Uint64Slice) Less(i, j int) bool

func (Uint64Slice) Swap

func (s Uint64Slice) Swap(i, j int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL