subee

package module
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 17, 2019 License: MIT Imports: 6 Imported by: 4

README

subee

CircleCI latest GoDoc Go Report Card license

Subee (pronounce /ˈsʌbiː/) is a framework for Pub/Sub subscriber worker for Go.

Examples

CLI

We offer an optional cli tool to develop with subee faster.

$ subee help

Usage:
  subee [command]

Available Commands:
  generate    Generate a new code
  help        Help about any command
  start       Build and start subscribers
  version     Print the version information

Flags:
  -h, --help   help for subee

Use "subee [command] --help" for more information about a command.

Installing

macOS
brew install wantedly/tap/subee
Other platforms

You can download prebuilt binaries for each platform in the releases page.

Build from source

go get github.com/wantedly/subee/cmd/subee

Documentation

Index

Constants

View Source
const (
	// DefaultChunkSize is the default maximum chunked message size per consuming.
	DefaultChunkSize = 4

	// DefaultFlushInterval is the default maximum flush interval to receive messages.
	DefaultFlushInterval = 10 * time.Second
)

Variables

This section is empty.

Functions

func SetRawMessage added in v0.5.0

func SetRawMessage(ctx context.Context, msg Message) context.Context

func SetRawMessages added in v0.5.0

func SetRawMessages(ctx context.Context, msgs []Message) context.Context

Types

type Acknowledger

type Acknowledger interface {
	Ack()
	Nack()
}

Acknowledger is an interface to send ACK/NACK.

type BatchConsumer added in v0.2.0

type BatchConsumer interface {
	BatchConsume(context.Context, []Message) error
}

BatchConsumer represents an interface that consume multiple messages.

type BatchConsumerFunc added in v0.2.0

type BatchConsumerFunc func(context.Context, []Message) error

BatchConsumerFunc type is an adapter to allow the use of ordinary functions as BatchConsumer.

func (BatchConsumerFunc) BatchConsume added in v0.2.0

func (f BatchConsumerFunc) BatchConsume(ctx context.Context, msgs []Message) error

BatchConsume call f(ctx, msgs)

type BatchConsumerInterceptor added in v0.2.0

type BatchConsumerInterceptor func(BatchConsumer) BatchConsumer

BatchConsumerInterceptor provides a hook to intercept the execution of a multiple messages consumption.

type BeginTag

type BeginTag struct{}

BeginTag is tag for an receive/consume process starts.

type Config

type Config struct {
	ChunkSize int

	AckImmediately bool

	FlushInterval time.Duration

	Logger Logger

	StatsHandler StatsHandler

	BatchConsumer             BatchConsumer
	BatchConsumerInterceptors []BatchConsumerInterceptor

	Consumer             Consumer
	ConsumerInterceptors []ConsumerInterceptor
}

Config represents the configuration for subee.

type ConsumeBeginTag

type ConsumeBeginTag struct{}

ConsumeBeginTag is tag for consumption start.

type ConsumeEnd

type ConsumeEnd struct {
	BeginTime time.Time
	EndTime   time.Time
	Error     error
}

ConsumeEnd contains stats when consume end.

type Consumer added in v0.2.0

type Consumer interface {
	Consume(context.Context, Message) error
}

Consumer represents an interface that consume single message.

type ConsumerFunc added in v0.2.0

type ConsumerFunc func(context.Context, Message) error

ConsumerFunc type is an adapter to allow the use of ordinary functions as Consumer.

func (ConsumerFunc) Consume added in v0.2.0

func (f ConsumerFunc) Consume(ctx context.Context, msg Message) error

Consume call f(ctx, msgs)

type ConsumerInterceptor added in v0.2.0

type ConsumerInterceptor func(Consumer) Consumer

ConsumerInterceptor provides a hook to intercept the execution of a message consumption.

type Dequeue

type Dequeue struct {
	BeginTime time.Time
	EndTime   time.Time
}

Dequeue contains stats when dequeue in channel.

type End

type End struct {
	MsgCount  int
	BeginTime time.Time
	EndTime   time.Time
}

End contains stats when an receive/consume process ends.

type Engine

type Engine struct {
	*Config
	// contains filtered or unexported fields
}

Engine is the framework instance.

func New added in v0.2.0

func New(subscriber Subscriber, consumer Consumer, opts ...Option) *Engine

New creates a Engine intstance with Consumer.

func NewBatch added in v0.2.0

func NewBatch(subscriber Subscriber, consumer BatchConsumer, opts ...Option) *Engine

NewBatch creates a Engine intstance with BatchConsumer.

func (*Engine) Start

func (e *Engine) Start(ctx context.Context) error

Start starts Subscriber and Consumer process.

type EnqueueTag

type EnqueueTag struct{}

EnqueueTag is tag for enqueue in channel.

type Logger

type Logger interface {
	Printf(format string, v ...interface{})
	Print(v ...interface{})
}

Logger is the interface for logging

func GetLogger

func GetLogger(ctx context.Context) Logger

GetLogger return Logger implementation set in the context.

type Message

type Message interface {
	Acknowledger
	Data() []byte
	Metadata() map[string]string
}

Message is an interface of the subscribed message.

func GetRawMessage added in v0.5.0

func GetRawMessage(ctx context.Context) Message

func GetRawMessages added in v0.5.0

func GetRawMessages(ctx context.Context) []Message

type NopStatsHandler

type NopStatsHandler struct{}

NopStatsHandler is no-op StatsHandler

func (*NopStatsHandler) HandleProcess

func (*NopStatsHandler) HandleProcess(context.Context, Stats)

HandleProcess do nothing

func (*NopStatsHandler) TagProcess

func (*NopStatsHandler) TagProcess(ctx context.Context, t Tag) context.Context

TagProcess returns context without doing anythig

type Option

type Option func(*Config)

Option configures Config.

func WithAckImmediately

func WithAckImmediately() Option

WithAckImmediately returns an Option that make ack messages before consuming.

func WithBatchConsumerInterceptors added in v0.2.0

func WithBatchConsumerInterceptors(interceptors ...BatchConsumerInterceptor) Option

WithBatchConsumerInterceptors returns an Option that sets the BatchConsumerInterceptor implementations(s). Interceptors are called in order of addition. e.g) interceptor1, interceptor2, interceptor3 => interceptor1 => interceptor2 => interceptor3 => BatchConsumer.Consume

func WithChunkSize

func WithChunkSize(size int) Option

WithChunkSize returns an Option that sets the maximum chunked message size per consuming.

func WithConsumerInterceptors added in v0.2.0

func WithConsumerInterceptors(interceptors ...ConsumerInterceptor) Option

WithConsumerInterceptors returns an Option that sets the ConsumerInterceptor implementations(s). Interceptors are called in order of addition. e.g) interceptor1, interceptor2, interceptor3 => interceptor1 => interceptor2 => interceptor3 => Consumer.Consume

func WithFlushInterval

func WithFlushInterval(inval time.Duration) Option

WithFlushInterval returns an Option that sets the maximum flush time interval to receive message.

func WithLogger

func WithLogger(logger Logger) Option

WithLogger returns an Option that sets the Logger implementation.

func WithStatsHandler

func WithStatsHandler(sh StatsHandler) Option

WithStatsHandler returns an Option that sets the StatsHandler implementation.

type Stats

type Stats interface {
	// contains filtered or unexported methods
}

Stats is stats information about receive/consume.

type StatsHandler

type StatsHandler interface {
	TagProcess(context.Context, Tag) context.Context
	HandleProcess(context.Context, Stats)
}

StatsHandler is the interface for related stats handling

type Subscriber

type Subscriber interface {
	Subscribe(context.Context, func(Message)) error
}

Subscriber is the interface to subscribe message.

type Tag

type Tag interface {
	// contains filtered or unexported methods
}

Tag is tag information.

Directories

Path Synopsis
cmd
subee Module
middlewares
logging/zap Module
recovery Module
stats
newrelic Module
subscribers
cloudpubsub Module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL