gmq

package
v0.0.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2024 License: MIT Imports: 23 Imported by: 0

Documentation

Overview

Package gmq is a generated GoMock package.

Index

Constants

View Source
const (
	// lifetime of a message in any state except failed
	TTLMsg = 60 * 60 * 24 * 7 // 7 days

	// lifetime of a message in the failed state
	TTLDeadMsg = 60 * 60 * 24 * 3 // 3 days
)
View Source
const (
	LuaReturnCodeSucc = iota // conforms to the common POSIX shell/C return code convention: 0 means success
	LuaReturnCodeError
)
View Source
const (
	QueueNameList   = "queues"
	QueueNamePaused = "paused"
)
View Source
const (
	// message init state
	MsgStatePending = "pending"

	// message is processing by worker
	MsgStateProcessing = "processing"

	// message has been processed and failed
	MsgStateFailed = "failed"

	// message has been processed successfully
	MsgStateCompleted = "completed"
)

message state list

View Source
const (
	DefaultDurationRestIfNoMsg = time.Second * time.Duration(1)
)
View Source
const (
	DefaultMaxBytes = 1024 * 1024 * 32 // 32 MB
)
View Source
const (
	DefaultMaxItemsLimit = 10
)
View Source
const (
	DefaultQueueName = "default"
)
View Source
const (
	DefaultTTLMsgUniq = time.Second * time.Duration(60*60*24) // 1 day
)
View Source
const (
	Namespace = "gmq"
)

Variables

View Source
var (
	ErrNoMsg = errors.New("no msg")

	ErrInternal = errors.New("internal error")

	ErrMsgIdConflict = errors.New("msg id conflict")

	ErrWaitTimeOut = errors.New("operation times out")

	ErrNotImplemented = errors.New("method not implemented")
)

Functions

func MakeRedisUniversalClient

func MakeRedisUniversalClient(opts *redis.Options) (rs interface{})

func NewKeyDailyStatCompleted added in v0.0.13

func NewKeyDailyStatCompleted(ns, queueName, YYYYMMDD string) string

<namespace>:<queueName>:completed:<YYYY-MM-DD>

func NewKeyDailyStatFailed

func NewKeyDailyStatFailed(ns, queueName, YYYYMMDD string) string

<namespace>:<queueName>:failed:<YYYY-MM-DD>

func NewKeyMsgDetail

func NewKeyMsgDetail(ns, queueName, msgId string) string

func NewKeyMsgUnique

func NewKeyMsgUnique(ns, queueName, msgId string) string

func NewKeyQueueCompleted added in v0.0.13

func NewKeyQueueCompleted(ns, queueName string) string

func NewKeyQueueFailed

func NewKeyQueueFailed(ns, queueName string) string

func NewKeyQueueFailedHistory added in v0.0.13

func NewKeyQueueFailedHistory(ns, queueName, msgId string) string

<namespace>:<queueName>:his:<msgId>

func NewKeyQueueList

func NewKeyQueueList() string

func NewKeyQueuePattern added in v0.0.11

func NewKeyQueuePattern(ns, queueName string) string

func NewKeyQueuePaused

func NewKeyQueuePaused(ns, queueName string) string

func NewKeyQueuePending

func NewKeyQueuePending(ns, queueName string) string

func NewKeyQueueProcessing

func NewKeyQueueProcessing(ns, queueName string) string

func NewKeyQueueState added in v0.0.11

func NewKeyQueueState(ns, queueName string, state string) string

func NotFound

func NotFound(ctx context.Context, msg IMsg) error

func TestBroker_AutoDeduplicateCompletedMsg added in v0.0.14

func TestBroker_AutoDeduplicateCompletedMsg(t *testing.T, broker Broker)

func TestBroker_AutoDeduplicateFailedMsg added in v0.0.14

func TestBroker_AutoDeduplicateFailedMsg(t *testing.T, broker Broker)

func TestBroker_AutoDeduplicateMsgByDefault added in v0.0.14

func TestBroker_AutoDeduplicateMsgByDefault(t *testing.T, broker Broker)

func TestBroker_Complete added in v0.0.14

func TestBroker_Complete(t *testing.T, broker Broker)

func TestBroker_DeleteAgo added in v0.0.14

func TestBroker_DeleteAgo(t *testing.T, broker Broker)

func TestBroker_DeleteMsg added in v0.0.14

func TestBroker_DeleteMsg(t *testing.T, broker Broker)

func TestBroker_DeleteQueue added in v0.0.14

func TestBroker_DeleteQueue(t *testing.T, broker Broker)

func TestBroker_Dequeue added in v0.0.14

func TestBroker_Dequeue(t *testing.T, broker Broker)

func TestBroker_Enqueue added in v0.0.14

func TestBroker_Enqueue(t *testing.T, broker Broker)

func TestBroker_Fail added in v0.0.14

func TestBroker_Fail(t *testing.T, broker Broker)

func TestBroker_GetMsg added in v0.0.14

func TestBroker_GetMsg(t *testing.T, broker Broker)

func TestBroker_GetStats added in v0.0.14

func TestBroker_GetStats(t *testing.T, broker Broker)

func TestBroker_GetStatsByDate added in v0.0.14

func TestBroker_GetStatsByDate(t *testing.T, broker Broker)

func TestBroker_ListFailed added in v0.0.14

func TestBroker_ListFailed(t *testing.T, broker Broker)

func TestBroker_ListFailedMaxItems added in v0.0.14

func TestBroker_ListFailedMaxItems(t *testing.T, broker Broker)

func TestBroker_ListMsg added in v0.0.14

func TestBroker_ListMsg(t *testing.T, broker Broker)

func TestBroker_ListQueue added in v0.0.14

func TestBroker_ListQueue(t *testing.T, broker Broker)

Types

type Broker

type Broker interface {
	Close() error
	Complete(ctx context.Context, msg IMsg) error
	Dequeue(ctx context.Context, queueName string) (*Msg, error)
	DeleteMsg(ctx context.Context, queueName, id string) error
	DeleteQueue(ctx context.Context, queueName string) error
	DeleteAgo(ctx context.Context, queueName string, duration time.Duration) error

	Enqueue(ctx context.Context, msg IMsg, opts ...OptionClient) (*Msg, error)
	Fail(ctx context.Context, msg IMsg, errFail error) error
	GetMsg(ctx context.Context, queueName, id string) (*Msg, error)

	// returns a list of message ids (not the internal msgId) of the queue with the specified name, up to limit
	ListMsg(ctx context.Context, queueName, state string, limit, offset int64) ([]string, error)

	// returns a list of failed messages of the queue for the specified queue name and message id
	// NOTICE: the list is ordered from newest to oldest.
	ListFailed(ctx context.Context, queueName, msgId string, limit, offset int64) ([]*Msg, error)

	ListQueue(ctx context.Context) ([]string, error)

	GetStats(ctx context.Context) ([]*QueueStat, error)
	Init(ctx context.Context, queueName string) error

	Ping(ctx context.Context) error
	GetStatsByDate(ctx context.Context, listQueueNames []string, YYYYMMDD string) (*QueueDailyStat, error)
	GetStatsWeekly(ctx context.Context, listQueueNames []string) ([]*QueueDailyStat, error)
	Pause(ctx context.Context, queueName string) error
	Resume(ctx context.Context, queueName string) error

	// SetClock custom internal clock for testing
	SetClock(c Clock)

	// processing time in UTC instead of local
	UTC(flag bool)
}

func NewBrokerFromRedisClient

func NewBrokerFromRedisClient(cli redis.UniversalClient) (rs Broker, err error)

func NewBrokerInMemory added in v0.0.13

func NewBrokerInMemory(opts *BrokerInMemoryOpts) (rs Broker, err error)

func NewBrokerRedis

func NewBrokerRedis(dsn string) (rs Broker, err error)

type BrokerInMemory added in v0.0.13

type BrokerInMemory struct {
	BrokerUnimplemented
	// contains filtered or unexported fields
}

func (*BrokerInMemory) Close added in v0.0.13

func (it *BrokerInMemory) Close() error

Close implements Broker

func (*BrokerInMemory) Complete added in v0.0.13

func (it *BrokerInMemory) Complete(ctx context.Context, msg IMsg) error

Complete implements Broker

func (*BrokerInMemory) DeleteAgo added in v0.0.13

func (it *BrokerInMemory) DeleteAgo(ctx context.Context, queueName string, duration time.Duration) error

DeleteAgo implements Broker

func (*BrokerInMemory) DeleteMsg added in v0.0.13

func (it *BrokerInMemory) DeleteMsg(ctx context.Context, queueName string, msgId string) error

DeleteMsg implements Broker

func (*BrokerInMemory) DeleteQueue added in v0.0.13

func (it *BrokerInMemory) DeleteQueue(ctx context.Context, queueName string) error

DeleteQueue implements Broker

func (*BrokerInMemory) Dequeue added in v0.0.13

func (it *BrokerInMemory) Dequeue(ctx context.Context, queueName string) (*Msg, error)

Dequeue implements Broker

func (*BrokerInMemory) Enqueue added in v0.0.13

func (it *BrokerInMemory) Enqueue(ctx context.Context, msg IMsg, opts ...OptionClient) (*Msg, error)

Enqueue implements Broker

func (*BrokerInMemory) Fail added in v0.0.13

func (it *BrokerInMemory) Fail(ctx context.Context, msg IMsg, errFail error) error

Fail implements Broker

func (*BrokerInMemory) GetMsg added in v0.0.13

func (it *BrokerInMemory) GetMsg(ctx context.Context, queueName string, msgId string) (*Msg, error)

GetMsg implements Broker

func (*BrokerInMemory) GetStats added in v0.0.13

func (it *BrokerInMemory) GetStats(ctx context.Context) ([]*QueueStat, error)

GetStats implements Broker

func (*BrokerInMemory) GetStatsByDate added in v0.0.13

func (it *BrokerInMemory) GetStatsByDate(ctx context.Context, listQueueNames []string, YYYYMMDD string) (*QueueDailyStat, error)

GetStatsByDate implements Broker

func (*BrokerInMemory) GetStatsWeekly added in v0.0.13

func (it *BrokerInMemory) GetStatsWeekly(ctx context.Context, listQueueNames []string) ([]*QueueDailyStat, error)

GetStatsWeekly implements Broker

func (*BrokerInMemory) Init added in v0.0.13

func (it *BrokerInMemory) Init(ctx context.Context, queueName string) error

Init implements Broker

func (*BrokerInMemory) ListFailed added in v0.0.13

func (it *BrokerInMemory) ListFailed(ctx context.Context, queueName string, msgId string, limit int64, offset int64) (rs []*Msg, err error)

ListFailed implements Broker

func (*BrokerInMemory) ListMsg added in v0.0.13

func (it *BrokerInMemory) ListMsg(ctx context.Context, queueName string, state string, limit int64, offset int64) ([]string, error)

ListMsg implements Broker

func (*BrokerInMemory) ListQueue added in v0.0.13

func (it *BrokerInMemory) ListQueue(ctx context.Context) (rs []string, err error)

ListQueue implements Broker

func (*BrokerInMemory) Pause added in v0.0.13

func (it *BrokerInMemory) Pause(ctx context.Context, queueName string) error

Pause implements Broker

func (*BrokerInMemory) Ping added in v0.0.13

func (it *BrokerInMemory) Ping(ctx context.Context) error

Ping implements Broker

func (*BrokerInMemory) Resume added in v0.0.13

func (it *BrokerInMemory) Resume(ctx context.Context, queueName string) error

Resume implements Broker

func (*BrokerInMemory) SetClock added in v0.0.13

func (it *BrokerInMemory) SetClock(c Clock)

SetClock implements Broker

func (*BrokerInMemory) UTC added in v0.0.13

func (it *BrokerInMemory) UTC(flag bool)

UTC implements Broker

type BrokerInMemoryOpts added in v0.0.13

type BrokerInMemoryOpts struct {
	MaxBytes int64
}

type BrokerRedis

type BrokerRedis struct {
	BrokerUnimplemented
	// contains filtered or unexported fields
}

func (*BrokerRedis) Close

func (it *BrokerRedis) Close() error

func (*BrokerRedis) Complete

func (it *BrokerRedis) Complete(ctx context.Context, msg IMsg) (err error)

func (*BrokerRedis) DeleteAgo

func (it *BrokerRedis) DeleteAgo(ctx context.Context, queueName string, duration time.Duration) (err error)

DeleteAgo deletes messages older than <nowUnixTimestamp - duration>.

func (*BrokerRedis) DeleteMsg added in v0.0.11

func (it *BrokerRedis) DeleteMsg(ctx context.Context, queueName, msgId string) (err error)

func (*BrokerRedis) DeleteQueue added in v0.0.11

func (it *BrokerRedis) DeleteQueue(ctx context.Context, queueName string) (err error)

func (*BrokerRedis) Dequeue

func (it *BrokerRedis) Dequeue(ctx context.Context, queueName string) (msg *Msg, err error)

func (*BrokerRedis) Enqueue

func (it *BrokerRedis) Enqueue(ctx context.Context, msg IMsg, opts ...OptionClient) (rs *Msg, err error)

func (*BrokerRedis) Fail

func (it *BrokerRedis) Fail(ctx context.Context, msg IMsg, errFail error) (err error)

func (*BrokerRedis) GetMsg added in v0.0.11

func (it *BrokerRedis) GetMsg(ctx context.Context, queueName, msgId string) (msg *Msg, err error)

func (*BrokerRedis) GetStats

func (it *BrokerRedis) GetStats(ctx context.Context) (rs []*QueueStat, err error)

func (*BrokerRedis) GetStatsByDate

func (it *BrokerRedis) GetStatsByDate(ctx context.Context, listQueueNames []string, date string) (rs *QueueDailyStat, err error)

func (*BrokerRedis) GetStatsWeekly added in v0.0.10

func (it *BrokerRedis) GetStatsWeekly(ctx context.Context, listQueueNames []string) ([]*QueueDailyStat, error)

func (*BrokerRedis) Init

func (it *BrokerRedis) Init(ctx context.Context, queueName string) (err error)

func (*BrokerRedis) ListFailed added in v0.0.13

func (it *BrokerRedis) ListFailed(ctx context.Context, queueName string, msgId string, limit int64, offset int64) (rs []*Msg, err error)

func (*BrokerRedis) ListMsg added in v0.0.11

func (it *BrokerRedis) ListMsg(ctx context.Context, queueName, state string, limit, offset int64) (values []string, err error)

func (*BrokerRedis) ListQueue added in v0.0.13

func (it *BrokerRedis) ListQueue(ctx context.Context) (rs []string, err error)

ListQueue implements Broker

func (*BrokerRedis) Pause added in v0.0.10

func (it *BrokerRedis) Pause(ctx context.Context, qname string) error

func (*BrokerRedis) Ping

func (it *BrokerRedis) Ping(ctx context.Context) error

func (*BrokerRedis) Resume added in v0.0.10

func (it *BrokerRedis) Resume(ctx context.Context, qname string) error

func (*BrokerRedis) SetClock

func (it *BrokerRedis) SetClock(c Clock)

func (*BrokerRedis) UTC added in v0.0.13

func (it *BrokerRedis) UTC(flag bool)

UTC implements Broker

type BrokerUnimplemented added in v0.0.13

type BrokerUnimplemented struct{}

BrokerUnimplemented must be embedded to have forward compatible implementations.

func (*BrokerUnimplemented) Close added in v0.0.13

func (*BrokerUnimplemented) Close() error

Close implements Broker

func (*BrokerUnimplemented) Complete added in v0.0.13

func (*BrokerUnimplemented) Complete(ctx context.Context, msg IMsg) error

Complete implements Broker

func (*BrokerUnimplemented) DeleteAgo added in v0.0.13

func (*BrokerUnimplemented) DeleteAgo(ctx context.Context, queueName string, seconds int64) error

DeleteAgo implements Broker

func (*BrokerUnimplemented) DeleteMsg added in v0.0.13

func (*BrokerUnimplemented) DeleteMsg(ctx context.Context, queueName string, msgId string) error

DeleteMsg implements Broker

func (*BrokerUnimplemented) DeleteQueue added in v0.0.13

func (*BrokerUnimplemented) DeleteQueue(ctx context.Context, queueName string) error

DeleteQueue implements Broker

func (*BrokerUnimplemented) Dequeue added in v0.0.13

func (*BrokerUnimplemented) Dequeue(ctx context.Context, queueName string) (*Msg, error)

Dequeue implements Broker

func (*BrokerUnimplemented) Enqueue added in v0.0.13

func (*BrokerUnimplemented) Enqueue(ctx context.Context, msg IMsg, opts ...OptionClient) (*Msg, error)

Enqueue implements Broker

func (*BrokerUnimplemented) Fail added in v0.0.13

func (*BrokerUnimplemented) Fail(ctx context.Context, msg IMsg, errFail error) error

Fail implements Broker

func (*BrokerUnimplemented) GetMsg added in v0.0.13

func (*BrokerUnimplemented) GetMsg(ctx context.Context, queueName string, msgId string) (*Msg, error)

GetMsg implements Broker

func (*BrokerUnimplemented) GetStats added in v0.0.13

func (*BrokerUnimplemented) GetStats(ctx context.Context) ([]*QueueStat, error)

GetStats implements Broker

func (*BrokerUnimplemented) GetStatsByDate added in v0.0.13

func (*BrokerUnimplemented) GetStatsByDate(ctx context.Context, YYYYMMDD string) (*QueueDailyStat, error)

GetStatsByDate implements Broker

func (*BrokerUnimplemented) GetStatsWeekly added in v0.0.13

GetStatsWeekly implements Broker

func (*BrokerUnimplemented) Init added in v0.0.13

func (*BrokerUnimplemented) Init(ctx context.Context, queueName string) error

Init implements Broker

func (*BrokerUnimplemented) ListMsg added in v0.0.13

func (*BrokerUnimplemented) ListMsg(ctx context.Context, queueName string, state string, limit int64, offset int64) ([]string, error)

ListMsg implements Broker

func (*BrokerUnimplemented) Pause added in v0.0.13

func (*BrokerUnimplemented) Pause(ctx context.Context, queueName string) error

Pause implements Broker

func (*BrokerUnimplemented) Ping added in v0.0.13

Ping implements Broker

func (*BrokerUnimplemented) Resume added in v0.0.13

func (*BrokerUnimplemented) Resume(ctx context.Context, queueName string) error

Resume implements Broker

func (*BrokerUnimplemented) SetClock added in v0.0.13

func (*BrokerUnimplemented) SetClock(c Clock)

SetClock implements Broker

type Cleaner

type Cleaner struct {
	// contains filtered or unexported fields
}

Cleaner automatically deletes dead or failed messages, as well as completed messages, at intervals.

func NewCleaner

func NewCleaner(params CleanerParams) *Cleaner

type CleanerParams

type CleanerParams struct {
	Broker     Broker
	Ctx        context.Context
	DeleteAgo  time.Duration
	Logger     Logger
	MsgMaxTTL  time.Duration
	QueueNames map[string]struct{}
}

type Client

type Client struct {
	Broker Broker
}

func NewClientFromBroker

func NewClientFromBroker(broker Broker) (rs *Client, err error)

func NewClientRedis added in v0.0.13

func NewClientRedis(dsn string) (rs *Client, err error)

func (*Client) Close

func (it *Client) Close() error

Close closes the redis connection.

func (*Client) Enqueue

func (it *Client) Enqueue(ctx context.Context, msg IMsg, opts ...OptionClient) (rs *Msg, err error)

Enqueue enqueues a message into a queue.

type Clock

type Clock interface {
	Now() time.Time
}

func NewWallClock

func NewWallClock() Clock

type Config

type Config struct {
	Logger Logger

	MsgMaxTTL   time.Duration
	RestIfNoMsg time.Duration

	QueueCfgs map[string]*QueueCfg
}

type CronJob added in v0.0.9

type CronJob struct {
	// contains filtered or unexported fields
}

func (*CronJob) Run added in v0.0.9

func (it *CronJob) Run()

type FuncEnqueueErrorHandler added in v0.0.9

type FuncEnqueueErrorHandler func(msg IMsg, opts []OptionClient, err error)

type FuncWorkInterval

type FuncWorkInterval func() time.Duration

type Handler

type Handler interface {
	ProcessMsg(ctx context.Context, msg IMsg) error
}

func LoggingElapsed

func LoggingElapsed(h Handler) Handler

func NotFoundHandler

func NotFoundHandler() Handler

type HandlerFunc

type HandlerFunc func(ctx context.Context, msg IMsg) error

func (HandlerFunc) ProcessMsg

func (fn HandlerFunc) ProcessMsg(ctx context.Context, msg IMsg) error

ProcessMsg calls fn(ctx, msg).

type IMsg

type IMsg interface {
	GetId() string

	GetPayload() []byte
	GetQueue() string
	String() string
}

type Logger

type Logger interface {
	// Debug logs a message at Debug level.
	Debug(args ...interface{})

	// Debugf logs a message at Debug level.
	Debugf(template string, args ...interface{})

	// Info logs a message at Info level.
	Info(args ...interface{})

	// Infof logs a message at Info level.
	Infof(template string, args ...interface{})

	// Warn logs a message at Warning level.
	Warn(args ...interface{})

	// Warnf logs a message at Warning level.
	Warnf(template string, args ...interface{})

	// Error logs a message at Error level.
	Error(args ...interface{})

	// Errorf logs a message at Error level.
	Errorf(template string, args ...interface{})

	// Fatal logs a message at Fatal level
	// and process will exit with status set to 1.
	Fatal(args ...interface{})

	// Fatalf logs a message at Fatal level.
	Fatalf(template string, args ...interface{})
}

Logger supports logging at various log levels.

type MiddlewareFunc

type MiddlewareFunc func(Handler) Handler

type MockBroker added in v0.0.14

type MockBroker struct {
	// contains filtered or unexported fields
}

MockBroker is a mock of Broker interface.

func NewMockBroker added in v0.0.14

func NewMockBroker(ctrl *gomock.Controller) *MockBroker

NewMockBroker creates a new mock instance.

func (*MockBroker) Close added in v0.0.14

func (m *MockBroker) Close() error

Close mocks base method.

func (*MockBroker) Complete added in v0.0.14

func (m *MockBroker) Complete(ctx context.Context, msg IMsg) error

Complete mocks base method.

func (*MockBroker) DeleteAgo added in v0.0.14

func (m *MockBroker) DeleteAgo(ctx context.Context, queueName string, duration time.Duration) error

DeleteAgo mocks base method.

func (*MockBroker) DeleteMsg added in v0.0.14

func (m *MockBroker) DeleteMsg(ctx context.Context, queueName, id string) error

DeleteMsg mocks base method.

func (*MockBroker) DeleteQueue added in v0.0.14

func (m *MockBroker) DeleteQueue(ctx context.Context, queueName string) error

DeleteQueue mocks base method.

func (*MockBroker) Dequeue added in v0.0.14

func (m *MockBroker) Dequeue(ctx context.Context, queueName string) (*Msg, error)

Dequeue mocks base method.

func (*MockBroker) EXPECT added in v0.0.14

func (m *MockBroker) EXPECT() *MockBrokerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockBroker) Enqueue added in v0.0.14

func (m *MockBroker) Enqueue(ctx context.Context, msg IMsg, opts ...OptionClient) (*Msg, error)

Enqueue mocks base method.

func (*MockBroker) Fail added in v0.0.14

func (m *MockBroker) Fail(ctx context.Context, msg IMsg, errFail error) error

Fail mocks base method.

func (*MockBroker) GetMsg added in v0.0.14

func (m *MockBroker) GetMsg(ctx context.Context, queueName, id string) (*Msg, error)

GetMsg mocks base method.

func (*MockBroker) GetStats added in v0.0.14

func (m *MockBroker) GetStats(ctx context.Context) ([]*QueueStat, error)

GetStats mocks base method.

func (*MockBroker) GetStatsByDate added in v0.0.14

func (m *MockBroker) GetStatsByDate(ctx context.Context, listQueueNames []string, YYYYMMDD string) (*QueueDailyStat, error)

GetStatsByDate mocks base method.

func (*MockBroker) GetStatsWeekly added in v0.0.14

func (m *MockBroker) GetStatsWeekly(ctx context.Context, listQueueNames []string) ([]*QueueDailyStat, error)

GetStatsWeekly mocks base method.

func (*MockBroker) Init added in v0.0.14

func (m *MockBroker) Init(ctx context.Context, queueName string) error

Init mocks base method.

func (*MockBroker) ListFailed added in v0.0.14

func (m *MockBroker) ListFailed(ctx context.Context, queueName, msgId string, limit, offset int64) ([]*Msg, error)

ListFailed mocks base method.

func (*MockBroker) ListMsg added in v0.0.14

func (m *MockBroker) ListMsg(ctx context.Context, queueName, state string, limit, offset int64) ([]string, error)

ListMsg mocks base method.

func (*MockBroker) ListQueue added in v0.0.14

func (m *MockBroker) ListQueue(ctx context.Context) ([]string, error)

ListQueue mocks base method.

func (*MockBroker) Pause added in v0.0.14

func (m *MockBroker) Pause(ctx context.Context, queueName string) error

Pause mocks base method.

func (*MockBroker) Ping added in v0.0.14

func (m *MockBroker) Ping(ctx context.Context) error

Ping mocks base method.

func (*MockBroker) Resume added in v0.0.14

func (m *MockBroker) Resume(ctx context.Context, queueName string) error

Resume mocks base method.

func (*MockBroker) SetClock added in v0.0.14

func (m *MockBroker) SetClock(c Clock)

SetClock mocks base method.

func (*MockBroker) UTC added in v0.0.14

func (m *MockBroker) UTC(flag bool)

UTC mocks base method.

type MockBrokerMockRecorder added in v0.0.14

type MockBrokerMockRecorder struct {
	// contains filtered or unexported fields
}

MockBrokerMockRecorder is the mock recorder for MockBroker.

func (*MockBrokerMockRecorder) Close added in v0.0.14

func (mr *MockBrokerMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close.

func (*MockBrokerMockRecorder) Complete added in v0.0.14

func (mr *MockBrokerMockRecorder) Complete(ctx, msg any) *gomock.Call

Complete indicates an expected call of Complete.

func (*MockBrokerMockRecorder) DeleteAgo added in v0.0.14

func (mr *MockBrokerMockRecorder) DeleteAgo(ctx, queueName, duration any) *gomock.Call

DeleteAgo indicates an expected call of DeleteAgo.

func (*MockBrokerMockRecorder) DeleteMsg added in v0.0.14

func (mr *MockBrokerMockRecorder) DeleteMsg(ctx, queueName, id any) *gomock.Call

DeleteMsg indicates an expected call of DeleteMsg.

func (*MockBrokerMockRecorder) DeleteQueue added in v0.0.14

func (mr *MockBrokerMockRecorder) DeleteQueue(ctx, queueName any) *gomock.Call

DeleteQueue indicates an expected call of DeleteQueue.

func (*MockBrokerMockRecorder) Dequeue added in v0.0.14

func (mr *MockBrokerMockRecorder) Dequeue(ctx, queueName any) *gomock.Call

Dequeue indicates an expected call of Dequeue.

func (*MockBrokerMockRecorder) Enqueue added in v0.0.14

func (mr *MockBrokerMockRecorder) Enqueue(ctx, msg any, opts ...any) *gomock.Call

Enqueue indicates an expected call of Enqueue.

func (*MockBrokerMockRecorder) Fail added in v0.0.14

func (mr *MockBrokerMockRecorder) Fail(ctx, msg, errFail any) *gomock.Call

Fail indicates an expected call of Fail.

func (*MockBrokerMockRecorder) GetMsg added in v0.0.14

func (mr *MockBrokerMockRecorder) GetMsg(ctx, queueName, id any) *gomock.Call

GetMsg indicates an expected call of GetMsg.

func (*MockBrokerMockRecorder) GetStats added in v0.0.14

func (mr *MockBrokerMockRecorder) GetStats(ctx any) *gomock.Call

GetStats indicates an expected call of GetStats.

func (*MockBrokerMockRecorder) GetStatsByDate added in v0.0.14

func (mr *MockBrokerMockRecorder) GetStatsByDate(ctx, listQueueNames, YYYYMMDD any) *gomock.Call

GetStatsByDate indicates an expected call of GetStatsByDate.

func (*MockBrokerMockRecorder) GetStatsWeekly added in v0.0.14

func (mr *MockBrokerMockRecorder) GetStatsWeekly(ctx, listQueueNames any) *gomock.Call

GetStatsWeekly indicates an expected call of GetStatsWeekly.

func (*MockBrokerMockRecorder) Init added in v0.0.14

func (mr *MockBrokerMockRecorder) Init(ctx, queueName any) *gomock.Call

Init indicates an expected call of Init.

func (*MockBrokerMockRecorder) ListFailed added in v0.0.14

func (mr *MockBrokerMockRecorder) ListFailed(ctx, queueName, msgId, limit, offset any) *gomock.Call

ListFailed indicates an expected call of ListFailed.

func (*MockBrokerMockRecorder) ListMsg added in v0.0.14

func (mr *MockBrokerMockRecorder) ListMsg(ctx, queueName, state, limit, offset any) *gomock.Call

ListMsg indicates an expected call of ListMsg.

func (*MockBrokerMockRecorder) ListQueue added in v0.0.14

func (mr *MockBrokerMockRecorder) ListQueue(ctx any) *gomock.Call

ListQueue indicates an expected call of ListQueue.

func (*MockBrokerMockRecorder) Pause added in v0.0.14

func (mr *MockBrokerMockRecorder) Pause(ctx, queueName any) *gomock.Call

Pause indicates an expected call of Pause.

func (*MockBrokerMockRecorder) Ping added in v0.0.14

func (mr *MockBrokerMockRecorder) Ping(ctx any) *gomock.Call

Ping indicates an expected call of Ping.

func (*MockBrokerMockRecorder) Resume added in v0.0.14

func (mr *MockBrokerMockRecorder) Resume(ctx, queueName any) *gomock.Call

Resume indicates an expected call of Resume.

func (*MockBrokerMockRecorder) SetClock added in v0.0.14

func (mr *MockBrokerMockRecorder) SetClock(c any) *gomock.Call

SetClock indicates an expected call of SetClock.

func (*MockBrokerMockRecorder) UTC added in v0.0.14

func (mr *MockBrokerMockRecorder) UTC(flag any) *gomock.Call

UTC indicates an expected call of UTC.

type Msg

type Msg struct {
	Payload []byte `json:"payload"`
	Id      string `json:"id"`
	Queue   string `json:"queue"`

	// message created at timestamp in Unix milliseconds
	Created int64 `json:"created"`

	// expired timestamp in Unix milliseconds
	Expireat int64 `json:"expireat"`

	Err   string `json:"err"`
	State string `json:"state"`

	// state last changed timestamp in Unix milliseconds
	Updated int64 `json:"updated"`
}

func GenerateNewMsg added in v0.0.14

func GenerateNewMsg() *Msg

func (*Msg) GetId

func (it *Msg) GetId() string

func (*Msg) GetPayload

func (it *Msg) GetPayload() []byte

func (*Msg) GetQueue

func (it *Msg) GetQueue() string

func (*Msg) String

func (it *Msg) String() string

type MsgHistory added in v0.0.13

type MsgHistory map[string]*list.List // key is <msgId>, value is <msg>

type Mux

type Mux struct {
	// contains filtered or unexported fields
}

func NewMux

func NewMux() *Mux

func (*Mux) GetPatterns

func (it *Mux) GetPatterns() map[string]MuxEntry

func (*Mux) Handle

func (it *Mux) Handle(pattern string, handler Handler)

Handle registers the handler for the given pattern. It panics if a duplicate handler is registered.

func (*Mux) Handler

func (it *Mux) Handler(msg IMsg) (h Handler, pattern string)

func (*Mux) ProcessMsg

func (it *Mux) ProcessMsg(ctx context.Context, msg IMsg) error

ProcessMsg dispatches msg to the handler whose pattern most closely matches the queue name.

func (*Mux) Use

func (it *Mux) Use(mws ...MiddlewareFunc)

Use appends one or more MiddlewareFuncs to the chain. Middlewares are executed one by one, in order.

type MuxEntry

type MuxEntry struct {
	// contains filtered or unexported fields
}

type OptTypeClient

type OptTypeClient int
const (
	OptTypeQueueName OptTypeClient = iota
	OptTypeUniqueIn
	OptTypeIgnoreUnique
)

type OptTypeServer

type OptTypeServer int
const (
	OptTypeServerWorkerNum OptTypeServer = iota
	OptTypeServerWorkerWorkIntervalFunc
)

type OptionClient

type OptionClient interface {
	// String returns a string representation of the option.
	String() string

	// Type returns the type of the option.
	Type() OptTypeClient

	// Value returns a value used to create this option.
	Value() interface{}
}

func OptIgnoreUnique added in v0.0.14

func OptIgnoreUnique(s bool) OptionClient

func OptQueueName

func OptQueueName(s string) OptionClient

func OptUniqueIn

func OptUniqueIn(a time.Duration) OptionClient

type OptionServer

type OptionServer interface {
	// String returns a string representation of the option.
	String() string

	// Type returns the type of the option.
	Type() OptTypeServer

	// Value returns a value used to create this option.
	Value() interface{}
}

func OptQueueWorkerNum

func OptQueueWorkerNum(n uint16) OptionServer

func OptWorkerWorkInterval

func OptWorkerWorkInterval(f FuncWorkInterval) OptionServer

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor manages queue worker(s) for consuming messages.

func NewProcessor

func NewProcessor(params ProcessorParams) *Processor

type ProcessorParams

type ProcessorParams struct {
	Ctx         context.Context
	Broker      Broker
	RestIfNoMsg time.Duration
	Handler     Handler
	Logger      Logger
	QueueName   string

	WorkerNum              uint16
	WorkerWorkIntervalFunc FuncWorkInterval
}

type QueueCfg

type QueueCfg struct {
	// contains filtered or unexported fields
}

func NewQueueCfg

func NewQueueCfg(opts ...OptionServer) *QueueCfg

type QueueDailyStat

type QueueDailyStat struct {
	Date      string `json:"date" gorm:"primarykey"` // YYYY-MM-DD in UTC
	Completed int64  `json:"completed"`
	Failed    int64  `json:"failed"`
	Total     int64  `json:"total"` // it is equal to Completed + Failed
}

type QueueStat

type QueueStat struct {
	Name       string `json:"name"`       // queue name
	Total      int64  `json:"total"`      // messages in all states stored in Redis
	Pending    int64  `json:"pending"`    // waiting for a free worker to consume them
	Processing int64  `json:"processing"` // already taken by a worker and being consumed
	Failed     int64  `json:"failed"`     // an error occurred, and/or pending retry
}

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(parmas SchedulerParams) *Scheduler

func (*Scheduler) Register

func (it *Scheduler) Register(cronSpec string, msg IMsg, opts ...OptionClient) (jobId string, err error)

func (*Scheduler) Run

func (it *Scheduler) Run() (err error)

func (*Scheduler) Shutdown added in v0.0.9

func (it *Scheduler) Shutdown()

func (*Scheduler) Unregister

func (it *Scheduler) Unregister(jobId string) (err error)

type SchedulerParams

type SchedulerParams struct {
	Ctx      context.Context
	Broker   Broker
	Logger   Logger
	Location *time.Location

	EnqueueErrorHandler FuncEnqueueErrorHandler
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(ctx context.Context, b Broker, cfg *Config) *Server

func (*Server) Cfg added in v0.0.13

func (it *Server) Cfg() *Config

func (*Server) Pause added in v0.0.10

func (it *Server) Pause(qname string) error

func (*Server) Resume added in v0.0.10

func (it *Server) Resume(qname string) error

func (*Server) Run

func (it *Server) Run(mux *Mux) (err error)

func (*Server) Shutdown

func (it *Server) Shutdown()

type SimulatedClock

type SimulatedClock struct {
	// contains filtered or unexported fields
}

A SimulatedClock is a Clock implementation that doesn't "tick" on its own. Change the time by explicitly calling the AdvanceTime() or SetTime() functions.

func NewSimulatedClock

func NewSimulatedClock(t time.Time) *SimulatedClock

func (*SimulatedClock) AdvanceTime

func (it *SimulatedClock) AdvanceTime(d time.Duration)

func (*SimulatedClock) Now

func (it *SimulatedClock) Now() time.Time

func (*SimulatedClock) SetTime

func (it *SimulatedClock) SetTime(t time.Time)

type WallClock

type WallClock struct{}

func (*WallClock) Now

func (it *WallClock) Now() time.Time

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL