nodes

package
v0.0.0-...-d60fe65 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 18, 2019 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BeesBaseNode = "/bees"

	PollTimeout           = 60 * time.Second
	BatchBackoffTimeoutMS = 1000

	ForwardTimeout = 10 * time.Second
)

Variables

View Source
var (
	ErrInvalidClusterID        = errors.New("invalid cluster id")
	ErrInvalidZKEndpoints      = errors.New("invalid zookeeper endpoints")
	ErrInvalidMasterID         = errors.New("invalid master id")
	ErrInvalidWorkerID         = errors.New("invalid worker id")
	ErrInvalidTaskID           = errors.New("invalid task id")
	ErrNoWorker                = errors.New("no such worker")
	ErrNoAvailableWorker       = errors.New("no available worker")
	ErrWorkerResourceExhausted = errors.New("worker resource exhausted")
	ErrNoTask                  = errors.New("no such task")
	ErrTaskExists              = errors.New("task exists")
	ErrBadAddress              = errors.New("bad address")
)

Functions

func DiffTasksState

func DiffTasksState(old DesiredState, new DesiredState) ([]string, []string, []string)

DiffTasksState picks out what added/deleted/changed between old DesiredState and new DesiredState.

func DiffWorkersState

func DiffWorkersState(old WorkerView, new WorkerView) ([]string, []string, []string)

DiffWorkersState picks out what added/deleted/changed between old WorkerView and new WorkerView.

Types

type ClusterConfig

type ClusterConfig struct {
	ID          string   `json:"cluster_id"`
	ZKEndpoints []string `json:"zk_endpoints"`
}

ClusterConfig denotes the cluster config.

func LoadClusterConfig

func LoadClusterConfig(filename string) (*ClusterConfig, error)

LoadClusterConfig loads cluster config from target file.

func (*ClusterConfig) GetAssignPath

func (c *ClusterConfig) GetAssignPath() string

GetAssignPath gets "/bees/{cluster_id}/assign".

func (*ClusterConfig) GetBeesPath

func (c *ClusterConfig) GetBeesPath() string

GetBeesPath gets "/bees".

func (*ClusterConfig) GetClusterPath

func (c *ClusterConfig) GetClusterPath() string

GetClusterPath gets "/bees/{cluster_id}".

func (*ClusterConfig) GetLeaderPath

func (c *ClusterConfig) GetLeaderPath() string

GetLeaderPath gets "/bees/{cluster_id}/leader".

func (*ClusterConfig) GetMasterPath

func (c *ClusterConfig) GetMasterPath() string

GetMasterPath gets "/bees/{cluster_id}/master".

func (*ClusterConfig) GetMyTaskPath

func (c *ClusterConfig) GetMyTaskPath(id string) string

GetMyTaskPath gets "/bees/{cluster_id}/tasks/{task_id}".

func (*ClusterConfig) GetMyTaskStatusPath

func (c *ClusterConfig) GetMyTaskStatusPath(id string) string

GetMyTaskStatusPath gets "/bees/{cluster_id}/tasks/{task_id}/status".

func (*ClusterConfig) GetMyWorkerPath

func (c *ClusterConfig) GetMyWorkerPath(id string) string

GetMyWorkerPath gets "/bees/{cluster_id}/workers/{worker_id}".

func (*ClusterConfig) GetNotifierPath

func (c *ClusterConfig) GetNotifierPath() string

GetNotifierPath gets "/bees/{cluster_id}/notifier".

func (*ClusterConfig) GetTaskAssignPath

func (c *ClusterConfig) GetTaskAssignPath(id string) string

GetTaskAssignPath gets "/bees/{cluster_id}/assign/{worker_id}/{task_id}".

func (*ClusterConfig) GetTasksPath

func (c *ClusterConfig) GetTasksPath() string

GetTasksPath gets "/bees/{cluster_id}/tasks".

func (*ClusterConfig) GetWorkerAssignPath

func (c *ClusterConfig) GetWorkerAssignPath(id string) string

GetWorkerAssignPath gets "/bees/{cluster_id}/assign/{worker_id}".

func (*ClusterConfig) GetWorkersPath

func (c *ClusterConfig) GetWorkersPath() string

GetWorkersPath gets "/bees/{cluster_id}/workers".

func (*ClusterConfig) Validate

func (c *ClusterConfig) Validate() error

Validate validates the cluster config.

type Crawler

type Crawler struct {
}

Crawler implements crawl task which acts as spider.

func (*Crawler) Implement

func (c *Crawler) Implement(task *pb.TaskSpec) (*pb.TaskSpec, error)

Implement implements the crawl task.

type DesiredState

type DesiredState map[string]pb.TaskSpec

type Forwarder

type Forwarder struct {
	// contains filtered or unexported fields
}

Forwarder forwards the request to the next opened grpc connection.

func NewForwarder

func NewForwarder(opts ...grpc.DialOption) *Forwarder

NewForwarder creates a new forwarder instance.

func (*Forwarder) Close

func (f *Forwarder) Close() error

Close closes last opened grpc connection.

func (*Forwarder) Connnect

func (f *Forwarder) Connnect(addr string) (*grpc.ClientConn, error)

Connnect creates next opened grpc connection.

type Implementer

type Implementer interface {
	Implement(task *pb.TaskSpec) (*pb.TaskSpec, error)
}

Implementer interface provides strategy to implement task.

type Master

type Master struct {
	// contains filtered or unexported fields
}

Master provides a task scheduler with HA and auto reconcile support.

func NewMaster

func NewMaster(conf *ClusterConfig, id string) (*Master, error)

NewMaster creates a new master instance.

func (*Master) AddTask

func (m *Master) AddTask(req *pb.TaskSpec) (*pb.WorkerSpec, error)

AddTask adds a new task.

func (*Master) DeleteTask

func (m *Master) DeleteTask(taskID string) error

DeleteTask deletes a specific task.

func (*Master) GetTask

func (m *Master) GetTask(taskID string, loadStatus bool) (*pb.TaskSpec, error)

GetTask gets a specific task.

func (*Master) GetWorker

func (m *Master) GetWorker(id string) (*pb.WorkerSpec, error)

GetWorker gets a specific worker.

func (*Master) ListTasks

func (m *Master) ListTasks(workerID string, loadStatus bool) ([]*pb.TaskSpec, error)

ListTasks lists all tasks.

func (*Master) ListWorkers

func (m *Master) ListWorkers() ([]*pb.WorkerSpec, error)

ListWorkers lists all workers.

func (*Master) Run

func (m *Master) Run()

Run runs master main loop.

func (*Master) Stop

func (m *Master) Stop()

Stop stops the manager, and waits until main loop exits.

type NotLeaderError

type NotLeaderError struct {
	Leader string
}

func (*NotLeaderError) Error

func (e *NotLeaderError) Error() string

type QuotaBasedScheduler

type QuotaBasedScheduler struct {
}

QuotaBasedScheduler schedules task to worker with corresponding labels and quotas not exceeded.

func (*QuotaBasedScheduler) Assign

func (qbs *QuotaBasedScheduler) Assign(workers WorkerView, task *pb.TaskSpec) (*pb.WorkerSpec, error)

Assign assigns task to a worker with low workload.

func (*QuotaBasedScheduler) Rebalance

func (qbs *QuotaBasedScheduler) Rebalance(workers WorkerView, tasks TaskView) (map[string]string, error)

Rebalance reblances tasks among workers.

type Scheduler

type Scheduler interface {
	Assign(workers WorkerView, task *pb.TaskSpec) (*pb.WorkerSpec, error)
	Rebalance(workers WorkerView, tasks TaskView) (map[string]string, error)
}

Scheduler interface provides strategy to assign task to a worker.

type Score

type Score int
const (
	WorkloadBasedScore Score = 1
	QuotaBasedScore    Score = 2
)

type TaskView

type TaskView map[string]pb.TaskSpec

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents a worker with given labels and resource quotas.

func NewWorker

func NewWorker(conf *ClusterConfig, spec *pb.WorkerSpec) (*Worker, error)

NewWorker creates a new worker instance.

func (*Worker) GetDesiredState

func (w *Worker) GetDesiredState() <-chan DesiredState

GetDesiredState returns a channel which provides desired task states for current worker. It's caller's responsibility to calculate state diffs and change real world state.

func (*Worker) Run

func (w *Worker) Run()

Run runs worker main loop which watches task assignment changes.

func (*Worker) SetTaskStatus

func (w *Worker) SetTaskStatus(id string, status *pb.TaskStatus) error

SetTaskStatus sets user-defined status, which can be read by master. Note that status is bound to current worker, and will be invalidated on task reassignment.

func (*Worker) Stop

func (w *Worker) Stop()

Stop stops the worker, and waits until main loop exits.

type WorkerView

type WorkerView map[string]pb.WorkerSpec

type WorkloadBasedScheduler

type WorkloadBasedScheduler struct {
}

WorkloadBasedScheduler schedules task to worker with corresponding labels and low workload.

func (*WorkloadBasedScheduler) Assign

func (wbs *WorkloadBasedScheduler) Assign(workers WorkerView, task *pb.TaskSpec) (*pb.WorkerSpec, error)

Assign assigns task to a worker with low workload.

func (*WorkloadBasedScheduler) Rebalance

func (wbs *WorkloadBasedScheduler) Rebalance(workers WorkerView, tasks TaskView) (map[string]string, error)

Rebalance reblances tasks among workers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL