nsq

package
v0.2.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2012 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	FrameTypeResponse int32 = 0
	FrameTypeError    int32 = 1
	FrameTypeMessage  int32 = 2
)
View Source
const (
	StateInit = iota
	StateDisconnected
	StateConnected
	StateSubscribed
	StateClosing // close has started. responses are ok, but no new messages will be sent
)
View Source
const DefaultClientTimeout = 60 * time.Second
View Source
const MaxReadyCount = 2500
View Source
const MsgIdLength = 16
View Source
const VERSION = "0.2.4"

Variables

View Source
var ErrAlreadyConnected = errors.New("already connected")
View Source
var MagicV1 = []byte("  V1")
View Source
var MagicV2 = []byte("  V2")

Functions

func ApiRequest

func ApiRequest(endpoint string) (*simplejson.Json, error)

func Frame

func Frame(w io.Writer, frameType int32, data []byte) error

func IsValidChannelName

func IsValidChannelName(name string) bool

func IsValidTopicName

func IsValidTopicName(name string) bool

func ReadMagic

func ReadMagic(r io.Reader) (int32, error)

func ReadResponse

func ReadResponse(r io.Reader) ([]byte, error)

func SendCommand

func SendCommand(w io.Writer, cmd *Command) error

func SendResponse

func SendResponse(w io.Writer, data []byte) (int, error)

func UnpackResponse

func UnpackResponse(response []byte) (int32, []byte, error)

UnpackResponse is a helper function that takes serialized data (as []byte), unpacks and returns a triplicate of:

frame type, data ([]byte), error

Types

type AsyncHandler

type AsyncHandler interface {
	HandleMessage(message *Message, responseChannel chan *FinishedMessage)
}

an async handler that must send a &FinishedMessage{messageID, requeueDelay, true|false} onto responseChannel to indicate that a message has been finished. This is usefull if you want to batch work together and delay response that processing is complete

type ClientErr

type ClientErr struct {
	Err  string
	Desc string
}

func NewClientErr

func NewClientErr(err string, description string) *ClientErr

func (*ClientErr) Description

func (e *ClientErr) Description() string

func (*ClientErr) Error

func (e *ClientErr) Error() string

type Command

type Command struct {
	Name   []byte
	Params [][]byte
	Body   []byte
}

func Announce

func Announce(topic string, channel string, port int, ips []string) *Command

Announce creates a new Command to announce the existence of a given topic and/or channel. NOTE: if channel == "." then it is considered n/a

func Finish

func Finish(id []byte) *Command

Finish creates a new Command to indiciate that a given message (by id) has been processed successfully

func Identify

func Identify(version string, tcpPort int, httpPort int, address string) *Command

Identify is the first message sent to the Lookupd and provides information about the client

func Nop

func Nop() *Command

func Ping

func Ping() *Command

Ping creates a new Command to keep-alive the state of all the announced topic/channels for a given client

func Publish

func Publish(topic string, body []byte) *Command

Publish creates a new Command to write a message to a given topic

func Ready

func Ready(count int) *Command

Ready creates a new Command to specify the number of messages a client is willing to receive

func Register

func Register(topic string, channel string) *Command

REGISTER a topic/channel for this nsqd

func Requeue

func Requeue(id []byte, timeoutMs int) *Command

Requeue creats a new Command to indicate that a given message (by id) should be requeued after the given timeout (in ms) NOTE: a timeout of 0 indicates immediate requeue

func StartClose

func StartClose() *Command

StartClose creates a new Command to indicate that the client would like to start a close cycle. nsqd will no longer send messages to a client in this state and the client is expected to ACK after which it can finish pending messages and close the connection

func Subscribe

func Subscribe(topic string, channel string, shortIdentifier string, longIdentifier string) *Command

Subscribe creates a new Command to subscribe to the given topic/channel

func UnRegister

func UnRegister(topic string, channel string) *Command

UNREGISTER removes a topic/channel from this nsqd

func (*Command) String

func (c *Command) String() string

type FailedMessageLogger

type FailedMessageLogger interface {
	LogFailedMessage(message *Message)
}

type FinishedMessage

type FinishedMessage struct {
	Id             []byte
	RequeueDelayMs int
	Success        bool
}

type Handler

type Handler interface {
	HandleMessage(message *Message) error
}

a syncronous handler that returns an error (or nil to indicate success)

type LookupPeer

type LookupPeer struct {
	PeerInfo PeerInfo
	// contains filtered or unexported fields
}

LookupPeer is a low-level type for connecting/reading/writing to nsqlookupd

func NewLookupPeer

func NewLookupPeer(addr string, connectCallback func(*LookupPeer)) *LookupPeer

NewLookupPeer creates a new LookupPeer instance

func (*LookupPeer) Close

func (lp *LookupPeer) Close() error

func (*LookupPeer) Command

func (lp *LookupPeer) Command(cmd *Command) ([]byte, error)

func (*LookupPeer) Connect

func (lp *LookupPeer) Connect() error

func (*LookupPeer) Read

func (lp *LookupPeer) Read(data []byte) (int, error)

func (*LookupPeer) String

func (lp *LookupPeer) String() string

func (*LookupPeer) Write

func (lp *LookupPeer) Write(data []byte) (int, error)

type Message

type Message struct {
	Id        []byte
	Body      []byte
	Timestamp int64
	Attempts  uint16
}

Message is the fundamental data type containing the id, body, and meta-data

func DecodeMessage

func DecodeMessage(byteBuf []byte) (*Message, error)

DecodeMessage deseralizes data (as []byte) and creates/returns a pointer to a new Message

func NewMessage

func NewMessage(id []byte, body []byte) *Message

NewMessage creates a Message, initializes some meta-data, and returns a pointer

func (*Message) Encode

func (m *Message) Encode(w io.Writer) error

Encode serializes the message into the supplied writer

func (*Message) EncodeBytes

func (m *Message) EncodeBytes() ([]byte, error)

EncodeBytes serializes the message into a new []byte

type PeerInfo

type PeerInfo struct {
	TcpPort  int    `json:"tcp_port"`
	HttpPort int    `json:"http_port"`
	Version  string `json:"version"`
	Address  string `json:"address"`
}

type Protocol

type Protocol interface {
	IOLoop(conn net.Conn) error
}

describes the basic behavior of any protocol in the system

type Reader

type Reader struct {
	TopicName           string        // name of topic to subscribe to
	ChannelName         string        // name of channel to subscribe to
	LookupdPollInterval time.Duration // seconds between polling lookupd's (+/- random 1/10th this value)
	MaxAttemptCount     uint16
	DefaultRequeueDelay time.Duration
	MaxRequeueDelay     time.Duration
	VerboseLogging      bool
	ShortIdentifier     string // an identifier to send to nsqd when connecting (defaults: short hostname)
	LongIdentifier      string // an identifier to send to nsqd when connecting (defaults: long hostname)
	ReadTimeout         time.Duration
	WriteTimeout        time.Duration
	MessagesReceived    uint64
	MessagesFinished    uint64
	MessagesRequeued    uint64
	ExitChan            chan int
	// contains filtered or unexported fields
}

func NewReader

func NewReader(topic string, channel string) (*Reader, error)

func (*Reader) AddAsyncHandler

func (q *Reader) AddAsyncHandler(handler AsyncHandler)

this starts an async handler on the Reader it's ok to start more than one handler simultaneously

func (*Reader) AddHandler

func (q *Reader) AddHandler(handler Handler)

this starts a handler on the Reader it's ok to start more than one handler simultaneously

func (*Reader) ConnectToLookupd

func (q *Reader) ConnectToLookupd(addr string) error

func (*Reader) ConnectToNSQ

func (q *Reader) ConnectToNSQ(addr string) error

func (*Reader) ConnectionMaxInFlight

func (q *Reader) ConnectionMaxInFlight() int

calculate the max in flight count per connection this may change dynamically based on the number of connections

func (*Reader) IsStarved

func (q *Reader) IsStarved() bool

IsStarved indicates whether any connections for this reader are blocked on processing before being able to receive more messages (ie. RDY count of 0 and not exiting)

func (*Reader) MaxInFlight

func (q *Reader) MaxInFlight() int

max number of messages to allow in-flight at a time

func (*Reader) SetMaxInFlight

func (q *Reader) SetMaxInFlight(maxInFlight int)

update the reader ready state, updating each connection as appropriate

func (*Reader) Stop

func (q *Reader) Stop()

stops a Reader gracefully

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL