backend

package
v0.0.0-...-d500d3c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2019 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidLengthMeta = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowMeta   = fmt.Errorf("proto: integer overflow")
)
View Source
var (
	// ErrRunCanceled is returned from the RunResult when a Run is Canceled.  It is used mostly internally.
	ErrRunCanceled = errors.New("run canceled")

	// ErrTaskNotClaimed is returned when attempting to operate against a task that must be claimed but is not.
	ErrTaskNotClaimed = errors.New("task not claimed")

	// ErrTaskAlreadyClaimed is returned when attempting to operate against a task that must not be claimed but is.
	ErrTaskAlreadyClaimed = errors.New("task already claimed")
)
View Source
var (
	// ErrTaskNotFound indicates no task could be found for given parameters.
	ErrTaskNotFound = errors.New("task not found")

	// ErrUserNotFound is an error for when we can't find a user
	ErrUserNotFound = errors.New("user not found")

	// ErrOrgNotFound is an error for when we can't find an org
	ErrOrgNotFound = errors.New("org not found")

	// ErrManualQueueFull is returned when a manual run request cannot be completed.
	ErrManualQueueFull = errors.New("manual queue at capacity")

	// ErrRunNotFound is returned when searching for a run that doesn't exist.
	ErrRunNotFound = errors.New("run not found")

	// ErrRunNotFinished is returned when a retry is invalid due to the run not being finished yet.
	ErrRunNotFinished = errors.New("run is still in progress")
)

Functions

func NewInMemRunReaderWriter

func NewInMemRunReaderWriter() *runReaderWriter

Types

type CreateTaskRequest

type CreateTaskRequest struct {
	// IDs of the organization and user that own the task.
	Org, User platform.ID

	// Script content of the task.
	Script string

	// Unix timestamp (seconds elapsed since January 1, 1970 UTC).
	// The first run of the task will be run according to the earliest time after ScheduleAfter,
	// matching the task's schedule via its cron or every option.
	ScheduleAfter int64

	// The initial task status.
	// If empty, will be treated as DefaultTaskStatus.
	Status TaskStatus
}

CreateTaskRequest encapsulates state of a new task to be created.

type DesiredState

type DesiredState interface {
	// CreateNextRun requests the next run from the desired state, delegating to (*StoreTaskMeta).CreateNextRun.
	// This allows the scheduler to be "dumb" and just tell DesiredState what time the scheduler thinks it is,
	// and the DesiredState will create the appropriate run according to the task's cron schedule,
	// and according to what's in progress and what's been finished.
	//
	// If a Run is requested and the cron schedule says the schedule isn't ready, a RunNotYetDueError is returned.
	CreateNextRun(ctx context.Context, taskID platform.ID, now int64) (RunCreation, error)

	// FinishRun indicates that the given run is no longer intended to be executed.
	// This may be called after a successful or failed execution, or upon cancellation.
	FinishRun(ctx context.Context, taskID, runID platform.ID) error
}

DesiredState persists the desired state of a run.

type Executor

type Executor interface {
	// Execute attempts to begin execution of a run.
	// If there is an error invoking execution, that error is returned and RunPromise is nil.
	// TODO(mr): this assumes you can execute a run just from a taskID and a now time.
	// We may need to include the script content in this method signature.
	Execute(ctx context.Context, run QueuedRun) (RunPromise, error)

	// Wait blocks until all RunPromises created through Execute have finished.
	// Once Wait has been called, it is an error to call Execute before Wait has returned.
	// After Wait returns, it is safe to call Execute again.
	Wait()
}

Executor handles execution of a run.

type LogReader

type LogReader interface {
	// ListRuns returns a list of runs belonging to a task.
	ListRuns(ctx context.Context, runFilter platform.RunFilter) ([]*platform.Run, error)

	// FindRunByID finds a run given an orgID and a runID.
	// orgID is necessary to look in the correct system bucket.
	FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error)

	// ListLogs lists logs for a task or a specified run of a task.
	ListLogs(ctx context.Context, logFilter platform.LogFilter) ([]platform.Log, error)
}

LogReader reads log information and log data from a store.

type LogWriter

type LogWriter interface {
	// UpdateRunState sets the run state and the respective time.
	UpdateRunState(ctx context.Context, base RunLogBase, when time.Time, state RunStatus) error

	// AddRunLog adds a log line to the run.
	AddRunLog(ctx context.Context, base RunLogBase, when time.Time, log string) error
}

LogWriter writes task logs and task state changes to a store.

type NopLogReader

type NopLogReader struct{}

NopLogReader is a LogReader that doesn't do anything when its methods are called. This is useful for tests, but not much else.

func (NopLogReader) FindRunByID

func (NopLogReader) FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error)

func (NopLogReader) ListLogs

func (NopLogReader) ListLogs(ctx context.Context, logFilter platform.LogFilter) ([]platform.Log, error)

func (NopLogReader) ListRuns

func (NopLogReader) ListRuns(ctx context.Context, runFilter platform.RunFilter) ([]*platform.Run, error)

type NopLogWriter

type NopLogWriter struct{}

NopLogWriter is a LogWriter that doesn't do anything when its methods are called. This is useful for tests, but not much else.

func (NopLogWriter) AddRunLog

func (NopLogWriter) UpdateRunState

type PointLogWriter

type PointLogWriter struct {
	// contains filtered or unexported fields
}

PointLogWriter writes task and run logs as time-series points.

func NewPointLogWriter

func NewPointLogWriter(pw PointsWriter) *PointLogWriter

NewPointLogWriter returns a PointLogWriter.

func (*PointLogWriter) AddRunLog

func (p *PointLogWriter) AddRunLog(ctx context.Context, rlb RunLogBase, when time.Time, log string) error

func (*PointLogWriter) UpdateRunState

func (p *PointLogWriter) UpdateRunState(ctx context.Context, rlb RunLogBase, when time.Time, status RunStatus) error

type PointsWriter

type PointsWriter interface {
	WritePoints(points []models.Point) error
}

Copy of storage.PointsWriter interface. Duplicating it here to avoid having tasks/backend depend directly on storage.

type QueryLogReader

type QueryLogReader struct {
	// contains filtered or unexported fields
}

func NewQueryLogReader

func NewQueryLogReader(qs query.QueryService) *QueryLogReader

func (*QueryLogReader) FindRunByID

func (qlr *QueryLogReader) FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error)

func (*QueryLogReader) ListLogs

func (qlr *QueryLogReader) ListLogs(ctx context.Context, logFilter platform.LogFilter) ([]platform.Log, error)

func (*QueryLogReader) ListRuns

func (qlr *QueryLogReader) ListRuns(ctx context.Context, runFilter platform.RunFilter) ([]*platform.Run, error)

type QueuedRun

type QueuedRun struct {
	// IDs of the task and of the run.
	TaskID, RunID platform.ID

	// The Unix timestamp (seconds since January 1, 1970 UTC) that will be set
	// when a run is manually requested.
	RequestedAt int64

	// The Unix timestamp (seconds since January 1, 1970 UTC) that will be set
	// as the "now" option when executing the task.
	Now int64
}

QueuedRun is a task run that has been assigned an ID, but whose execution has not necessarily started.

type RequestStillQueuedError

type RequestStillQueuedError struct {
	// Unix timestamps matching existing request's start and end.
	Start, End int64
}

RequestStillQueuedError is returned when attempting to retry a run which has not yet completed.

func ParseRequestStillQueuedError

func ParseRequestStillQueuedError(msg string) *RequestStillQueuedError

ParseRequestStillQueuedError attempts to parse a RequestStillQueuedError from msg. If msg is formatted correctly, the resultant error is returned; otherwise it returns nil.

func (RequestStillQueuedError) Error

func (e RequestStillQueuedError) Error() string

type RunCreation

type RunCreation struct {
	Created QueuedRun

	// Unix timestamp for when the next run is due.
	NextDue int64

	// Whether there are any manual runs queued for this task.
	// If so, the scheduler should begin executing them after handling real-time tasks.
	HasQueue bool
}

RunCreation is returned by CreateNextRun.

type RunLogBase

type RunLogBase struct {
	// The parent task that owns the run.
	Task *StoreTask

	// The ID of the run.
	RunID platform.ID

	// The Unix timestamp indicating the run's scheduled time.
	RunScheduledFor int64

	// When the log is requested, should be ignored when it is zero.
	RequestedAt int64
}

RunLogBase is the base information for logs about an individual run.

type RunNotYetDueError

type RunNotYetDueError struct {
	// DueAt is the unix timestamp of when the next run is due.
	DueAt int64
}

RunNotYetDueError is returned from CreateNextRun if a run is not yet due.

func (RunNotYetDueError) Error

func (e RunNotYetDueError) Error() string

type RunPromise

type RunPromise interface {
	// Run returns the details about the queued run.
	Run() QueuedRun

	// Wait blocks until the run completes.
	// Wait may be called concurrently.
	// Subsequent calls to Wait will return identical values.
	Wait() (RunResult, error)

	// Cancel interrupts the RunPromise.
	// Calls to Wait() will immediately unblock and return nil, ErrRunCanceled.
	// Cancel is safe to call concurrently.
	// If Wait() has already returned, Cancel is a no-op.
	Cancel()
}

RunPromise represents an in-progress run whose result is not yet known.

type RunResult

type RunResult interface {
	// If the run did not succeed, Err returns the error associated with the run.
	Err() error

	// IsRetryable returns true if the error was non-terminal and the run is eligible for retry.
	IsRetryable() bool
}

type RunStatus

type RunStatus int
const (
	RunStarted RunStatus = iota
	RunSuccess
	RunFail
	RunCanceled
	RunScheduled
)

func (RunStatus) String

func (r RunStatus) String() string

type Scheduler

type Scheduler interface {
	// Start allows the scheduler to Tick. A scheduler that has not been started will do nothing.
	Start(ctx context.Context)

	// Stop stops the scheduler from ticking.
	Stop()

	// ClaimTask begins control of task execution in this scheduler.
	ClaimTask(task *StoreTask, meta *StoreTaskMeta) error

	// UpdateTask will update the concurrency and the runners for a task.
	UpdateTask(task *StoreTask, meta *StoreTaskMeta) error

	// ReleaseTask immediately cancels any in-progress runs for the given task ID,
	// and releases any resources related to management of that task.
	ReleaseTask(taskID platform.ID) error

	// CancelRun stops an executing run.
	CancelRun(ctx context.Context, taskID, runID platform.ID) error
}

Scheduler accepts tasks and handles their scheduling.

TODO(mr): right now the methods on Scheduler are synchronous. We'll probably want to make them asynchronous in the near future, which likely means we will change the method signatures to something where we can wait for the result to complete and possibly inspect any relevant output.

type Store

type Store interface {
	// CreateTask creates a task from the given CreateTaskRequest.
	// If the task is created successfully, the ID of the new task is returned.
	CreateTask(ctx context.Context, req CreateTaskRequest) (platform.ID, error)

	// UpdateTask updates an existing task.
	// It returns an error if there was no task matching the given ID.
	// If the returned error is not nil, the returned result should not be inspected.
	UpdateTask(ctx context.Context, req UpdateTaskRequest) (UpdateTaskResult, error)

	// ListTasks lists the tasks in the store that match the search params.
	ListTasks(ctx context.Context, params TaskSearchParams) ([]StoreTaskWithMeta, error)

	// FindTaskByID returns the task with the given ID.
	// If no task matches the ID, the returned task is nil and ErrTaskNotFound is returned.
	FindTaskByID(ctx context.Context, id platform.ID) (*StoreTask, error)

	// FindTaskMetaByID returns the metadata about a task.
	// If no task meta matches the ID, the returned meta is nil and ErrTaskNotFound is returned.
	FindTaskMetaByID(ctx context.Context, id platform.ID) (*StoreTaskMeta, error)

	// FindTaskByIDWithMeta combines finding the task and the meta into a single call.
	FindTaskByIDWithMeta(ctx context.Context, id platform.ID) (*StoreTask, *StoreTaskMeta, error)

	// DeleteTask returns whether an entry matching the given ID was deleted.
	// If err is non-nil, deleted is false.
	// If err is nil, deleted is false if no entry matched the ID,
	// or deleted is true if there was a matching entry and it was deleted.
	DeleteTask(ctx context.Context, id platform.ID) (deleted bool, err error)

	// CreateNextRun creates the earliest needed run scheduled no later than the given Unix timestamp now.
	// Internally, the Store should rely on the underlying task's StoreTaskMeta to create the next run.
	CreateNextRun(ctx context.Context, taskID platform.ID, now int64) (RunCreation, error)

	// FinishRun removes runID from the list of currently running runs and,
	// if the run's `now` is later than the latest completed time, updates the latest completed time.
	FinishRun(ctx context.Context, taskID, runID platform.ID) error

	// ManuallyRunTimeRange enqueues a request to run the task with the given ID for all schedules no earlier than start and no later than end (Unix timestamps).
	// requestedAt is the Unix timestamp when the request was initiated.
	// ManuallyRunTimeRange must delegate to an underlying StoreTaskMeta's ManuallyRunTimeRange method.
	ManuallyRunTimeRange(ctx context.Context, taskID platform.ID, start, end, requestedAt int64) (*StoreTaskMetaManualRun, error)

	// DeleteOrg deletes the org.
	DeleteOrg(ctx context.Context, orgID platform.ID) error

	// DeleteUser deletes a user with userID.
	DeleteUser(ctx context.Context, userID platform.ID) error

	// Close closes the store for usage and cleans up running processes.
	Close() error
}

Store is the interface around persisted tasks.

func NewInMemStore

func NewInMemStore() Store

NewInMemStore returns a new in-memory store. This store is not designed to be efficient; it is here for testing purposes.

type StoreTask

type StoreTask struct {
	ID platform.ID

	// IDs for the owning organization and user.
	Org, User platform.ID

	// The user-supplied name of the Task.
	Name string

	// The script content of the task.
	Script string
}

StoreTask is a stored representation of a Task.

type StoreTaskMeta

type StoreTaskMeta struct {
	MaxConcurrency int32 `protobuf:"varint,1,opt,name=max_concurrency,json=maxConcurrency,proto3" json:"max_concurrency,omitempty"`
	// latest_completed is the unix timestamp of the latest "naturally" completed run.
	// If a run for time t finishes before a run for time t - u, latest_completed will reflect time t.
	LatestCompleted int64 `protobuf:"varint,2,opt,name=latest_completed,json=latestCompleted,proto3" json:"latest_completed,omitempty"`
	// status indicates if the task is enabled or disabled.
	Status string `protobuf:"bytes,3,opt,name=status,proto3" json:"status,omitempty"`
	// currently_running is the collection of runs in-progress.
	// If a runner crashes or otherwise disappears, this indicates to the new runner what needs to be picked up.
	CurrentlyRunning []*StoreTaskMetaRun `protobuf:"bytes,4,rep,name=currently_running,json=currentlyRunning" json:"currently_running,omitempty"`
	// effective_cron is the effective cron string as reported by the task's options.
	EffectiveCron string `protobuf:"bytes,5,opt,name=effective_cron,json=effectiveCron,proto3" json:"effective_cron,omitempty"`
	// Task's configured delay, in seconds.
	Offset               int32                     `protobuf:"varint,6,opt,name=offset,proto3" json:"offset,omitempty"`
	ManualRuns           []*StoreTaskMetaManualRun `protobuf:"bytes,16,rep,name=manual_runs,json=manualRuns" json:"manual_runs,omitempty"`
	XXX_NoUnkeyedLiteral struct{}                  `json:"-"`
	XXX_sizecache        int32                     `json:"-"`
}

StoreTaskMeta is the internal state of a task.

func NewStoreTaskMeta

func NewStoreTaskMeta(req CreateTaskRequest, o options.Options) StoreTaskMeta

NewStoreTaskMeta returns a new StoreTaskMeta based on the given request and parsed options.

func (*StoreTaskMeta) CreateNextRun

func (stm *StoreTaskMeta) CreateNextRun(now int64, makeID func() (platform.ID, error)) (RunCreation, error)

CreateNextRun attempts to update stm's CurrentlyRunning slice with a new run. The new run's now is assigned the earliest possible time according to stm.EffectiveCron, that is later than any in-progress run and stm's LatestCompleted timestamp. If the run's now would be later than the passed-in now, CreateNextRun returns a RunNotYetDueError.

makeID is a function provided by the caller to create an ID, in case we can create a run. Because a StoreTaskMeta doesn't know the ID of the task it belongs to, it never sets RunCreation.Created.TaskID.

func (*StoreTaskMeta) Descriptor

func (*StoreTaskMeta) Descriptor() ([]byte, []int)

func (StoreTaskMeta) Equal

func (stm StoreTaskMeta) Equal(other StoreTaskMeta) bool

Equal returns true if all of stm's fields compare equal to other. Note that this method operates on values, unlike the other methods which operate on pointers.

Equal is probably not very useful outside of tests.

func (*StoreTaskMeta) FinishRun

func (stm *StoreTaskMeta) FinishRun(runID platform.ID) bool

FinishRun removes the run matching runID from stm's CurrentlyRunning slice, and if that run's Now value is greater than stm's LatestCompleted value, updates the value of LatestCompleted to the run's Now value.

If runID matched a run, FinishRun returns true. Otherwise it returns false.

func (*StoreTaskMeta) GetCurrentlyRunning

func (m *StoreTaskMeta) GetCurrentlyRunning() []*StoreTaskMetaRun

func (*StoreTaskMeta) GetEffectiveCron

func (m *StoreTaskMeta) GetEffectiveCron() string

func (*StoreTaskMeta) GetLatestCompleted

func (m *StoreTaskMeta) GetLatestCompleted() int64

func (*StoreTaskMeta) GetManualRuns

func (m *StoreTaskMeta) GetManualRuns() []*StoreTaskMetaManualRun

func (*StoreTaskMeta) GetMaxConcurrency

func (m *StoreTaskMeta) GetMaxConcurrency() int32

func (*StoreTaskMeta) GetOffset

func (m *StoreTaskMeta) GetOffset() int32

func (*StoreTaskMeta) GetStatus

func (m *StoreTaskMeta) GetStatus() string

func (*StoreTaskMeta) ManuallyRunTimeRange

func (stm *StoreTaskMeta) ManuallyRunTimeRange(start, end, requestedAt int64, makeID func() (platform.ID, error)) error

ManuallyRunTimeRange requests a manual run covering the approximate range specified by the Unix timestamps start and end. More specifically, it requests runs scheduled no earlier than start, but possibly later than start, if start does not land on the task's schedule; and as late as, but not necessarily equal to, end. requestedAt is the Unix timestamp indicating when this run range was requested.

There is no schedule validation in this method, so ManuallyRunTimeRange can be used to create a run at a specific time that isn't aligned with the task's schedule.

If adding the range would exceed the queue size, ManuallyRunTimeRange returns ErrManualQueueFull.

func (*StoreTaskMeta) Marshal

func (m *StoreTaskMeta) Marshal() (dAtA []byte, err error)

func (*StoreTaskMeta) MarshalTo

func (m *StoreTaskMeta) MarshalTo(dAtA []byte) (int, error)

func (*StoreTaskMeta) NextDueRun

func (stm *StoreTaskMeta) NextDueRun() (int64, error)

NextDueRun returns the Unix timestamp of when the next call to CreateNextRun will be ready. The returned timestamp reflects the task's delay, so it does not necessarily exactly match the schedule time.

func (*StoreTaskMeta) ProtoMessage

func (*StoreTaskMeta) ProtoMessage()

func (*StoreTaskMeta) Reset

func (m *StoreTaskMeta) Reset()

func (*StoreTaskMeta) Size

func (m *StoreTaskMeta) Size() (n int)

func (*StoreTaskMeta) String

func (m *StoreTaskMeta) String() string

func (*StoreTaskMeta) Unmarshal

func (m *StoreTaskMeta) Unmarshal(dAtA []byte) error

func (*StoreTaskMeta) XXX_DiscardUnknown

func (m *StoreTaskMeta) XXX_DiscardUnknown()

func (*StoreTaskMeta) XXX_Marshal

func (m *StoreTaskMeta) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*StoreTaskMeta) XXX_Merge

func (dst *StoreTaskMeta) XXX_Merge(src proto.Message)

func (*StoreTaskMeta) XXX_Size

func (m *StoreTaskMeta) XXX_Size() int

func (*StoreTaskMeta) XXX_Unmarshal

func (m *StoreTaskMeta) XXX_Unmarshal(b []byte) error

type StoreTaskMetaManualRun

type StoreTaskMetaManualRun struct {
	// start is the earliest allowable unix time stamp for this queue of runs.
	Start int64 `protobuf:"varint,1,opt,name=start,proto3" json:"start,omitempty"`
	// end is the latest allowable unix time stamp for this queue of runs.
	End int64 `protobuf:"varint,2,opt,name=end,proto3" json:"end,omitempty"`
	// latest_completed is the timestamp of the latest completed run from this queue.
	LatestCompleted int64 `protobuf:"varint,3,opt,name=latest_completed,json=latestCompleted,proto3" json:"latest_completed,omitempty"`
	// requested_at is the unix timestamp indicating when this run was requested.
	RequestedAt int64 `protobuf:"varint,4,opt,name=requested_at,json=requestedAt,proto3" json:"requested_at,omitempty"`
	// run_id is set ahead of time for retries of individual runs. Manually run time ranges do not receive an ID.
	RunID                uint64   `protobuf:"varint,5,opt,name=run_id,json=runId,proto3" json:"run_id,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

StoreTaskMetaManualRun indicates a manually requested run for a time range. It has a start and end pair of unix timestamps indicating the time range covered by the request.

func (*StoreTaskMetaManualRun) Descriptor

func (*StoreTaskMetaManualRun) Descriptor() ([]byte, []int)

func (*StoreTaskMetaManualRun) GetEnd

func (m *StoreTaskMetaManualRun) GetEnd() int64

func (*StoreTaskMetaManualRun) GetLatestCompleted

func (m *StoreTaskMetaManualRun) GetLatestCompleted() int64

func (*StoreTaskMetaManualRun) GetRequestedAt

func (m *StoreTaskMetaManualRun) GetRequestedAt() int64

func (*StoreTaskMetaManualRun) GetRunID

func (m *StoreTaskMetaManualRun) GetRunID() uint64

func (*StoreTaskMetaManualRun) GetStart

func (m *StoreTaskMetaManualRun) GetStart() int64

func (*StoreTaskMetaManualRun) Marshal

func (m *StoreTaskMetaManualRun) Marshal() (dAtA []byte, err error)

func (*StoreTaskMetaManualRun) MarshalTo

func (m *StoreTaskMetaManualRun) MarshalTo(dAtA []byte) (int, error)

func (*StoreTaskMetaManualRun) ProtoMessage

func (*StoreTaskMetaManualRun) ProtoMessage()

func (*StoreTaskMetaManualRun) Reset

func (m *StoreTaskMetaManualRun) Reset()

func (*StoreTaskMetaManualRun) Size

func (m *StoreTaskMetaManualRun) Size() (n int)

func (*StoreTaskMetaManualRun) String

func (m *StoreTaskMetaManualRun) String() string

func (*StoreTaskMetaManualRun) Unmarshal

func (m *StoreTaskMetaManualRun) Unmarshal(dAtA []byte) error

func (*StoreTaskMetaManualRun) XXX_DiscardUnknown

func (m *StoreTaskMetaManualRun) XXX_DiscardUnknown()

func (*StoreTaskMetaManualRun) XXX_Marshal

func (m *StoreTaskMetaManualRun) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*StoreTaskMetaManualRun) XXX_Merge

func (dst *StoreTaskMetaManualRun) XXX_Merge(src proto.Message)

func (*StoreTaskMetaManualRun) XXX_Size

func (m *StoreTaskMetaManualRun) XXX_Size() int

func (*StoreTaskMetaManualRun) XXX_Unmarshal

func (m *StoreTaskMetaManualRun) XXX_Unmarshal(b []byte) error

type StoreTaskMetaRun

type StoreTaskMetaRun struct {
	// now is the unix timestamp of the "now" value for the run.
	Now   int64  `protobuf:"varint,1,opt,name=now,proto3" json:"now,omitempty"`
	Try   uint32 `protobuf:"varint,2,opt,name=try,proto3" json:"try,omitempty"`
	RunID uint64 `protobuf:"varint,3,opt,name=run_id,json=runId,proto3" json:"run_id,omitempty"`
	// range_start is the start of the manual run's time range.
	RangeStart int64 `protobuf:"varint,4,opt,name=range_start,json=rangeStart,proto3" json:"range_start,omitempty"`
	// range_end is the end of the manual run's time range.
	RangeEnd int64 `protobuf:"varint,5,opt,name=range_end,json=rangeEnd,proto3" json:"range_end,omitempty"`
	// requested_at is the unix timestamp indicating when this run was requested.
	// It is the same value as the "parent" StoreTaskMetaManualRun, if this run was the result of a manual request.
	RequestedAt          int64    `protobuf:"varint,6,opt,name=requested_at,json=requestedAt,proto3" json:"requested_at,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*StoreTaskMetaRun) Descriptor

func (*StoreTaskMetaRun) Descriptor() ([]byte, []int)

func (*StoreTaskMetaRun) GetNow

func (m *StoreTaskMetaRun) GetNow() int64

func (*StoreTaskMetaRun) GetRangeEnd

func (m *StoreTaskMetaRun) GetRangeEnd() int64

func (*StoreTaskMetaRun) GetRangeStart

func (m *StoreTaskMetaRun) GetRangeStart() int64

func (*StoreTaskMetaRun) GetRequestedAt

func (m *StoreTaskMetaRun) GetRequestedAt() int64

func (*StoreTaskMetaRun) GetRunID

func (m *StoreTaskMetaRun) GetRunID() uint64

func (*StoreTaskMetaRun) GetTry

func (m *StoreTaskMetaRun) GetTry() uint32

func (*StoreTaskMetaRun) Marshal

func (m *StoreTaskMetaRun) Marshal() (dAtA []byte, err error)

func (*StoreTaskMetaRun) MarshalTo

func (m *StoreTaskMetaRun) MarshalTo(dAtA []byte) (int, error)

func (*StoreTaskMetaRun) ProtoMessage

func (*StoreTaskMetaRun) ProtoMessage()

func (*StoreTaskMetaRun) Reset

func (m *StoreTaskMetaRun) Reset()

func (*StoreTaskMetaRun) Size

func (m *StoreTaskMetaRun) Size() (n int)

func (*StoreTaskMetaRun) String

func (m *StoreTaskMetaRun) String() string

func (*StoreTaskMetaRun) Unmarshal

func (m *StoreTaskMetaRun) Unmarshal(dAtA []byte) error

func (*StoreTaskMetaRun) XXX_DiscardUnknown

func (m *StoreTaskMetaRun) XXX_DiscardUnknown()

func (*StoreTaskMetaRun) XXX_Marshal

func (m *StoreTaskMetaRun) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*StoreTaskMetaRun) XXX_Merge

