queue

package
v1.3.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2024 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultIdle       = 10 * time.Second // 即多长时间后未收到 ACK 的消息被认为是 Pending 状态需要被处理
	DefaultGlobalIdle = 30 * time.Second // 消费者会从全局 Pending 获取超过这个时间的消息
)

Variables

View Source
var (
	ErrBaseRedisQueueNameEmpty    = errors.New("standalone-redis queue name is empty")
	ErrClusterRedisQueueNameEmpty = errors.New("cluster-redis queue name is empty")
)
View Source
var (
	ErrStandaloneRedisQueueAddrEmpty          = errors.New("standalone-redis addr is empty")
	ErrStandaloneRedisQueueConsumerGroupEmpty = errors.New("standalone-redis queue consumer group name is empty")
	ErrClusterRedisQueueAddrsEmpty            = errors.New("cluster-redis addrs is empty")
	ErrClusterRedisQueueConsumerGroupEmpty    = errors.New("cluster-redis queue consumer group name is empty")
)
View Source
var (
	ErrTencentCloudQueueServiceTokenEmpty            = errors.New("token for tencentcloud queue service is empty")
	ErrTencentCloudQueueServiceURLEmpty              = errors.New("url for tencentcloud queue service is empty")
	ErrTencentCloudQueueServiceEmptySubscriptionName = errors.New("subscription name for tencentcloud queue service is empty")
	ErrTencentCloudQueueServiceEmptyTopic            = errors.New("topic name for tencentcloud queue service is empty")
)
View Source
var ErrAWSQueueNameEmpty = errors.New("aws queue name is empty")

Functions

func GenerateTopicAndSubName

func GenerateTopicAndSubName(topic, subscription string) string

Types

type AWSQueueMessage

type AWSQueueMessage struct {
	// contains filtered or unexported fields
}

func (*AWSQueueMessage) Body

func (message *AWSQueueMessage) Body() string

type AWSQueueService

type AWSQueueService struct {
	// contains filtered or unexported fields
}

func (*AWSQueueService) AckMessage

func (service *AWSQueueService) AckMessage(ctx context.Context, message Message) error

func (*AWSQueueService) Close

func (service *AWSQueueService) Close() error

func (*AWSQueueService) CreateConsumer

func (service *AWSQueueService) CreateConsumer() (Consumer, error)

func (*AWSQueueService) CreateProducer

func (service *AWSQueueService) CreateProducer() (Producer, error)

func (*AWSQueueService) ReceiveMessages

func (service *AWSQueueService) ReceiveMessages(ctx context.Context, maxCount int) ([]Message, error)

func (*AWSQueueService) SendMessage

func (service *AWSQueueService) SendMessage(ctx context.Context, body string) error

type BaseRedisQueueConsumer added in v1.2.0

type BaseRedisQueueConsumer struct {
	// contains filtered or unexported fields
}

func (*BaseRedisQueueConsumer) AckMessage added in v1.2.0

func (c *BaseRedisQueueConsumer) AckMessage(ctx context.Context, message Message) error

func (*BaseRedisQueueConsumer) Close added in v1.2.0

func (c *BaseRedisQueueConsumer) Close() error

func (*BaseRedisQueueConsumer) ReceiveMessages added in v1.2.0

func (c *BaseRedisQueueConsumer) ReceiveMessages(ctx context.Context, maxCount int) ([]Message, error)

ReceiveMessages 获取待消费消息

首先获取当前消费者 Pending 状态超过 X 秒的消息
而后获取全局所有的 Pending 状态超过 Y 秒的消息
最后获取 Stream 队列的消息

type BaseRedisQueueMessage added in v1.2.0

type BaseRedisQueueMessage struct {
	// contains filtered or unexported fields
}

func (*BaseRedisQueueMessage) Body added in v1.2.0

func (message *BaseRedisQueueMessage) Body() string

type BaseRedisQueueService added in v1.2.0

type BaseRedisQueueService struct {
	// contains filtered or unexported fields
}

func (*BaseRedisQueueService) Close added in v1.2.0

func (service *BaseRedisQueueService) Close() error

func (*BaseRedisQueueService) CreateConsumer added in v1.2.0

func (service *BaseRedisQueueService) CreateConsumer() (Consumer, error)

func (*BaseRedisQueueService) CreateProducer added in v1.2.0

func (service *BaseRedisQueueService) CreateProducer() (Producer, error)

func (*BaseRedisQueueService) IfCreateMkStream added in v1.2.0

func (service *BaseRedisQueueService) IfCreateMkStream(ctx context.Context) error

func (*BaseRedisQueueService) SendMessage added in v1.2.0

func (service *BaseRedisQueueService) SendMessage(ctx context.Context, body string) error

type ClusterRedisQueueOption added in v1.2.0

type ClusterRedisQueueOption struct {
	Addrs           []string
	Password        string
	DB              *int
	MaxRetries      *int
	PoolSize        *int
	DialTimeout     *time.Duration
	ReadTimeout     *time.Duration
	WriteTimeout    *time.Duration
	MinIdleConns    *int
	MaxIdleConns    *int
	ConnMaxIdleTime *time.Duration
	ConnMaxLifetime *time.Duration

	// queue
	ConsumerGroup string
	Idle          int
	GlobalIdle    int
}

func (ClusterRedisQueueOption) CheckAWS added in v1.2.0

func (option ClusterRedisQueueOption) CheckAWS() error

func (ClusterRedisQueueOption) CheckAliCloudStorage added in v1.3.0

func (option ClusterRedisQueueOption) CheckAliCloudStorage() error

func (ClusterRedisQueueOption) CheckClusterRedis added in v1.2.0

func (option ClusterRedisQueueOption) CheckClusterRedis() error

func (ClusterRedisQueueOption) CheckStandaloneRedis added in v1.2.0

func (option ClusterRedisQueueOption) CheckStandaloneRedis() error

func (ClusterRedisQueueOption) CheckTencentCloud added in v1.2.0

func (option ClusterRedisQueueOption) CheckTencentCloud() error

func (ClusterRedisQueueOption) GetAssumeRegion added in v1.2.0

func (option ClusterRedisQueueOption) GetAssumeRegion() string

func (ClusterRedisQueueOption) GetAssumeRoleArn added in v1.2.0

func (option ClusterRedisQueueOption) GetAssumeRoleArn() string

func (ClusterRedisQueueOption) GetProvider added in v1.2.0

func (option ClusterRedisQueueOption) GetProvider() cloud.Provider

func (ClusterRedisQueueOption) GetRegion added in v1.2.0

func (option ClusterRedisQueueOption) GetRegion() string

func (ClusterRedisQueueOption) GetSecretID added in v1.2.0

func (option ClusterRedisQueueOption) GetSecretID() string

func (ClusterRedisQueueOption) GetSecretKey added in v1.2.0

func (option ClusterRedisQueueOption) GetSecretKey() string

type ClusterRedisQueueOptionV7 added in v1.3.3

type ClusterRedisQueueOptionV7 struct {
	ClusterRedisQueueOption
}

func (ClusterRedisQueueOptionV7) GetProvider added in v1.3.3

func (option ClusterRedisQueueOptionV7) GetProvider() cloud.Provider

type Consumer

type Consumer interface {
	ReceiveMessages(ctx context.Context, maxCount int) ([]Message, error)
	AckMessage(ctx context.Context, message Message) error
	Close() error
}

type Message

type Message interface {
	Body() string
}

type Producer

type Producer interface {
	SendMessage(ctx context.Context, body string) error
	Close() error
}

type QueueService

type QueueService interface {
	CreateProducer() (Producer, error)
	CreateConsumer() (Consumer, error)
	Close() error
}

func GetAWSQueueService

func GetAWSQueueService(queueName string, option cloud.Option) (QueueService, error)

func GetClusterRedisQueueService added in v1.2.0

func GetClusterRedisQueueService(queueName string, option cloud.Option) (QueueService, error)

func GetQueueService

func GetQueueService(queueOrTopicSubName string, option cloud.Option) (QueueService, error)

func GetStandaloneRedisQueueService added in v1.2.0

func GetStandaloneRedisQueueService(queueName string, option cloud.Option) (QueueService, error)

func GetTencentCloudQueueService

func GetTencentCloudQueueService(topicSubName string, option cloud.Option) (QueueService, error)

type RedisQueueConsumerV7 added in v1.3.3

type RedisQueueConsumerV7 struct {
	// contains filtered or unexported fields
}

func (*RedisQueueConsumerV7) AckMessage added in v1.3.3

func (c *RedisQueueConsumerV7) AckMessage(ctx context.Context, message Message) error

func (*RedisQueueConsumerV7) Close added in v1.3.3

func (c *RedisQueueConsumerV7) Close() error

func (*RedisQueueConsumerV7) ReceiveMessages added in v1.3.3

func (c *RedisQueueConsumerV7) ReceiveMessages(ctx context.Context, maxCount int) ([]Message, error)

ReceiveMessages 获取待消费消息

首先获取当前消费者 Pending 状态超过 X 秒的消息
而后获取全局所有的 Pending 状态超过 Y 秒的消息
最后获取 Stream 队列的消息

type RedisQueueMessageV7 added in v1.3.3

type RedisQueueMessageV7 struct {
	// contains filtered or unexported fields
}

func (*RedisQueueMessageV7) Body added in v1.3.3

func (message *RedisQueueMessageV7) Body() string

type RedisQueueServiceV7 added in v1.3.3

type RedisQueueServiceV7 struct {
	// contains filtered or unexported fields
}

func (*RedisQueueServiceV7) Close added in v1.3.3

func (service *RedisQueueServiceV7) Close() error

func (*RedisQueueServiceV7) CreateConsumer added in v1.3.3

func (service *RedisQueueServiceV7) CreateConsumer() (Consumer, error)

func (*RedisQueueServiceV7) CreateProducer added in v1.3.3

func (service *RedisQueueServiceV7) CreateProducer() (Producer, error)

func (*RedisQueueServiceV7) IfCreateMkStream added in v1.3.3

func (service *RedisQueueServiceV7) IfCreateMkStream(ctx context.Context) error

func (*RedisQueueServiceV7) SendMessage added in v1.3.3

func (service *RedisQueueServiceV7) SendMessage(ctx context.Context, body string) error

type StandaloneRedisQueueOption added in v1.2.0

type StandaloneRedisQueueOption struct {
	Addr         string
	Password     string
	DB           *int
	MaxRetries   *int
	PoolSize     *int
	DialTimeout  *time.Duration
	ReadTimeout  *time.Duration
	WriteTimeout *time.Duration
	MinIdleConns *int

	// queue
	ConsumerGroup string
	Idle          int
	GlobalIdle    int
}

func (StandaloneRedisQueueOption) CheckAWS added in v1.2.0

func (option StandaloneRedisQueueOption) CheckAWS() error

func (StandaloneRedisQueueOption) CheckAliCloudStorage added in v1.3.0

func (option StandaloneRedisQueueOption) CheckAliCloudStorage() error

func (StandaloneRedisQueueOption) CheckClusterRedis added in v1.2.0

func (option StandaloneRedisQueueOption) CheckClusterRedis() error

func (StandaloneRedisQueueOption) CheckStandaloneRedis added in v1.2.0

func (option StandaloneRedisQueueOption) CheckStandaloneRedis() error

func (StandaloneRedisQueueOption) CheckTencentCloud added in v1.2.0

func (option StandaloneRedisQueueOption) CheckTencentCloud() error

func (StandaloneRedisQueueOption) GetAssumeRegion added in v1.2.0

func (option StandaloneRedisQueueOption) GetAssumeRegion() string

func (StandaloneRedisQueueOption) GetAssumeRoleArn added in v1.2.0

func (option StandaloneRedisQueueOption) GetAssumeRoleArn() string

func (StandaloneRedisQueueOption) GetProvider added in v1.2.0

