queue

package
v0.5.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2024 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// QueueKey is the string constant "qk".
	// NOTE(review): presumably the key under which a queue name is carried
	// (e.g. in a header or context) — confirm against callers.
	QueueKey string = "qk"
)
View Source
const (
	// TypeNode is the untyped string constant "queues".
	// NOTE(review): presumably the configuration node / component type name
	// used for queues — confirm against callers.
	TypeNode = "queues"
)

Variables

View Source
// DefaultMaxQueueLen is the default maximum queue length (100).
var DefaultMaxQueueLen = 100

默认最大队列长度 100

View Source
var (
	// MaxRetrtCount is the maximum retry count; it defaults to 3.
	// NOTE(review): the identifier is misspelled ("Retrt" instead of "Retry");
	// it is exported, so the name is kept for compatibility.
	MaxRetrtCount int64 = 3 // retry at most three times
)
View Source
// Nil is a package-level sentinel error whose message is "Queue Nil".
// NOTE(review): presumably reported when a queue yields no message — confirm
// where it is returned before relying on that meaning.
var Nil error = errors.New("Queue Nil")

Functions

func DeregisterProducer

func DeregisterProducer(name string)

DeregisterProducer 注销指定名称的消息生产者解析器

func DeregisterrConsumer

func DeregisterrConsumer(name string)

DeregisterrConsumer 注销指定名称的消息消费者解析器

func NewBuilder

func NewBuilder() container.StandardBuilder

func RegisterConsumer

func RegisterConsumer(resolver MqcResover)

RegisterConsumer 注册消息消费

func RegisterProducer

func RegisterProducer(resolver MqpResover)

RegisterProducer 注册消息生产者解析器

Types

type ConsumeCallback

// ConsumeCallback is the handler type accepted by IMQC.Consume; it is called
// with a single IMQCMessage and returns nothing.
type ConsumeCallback func(IMQCMessage)

type DelayCallback added in v0.4.6

// DelayCallback is the callback for processing delayed messages. It receives
// a key and zero or more Messages, and reports failure through its error result.
type DelayCallback func(key string, msgList ...Message) error

DelayCallback 延迟消息处理回调

type DelayProcessor added in v0.4.6

// DelayProcessor is a processor for a delayed-message queue.
type DelayProcessor interface {
	// Start starts the processor.
	// NOTE(review): done is presumably a stop/shutdown signal — confirm with an implementation.
	Start(done chan struct{})
	// AppendMessage hands msg to the processor together with a delay in seconds.
	// NOTE(review): presumably msg is delivered after delaySeconds elapse — confirm.
	AppendMessage(msg Message, delaySeconds int64) error
}

DelayProcessor 延迟消息队列处理器

type IComponentQueue

// IComponentQueue is the queue component interface.
type IComponentQueue interface {
	// GetQueue returns the IQueue for the given name.
	// NOTE(review): behavior for an unknown name is not visible here — confirm.
	GetQueue(name string) (q IQueue)
}

IComponentQueue Component Queue

type IMQC

// IMQC is the message-queue consumer interface.
type IMQC interface {
	// Connect establishes the consumer's connection and returns any error.
	// NOTE(review): what it connects to depends on the implementation — confirm.
	Connect() error
	// Consume registers callback for the task described by task.
	// NOTE(review): presumably callback runs once per received message — confirm.
	Consume(task TaskInfo, callback ConsumeCallback) (err error)
	// Unconsume takes a queue name.
	// NOTE(review): presumably stops consuming that queue — confirm.
	Unconsume(queue string)
	// Start starts the consumer.
	Start()
	// Close closes the consumer.
	Close()
}

IMQC consumer接口

func NewMQC

func NewMQC(proto, configName string, setting config.Config) (IMQC, error)

NewMQC 根据协议（适配器）名称、配置名称及配置参数返回消息消费者 IMQC

type IMQCMessage

// IMQCMessage is a queue message as delivered to a ConsumeCallback.
type IMQCMessage interface {
	// MessageId returns the message identifier as a string.
	MessageId() string
	// RetryCount returns the retry count of this message.
	// NOTE(review): presumably the number of prior delivery attempts — confirm.
	RetryCount() int64
	// Ack acknowledges the message.
	Ack() error
	// Nack negatively acknowledges the message, passing the causing error.
	// NOTE(review): whether this triggers a redelivery is not visible here — confirm.
	Nack(error) error
	// Original returns a string form of the message.
	// NOTE(review): presumably the raw payload as received — confirm.
	Original() string
	// GetMessage returns the message as a Message.
	GetMessage() Message
}

IMQCMessage 队列消息

type IMQP

// IMQP is the message-queue producer interface.
type IMQP interface {
	// Push sends value under key.
	Push(key string, value Message) error
	// DelayPush sends value under key with a delay given in seconds.
	// NOTE(review): presumably the message becomes visible after delaySeconds — confirm.
	DelayPush(key string, value Message, delaySeconds int64) error
	// Count returns an int64 count for key.
	// NOTE(review): presumably the number of pending messages — confirm.
	Count(key string) (int64, error)
	// Close closes the producer and returns any error.
	Close() error
}

IMQP 消息生产

func NewMQP

func NewMQP(proto string, setting config.Config, opts ...Option) (IMQP, error)

NewMQP 根据协议（适配器）名称及配置参数返回消息生产者 IMQP

type IQueue

// IQueue is the message-queue interface used to send messages.
//
// The value parameters are declared as any, matching the rest of the package
// (WithOption, Options.CfgData); any is an alias of interface{}, so existing
// implementations and callers are unaffected.
type IQueue interface {
	// Send sends value under key.
	// NOTE(review): how value is encoded is not visible here — confirm.
	Send(ctx context.Context, key string, value any) error
	// DelaySend sends value under key with a delay given in seconds.
	// NOTE(review): presumably the message becomes visible after delaySeconds — confirm.
	DelaySend(ctx context.Context, key string, value any, delaySeconds int64) error
	// Count returns an int64 count for key.
	// NOTE(review): presumably the number of pending messages — confirm.
	Count(key string) (int64, error)
}

IQueue 消息队列

type Message

// Message is a queue message. It embeds encoding.BinaryMarshaler, so every
// Message can be marshaled to bytes, and it exposes header, body and string views.
type Message interface {
	encoding.BinaryMarshaler
	// Header returns the message headers as a string-to-string map.
	Header() map[string]string
	// Body returns the message body bytes.
	Body() []byte
	// String returns a string form of the message.
	String() string
}

func NewMsg added in v0.3.0

func NewMsg(obj interface{}, opts ...MsgOption) (msg Message)

type MqcResover added in v0.4.4

// MqcResover defines a resolver for message consumers; instances are passed
// to RegisterConsumer.
type MqcResover interface {
	// Name returns the resolver's name.
	// NOTE(review): presumably the proto/adapter name matched by NewMQC — confirm.
	Name() string
	// Resolve builds an IMQC from a configuration name and its settings.
	Resolve(configName string, setting config.Config) (IMQC, error)
}

MqcResover 定义消息消费解析器

type MqpResover added in v0.4.4

// MqpResover defines a resolver for message producers; instances are passed
// to RegisterProducer.
type MqpResover interface {
	// Name returns the resolver's name.
	// NOTE(review): presumably the proto/adapter name matched by NewMQP — confirm.
	Name() string
	// Resolve builds an IMQP from the settings and optional Options.
	Resolve(setting config.Config, opts ...Option) (IMQP, error)
}

MqpResover 定义消息生产解析器

type MsgItem

// MsgItem is a message with JSON-tagged header and body fields.
// NOTE(review): presumably the decoded form of a received message — confirm.
type MsgItem struct {
	// HeaderMap holds the headers; encoded in JSON as "header".
	HeaderMap xtypes.SMap     `json:"header"`
	// BodyBytes holds the raw JSON body; encoded in JSON as "body".
	BodyBytes json.RawMessage `json:"body"`
	// ItemBytes is excluded from JSON encoding (tag "-").
	// NOTE(review): presumably the raw bytes of the whole item — confirm.
	ItemBytes json.RawMessage `json:"-"`
}

func (MsgItem) Body

func (w MsgItem) Body() []byte

func (MsgItem) Header

func (w MsgItem) Header() map[string]string

func (MsgItem) MarshalBinary added in v0.4.6

func (w MsgItem) MarshalBinary() (data []byte, err error)

func (*MsgItem) String

func (w *MsgItem) String() string

type MsgOption added in v0.4.3

// MsgOption is a functional option that mutates a *MsgWrap; NewMsg accepts
// any number of them.
type MsgOption func(m *MsgWrap)

func WithHeader added in v0.3.0

func WithHeader(key, val string) MsgOption

func WithXRequestID added in v0.3.0

func WithXRequestID(reqId string) MsgOption

type MsgWrap added in v0.3.0

// MsgWrap is a message with JSON-tagged header and body fields; MsgOption
// values operate on it.
type MsgWrap struct {
	// HeaderMap holds the headers; encoded in JSON as "header" and omitted when empty.
	HeaderMap xtypes.SMap     `json:"header,omitempty"`
	// BodyBytes holds the raw JSON body; encoded in JSON as "body".
	BodyBytes json.RawMessage `json:"body"`
	// contains filtered or unexported fields
}

func (*MsgWrap) Body added in v0.3.0

func (w *MsgWrap) Body() []byte

func (*MsgWrap) Header added in v0.3.0

func (w *MsgWrap) Header() map[string]string

func (MsgWrap) MarshalBinary added in v0.4.6

func (w MsgWrap) MarshalBinary() (data []byte, err error)

func (MsgWrap) String added in v0.3.0

func (w MsgWrap) String() string

type Option added in v0.3.0

// Option is a configuration option: a function that mutates *Options.
type Option func(*Options)

Option 配置选项

func WithDBIndex added in v0.4.3

func WithDBIndex(idx int) Option

func WithOption added in v0.4.3

func WithOption(key string, val any) Option

func WithPoolSize added in v0.4.3

func WithPoolSize(size int) Option

type Options added in v0.4.3

// Options is the value that Option functions operate on.
type Options struct {
	// CfgData maps string keys to arbitrary values.
	// NOTE(review): presumably populated by WithOption/WithDBIndex/WithPoolSize — confirm.
	CfgData map[string]any
}

type StandardQueue

// StandardQueue provides access to queues by name; NewStandardQueue returns one.
type StandardQueue interface {
	// GetQueue returns the IQueue for name, taking optional Options.
	// NOTE(review): behavior for an unknown name is not visible here — confirm.
	GetQueue(name string, opts ...Option) (q IQueue)
}

func NewStandardQueue

func NewStandardQueue(c container.Container) StandardQueue

NewStandardQueue 创建queue

type TaskInfo added in v0.2.0

// TaskInfo describes a consume task; it is the first argument of IMQC.Consume.
type TaskInfo interface {
	// GetQueue returns the queue name as a string.
	GetQueue() string
	// GetConcurrency returns the concurrency setting.
	// NOTE(review): presumably the number of concurrent handlers — confirm.
	GetConcurrency() int
	// GetVisibilityTimeout returns the visibility timeout.
	// NOTE(review): unit is not visible here (presumably seconds) — confirm.
	GetVisibilityTimeout() int
	// GetBufferSize returns the buffer size.
	// NOTE(review): what is buffered is not visible here — confirm.
	GetBufferSize() int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL