worker

package
v1.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2023 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	LinkUninitialized = 0
	LinkInitialized   = 1
	LinkClosed        = 2
)
View Source
const (
	WorkerRunning        = int32(0)
	WorkerClosing        = int32(1)
	WorkerClosed         = int32(2)
	RetrialBackoffFactor = 2
	MinDataLinks         = 1
	MaxDataLinks         = 10
	MaxDataLinkRetrial   = 3
)

Variables

View Source
var (
	ErrWorkerClosed     = errors.New("worker closed")
	ErrNoProxySpecified = errors.New("no proxy specified")
	ErrInvalidShortcut  = errors.New("invalid shortcut connection")
	ErrShouldIgnore     = errors.New("should ignore")

	DialTimeout           = 20 * time.Millisecond
	RetrialDelayStartFrom = 100 * time.Millisecond
	RetrialMaxDelay       = 10 * time.Second
)
View Source
var (
	ErrHeartbeatFailed = errors.New("heartbeat failed")
)
View Source
var (
	ErrLinkClosed = errors.New("link closed")
)
View Source
var (
	ResponseTimeout = 100 * time.Millisecond
)

Functions

This section is empty.

Types

type BadResponse

type BadResponse struct {
	BaseResponse
}

type BaseResponse

type BaseResponse struct {
	resp.ResponseWriter

	Attempts   int
	Cmd        string
	Body       []byte
	BodyStream resp.AllReadCloser
	// contains filtered or unexported fields
}

func (*BaseResponse) Command

func (r *BaseResponse) Command() string

func (*BaseResponse) Context

func (r *BaseResponse) Context() context.Context

Context return the response context

func (*BaseResponse) Flush

func (r *BaseResponse) Flush() error

func (*BaseResponse) Prepare

func (r *BaseResponse) Prepare()

Overwrite me

func (*BaseResponse) SetContext

func (r *BaseResponse) SetContext(ctx context.Context)

SetContext sets the client's context

func (*BaseResponse) Size

func (r *BaseResponse) Size() int64

func (*BaseResponse) String

func (r *BaseResponse) String() string

type Client

type Client struct {
	Ctrl   bool
	Conn   net.Conn
	Writer *resp.RequestWriter
	Reader resp.ResponseReader
}

Simple client of worker for debuging.

func NewClient

func NewClient(cn net.Conn, ctrl bool) *Client

type DefaultHeartbeater

type DefaultHeartbeater struct {
}
func (hb *DefaultHeartbeater) SendToLink(link *Link, flags int64) error

type ErrorResponse

type ErrorResponse struct {
	BaseResponse

	Error interface{}
}

ErrorResponse Response wrapper for errors.

func (*ErrorResponse) Prepare

func (e *ErrorResponse) Prepare()

func (*ErrorResponse) String

func (e *ErrorResponse) String() string

type HandlerProxy

type HandlerProxy struct {
	// contains filtered or unexported fields
}

func (*HandlerProxy) HandlerFunc

func (h *HandlerProxy) HandlerFunc(w resp.ResponseWriter, c *resp.Command)

func (*HandlerProxy) StreamHandlerFunc

func (h *HandlerProxy) StreamHandlerFunc(w resp.ResponseWriter, c *resp.CommandStream)

type HeartbeatError

type HeartbeatError interface {
	Flags() int64
}

type Heartbeater

type Heartbeater interface {
	SendToLink(*Link, int64) error
}
type Link struct {
	*redeo.Client
	// contains filtered or unexported fields
}

Wrapper for redeo client that support response buffering if connection is unavailable

func LinkFromClient

func LinkFromClient(client *redeo.Client) *Link
func NewLink(ctrl bool) *Link

func (*Link) AddResponses

func (ln *Link) AddResponses(rsp interface{}) error

Add asynchronize response, error if the client is closed.

func (*Link) Close

func (ln *Link) Close()

func (*Link) GrantToken

func (ln *Link) GrantToken(token *struct{})

func (*Link) ID

func (ln *Link) ID() int

func (*Link) Initialize

func (ln *Link) Initialize() bool

func (*Link) Invalidate

func (ln *Link) Invalidate(err error)

func (*Link) IsClosed

func (ln *Link) IsClosed() bool

func (*Link) IsControl

func (ln *Link) IsControl() bool

func (*Link) Reset

func (ln *Link) Reset(conn net.Conn)

func (*Link) RevokeToken

func (ln *Link) RevokeToken() *struct{}

func (*Link) String

func (ln *Link) String() string

type ObjectResponse

type ObjectResponse struct {
	BaseResponse

	ReqId        string
	ChunkId      string
	Val          string
	Recovered    int64
	Extension    time.Duration
	PiggyFlags   int64
	PiggyPayload []byte
}

ObjectResponse Response wrapper for objects.

func (*ObjectResponse) Prepare

func (r *ObjectResponse) Prepare()

func (*ObjectResponse) Size

func (r *ObjectResponse) Size() int64

func (*ObjectResponse) String

func (r *ObjectResponse) String() string

type Preparer

type Preparer func(*SimpleResponse, resp.ResponseWriter) error

Response filler to simplify response construction. If an error is returned, the response will be discarded, and the connection will be closed except an ErrShouldIgnore is returned.

type Response

type Response interface {
	redeo.Contextable

	// Command get command
	Command() string

	// Prepare overwrite to customize fields of a Response.
	Prepare()

	// Flush waits for the response to be received by the proxy.
	// Because the data will be written to a buffer and return immediately, a successful "flush"
	// may not guarantee that the data has been received by the proxy. In a function, the function
	// can be suspended any time and leave the data in the middle of the network stack, which may lead
	// to unexpected behavior. To avoid this, an acknowledgement is required to ensure that the data
	// has been received by the proxy. On timeout, the link will be closed.
	Flush() error

	// Size overwrite to return the size of a Response.
	Size() int64
	// contains filtered or unexported methods
}

type SimpleResponse

type SimpleResponse struct {
	BaseResponse
}

type TestClient

type TestClient struct {
	Conn   sysnet.Conn
	Writer *resp.RequestWriter
	Reader resp.ResponseReader
}

func NewTestClient

func NewTestClient(cn sysnet.Conn) *TestClient

type Worker

type Worker struct {
	*redeo.Server
	// contains filtered or unexported fields
}

Worker Lambda serve worker. A worker uses two types of links: control and data. Control link: Stable connection to serve control commands and small requests. Data link: Short lived (one time mostly) connection serve all requests, data link is established on demand via a dynamic connection system Dynamic connection system: Use token to control minimum active connections. on connecting, each connection consumes a token,

and the token is returned on first request or disconnection, among which is first.

func NewWorker

func NewWorker(lifeId int64) *Worker

func (*Worker) AddResponses

func (wrk *Worker) AddResponses(rsp Response, links ...interface{}) (err error)

Add asynchronize response, error if the client is closed. The function will use the link specified by the second parameter, and use datalink automatically if payload is large enough. In case an error is returned on closing, the caller can safely ignore the error and call rsp.Flush() afterward without side affect.

func (*Worker) AddResponsesWithPreparer

func (wrk *Worker) AddResponsesWithPreparer(cmd string, preparer Preparer, links ...interface{}) (Response, error)

func (*Worker) Close

func (wrk *Worker) Close()

func (*Worker) CloseWithOptions

func (wrk *Worker) CloseWithOptions(opts ...bool)

func (*Worker) GetStats

func (wrk *Worker) GetStats() types.ServerStats

types.ServerStats implementation.

func (*Worker) Handler

func (wrk *Worker) Handler(fn redeo.HandlerFunc) redeo.HandlerFunc

func (*Worker) Id

func (wrk *Worker) Id() int32

func (*Worker) IsClosed

func (wrk *Worker) IsClosed() bool

func (*Worker) Pause

func (wrk *Worker) Pause()

func (*Worker) RTT

func (wrk *Worker) RTT() time.Duration

func (*Worker) SetFailure

func (wrk *Worker) SetFailure(link interface{}, err error)

func (*Worker) SetHeartbeater

func (wrk *Worker) SetHeartbeater(heartbeater Heartbeater)

func (*Worker) SetManualAck

func (wrk *Worker) SetManualAck(enable bool)

func (*Worker) StartOrResume

func (wrk *Worker) StartOrResume(proxyAddr sysnet.Addr, args ...*WorkerOptions) (isStart bool, err error)

func (*Worker) StreamHandler

func (wrk *Worker) StreamHandler(fn redeo.StreamHandlerFunc) redeo.StreamHandlerFunc
func (wrk *Worker) VerifyDataLinks(availableLinks int, reportedAt time.Time)

VerifyDataLinks Verify the number of available data links at proxy side.

func (*Worker) WaitAck

func (wrk *Worker) WaitAck(cmd string, cb func(), links ...interface{})

type WorkerOptions

type WorkerOptions struct {
	DryRun       bool
	MinDataLinks int
	LogLevel     int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL