Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
// Package-level sentinel errors created with errors.New, carrying the messages
// "unknown topic" and "tasks complete". Callers should match them with
// errors.Is rather than by comparing message strings.
// NOTE(review): the code that returns these errors is not visible in this
// listing — presumably ErrUnknownTopic comes from the Kafka topic operations
// and ErrTasksComplete from task scheduling; confirm against the implementation.
var ( ErrUnknownTopic = errors.New("unknown topic") ErrTasksComplete = errors.New("tasks complete") )
View Source
// ErrJobNotFound is a sentinel error carrying the message "job not found".
// NOTE(review): presumably returned by DynamoClient.GetJob when no job matches
// the requested ID — confirm against the implementation, which is not shown here.
var ErrJobNotFound = errors.New("job not found")
Functions ¶
This section is empty.
Types ¶
type Checkpoint ¶
// Checkpoint is a named record holding an Output string. Both fields carry
// struct tags for two encodings: the dynamo tags use the keys "Name" and
// "Output", and the json tags use the keys "name" and "output".
// NOTE(review): the meaning of Output (e.g. serialized result of a step) is
// not established by this listing — confirm with the callers.
type Checkpoint struct { Name string `dynamo:"Name" json:"name"` Output string `dynamo:"Output" json:"output"` }
func NewCheckpoint ¶
func NewCheckpoint(name, output string) *Checkpoint
type Checkpoints ¶
// Checkpoints is a slice of Checkpoint values. GetCheckpoint and SetCheckpoint
// are declared on it with value receivers; SetCheckpoint returns a Checkpoints.
// NOTE(review): because the receiver is a value, callers presumably need to
// keep the slice returned by SetCheckpoint — confirm against the implementation.
type Checkpoints []Checkpoint
func NewCheckpoints ¶
func NewCheckpoints() Checkpoints
func (Checkpoints) GetCheckpoint ¶
func (checkpoints Checkpoints) GetCheckpoint(name string) (index int, checkpoint *Checkpoint)
func (Checkpoints) SetCheckpoint ¶
func (checkpoints Checkpoints) SetCheckpoint(checkpoint *Checkpoint) Checkpoints
type DynamoClient ¶
// DynamoClient is the storage interface for Job values. PutJob takes a context
// and a *Job and reports an error; GetJob takes a context and a job ID string
// and returns a *Job together with an error.
// NOTE(review): whether PutJob overwrites an existing job, and which error
// GetJob reports for a missing ID (presumably ErrJobNotFound), cannot be told
// from the signatures alone — confirm against the implementation.
type DynamoClient interface { PutJob(ctx context.Context, job *Job) error GetJob(ctx context.Context, jobID string) (*Job, error) }
func NewDynamoClient ¶
func NewDynamoClient(awsSession *session.Session, table string) DynamoClient
type Job ¶
// Job is the persisted record of a unit of work: identifiers (ID, ResourceID),
// start/end and created/updated timestamps, a Status string, an Error string,
// a Retries counter, its Checkpoints, and its Tasks. Every exported field has
// a dynamo tag matching the Go field name. The struct also has unexported
// fields that this listing does not show, and the Tasks type is declared
// elsewhere.
// NOTE(review): Status is the only exported field without a json tag, so
// encoding/json would emit it under the key "Status", whereas every other
// field uses a snake_case key and Task.Status is tagged json:"status".
// Confirm whether this omission is intentional before relying on the JSON shape.
type Job struct { ID string `dynamo:"ID" json:"id"` ResourceID string `dynamo:"ResourceID" json:"resource_id"` StartTime time.Time `dynamo:"StartTime" json:"start_time"` EndTime time.Time `dynamo:"EndTime" json:"end_time"` Status string `dynamo:"Status"` Checkpoints Checkpoints `dynamo:"Checkpoints" json:"checkpoints"` Error string `dynamo:"Error" json:"error"` Retries int `dynamo:"Retries" json:"retries"` Tasks Tasks `dynamo:"Tasks" json:"tasks"` CreatedAt time.Time `dynamo:"CreatedAt" json:"created_at"` UpdatedAt time.Time `dynamo:"UpdatedAt" json:"updated_at"` // contains filtered or unexported fields }
func (*Job) CompleteJob ¶
func (*Job) IsRetryable ¶
func (*Job) MarkAsUpdated ¶
func (*Job) ResetRetries ¶
func (*Job) SetCheckpoints ¶
func (job *Job) SetCheckpoints(checkpoints Checkpoints) *Job
type JobIDConsumer ¶
// JobIDConsumer has only unexported fields. Its exported methods are Setup,
// Cleanup, and ConsumeClaim, each taking sarama consumer-group arguments and
// returning an error.
// NOTE(review): that method set matches the shape of sarama's
// ConsumerGroupHandler interface, so this type is presumably passed to a
// consumer group's Consume call — confirm against the code that constructs it.
type JobIDConsumer struct {
// contains filtered or unexported fields
}
func (*JobIDConsumer) Cleanup ¶
func (consumer *JobIDConsumer) Cleanup(sarama.ConsumerGroupSession) error
func (*JobIDConsumer) ConsumeClaim ¶
func (consumer *JobIDConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*JobIDConsumer) Setup ¶
func (consumer *JobIDConsumer) Setup(sarama.ConsumerGroupSession) error
type KafkaClient ¶
// KafkaClient is the messaging interface exposed by this package.
// PublishMessage takes a topic name and a payload of any type; UpsertTopic
// takes a topic name, a partition count (int32), and a message retention
// duration; DeleteTopic takes a topic name. All three report failure through
// a returned error.
// NOTE(review): how the payload is serialized, and whether UpsertTopic alters
// an existing topic or only creates missing ones, is not visible in this
// listing — confirm against the implementation.
type KafkaClient interface { PublishMessage(topic string, payload any) error UpsertTopic(name string, numPartitions int32, msgRetention time.Duration) error DeleteTopic(topic string) error }
func NewKafkaClient ¶
func NewKafkaClient( brokers []string, clientID string, version sarama.KafkaVersion, username string, password string, ) (KafkaClient, error)
type Task ¶
// Task is the persisted record of a single task: an ID, a Data string,
// start/end timestamps, a Status string, a Sent flag, its Checkpoints, an
// Error string, and a Retries counter. Every exported field has a dynamo tag
// matching the Go field name and a snake_case json tag. The struct also has
// unexported fields that this listing does not show.
// NOTE(review): the format of Data and the set of valid Status values are not
// established here — confirm with the code that creates and updates tasks.
type Task struct { ID string `dynamo:"ID" json:"id"` Data string `dynamo:"Data" json:"data"` StartTime time.Time `dynamo:"StartTime" json:"start_time"` EndTime time.Time `dynamo:"EndTime" json:"end_time"` Status string `dynamo:"Status" json:"status"` Sent bool `dynamo:"Sent" json:"sent"` Checkpoints Checkpoints `dynamo:"Checkpoints" json:"checkpoints"` Error string `dynamo:"Error" json:"error"` Retries int `dynamo:"Retries" json:"retries"` // contains filtered or unexported fields }
func (*Task) CompleteTask ¶
func (*Task) IsRetryable ¶
func (*Task) MarkAsSent ¶
func (*Task) ResetRetries ¶
func (*Task) SetCheckpoints ¶
func (task *Task) SetCheckpoints(checkpoints Checkpoints) *Task
Click to show internal directories.
Click to hide internal directories.