Documentation ¶
Index ¶
- Constants
- Variables
- func GenerateTopicAndSubName(topic, subscription string) string
- type AWSQueueMessage
- type AWSQueueService
- func (service *AWSQueueService) AckMessage(ctx context.Context, message Message) error
- func (service *AWSQueueService) Close() error
- func (service *AWSQueueService) CreateConsumer() (Consumer, error)
- func (service *AWSQueueService) CreateProducer() (Producer, error)
- func (service *AWSQueueService) ReceiveMessages(ctx context.Context, maxCount int) ([]Message, error)
- func (service *AWSQueueService) SendMessage(ctx context.Context, body string) error
- type BaseRedisQueueConsumer
- type BaseRedisQueueMessage
- type BaseRedisQueueService
- func (service *BaseRedisQueueService) Close() error
- func (service *BaseRedisQueueService) CreateConsumer() (Consumer, error)
- func (service *BaseRedisQueueService) CreateProducer() (Producer, error)
- func (service *BaseRedisQueueService) IfCreateMkStream(ctx context.Context) error
- func (service *BaseRedisQueueService) SendMessage(ctx context.Context, body string) error
- type ClusterRedisQueueOption
- func (option ClusterRedisQueueOption) CheckAWS() error
- func (option ClusterRedisQueueOption) CheckAliCloudStorage() error
- func (option ClusterRedisQueueOption) CheckClusterRedis() error
- func (option ClusterRedisQueueOption) CheckStandaloneRedis() error
- func (option ClusterRedisQueueOption) CheckTencentCloud() error
- func (option ClusterRedisQueueOption) GetAssumeRegion() string
- func (option ClusterRedisQueueOption) GetAssumeRoleArn() string
- func (option ClusterRedisQueueOption) GetProvider() cloud.Provider
- func (option ClusterRedisQueueOption) GetRegion() string
- func (option ClusterRedisQueueOption) GetSecretID() string
- func (option ClusterRedisQueueOption) GetSecretKey() string
- type ClusterRedisQueueOptionV7
- type Consumer
- type Message
- type Producer
- type QueueService
- func GetAWSQueueService(queueName string, option cloud.Option) (QueueService, error)
- func GetClusterRedisQueueService(queueName string, option cloud.Option) (QueueService, error)
- func GetQueueService(queueOrTopicSubName string, option cloud.Option) (QueueService, error)
- func GetStandaloneRedisQueueService(queueName string, option cloud.Option) (QueueService, error)
- func GetTencentCloudQueueService(topicSubName string, option cloud.Option) (QueueService, error)
- type RedisQueueConsumerV7
- type RedisQueueMessageV7
- type RedisQueueServiceV7
- func (service *RedisQueueServiceV7) Close() error
- func (service *RedisQueueServiceV7) CreateConsumer() (Consumer, error)
- func (service *RedisQueueServiceV7) CreateProducer() (Producer, error)
- func (service *RedisQueueServiceV7) IfCreateMkStream(ctx context.Context) error
- func (service *RedisQueueServiceV7) SendMessage(ctx context.Context, body string) error
- type StandaloneRedisQueueOption
- func (option StandaloneRedisQueueOption) CheckAWS() error
- func (option StandaloneRedisQueueOption) CheckAliCloudStorage() error
- func (option StandaloneRedisQueueOption) CheckClusterRedis() error
- func (option StandaloneRedisQueueOption) CheckStandaloneRedis() error
- func (option StandaloneRedisQueueOption) CheckTencentCloud() error
- func (option StandaloneRedisQueueOption) GetAssumeRegion() string
- func (option StandaloneRedisQueueOption) GetAssumeRoleArn() string
- func (option StandaloneRedisQueueOption) GetProvider() cloud.Provider
- func (option StandaloneRedisQueueOption) GetRegion() string
- func (option StandaloneRedisQueueOption) GetSecretID() string
- func (option StandaloneRedisQueueOption) GetSecretKey() string
- type StandaloneRedisQueueOptionV7
- type TencentCloudQueueConsumer
- type TencentCloudQueueMessage
- type TencentCloudQueueOption
- func (option TencentCloudQueueOption) CheckAWS() error
- func (option TencentCloudQueueOption) CheckAliCloudStorage() error
- func (option TencentCloudQueueOption) CheckClusterRedis() error
- func (option TencentCloudQueueOption) CheckStandaloneRedis() error
- func (option TencentCloudQueueOption) CheckTencentCloud() error
- func (option TencentCloudQueueOption) GetAssumeRegion() string
- func (option TencentCloudQueueOption) GetAssumeRoleArn() string
- func (option TencentCloudQueueOption) GetProvider() cloud.Provider
- func (option TencentCloudQueueOption) GetRegion() string
- func (option TencentCloudQueueOption) GetSecretID() string
- func (option TencentCloudQueueOption) GetSecretKey() string
- type TencentCloudQueueProducer
- type TencentCloudQueueService
Constants ¶
View Source
const (
	DefaultIdle       = 10 * time.Second // 即多长时间后未收到 ACK 的消息被认为是 Pending 状态需要被处理
	DefaultGlobalIdle = 30 * time.Second // 消费者会从全局 Pending 获取超过这个时间的消息
)
Variables ¶
View Source
var (
	ErrBaseRedisQueueNameEmpty    = errors.New("standalone-redis queue name is empty")
	ErrClusterRedisQueueNameEmpty = errors.New("cluster-redis queue name is empty")
)
View Source
var (
	ErrStandaloneRedisQueueAddrEmpty          = errors.New("standalone-redis addr is empty")
	ErrStandaloneRedisQueueConsumerGroupEmpty = errors.New("standalone-redis queue consumer group name is empty")
	ErrClusterRedisQueueAddrsEmpty            = errors.New("cluster-redis addrs is empty")
	ErrClusterRedisQueueConsumerGroupEmpty    = errors.New("cluster-redis queue consumer group name is empty")
)
View Source
var (
	ErrTencentCloudQueueServiceTokenEmpty             = errors.New("token for tencentcloud queue service is empty")
	ErrTencentCloudQueueServiceURLEmpty               = errors.New("url for tencentcloud queue service is empty")
	ErrTencentCloudQueueServiceEmptySubscriptionName = errors.New("subscription name for tencentcloud queue service is empty")
	ErrTencentCloudQueueServiceEmptyTopic             = errors.New("topic name for tencentcloud queue service is empty")
)
View Source
var ErrAWSQueueNameEmpty = errors.New("aws queue name is empty")
Functions ¶
func GenerateTopicAndSubName ¶
func GenerateTopicAndSubName(topic, subscription string) string
Types ¶
type AWSQueueMessage ¶
type AWSQueueMessage struct {
// contains filtered or unexported fields
}
func (*AWSQueueMessage) Body ¶
func (message *AWSQueueMessage) Body() string
type AWSQueueService ¶
type AWSQueueService struct {
// contains filtered or unexported fields
}
func (*AWSQueueService) AckMessage ¶
func (service *AWSQueueService) AckMessage(ctx context.Context, message Message) error
func (*AWSQueueService) Close ¶
func (service *AWSQueueService) Close() error
func (*AWSQueueService) CreateConsumer ¶
func (service *AWSQueueService) CreateConsumer() (Consumer, error)
func (*AWSQueueService) CreateProducer ¶
func (service *AWSQueueService) CreateProducer() (Producer, error)
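func (*AWSQueueService) ReceiveMessages ¶
func (service *AWSQueueService) ReceiveMessages(ctx context.Context, maxCount int) ([]Message, error)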
func (*AWSQueueService) SendMessage ¶
func (service *AWSQueueService) SendMessage(ctx context.Context, body string) error
type BaseRedisQueueConsumer ¶ added in v1.2.0
type BaseRedisQueueConsumer struct {
// contains filtered or unexported fields
}
func (*BaseRedisQueueConsumer) AckMessage ¶ added in v1.2.0
func (c *BaseRedisQueueConsumer) AckMessage(ctx context.Context, message Message) error
func (*BaseRedisQueueConsumer) Close ¶ added in v1.2.0
func (c *BaseRedisQueueConsumer) Close() error
func (*BaseRedisQueueConsumer) ReceiveMessages ¶ added in v1.2.0
func (c *BaseRedisQueueConsumer) ReceiveMessages(ctx context.Context, maxCount int) ([]Message, error)
ReceiveMessages 获取待消费消息
首先获取当前消费者 Pending 状态超过 X 秒的消息，而后获取全局所有的 Pending 状态超过 Y 秒的消息，最后获取 Stream 队列的消息。
type BaseRedisQueueMessage ¶ added in v1.2.0
type BaseRedisQueueMessage struct {
// contains filtered or unexported fields
}
func (*BaseRedisQueueMessage) Body ¶ added in v1.2.0
func (message *BaseRedisQueueMessage) Body() string
type BaseRedisQueueService ¶ added in v1.2.0
type BaseRedisQueueService struct {
// contains filtered or unexported fields
}
func (*BaseRedisQueueService) Close ¶ added in v1.2.0
func (service *BaseRedisQueueService) Close() error
func (*BaseRedisQueueService) CreateConsumer ¶ added in v1.2.0
func (service *BaseRedisQueueService) CreateConsumer() (Consumer, error)
func (*BaseRedisQueueService) CreateProducer ¶ added in v1.2.0
func (service *BaseRedisQueueService) CreateProducer() (Producer, error)
func (*BaseRedisQueueService) IfCreateMkStream ¶ added in v1.2.0
func (service *BaseRedisQueueService) IfCreateMkStream(ctx context.Context) error
func (*BaseRedisQueueService) SendMessage ¶ added in v1.2.0
func (service *BaseRedisQueueService) SendMessage(ctx context.Context, body string) error
type ClusterRedisQueueOption ¶ added in v1.2.0
type ClusterRedisQueueOption struct {
	Addrs           []string
	Password        string
	DB              *int
	MaxRetries      *int
	PoolSize        *int
	DialTimeout     *time.Duration
	ReadTimeout     *time.Duration
	WriteTimeout    *time.Duration
	MinIdleConns    *int
	MaxIdleConns    *int
	ConnMaxIdleTime *time.Duration
	ConnMaxLifetime *time.Duration

	// queue
	ConsumerGroup string
	Idle          int
	GlobalIdle    int
}
func (ClusterRedisQueueOption) CheckAWS ¶ added in v1.2.0
func (option ClusterRedisQueueOption) CheckAWS() error
func (ClusterRedisQueueOption) CheckAliCloudStorage ¶ added in v1.3.0
func (option ClusterRedisQueueOption) CheckAliCloudStorage() error
func (ClusterRedisQueueOption) CheckClusterRedis ¶ added in v1.2.0
func (option ClusterRedisQueueOption) CheckClusterRedis() error
func (ClusterRedisQueueOption) CheckStandaloneRedis ¶ added in v1.2.0
func (option ClusterRedisQueueOption) CheckStandaloneRedis() error
func (ClusterRedisQueueOption) CheckTencentCloud ¶ added in v1.2.0
func (option ClusterRedisQueueOption) CheckTencentCloud() error
func (ClusterRedisQueueOption) GetAssumeRegion ¶ added in v1.2.0
func (option ClusterRedisQueueOption) GetAssumeRegion() string
func (ClusterRedisQueueOption) GetAssumeRoleArn ¶ added in v1.2.0
func (option ClusterRedisQueueOption) GetAssumeRoleArn() string
func (ClusterRedisQueueOption) GetProvider ¶ added in v1.2.0
func (option ClusterRedisQueueOption) GetProvider() cloud.Provider
func (ClusterRedisQueueOption) GetRegion ¶ added in v1.2.0
func (option ClusterRedisQueueOption) GetRegion() string
func (ClusterRedisQueueOption) GetSecretID ¶ added in v1.2.0
func (option ClusterRedisQueueOption) GetSecretID() string
func (ClusterRedisQueueOption) GetSecretKey ¶ added in v1.2.0
func (option ClusterRedisQueueOption) GetSecretKey() string
type ClusterRedisQueueOptionV7 ¶ added in v1.3.3
type ClusterRedisQueueOptionV7 struct {
ClusterRedisQueueOption
}
func (ClusterRedisQueueOptionV7) GetProvider ¶ added in v1.3.3
func (option ClusterRedisQueueOptionV7) GetProvider() cloud.Provider
type QueueService ¶
type QueueService interface {
	CreateProducer() (Producer, error)
	CreateConsumer() (Consumer, error)
	Close() error
}
func GetAWSQueueService ¶
func GetAWSQueueService(queueName string, option cloud.Option) (QueueService, error)
func GetClusterRedisQueueService ¶ added in v1.2.0
func GetClusterRedisQueueService(queueName string, option cloud.Option) (QueueService, error)
func GetQueueService ¶
func GetQueueService(queueOrTopicSubName string, option cloud.Option) (QueueService, error)
func GetStandaloneRedisQueueService ¶ added in v1.2.0
func GetStandaloneRedisQueueService(queueName string, option cloud.Option) (QueueService, error)
func GetTencentCloudQueueService ¶
func GetTencentCloudQueueService(topicSubName string, option cloud.Option) (QueueService, error)
type RedisQueueConsumerV7 ¶ added in v1.3.3
type RedisQueueConsumerV7 struct {
// contains filtered or unexported fields
}
func (*RedisQueueConsumerV7) AckMessage ¶ added in v1.3.3
func (c *RedisQueueConsumerV7) AckMessage(ctx context.Context, message Message) error
func (*RedisQueueConsumerV7) Close ¶ added in v1.3.3
func (c *RedisQueueConsumerV7) Close() error
func (*RedisQueueConsumerV7) ReceiveMessages ¶ added in v1.3.3
func (c *RedisQueueConsumerV7) ReceiveMessages(ctx context.Context, maxCount int) ([]Message, error)
ReceiveMessages 获取待消费消息
首先获取当前消费者 Pending 状态超过 X 秒的消息，而后获取全局所有的 Pending 状态超过 Y 秒的消息，最后获取 Stream 队列的消息。
type RedisQueueMessageV7 ¶ added in v1.3.3
type RedisQueueMessageV7 struct {
// contains filtered or unexported fields
}
func (*RedisQueueMessageV7) Body ¶ added in v1.3.3
func (message *RedisQueueMessageV7) Body() string
type RedisQueueServiceV7 ¶ added in v1.3.3
type RedisQueueServiceV7 struct {
// contains filtered or unexported fields
}
func (*RedisQueueServiceV7) Close ¶ added in v1.3.3
func (service *RedisQueueServiceV7) Close() error
func (*RedisQueueServiceV7) CreateConsumer ¶ added in v1.3.3
func (service *RedisQueueServiceV7) CreateConsumer() (Consumer, error)
func (*RedisQueueServiceV7) CreateProducer ¶ added in v1.3.3
func (service *RedisQueueServiceV7) CreateProducer() (Producer, error)
func (*RedisQueueServiceV7) IfCreateMkStream ¶ added in v1.3.3
func (service *RedisQueueServiceV7) IfCreateMkStream(ctx context.Context) error
func (*RedisQueueServiceV7) SendMessage ¶ added in v1.3.3
func (service *RedisQueueServiceV7) SendMessage(ctx context.Context, body string) error
type StandaloneRedisQueueOption ¶ added in v1.2.0
type StandaloneRedisQueueOption struct {
	Addr         string
	Password     string
	DB           *int
	MaxRetries   *int
	PoolSize     *int
	DialTimeout  *time.Duration
	ReadTimeout  *time.Duration
	WriteTimeout *time.Duration
	MinIdleConns *int

	// queue
	ConsumerGroup string
	Idle          int
	GlobalIdle    int
}
func (StandaloneRedisQueueOption) CheckAWS ¶ added in v1.2.0
func (option StandaloneRedisQueueOption) CheckAWS() error
func (StandaloneRedisQueueOption) CheckAliCloudStorage ¶ added in v1.3.0
func (option StandaloneRedisQueueOption) CheckAliCloudStorage() error
func (StandaloneRedisQueueOption) CheckClusterRedis ¶ added in v1.2.0
func (option StandaloneRedisQueueOption) CheckClusterRedis() error
func (StandaloneRedisQueueOption) CheckStandaloneRedis ¶ added in v1.2.0
func (option StandaloneRedisQueueOption) CheckStandaloneRedis() error
func (StandaloneRedisQueueOption) CheckTencentCloud ¶ added in v1.2.0
func (option StandaloneRedisQueueOption) CheckTencentCloud() error
func (StandaloneRedisQueueOption) GetAssumeRegion ¶ added in v1.2.0
func (option StandaloneRedisQueueOption) GetAssumeRegion() string
func (StandaloneRedisQueueOption) GetAssumeRoleArn ¶ added in v1.2.0
func (option StandaloneRedisQueueOption) GetAssumeRoleArn() string
func (StandaloneRedisQueueOption) GetProvider ¶ added in v1.2.0
func (option StandaloneRedisQueueOption) GetProvider() cloud.Provider
func (StandaloneRedisQueueOption) GetRegion ¶ added in v1.2.0
func (option StandaloneRedisQueueOption) GetRegion() string
func (StandaloneRedisQueueOption) GetSecretID ¶ added in v1.2.0
func (option StandaloneRedisQueueOption) GetSecretID() string
func (StandaloneRedisQueueOption) GetSecretKey ¶ added in v1.2.0
func (option StandaloneRedisQueueOption) GetSecretKey() string
type StandaloneRedisQueueOptionV7 ¶ added in v1.3.3
type StandaloneRedisQueueOptionV7 struct {
StandaloneRedisQueueOption
}
func (StandaloneRedisQueueOptionV7) GetProvider ¶ added in v1.3.3
func (option StandaloneRedisQueueOptionV7) GetProvider() cloud.Provider
type TencentCloudQueueConsumer ¶
type TencentCloudQueueConsumer struct {
// contains filtered or unexported fields
}
func (*TencentCloudQueueConsumer) AckMessage ¶
func (consumer *TencentCloudQueueConsumer) AckMessage(ctx context.Context, message Message) error
func (*TencentCloudQueueConsumer) Close ¶
func (consumer *TencentCloudQueueConsumer) Close() error
func (*TencentCloudQueueConsumer) ReceiveMessages ¶
type TencentCloudQueueMessage ¶
type TencentCloudQueueMessage struct {
// contains filtered or unexported fields
}
func (*TencentCloudQueueMessage) Body ¶
func (message *TencentCloudQueueMessage) Body() string
type TencentCloudQueueOption ¶
func (TencentCloudQueueOption) CheckAWS ¶
func (option TencentCloudQueueOption) CheckAWS() error
func (TencentCloudQueueOption) CheckAliCloudStorage ¶ added in v1.3.0
func (option TencentCloudQueueOption) CheckAliCloudStorage() error
func (TencentCloudQueueOption) CheckClusterRedis ¶ added in v1.2.0
func (option TencentCloudQueueOption) CheckClusterRedis() error
func (TencentCloudQueueOption) CheckStandaloneRedis ¶ added in v1.2.0
func (option TencentCloudQueueOption) CheckStandaloneRedis() error
func (TencentCloudQueueOption) CheckTencentCloud ¶
func (option TencentCloudQueueOption) CheckTencentCloud() error
func (TencentCloudQueueOption) GetAssumeRegion ¶ added in v1.1.3
func (option TencentCloudQueueOption) GetAssumeRegion() string
func (TencentCloudQueueOption) GetAssumeRoleArn ¶ added in v1.1.3
func (option TencentCloudQueueOption) GetAssumeRoleArn() string
func (TencentCloudQueueOption) GetProvider ¶
func (option TencentCloudQueueOption) GetProvider() cloud.Provider
func (TencentCloudQueueOption) GetRegion ¶
func (option TencentCloudQueueOption) GetRegion() string
func (TencentCloudQueueOption) GetSecretID ¶
func (option TencentCloudQueueOption) GetSecretID() string
func (TencentCloudQueueOption) GetSecretKey ¶
func (option TencentCloudQueueOption) GetSecretKey() string
type TencentCloudQueueProducer ¶
type TencentCloudQueueProducer struct {
// contains filtered or unexported fields
}
func (*TencentCloudQueueProducer) Close ¶
func (producer *TencentCloudQueueProducer) Close() error
func (*TencentCloudQueueProducer) SendMessage ¶
func (producer *TencentCloudQueueProducer) SendMessage(ctx context.Context, body string) error
type TencentCloudQueueService ¶
type TencentCloudQueueService struct {
// contains filtered or unexported fields
}
func (*TencentCloudQueueService) Close ¶
func (service *TencentCloudQueueService) Close() error
func (*TencentCloudQueueService) CreateConsumer ¶
func (service *TencentCloudQueueService) CreateConsumer() (Consumer, error)
func (*TencentCloudQueueService) CreateProducer ¶
func (service *TencentCloudQueueService) CreateProducer() (Producer, error)
Click to show internal directories.
Click to hide internal directories.