Documentation ¶
Overview ¶
Package gmq is a generated GoMock package.
Index ¶
- Constants
- Variables
- func MakeRedisUniversalClient(opts *redis.Options) (rs interface{})
- func NewKeyDailyStatCompleted(ns, queueName, YYYYMMDD string) string
- func NewKeyDailyStatFailed(ns, queueName, YYYYMMDD string) string
- func NewKeyMsgDetail(ns, queueName, msgId string) string
- func NewKeyMsgUnique(ns, queueName, msgId string) string
- func NewKeyQueueCompleted(ns, queueName string) string
- func NewKeyQueueFailed(ns, queueName string) string
- func NewKeyQueueFailedHistory(ns, queueName, msgId string) string
- func NewKeyQueueList() string
- func NewKeyQueuePattern(ns, queueName string) string
- func NewKeyQueuePaused(ns, queueName string) string
- func NewKeyQueuePending(ns, queueName string) string
- func NewKeyQueueProcessing(ns, queueName string) string
- func NewKeyQueueState(ns, queueName string, state string) string
- func NotFound(ctx context.Context, msg IMsg) error
- func TestBroker_AutoDeduplicateCompletedMsg(t *testing.T, broker Broker)
- func TestBroker_AutoDeduplicateFailedMsg(t *testing.T, broker Broker)
- func TestBroker_AutoDeduplicateMsgByDefault(t *testing.T, broker Broker)
- func TestBroker_Complete(t *testing.T, broker Broker)
- func TestBroker_DeleteAgo(t *testing.T, broker Broker)
- func TestBroker_DeleteMsg(t *testing.T, broker Broker)
- func TestBroker_DeleteQueue(t *testing.T, broker Broker)
- func TestBroker_Dequeue(t *testing.T, broker Broker)
- func TestBroker_Enqueue(t *testing.T, broker Broker)
- func TestBroker_Fail(t *testing.T, broker Broker)
- func TestBroker_GetMsg(t *testing.T, broker Broker)
- func TestBroker_GetStats(t *testing.T, broker Broker)
- func TestBroker_GetStatsByDate(t *testing.T, broker Broker)
- func TestBroker_ListFailed(t *testing.T, broker Broker)
- func TestBroker_ListFailedMaxItems(t *testing.T, broker Broker)
- func TestBroker_ListMsg(t *testing.T, broker Broker)
- func TestBroker_ListQueue(t *testing.T, broker Broker)
- type Broker
- type BrokerInMemory
- func (it *BrokerInMemory) Close() error
- func (it *BrokerInMemory) Complete(ctx context.Context, msg IMsg) error
- func (it *BrokerInMemory) DeleteAgo(ctx context.Context, queueName string, duration time.Duration) error
- func (it *BrokerInMemory) DeleteMsg(ctx context.Context, queueName string, msgId string) error
- func (it *BrokerInMemory) DeleteQueue(ctx context.Context, queueName string) error
- func (it *BrokerInMemory) Dequeue(ctx context.Context, queueName string) (*Msg, error)
- func (it *BrokerInMemory) Enqueue(ctx context.Context, msg IMsg, opts ...OptionClient) (*Msg, error)
- func (it *BrokerInMemory) Fail(ctx context.Context, msg IMsg, errFail error) error
- func (it *BrokerInMemory) GetMsg(ctx context.Context, queueName string, msgId string) (*Msg, error)
- func (it *BrokerInMemory) GetStats(ctx context.Context) ([]*QueueStat, error)
- func (it *BrokerInMemory) GetStatsByDate(ctx context.Context, listQueueNames []string, YYYYMMDD string) (*QueueDailyStat, error)
- func (it *BrokerInMemory) GetStatsWeekly(ctx context.Context, listQueueNames []string) ([]*QueueDailyStat, error)
- func (it *BrokerInMemory) Init(ctx context.Context, queueName string) error
- func (it *BrokerInMemory) ListFailed(ctx context.Context, queueName string, msgId string, limit int64, offset int64) (rs []*Msg, err error)
- func (it *BrokerInMemory) ListMsg(ctx context.Context, queueName string, state string, limit int64, offset int64) ([]string, error)
- func (it *BrokerInMemory) ListQueue(ctx context.Context) (rs []string, err error)
- func (it *BrokerInMemory) Pause(ctx context.Context, queueName string) error
- func (it *BrokerInMemory) Ping(ctx context.Context) error
- func (it *BrokerInMemory) Resume(ctx context.Context, queueName string) error
- func (it *BrokerInMemory) SetClock(c Clock)
- func (it *BrokerInMemory) UTC(flag bool)
- type BrokerInMemoryOpts
- type BrokerRedis
- func (it *BrokerRedis) Close() error
- func (it *BrokerRedis) Complete(ctx context.Context, msg IMsg) (err error)
- func (it *BrokerRedis) DeleteAgo(ctx context.Context, queueName string, duration time.Duration) (err error)
- func (it *BrokerRedis) DeleteMsg(ctx context.Context, queueName, msgId string) (err error)
- func (it *BrokerRedis) DeleteQueue(ctx context.Context, queueName string) (err error)
- func (it *BrokerRedis) Dequeue(ctx context.Context, queueName string) (msg *Msg, err error)
- func (it *BrokerRedis) Enqueue(ctx context.Context, msg IMsg, opts ...OptionClient) (rs *Msg, err error)
- func (it *BrokerRedis) Fail(ctx context.Context, msg IMsg, errFail error) (err error)
- func (it *BrokerRedis) GetMsg(ctx context.Context, queueName, msgId string) (msg *Msg, err error)
- func (it *BrokerRedis) GetStats(ctx context.Context) (rs []*QueueStat, err error)
- func (it *BrokerRedis) GetStatsByDate(ctx context.Context, listQueueNames []string, date string) (rs *QueueDailyStat, err error)
- func (it *BrokerRedis) GetStatsWeekly(ctx context.Context, listQueueNames []string) ([]*QueueDailyStat, error)
- func (it *BrokerRedis) Init(ctx context.Context, queueName string) (err error)
- func (it *BrokerRedis) ListFailed(ctx context.Context, queueName string, msgId string, limit int64, offset int64) (rs []*Msg, err error)
- func (it *BrokerRedis) ListMsg(ctx context.Context, queueName, state string, limit, offset int64) (values []string, err error)
- func (it *BrokerRedis) ListQueue(ctx context.Context) (rs []string, err error)
- func (it *BrokerRedis) Pause(ctx context.Context, qname string) error
- func (it *BrokerRedis) Ping(ctx context.Context) error
- func (it *BrokerRedis) Resume(ctx context.Context, qname string) error
- func (it *BrokerRedis) SetClock(c Clock)
- func (it *BrokerRedis) UTC(flag bool)
- type BrokerUnimplemented
- func (*BrokerUnimplemented) Close() error
- func (*BrokerUnimplemented) Complete(ctx context.Context, msg IMsg) error
- func (*BrokerUnimplemented) DeleteAgo(ctx context.Context, queueName string, seconds int64) error
- func (*BrokerUnimplemented) DeleteMsg(ctx context.Context, queueName string, msgId string) error
- func (*BrokerUnimplemented) DeleteQueue(ctx context.Context, queueName string) error
- func (*BrokerUnimplemented) Dequeue(ctx context.Context, queueName string) (*Msg, error)
- func (*BrokerUnimplemented) Enqueue(ctx context.Context, msg IMsg, opts ...OptionClient) (*Msg, error)
- func (*BrokerUnimplemented) Fail(ctx context.Context, msg IMsg, errFail error) error
- func (*BrokerUnimplemented) GetMsg(ctx context.Context, queueName string, msgId string) (*Msg, error)
- func (*BrokerUnimplemented) GetStats(ctx context.Context) ([]*QueueStat, error)
- func (*BrokerUnimplemented) GetStatsByDate(ctx context.Context, YYYYMMDD string) (*QueueDailyStat, error)
- func (*BrokerUnimplemented) GetStatsWeekly(ctx context.Context) (*[]QueueDailyStat, *QueueDailyStat, error)
- func (*BrokerUnimplemented) Init(ctx context.Context, queueName string) error
- func (*BrokerUnimplemented) ListMsg(ctx context.Context, queueName string, state string, limit int64, offset int64) ([]string, error)
- func (*BrokerUnimplemented) Pause(ctx context.Context, queueName string) error
- func (*BrokerUnimplemented) Ping(ctx context.Context) error
- func (*BrokerUnimplemented) Resume(ctx context.Context, queueName string) error
- func (*BrokerUnimplemented) SetClock(c Clock)
- type Cleaner
- type CleanerParams
- type Client
- type Clock
- type Config
- type CronJob
- type FuncEnqueueErrorHandler
- type FuncWorkInterval
- type Handler
- type HandlerFunc
- type IMsg
- type Logger
- type MiddlewareFunc
- type MockBroker
- func (m *MockBroker) Close() error
- func (m *MockBroker) Complete(ctx context.Context, msg IMsg) error
- func (m *MockBroker) DeleteAgo(ctx context.Context, queueName string, duration time.Duration) error
- func (m *MockBroker) DeleteMsg(ctx context.Context, queueName, id string) error
- func (m *MockBroker) DeleteQueue(ctx context.Context, queueName string) error
- func (m *MockBroker) Dequeue(ctx context.Context, queueName string) (*Msg, error)
- func (m *MockBroker) EXPECT() *MockBrokerMockRecorder
- func (m *MockBroker) Enqueue(ctx context.Context, msg IMsg, opts ...OptionClient) (*Msg, error)
- func (m *MockBroker) Fail(ctx context.Context, msg IMsg, errFail error) error
- func (m *MockBroker) GetMsg(ctx context.Context, queueName, id string) (*Msg, error)
- func (m *MockBroker) GetStats(ctx context.Context) ([]*QueueStat, error)
- func (m *MockBroker) GetStatsByDate(ctx context.Context, listQueueNames []string, YYYYMMDD string) (*QueueDailyStat, error)
- func (m *MockBroker) GetStatsWeekly(ctx context.Context, listQueueNames []string) ([]*QueueDailyStat, error)
- func (m *MockBroker) Init(ctx context.Context, queueName string) error
- func (m *MockBroker) ListFailed(ctx context.Context, queueName, msgId string, limit, offset int64) ([]*Msg, error)
- func (m *MockBroker) ListMsg(ctx context.Context, queueName, state string, limit, offset int64) ([]string, error)
- func (m *MockBroker) ListQueue(ctx context.Context) ([]string, error)
- func (m *MockBroker) Pause(ctx context.Context, queueName string) error
- func (m *MockBroker) Ping(ctx context.Context) error
- func (m *MockBroker) Resume(ctx context.Context, queueName string) error
- func (m *MockBroker) SetClock(c Clock)
- func (m *MockBroker) UTC(flag bool)
- type MockBrokerMockRecorder
- func (mr *MockBrokerMockRecorder) Close() *gomock.Call
- func (mr *MockBrokerMockRecorder) Complete(ctx, msg any) *gomock.Call
- func (mr *MockBrokerMockRecorder) DeleteAgo(ctx, queueName, duration any) *gomock.Call
- func (mr *MockBrokerMockRecorder) DeleteMsg(ctx, queueName, id any) *gomock.Call
- func (mr *MockBrokerMockRecorder) DeleteQueue(ctx, queueName any) *gomock.Call
- func (mr *MockBrokerMockRecorder) Dequeue(ctx, queueName any) *gomock.Call
- func (mr *MockBrokerMockRecorder) Enqueue(ctx, msg any, opts ...any) *gomock.Call
- func (mr *MockBrokerMockRecorder) Fail(ctx, msg, errFail any) *gomock.Call
- func (mr *MockBrokerMockRecorder) GetMsg(ctx, queueName, id any) *gomock.Call
- func (mr *MockBrokerMockRecorder) GetStats(ctx any) *gomock.Call
- func (mr *MockBrokerMockRecorder) GetStatsByDate(ctx, listQueueNames, YYYYMMDD any) *gomock.Call
- func (mr *MockBrokerMockRecorder) GetStatsWeekly(ctx, listQueueNames any) *gomock.Call
- func (mr *MockBrokerMockRecorder) Init(ctx, queueName any) *gomock.Call
- func (mr *MockBrokerMockRecorder) ListFailed(ctx, queueName, msgId, limit, offset any) *gomock.Call
- func (mr *MockBrokerMockRecorder) ListMsg(ctx, queueName, state, limit, offset any) *gomock.Call
- func (mr *MockBrokerMockRecorder) ListQueue(ctx any) *gomock.Call
- func (mr *MockBrokerMockRecorder) Pause(ctx, queueName any) *gomock.Call
- func (mr *MockBrokerMockRecorder) Ping(ctx any) *gomock.Call
- func (mr *MockBrokerMockRecorder) Resume(ctx, queueName any) *gomock.Call
- func (mr *MockBrokerMockRecorder) SetClock(c any) *gomock.Call
- func (mr *MockBrokerMockRecorder) UTC(flag any) *gomock.Call
- type Msg
- type MsgHistory
- type Mux
- type MuxEntry
- type OptTypeClient
- type OptTypeServer
- type OptionClient
- type OptionServer
- type Processor
- type ProcessorParams
- type QueueCfg
- type QueueDailyStat
- type QueueStat
- type Scheduler
- type SchedulerParams
- type Server
- type SimulatedClock
- type WallClock
Constants ¶
const ( // any state except failed message life time TTLMsg = 60 * 60 * 24 * 7 // 7 days // failed state message life time TTLDeadMsg = 60 * 60 * 24 * 3 // 3 days )
const ( LuaReturnCodeSucc = iota // confirm to POSIX shell/C return code common rule, 0 means successfully LuaReturnCodeError )
const ( QueueNameList = "queues" QueueNamePaused = "paused" )
const ( // message init state MsgStatePending = "pending" // message is processing by worker MsgStateProcessing = "processing" // message has been processed and failed MsgStateFailed = "failed" // message has been processed successfully MsgStateCompleted = "completed" )
message state list
const (
DefaultDurationRestIfNoMsg = time.Second * time.Duration(1)
)
const (
DefaultMaxBytes = 1024 * 1024 * 32 // 32 MB
)
const (
DefaultMaxItemsLimit = 10
)
const (
DefaultQueueName = "default"
)
const (
DefaultTTLMsgUniq = time.Second * time.Duration(60*60*24) // 1 day
)
const (
Namespace = "gmq"
)
Variables ¶
Functions ¶
func MakeRedisUniversalClient ¶
func MakeRedisUniversalClient(opts *redis.Options) (rs interface{})
func NewKeyDailyStatCompleted ¶ added in v0.0.13
<namespace>:<queueName>:completed:<YYYY-MM-DD>
func NewKeyDailyStatFailed ¶
<namespace>:<queueName>:failed:<YYYY-MM-DD>
func NewKeyMsgDetail ¶
func NewKeyMsgUnique ¶
func NewKeyQueueCompleted ¶ added in v0.0.13
func NewKeyQueueFailed ¶
func NewKeyQueueFailedHistory ¶ added in v0.0.13
<namespace>:<queueName>:his:<msgId>
func NewKeyQueueList ¶
func NewKeyQueueList() string
func NewKeyQueuePattern ¶ added in v0.0.11
func NewKeyQueuePaused ¶
func NewKeyQueuePending ¶
func NewKeyQueueProcessing ¶
func NewKeyQueueState ¶ added in v0.0.11
func TestBroker_AutoDeduplicateCompletedMsg ¶ added in v0.0.14
func TestBroker_AutoDeduplicateFailedMsg ¶ added in v0.0.14
func TestBroker_AutoDeduplicateMsgByDefault ¶ added in v0.0.14
func TestBroker_Complete ¶ added in v0.0.14
func TestBroker_DeleteAgo ¶ added in v0.0.14
func TestBroker_DeleteMsg ¶ added in v0.0.14
func TestBroker_DeleteQueue ¶ added in v0.0.14
func TestBroker_Dequeue ¶ added in v0.0.14
func TestBroker_Enqueue ¶ added in v0.0.14
func TestBroker_Fail ¶ added in v0.0.14
func TestBroker_GetMsg ¶ added in v0.0.14
func TestBroker_GetStats ¶ added in v0.0.14
func TestBroker_GetStatsByDate ¶ added in v0.0.14
func TestBroker_ListFailed ¶ added in v0.0.14
func TestBroker_ListFailedMaxItems ¶ added in v0.0.14
func TestBroker_ListMsg ¶ added in v0.0.14
func TestBroker_ListQueue ¶ added in v0.0.14
Types ¶
type Broker ¶
type Broker interface { Close() error Complete(ctx context.Context, msg IMsg) error Dequeue(ctx context.Context, queueName string) (*Msg, error) DeleteMsg(ctx context.Context, queueName, id string) error DeleteQueue(ctx context.Context, queueName string) error DeleteAgo(ctx context.Context, queueName string, duration time.Duration) error Enqueue(ctx context.Context, msg IMsg, opts ...OptionClient) (*Msg, error) Fail(ctx context.Context, msg IMsg, errFail error) error GetMsg(ctx context.Context, queueName, id string) (*Msg, error) // return a list message id(not internal msgId) of queue with specified name and limit ListMsg(ctx context.Context, queueName, state string, limit, offset int64) ([]string, error) // return a list failed message id of queue with specified queue name and message id // NOTICE: It is ordered from fresh to old. ListFailed(ctx context.Context, queueName, msgId string, limit, offset int64) ([]*Msg, error) ListQueue(ctx context.Context) ([]string, error) GetStats(ctx context.Context) ([]*QueueStat, error) Init(ctx context.Context, queueName string) error Ping(ctx context.Context) error GetStatsByDate(ctx context.Context, listQueueNames []string, YYYYMMDD string) (*QueueDailyStat, error) GetStatsWeekly(ctx context.Context, listQueueNames []string) ([]*QueueDailyStat, error) Pause(ctx context.Context, queueName string) error Resume(ctx context.Context, queueName string) error // SetClock custom internal clock for testing SetClock(c Clock) // processing time in UTC instead of local UTC(flag bool) }
func NewBrokerInMemory ¶ added in v0.0.13
func NewBrokerInMemory(opts *BrokerInMemoryOpts) (rs Broker, err error)
func NewBrokerRedis ¶
type BrokerInMemory ¶ added in v0.0.13
type BrokerInMemory struct { BrokerUnimplemented // contains filtered or unexported fields }
func (*BrokerInMemory) Close ¶ added in v0.0.13
func (it *BrokerInMemory) Close() error
Close implements Broker
func (*BrokerInMemory) Complete ¶ added in v0.0.13
func (it *BrokerInMemory) Complete(ctx context.Context, msg IMsg) error
Complete implements Broker
func (*BrokerInMemory) DeleteAgo ¶ added in v0.0.13
func (it *BrokerInMemory) DeleteAgo(ctx context.Context, queueName string, duration time.Duration) error
DeleteAgo implements Broker
func (*BrokerInMemory) DeleteQueue ¶ added in v0.0.13
func (it *BrokerInMemory) DeleteQueue(ctx context.Context, queueName string) error
DeleteQueue implements Broker
func (*BrokerInMemory) Enqueue ¶ added in v0.0.13
func (it *BrokerInMemory) Enqueue(ctx context.Context, msg IMsg, opts ...OptionClient) (*Msg, error)
Enqueue implements Broker
func (*BrokerInMemory) GetStats ¶ added in v0.0.13
func (it *BrokerInMemory) GetStats(ctx context.Context) ([]*QueueStat, error)
GetStats implements Broker
func (*BrokerInMemory) GetStatsByDate ¶ added in v0.0.13
func (it *BrokerInMemory) GetStatsByDate(ctx context.Context, listQueueNames []string, YYYYMMDD string) (*QueueDailyStat, error)
GetStatsByDate implements Broker
func (*BrokerInMemory) GetStatsWeekly ¶ added in v0.0.13
func (it *BrokerInMemory) GetStatsWeekly(ctx context.Context, listQueueNames []string) ([]*QueueDailyStat, error)
GetStatsWeekly implements Broker
func (*BrokerInMemory) Init ¶ added in v0.0.13
func (it *BrokerInMemory) Init(ctx context.Context, queueName string) error
Init implements Broker
func (*BrokerInMemory) ListFailed ¶ added in v0.0.13
func (it *BrokerInMemory) ListFailed(ctx context.Context, queueName string, msgId string, limit int64, offset int64) (rs []*Msg, err error)
ListFailed implements Broker
func (*BrokerInMemory) ListMsg ¶ added in v0.0.13
func (it *BrokerInMemory) ListMsg(ctx context.Context, queueName string, state string, limit int64, offset int64) ([]string, error)
ListMsg implements Broker
func (*BrokerInMemory) ListQueue ¶ added in v0.0.13
func (it *BrokerInMemory) ListQueue(ctx context.Context) (rs []string, err error)
ListQueue implements Broker
func (*BrokerInMemory) Pause ¶ added in v0.0.13
func (it *BrokerInMemory) Pause(ctx context.Context, queueName string) error
Pause implements Broker
func (*BrokerInMemory) Ping ¶ added in v0.0.13
func (it *BrokerInMemory) Ping(ctx context.Context) error
Ping implements Broker
func (*BrokerInMemory) Resume ¶ added in v0.0.13
func (it *BrokerInMemory) Resume(ctx context.Context, queueName string) error
Resume implements Broker
func (*BrokerInMemory) SetClock ¶ added in v0.0.13
func (it *BrokerInMemory) SetClock(c Clock)
SetClock implements Broker
func (*BrokerInMemory) UTC ¶ added in v0.0.13
func (it *BrokerInMemory) UTC(flag bool)
UTC implements Broker
type BrokerInMemoryOpts ¶ added in v0.0.13
type BrokerInMemoryOpts struct {
MaxBytes int64
}
type BrokerRedis ¶
type BrokerRedis struct { BrokerUnimplemented // contains filtered or unexported fields }
func (*BrokerRedis) Close ¶
func (it *BrokerRedis) Close() error
func (*BrokerRedis) Complete ¶
func (it *BrokerRedis) Complete(ctx context.Context, msg IMsg) (err error)
func (*BrokerRedis) DeleteAgo ¶
func (it *BrokerRedis) DeleteAgo(ctx context.Context, queueName string, duration time.Duration) (err error)
delete messages old than <nowUnixTimestamp - duration>
func (*BrokerRedis) DeleteMsg ¶ added in v0.0.11
func (it *BrokerRedis) DeleteMsg(ctx context.Context, queueName, msgId string) (err error)
func (*BrokerRedis) DeleteQueue ¶ added in v0.0.11
func (it *BrokerRedis) DeleteQueue(ctx context.Context, queueName string) (err error)
func (*BrokerRedis) Enqueue ¶
func (it *BrokerRedis) Enqueue(ctx context.Context, msg IMsg, opts ...OptionClient) (rs *Msg, err error)
func (*BrokerRedis) GetStats ¶
func (it *BrokerRedis) GetStats(ctx context.Context) (rs []*QueueStat, err error)
func (*BrokerRedis) GetStatsByDate ¶
func (it *BrokerRedis) GetStatsByDate(ctx context.Context, listQueueNames []string, date string) (rs *QueueDailyStat, err error)
func (*BrokerRedis) GetStatsWeekly ¶ added in v0.0.10
func (it *BrokerRedis) GetStatsWeekly(ctx context.Context, listQueueNames []string) ([]*QueueDailyStat, error)
func (*BrokerRedis) Init ¶
func (it *BrokerRedis) Init(ctx context.Context, queueName string) (err error)
func (*BrokerRedis) ListFailed ¶ added in v0.0.13
func (*BrokerRedis) ListQueue ¶ added in v0.0.13
func (it *BrokerRedis) ListQueue(ctx context.Context) (rs []string, err error)
ListQueue implements Broker
func (*BrokerRedis) Pause ¶ added in v0.0.10
func (it *BrokerRedis) Pause(ctx context.Context, qname string) error
func (*BrokerRedis) Resume ¶ added in v0.0.10
func (it *BrokerRedis) Resume(ctx context.Context, qname string) error
func (*BrokerRedis) SetClock ¶
func (it *BrokerRedis) SetClock(c Clock)
func (*BrokerRedis) UTC ¶ added in v0.0.13
func (it *BrokerRedis) UTC(flag bool)
UTC implements Broker
type BrokerUnimplemented ¶ added in v0.0.13
type BrokerUnimplemented struct{}
BrokerUnimplemented must be embedded to have forward compatible implementations.
func (*BrokerUnimplemented) Close ¶ added in v0.0.13
func (*BrokerUnimplemented) Close() error
Close implements Broker
func (*BrokerUnimplemented) Complete ¶ added in v0.0.13
func (*BrokerUnimplemented) Complete(ctx context.Context, msg IMsg) error
Complete implements Broker
func (*BrokerUnimplemented) DeleteQueue ¶ added in v0.0.13
func (*BrokerUnimplemented) DeleteQueue(ctx context.Context, queueName string) error
DeleteQueue implements Broker
func (*BrokerUnimplemented) Enqueue ¶ added in v0.0.13
func (*BrokerUnimplemented) Enqueue(ctx context.Context, msg IMsg, opts ...OptionClient) (*Msg, error)
Enqueue implements Broker
func (*BrokerUnimplemented) GetMsg ¶ added in v0.0.13
func (*BrokerUnimplemented) GetMsg(ctx context.Context, queueName string, msgId string) (*Msg, error)
GetMsg implements Broker
func (*BrokerUnimplemented) GetStats ¶ added in v0.0.13
func (*BrokerUnimplemented) GetStats(ctx context.Context) ([]*QueueStat, error)
GetStats implements Broker
func (*BrokerUnimplemented) GetStatsByDate ¶ added in v0.0.13
func (*BrokerUnimplemented) GetStatsByDate(ctx context.Context, YYYYMMDD string) (*QueueDailyStat, error)
GetStatsByDate implements Broker
func (*BrokerUnimplemented) GetStatsWeekly ¶ added in v0.0.13
func (*BrokerUnimplemented) GetStatsWeekly(ctx context.Context) (*[]QueueDailyStat, *QueueDailyStat, error)
GetStatsWeekly implements Broker
func (*BrokerUnimplemented) Init ¶ added in v0.0.13
func (*BrokerUnimplemented) Init(ctx context.Context, queueName string) error
Init implements Broker
func (*BrokerUnimplemented) ListMsg ¶ added in v0.0.13
func (*BrokerUnimplemented) ListMsg(ctx context.Context, queueName string, state string, limit int64, offset int64) ([]string, error)
ListMsg implements Broker
func (*BrokerUnimplemented) Pause ¶ added in v0.0.13
func (*BrokerUnimplemented) Pause(ctx context.Context, queueName string) error
Pause implements Broker
func (*BrokerUnimplemented) Ping ¶ added in v0.0.13
func (*BrokerUnimplemented) Ping(ctx context.Context) error
Ping implements Broker
func (*BrokerUnimplemented) Resume ¶ added in v0.0.13
func (*BrokerUnimplemented) Resume(ctx context.Context, queueName string) error
Resume implements Broker
func (*BrokerUnimplemented) SetClock ¶ added in v0.0.13
func (*BrokerUnimplemented) SetClock(c Clock)
SetClock implements Broker
type Cleaner ¶
type Cleaner struct {
// contains filtered or unexported fields
}
Cleaner auto-delete dead or failed messages, completed messages at intervals.
func NewCleaner ¶
func NewCleaner(params CleanerParams) *Cleaner
type CleanerParams ¶
type Client ¶
type Client struct {
Broker Broker
}
func NewClientFromBroker ¶
func NewClientRedis ¶ added in v0.0.13
type FuncEnqueueErrorHandler ¶ added in v0.0.9
type FuncEnqueueErrorHandler func(msg IMsg, opts []OptionClient, err error)
type FuncWorkInterval ¶
type HandlerFunc ¶
func (HandlerFunc) ProcessMsg ¶
func (fn HandlerFunc) ProcessMsg(ctx context.Context, msg IMsg) error
ProcessMsg calls fn(ctx, task)
type Logger ¶
type Logger interface { // Debug logs a message at Debug level. Debug(args ...interface{}) // Debugf logs a message at Debug level. Debugf(template string, args ...interface{}) // Info logs a message at Info level. Info(args ...interface{}) // Infof logs a message at Info level. Infof(template string, args ...interface{}) // Warn logs a message at Warning level. Warn(args ...interface{}) // Warnf logs a message at Warning level. Warnf(template string, args ...interface{}) // Error logs a message at Error level. Error(args ...interface{}) // Errorf logs a message at Error level. Errorf(template string, args ...interface{}) // Fatal logs a message at Fatal level // and process will exit with status set to 1. Fatal(args ...interface{}) // Fatalf logs a message at Fatal level. Fatalf(template string, args ...interface{}) }
Logger supports logging at various log levels.
type MiddlewareFunc ¶
type MockBroker ¶ added in v0.0.14
type MockBroker struct {
// contains filtered or unexported fields
}
MockBroker is a mock of Broker interface.
func NewMockBroker ¶ added in v0.0.14
func NewMockBroker(ctrl *gomock.Controller) *MockBroker
NewMockBroker creates a new mock instance.
func (*MockBroker) Close ¶ added in v0.0.14
func (m *MockBroker) Close() error
Close mocks base method.
func (*MockBroker) Complete ¶ added in v0.0.14
func (m *MockBroker) Complete(ctx context.Context, msg IMsg) error
Complete mocks base method.
func (*MockBroker) DeleteMsg ¶ added in v0.0.14
func (m *MockBroker) DeleteMsg(ctx context.Context, queueName, id string) error
DeleteMsg mocks base method.
func (*MockBroker) DeleteQueue ¶ added in v0.0.14
func (m *MockBroker) DeleteQueue(ctx context.Context, queueName string) error
DeleteQueue mocks base method.
func (*MockBroker) EXPECT ¶ added in v0.0.14
func (m *MockBroker) EXPECT() *MockBrokerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockBroker) Enqueue ¶ added in v0.0.14
func (m *MockBroker) Enqueue(ctx context.Context, msg IMsg, opts ...OptionClient) (*Msg, error)
Enqueue mocks base method.
func (*MockBroker) GetStats ¶ added in v0.0.14
func (m *MockBroker) GetStats(ctx context.Context) ([]*QueueStat, error)
GetStats mocks base method.
func (*MockBroker) GetStatsByDate ¶ added in v0.0.14
func (m *MockBroker) GetStatsByDate(ctx context.Context, listQueueNames []string, YYYYMMDD string) (*QueueDailyStat, error)
GetStatsByDate mocks base method.
func (*MockBroker) GetStatsWeekly ¶ added in v0.0.14
func (m *MockBroker) GetStatsWeekly(ctx context.Context, listQueueNames []string) ([]*QueueDailyStat, error)
GetStatsWeekly mocks base method.
func (*MockBroker) Init ¶ added in v0.0.14
func (m *MockBroker) Init(ctx context.Context, queueName string) error
Init mocks base method.
func (*MockBroker) ListFailed ¶ added in v0.0.14
func (m *MockBroker) ListFailed(ctx context.Context, queueName, msgId string, limit, offset int64) ([]*Msg, error)
ListFailed mocks base method.
func (*MockBroker) ListMsg ¶ added in v0.0.14
func (m *MockBroker) ListMsg(ctx context.Context, queueName, state string, limit, offset int64) ([]string, error)
ListMsg mocks base method.
func (*MockBroker) ListQueue ¶ added in v0.0.14
func (m *MockBroker) ListQueue(ctx context.Context) ([]string, error)
ListQueue mocks base method.
func (*MockBroker) Pause ¶ added in v0.0.14
func (m *MockBroker) Pause(ctx context.Context, queueName string) error
Pause mocks base method.
func (*MockBroker) Ping ¶ added in v0.0.14
func (m *MockBroker) Ping(ctx context.Context) error
Ping mocks base method.
func (*MockBroker) Resume ¶ added in v0.0.14
func (m *MockBroker) Resume(ctx context.Context, queueName string) error
Resume mocks base method.
func (*MockBroker) SetClock ¶ added in v0.0.14
func (m *MockBroker) SetClock(c Clock)
SetClock mocks base method.
func (*MockBroker) UTC ¶ added in v0.0.14
func (m *MockBroker) UTC(flag bool)
UTC mocks base method.
type MockBrokerMockRecorder ¶ added in v0.0.14
type MockBrokerMockRecorder struct {
// contains filtered or unexported fields
}
MockBrokerMockRecorder is the mock recorder for MockBroker.
func (*MockBrokerMockRecorder) Close ¶ added in v0.0.14
func (mr *MockBrokerMockRecorder) Close() *gomock.Call
Close indicates an expected call of Close.
func (*MockBrokerMockRecorder) Complete ¶ added in v0.0.14
func (mr *MockBrokerMockRecorder) Complete(ctx, msg any) *gomock.Call
Complete indicates an expected call of Complete.
func (*MockBrokerMockRecorder) DeleteAgo ¶ added in v0.0.14
func (mr *MockBrokerMockRecorder) DeleteAgo(ctx, queueName, duration any) *gomock.Call
DeleteAgo indicates an expected call of DeleteAgo.
func (*MockBrokerMockRecorder) DeleteMsg ¶ added in v0.0.14
func (mr *MockBrokerMockRecorder) DeleteMsg(ctx, queueName, id any) *gomock.Call
DeleteMsg indicates an expected call of DeleteMsg.
func (*MockBrokerMockRecorder) DeleteQueue ¶ added in v0.0.14
func (mr *MockBrokerMockRecorder) DeleteQueue(ctx, queueName any) *gomock.Call
DeleteQueue indicates an expected call of DeleteQueue.
func (*MockBrokerMockRecorder) Dequeue ¶ added in v0.0.14
func (mr *MockBrokerMockRecorder) Dequeue(ctx, queueName any) *gomock.Call
Dequeue indicates an expected call of Dequeue.
func (*MockBrokerMockRecorder) Enqueue ¶ added in v0.0.14
func (mr *MockBrokerMockRecorder) Enqueue(ctx, msg any, opts ...any) *gomock.Call
Enqueue indicates an expected call of Enqueue.
func (*MockBrokerMockRecorder) Fail ¶ added in v0.0.14
func (mr *MockBrokerMockRecorder) Fail(ctx, msg, errFail any) *gomock.Call
Fail indicates an expected call of Fail.
func (*MockBrokerMockRecorder) GetMsg ¶ added in v0.0.14
func (mr *MockBrokerMockRecorder) GetMsg(ctx, queueName, id any) *gomock.Call
GetMsg indicates an expected call of GetMsg.
func (*MockBrokerMockRecorder) GetStats ¶ added in v0.0.14
func (mr *MockBrokerMockRecorder) GetStats(ctx any) *gomock.Call
GetStats indicates an expected call of GetStats.
func (*MockBrokerMockRecorder) GetStatsByDate ¶ added in v0.0.14
func (mr *MockBrokerMockRecorder) GetStatsByDate(ctx, listQueueNames, YYYYMMDD any) *gomock.Call
GetStatsByDate indicates an expected call of GetStatsByDate.
func (*MockBrokerMockRecorder) GetStatsWeekly ¶ added in v0.0.14
func (mr *MockBrokerMockRecorder) GetStatsWeekly(ctx, listQueueNames any) *gomock.Call
GetStatsWeekly indicates an expected call of GetStatsWeekly.
func (*MockBrokerMockRecorder) Init ¶ added in v0.0.14
func (mr *MockBrokerMockRecorder) Init(ctx, queueName any) *gomock.Call
Init indicates an expected call of Init.
func (*MockBrokerMockRecorder) ListFailed ¶ added in v0.0.14
func (mr *MockBrokerMockRecorder) ListFailed(ctx, queueName, msgId, limit, offset any) *gomock.Call
ListFailed indicates an expected call of ListFailed.
func (*MockBrokerMockRecorder) ListMsg ¶ added in v0.0.14
func (mr *MockBrokerMockRecorder) ListMsg(ctx, queueName, state, limit, offset any) *gomock.Call
ListMsg indicates an expected call of ListMsg.
func (*MockBrokerMockRecorder) ListQueue ¶ added in v0.0.14
func (mr *MockBrokerMockRecorder) ListQueue(ctx any) *gomock.Call
ListQueue indicates an expected call of ListQueue.
func (*MockBrokerMockRecorder) Pause ¶ added in v0.0.14
func (mr *MockBrokerMockRecorder) Pause(ctx, queueName any) *gomock.Call
Pause indicates an expected call of Pause.
func (*MockBrokerMockRecorder) Ping ¶ added in v0.0.14
func (mr *MockBrokerMockRecorder) Ping(ctx any) *gomock.Call
Ping indicates an expected call of Ping.
func (*MockBrokerMockRecorder) Resume ¶ added in v0.0.14
func (mr *MockBrokerMockRecorder) Resume(ctx, queueName any) *gomock.Call
Resume indicates an expected call of Resume.
type Msg ¶
type Msg struct { Payload []byte `json:"payload"` Id string `json:"id"` Queue string `json:"queue"` // message created at timestamp in Unix milliseconds Created int64 `json:"created"` // expired timestamp in Unix milliseconds Expireat int64 `json:"expireat"` Err string `json:"err"` State string `json:"state"` // state last changed timestamp in Unix milliseconds Updated int64 `json:"updated"` }
func GenerateNewMsg ¶ added in v0.0.14
func GenerateNewMsg() *Msg
func (*Msg) GetPayload ¶
type MsgHistory ¶ added in v0.0.13
type Mux ¶
type Mux struct {
// contains filtered or unexported fields
}
func (*Mux) GetPatterns ¶
func (*Mux) Handle ¶
Handle registers the handler for the given pattern. If there is duplicated handler then panics.
func (*Mux) ProcessMsg ¶
ProcessMsg dispatches msg to the handler whose pattern most closely matches the queue name.
func (*Mux) Use ¶
func (it *Mux) Use(mws ...MiddlewareFunc)
Use appends a MiddlewareFunc to the chain. Middlewares are executed one by one in the order.
type OptTypeClient ¶
type OptTypeClient int
const ( OptTypeQueueName OptTypeClient = iota OptTypeUniqueIn OptTypeIgnoreUnique )
type OptTypeServer ¶
type OptTypeServer int
const ( OptTypeServerWorkerNum OptTypeServer = iota OptTypeServerWorkerWorkIntervalFunc )
type OptionClient ¶
type OptionClient interface { // String returns a string representation of the option. String() string // Type returns the type of the option. Type() OptTypeClient // Value returns a value used to create this option. Value() interface{} }
func OptIgnoreUnique ¶ added in v0.0.14
func OptIgnoreUnique(s bool) OptionClient
func OptQueueName ¶
func OptQueueName(s string) OptionClient
func OptUniqueIn ¶
func OptUniqueIn(a time.Duration) OptionClient
type OptionServer ¶
type OptionServer interface { // String returns a string representation of the option. String() string // Type returns the type of the option. Type() OptTypeServer // Value returns a value used to create this option. Value() interface{} }
func OptQueueWorkerNum ¶
func OptQueueWorkerNum(n uint16) OptionServer
func OptWorkerWorkInterval ¶
func OptWorkerWorkInterval(f FuncWorkInterval) OptionServer
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor manages queue worker(s) for consuming messages.
func NewProcessor ¶
func NewProcessor(params ProcessorParams) *Processor
type ProcessorParams ¶
type QueueCfg ¶
type QueueCfg struct {
// contains filtered or unexported fields
}
func NewQueueCfg ¶
func NewQueueCfg(opts ...OptionServer) *QueueCfg
type QueueDailyStat ¶
type QueueStat ¶
type QueueStat struct { Name string `json:"name"` // queue name Total int64 `json:"total"` // all state of message store in Redis Pending int64 `json:"pending"` // wait to free worker consume it Processing int64 `json:"processing"` // worker already took and consuming Failed int64 `json:"failed"` // occured error, and/or pending to retry }
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶
func NewScheduler(parmas SchedulerParams) *Scheduler
func (*Scheduler) Unregister ¶
type SchedulerParams ¶
type SimulatedClock ¶
type SimulatedClock struct {
// contains filtered or unexported fields
}
A SimulatedClock is a Clock implementation that doesn't "tick" on its own. Change time by explicit call the AdvanceTime() or SetTime() functions.
func NewSimulatedClock ¶
func NewSimulatedClock(t time.Time) *SimulatedClock
func (*SimulatedClock) AdvanceTime ¶
func (it *SimulatedClock) AdvanceTime(d time.Duration)
func (*SimulatedClock) Now ¶
func (it *SimulatedClock) Now() time.Time
func (*SimulatedClock) SetTime ¶
func (it *SimulatedClock) SetTime(t time.Time)