Documentation ¶
Index ¶
- Variables
- type ClusterServer
- func (c *ClusterServer) IsNodeLeader(ctx context.Context, req *v1.Empty) (*v1.IsNodeLeaderResponse, error)
- func (c *ClusterServer) Join(ctx context.Context, req *v1.JoinRequest) (*v1.Empty, error)
- func (c *ClusterServer) Leave(ctx context.Context, req *v1.LeaveRequest) (*v1.Empty, error)
- func (c *ClusterServer) Snapshot(ctx context.Context, req *v1.Empty) (*v1.Empty, error)
- type HealthCheckServer
- type JSMServer
- func (j *JSMServer) Bury(ctx context.Context, req *v1.BuryRequest) (*v1.Empty, error)
- func (j *JSMServer) CheckClientState(ctx context.Context, req *v1.CheckClientStateRequest) (*v1.CheckClientStateResponse, error)
- func (j *JSMServer) Delete(ctx context.Context, req *v1.DeleteRequest) (*v1.Empty, error)
- func (j *JSMServer) GetJob(ctx context.Context, req *v1.GetJobRequest) (*v1.GetJobResponse, error)
- func (j *JSMServer) GetStatsJobYaml(ctx context.Context, req *v1.GetStatsJobYamlRequest) (*v1.GetStatsJobYamlResponse, error)
- func (j *JSMServer) GetStatsTubeYaml(ctx context.Context, req *v1.GetStatsTubeYamlRequest) (*v1.GetStatsTubeYamlResponse, error)
- func (j *JSMServer) GetStatsYaml(ctx context.Context, req *v1.Empty) (*v1.GetStatsYamlResponse, error)
- func (j *JSMServer) Kick(ctx context.Context, req *v1.KickRequest) (*v1.Empty, error)
- func (j *JSMServer) KickN(ctx context.Context, req *v1.KickNRequest) (*v1.KickNResponse, error)
- func (j *JSMServer) ListTubes(ctx context.Context, req *v1.Empty) (*v1.ListTubesResponse, error)
- func (j *JSMServer) PeekBuried(ctx context.Context, req *v1.PeekRequest) (*v1.PeekResponse, error)
- func (j *JSMServer) PeekDelayed(ctx context.Context, req *v1.PeekRequest) (*v1.PeekResponse, error)
- func (j *JSMServer) PeekReady(ctx context.Context, req *v1.PeekRequest) (*v1.PeekResponse, error)
- func (j *JSMServer) Put(ctx context.Context, req *v1.PutRequest) (*v1.PutResponse, error)
- func (j *JSMServer) Release(ctx context.Context, req *v1.ReleaseRequest) (*v1.Empty, error)
- func (j *JSMServer) Reserve(ctx context.Context, req *v1.ReserveRequest) (*v1.ReserveResponse, error)
- func (j *JSMServer) RunController()
- func (j *JSMServer) StreamReserveUpdates(req *v1.ReserveUpdateRequest, ...) error
- func (j *JSMServer) Tick() (*v1.TickResponse, error)
- func (j *JSMServer) Touch(ctx context.Context, req *v1.TouchRequest) (*v1.Empty, error)
- type JsmTick
- type ProxyJoinReq
- type ProxyLeaveReq
- type ProxyResp
- type ProxyRespType
- type RaftCluster
- type ReplicatedJsm
- type ReservationsController
- type ServiceReadiness
Constants ¶
This section is empty.
Variables ¶
var ( // Returned if the same proxy client attempts to connect with the controller ErrProxyExists = errors.New("proxy with id exists") ErrNotLeader = errors.New("current node is not a leader") )
Functions ¶
This section is empty.
Types ¶
type ClusterServer ¶
type ClusterServer struct { v1.UnimplementedClusterServer // contains filtered or unexported fields }
func NewClusterServer ¶
func NewClusterServer(rc RaftCluster) *ClusterServer
func (*ClusterServer) IsNodeLeader ¶
func (c *ClusterServer) IsNodeLeader(ctx context.Context, req *v1.Empty) (*v1.IsNodeLeaderResponse, error)
func (*ClusterServer) Join ¶
func (c *ClusterServer) Join(ctx context.Context, req *v1.JoinRequest) (*v1.Empty, error)
Join joins a node, identified by nodeID and located at addr, to this cluster. The node must be ready to respond to Raft communications at that address.
It is required that the node that this is called into is a leader node.
func (*ClusterServer) Leave ¶
func (c *ClusterServer) Leave(ctx context.Context, req *v1.LeaveRequest) (*v1.Empty, error)
Leave leaves a node, identified by nodeID and located at addr, to this store.
It is required that the node that this is called into is a leader node.
type HealthCheckServer ¶
type HealthCheckServer struct { healthV1.UnimplementedHealthServer // contains filtered or unexported fields }
func NewHealthCheckServer ¶
func NewHealthCheckServer(s ServiceReadiness) *HealthCheckServer
func (*HealthCheckServer) Check ¶
func (h *HealthCheckServer) Check(ctx context.Context, req *healthV1.HealthCheckRequest) (*healthV1.HealthCheckResponse, error)
func (*HealthCheckServer) Watch ¶
func (h *HealthCheckServer) Watch(req *healthV1.HealthCheckRequest, stream healthV1.Health_WatchServer) error
type JSMServer ¶
type JSMServer struct { v1.UnimplementedJobStateMachineServer // contains filtered or unexported fields }
func NewJSMServer ¶
func NewJSMServer(r ReplicatedJsm) *JSMServer
func (*JSMServer) CheckClientState ¶
func (j *JSMServer) CheckClientState(ctx context.Context, req *v1.CheckClientStateRequest) (*v1.CheckClientStateResponse, error)
func (*JSMServer) GetJob ¶
func (j *JSMServer) GetJob(ctx context.Context, req *v1.GetJobRequest) (*v1.GetJobResponse, error)
func (*JSMServer) GetStatsJobYaml ¶
func (j *JSMServer) GetStatsJobYaml(ctx context.Context, req *v1.GetStatsJobYamlRequest) (*v1.GetStatsJobYamlResponse, error)
func (*JSMServer) GetStatsTubeYaml ¶
func (j *JSMServer) GetStatsTubeYaml(ctx context.Context, req *v1.GetStatsTubeYamlRequest) (*v1.GetStatsTubeYamlResponse, error)
func (*JSMServer) GetStatsYaml ¶
func (*JSMServer) KickN ¶
func (j *JSMServer) KickN(ctx context.Context, req *v1.KickNRequest) (*v1.KickNResponse, error)
func (*JSMServer) PeekBuried ¶
func (j *JSMServer) PeekBuried(ctx context.Context, req *v1.PeekRequest) (*v1.PeekResponse, error)
func (*JSMServer) PeekDelayed ¶
func (j *JSMServer) PeekDelayed(ctx context.Context, req *v1.PeekRequest) (*v1.PeekResponse, error)
func (*JSMServer) PeekReady ¶
func (j *JSMServer) PeekReady(ctx context.Context, req *v1.PeekRequest) (*v1.PeekResponse, error)
func (*JSMServer) Put ¶
func (j *JSMServer) Put(ctx context.Context, req *v1.PutRequest) (*v1.PutResponse, error)
func (*JSMServer) Reserve ¶
func (j *JSMServer) Reserve(ctx context.Context, req *v1.ReserveRequest) (*v1.ReserveResponse, error)
func (*JSMServer) RunController ¶
func (j *JSMServer) RunController()
func (*JSMServer) StreamReserveUpdates ¶
func (j *JSMServer) StreamReserveUpdates(req *v1.ReserveUpdateRequest, stream v1.JobStateMachine_StreamReserveUpdatesServer) error
type JsmTick ¶
type JsmTick interface {
Tick() (*v1.TickResponse, error)
}
type ProxyJoinReq ¶
type ProxyJoinReq struct {
// contains filtered or unexported fields
}
type ProxyLeaveReq ¶
type ProxyLeaveReq struct {
// contains filtered or unexported fields
}
type ProxyResp ¶
type ProxyResp struct { RespType ProxyRespType Reservations []*v1.Reservation Err error }
type ProxyRespType ¶
type ProxyRespType int
const ( Unknown ProxyRespType = iota Join Leave Reservation )
func (ProxyRespType) String ¶
func (p ProxyRespType) String() string
type RaftCluster ¶
type RaftCluster interface { // Join, joins this node, identified by nodeID and reachable at addr, // to an existing Raft cluster. Join(nodeID, addr string) error // Leave, leave this specified node, identified by nodeID from // an existing Raft cluster. Leave(nodeID string) error // Returns true if this specified node is a Leader IsLeader() bool // Ask the node to take a snapshot Snapshot() error }
type ReplicatedJsm ¶
type ReplicatedJsm interface { // Apply the provided request ApplyOp(req *v1.ApplyOpRequest) *v1.ApplyOpResponse // Ask the server for the current clock (now in secs) NowSeconds() int64 // Returns true if this node is a leader IsLeader() bool }
type ReservationsController ¶
type ReservationsController struct {
// contains filtered or unexported fields
}
reservationsController provides the ability to stream reservation updates from the job state machine (jsm) back to the connected clients (aka. "proxy" clients)
A high level overview:
┌----------------┐ ┌----------------┐ │ State Proxy │ │ State Proxy │ │ Client │ ...... │ Client │ └----------------┘ └----------------┘
^ ^ | | | | (stream Reservations) | |
┌---------------------------------------------------┐ │ reservationsController │ └---------------------------------------------------┘
| ^ | (every 1s) | Reservations | | V |
┌---------------------------------------------------┐ │ JSM.Tick() │ └---------------------------------------------------┘
func NewReservationsController ¶
func NewReservationsController(jsmTick JsmTick) *ReservationsController
func (*ReservationsController) Register ¶
func (rctrl *ReservationsController) Register(proxyID string) (<-chan *ProxyResp, error)
Register makes a request to add this proxy client (identified by proxyID)
Register returns back a read only channel to receive updates
func (*ReservationsController) Run ¶
func (rctrl *ReservationsController) Run() error
Run, runs this controller. the control loop does not return immediately (unless there is an error)
Run performs the following functions
- Periodically (for every second), queries the underlying JSM (job state machine) for any reservation updates. (These could include newly assigned jobs, timeouts or deadline-soon to any Reservations request). These updates are dispatched to the appropriate client proxy (if they are connected.
- Processes any register (join) or un-register (leave) requests from proxies
func (*ReservationsController) Stop ¶
func (rctrl *ReservationsController) Stop()
func (*ReservationsController) UnRegister ¶
func (rctrl *ReservationsController) UnRegister(proxyID string, respCh <-chan *ProxyResp)
UnRegister makes a request to remove this proxy client (identified by the proxyID)
Additionally, once the unRegister is complete, it drains the response channel
type ServiceReadiness ¶
type ServiceReadiness interface { // Ready indicates if this service is ready to accept traffic Ready() bool }