mqpf

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 24, 2021 License: MIT Imports: 13 Imported by: 0

README

Mimit Queue Process Framework

Go Reference Build Status Coverage Status Go Report Card

Description

Mimit queue process framework provide a consumer frame work for task queue process.

By implement QueueEventHandlerInterface and provide the data object to QueueFramework, user can Launch a queue.

User should implement ConsumeMessage function at least, as it is the entry for business logic. Sample code is provided in sample directory.

Work Flow

  1. Launch and invoke BeforeLaunch interface.
  2. Wait for queue message.
  3. Change visibility of the message for VisibilityTimeout seconds.
  4. Invoke ParseMessageBody interface if the queue receives a message.
  5. Invoke ConsumeMessage interface for user business logic.
  6. If ConsumeMessage successfully disposes the message, the message will be deleted. Otherwise, OnConsumeFailed interface will be invoked, and the message will be visible after VisibilityTimeout seconds.
  7. Go back to step 2.
  8. User can stop the flow by invoke Stop interface, AfterLaunch will be invoked.

Besides the default flow, OnParseMessageBodyFailed is invoked if ParseMessageBody interface can not decode the message correctly.

BeforeChangeVisibility and AfterChangeVisibility are invoked before and after the queue changes message visibility.

OnChangeVisibilityFailed is invoked if the queue meets error in changing message visibility.

OnError is invoked whenever an error happens in the flow. User can log and process the error message.

OnWaitingMessage is invoked when the queue framework starts to wait for one queue message.

OnWaitingProcessing is invoked whenever too many messages are processing and the queue will wait for a few seconds. If the queue need to wait for over config.OverloadBreakSeconds, the queue will stop itself.

OnRecoverProcessing is invoked when the queue is recovered from waiting for processing.

Flow Control Scheme

The queue framework will wait for a few seconds, if too many messages are being processed. When the count of processing messages is greater than QueueConfig.MaxProcessingMessage, a timer is triggered to wait for the queue to be more idle. The seconds to wait increases if the queue needs to wait for the processing continuously.

By default, the queue will stop itself if the waiting seconds is longer than QueueConfig.OverloadBreakSeconds.

Documentation

Index

Constants

View Source
const MinLogDuration = 1 // seconds

MinLogDuration is the minimum log period for top QPS monitor

Variables

This section is empty.

Functions

func NewQueueFramework

func NewQueueFramework(q ali_mns.AliMNSQueue, c cl.QueueConfig, h QueueEventHandlerInterface) *queueFramework

NewQueueFramework creates and returns a queue framework which consists with a provided event handler, and configures the queue framework by provided QueueConfig.

Types

type DefaultEventHandler

type DefaultEventHandler struct{}

DefaultEventHandler implements QueueEventHandlerInterface for default apis. Users can derive from this struct and override the apis.

func (*DefaultEventHandler) AfterChangeVisibility

func (d *DefaultEventHandler) AfterChangeVisibility(_ ali_mns.AliMNSQueue,
	_ *ali_mns.MessageReceiveResponse,
	_ *ali_mns.MessageVisibilityChangeResponse)

AfterChangeVisibility does nothing by default.

func (*DefaultEventHandler) AfterLaunch

func (d *DefaultEventHandler) AfterLaunch(_ QueueFramework)

AfterLaunch does nothing by default.

func (*DefaultEventHandler) BeforeChangeVisibility

func (d *DefaultEventHandler) BeforeChangeVisibility(_ ali_mns.AliMNSQueue, _ *ali_mns.MessageReceiveResponse)

BeforeChangeVisibility does nothing by default.

func (*DefaultEventHandler) BeforeLaunch

func (d *DefaultEventHandler) BeforeLaunch(qf QueueFramework)

BeforeLaunch registers system INT, TERM, STOP signals to stop the queue, and HUP signal to update the performance log.

func (*DefaultEventHandler) ConsumeMessage

func (d *DefaultEventHandler) ConsumeMessage(_ []byte, _ *ali_mns.MessageReceiveResponse) error

ConsumeMessage does nothing by default.

func (*DefaultEventHandler) OnChangeVisibilityFailed

func (d *DefaultEventHandler) OnChangeVisibilityFailed(_ ali_mns.AliMNSQueue,
	_ *ali_mns.MessageReceiveResponse,
	_ *ali_mns.MessageVisibilityChangeResponse)

OnChangeVisibilityFailed does nothing by default.

func (*DefaultEventHandler) OnConsumeFailed

func (d *DefaultEventHandler) OnConsumeFailed(_ error, _ []byte, _ *ali_mns.MessageReceiveResponse)

OnConsumeFailed does nothing by default.

func (*DefaultEventHandler) OnError

func (d *DefaultEventHandler) OnError(err error, queue ali_mns.AliMNSQueue,
	resp *ali_mns.MessageReceiveResponse,
	vret *ali_mns.MessageVisibilityChangeResponse,
	qf QueueFramework)

OnError logs the error in statistic and logger. The message will be deleted if dequeue count is over MaxDequeueCount config.

func (*DefaultEventHandler) OnParseMessageBodyFailed

func (d *DefaultEventHandler) OnParseMessageBodyFailed(_ error, _ *ali_mns.MessageReceiveResponse)

OnParseMessageBodyFailed does nothing by default.

func (*DefaultEventHandler) OnRecoverProcessing

func (d *DefaultEventHandler) OnRecoverProcessing(_ QueueFramework)

OnRecoverProcessing only logs queue recovered message by default.

func (*DefaultEventHandler) OnWaitingMessage

func (d *DefaultEventHandler) OnWaitingMessage(qf QueueFramework)

OnWaitingMessage only updates statistic logs by default.

func (*DefaultEventHandler) OnWaitingProcessing

func (d *DefaultEventHandler) OnWaitingProcessing(qf QueueFramework)

OnWaitingProcessing only logs waiting message by default.

func (*DefaultEventHandler) ParseMessageBody

func (d *DefaultEventHandler) ParseMessageBody(resp *ali_mns.MessageReceiveResponse) ([]byte, error)

ParseMessageBody decodes response string in base64 by default.

type QueueEventHandlerInterface

type QueueEventHandlerInterface interface {
	// BeforeLaunch function is invoked when framework Launch function starts.
	// qf QueueFramework is the framework
	BeforeLaunch(qf QueueFramework)

	// AfterLaunch function is invoked when framework Launch function starts.
	// qf QueueFramework is the framework
	AfterLaunch(qf QueueFramework)

	// OnWaitingMessage is invoked when queue framework starts to wait for one queue message.
	// User can log queue status or do something besides normal dispose flow.
	OnWaitingMessage(qf QueueFramework)

	// ParseMessageBody decodes the message body and is invoked when message is received.
	// The decoded message will be passed to ConsumeMessage interface as the first parameter.
	ParseMessageBody(resp *ali_mns.MessageReceiveResponse) ([]byte, error)

	// OnParseMessageBodyFailed is invoked if ParseMessageBody return a non-nil error.
	// User can log and deal with the error and response body in this function.
	OnParseMessageBodyFailed(err error, resp *ali_mns.MessageReceiveResponse)

	// ConsumeMessage is the entry for user business logic. The decoded body and response struct are provided.
	ConsumeMessage(body []byte, resp *ali_mns.MessageReceiveResponse) error

	// OnConsumeFailed is invoked if ConsumeMessage return a non-nil error.
	OnConsumeFailed(err error, body []byte, resp *ali_mns.MessageReceiveResponse)

	// BeforeChangeVisibility is invoked before the queue framework changes message visibility.
	BeforeChangeVisibility(q ali_mns.AliMNSQueue, resp *ali_mns.MessageReceiveResponse)

	// AfterChangeVisibility is invoked after the queue framework changes message visibility.
	AfterChangeVisibility(q ali_mns.AliMNSQueue, resp *ali_mns.MessageReceiveResponse,
		vr *ali_mns.MessageVisibilityChangeResponse)

	// OnChangeVisibilityFailed is invoked if the queue framework can't change message visibility.
	OnChangeVisibilityFailed(q ali_mns.AliMNSQueue, resp *ali_mns.MessageReceiveResponse,
		vr *ali_mns.MessageVisibilityChangeResponse)

	// OnWaitingProcessing is invoked whenever too many messages are processing and the queue will
	// wait for a few seconds. If the queue need to wait for over config.OverloadBreakSeconds,
	// the queue will stop itself.
	OnWaitingProcessing(qf QueueFramework)

	// OnRecoverProcessing is invoked when the queue is recovered from waiting for processing.
	OnRecoverProcessing(qf QueueFramework)

	// OnError is invoked whenever an error happens.
	OnError(err error, q ali_mns.AliMNSQueue,
		rr *ali_mns.MessageReceiveResponse, vr *ali_mns.MessageVisibilityChangeResponse,
		qf QueueFramework)
}

QueueEventHandlerInterface defines the interfaces for queue event handler. User should implement all the API functions, or derive from DefaultEventHandler struct.

type QueueFramework

type QueueFramework interface {
	RegisterBreakQueueOsSingal(sigs ...os.Signal)
	GetConfig() cl.QueueConfig
	GetStatistic() *Statistic
	SetQueue(q ali_mns.AliMNSQueue)
	HasValidQueue() bool
	SetEventHandler(h QueueEventHandlerInterface)
	HasEventHandler() bool
	Launch()
	Stop()
	WaitProcessingSeconds(pop bool) int
}

QueueFramework is the interface that should be satisfied for any modified queue framework.

type Statistic

type Statistic struct {
	// contains filtered or unexported fields
}

Statistic is a struct for queue performance monitoring data

func (*Statistic) Fetch

func (ss *Statistic) Fetch(paramName string) uint64

Fetch returns statistic value by paramName request. The function is thread safe.

func (*Statistic) HandleError

func (ss *Statistic) HandleError(count ...uint64)

HandleError adds 'count' for encountering problems situation in statistic value. If count is not provided, the default value for count is 1.

func (*Statistic) HandleSuccess

func (ss *Statistic) HandleSuccess(count ...uint64)

HandleSuccess adds 'count' for successful handled situation in statistic value. If count is not provided, the default value for count is 1.

func (*Statistic) Loop

func (ss *Statistic) Loop(count ...uint64)

Loop adds 'count' for loops that fetched no messages in statistic value. If count is not provided, the default value for count is 1.

func (*Statistic) MessageReceived

func (ss *Statistic) MessageReceived(count ...uint64)

MessageReceived adds 'count' for received message in statistic value. If count is not provided, the default value for count is 1.

func (*Statistic) Monitor

func (ss *Statistic) Monitor() bool

Monitor update periodMonitor objects which logs a period performance for queue handling.

func (*Statistic) MonitorLog

func (ss *Statistic) MonitorLog() string

MonitorLog returns period monitor log in string format.

func (*Statistic) Performance

func (ss *Statistic) Performance() string

Performance returns performance log result in string format.

func (*Statistic) QueueError

func (ss *Statistic) QueueError(count ...uint64)

QueueError adds 'count' for queue problems in statistic value. If count is not provided, the default value for count is 1.

func (*Statistic) Start

func (ss *Statistic) Start()

Start is invoked to log when the monitoring task begins.

func (Statistic) String

func (ss Statistic) String() string

func (*Statistic) Wait

func (ss *Statistic) Wait(count ...uint64)

Wait adds 'count' for queue wait in statistic value. If count is not provided, the default value for count is 1.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL