job

package
v0.0.0-...-ea99318 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 26, 2024 | License: Apache-2.0 | Imports: 15 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by this package. Callers can match them with
// errors.Is.
var (
	// ErrUnknownTopic is a sentinel error with the message "unknown topic".
	ErrUnknownTopic  = errors.New("unknown topic")
	// ErrTasksComplete is a sentinel error with the message "tasks complete".
	ErrTasksComplete = errors.New("tasks complete")
)
View Source
// ErrJobNotFound is a sentinel error with the message "job not found".
// NOTE(review): presumably returned by DynamoClient.GetJob when no job matches
// the requested ID — confirm against the implementation.
var ErrJobNotFound = errors.New("job not found")

Functions

This section is empty.

Types

type Checkpoint

// Checkpoint pairs a checkpoint name with an output string. Both fields carry
// `dynamo` and `json` struct tags.
type Checkpoint struct {
	// Name is the checkpoint's name. Checkpoints.GetCheckpoint takes a name
	// argument; presumably it is matched against this field — confirm in the
	// implementation.
	Name string `dynamo:"Name" json:"name"`

	// Output is the output string stored with the checkpoint.
	Output string `dynamo:"Output" json:"output"`
}

func NewCheckpoint

func NewCheckpoint(name, output string) *Checkpoint

type Checkpoints

// Checkpoints is a slice of Checkpoint values. Its methods use value
// receivers and SetCheckpoint returns a Checkpoints, so callers should keep
// the returned slice.
type Checkpoints []Checkpoint

func NewCheckpoints

func NewCheckpoints() Checkpoints

func (Checkpoints) GetCheckpoint

func (checkpoints Checkpoints) GetCheckpoint(name string) (index int, checkpoint *Checkpoint)

func (Checkpoints) SetCheckpoint

func (checkpoints Checkpoints) SetCheckpoint(checkpoint *Checkpoint) Checkpoints

type DynamoClient

// DynamoClient is the interface returned by NewDynamoClient. It exposes two
// context-aware operations on Job values.
type DynamoClient interface {
	// PutJob takes a context and a *Job and reports any failure as an error.
	// NOTE(review): presumably writes the job to the table passed to
	// NewDynamoClient — confirm against the implementation.
	PutJob(ctx context.Context, job *Job) error

	// GetJob takes a context and a job ID and returns the matching *Job or an
	// error.
	// NOTE(review): presumably returns ErrJobNotFound when no job has that ID
	// — confirm against the implementation.
	GetJob(ctx context.Context, jobID string) (*Job, error)
}

func NewDynamoClient

func NewDynamoClient(awsSession *session.Session, table string) DynamoClient

type Job

// Job is a record with an ID, a resource ID, timing and status information,
// and the Checkpoints and Tasks attached to it. Every exported field carries
// both a `dynamo` and a `json` struct tag.
type Job struct {
	// ID is the job's identifier.
	ID string `dynamo:"ID" json:"id"`

	// ResourceID is the resource identifier; NewJob takes it as its first
	// argument.
	ResourceID string `dynamo:"ResourceID" json:"resource_id"`

	// StartTime and EndTime are the job's start and end timestamps.
	StartTime time.Time `dynamo:"StartTime" json:"start_time"`

	EndTime time.Time `dynamo:"EndTime" json:"end_time"`

	// Status is the job's status string. It is serialized as "status" in JSON,
	// matching Task.Status and the lowercase keys used by every other field.
	Status string `dynamo:"Status" json:"status"`

	// Checkpoints holds the job's checkpoints.
	Checkpoints Checkpoints `dynamo:"Checkpoints" json:"checkpoints"`

	// Error is an error message stored as a string.
	// NOTE(review): presumably set by LogError — confirm.
	Error string `dynamo:"Error" json:"error"`

	// Retries is a retry counter.
	// NOTE(review): presumably compared with the maxRetries given to NewJob by
	// IsRetryable — confirm.
	Retries int `dynamo:"Retries" json:"retries"`

	// Tasks holds the job's tasks.
	Tasks Tasks `dynamo:"Tasks" json:"tasks"`

	// CreatedAt and UpdatedAt are the record's creation and update timestamps.
	CreatedAt time.Time `dynamo:"CreatedAt" json:"created_at"`

	UpdatedAt time.Time `dynamo:"UpdatedAt" json:"updated_at"`
	// contains filtered or unexported fields
}

func NewJob

func NewJob(resourceID string, maxRetries int) *Job

func (*Job) CompleteJob

func (job *Job) CompleteJob() *Job

func (*Job) IsRetryable

func (job *Job) IsRetryable() bool

func (*Job) LogError

func (job *Job) LogError(err error) *Job

func (*Job) MarkAsUpdated

func (job *Job) MarkAsUpdated() *Job

func (*Job) ResetRetries

func (job *Job) ResetRetries() *Job

func (*Job) SetCheckpoints

func (job *Job) SetCheckpoints(checkpoints Checkpoints) *Job

func (*Job) SetTasks

func (job *Job) SetTasks(tasks Tasks) *Job

type JobIDConsumer

// JobIDConsumer has only unexported fields. Its Cleanup and ConsumeClaim
// methods take sarama consumer-group session and claim types.
// NOTE(review): presumably implements sarama.ConsumerGroupHandler — confirm.
type JobIDConsumer struct {
	// contains filtered or unexported fields
}

func (*JobIDConsumer) Cleanup

func (consumer *JobIDConsumer) Cleanup(sarama.ConsumerGroupSession) error

func (*JobIDConsumer) ConsumeClaim

func (consumer *JobIDConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

func (*JobIDConsumer) Setup

func (consumer *JobIDConsumer) Setup(sarama.ConsumerGroupSession) error

type KafkaClient

// KafkaClient is the interface returned by NewKafkaClient. It exposes message
// publishing and topic management, each reporting failure as an error.
type KafkaClient interface {
	// PublishMessage takes a topic name and a payload of any type.
	// NOTE(review): how payload is encoded is not visible here; presumably
	// ErrUnknownTopic is returned for an unrecognized topic — confirm.
	PublishMessage(topic string, payload any) error

	// UpsertTopic takes a topic name, a partition count, and a message
	// retention duration.
	// NOTE(review): presumably creates the topic or updates it if it already
	// exists — confirm against the implementation.
	UpsertTopic(name string, numPartitions int32, msgRetention time.Duration) error

	// DeleteTopic takes the name of a topic to delete.
	DeleteTopic(topic string) error
}

func NewKafkaClient

func NewKafkaClient(
	brokers []string,
	clientID string,
	version sarama.KafkaVersion,
	username string,
	password string,
) (KafkaClient, error)

type Task

// Task is a record with an ID, a data string, timing and status information,
// and its own Checkpoints. Every exported field carries both a `dynamo` and a
// `json` struct tag.
type Task struct {
	// ID is the task's identifier; Tasks.GetTask takes an id argument.
	ID string `dynamo:"ID" json:"id"`

	// Data is the task's data string; NewTask takes it as its first argument.
	Data string `dynamo:"Data" json:"data"`

	// StartTime and EndTime are the task's start and end timestamps.
	StartTime time.Time `dynamo:"StartTime" json:"start_time"`

	EndTime time.Time `dynamo:"EndTime" json:"end_time"`

	// Status is the task's status string.
	Status string `dynamo:"Status" json:"status"`

	// Sent is a boolean flag.
	// NOTE(review): presumably set by MarkAsSent — confirm.
	Sent bool `dynamo:"Sent" json:"sent"`

	// Checkpoints holds the task's checkpoints.
	Checkpoints Checkpoints `dynamo:"Checkpoints" json:"checkpoints"`

	// Error is an error message stored as a string.
	// NOTE(review): presumably set by LogError — confirm.
	Error string `dynamo:"Error" json:"error"`

	// Retries is a retry counter.
	// NOTE(review): presumably compared with the maxRetries given to NewTask
	// by IsRetryable — confirm.
	Retries int `dynamo:"Retries" json:"retries"`
	// contains filtered or unexported fields
}

func NewTask

func NewTask(data string, maxRetries int) *Task

func (*Task) CompleteTask

func (task *Task) CompleteTask() *Task

func (*Task) IsRetryable

func (task *Task) IsRetryable() bool

func (*Task) LogError

func (task *Task) LogError(err error) *Task

func (*Task) MarkAsSent

func (task *Task) MarkAsSent() *Task

func (*Task) ResetRetries

func (task *Task) ResetRetries() *Task

func (*Task) SetCheckpoints

func (task *Task) SetCheckpoints(checkpoints Checkpoints) *Task

type Tasks

// Tasks is a slice of Task values. Its methods use value receivers and
// SetTask returns a Tasks, so callers should keep the returned slice.
type Tasks []Task

func NewTasks

func NewTasks() Tasks

func (Tasks) GetTask

func (tasks Tasks) GetTask(id string) (index int, task *Task)

func (Tasks) SetTask

func (tasks Tasks) SetTask(task *Task) Tasks

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL