dispatch

package
v0.0.0-...-27e8298 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2020 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DependentExecutionEvent = iota
	JobExecutionEvent
	AgentDownEvent
	AgentUpEvent
)

Variables

View Source
var (
	// AgentManager Agents manager
	AgentManager Agents

	// JobExecutions Job execution results
	JobExecutions Execution

	// JobDispatchers Job dispatcher manager
	JobDispatchers *JobDispatcherManager
)

Functions

func Leader

func Leader() string

Leader 原子获取 Leader 节点

func Run

func Run(pctx context.Context)

Run 运行 Dispatch 实例 Dispatch 只能有一个主,需要先选举

Types

type AgentExecution

type AgentExecution struct {
	// contains filtered or unexported fields
}

AgentExectuion 执行结果处理

func NewAgentExecution

func NewAgentExecution(pctx context.Context, jd Dispatcher) *AgentExecution

NewAgentExecution 新建执行结果处理程序 每个 JobDispatcher 一个

func (*AgentExecution) Serve

func (agentExec *AgentExecution) Serve()

Serve 开始处理服务 Watch 新执行结果

type AgentMonitor

type AgentMonitor struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*AgentMonitor) Check

func (m *AgentMonitor) Check(agent string) bool

func (*AgentMonitor) Get

func (m *AgentMonitor) Get(tag string) map[string]string

type AgentUpdate

type AgentUpdate interface {
	AgentUp(node string, tags []string)
	AgentDown(node string, tags []string)
}

type Agents

type Agents interface {
	Get(tag string) map[string]string
	Check(agent string) bool
}

func NewAgentMonitor

func NewAgentMonitor(pctx context.Context, callback AgentUpdate) Agents

type Dispatcher

type Dispatcher interface {
	// agent node up/down notification
	AgentUp(string)
	AgentDown(string)

	// dispatcher properties Name() string
	Name() string
	Concurrency() int

	// operations
	Serve()
	Stop()

	// agent/job results notification
	DependencyCompleted(*JobExecutionResult)
	AgentsCompleted(*JobExecutionResult)
}

func NewJobDispatcher

func NewJobDispatcher(ctx context.Context, job *logic.JobConfig) Dispatcher

type DispatcherEvent

type DispatcherEvent struct {
	EventType int
	EventData interface{}
}

type Execution

type Execution interface {
	// Get jobs execution group id
	GetMaxGroupIds([]string) map[string]int64
	// Job complete notify
	JobCompleted(*JobExecutionResult)

	JobDeleted(string)

	// Subscribe/Unsubscribe job complete events
	Subscribe(Dispatcher, []string)
	Unsubscribe(Dispatcher, []string)
}

func NewJobExecution

func NewJobExecution(pctx context.Context) Execution

type JobDispatchState

type JobDispatchState struct {
	// job dependents state
	Dependents map[string]int64 `json:"dependents"`

	// job running agents state
	// map[node]map[dispatch id]ok
	RunningAgents map[string]map[string]bool `json:"running_agents"`

	// job all completed state
	// will not dispatch again
	JobCompleted bool `json:"job_completed"`
}

func (*JobDispatchState) RunningCount

func (jds *JobDispatchState) RunningCount() int

func (*JobDispatchState) ToString

func (jds *JobDispatchState) ToString() (string, error)

type JobDispatcher

type JobDispatcher struct {
	// contains filtered or unexported fields
}

func (*JobDispatcher) AgentDown

func (jd *JobDispatcher) AgentDown(node string)

func (*JobDispatcher) AgentUp

func (jd *JobDispatcher) AgentUp(node string)

func (*JobDispatcher) AgentsCompleted

func (jd *JobDispatcher) AgentsCompleted(res *JobExecutionResult)

func (*JobDispatcher) Concurrency

func (jd *JobDispatcher) Concurrency() int

func (*JobDispatcher) DependencyCompleted

func (jd *JobDispatcher) DependencyCompleted(res *JobExecutionResult)

func (*JobDispatcher) Name

func (jd *JobDispatcher) Name() string

func (*JobDispatcher) Serve

func (jd *JobDispatcher) Serve()

func (*JobDispatcher) Stop

func (jd *JobDispatcher) Stop()

stop job dispatcher

type JobDispatcherManager

type JobDispatcherManager struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewJobDispatcherManager

func NewJobDispatcherManager(pctx context.Context) *JobDispatcherManager

func (*JobDispatcherManager) AgentDown

func (mgr *JobDispatcherManager) AgentDown(node string, tags []string)

func (*JobDispatcherManager) AgentUp

func (mgr *JobDispatcherManager) AgentUp(node string, tags []string)

func (*JobDispatcherManager) Serve

func (mgr *JobDispatcherManager) Serve()

type JobExecution

type JobExecution struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*JobExecution) GetMaxGroupIds

func (jobExec *JobExecution) GetMaxGroupIds(names []string) map[string]int64

func (*JobExecution) JobCompleted

func (jobExec *JobExecution) JobCompleted(res *JobExecutionResult)

func (*JobExecution) JobDeleted

func (jobExec *JobExecution) JobDeleted(jobName string)

func (*JobExecution) Subscribe

func (jobExec *JobExecution) Subscribe(jobDisp Dispatcher, dependents []string)

func (*JobExecution) Unsubscribe

func (jobExec *JobExecution) Unsubscribe(jobDisp Dispatcher, dependents []string)

type JobExecutionResult

type JobExecutionResult struct {
	logic.ExecutionResult

	DispatchIDs []string `json:"dispatch_ids"`

	// Number of successful executions of this job.
	SuccessCount int `json:"success_count"`

	// Number of errors running this job.
	ErrorCount int `json:"error_count"`

	// Last time this job executed successful.
	LastSuccess time.Time `json:"last_success"`

	// Last time this job failed.
	LastError time.Time `json:"last_error"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL