Documentation ¶
Index ¶
- Constants
- func NewQueueFramework(q ali_mns.AliMNSQueue, c cl.QueueConfig, h QueueEventHandlerInterface) *queueFramework
- type DefaultEventHandler
- func (d *DefaultEventHandler) AfterChangeVisibility(_ ali_mns.AliMNSQueue, _ *ali_mns.MessageReceiveResponse, ...)
- func (d *DefaultEventHandler) AfterLaunch(_ QueueFramework)
- func (d *DefaultEventHandler) BeforeChangeVisibility(_ ali_mns.AliMNSQueue, _ *ali_mns.MessageReceiveResponse)
- func (d *DefaultEventHandler) BeforeLaunch(qf QueueFramework)
- func (d *DefaultEventHandler) ConsumeMessage(_ []byte, _ *ali_mns.MessageReceiveResponse) error
- func (d *DefaultEventHandler) OnChangeVisibilityFailed(_ ali_mns.AliMNSQueue, _ *ali_mns.MessageReceiveResponse, ...)
- func (d *DefaultEventHandler) OnConsumeFailed(_ error, _ []byte, _ *ali_mns.MessageReceiveResponse)
- func (d *DefaultEventHandler) OnError(err error, queue ali_mns.AliMNSQueue, resp *ali_mns.MessageReceiveResponse, ...)
- func (d *DefaultEventHandler) OnParseMessageBodyFailed(_ error, _ *ali_mns.MessageReceiveResponse)
- func (d *DefaultEventHandler) OnRecoverProcessing(_ QueueFramework)
- func (d *DefaultEventHandler) OnWaitingMessage(qf QueueFramework)
- func (d *DefaultEventHandler) OnWaitingProcessing(qf QueueFramework)
- func (d *DefaultEventHandler) ParseMessageBody(resp *ali_mns.MessageReceiveResponse) ([]byte, error)
- type QueueEventHandlerInterface
- type QueueFramework
- type Statistic
- func (ss *Statistic) Fetch(paramName string) uint64
- func (ss *Statistic) HandleError(count ...uint64)
- func (ss *Statistic) HandleSuccess(count ...uint64)
- func (ss *Statistic) Loop(count ...uint64)
- func (ss *Statistic) MessageReceived(count ...uint64)
- func (ss *Statistic) Monitor() bool
- func (ss *Statistic) MonitorLog() string
- func (ss *Statistic) Performance() string
- func (ss *Statistic) QueueError(count ...uint64)
- func (ss *Statistic) Start()
- func (ss Statistic) String() string
- func (ss *Statistic) Wait(count ...uint64)
Constants ¶
const MinLogDuration = 1 // seconds
MinLogDuration is the minimum log period, in seconds, for the top-QPS monitor.
Variables ¶
This section is empty.
Functions ¶
func NewQueueFramework ¶
func NewQueueFramework(q ali_mns.AliMNSQueue, c cl.QueueConfig, h QueueEventHandlerInterface) *queueFramework
NewQueueFramework creates and returns a queue framework that uses the provided event handler and is configured by the provided QueueConfig.
Types ¶
type DefaultEventHandler ¶
type DefaultEventHandler struct{}
DefaultEventHandler provides default implementations of the QueueEventHandlerInterface methods. Users can embed this struct and override only the methods they need.
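In Go, deriving from a struct and overriding its methods is done by embedding. The sketch below illustrates that pattern only. To stay runnable on its own it uses hypothetical stand-ins (`Response`, `DefaultHandler`, `PrintHandler`) instead of the real `ali_mns.MessageReceiveResponse` and `DefaultEventHandler`, and it reproduces just two of the hooks.

```go
package main

import (
	"fmt"
)

// Response is a hypothetical stand-in for ali_mns.MessageReceiveResponse.
type Response struct {
	MessageBody string
}

// DefaultHandler is a hypothetical stand-in for DefaultEventHandler.
// Only two of the hooks are reproduced here.
type DefaultHandler struct{}

// ConsumeMessage does nothing by default.
func (d *DefaultHandler) ConsumeMessage(_ []byte, _ *Response) error { return nil }

// OnConsumeFailed does nothing by default.
func (d *DefaultHandler) OnConsumeFailed(_ error, _ []byte, _ *Response) {}

// PrintHandler embeds the default handler and overrides ConsumeMessage only.
type PrintHandler struct {
	DefaultHandler
	Seen []string
}

// ConsumeMessage records the body; it shadows the embedded no-op method.
func (p *PrintHandler) ConsumeMessage(body []byte, _ *Response) error {
	p.Seen = append(p.Seen, string(body))
	return nil
}

func main() {
	h := &PrintHandler{}
	_ = h.ConsumeMessage([]byte("hello"), &Response{})
	h.OnConsumeFailed(nil, nil, nil) // promoted no-op from DefaultHandler
	fmt.Println(h.Seen)
}
```

With the real package, the same shape would apply: embed `DefaultEventHandler` in your own struct, define `ConsumeMessage` on it, and pass a pointer to it as the handler.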
func (*DefaultEventHandler) AfterChangeVisibility ¶
func (d *DefaultEventHandler) AfterChangeVisibility(_ ali_mns.AliMNSQueue, _ *ali_mns.MessageReceiveResponse, _ *ali_mns.MessageVisibilityChangeResponse)
AfterChangeVisibility does nothing by default.
func (*DefaultEventHandler) AfterLaunch ¶
func (d *DefaultEventHandler) AfterLaunch(_ QueueFramework)
AfterLaunch does nothing by default.
func (*DefaultEventHandler) BeforeChangeVisibility ¶
func (d *DefaultEventHandler) BeforeChangeVisibility(_ ali_mns.AliMNSQueue, _ *ali_mns.MessageReceiveResponse)
BeforeChangeVisibility does nothing by default.
func (*DefaultEventHandler) BeforeLaunch ¶
func (d *DefaultEventHandler) BeforeLaunch(qf QueueFramework)
BeforeLaunch registers the system INT, TERM, and STOP signals to stop the queue, and the HUP signal to update the performance log.
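A minimal sketch of this kind of signal registration with the standard `os/signal` package follows. It is not the package's own code: `stopSignals`, `logSignal`, and `action` are illustrative names. SIGSTOP is left out because, per the `os/signal` documentation, SIGKILL and SIGSTOP cannot be caught by a program.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// stopSignals and logSignal are illustrative groupings, not names from the package.
var (
	stopSignals           = []os.Signal{syscall.SIGINT, syscall.SIGTERM}
	logSignal   os.Signal = syscall.SIGHUP
)

// action maps a received signal to what a handler might do with it.
func action(sig os.Signal) string {
	for _, s := range stopSignals {
		if sig == s {
			return "stop"
		}
	}
	if sig == logSignal {
		return "log"
	}
	return "ignore"
}

func main() {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, append(stopSignals, logSignal)...)
	defer signal.Stop(sigs)

	// Send SIGHUP to this process so the example finishes on its own (Unix only).
	if p, err := os.FindProcess(os.Getpid()); err == nil {
		_ = p.Signal(logSignal)
	}

	select {
	case sig := <-sigs:
		fmt.Println(sig, "->", action(sig))
	case <-time.After(time.Second):
		fmt.Println("no signal received")
	}
}
```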
func (*DefaultEventHandler) ConsumeMessage ¶
func (d *DefaultEventHandler) ConsumeMessage(_ []byte, _ *ali_mns.MessageReceiveResponse) error
ConsumeMessage does nothing by default.
func (*DefaultEventHandler) OnChangeVisibilityFailed ¶
func (d *DefaultEventHandler) OnChangeVisibilityFailed(_ ali_mns.AliMNSQueue, _ *ali_mns.MessageReceiveResponse, _ *ali_mns.MessageVisibilityChangeResponse)
OnChangeVisibilityFailed does nothing by default.
func (*DefaultEventHandler) OnConsumeFailed ¶
func (d *DefaultEventHandler) OnConsumeFailed(_ error, _ []byte, _ *ali_mns.MessageReceiveResponse)
OnConsumeFailed does nothing by default.
func (*DefaultEventHandler) OnError ¶
func (d *DefaultEventHandler) OnError(err error, queue ali_mns.AliMNSQueue, resp *ali_mns.MessageReceiveResponse, vret *ali_mns.MessageVisibilityChangeResponse, qf QueueFramework)
OnError records the error in the statistic and in the logger. The message is deleted if its dequeue count exceeds the MaxDequeueCount config value.
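The deletion rule can be sketched as a single comparison. This is an assumption-laden illustration: `shouldDelete` and its parameter names are hypothetical, and the strict "greater than" comparison is inferred from the wording above rather than confirmed against the implementation.

```go
package main

import "fmt"

// shouldDelete reports whether a message has been dequeued more often than
// the configured limit allows. Both parameter names are illustrative, and the
// strict comparison is an assumption.
func shouldDelete(dequeueCount, maxDequeueCount int64) bool {
	return dequeueCount > maxDequeueCount
}

func main() {
	const maxDequeueCount = 3
	for _, n := range []int64{1, 3, 4} {
		fmt.Printf("dequeued %d times: delete=%v\n", n, shouldDelete(n, maxDequeueCount))
	}
}
```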
func (*DefaultEventHandler) OnParseMessageBodyFailed ¶
func (d *DefaultEventHandler) OnParseMessageBodyFailed(_ error, _ *ali_mns.MessageReceiveResponse)
OnParseMessageBodyFailed does nothing by default.
func (*DefaultEventHandler) OnRecoverProcessing ¶
func (d *DefaultEventHandler) OnRecoverProcessing(_ QueueFramework)
OnRecoverProcessing only logs a queue-recovered message by default.
func (*DefaultEventHandler) OnWaitingMessage ¶
func (d *DefaultEventHandler) OnWaitingMessage(qf QueueFramework)
OnWaitingMessage only updates the statistic logs by default.
func (*DefaultEventHandler) OnWaitingProcessing ¶
func (d *DefaultEventHandler) OnWaitingProcessing(qf QueueFramework)
OnWaitingProcessing only logs a waiting message by default.
func (*DefaultEventHandler) ParseMessageBody ¶
func (d *DefaultEventHandler) ParseMessageBody(resp *ali_mns.MessageReceiveResponse) ([]byte, error)
ParseMessageBody base64-decodes the response string by default.
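A base64 decode of this kind can be sketched with the standard library. The helper name `decodeBody` is hypothetical, and the use of the standard padded alphabet (`base64.StdEncoding`) is an assumption; the source does not say which variant the package uses.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// decodeBody decodes a base64 message body.
// Assumption: standard (RFC 4648) encoding with padding.
func decodeBody(encoded string) ([]byte, error) {
	return base64.StdEncoding.DecodeString(encoded)
}

func main() {
	body, err := decodeBody("aGVsbG8=")
	if err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	fmt.Println(string(body)) // prints "hello"
}
```

A custom ParseMessageBody could replace this step when producers send bodies in another encoding.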
type QueueEventHandlerInterface ¶
type QueueEventHandlerInterface interface {
	// BeforeLaunch function is invoked when framework Launch function starts.
	// qf QueueFramework is the framework
	BeforeLaunch(qf QueueFramework)

	// AfterLaunch function is invoked when framework Launch function starts.
	// qf QueueFramework is the framework
	AfterLaunch(qf QueueFramework)

	// OnWaitingMessage is invoked when queue framework starts to wait for one queue message.
	// User can log queue status or do something besides normal dispose flow.
	OnWaitingMessage(qf QueueFramework)

	// ParseMessageBody decodes the message body and is invoked when message is received.
	// The decoded message will be passed to ConsumeMessage interface as the first parameter.
	ParseMessageBody(resp *ali_mns.MessageReceiveResponse) ([]byte, error)

	// OnParseMessageBodyFailed is invoked if ParseMessageBody returns a non-nil error.
	// User can log and deal with the error and response body in this function.
	OnParseMessageBodyFailed(err error, resp *ali_mns.MessageReceiveResponse)

	// ConsumeMessage is the entry for user business logic. The decoded body and response struct are provided.
	ConsumeMessage(body []byte, resp *ali_mns.MessageReceiveResponse) error

	// OnConsumeFailed is invoked if ConsumeMessage returns a non-nil error.
	OnConsumeFailed(err error, body []byte, resp *ali_mns.MessageReceiveResponse)

	// BeforeChangeVisibility is invoked before the queue framework changes message visibility.
	BeforeChangeVisibility(q ali_mns.AliMNSQueue, resp *ali_mns.MessageReceiveResponse)

	// AfterChangeVisibility is invoked after the queue framework changes message visibility.
	AfterChangeVisibility(q ali_mns.AliMNSQueue, resp *ali_mns.MessageReceiveResponse, vr *ali_mns.MessageVisibilityChangeResponse)

	// OnChangeVisibilityFailed is invoked if the queue framework can't change message visibility.
	OnChangeVisibilityFailed(q ali_mns.AliMNSQueue, resp *ali_mns.MessageReceiveResponse, vr *ali_mns.MessageVisibilityChangeResponse)

	// OnWaitingProcessing is invoked whenever too many messages are processing and the queue will
	// wait for a few seconds. If the queue needs to wait for over config.OverloadBreakSeconds,
	// the queue will stop itself.
	OnWaitingProcessing(qf QueueFramework)

	// OnRecoverProcessing is invoked when the queue is recovered from waiting for processing.
	OnRecoverProcessing(qf QueueFramework)

	// OnError is invoked whenever an error happens.
	OnError(err error, q ali_mns.AliMNSQueue, rr *ali_mns.MessageReceiveResponse, vr *ali_mns.MessageVisibilityChangeResponse, qf QueueFramework)
}
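The parse and consume hooks described in the interface comments can be read as a small per-message pipeline. The following is a sketch under stated assumptions, not the framework's code: `Response`, `Handler`, `dispatch`, and `traceHandler` are hypothetical stand-ins, only four hooks are modelled, and skipping ConsumeMessage after a parse failure is an assumption. Visibility changes and OnError are not shown.

```go
package main

import (
	"errors"
	"fmt"
)

// Response is a hypothetical stand-in for ali_mns.MessageReceiveResponse.
type Response struct{ MessageBody string }

// Handler reproduces four hooks of QueueEventHandlerInterface against the stand-in type.
type Handler interface {
	ParseMessageBody(resp *Response) ([]byte, error)
	OnParseMessageBodyFailed(err error, resp *Response)
	ConsumeMessage(body []byte, resp *Response) error
	OnConsumeFailed(err error, body []byte, resp *Response)
}

// dispatch runs one received message through the hooks: parse first, then
// consume, calling the matching failure hook when a step returns an error.
func dispatch(h Handler, resp *Response) {
	body, err := h.ParseMessageBody(resp)
	if err != nil {
		h.OnParseMessageBodyFailed(err, resp)
		return
	}
	if err := h.ConsumeMessage(body, resp); err != nil {
		h.OnConsumeFailed(err, body, resp)
	}
}

// traceHandler records which hooks fired.
type traceHandler struct{ calls []string }

func (t *traceHandler) ParseMessageBody(resp *Response) ([]byte, error) {
	t.calls = append(t.calls, "parse")
	if resp.MessageBody == "" {
		return nil, errors.New("empty body")
	}
	return []byte(resp.MessageBody), nil
}

func (t *traceHandler) OnParseMessageBodyFailed(error, *Response) {
	t.calls = append(t.calls, "parse-failed")
}

func (t *traceHandler) ConsumeMessage(body []byte, _ *Response) error {
	t.calls = append(t.calls, "consume")
	if string(body) == "bad" {
		return errors.New("rejected")
	}
	return nil
}

func (t *traceHandler) OnConsumeFailed(error, []byte, *Response) {
	t.calls = append(t.calls, "consume-failed")
}

func main() {
	for _, body := range []string{"ok", "bad", ""} {
		th := &traceHandler{}
		dispatch(th, &Response{MessageBody: body})
		fmt.Printf("%q -> %v\n", body, th.calls)
	}
}
```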
QueueEventHandlerInterface defines the interface for a queue event handler. Users should implement all of its methods, or embed the DefaultEventHandler struct.
type QueueFramework ¶
type QueueFramework interface {
	RegisterBreakQueueOsSingal(sigs ...os.Signal)
	GetConfig() cl.QueueConfig
	GetStatistic() *Statistic
	SetQueue(q ali_mns.AliMNSQueue)
	HasValidQueue() bool
	SetEventHandler(h QueueEventHandlerInterface)
	HasEventHandler() bool
	Launch()
	Stop()
	WaitProcessingSeconds(pop bool) int
}
QueueFramework is the interface that any modified queue framework must satisfy.
type Statistic ¶
type Statistic struct {
// contains filtered or unexported fields
}
Statistic is a struct that holds queue performance monitoring data.
func (*Statistic) Fetch ¶
func (ss *Statistic) Fetch(paramName string) uint64
Fetch returns the statistic value named by paramName. The function is thread safe.
func (*Statistic) HandleError ¶
func (ss *Statistic) HandleError(count ...uint64)
HandleError adds count to the handling-error counter. If count is not provided, it defaults to 1.
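The optional `count ...uint64` parameter and the thread-safe Fetch can be illustrated with a small counter type. This is a sketch only: `counters`, `add`, and the key `"handle_error"` are hypothetical, and the real Statistic may store its values and name its parameters differently.

```go
package main

import (
	"fmt"
	"sync"
)

// counters is a hypothetical stand-in for Statistic.
type counters struct {
	mu     sync.Mutex
	values map[string]uint64
}

// add increments the named counter by count[0], or by 1 when count is omitted.
func (c *counters) add(name string, count ...uint64) {
	n := uint64(1)
	if len(count) > 0 {
		n = count[0]
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.values == nil {
		c.values = make(map[string]uint64)
	}
	c.values[name] += n
}

// HandleError mirrors the documented signature; the key name is illustrative.
func (c *counters) HandleError(count ...uint64) { c.add("handle_error", count...) }

// Fetch returns the named counter, or 0 if it was never incremented.
func (c *counters) Fetch(name string) uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.values[name]
}

func main() {
	var c counters
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.HandleError() // default increment of 1
		}()
	}
	wg.Wait()
	c.HandleError(5)                     // explicit increment
	fmt.Println(c.Fetch("handle_error")) // prints 105
}
```

A mutex keeps the sketch short; atomic counters would be an equally reasonable choice for a fixed set of fields.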
func (*Statistic) HandleSuccess ¶
func (ss *Statistic) HandleSuccess(count ...uint64)
HandleSuccess adds count to the successful-handling counter. If count is not provided, it defaults to 1.
func (*Statistic) Loop ¶
func (ss *Statistic) Loop(count ...uint64)
Loop adds count to the counter of loops that fetched no messages. If count is not provided, it defaults to 1.
func (*Statistic) MessageReceived ¶
func (ss *Statistic) MessageReceived(count ...uint64)
MessageReceived adds count to the received-message counter. If count is not provided, it defaults to 1.
func (*Statistic) Monitor ¶
func (ss *Statistic) Monitor() bool
Monitor updates the periodMonitor object, which logs queue-handling performance over a period.
func (*Statistic) MonitorLog ¶
func (ss *Statistic) MonitorLog() string
MonitorLog returns the period monitor log in string format.
func (*Statistic) Performance ¶
func (ss *Statistic) Performance() string
Performance returns the performance log result in string format.
func (*Statistic) QueueError ¶
func (ss *Statistic) QueueError(count ...uint64)
QueueError adds count to the queue-error counter. If count is not provided, it defaults to 1.