Documentation ¶
Index ¶
- Constants
- Variables
- func GetGormFromContext(ctx context.Context) *gorm.DB
- func GetRedisClientFromContext(ctx context.Context) *redis.Client
- func LogFailedJob(ctx context.Context, j Job, err error)
- func MakeConsumer(jobFactory JobFactory, handlerFactory HandlerFactory, publisher Publisher, ...) *defaultConsumer
- func NewRelicGormWithTransaction(dbConn *gorm.DB) func(next HandleFunc) HandleFunc
- func NewRelicToGorm(dbConn *gorm.DB) func(next HandleFunc) HandleFunc
- func NewRelicToRedis(c *redis.Client) func(next HandleFunc) HandleFunc
- func NewRmqConn(redisConn *redis.Client) (rmq.Connection, error)
- func NewRmqConnFromRedisConfig(redisConfig *RedisConfig) (rmq.Connection, error)
- func SetGormToContext(ctx context.Context, dbConn *gorm.DB) context.Context
- func SetRedisClientToContext(ctx context.Context, c *redis.Client) context.Context
- func WithNewRelicForConsumer(nrApp newrelic.Application) func(next HandleFunc) HandleFunc
- type BaseHandler
- type ConsumerManager
- type ConsumerManagerMock
- type FailHandler
- type HandleFunc
- type Handler
- type HandlerFactory
- type HandlerMiddleWare
- type HandlerMock
- type Job
- type JobFactory
- type Publisher
- type PublisherHandlerFunc
- type PublisherHandlerMiddleWare
- type PublisherMock
- type RedisConfig
- type RedisJob
- func (job *RedisJob) Attempt()
- func (job *RedisJob) Delay() int
- func (job *RedisJob) Fail(err error)
- func (job *RedisJob) GetAttempts() int
- func (job *RedisJob) GetFailedError() string
- func (job *RedisJob) GetID() string
- func (job *RedisJob) GetMaxTries() int
- func (job *RedisJob) GetQueue() string
- func (job *RedisJob) GetTraceID() string
- func (job *RedisJob) GetUserID() string
- func (job *RedisJob) HasFailed() bool
- func (job *RedisJob) OnQueue(queue string)
- func (job *RedisJob) Retry(err error)
Constants ¶
View Source
const (
FieldJobPayload = "job"
)
Variables ¶
View Source
var ( ErrFailedWithUnknownData = errors.New("consume failed with unknown data") ErrJobExceedRetryTimes = errors.New("job exceeds retry times") ErrorInValidJobModel = errors.New("invalid job struct") ErrDbEmpty = errors.New("db connection invalid") ErrRedisConnectionEmpty = errors.New("Redis connection invalid") )
Functions ¶
func MakeConsumer ¶
func MakeConsumer( jobFactory JobFactory, handlerFactory HandlerFactory, publisher Publisher, failJobHandlers []FailHandler, middlewareList []HandlerMiddleWare, ) *defaultConsumer
func NewRelicGormWithTransaction ¶
func NewRelicGormWithTransaction(dbConn *gorm.DB) func(next HandleFunc) HandleFunc
func NewRelicToGorm ¶
func NewRelicToGorm(dbConn *gorm.DB) func(next HandleFunc) HandleFunc
func NewRelicToRedis ¶
func NewRelicToRedis(c *redis.Client) func(next HandleFunc) HandleFunc
func NewRmqConn ¶
func NewRmqConn(redisConn *redis.Client) (rmq.Connection, error)
NewRmqConn returns a connection to RedisQueue using the given Redis client
func NewRmqConnFromRedisConfig ¶
func NewRmqConnFromRedisConfig(redisConfig *RedisConfig) (rmq.Connection, error)
NewRmqConnFromRedisConfig returns a connection to RedisQueue using redisConfig
func SetRedisClientToContext ¶
func WithNewRelicForConsumer ¶
func WithNewRelicForConsumer(nrApp newrelic.Application) func(next HandleFunc) HandleFunc
Types ¶
type BaseHandler ¶
type BaseHandler struct{}
BaseHandler is the base handler; all handlers should embed it
func (*BaseHandler) Handle ¶
func (handler *BaseHandler) Handle(_ context.Context, _ Job) error
Handle job
func (*BaseHandler) ShouldRejectOnFailure ¶
func (handler *BaseHandler) ShouldRejectOnFailure(err error) bool
Depending on the error, the job is either moved to the rejected queue or simply skipped (ignored)
func (*BaseHandler) ShouldRetryOnError ¶
func (handler *BaseHandler) ShouldRetryOnError(err error) bool
Determines whether a job that failed with this error should be retried or marked as failed
type ConsumerManager ¶
type ConsumerManager interface { Add(queueName string, consumer rmq.Consumer) StartConsuming(queueName string, replicas int, pollDuration time.Duration) StopConsuming(queueName string) }
func NewConsumerManager ¶
func NewConsumerManager() (ConsumerManager, error)
NewConsumerManager returns a ConsumerManager
func NewConsumerManagerFromConfig ¶
func NewConsumerManagerFromConfig(conf *RedisConfig) (ConsumerManager, error)
func NewConsumerManagerWithConnection ¶
func NewConsumerManagerWithConnection(conn rmq.Connection) ConsumerManager
type ConsumerManagerMock ¶
type ConsumerManagerMock struct { AddFn func(queueName string, consumer rmq.Consumer) StartConsumingFn func(queueName string, replicas int, pollDuration time.Duration) StopConsumingFn func(queueName string) }
func (ConsumerManagerMock) Add ¶
func (p ConsumerManagerMock) Add(queueName string, consumer rmq.Consumer)
func (ConsumerManagerMock) StartConsuming ¶
func (p ConsumerManagerMock) StartConsuming(queueName string, replicas int, pollDuration time.Duration)
func (ConsumerManagerMock) StopConsuming ¶
func (p ConsumerManagerMock) StopConsuming(queueName string)
type HandleFunc ¶
type Handler ¶
type Handler interface { // Handle job return nil error mean job was processed successfully, otherwise job // was failed and method FailJob will be called Handle(ctx context.Context, job Job) error // Should retry on error ShouldRetryOnError(err error) bool // Should be move to rejected queue in case of fail ShouldRejectOnFailure(err error) bool }
type HandlerFactory ¶
Makes a handler from the context. A brand-new handler is created for every request.
type HandlerMiddleWare ¶
type HandlerMiddleWare func(next HandleFunc) HandleFunc
type HandlerMock ¶
type HandlerMock struct { HandleFn func(ctx context.Context, job Job) error ShouldRetryOnErrorFn func(err error) bool ShouldRejectOnFailureFn func(err error) bool }
func (HandlerMock) ShouldRejectOnFailure ¶
func (handler HandlerMock) ShouldRejectOnFailure(err error) bool
func (HandlerMock) ShouldRetryOnError ¶
func (handler HandlerMock) ShouldRetryOnError(err error) bool
type Job ¶
type Job interface { // Get job ID GetID() string // Get user who triggered job GetUserID() string // Get trace id of request which triggered job GetTraceID() string // Set Queue OnQueue(queueName string) // Increase number of attempt times Attempt() // Get the number of job's attempt times GetAttempts() int // Mark job as failed Fail(err error) // Retry on error Retry(err error) // Determine if the job has been marked as a failure. HasFailed() bool // Get the number of times to attempt a job. Default is 1. GetMaxTries() int // Get job's Queue name GetQueue() string // Get delay time time in second before the job is retried again Delay() int // Return error string why job failed GetFailedError() string }
type JobFactory ¶
type JobFactory func() Job
Returns a new job instance for the consumer to decode the JSON payload into
type Publisher ¶
type Publisher interface { Publish(ctx context.Context, job Job) error PublishOnDelay(ctx context.Context, job Job) error PublishRejected(ctx context.Context, job Job) error UseMiddlewares(m ...PublisherHandlerMiddleWare) }
func NewPublisher ¶
func NewPublisherFromConfig ¶
func NewPublisherFromConfig(conf *RedisConfig) (Publisher, error)
func NewPublisherWithConnection ¶
func NewPublisherWithConnection(conn rmq.Connection) (Publisher, error)
type PublisherHandlerMiddleWare ¶
type PublisherHandlerMiddleWare func(next PublisherHandlerFunc) PublisherHandlerFunc
func WithNewRelicTransaction ¶
func WithNewRelicTransaction() PublisherHandlerMiddleWare
type PublisherMock ¶
type PublisherMock struct { PublishFn func(queue string, job Job) error PublishOnDelayFn func(queue string, job Job, delayAt time.Time) error PublishRejectedFn func(job Job) error }
func (PublisherMock) PublishOnDelay ¶
func (PublisherMock) PublishRejected ¶
func (p PublisherMock) PublishRejected(job Job) error
type RedisConfig ¶
type RedisConfig struct { RedisMaster string `envconfig:"REDIS_MASTER" required:"true"` SentinelHost string `envconfig:"SENTINEL_HOST" required:"true"` SentinelPort string `envconfig:"SENTINEL_PORT" required:"true"` RedisMaxActiveConnection int `envconfig:"REDIS_MAX_ACTIVE" required:"false"` MaxIdle int `envconfig:"REDIS_MAX_IDLE" required:"false"` }
func GetConfigFromEnv ¶
func GetConfigFromEnv() RedisConfig
func (*RedisConfig) GetSentinelAddress ¶
func (redisConfig *RedisConfig) GetSentinelAddress() string
type RedisJob ¶
type RedisJob struct { ID string UserID string TraceID string Queue string Attempts int Failed bool Error string }
func NewRedisJob ¶
func (*RedisJob) GetAttempts ¶
func (*RedisJob) GetFailedError ¶
func (*RedisJob) GetMaxTries ¶
func (*RedisJob) GetTraceID ¶
Click to show internal directories.
Click to hide internal directories.