func (dst *StoreTaskMetaRun) XXX_Merge(src proto.Message)

func (*StoreTaskMetaRun) XXX_Size

func (m *StoreTaskMetaRun) XXX_Size() int

func (*StoreTaskMetaRun) XXX_Unmarshal

func (m *StoreTaskMetaRun) XXX_Unmarshal(b []byte) error

type StoreTaskWithMeta

type StoreTaskWithMeta struct {
	Task StoreTask
	Meta StoreTaskMeta
}

StoreTaskWithMeta is a single struct with a StoreTask and a StoreTaskMeta.

type StoreValidation

type StoreValidation struct{}

StoreValidation is used for namespacing the store validation methods.

var StoreValidator StoreValidation

StoreValidator is a package-level StoreValidation, so that you can write

backend.StoreValidator.CreateArgs(...)

func (StoreValidation) CreateArgs

CreateArgs returns the script's parsed options, and an error if any of the provided fields are invalid for creating a task.

func (StoreValidation) UpdateArgs

UpdateArgs validates the UpdateTaskRequest. If the update only includes a new status (i.e. req.Script is empty), the returned options are zero. If the update contains neither a new script nor a new status, or if the script is invalid, an error is returned.

type TaskSearchParams

type TaskSearchParams struct {
	// Return tasks belonging to this exact organization ID. May be nil.
	Org platform.ID

	// Return tasks belonging to this exact user ID. May be nil.
	User platform.ID

	// Return tasks starting after this ID.
	After platform.ID

	// Size of each page. Must be non-negative.
	// If zero, the implementation picks an appropriate default page size.
	// Valid page sizes are implementation-dependent.
	PageSize int
}

TaskSearchParams is used when searching or listing tasks.

type TaskStatus

type TaskStatus string
const (
	TaskActive   TaskStatus = "active"
	TaskInactive TaskStatus = "inactive"

	DefaultTaskStatus TaskStatus = TaskActive
)

type TickScheduler

type TickScheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(desiredState DesiredState, executor Executor, lw LogWriter, now int64, opts ...TickSchedulerOption) *TickScheduler

NewScheduler returns a new scheduler with the given desired state and the given now UTC timestamp.

func (*TickScheduler) CancelRun

func (s *TickScheduler) CancelRun(_ context.Context, taskID, runID platform.ID) error

CancelRun cancels a run. It has an unused Context argument so that it can implement a task.RunController.

func (*TickScheduler) ClaimTask

func (s *TickScheduler) ClaimTask(task *StoreTask, meta *StoreTaskMeta) (err error)

func (*TickScheduler) PrometheusCollectors

func (s *TickScheduler) PrometheusCollectors() []prometheus.Collector

func (*TickScheduler) ReleaseTask

func (s *TickScheduler) ReleaseTask(taskID platform.ID) error

func (*TickScheduler) Start

func (s *TickScheduler) Start(ctx context.Context)

func (*TickScheduler) Stop

func (s *TickScheduler) Stop()

func (*TickScheduler) Tick

func (s *TickScheduler) Tick(now int64)

Tick updates the time of the scheduler. Any owned tasks that are due to execute and that have a free concurrency slot will begin a new execution.

func (*TickScheduler) UpdateTask

func (s *TickScheduler) UpdateTask(task *StoreTask, meta *StoreTaskMeta) error

type TickSchedulerOption

type TickSchedulerOption func(*TickScheduler)

TickSchedulerOption is an option you can use to modify the scheduler's behavior.

func WithLogger

func WithLogger(logger *zap.Logger) TickSchedulerOption

WithLogger sets the logger for the scheduler. If not set, the scheduler will use a no-op logger.

func WithTicker

func WithTicker(ctx context.Context, d time.Duration) TickSchedulerOption

WithTicker sets a time.Ticker with period d, and calls TickScheduler.Tick when the ticker rolls over to a new second. With a sub-second d, TickScheduler.Tick should be called roughly no later than d after each new second begins: this can help ensure tasks happen early within a one-second window.

type UpdateTaskRequest

type UpdateTaskRequest struct {
	// ID of the task.
	ID platform.ID

	// New script content of the task.
	// If empty, do not modify the existing script.
	Script string

	// The new desired task status.
	// If empty, do not modify the existing status.
	Status TaskStatus
}

UpdateTaskRequest encapsulates requested changes to a task.

type UpdateTaskResult

type UpdateTaskResult struct {
	OldScript string
	OldStatus TaskStatus

	NewTask StoreTask
	NewMeta StoreTaskMeta
}

UpdateTaskResult describes the result of modifying a single task. Having the content returned from UpdateTask makes it much simpler for callers to decide how to notify on status changes, etc.

Directories

Path Synopsis
Package bolt provides a bolt-backed store implementation.
Package bolt provides a bolt-backed store implementation.
Package executor contains implementations of backend.Executor that depend on the query service.
Package executor contains implementations of backend.Executor that depend on the query service.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL