server

package
v0.5.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 8, 2023 License: MIT Imports: 19 Imported by: 0

Documentation

Overview

Package server is the webserver which sends simulation requests to the simulator.

Index

Constants

This section is empty.

Variables

View Source
var (
	JobChannelBuffer = GetEnvInt("JOB_CHAN_BUFFER", 2)          // buffer for JobC in backends (for transporting jobs from server -> backend node)
	RequestMaxTries  = GetEnvInt("RETRIES_MAX", 3)              // 3 tries means it will be retried 2 additional times, and on third error would fail
	PayloadMaxBytes  = GetEnvInt("PAYLOAD_MAX_KB", 8192) * 1024 // Max payload size in bytes. If a payload sent to the webserver is larger, it returns "400 Bad Request".

	MaxQueueItemsFastTrack = GetEnvInt("ITEMS_FASTTRACK_MAX", 0) // Max number of items in fast-track queue. 0 means no limit.
	MaxQueueItemsHighPrio  = GetEnvInt("ITEMS_HIGHPRIO_MAX", 0)  // Max number of items in high-prio queue. 0 means no limit.
	MaxQueueItemsLowPrio   = GetEnvInt("ITEMS_LOWPRIO_MAX", 0)   // Max number of items in low-prio queue. 0 means no limit.

	// How often fast-track queue items should be popped before popping a high-priority item
	FastTrackPerHighPrio = GetEnvInt("ITEMS_FASTTRACK_PER_HIGHPRIO", 2)
	FastTrackDrainFirst  = os.Getenv("FASTTRACK_DRAIN_FIRST") == "1" // whether to fully drain the fast-track queue first

	RequestTimeout       = time.Duration(GetEnvInt("REQUEST_TIMEOUT", 5)) * time.Second       // Time between creation and receive in the node worker, after which a SimRequest will not be processed anymore
	ServerJobSendTimeout = time.Duration(GetEnvInt("JOB_SEND_TIMEOUT", 2)) * time.Second      // How long the server tries to send a job into the nodepool for processing
	ProxyRequestTimeout  = time.Duration(GetEnvInt("REQUEST_PROXY_TIMEOUT", 3)) * time.Second // HTTP request timeout for proxy requests to the backend node
)
View Source
var (
	ErrRequestTimeout   = errors.New("request timeout hit before processing")
	ErrNodeTimeout      = errors.New("node timeout")
	ErrNoNodesAvailable = errors.New("no nodes available")
)
View Source
var (
	RedisPrefix   = "prio-load-balancer:"
	RedisKeyNodes = RedisPrefix + "prio-load-balancer:nodes"
)

Functions

func GetEnvInt

func GetEnvInt(key string, defaultValue int) int

func LogConfig

func LogConfig(log *zap.SugaredLogger)

func LoggingMiddleware

func LoggingMiddleware(log *zap.SugaredLogger, next http.Handler) http.Handler

LoggingMiddleware logs the incoming HTTP request & its duration.

Types

type Node

type Node struct {
	URI     string
	AddedAt time.Time
	// contains filtered or unexported fields
}

func NewNode

func NewNode(log *zap.SugaredLogger, uri string, jobC chan *SimRequest, numWorkers int32) (*Node, error)

func (*Node) HealthCheck

func (n *Node) HealthCheck() error

func (*Node) ProxyRequest added in v0.5.0

func (n *Node) ProxyRequest(payload []byte, timeout time.Duration) (resp []byte, statusCode int, err error)

func (*Node) StartWorkers

func (n *Node) StartWorkers()

StartWorkers spawns the proxy workers in goroutines. Workers that are already running will be cancelled.

func (*Node) StopWorkers

func (n *Node) StopWorkers()

func (*Node) StopWorkersAndWait

func (n *Node) StopWorkersAndWait()

type NodePool

type NodePool struct {
	JobC chan *SimRequest
	// contains filtered or unexported fields
}

func NewNodePool

func NewNodePool(log *zap.SugaredLogger, redisState *RedisState, numWorkersPerNode int32) *NodePool

func (*NodePool) AddNode

func (gp *NodePool) AddNode(uri string) error

AddNode adds a node to the pool and starts the workers. If a new node is added, the list of nodes is saved to redis.

func (*NodePool) DelNode

func (gp *NodePool) DelNode(uri string) (deleted bool, err error)

func (*NodePool) HasNode

func (gp *NodePool) HasNode(uri string) bool

HasNode returns true if a node with the URI is already in the pool

func (*NodePool) LoadNodesFromRedis

func (gp *NodePool) LoadNodesFromRedis() error

func (*NodePool) NodeUris

func (gp *NodePool) NodeUris() []string

func (*NodePool) Shutdown

func (gp *NodePool) Shutdown()

Shutdown will stop all node workers, but let's them finish the ongoing connections

type NodeURIPayload

type NodeURIPayload struct {
	URI string `json:"uri"`
}

type PrioQueue

type PrioQueue struct {
	// contains filtered or unexported fields
}

PrioQueue has 3 queues: fastTrack, highPrio and lowPrio - items will be popped 1:1 from fastTrack and highPrio, until both are empty - then items from lowPrio queue are used

maybe we should configure that every n-th item is used from low-prio?

func NewPrioQueue

func NewPrioQueue(maxFastTrack, maxHighPrio, maxLowPrio, numFastTrackForHighPrio int, fastTrackDrainFirst bool) *PrioQueue

func (*PrioQueue) Close

func (q *PrioQueue) Close()

Close disallows adding any new items with Push(), and lets readers using Pop() return nil if queue is empty

func (*PrioQueue) CloseAndWait

func (q *PrioQueue) CloseAndWait()

CloseAndWait closes the queue and waits until the queue is empty

func (*PrioQueue) Len

func (q *PrioQueue) Len() (lenFastTrack, lenHighPrio, lenLowPrio int)

func (*PrioQueue) NumRequests

func (q *PrioQueue) NumRequests() int

func (*PrioQueue) Pop

func (q *PrioQueue) Pop() (nextReq *SimRequest)

Pop returns the next Bid. If no task in queue, blocks until there is one again. First drains the high-prio queue, then the low-prio one. Will return nil only after calling Close() when the queue is empty

func (*PrioQueue) Push

func (q *PrioQueue) Push(r *SimRequest) bool

Push adds a new item to the end of the queue. Returns true if added, false if queue is closed or at max capacity

func (*PrioQueue) String

func (q *PrioQueue) String() string

type RedisState

type RedisState struct {
	RedisClient *redis.Client
}

func NewRedisState

func NewRedisState(redisURI string) (*RedisState, error)

func (*RedisState) GetNodes

func (s *RedisState) GetNodes() (nodeUris []string, err error)

func (*RedisState) SaveNodes

func (s *RedisState) SaveNodes(nodeUris []string) error

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the overall load balancer server

func NewServer

func NewServer(opts ServerOpts) (*Server, error)

NewServer creates a new Server instance, loads the nodes from Redis and starts the node workers

func (*Server) AddNode

func (s *Server) AddNode(uri string) error

AddNode adds a new execution node to the pool and starts the workers. If a new node is added, the list of nodes is saved to redis.

func (*Server) NumNodeWorkersAlive

func (s *Server) NumNodeWorkersAlive() int

NumNodeWorkersAlive returns the number of currently active node workers

func (*Server) QueueSize

func (s *Server) QueueSize() (lenFastTrack, lenHighPrio, lenLowPrio int)

func (*Server) Shutdown

func (s *Server) Shutdown()

Shutdown gracefully shuts down the server. Allows ongoing requests to complete, but no further requests will be accepted or those from the queue processed.

func (*Server) Start

func (s *Server) Start()

Start starts the webserver and the main loop (pumping jobs from the queue to the workers)

type ServerOpts

type ServerOpts struct {
	Log            *zap.SugaredLogger
	HTTPAddrPtr    string // listen address for the webserver
	RedisURI       string // (optional) URI for the redis instance. If empty then don't use Redis.
	WorkersPerNode int32  // Number of concurrent workers per execution node
}

type SimRequest

type SimRequest struct {
	// can be none of, or one of high-prio / fast-track
	IsHighPrio  bool
	IsFastTrack bool

	Payload   []byte
	ResponseC chan SimResponse
	Cancelled bool
	CreatedAt time.Time
	Tries     int
}

func NewSimRequest

func NewSimRequest(payload []byte, isHighPrio, IsFastTrack bool) *SimRequest

func (*SimRequest) SendResponse

func (r *SimRequest) SendResponse(resp SimResponse) (wasSent bool)

SendResponse sends the response to ResponseC. If noone is listening on the channel, it is dropped.

type SimResponse

type SimResponse struct {
	StatusCode  int
	Payload     []byte
	Error       error
	ShouldRetry bool // When response has an error, whether it should be retried
}

type Webserver

type Webserver struct {
	// contains filtered or unexported fields
}

func NewWebserver

func NewWebserver(log *zap.SugaredLogger, listenAddr string, prioQueue *PrioQueue, nodePool *NodePool) *Webserver

func (*Webserver) HandleNodesRequest

func (s *Webserver) HandleNodesRequest(w http.ResponseWriter, req *http.Request)

func (*Webserver) HandleQueueRequest

func (s *Webserver) HandleQueueRequest(w http.ResponseWriter, req *http.Request)

func (*Webserver) HandleRootRequest

func (s *Webserver) HandleRootRequest(w http.ResponseWriter, req *http.Request)

func (*Webserver) Start

func (s *Webserver) Start()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL