nsqd

package
v0.0.0-...-e52ff56 Latest Latest
Published: May 31, 2017 License: MIT Imports: 47 Imported by: 0

README

nsqd

nsqd is the daemon that receives, queues, and delivers messages to clients.

Documentation

Constants

const (
	TLSNotRequired = iota
	TLSRequiredExceptHTTP
	TLSRequired
)
const (
	MsgIDLength = 16
)

Variables

var ErrIDBackwards = errors.New("ID went backward")
var ErrSequenceExpired = errors.New("sequence expired")
var ErrTimeBackwards = errors.New("time has gone backwards")

Functions

This section is empty.

Types

type BackendQueue

type BackendQueue interface {
	Put([]byte) error
	ReadChan() chan []byte // this is expected to be an *unbuffered* channel
	Close() error
	Delete() error
	Depth() int64
	Empty() error
}

BackendQueue represents the behavior for the secondary message storage system

type Channel

type Channel struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Channel represents the concrete type for an NSQ channel (and also implements the Queue interface).

There can be multiple channels per topic, each with its own unique set of subscribers (clients).

Channels maintain all client and message metadata, orchestrating in-flight messages, timeouts, requeuing, etc.

func NewChannel

func NewChannel(topicName string, channelName string, ctx *context,
	deleteCallback func(*Channel)) *Channel

NewChannel creates a new instance of the Channel type and returns a pointer to it.

func (*Channel) AddClient

func (c *Channel) AddClient(clientID int64, client Consumer)

AddClient adds a client to the Channel's client list.

func (*Channel) Close

func (c *Channel) Close() error

Close cleanly closes the Channel.

func (*Channel) Delete

func (c *Channel) Delete() error

Delete empties the channel and closes it.

func (*Channel) Depth

func (c *Channel) Depth() int64

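Depth returns the sum of the messages held in memory, on disk, and currently being delivered; that is, the number of messages that have not yet been delivered successfully.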

func (*Channel) Empty

func (c *Channel) Empty() error

Empty clears the Channel.

func (*Channel) Exiting

func (c *Channel) Exiting() bool

Exiting returns a boolean indicating if this channel is closed/exiting.

func (*Channel) FinishMessage

func (c *Channel) FinishMessage(clientID int64, id MessageID) error

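FinishMessage successfully discards an in-flight message. A consumer sends FIN to indicate that the message was received and processed correctly. FinishMessage calls popInFlightMessage and removeFromInFlightPQ to remove the message from inFlightMessages and inFlightPQ, respectively, and then records delivery statistics for the message.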

func (*Channel) IsPaused

func (c *Channel) IsPaused() bool

IsPaused reports whether the Channel is paused.

func (*Channel) Pause

func (c *Channel) Pause() error

Pause pauses the Channel.

func (*Channel) PutMessage

func (c *Channel) PutMessage(m *Message) error

PutMessage writes a Message to the queue.

func (*Channel) RemoveClient

func (c *Channel) RemoveClient(clientID int64)

RemoveClient removes a client from the Channel's client list.

func (*Channel) RequeueMessage

func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error

RequeueMessage requeues a message based on the `timeout` argument (a `time.Duration`):

`timeout` == 0 - requeue the message immediately
`timeout` > 0 - asynchronously wait for the specified timeout and then requeue the message (aka "deferred requeue")

A client sends REQ to indicate that delivery of a message failed and that the message must be delivered again. RequeueMessage handles this case: it removes the message from inFlightMessages and inFlightPQ and then redelivers it, either at once or after the timeout carried by the REQ command.

func (*Channel) StartDeferredTimeout

func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error

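If timeout is greater than 0, StartDeferredTimeout is called to defer the delivery. It first computes the time at which the message becomes deliverable, then calls pushDeferredMessage to add the message to the deferredMessage map, and finally pushes the message onto the deferredPQ queue. A dedicated worker scans the deferred messages and delivers each one once its time has passed. Note that messages requeued immediately never enter deferredPQ.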

func (*Channel) StartInFlightTimeout

func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error

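StartInFlightTimeout fills in the message's consumer ID, delivery time, and priority, then calls pushInFlightMessage to store the message in the inFlightMessages map, and finally calls addToInFlightPQ to push it onto the inFlightPQ queue. At that point delivery is complete and the channel waits for the consumer's feedback, which arrives as a FIN, REQ, or TOUCH command.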

func (*Channel) TouchMessage

func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout time.Duration) error

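TouchMessage resets the timeout for an in-flight message. A consumer sends TOUCH to request that the message's timeout be reset. The message is removed from inFlightPQ, given a new timeout, and pushed back onto the queue. The new timeout is determined by the current time, the timeout the client set via IDENTIFY, and the configured maximum, MaxMsgTimeout.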

func (*Channel) UnPause

func (c *Channel) UnPause() error

UnPause resumes the Channel.

type ChannelStats

type ChannelStats struct {
	ChannelName   string        `json:"channel_name"`
	Depth         int64         `json:"depth"`
	BackendDepth  int64         `json:"backend_depth"`
	InFlightCount int           `json:"in_flight_count"`
	DeferredCount int           `json:"deferred_count"`
	MessageCount  uint64        `json:"message_count"`
	RequeueCount  uint64        `json:"requeue_count"`
	TimeoutCount  uint64        `json:"timeout_count"`
	Clients       []ClientStats `json:"clients"`
	Paused        bool          `json:"paused"`

	E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`
}

func NewChannelStats

func NewChannelStats(c *Channel, clients []ClientStats) ChannelStats

type Channels

type Channels []*Channel

func (Channels) Len

func (c Channels) Len() int

func (Channels) Swap

func (c Channels) Swap(i, j int)

type ChannelsByName

type ChannelsByName struct {
	Channels
}

func (ChannelsByName) Less

func (c ChannelsByName) Less(i, j int) bool
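Channels supplies Len and Swap, and ChannelsByName embeds it and adds Less, which together satisfy `sort.Interface`. The sketch below reproduces that pattern with hypothetical lowercase types, assuming the ordering is a plain string comparison on the channel name.

```go
package main

import (
	"fmt"
	"sort"
)

// channel is a hypothetical stand-in that only carries a name.
type channel struct{ name string }

// channels provides the ordering-independent half of sort.Interface.
type channels []*channel

func (c channels) Len() int      { return len(c) }
func (c channels) Swap(i, j int) { c[i], c[j] = c[j], c[i] }

// channelsByName embeds channels and adds the ordering.
type channelsByName struct{ channels }

func (c channelsByName) Less(i, j int) bool { return c.channels[i].name < c.channels[j].name }

// sortedNames sorts cs in place by name and returns the names in order.
func sortedNames(cs channels) []string {
	sort.Sort(channelsByName{cs})
	names := make([]string, 0, len(cs))
	for _, c := range cs {
		names = append(names, c.name)
	}
	return names
}

func main() {
	cs := channels{&channel{name: "metrics"}, &channel{name: "archive"}, &channel{name: "email"}}
	fmt.Println(sortedNames(cs))
}
```

Splitting Len and Swap from Less lets other orderings reuse the same slice type by embedding it in a different wrapper.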

type ClientStats

type ClientStats struct {
	// TODO: deprecated, remove in 1.0
	Name string `json:"name"`

	ClientID        string `json:"client_id"`
	Hostname        string `json:"hostname"`
	Version         string `json:"version"`
	RemoteAddress   string `json:"remote_address"`
	State           int32  `json:"state"`
	ReadyCount      int64  `json:"ready_count"`
	InFlightCount   int64  `json:"in_flight_count"`
	MessageCount    uint64 `json:"message_count"`
	FinishCount     uint64 `json:"finish_count"`
	RequeueCount    uint64 `json:"requeue_count"`
	ConnectTime     int64  `json:"connect_ts"`
	SampleRate      int32  `json:"sample_rate"`
	Deflate         bool   `json:"deflate"`
	Snappy          bool   `json:"snappy"`
	UserAgent       string `json:"user_agent"`
	Authed          bool   `json:"authed,omitempty"`
	AuthIdentity    string `json:"auth_identity,omitempty"`
	AuthIdentityURL string `json:"auth_identity_url,omitempty"`

	TLS                           bool   `json:"tls"`
	CipherSuite                   string `json:"tls_cipher_suite"`
	TLSVersion                    string `json:"tls_version"`
	TLSNegotiatedProtocol         string `json:"tls_negotiated_protocol"`
	TLSNegotiatedProtocolIsMutual bool   `json:"tls_negotiated_protocol_is_mutual"`
}

ClientStats holds client status information.

type Consumer

type Consumer interface {
	UnPause()
	Pause()
	Close() error
	TimedOutMessage()
	Stats() ClientStats
	Empty()
}

Consumer is the interface that defines consumer behavior.

type Message

type Message struct {
	ID        MessageID // 16-byte message ID
	Body      []byte
	Timestamp int64  // nanosecond timestamp (creation time)
	Attempts  uint16 // (uint16)  2-byte attempts
	// contains filtered or unexported fields
}

Message defines the message structure.

func NewMessage

func NewMessage(id MessageID, body []byte) *Message

NewMessage creates a Message.

func (*Message) WriteTo

func (m *Message) WriteTo(w io.Writer) (int64, error)

WriteTo writes the message to an object that implements the io.Writer interface.
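The field comments on Message (a nanosecond timestamp, a 2-byte attempts counter, and a 16-byte ID) suggest a fixed-size header followed by the body. The sketch below assumes the layout used by upstream NSQ: an 8-byte big-endian timestamp, a 2-byte big-endian attempts count, the ID, then the body. The lowercase `message` type and the `encode` helper are hypothetical.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

const msgIDLength = 16

// message mirrors the exported fields of Message for illustration only.
type message struct {
	ID        [msgIDLength]byte
	Body      []byte
	Timestamp int64
	Attempts  uint16
}

// WriteTo writes timestamp, attempts, ID, and body to w, in that order,
// and returns the number of bytes written.
func (m *message) WriteTo(w io.Writer) (int64, error) {
	var header [10]byte
	binary.BigEndian.PutUint64(header[:8], uint64(m.Timestamp))
	binary.BigEndian.PutUint16(header[8:10], m.Attempts)

	var total int64
	for _, part := range [][]byte{header[:], m.ID[:], m.Body} {
		n, err := w.Write(part)
		total += int64(n)
		if err != nil {
			return total, err
		}
	}
	return total, nil
}

// encode is a convenience wrapper that serializes m into a byte slice.
func encode(m *message) []byte {
	var buf bytes.Buffer
	m.WriteTo(&buf) // bytes.Buffer writes do not return an error
	return buf.Bytes()
}

func main() {
	m := &message{Body: []byte("hi"), Timestamp: 1, Attempts: 2}
	copy(m.ID[:], "0123456789abcdef")
	out := encode(m)
	fmt.Println(len(out), string(out[10:26]), string(out[26:]))
}
```

Implementing `io.WriterTo` lets the same method serve network connections, files, and in-memory buffers.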

type MessageID

type MessageID [MsgIDLength]byte

type NSQD

type NSQD struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func New

func New(opts *Options) *NSQD

func (*NSQD) DeleteExistingTopic

func (n *NSQD) DeleteExistingTopic(topicName string) error

DeleteExistingTopic removes the topic with the given name, but only if it exists.

func (*NSQD) Exit

func (n *NSQD) Exit()

Exit performs the shutdown handling for the instance.

func (*NSQD) GetError

func (n *NSQD) GetError() error

GetError returns the most recent error value.

func (*NSQD) GetExistingTopic

func (n *NSQD) GetExistingTopic(topicName string) (*Topic, error)

GetExistingTopic gets the topic with the given name, but only if it exists.

func (*NSQD) GetHealth

func (n *NSQD) GetHealth() string

func (*NSQD) GetStartTime

func (n *NSQD) GetStartTime() time.Time

GetStartTime returns the time at which the instance started.

func (*NSQD) GetStats

func (n *NSQD) GetStats() []TopicStats

GetStats returns the current statistics for all topics.

func (*NSQD) GetTopic

func (n *NSQD) GetTopic(topicName string) *Topic

GetTopic performs a thread safe operation to return a pointer to a Topic object (potentially new). If no topic with the given name exists, one is created.
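A thread-safe get-or-create lookup is often written with a read lock for the common case and a re-check under the write lock before creating. The sketch below shows that general pattern; `registry`, `topic`, and `getTopic` are hypothetical, and the package's own locking may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// topic is a hypothetical stand-in that only carries a name.
type topic struct{ name string }

// registry embeds an RWMutex, as NSQD does, to guard its topic map.
type registry struct {
	sync.RWMutex
	topics map[string]*topic
}

// getTopic returns the topic called name, creating it if necessary.
func (r *registry) getTopic(name string) *topic {
	// Fast path: most calls find an existing topic under the read lock.
	r.RLock()
	t, ok := r.topics[name]
	r.RUnlock()
	if ok {
		return t
	}

	// Slow path: re-check under the write lock, because another goroutine
	// may have created the topic between the two lock acquisitions.
	r.Lock()
	defer r.Unlock()
	if existing, ok := r.topics[name]; ok {
		return existing
	}
	t = &topic{name: name}
	r.topics[name] = t
	return t
}

func main() {
	r := &registry{topics: make(map[string]*topic)}
	results := make([]*topic, 8)
	var wg sync.WaitGroup
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = r.getTopic("orders")
		}(i)
	}
	wg.Wait()

	same := true
	for _, t := range results {
		if t != results[0] {
			same = false
		}
	}
	fmt.Println(same, len(r.topics))
}
```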

func (*NSQD) IsAuthEnabled

func (n *NSQD) IsAuthEnabled() bool

func (*NSQD) IsHealthy

func (n *NSQD) IsHealthy() bool

IsHealthy reports whether the instance is healthy.

func (*NSQD) LoadMetadata

func (n *NSQD) LoadMetadata()

LoadMetadata loads the metadata.

func (*NSQD) Main

func (n *NSQD) Main()

Main runs the main service logic.

func (*NSQD) Notify

func (n *NSQD) Notify(v interface{})

func (*NSQD) PersistMetadata

func (n *NSQD) PersistMetadata() error

PersistMetadata persists the metadata.

func (*NSQD) RealHTTPAddr

func (n *NSQD) RealHTTPAddr() *net.TCPAddr

RealHTTPAddr returns the IP address and port the HTTP listener is bound to.

func (*NSQD) RealHTTPSAddr

func (n *NSQD) RealHTTPSAddr() *net.TCPAddr

RealHTTPSAddr returns the IP address and port the HTTPS listener is bound to.

func (*NSQD) RealTCPAddr

func (n *NSQD) RealTCPAddr() *net.TCPAddr

RealTCPAddr returns the IP address and port the TCP listener is bound to.

func (*NSQD) SetHealth

func (n *NSQD) SetHealth(err error)

SetHealth stores the most recent error value.

type Options

type Options struct {
	// basic options
	ID                     int64    `flag:"worker-id" cfg:"id"` // unique ID of the process (defaults to a hash of the hostname % 1024)
	Verbose                bool     `flag:"verbose"`            // verbose log output
	TCPAddress             string   `flag:"tcp-address"`
	HTTPAddress            string   `flag:"http-address"`
	HTTPSAddress           string   `flag:"https-address"`
	BroadcastAddress       string   `flag:"broadcast-address"`                                  // address registered with lookupd (defaults to the OS hostname)
	NSQLookupdTCPAddresses []string `flag:"lookupd-tcp-address" cfg:"nsqlookupd_tcp_addresses"` // lookupd addresses
	AuthHTTPAddresses      []string `flag:"auth-http-address" cfg:"auth_http_addresses"`        // auth server addresses

	// diskqueue options
	DataPath        string        `flag:"data-path"`          // path for persisted data
	MemQueueSize    int64         `flag:"mem-queue-size"`     // maximum number of messages buffered in the in-memory message channel
	MaxBytesPerFile int64         `flag:"max-bytes-per-file"` // maximum number of bytes per file
	SyncEvery       int64         `flag:"sync-every"`         // number of messages per disk queue fsync
	SyncTimeout     time.Duration `flag:"sync-timeout"`       // duration of time per disk queue fsync

	QueueScanInterval        time.Duration // interval of the workTicker timer
	QueueScanRefreshInterval time.Duration // interval of the refreshTicker timer (refreshes the Channel list and reallocates workers)
	QueueScanSelectionCount  int           // maximum number of Channels selected per scan
	QueueScanWorkerPoolMax   int           // maximum number of queueScanWorker goroutines
	QueueScanDirtyPercent    float64       // message delivery ratio

	// msg and command options
	MsgTimeout    time.Duration `flag:"msg-timeout" arg:"1ms"`                                         // time to wait before automatically requeuing a message
	MaxMsgTimeout time.Duration `flag:"max-msg-timeout"`                                               // maximum duration before a message times out
	MaxMsgSize    int64         `flag:"max-msg-size" deprecated:"max-message-size" cfg:"max_msg_size"` // maximum size of a message
	MaxBodySize   int64         `flag:"max-body-size"`                                                 // maximum size of a message body
	MaxReqTimeout time.Duration `flag:"max-req-timeout"`                                               // maximum requeue timeout for a message
	ClientTimeout time.Duration

	// client overridable configuration options
	MaxHeartbeatInterval   time.Duration `flag:"max-heartbeat-interval"`    // heartbeat timeout
	MaxRdyCount            int64         `flag:"max-rdy-count"`             // maximum number of messages a client may receive at a time
	MaxOutputBufferSize    int64         `flag:"max-output-buffer-size"`    // buffer size of the TCP writer
	MaxOutputBufferTimeout time.Duration `flag:"max-output-buffer-timeout"` // maximum configurable interval before flushing to the client

	// statsd integration
	StatsdAddress  string        `flag:"statsd-address"`           // UDP <addr>:<port> of the statsd daemon
	StatsdPrefix   string        `flag:"statsd-prefix"`            // prefix for keys sent to statsd (%s for host replacement)
	StatsdInterval time.Duration `flag:"statsd-interval" arg:"1s"` // interval between pushes to statsd
	StatsdMemStats bool          `flag:"statsd-mem-stats"`         // toggle sending memory and GC stats to statsd

	// e2e message latency
	E2EProcessingLatencyWindowTime  time.Duration `flag:"e2e-processing-latency-window-time"`                                         // time window over which end-to-end latency is calculated
	E2EProcessingLatencyPercentiles []float64     `flag:"e2e-processing-latency-percentile" cfg:"e2e_processing_latency_percentiles"` // message processing time percentiles (may be given multiple times or comma separated, default none)

	// TLS config
	TLSCert             string `flag:"tls-cert"`               // path to the certificate file
	TLSKey              string `flag:"tls-key"`                // path to the private key file
	TLSClientAuthPolicy string `flag:"tls-client-auth-policy"` // client certificate auth policy ('require' or 'require-verify')
	TLSRootCAFile       string `flag:"tls-root-ca-file"`       // path to the certificate authority PEM file
	TLSRequired         int    `flag:"tls-required"`           // require TLS for client connections
	TLSMinVersion       uint16 `flag:"tls-min-version"`        // minimum acceptable TLS version

	// compression
	DeflateEnabled  bool `flag:"deflate"`           // enable deflate feature negotiation (client compression)
	MaxDeflateLevel int  `flag:"max-deflate-level"` // maximum deflate compression level (> values == > nsqd CPU usage)
	SnappyEnabled   bool `flag:"snappy"`            // enable snappy feature negotiation (client compression)

	Logger logger
}

Options holds the configuration parameters.
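The comment on the ID field says the default is a hash of the hostname modulo 1024. The sketch below shows one way such a default could be derived, assuming an MD5 digest reduced with CRC-32 as in upstream NSQ; the exact hash used by this version is not stated in the documentation above, and `defaultWorkerID` is a hypothetical name.

```go
package main

import (
	"crypto/md5"
	"fmt"
	"hash/crc32"
	"io"
	"os"
)

// defaultWorkerID maps a hostname to a stable value in [0, 1024).
// The choice of MD5 followed by CRC-32 is an assumption for this sketch.
func defaultWorkerID(hostname string) int64 {
	h := md5.New()
	io.WriteString(h, hostname)
	return int64(crc32.ChecksumIEEE(h.Sum(nil)) % 1024)
}

func main() {
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "localhost" // fallback used only by this example
	}
	fmt.Println(defaultWorkerID(hostname))
}
```

Two hosts can hash to the same value, so deployments that depend on unique worker IDs may prefer to set `worker-id` explicitly.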

func NewOptions

func NewOptions() *Options

type Topic

type Topic struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewTopic

func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic

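NewTopic is the Topic constructor. It takes the topic name, a context object (wrapping the NSQD instance), and a callback that is invoked when the topic is deleted.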

func (*Topic) AggregateChannelE2eProcessingLatency

func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile

func (*Topic) Close

func (t *Topic) Close() error

Close persists all outstanding topic data and closes all its channels.

func (*Topic) Delete

func (t *Topic) Delete() error

Delete empties the topic and all its channels and closes them.

func (*Topic) DeleteExistingChannel

func (t *Topic) DeleteExistingChannel(channelName string) error

DeleteExistingChannel removes the channel named channelName from the topic, but only if it exists.

func (*Topic) Depth

func (t *Topic) Depth() int64

Depth returns the number of messages held in memory plus the number of messages in the BackendQueue.

func (*Topic) Empty

func (t *Topic) Empty() error

Empty clears the data held in memory and on disk.

func (*Topic) Exiting

func (t *Topic) Exiting() bool

Exiting returns a boolean indicating if this topic is closed/exiting, that is, whether exitFlag has been set to 1.

func (*Topic) GetChannel

func (t *Topic) GetChannel(channelName string) *Channel

GetChannel performs a thread safe operation to return a pointer to a Channel object (potentially new) for the given Topic

func (*Topic) GetExistingChannel

func (t *Topic) GetExistingChannel(channelName string) (*Channel, error)

GetExistingChannel returns the Channel named channelName.

func (*Topic) IsPaused

func (t *Topic) IsPaused() bool

IsPaused reports whether the Topic is paused.

func (*Topic) Pause

func (t *Topic) Pause() error

Pause pauses the Topic.

func (*Topic) PutMessage

func (t *Topic) PutMessage(m *Message) error

PutMessage writes a Message to the queue.

func (*Topic) PutMessages

func (t *Topic) PutMessages(msgs []*Message) error

PutMessages writes multiple Messages to the queue.

func (*Topic) UnPause

func (t *Topic) UnPause() error

UnPause resumes the Topic.

type TopicStats

type TopicStats struct {
	TopicName    string         `json:"topic_name"`
	Channels     []ChannelStats `json:"channels"`
	Depth        int64          `json:"depth"`
	BackendDepth int64          `json:"backend_depth"`
	MessageCount uint64         `json:"message_count"`
	Paused       bool           `json:"paused"`

	E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`
}

func NewTopicStats

func NewTopicStats(t *Topic, channels []ChannelStats) TopicStats

type Topics

type Topics []*Topic

func (Topics) Len

func (t Topics) Len() int

func (Topics) Swap

func (t Topics) Swap(i, j int)

type TopicsByName

type TopicsByName struct {
	Topics
}

func (TopicsByName) Less

func (t TopicsByName) Less(i, j int) bool

type Uint64Slice

type Uint64Slice []uint64

func (Uint64Slice) Len

func (s Uint64Slice) Len() int

func (Uint64Slice) Less

func (s Uint64Slice) Less(i, j int) bool

func (Uint64Slice) Swap

func (s Uint64Slice) Swap(i, j int)
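The Len, Less, and Swap methods make Uint64Slice usable with the standard `sort` package. The sketch below redefines an equivalent lowercase type so that it is self-contained, and assumes Less orders values ascending.

```go
package main

import (
	"fmt"
	"sort"
)

// uint64Slice mirrors Uint64Slice; ascending order is an assumption.
type uint64Slice []uint64

func (s uint64Slice) Len() int           { return len(s) }
func (s uint64Slice) Less(i, j int) bool { return s[i] < s[j] }
func (s uint64Slice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }

// sortedCopy returns a sorted copy of values, leaving the input untouched.
func sortedCopy(values []uint64) []uint64 {
	out := make(uint64Slice, len(values))
	copy(out, values)
	sort.Sort(out)
	return out
}

func main() {
	fmt.Println(sortedCopy([]uint64{30, 10, 20}))
}
```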
