enqueuestomp

package module
v1.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 8, 2020 License: MIT Imports: 15 Imported by: 0

README

enqueuestomp

CircleCI Release GoDoc Go Report Card

Use

package main

import (
    "log"

    "github.com/globocom/enqueuestomp"
    "github.com/go-stomp/stomp"
)

func main() {
    enqueueConfig := enqueuestomp.Config{
        Addr:  "localhost:61613",
    }
    enqueueConfig.SetOptions(
        stomp.ConnOpt.HeartBeat(0*time.Second, 0*time.Second),
    )

    enqueue, err := enqueuestomp.NewEnqueueStomp(
        enqueueConfig,
    )
    if err != nil {
        log.Fatalf("error %s", err)
    }

    sc := enqueuestomp.SendConfig{}
    sc.SetOptions(
        stomp.SendOpt.Header("persistent", "true"),
    )

    name := "queueName"
    body := []byte("queueBody")
    err := enqueue.SendQueue(name, body, sc)
    if err != nil {
        fmt.Printf("error %s", err)
    }
}
Enqueue config
type Config struct {
    // Default is tcp
    Network string

    // host:port address
    // Default is localhost:61613
    Addr string

    // https://pkg.go.dev/github.com/go-stomp/stomp
    Options []func(*stomp.Conn) error

    // The maxWorkers parameter specifies the maximum number of workers that can
    // execute tasks concurrently.  When there are no incoming tasks, workers are
    // gradually stopped until there are no remaining workers.
    // Default is runtime.NumCPU()
    MaxWorkers int

    // Default is 3, Max is 5
    RetriesConnect int

    // Used to determine how long a retry request should wait until attempted.
    // Default is ExponentialBackoff
    BackoffConnect BackoffStrategy

    // File path to write logging output to
    WriteOutputPath string

    // Logger that will be used
    // Default is nothing
    Logger Logger

    // create unique identifier
    // Default google/uuid
    IdentifierFunc func() string
}
Send config
type SendConfig struct {
    // The content type should be specified, according to the STOMP specification, but if contentType is an empty
    // string, the message will be delivered without a content-type header entry.
    // Default is text/plain.
    ContentType string

    // Any number of options can be specified in opts. See the examples for usage. Options include whether
    // to receive a RECEIPT, should the content-length be suppressed, and sending custom header entries.
    // https://pkg.go.dev/github.com/go-stomp/stomp/frame
    Options []func(*frame.Frame) error

    BeforeSend func(identifier string, destinationType string, destinationName string, body []byte, startTime time.Time)

    AfterSend func(identifier string, destinationType string, destinationName string, body []byte, startTime time.Time, err error)

    // the name of the CircuitBreaker.
    // Default is empty
    CircuitName string
}
CircuitBreaker config
type CircuitBreakerConfig struct {
    // how long to wait for command to complete, in milliseconds
    // Default is 10000
    Timeout int

    // how many commands of the same type can run at the same time
    // Default is 10000
    MaxConcurrentRequests int

    // the minimum number of requests needed before a circuit can be tripped due to health
    // Default is 100
    RequestVolumeThreshold int

    //  how long, in milliseconds, to wait after a circuit opens before testing for recovery
    // Default is 500
    SleepWindow int

    // causes circuits to open once the rolling measure of errors exceeds this percent of requests
    // Default is 5
    ErrorPercentThreshold int
}
Documentation

GoDoc

Full documentation for the package can be viewed online using the GoDoc site here: https://pkg.go.dev/github.com/globocom/enqueuestomp

Documentation

Index

Constants

View Source
const (
	DefaultRetriesConnect    = 3
	DefaultMaxRetriesConnect = 5
)
View Source
const (
	DestinationTypeQueue = "queue"
	DestinationTypeTopic = "topic"
)
View Source
const DefaultInitialBackOff = 100 * time.Millisecond

Variables

View Source
var (
	ErrEmptyBody      = errors.New("empty body")
	ErrEmptyQueueName = errors.New("empty queue name")
	ErrEmptyTopicName = errors.New("empty topic name")

	DefaultExpiresCheck = 1 * time.Minute
	DefaultBodyCheck    = []byte("PING")
)

Functions

func ConstantBackOff

func ConstantBackOff(_ int) time.Duration

ConstantBackOff always returns DefaultInitialBackOff.

func ExponentialBackoff

func ExponentialBackoff(i int) time.Duration

ExponentialBackoff returns ever increasing backoffs by a power of 2.

func LinearBackoff

func LinearBackoff(i int) time.Duration

LinearBackoff returns increasing durations.

Types

type BackoffStrategy

type BackoffStrategy func(retry int) time.Duration

BackoffStrategy is used to determine how long a retry request should wait until attempted.

type CircuitBreakerConfig

type CircuitBreakerConfig struct {
	// how long to wait for command to complete, in milliseconds
	// Default is 10000
	Timeout int

	// how many commands of the same type can run at the same time
	// Default is 10000
	MaxConcurrentRequests int

	// the minimum number of requests needed before a circuit can be tripped due to health
	// Default is 100
	RequestVolumeThreshold int

	//  how long, in milliseconds, to wait after a circuit opens before testing for recovery
	// Default is 500
	SleepWindow int

	// causes circuits to open once the rolling measure of errors exceeds this percent of requests
	// Default is 5
	ErrorPercentThreshold int
}

type Config

type Config struct {
	// Default is tcp
	Network string

	// host:port address
	// Default is localhost:61613
	Addr string

	// https://pkg.go.dev/github.com/go-stomp/stomp
	Options []func(*stomp.Conn) error

	// The maxWorkers parameter specifies the maximum number of workers that can
	// execute tasks concurrently.  When there are no incoming tasks, workers are
	// gradually stopped until there are no remaining workers.
	// Default is runtime.NumCPU()
	MaxWorkers int

	// Default is 3, Max is 5
	RetriesConnect int

	// Used to determine how long a retry request should wait until attempted.
	// Default is ExponentialBackoff
	BackoffConnect BackoffStrategy

	// File path to write logging output to
	WriteOutputPath string

	// Logger that will be used
	// Default is nothing
	Logger Logger

	// create unique identifier
	// Default google/uuid
	IdentifierFunc func() string
}

func (*Config) AddOption added in v1.5.0

func (c *Config) AddOption(opt func(*stomp.Conn) error)

func (*Config) SetOptions added in v1.5.0

func (c *Config) SetOptions(opts ...func(*stomp.Conn) error)

type EnqueueStomp

type EnqueueStomp struct {
	// contains filtered or unexported fields
}

func NewEnqueueStomp

func NewEnqueueStomp(config Config) (*EnqueueStomp, error)

func (*EnqueueStomp) CheckQueue added in v1.8.0

func (emq *EnqueueStomp) CheckQueue(queueName string) error

func (*EnqueueStomp) CheckTopic added in v1.8.0

func (emq *EnqueueStomp) CheckTopic(topicName string) error

func (*EnqueueStomp) Config

func (emq *EnqueueStomp) Config() Config

func (*EnqueueStomp) ConfigureCircuitBreaker

func (emq *EnqueueStomp) ConfigureCircuitBreaker(name string, cb CircuitBreakerConfig)

func (*EnqueueStomp) Disconnect added in v1.2.0

func (emq *EnqueueStomp) Disconnect() error

func (*EnqueueStomp) QueueSize

func (emq *EnqueueStomp) QueueSize() int

func (*EnqueueStomp) SendQueue

func (emq *EnqueueStomp) SendQueue(queueName string, body []byte, sc SendConfig) error

SendQueue The body array contains the message body, and its content should be consistent with the specified content type.

func (*EnqueueStomp) SendTopic

func (emq *EnqueueStomp) SendTopic(topicName string, body []byte, sc SendConfig) error

SendTopic The body array contains the message body, and its content should be consistent with the specified content type.

type Logger added in v1.1.0

type Logger interface {
	Debugf(template string, args ...interface{})
	Errorf(template string, args ...interface{})
}

type NoopLogger added in v1.1.0

type NoopLogger struct{}

NoopLogger does not log anything.

func (NoopLogger) Debugf added in v1.1.0

func (l NoopLogger) Debugf(template string, args ...interface{})

Debugf does nothing.

func (NoopLogger) Errorf added in v1.1.0

func (l NoopLogger) Errorf(template string, args ...interface{})

Errorf does nothing.

type SendConfig added in v1.1.0

type SendConfig struct {
	// The content type should be specified, according to the STOMP specification, but if contentType is an empty
	// string, the message will be delivered without a content-type header entry.
	// Default is text/plain.
	ContentType string

	// Any number of options can be specified in opts. See the examples for usage. Options include whether
	// to receive a RECEIPT, should the content-length be suppressed, and sending custom header entries.
	// https://pkg.go.dev/github.com/go-stomp/stomp/frame
	Options []func(*frame.Frame) error

	BeforeSend func(identifier string, destinationType string, destinationName string, body []byte, startTime time.Time)

	AfterSend func(identifier string, destinationType string, destinationName string, body []byte, startTime time.Time, err error)

	// the name of the CircuitBreaker.
	// Default is empty
	CircuitName string
}

func (*SendConfig) AddOption added in v1.5.0

func (sc *SendConfig) AddOption(opt func(*frame.Frame) error)

func (*SendConfig) SetOptions added in v1.5.0

func (sc *SendConfig) SetOptions(opts ...func(*frame.Frame) error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL