pqueue

package
v0.0.0-...-a3e95bc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 16, 2017 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Overview

Package pqueue is a generated protocol buffer package.

It is generated from these files:

pqueue/pqmsg.proto

It has these top-level messages:

PQueueMsgData

Index

Constants

View Source
const (
	PQ_CMD_DELETE_LOCKED_BY_ID = "DELLCK"
	PQ_CMD_DELETE_BY_ID        = "DEL"
	PQ_CMD_DELETE_BY_RCPT      = "RDEL"
	PQ_CMD_UNLOCK_BY_RCPT      = "RUNLCK"
	PQ_CMD_UNLOCK_BY_ID        = "UNLCK"
	PQ_CMD_UPD_LOCK_BY_ID      = "UPDLCK"
	PQ_CMD_UPD_LOCK_BY_RCPT    = "RUPDLCK"
	PQ_CMD_PUSH                = "PUSH"
	PQ_CMD_POP                 = "POP"
	PQ_CMD_POPLOCK             = "POPLCK"
	PQ_CMD_MSG_INFO            = "MSGINFO"
	PQ_CMD_STATUS              = "STATUS"
	PQ_CMD_CHECK_TIMEOUTS      = "CHKTS"
	PQ_CMD_SET_CFG             = "SETCFG"
	PQ_CMD_PURGE               = "PURGE"
)
View Source
const (
	PRM_ID           = "ID"
	PRM_RECEIPT      = "RCPT"
	PRM_POP_WAIT     = "WAIT"
	PRM_LOCK_TIMEOUT = "TIMEOUT"
	PRM_PRIORITY     = "PRIORITY"
	PRM_LIMIT        = "LIMIT"
	PRM_PAYLOAD      = "PL"
	PRM_DELAY        = "DELAY"
	PRM_TIMESTAMP    = "TS"
	PRM_ASYNC        = "ASYNC"
	PRM_SYNC_WAIT    = "SYNCWAIT"
	PRM_MSG_TTL      = "TTL"
)
View Source
const (
	CPRM_MSG_TTL           = "MSGTTL"
	CPRM_MAX_MSG_SIZE      = "MSGSIZE"
	CPRM_MAX_MSGS_IN_QUEUE = "MAXMSGS"
	CPRM_DELIVERY_DELAY    = "DELAY"
	CPRM_POP_LIMIT         = "POPLIMIT"
	CPRM_LOCK_TIMEOUT      = "TIMEOUT"
	CPRM_FAIL_QUEUE        = "FAILQ"
	CPRM_POP_WAIT          = "WAIT"
)
View Source
const (
	PQ_STATUS_MAX_QUEUE_SIZE   = "MaxMsgsInQueue"
	PQ_STATUS_POP_WAIT_TIMEOUT = "PopWaitTimeout"
	PQ_STATUS_MSG_TTL          = "MsgTtl"
	PQ_STATUS_DELIVERY_DELAY   = "DeliveryDelay"
	PQ_STATUS_POP_LOCK_TIMEOUT = "PopLockTimeout"
	PQ_STATUS_POP_COUNT_LIMIT  = "PopCountLimit"
	PQ_STATUS_CREATE_TS        = "CreateTs"
	PQ_STATUS_LAST_PUSH_TS     = "LastPushTs"
	PQ_STATUS_LAST_POP_TS      = "LastPopTs"
	PQ_STATUS_TOTAL_MSGS       = "TotalMessages"
	PQ_STATUS_IN_FLIGHT_MSG    = "InFlightMessages"
	PQ_STATUS_AVAILABLE_MSGS   = "AvailableMessages"
	PQ_STATUS_DELAYED          = "DelayedMessages"
	PQ_STATUS_FAIL_QUEUE       = "FailQueue"
	PQ_STATUS_MAX_MSG_SIZE     = "MaxMsgSize"
)
View Source
const (
	MSG_INFO_ID        = "Id"
	MSG_INFO_LOCKED    = "Locked"
	MSG_INFO_UNLOCK_TS = "UnlockTs"
	MSG_INFO_POP_COUNT = "PopCount"
	MSG_INFO_PRIORITY  = "Priority"
	MSG_INFO_EXPIRE_TS = "ExpireTs"
)
View Source
const PAYLOAD_LIMIT = 512 * 1024

Variables

View Source
var (
	ErrInvalidLengthPqmsg = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowPqmsg   = fmt.Errorf("proto: integer overflow")
)

Functions

func DefaultPQConfig

func DefaultPQConfig() *conf.PQConfig

func LoadPQueue

func LoadPQueue(svcs apis.IServices, desc *queue_info.ServiceDescription) (apis.ISvc, error)

func ParsePQConfig

func ParsePQConfig(params []string) (*conf.PQConfig, apis.IResponse)

Types

type MsgHeap

type MsgHeap struct {
	// contains filtered or unexported fields
}

func NewMsgHeap

func NewMsgHeap() *MsgHeap

func NewSnHeap

func NewSnHeap() *MsgHeap

func NewTsHeap

func NewTsHeap() *MsgHeap

func (*MsgHeap) ContainsSn

func (s *MsgHeap) ContainsSn(sn uint64) bool

func (*MsgHeap) Empty

func (s *MsgHeap) Empty() bool

func (*MsgHeap) GetMsg

func (s *MsgHeap) GetMsg(sn uint64) *PQMsgMetaData

func (*MsgHeap) Init

func (s *MsgHeap) Init()

func (*MsgHeap) Len

func (s *MsgHeap) Len() int

func (*MsgHeap) MinMsg

func (s *MsgHeap) MinMsg() *PQMsgMetaData

func (*MsgHeap) NotEmpty

func (s *MsgHeap) NotEmpty() bool

func (*MsgHeap) Pop

func (s *MsgHeap) Pop() *PQMsgMetaData

func (*MsgHeap) Push

func (s *MsgHeap) Push(msg *PQMsgMetaData)

func (*MsgHeap) Remove

func (s *MsgHeap) Remove(sn uint64) *PQMsgMetaData

type MsgResponseItem

type MsgResponseItem struct {
	// contains filtered or unexported fields
}

func NewMsgResponseItem

func NewMsgResponseItem(msg *PQMsgMetaData, payload []byte) *MsgResponseItem

func (*MsgResponseItem) GetMeta

func (p *MsgResponseItem) GetMeta() *PQMsgMetaData

func (*MsgResponseItem) ID

func (p *MsgResponseItem) ID() string

func (*MsgResponseItem) Payload

func (p *MsgResponseItem) Payload() []byte

func (*MsgResponseItem) Receipt

func (p *MsgResponseItem) Receipt() string

func (*MsgResponseItem) WriteResponse

func (p *MsgResponseItem) WriteResponse(buf *bufio.Writer) error

type PQContext

type PQContext struct {
	// contains filtered or unexported fields
}

func NewPQContext

func NewPQContext(pq *PQueue, r apis.ResponseWriter) *PQContext

func (*PQContext) Call

func (ctx *PQContext) Call(cmd string, params []string) apis.IResponse

Call dispatches to the command handler to process necessary parameters.

func (*PQContext) CheckTimeouts

func (ctx *PQContext) CheckTimeouts(params []string) apis.IResponse

func (*PQContext) DeleteById

func (ctx *PQContext) DeleteById(params []string) apis.IResponse

func (*PQContext) DeleteByReceipt

func (ctx *PQContext) DeleteByReceipt(params []string) apis.IResponse

DeleteByReceipt deletes locked message using provided receipt. This is a preferable method to unlock messages. It helps to avoid race condition in case if when message lock has timed out and was picked up by other consumer.

func (*PQContext) DeleteLockedById

func (ctx *PQContext) DeleteLockedById(params []string) apis.IResponse

func (*PQContext) Finish

func (ctx *PQContext) Finish()

func (*PQContext) GetCurrentStatus

func (ctx *PQContext) GetCurrentStatus(params []string) apis.IResponse

func (*PQContext) GetMessageInfo

func (ctx *PQContext) GetMessageInfo(params []string) apis.IResponse

func (*PQContext) Pop

func (ctx *PQContext) Pop(params []string) apis.IResponse

Pop message from queue completely removing it.

func (*PQContext) PopLock

func (ctx *PQContext) PopLock(params []string) apis.IResponse

PopLock gets message from the queue setting lock timeout.

func (*PQContext) Push

func (ctx *PQContext) Push(params []string) apis.IResponse

Push message to the queue. Pushing message automatically enables auto expiration.

func (*PQContext) SetParamValue

func (ctx *PQContext) SetParamValue(params []string) apis.IResponse

func (*PQContext) UnlockByReceipt

func (ctx *PQContext) UnlockByReceipt(params []string) apis.IResponse

UnlockByReceipt unlocks locked message using provided receipt. Unlocking by receipt is making sure message was not relocked by something else.

func (*PQContext) UnlockMessageById

func (ctx *PQContext) UnlockMessageById(params []string) apis.IResponse

func (*PQContext) UpdateLockById

func (ctx *PQContext) UpdateLockById(params []string) apis.IResponse

UpdateLockById sets a user defined message lock timeout. It works only for locked messages.

func (*PQContext) UpdateLockByRcpt

func (ctx *PQContext) UpdateLockByRcpt(params []string) apis.IResponse

UpdateLockByRcpt updates message lock according to provided receipt.

type PQMsgMetaData

type PQMsgMetaData struct {
	SerialNumber uint64
	PQueueMsgData
}

func NewPQMsgMetaData

func NewPQMsgMetaData(id string, priority int64, expireTs int64, sn uint64) *PQMsgMetaData

func UnmarshalPQMsgMetaData

func UnmarshalPQMsgMetaData(sn uint64, buf []byte) *PQMsgMetaData

func (*PQMsgMetaData) ByteMarshal

func (self *PQMsgMetaData) ByteMarshal() []byte

func (*PQMsgMetaData) Sn2Bin

func (pqm *PQMsgMetaData) Sn2Bin() string

type PQueue

type PQueue struct {
	// A wrapper on top of common database operations.
	db.DBService
	// contains filtered or unexported fields
}

func InitPQueue

func InitPQueue(svcs apis.IServices, desc *queue_info.ServiceDescription, config *conf.PQConfig) *PQueue

func (*PQueue) AvailableMessages

func (pq *PQueue) AvailableMessages() int64

AvailableMessages returns number of available messages.

func (*PQueue) CheckTimeouts

func (pq *PQueue) CheckTimeouts(ts int64) apis.IResponse

func (*PQueue) Clear

func (pq *PQueue) Clear()

Clear drops all locked and unlocked messages in the queue.

func (*PQueue) Close

func (pq *PQueue) Close()

func (*PQueue) Config

func (pq *PQueue) Config() *conf.PQConfig

ServiceConfig returns service config as an empty interface type. User service type getter to find out the expected config type.

func (*PQueue) DelayedCount

func (pq *PQueue) DelayedCount() int64

DelayedCount is the number of messages which are delayed for delivery.

func (*PQueue) DeleteById

func (pq *PQueue) DeleteById(msgId string) apis.IResponse

func (*PQueue) DeleteByReceipt

func (pq *PQueue) DeleteByReceipt(rcpt string) apis.IResponse

func (*PQueue) DeleteLockedById

func (pq *PQueue) DeleteLockedById(msgId string) apis.IResponse

func (*PQueue) Description

func (pq *PQueue) Description() *queue_info.ServiceDescription

Description is queue description.

func (*PQueue) GetCurrentStatus

func (pq *PQueue) GetCurrentStatus() apis.IResponse

func (*PQueue) GetMessageInfo

func (pq *PQueue) GetMessageInfo(msgId string) apis.IResponse

func (*PQueue) GetStatus

func (pq *PQueue) GetStatus() map[string]interface{}

GetStatus returns detailed queue status information.

func (*PQueue) Info

func (pq *PQueue) Info() apis.ServiceInfo

func (*PQueue) IsClosed

func (pq *PQueue) IsClosed() bool

func (*PQueue) LockedCount

func (pq *PQueue) LockedCount() int64

LockedCount is the number of messages which are locked at the moment.

func (*PQueue) NewContext

func (pq *PQueue) NewContext(rw apis.ResponseWriter) apis.ServiceContext

func (*PQueue) Pop

func (pq *PQueue) Pop(lockTimeout, popWaitTimeout, limit int64, lock bool) apis.IResponse

PopWaitItems pops 'limit' messages within 'timeout'(milliseconds) time interval.

func (*PQueue) Push

func (pq *PQueue) Push(msgId string, payload string, msgTtl, delay, priority int64) apis.IResponse

func (*PQueue) ReleaseInFlight

func (pq *PQueue) ReleaseInFlight(cutOffTs int64) apis.IResponse

func (*PQueue) SetParams

func (pq *PQueue) SetParams(params *PQueueParams) apis.IResponse

func (*PQueue) StartUpdate

func (pq *PQueue) StartUpdate()

StartUpdate runs a loop of periodic data updates.

func (*PQueue) TimeoutItems

func (pq *PQueue) TimeoutItems(cutOffTs int64) apis.IResponse

func (*PQueue) TotalMessages

func (pq *PQueue) TotalMessages() int64

TotalMessages returns a number of all messages currently in the queue.

func (*PQueue) UnlockByReceipt

func (pq *PQueue) UnlockByReceipt(rcpt string) apis.IResponse

func (*PQueue) UnlockMessageById

func (pq *PQueue) UnlockMessageById(msgId string) apis.IResponse

func (*PQueue) UpdateLockById

func (pq *PQueue) UpdateLockById(msgId string, lockTimeout int64) apis.IResponse

UpdateLockById sets a user defined message lock timeout. It works only for locked messages.

func (*PQueue) UpdateLockByRcpt

func (pq *PQueue) UpdateLockByRcpt(rcpt string, lockTimeout int64) apis.IResponse

UpdateLockByRcpt sets a user defined message lock timeout tp the message that matches receipt.

type PQueueMsgData

type PQueueMsgData struct {
	Priority int64  `protobuf:"varint,1,opt,name=priority,proto3" json:"priority,omitempty"`
	ExpireTs int64  `protobuf:"varint,2,opt,name=expire_ts,json=expireTs,proto3" json:"expire_ts,omitempty"`
	PopCount int64  `protobuf:"varint,3,opt,name=pop_count,json=popCount,proto3" json:"pop_count,omitempty"`
	UnlockTs int64  `protobuf:"varint,4,opt,name=unlock_ts,json=unlockTs,proto3" json:"unlock_ts,omitempty"`
	StrId    string `protobuf:"bytes,5,opt,name=str_id,json=strId,proto3" json:"str_id,omitempty"`
}

func (*PQueueMsgData) Descriptor

func (*PQueueMsgData) Descriptor() ([]byte, []int)

func (*PQueueMsgData) Equal

func (this *PQueueMsgData) Equal(that interface{}) bool

func (*PQueueMsgData) GoString

func (this *PQueueMsgData) GoString() string

func (*PQueueMsgData) Marshal

func (m *PQueueMsgData) Marshal() (data []byte, err error)

func (*PQueueMsgData) MarshalTo

func (m *PQueueMsgData) MarshalTo(data []byte) (int, error)

func (*PQueueMsgData) ProtoMessage

func (*PQueueMsgData) ProtoMessage()

func (*PQueueMsgData) Reset

func (m *PQueueMsgData) Reset()

func (*PQueueMsgData) Size

func (m *PQueueMsgData) Size() (n int)

func (*PQueueMsgData) String

func (this *PQueueMsgData) String() string

func (*PQueueMsgData) Unmarshal

func (m *PQueueMsgData) Unmarshal(data []byte) error

type PQueueParams

type PQueueParams struct {
	MsgTTL         *int64
	MaxMsgSize     *int64
	MaxMsgsInQueue *int64
	DeliveryDelay  *int64
	PopCountLimit  *int64
	PopLockTimeout *int64
	PopWaitTimeout *int64
	FailQueue      string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL