internal

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 18, 2020 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
// Names of the connection attributes. Each constant corresponds by name to a
// field of the Values struct (DSN, MaxOpenConns, MaxIdleConns, ConnMaxLifetime,
// NumMaxRetries); presumably these are the map keys read by ConnAttrsToValues —
// confirm against its implementation.
//
// NOTE(review): ConnAttributeNameMaxIdleConns has the value "MaxMaxIdleConns"
// (doubled "Max"), unlike its identifier and the sibling constants. This looks
// like a typo, but callers may already rely on the key as spelled — confirm
// before changing it.
const (
	ConnAttributeNameDSN             = "DSN"
	ConnAttributeNameMaxOpenConns    = "MaxOpenConns"
	ConnAttributeNameMaxIdleConns    = "MaxMaxIdleConns"
	ConnAttributeNameConnMaxLifetime = "ConnMaxLifetime"

	ConnAttributeNameNumMaxRetries = "NumMaxRetries"
)
View Source
// PackageName is the string "jwdk".
// NOTE(review): where this name is used (logging, registration, ...) is not
// visible from this listing — confirm against the package source.
const (
	PackageName = "jwdk"
)

Variables

View Source
// ErrNoFoundQueue is a package-level error value with the message
// "no found queue". Presumably it is returned when a queue lookup yields no
// result — confirm against the functions that return it.
var (
	ErrNoFoundQueue = fmt.Errorf("no found queue")
)

Functions

This section is empty.

Types

type Connector

// Connector bundles a database handle with the helpers needed to run job
// queue operations against it. Its methods (CreateQueue, UpdateQueue,
// EnqueueJob, EnqueueJobBatch, ReceiveJobs, CompleteJob, FailJob, RedriveJob,
// ChangeJobVisibility, Close, GetName, SetLogger) take and return jobworker
// input/output types.
//
// Exported fields:
//   - Name: a string label; presumably the value returned by GetName — confirm.
//   - DB: the *sql.DB handle.
//   - SQLTemplate: the SQLTemplate implementation that builds the statements.
//   - IsUniqueViolation, IsDeadlockDetected: predicates over an error; judging
//     by their names they classify database errors — presumably supplied per
//     database driver, confirm.
//   - Retryer: an exponential.Retryer; how it is applied is not visible here.
type Connector struct {
	Name               string
	DB                 *sql.DB
	SQLTemplate        SQLTemplate
	IsUniqueViolation  func(err error) bool
	IsDeadlockDetected func(err error) bool

	Retryer exponential.Retryer
	// contains filtered or unexported fields
}

func (*Connector) ChangeJobVisibility

func (c *Connector) ChangeJobVisibility(ctx context.Context, input *jobworker.ChangeJobVisibilityInput, opts ...func(*jobworker.Option)) (*jobworker.ChangeJobVisibilityOutput, error)

func (*Connector) Close

func (c *Connector) Close() error

func (*Connector) CompleteJob

func (c *Connector) CompleteJob(ctx context.Context, input *jobworker.CompleteJobInput, opts ...func(*jobworker.Option)) (*jobworker.CompleteJobOutput, error)

func (*Connector) CreateQueue

func (c *Connector) CreateQueue(ctx context.Context, input *jobworker.CreateQueueInput, opts ...func(*jobworker.Option)) (*jobworker.CreateQueueOutput, error)

func (*Connector) EnqueueJob

func (c *Connector) EnqueueJob(ctx context.Context, input *jobworker.EnqueueJobInput, opts ...func(*jobworker.Option)) (*jobworker.EnqueueJobOutput, error)

func (*Connector) EnqueueJobBatch

func (c *Connector) EnqueueJobBatch(ctx context.Context, input *jobworker.EnqueueJobBatchInput, opts ...func(*jobworker.Option)) (*jobworker.EnqueueJobBatchOutput, error)

func (*Connector) FailJob

func (c *Connector) FailJob(ctx context.Context, input *jobworker.FailJobInput, opts ...func(*jobworker.Option)) (*jobworker.FailJobOutput, error)

func (*Connector) GetName

func (c *Connector) GetName() string

func (*Connector) ReceiveJobs

func (c *Connector) ReceiveJobs(ctx context.Context, ch chan<- *jobworker.Job, input *jobworker.ReceiveJobsInput, opts ...func(*jobworker.Option)) (*jobworker.ReceiveJobsOutput, error)

func (*Connector) RedriveJob

func (c *Connector) RedriveJob(ctx context.Context, input *jobworker.RedriveJobInput, opts ...func(*jobworker.Option)) (*jobworker.RedriveJobOutput, error)

func (*Connector) SetLogger

func (c *Connector) SetLogger(logger jobworker.Logger)

func (*Connector) UpdateQueue

func (c *Connector) UpdateQueue(ctx context.Context, input *jobworker.UpdateQueueInput, opts ...func(*jobworker.Option)) (*jobworker.UpdateQueueOutput, error)

type Job

// Job is this package's own representation of a job; its ToJob method converts
// it into a *jobworker.Job for a given queue, connector and logger.
//
// DeduplicationID and GroupID are pointers and may therefore be nil.
// InvisibleUntil and EnqueueAt are plain int64 values — presumably Unix
// timestamps, TODO confirm the unit against the SQL templates.
type Job struct {
	SecID           int64
	JobID           string
	Class           string
	ReceiptID       string
	Args            string
	DeduplicationID *string
	GroupID         *string
	InvisibleUntil  int64
	RetryCount      int64
	EnqueueAt       int64
}

func (*Job) ToJob

func (j *Job) ToJob(queue string, conn jobworker.Connector, logger jobworker.Logger) *jobworker.Job

type Querier

// Querier is the set of context-aware statement methods accepted by
// NewRepository. The three method signatures match the identically named
// methods of database/sql's *sql.DB, *sql.Tx and *sql.Conn, so any of those
// can be passed where a Querier is required.
type Querier interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}

type QueueAttribute

// QueueAttribute holds the settings of one queue, identified by Name and
// RawName. It is the result type of Repository.FindQueueAttribute, and its
// HasDeadLetter method reports a bool (presumably derived from
// DeadLetterTarget — confirm).
//
// The numeric fields are plain int64 values; their units are not visible in
// this listing (DelaySeconds is presumably seconds, per its name — TODO
// confirm the others). CachePeriod is a time.Time; NOTE(review): despite the
// name it is a point in time, not a duration — presumably a cache expiry,
// confirm against its users.
type QueueAttribute struct {
	Name                   string
	RawName                string
	DelaySeconds           int64
	VisibilityTimeout      int64
	MaximumMessageSize     int64
	MessageRetentionPeriod int64
	DeadLetterTarget       string
	MaxReceiveCount        int64

	CachePeriod time.Time
}

func (QueueAttribute) HasDeadLetter

func (q QueueAttribute) HasDeadLetter() bool

type Repository

// Repository exposes the queue and job persistence methods of this package
// (DefineQueue, EnqueueJob, FindJobs, GrabJob, DeleteJob, ...). All of its
// fields are unexported; create one with NewRepository, which takes a Querier
// and an SQLTemplate.
type Repository struct {
	// contains filtered or unexported fields
}

func NewRepository

func NewRepository(querier Querier, tmpl SQLTemplate) *Repository

func (*Repository) CreateQueueAttribute

func (r *Repository) CreateQueueAttribute(ctx context.Context, queueName, queueRawName string, visibilityTimeout, delaySeconds, maximumMessageSize, messageRetentionPeriod, maxReceiveCount int64, deadLetterTarget string) error

func (*Repository) DefineQueue

func (r *Repository) DefineQueue(ctx context.Context, queueRawName string) error

func (*Repository) DefineQueueAttribute

func (r *Repository) DefineQueueAttribute(ctx context.Context) error

func (*Repository) DeleteJob

func (r *Repository) DeleteJob(ctx context.Context, queue string, jobID string) error

func (*Repository) EnqueueJob

func (r *Repository) EnqueueJob(ctx context.Context, queue, jobID, class, args string, deduplicationID, groupID *string, delaySeconds int64) error

func (*Repository) EnqueueJobWithTime

func (r *Repository) EnqueueJobWithTime(ctx context.Context, queue string, jobID, class, args string, deduplicationID, groupID *string, enqueueAt int64) error

func (*Repository) FindJob

func (r *Repository) FindJob(ctx context.Context, queue string, jobID string) (*Job, error)

func (*Repository) FindJobs

func (r *Repository) FindJobs(ctx context.Context, queue string, limit int64) ([]*Job, error)

func (*Repository) FindQueueAttribute

func (r *Repository) FindQueueAttribute(ctx context.Context, queue string) (*QueueAttribute, error)

func (*Repository) GrabJob

func (r *Repository) GrabJob(ctx context.Context, queue string, job *Job, invisibleTime int64) (grabbed bool, err error)

func (*Repository) UpdateJobVisibility

func (r *Repository) UpdateJobVisibility(ctx context.Context, queueRawName, jobID string, visibilityTimeout int64) (updated bool, err error)

func (*Repository) UpdateQueueAttribute

func (r *Repository) UpdateQueueAttribute(ctx context.Context, queueRawName string, visibilityTimeout, delaySeconds, maximumMessageSize, messageRetentionPeriod, maxReceiveCount *int64, deadLetterTarget *string) (updated bool, err error)

type SQLTemplate

// SQLTemplate builds the statements used by this package, so that
// implementations can vary them (presumably per SQL dialect — confirm).
//
// The New...DML methods return a string and a []interface{} — presumably the
// statement text and its bind arguments, in the form expected by the Querier
// methods. The New...DDL methods return only a string.
//
// NOTE(review): parameter order differs between related methods (for example
// NewAddQueueAttributeDML takes visibilityTimeout last, while
// NewUpdateQueueAttributeDML takes it first and queueRawName last), so
// positional arguments must be checked per method.
type SQLTemplate interface {
	NewFindJobDML(queueRawName string, jobID string) (string, []interface{})
	NewFindJobsDML(queueRawName string, limit int64) (string, []interface{})
	NewHideJobDML(queueRawName string, jobID string, oldRetryCount, oldInvisibleUntil, invisibleTime int64) (string, []interface{})
	NewEnqueueJobDML(queueRawName, jobID, class, args string, deduplicationID, groupID *string, delaySeconds int64) (string, []interface{})
	NewEnqueueJobWithTimeDML(queueRawName, jobID, class, args string, deduplicationID, groupID *string, enqueueAt int64) (string, []interface{})
	NewDeleteJobDML(queueRawName, jobID string) (string, []interface{})
	NewUpdateJobByVisibilityTimeoutDML(queueRawName string, jobID string, visibilityTimeout int64) (string, []interface{})

	NewAddQueueAttributeDML(queueName, queueRawName string, delaySeconds, maximumMessageSize, messageRetentionPeriod int64, deadLetterTarget string, maxReceiveCount, visibilityTimeout int64) (string, []interface{})
	NewUpdateQueueAttributeDML(visibilityTimeout, delaySeconds, maximumMessageSize, messageRetentionPeriod *int64, deadLetterTarget *string, maxReceiveCount *int64, queueRawName string) (string, []interface{})
	NewFindQueueAttributeDML(queueName string) (string, []interface{})

	NewCreateQueueAttributeDDL() string
	NewCreateQueueDDL(queueRawName string) string
}

type Values

// Values is the typed form of the connection attributes. ConnAttrsToValues
// returns a *Values built from a map[string]interface{}, and
// ApplyDefaultValues is a method on it (which fields it fills, and with what
// defaults, is not visible in this listing).
//
// ConnMaxLifetime and NumMaxRetries are pointers and may be nil — presumably
// meaning "not set"; confirm against ApplyDefaultValues.
type Values struct {
	DSN             string
	MaxOpenConns    int
	MaxIdleConns    int
	ConnMaxLifetime *time.Duration
	NumMaxRetries   *int
}

func ConnAttrsToValues

func ConnAttrsToValues(attrs map[string]interface{}) *Values

func (*Values) ApplyDefaultValues

func (v *Values) ApplyDefaultValues()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL