gearman

package module
v0.0.0-...-60dd28f
Published: Jun 14, 2018 License: MIT Imports: 16 Imported by: 0

README

gearman

A Go implementation of Gearman.

For now only the server is implemented; the client and worker will be added in the future.

server

client (TODO)

worker (TODO)

Documentation

Constants

const (
	// PacketTypeMin is the min packet type value
	PacketTypeMin = 1
	// PacketTypeMax is the max packet type value
	PacketTypeMax = 42
)
const (
	// MagicReqValue holds the magic code text value of REQ
	MagicReqValue = "\000REQ"
	// MagicResValue holds the magic code text value of RES
	MagicResValue = "\000RES"
)
const IDStrLength = 32

IDStrLength is the length of ID string

const MaxBodySize = 63

MaxBodySize is the max body size

Variables

var (
	MsgPool       = NewMessagePool()
	MsgHeaderPool = NewMessageHeaderPool()
)

Functions

This section is empty.

Types

type Conn

type Conn interface {
	fmt.Stringer
	ReadMsg() (*Message, string, error)
	WriteMsg(*Message) error
	WriteTxtMsg(string) error
	WriteBin([]byte) error
	Close() error
	ID() *ID
	Closed() <-chan struct{}
}

Conn defines the high-level interface of a connection

type ConnManager

type ConnManager struct {
	// contains filtered or unexported fields
}

ConnManager manages connections

func NewConnManager

func NewConnManager() *ConnManager

NewConnManager creates a new conn manager

func (*ConnManager) AddConn

func (m *ConnManager) AddConn(conn Conn)

AddConn adds a conn to the manager

func (*ConnManager) GetConn

func (m *ConnManager) GetConn(id *ID) Conn

GetConn returns the connection with the given id, or nil if the id is not found in the manager

func (*ConnManager) RemoveConn

func (m *ConnManager) RemoveConn(id *ID)

RemoveConn removes the connection from manager by id

type ID

type ID uuid.UUID

ID is the type of identity. The underlying type is a byte array, so an ID value can be used directly as a map key; use a pointer to it for assignments and arguments to avoid memory copies.
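The map-key and pointer convention described above can be sketched without the package itself. This is a minimal illustration, assuming a 16-byte array like the one behind a UUID; the `registry` type and its methods are hypothetical names and are not part of this package.

```go
package main

import "fmt"

// ID mirrors the shape of a 16-byte UUID array (an assumption for this sketch).
type ID [16]byte

// registry is a hypothetical store keyed by ID value.
type registry struct {
	names map[ID]string
}

func newRegistry() *registry {
	return &registry{names: make(map[ID]string)}
}

// add takes the ID by pointer and dereferences it only to form the map key.
func (r *registry) add(id *ID, name string) {
	r.names[*id] = name
}

// get returns the name stored for id and whether it was present.
func (r *registry) get(id *ID) (string, bool) {
	name, ok := r.names[*id]
	return name, ok
}

func main() {
	r := newRegistry()
	r.add(&ID{0x01, 0x02}, "worker-1")

	// A separately built ID with the same bytes finds the same entry,
	// because array keys are compared by value.
	name, ok := r.get(&ID{0x01, 0x02})
	fmt.Println(name, ok)
}
```

Arrays are comparable in Go, which is what makes the value usable as a key; a slice could not be used this way.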

func UnmarshalID

func UnmarshalID(str string) (*ID, error)

UnmarshalID unmarshals an ID from a string

func (*ID) String

func (id *ID) String() string

type IDGenerator

type IDGenerator struct {
}

IDGenerator is a generator of ID

func NewIDGenerator

func NewIDGenerator() *IDGenerator

NewIDGenerator creates a new ID generator

func (*IDGenerator) Generate

func (g *IDGenerator) Generate() *ID

Generate generates a new ID

type MagicType

type MagicType byte

MagicType is the type of the magic code

const (

	// MagicReq is for REQ magic type
	MagicReq MagicType
	// MagicRes is for RES magic type
	MagicRes
)

func (MagicType) String

func (magicType MagicType) String() string

func (MagicType) Valid

func (magicType MagicType) Valid() bool

Valid checks if the magic type is valid

type Message

type Message struct {
	MagicType  MagicType
	PacketType PacketType
	Arguments  []string
}

Message represents a REQ/RES packet

func NextMessage

func NextMessage(reader *bufio.Reader) (binMsg *Message, txtMsg string, err error)

NextMessage reads the next message from a bufio.Reader. If no error occurs, it returns exactly one of binMsg and txtMsg and leaves the other as its zero value: binMsg if the next message is binary, txtMsg if it is text.

For a binary message, the following cases are treated as errors:

1. A read error from the reader.
2. An invalid magic code, packet type, or body size. In this case the full message is still read from the reader, so the next message can be read as expected.

NextMessage does not check the validity of the message itself; call Message.Validate for that.

func (*Message) Encode

func (m *Message) Encode() ([]byte, error)

Encode encodes the message to bytes in the gearman official protocol format

func (*Message) String

func (m *Message) String() string

func (*Message) Validate

func (m *Message) Validate(role RoleType) error

Validate checks the validity of the message and returns an error if it is invalid. It validates:

1. Whether the packet type is expected for the current role (e.g. SUBMIT_JOB is sent by a client to a server, so it is invalid when received by a client).
2. Whether the number of arguments is as expected.

func (*Message) WriteTo

func (m *Message) WriteTo(writer io.Writer) (int64, error)

WriteTo writes the message data to a Writer

type MessageHandler

type MessageHandler interface {
	SupportPacketTypes() []PacketType
	Handle(context.Context, *Message, Conn) (bool, error)
}

MessageHandler is the interface of a message handler

type MessageHandlerManager

type MessageHandlerManager struct {
	// contains filtered or unexported fields
}

MessageHandlerManager holds all available message handlers

func NewHandlerManager

func NewHandlerManager(role RoleType, reqTimeout time.Duration) *MessageHandlerManager

NewHandlerManager creates an empty handler manager

func (*MessageHandlerManager) HandleMessage

func (m *MessageHandlerManager) HandleMessage(msg *Message, conn Conn) (bool, error)

HandleMessage processes one message for the connection. It first checks the validity of the message and returns the error if validation fails; otherwise it dispatches the message to the appropriate handler.

func (*MessageHandlerManager) RegisterHandler

func (m *MessageHandlerManager) RegisterHandler(packetType PacketType, handler MessageHandler)

RegisterHandler registers a handler

type MessageHeaderPool

type MessageHeaderPool struct {
	// contains filtered or unexported fields
}

MessageHeaderPool keeps recyclable message header buffers to avoid reallocation. It wraps sync.Pool.

func NewMessageHeaderPool

func NewMessageHeaderPool() *MessageHeaderPool

NewMessageHeaderPool creates a new MessageHeaderPool object

func (*MessageHeaderPool) Get

func (p *MessageHeaderPool) Get() *[headerSize]byte

Get returns a free message header buffer from the pool, or creates a new one if the pool is empty

func (*MessageHeaderPool) Put

func (p *MessageHeaderPool) Put(header *[headerSize]byte)

Put puts a free message header buffer back into the pool

type MessagePool

type MessagePool struct {
	// contains filtered or unexported fields
}

MessagePool keeps recyclable Message objects to avoid reallocation. It wraps sync.Pool.

func NewMessagePool

func NewMessagePool() *MessagePool

NewMessagePool creates a new MessagePool object

func (*MessagePool) Get

func (p *MessagePool) Get() *Message

Get returns a free Message object from the pool or creates a new one if the pool is empty

func (*MessagePool) Put

func (p *MessagePool) Put(msg *Message)

Put puts a free Message object back to the pool
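A typed wrapper around sync.Pool, as both pool types are described, might look like the sketch below. The lowercase `message` and `messagePool` types are hypothetical stand-ins, and resetting the object on Put is a design choice of this sketch rather than documented behaviour of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a simplified stand-in for a pooled object.
type message struct {
	Arguments []string
}

// messagePool is a typed wrapper around sync.Pool.
type messagePool struct {
	pool sync.Pool
}

func newMessagePool() *messagePool {
	return &messagePool{
		pool: sync.Pool{
			// New is called when the pool has nothing to hand out.
			New: func() interface{} { return &message{} },
		},
	}
}

// Get returns a recycled message if one is available, otherwise a new one.
func (p *messagePool) Get() *message {
	return p.pool.Get().(*message)
}

// Put clears the message, keeping its backing array, and returns it to the pool.
func (p *messagePool) Put(m *message) {
	m.Arguments = m.Arguments[:0]
	p.pool.Put(m)
}

func main() {
	p := newMessagePool()

	m := p.Get()
	m.Arguments = append(m.Arguments, "reverse", "hello")
	p.Put(m)

	// Whether this is the recycled object or a fresh one, it starts empty.
	m2 := p.Get()
	fmt.Println(len(m2.Arguments))
}
```

sync.Pool may drop idle objects at any time, so callers should not rely on getting the same object back.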

type MockConn

type MockConn struct {
	ReadCh     chan *Message
	WriteCh    chan *Message
	ReadTxtCh  chan string
	WriteTxtCh chan string
	Timeout    time.Duration
	ConnID     *ID
	// contains filtered or unexported fields
}

MockConn is a channel-based Conn implementation, intended mainly for unit tests

func NewMockConn

func NewMockConn(readCache int, writeCache int) *MockConn

NewMockConn creates a new MockConn

func (*MockConn) Close

func (c *MockConn) Close() error

Close closes the mock connection

func (*MockConn) Closed

func (c *MockConn) Closed() <-chan struct{}

Closed returns the closed channel of the mock connection

func (*MockConn) ID

func (c *MockConn) ID() *ID

ID returns the identity of the mock connection

func (*MockConn) ReadMsg

func (c *MockConn) ReadMsg() (*Message, string, error)

ReadMsg reads the next Message from the channel

func (*MockConn) String

func (c *MockConn) String() string

func (*MockConn) WriteBin

func (c *MockConn) WriteBin(binData []byte) error

WriteBin writes an encoded binary message to the channel

func (*MockConn) WriteMsg

func (c *MockConn) WriteMsg(m *Message) error

WriteMsg writes a Message to the channel

func (*MockConn) WriteTxtMsg

func (c *MockConn) WriteTxtMsg(content string) error

WriteTxtMsg writes a text message to the channel

type MockHandler

type MockHandler struct {
	mock.Mock
}

MockHandler is a MessageHandler implementation for unit tests

func (*MockHandler) Handle

func (h *MockHandler) Handle(ctx context.Context, msg *Message, conn Conn) (bool, error)

func (*MockHandler) SupportPacketTypes

func (h *MockHandler) SupportPacketTypes() []PacketType

type NetConn

type NetConn struct {
	// contains filtered or unexported fields
}

NetConn is a Conn implementation of the net connection

func NewNetConn

func NewNetConn(conn net.Conn, id *ID) *NetConn

NewNetConn creates a NetConn

func (*NetConn) Close

func (c *NetConn) Close() error

Close closes the connection

func (*NetConn) Closed

func (c *NetConn) Closed() <-chan struct{}

Closed returns the closed channel

func (*NetConn) ID

func (c *NetConn) ID() *ID

ID returns the identity of the net connection

func (*NetConn) ReadMsg

func (c *NetConn) ReadMsg() (*Message, string, error)

ReadMsg reads the next Message from the net connection

func (*NetConn) String

func (c *NetConn) String() string

func (*NetConn) WriteBin

func (c *NetConn) WriteBin(binData []byte) error

WriteBin writes an encoded binary message to the net connection

func (*NetConn) WriteMsg

func (c *NetConn) WriteMsg(msg *Message) error

WriteMsg writes a Message to the net connection

func (*NetConn) WriteTxtMsg

func (c *NetConn) WriteTxtMsg(content string) error

WriteTxtMsg writes a text message to the net connection

type PacketType

type PacketType byte

PacketType is the type of the packet, such as SUBMIT_JOB / GET_STATUS

const (
	CAN_DO PacketType
	CANT_DO
	RESET_ABILITIES
	PRE_SLEEP

	NOOP
	SUBMIT_JOB
	JOB_CREATED
	GRAB_JOB
	NO_JOB
	JOB_ASSIGN
	WORK_STATUS
	WORK_COMPLETE
	WORK_FAIL
	GET_STATUS
	ECHO_REQ
	ECHO_RES
	SUBMIT_JOB_BG
	ERROR
	STATUS_RES
	SUBMIT_JOB_HIGH
	SET_CLIENT_ID
	CAN_DO_TIMEOUT
	ALL_YOURS
	WORK_EXCEPTION
	OPTION_REQ
	OPTION_RES
	WORK_DATA
	WORK_WARNING
	GRAB_JOB_UNIQ
	JOB_ASSIGN_UNIQ
	SUBMIT_JOB_HIGH_BG
	SUBMIT_JOB_LOW
	SUBMIT_JOB_LOW_BG
	SUBMIT_JOB_SCHED
	SUBMIT_JOB_EPOCH
	SUBMIT_REDUCE_JOB
	SUBMIT_REDUCE_JOB_BACKGROUND
	GRAB_JOB_ALL
	JOB_ASSIGN_ALL
	GET_STATUS_UNIQUE
	STATUS_RES_UNIQUE
)

func (PacketType) String

func (packetType PacketType) String() string

func (PacketType) Valid

func (packetType PacketType) Valid() bool

Valid checks if the packet type is valid

type RoleType

type RoleType byte

RoleType is the type of role (worker/client/server)

const (
	// RoleWorker is the role of worker
	RoleWorker RoleType = 1 << iota
	// RoleClient is the role of client
	RoleClient
	// RoleServer is the role of server
	RoleServer
	// RoleWorkerAndClient represents both the worker and client roles
	RoleWorkerAndClient = RoleWorker | RoleClient
)
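Because each role occupies its own bit, a combined value such as RoleWorkerAndClient can be tested with a bitwise AND. The sketch below redeclares the constants so that it runs on its own; `allows` is a hypothetical helper and not part of this package.

```go
package main

import "fmt"

// RoleType and its constants are redeclared here to keep the sketch standalone.
type RoleType byte

const (
	RoleWorker RoleType = 1 << iota // 1
	RoleClient                      // 2
	RoleServer                      // 4
	RoleWorkerAndClient = RoleWorker | RoleClient // 3
)

// allows reports whether current is one of the roles contained in allowed.
func allows(allowed, current RoleType) bool {
	return allowed&current != 0
}

func main() {
	fmt.Println(allows(RoleWorkerAndClient, RoleWorker))
	fmt.Println(allows(RoleWorkerAndClient, RoleServer))
}
```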
