consumer

package
v1.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2021 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BatchSizeLimit                    int64  = 10
	ReceiveMessageWaitSecondsLimit    int64  = 20
	DefaultReceiveVisibilityTimeout   int64  = 30
	DefaultTerminateVisibilityTimeout int64  = 0
	INFO                              string = "INFO "
	WARN                              string = "WARN "
	ERROR                             string = "ERROR"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchHandlerFunc

type BatchHandlerFunc func(ctx context.Context, message []*sqs.Message) error

BatchHandlerFunc batch handler function

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Close

func (con *Consumer) Close()

Close allowing the process to exit gracefully

func (*Consumer) Closed

func (con *Consumer) Closed() bool

Closed check worker is closed

func (*Consumer) Context

func (con *Consumer) Context() context.Context

func (*Consumer) Pause

func (con *Consumer) Pause()

Stop processing

func (*Consumer) Paused

func (con *Consumer) Paused() bool

Paused check worker is paused

func (*Consumer) Resume

func (con *Consumer) Resume()

func (*Consumer) Running

func (con *Consumer) Running() bool

Running check if the mq client is running

func (*Consumer) WaitGroup added in v1.4.0

func (con *Consumer) WaitGroup() *sync.WaitGroup

func (*Consumer) WithBatchSize

func (con *Consumer) WithBatchSize(batchSize int64) *Consumer

func (*Consumer) WithContext

func (con *Consumer) WithContext(ctx context.Context) *Consumer

func (*Consumer) WithEnableDebug

func (con *Consumer) WithEnableDebug(enabled bool) *Consumer

func (*Consumer) WithInterval

func (con *Consumer) WithInterval(ms int) *Consumer

func (*Consumer) WithReceiveVisibilityTimeout added in v1.3.1

func (con *Consumer) WithReceiveVisibilityTimeout(visibilityTimeout int64) *Consumer

func (*Consumer) WithReceiveWaitTimeSeconds added in v1.3.1

func (con *Consumer) WithReceiveWaitTimeSeconds(waitSeconds int64) *Consumer

func (*Consumer) WithTerminateVisibilityTimeout added in v1.3.0

func (con *Consumer) WithTerminateVisibilityTimeout(visibilityTimeout int64) *Consumer

func (*Consumer) WithWaitGroup added in v1.4.0

func (con *Consumer) WithWaitGroup(wg *sync.WaitGroup) *Consumer

func (*Consumer) Worker

func (con *Consumer) Worker(h Handler)

Worker Start polling and will continue polling till the application is forcibly stopped

func (*Consumer) WorkerPool

func (con *Consumer) WorkerPool(h Handler, poolSize int)

WorkerPool worker pool

type Handler

type Handler interface {
	HandleMessage(ctx context.Context, msg *sqs.Message) error
}

Handler interface

type HandlerFunc

type HandlerFunc func(ctx context.Context, message *sqs.Message) error

HandlerFunc handler function

func (HandlerFunc) HandleMessage

func (f HandlerFunc) HandleMessage(ctx context.Context, msg *sqs.Message) error

HandleMessage is used for the actual execution of each message

type IConsumer

type IConsumer interface {
	Resume()
	Pause()
	Close()
	Paused() bool
	Closed() bool
	Running() bool
	Context() context.Context
	WaitGroup() *sync.WaitGroup
	Worker(h Handler)
	WorkerPool(h Handler, poolSize int)
	WithWaitGroup(wg *sync.WaitGroup) *Consumer
	WithContext(ctx context.Context) *Consumer
	WithInterval(interval int) *Consumer
	WithEnableDebug(enabled bool) *Consumer
	WithBatchSize(batchSize int64) *Consumer
	WithReceiveWaitTimeSeconds(waitSeconds int64) *Consumer
	WithReceiveVisibilityTimeout(visibilityTimeout int64) *Consumer
	WithTerminateVisibilityTimeout(visibilityTimeout int64) *Consumer
}

func New

func New(sqs *sqs.SQS, queueUrl string) IConsumer

New create a new consumer

type ISqsClient

type ISqsClient interface {
	WithSqsClient(sqs *sqs.SQS) *SqsClient
	WithQueueUrl(queueUrl string) *SqsClient
	WithBatchSize(batchSize int64) *SqsClient
	WithReceiveWaitTimeSeconds(waitSeconds int64) *SqsClient
	WithReceiveVisibilityTimeout(visibilityTimeout int64) *SqsClient
	WithTerminateVisibilityTimeout(visibilityTimeout int64) *SqsClient
	GetQueueUrl(queueName string) string
	GetQueueUrlWithContext(ctx context.Context, queueName string) string
	ReceiveMessage() ([]*sqs.Message, error)
	ReceiveMessageWithContext(ctx context.Context) ([]*sqs.Message, error)
	SendMessage(message string, delaySeconds int64) (*sqs.SendMessageOutput, error)
	SendMessageWithContext(ctx context.Context, message string, delaySeconds int64) (*sqs.SendMessageOutput, error)
	DeleteMessage(message *sqs.Message) error
	DeleteMessageWithContext(ctx context.Context, message *sqs.Message) error
	DeleteMessageBatch(messages []*sqs.Message) error
	DeleteMessageBatchWithContext(ctx context.Context, messages []*sqs.Message) error
	TerminateVisibilityTimeout(message *sqs.Message) error
	TerminateVisibilityTimeoutWithContext(ctx context.Context, message *sqs.Message) error
	TerminateVisibilityTimeoutBatch(messages []*sqs.Message) error
	TerminateVisibilityTimeoutBatchWithContext(ctx context.Context, messages []*sqs.Message) error
}

ISqsClient interface

func NewSQSClient

func NewSQSClient(sqs *sqs.SQS, queueUrl string) ISqsClient

NewSQSClient create new sqs client

type SqsClient

type SqsClient struct {
	// contains filtered or unexported fields
}

SqsClient sqs client

func (*SqsClient) DeleteMessage

func (client *SqsClient) DeleteMessage(message *sqs.Message) error

DeleteMessage delete message from sqs queue

func (*SqsClient) DeleteMessageBatch

func (client *SqsClient) DeleteMessageBatch(messages []*sqs.Message) error

DeleteMessageBatch delete messages from sqs queue

func (*SqsClient) DeleteMessageBatchWithContext

func (client *SqsClient) DeleteMessageBatchWithContext(ctx context.Context, messages []*sqs.Message) error

DeleteMessageBatchWithContext delete messages from sqs queue

func (*SqsClient) DeleteMessageWithContext

func (client *SqsClient) DeleteMessageWithContext(ctx context.Context, message *sqs.Message) error

DeleteMessageWithContext delete message from sqs queue

func (*SqsClient) GetQueueUrl

func (client *SqsClient) GetQueueUrl(queueName string) string

GetQueueUrl get queue url

func (*SqsClient) GetQueueUrlWithContext

func (client *SqsClient) GetQueueUrlWithContext(ctx context.Context, queueName string) string

GetQueueUrlWithContext get queue url

func (*SqsClient) ReceiveMessage

func (client *SqsClient) ReceiveMessage() ([]*sqs.Message, error)

ReceiveMessage retrive message from sqs queue

func (*SqsClient) ReceiveMessageWithContext

func (client *SqsClient) ReceiveMessageWithContext(ctx context.Context) ([]*sqs.Message, error)

ReceiveMessageWithContext retrive message from sqs queue

func (*SqsClient) SendMessage

func (client *SqsClient) SendMessage(message string, delaySeconds int64) (*sqs.SendMessageOutput, error)

SendMessage send message to sqs queue

func (*SqsClient) SendMessageWithContext

func (client *SqsClient) SendMessageWithContext(ctx context.Context, message string, delaySeconds int64) (*sqs.SendMessageOutput, error)

SendMessageWithContext send message to sqs queue

func (*SqsClient) TerminateVisibilityTimeout

func (client *SqsClient) TerminateVisibilityTimeout(message *sqs.Message) error

TerminateVisibilityTimeout make message visible to be processed from another worker

func (*SqsClient) TerminateVisibilityTimeoutBatch

func (client *SqsClient) TerminateVisibilityTimeoutBatch(messages []*sqs.Message) error

TerminateVisibilityTimeoutBatch make messages visible to be processed from another worker

func (*SqsClient) TerminateVisibilityTimeoutBatchWithContext

func (client *SqsClient) TerminateVisibilityTimeoutBatchWithContext(ctx context.Context, messages []*sqs.Message) error

TerminateVisibilityTimeoutBatchWithContext make messages visible to be processed from another worker

func (*SqsClient) TerminateVisibilityTimeoutWithContext

func (client *SqsClient) TerminateVisibilityTimeoutWithContext(ctx context.Context, message *sqs.Message) error

TerminateVisibilityTimeoutWithContext make message visible to be processed from another worker

func (*SqsClient) WithBatchSize

func (client *SqsClient) WithBatchSize(batchSize int64) *SqsClient

func (*SqsClient) WithQueueUrl

func (client *SqsClient) WithQueueUrl(queueUrl string) *SqsClient

func (*SqsClient) WithReceiveVisibilityTimeout added in v1.3.0

func (client *SqsClient) WithReceiveVisibilityTimeout(visibilityTimeout int64) *SqsClient

func (*SqsClient) WithReceiveWaitTimeSeconds added in v1.3.0

func (client *SqsClient) WithReceiveWaitTimeSeconds(waitSeconds int64) *SqsClient

func (*SqsClient) WithSqsClient

func (client *SqsClient) WithSqsClient(sqs *sqs.SQS) *SqsClient

func (*SqsClient) WithTerminateVisibilityTimeout added in v1.3.0

func (client *SqsClient) WithTerminateVisibilityTimeout(visibilityTimeout int64) *SqsClient

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL