Documentation ¶
Index ¶
- Constants
- Variables
- type Conn
- type ConnManager
- type ID
- type IDGenerator
- type MagicType
- type Message
- type MessageHandler
- type MessageHandlerManager
- type MessageHeaderPool
- type MessagePool
- type MockConn
- func (c *MockConn) Close() error
- func (c *MockConn) Closed() <-chan struct{}
- func (c *MockConn) ID() *ID
- func (c *MockConn) ReadMsg() (*Message, string, error)
- func (c *MockConn) String() string
- func (c *MockConn) WriteBin(binData []byte) error
- func (c *MockConn) WriteMsg(m *Message) error
- func (c *MockConn) WriteTxtMsg(content string) error
- type MockHandler
- type NetConn
- func (c *NetConn) Close() error
- func (c *NetConn) Closed() <-chan struct{}
- func (c *NetConn) ID() *ID
- func (c *NetConn) ReadMsg() (*Message, string, error)
- func (c *NetConn) String() string
- func (c *NetConn) WriteBin(binData []byte) error
- func (c *NetConn) WriteMsg(msg *Message) error
- func (c *NetConn) WriteTxtMsg(content string) error
- type PacketType
- type RoleType
Constants ¶
const ( // PacketTypeMin is the min packet type value PacketTypeMin = 1 // PacketTypeMax is the max packet type value PacketTypeMax = 42 )
const ( // MagicReqValue holds the magic code text value of REQ MagicReqValue = "\000REQ" // MagicResValue holds the magic code text value of RES MagicResValue = "\000RES" )
const IDStrLength = 32
IDStrLength is the length of ID string
const MaxBodySize = 63
MaxBodySize is the max body size
Variables ¶
var ( MsgPool = NewMessagePool() MsgHeaderPool = NewMessageHeaderPool() )
Functions ¶
This section is empty.
Types ¶
type Conn ¶
type Conn interface { fmt.Stringer ReadMsg() (*Message, string, error) WriteMsg(*Message) error WriteTxtMsg(string) error WriteBin([]byte) error Close() error ID() *ID Closed() <-chan struct{} }
Conn defines the high level interface of a connection
type ConnManager ¶
type ConnManager struct {
// contains filtered or unexported fields
}
ConnManager manages connections
func (*ConnManager) AddConn ¶
func (m *ConnManager) AddConn(conn Conn)
AddConn adds a conn to the manager
func (*ConnManager) GetConn ¶
func (m *ConnManager) GetConn(id *ID) Conn
GetConn returns the connection with the given id, or nil if the id is not found in the manager.
func (*ConnManager) RemoveConn ¶
func (m *ConnManager) RemoveConn(id *ID)
RemoveConn removes the connection from manager by id
type ID ¶
ID is the type of an identity. The underlying type is a byte array, so an ID can be used as the key of a map; use a pointer to it for assignments and arguments to avoid memory copies.
func UnmarshalID ¶
UnmarshalID unmarshals an ID from a string
type MagicType ¶
type MagicType byte
MagicType is the type of the magic code
const ( // MagicReq is for REQ magic type MagicReq MagicType // MagicRes is for RES magic type MagicRes )
type Message ¶
type Message struct { MagicType MagicType PacketType PacketType Arguments []string }
Message represents a REQ/RES packet
func NextMessage ¶
NextMessage reads the next message from a bufio.Reader. If no error occurred, it returns exactly one of binMsg and txtMsg, leaving the other as its zero value: binMsg is returned if the next message is binary, and txtMsg if it is text. For a binary message, the following cases are treated as errors: 1. a read error from the reader; 2. an invalid magic code / packet type / body size (the full message is still read from the reader in this case, so the next message can be read as expected). It does not care about the validity of the message itself; message.Validate() should be called for that.
func (*Message) Encode ¶
Encode encodes the message to bytes in the gearman official protocol format
type MessageHandler ¶
type MessageHandler interface { SupportPacketTypes() []PacketType Handle(context.Context, *Message, Conn) (bool, error) }
MessageHandler is the interface of a message handler
type MessageHandlerManager ¶
type MessageHandlerManager struct {
// contains filtered or unexported fields
}
MessageHandlerManager holds all available message handlers
func NewHandlerManager ¶
func NewHandlerManager(role RoleType, reqTimeout time.Duration) *MessageHandlerManager
NewHandlerManager creates an empty handler manager
func (*MessageHandlerManager) HandleMessage ¶
func (m *MessageHandlerManager) HandleMessage(msg *Message, conn Conn) (bool, error)
HandleMessage processes one message for the connection. First it checks the validity of the message and returns the error if that fails; then it dispatches to the appropriate handler to process the message.
func (*MessageHandlerManager) RegisterHandler ¶
func (m *MessageHandlerManager) RegisterHandler(packetType PacketType, handler MessageHandler)
RegisterHandler registers a handler
type MessageHeaderPool ¶
type MessageHeaderPool struct {
// contains filtered or unexported fields
}
MessageHeaderPool keeps recyclable message header buffers to prevent re-allocation. It wraps sync.Pool.
func NewMessageHeaderPool ¶
func NewMessageHeaderPool() *MessageHeaderPool
NewMessageHeaderPool creates a new MessageHeaderPool object
func (*MessageHeaderPool) Get ¶
func (p *MessageHeaderPool) Get() *[headerSize]byte
Get returns a free message header buffer from the pool, or creates a new one if the pool is empty
func (*MessageHeaderPool) Put ¶
func (p *MessageHeaderPool) Put(header *[headerSize]byte)
Put puts a free message header buffer back into the pool
type MessagePool ¶
type MessagePool struct {
// contains filtered or unexported fields
}
MessagePool keeps recyclable Message objects to prevent re-allocation. It wraps sync.Pool.
func NewMessagePool ¶
func NewMessagePool() *MessagePool
NewMessagePool creates a new MessagePool object
func (*MessagePool) Get ¶
func (p *MessagePool) Get() *Message
Get returns a free Message object from the pool or creates a new one if the pool is empty
func (*MessagePool) Put ¶
func (p *MessagePool) Put(msg *Message)
Put puts a free Message object back to the pool
type MockConn ¶
type MockConn struct { ReadCh chan *Message WriteCh chan *Message ReadTxtCh chan string WriteTxtCh chan string Timeout time.Duration ConnID *ID // contains filtered or unexported fields }
MockConn is a Conn implementation backed by channels. Its main purpose is unit testing.
func NewMockConn ¶
NewMockConn creates a new MockConn
func (*MockConn) Closed ¶
func (c *MockConn) Closed() <-chan struct{}
Closed returns the closed channel of the mock connection
func (*MockConn) WriteTxtMsg ¶
WriteTxtMsg writes a text message to the channel
type MockHandler ¶
MockHandler is an implementation for unit test
func (*MockHandler) SupportPacketTypes ¶
func (h *MockHandler) SupportPacketTypes() []PacketType
type NetConn ¶
type NetConn struct {
// contains filtered or unexported fields
}
NetConn is a Conn implementation of the net connection
func (*NetConn) Closed ¶
func (c *NetConn) Closed() <-chan struct{}
Closed returns the closed channel
func (*NetConn) WriteTxtMsg ¶
WriteTxtMsg writes a text message to the net connection
type PacketType ¶
type PacketType byte
PacketType is the type of the packet, such as SUBMIT_JOB / GET_STATUS
const ( CAN_DO PacketType CANT_DO RESET_ABILITIES PRE_SLEEP NOOP SUBMIT_JOB JOB_CREATED GRAB_JOB NO_JOB JOB_ASSIGN WORK_STATUS WORK_COMPLETE WORK_FAIL GET_STATUS ECHO_REQ ECHO_RES SUBMIT_JOB_BG ERROR STATUS_RES SUBMIT_JOB_HIGH SET_CLIENT_ID CAN_DO_TIMEOUT ALL_YOURS WORK_EXCEPTION OPTION_REQ OPTION_RES WORK_DATA WORK_WARNING GRAB_JOB_UNIQ JOB_ASSIGN_UNIQ SUBMIT_JOB_HIGH_BG SUBMIT_JOB_LOW SUBMIT_JOB_LOW_BG SUBMIT_JOB_SCHED SUBMIT_JOB_EPOCH SUBMIT_REDUCE_JOB SUBMIT_REDUCE_JOB_BACKGROUND GRAB_JOB_ALL JOB_ASSIGN_ALL GET_STATUS_UNIQUE STATUS_RES_UNIQUE )
func (PacketType) String ¶
func (packetType PacketType) String() string
func (PacketType) Valid ¶
func (packetType PacketType) Valid() bool
Valid checks if the packet type is valid
type RoleType ¶
type RoleType byte
RoleType is the type of role (worker/client/server)
const ( // RoleWorker is the role of worker RoleWorker RoleType = 1 << iota // RoleClient is the role of client RoleClient // RoleServer is the role of server RoleServer // RoleWorkerAndClient represents both the worker and client roles RoleWorkerAndClient = RoleWorker | RoleClient )