func (option StandaloneRedisQueueOption) GetProvider() cloud.Provider

func (StandaloneRedisQueueOption) GetRegion added in v1.2.0

func (option StandaloneRedisQueueOption) GetRegion() string

func (StandaloneRedisQueueOption) GetSecretID added in v1.2.0

func (option StandaloneRedisQueueOption) GetSecretID() string

func (StandaloneRedisQueueOption) GetSecretKey added in v1.2.0

func (option StandaloneRedisQueueOption) GetSecretKey() string

type StandaloneRedisQueueOptionV7 added in v1.3.3

type StandaloneRedisQueueOptionV7 struct {
	StandaloneRedisQueueOption
}

func (StandaloneRedisQueueOptionV7) GetProvider added in v1.3.3

func (option StandaloneRedisQueueOptionV7) GetProvider() cloud.Provider

type TencentCloudQueueConsumer

type TencentCloudQueueConsumer struct {
	// contains filtered or unexported fields
}

func (*TencentCloudQueueConsumer) AckMessage

func (consumer *TencentCloudQueueConsumer) AckMessage(ctx context.Context, message Message) error

func (*TencentCloudQueueConsumer) Close

func (consumer *TencentCloudQueueConsumer) Close() error

func (*TencentCloudQueueConsumer) ReceiveMessages

func (consumer *TencentCloudQueueConsumer) ReceiveMessages(ctx context.Context, maxCount int) ([]Message, error)

type TencentCloudQueueMessage

type TencentCloudQueueMessage struct {
	// contains filtered or unexported fields
}

func (*TencentCloudQueueMessage) Body

func (message *TencentCloudQueueMessage) Body() string

type TencentCloudQueueOption

type TencentCloudQueueOption struct {
	Token string
	URL   string
}

func (TencentCloudQueueOption) CheckAWS

func (option TencentCloudQueueOption) CheckAWS() error

func (TencentCloudQueueOption) CheckAliCloudStorage added in v1.3.0

func (option TencentCloudQueueOption) CheckAliCloudStorage() error

func (TencentCloudQueueOption) CheckClusterRedis added in v1.2.0

func (option TencentCloudQueueOption) CheckClusterRedis() error

func (TencentCloudQueueOption) CheckStandaloneRedis added in v1.2.0

func (option TencentCloudQueueOption) CheckStandaloneRedis() error

func (TencentCloudQueueOption) CheckTencentCloud

func (option TencentCloudQueueOption) CheckTencentCloud() error

func (TencentCloudQueueOption) GetAssumeRegion added in v1.1.3

func (option TencentCloudQueueOption) GetAssumeRegion() string

func (TencentCloudQueueOption) GetAssumeRoleArn added in v1.1.3

func (option TencentCloudQueueOption) GetAssumeRoleArn() string

func (TencentCloudQueueOption) GetProvider

func (option TencentCloudQueueOption) GetProvider() cloud.Provider

func (TencentCloudQueueOption) GetRegion

func (option TencentCloudQueueOption) GetRegion() string

func (TencentCloudQueueOption) GetSecretID

func (option TencentCloudQueueOption) GetSecretID() string

func (TencentCloudQueueOption) GetSecretKey

func (option TencentCloudQueueOption) GetSecretKey() string

type TencentCloudQueueProducer

type TencentCloudQueueProducer struct {
	// contains filtered or unexported fields
}

func (*TencentCloudQueueProducer) Close

func (producer *TencentCloudQueueProducer) Close() error

func (*TencentCloudQueueProducer) SendMessage

func (producer *TencentCloudQueueProducer) SendMessage(ctx context.Context, body string) error

type TencentCloudQueueService

type TencentCloudQueueService struct {
	// contains filtered or unexported fields
}

func (*TencentCloudQueueService) Close

func (service *TencentCloudQueueService) Close() error

func (*TencentCloudQueueService) CreateConsumer

func (service *TencentCloudQueueService) CreateConsumer() (Consumer, error)

func (*TencentCloudQueueService) CreateProducer

func (service *TencentCloudQueueService) CreateProducer() (Producer, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL