dispatchers

package
v0.0.0-...-2b0d425 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2017 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ErrBatchTooLarge   = "BatchTooLarge"
	ErrMessageTooLarge = "MessageTooLarge"
)
View Source
const InfluxMaxBatchSize = 500
View Source
const KinesisMaxNumberOfRecords = 500
View Source
const KinesisMaxSizeInBytes = 5 * MEGABYTE
View Source
const KinesisPartitionKeyMaxSize = 256
View Source
const MEGABYTE = 1024 * 1024

Variables

This section is empty.

Functions

This section is empty.

Types

type Batch

type Batch interface {
	Add(message []byte) error
	CanAdd(message []byte) bool
	Len() int
}

type Dispatcher

type Dispatcher interface {
	// accepts a message for dispatching
	Put([]byte) bool
	// the dispatching worker
	Dispatch()
}

TODO: Remove this interface (we're covered by service now)

type EchoBatch

type EchoBatch struct {
	Messages [][]byte
}

func (*EchoBatch) Add

func (b *EchoBatch) Add(message []byte) error

func (*EchoBatch) CanAdd

func (b *EchoBatch) CanAdd(message []byte) bool

func (*EchoBatch) Len

func (b *EchoBatch) Len() int

type EchoService

type EchoService struct{}

EchoService is used for local testing and debugging.

func (*EchoService) CreateBatch

func (svc *EchoService) CreateBatch() Batch

prints the message to stdout

func (*EchoService) Send

func (svc *EchoService) Send(batch Batch) error

type InfluxBatch

type InfluxBatch struct {
	// contains filtered or unexported fields
}

func NewInfluxBatch

func NewInfluxBatch() *InfluxBatch

func (*InfluxBatch) Add

func (batch *InfluxBatch) Add(message []byte) error

func (*InfluxBatch) CanAdd

func (batch *InfluxBatch) CanAdd(message []byte) bool

func (*InfluxBatch) Len

func (batch *InfluxBatch) Len() int

type InfluxService

type InfluxService struct {
	Host     string
	Database string
}

func NewInfluxService

func NewInfluxService(host string, database string) *InfluxService

func (*InfluxService) CreateBatch

func (svc *InfluxService) CreateBatch() Batch

func (*InfluxService) Send

func (svc *InfluxService) Send(batch Batch) error

type KinesisBatch

type KinesisBatch struct {
	// contains filtered or unexported fields
}

func NewKinesisBatch

func NewKinesisBatch(streamName string) *KinesisBatch

func (*KinesisBatch) Add

func (batch *KinesisBatch) Add(message []byte) error

Add inserts the message into the batch; if that is not possible, it returns an error.

func (*KinesisBatch) CanAdd

func (batch *KinesisBatch) CanAdd(message []byte) bool

func (*KinesisBatch) IsEmpty

func (batch *KinesisBatch) IsEmpty() bool

func (*KinesisBatch) IsReady

func (batch *KinesisBatch) IsReady(message []byte) bool

func (*KinesisBatch) Len

func (batch *KinesisBatch) Len() int

type KinesisService

type KinesisService struct {
	// contains filtered or unexported fields
}

func NewKinesisService

func NewKinesisService(streamName string, awsRegion string) *KinesisService

func (*KinesisService) CreateBatch

func (svc *KinesisService) CreateBatch() Batch

func (*KinesisService) Send

func (svc *KinesisService) Send(batch Batch) error

type MessageDispatcher

type MessageDispatcher struct {
	Service Service
	// contains filtered or unexported fields
}

func NewMessageDispatcher

func NewMessageDispatcher(service Service, bufferSize int) *MessageDispatcher

NewMessageDispatcher creates and returns a new dispatcher.

func (*MessageDispatcher) Dispatch

func (dispatcher *MessageDispatcher) Dispatch()

func (*MessageDispatcher) Put

func (dispatcher *MessageDispatcher) Put(message []byte) bool

Put inserts the message into the buffer for dispatching. It returns true on successful insertion and false if the message was dropped. It never blocks; if the queue is full, the message is dropped.

func (*MessageDispatcher) SetBatchFrequency

func (dispatcher *MessageDispatcher) SetBatchFrequency(freq time.Duration)

type MockDispatcher

type MockDispatcher struct {
	Messages chan string
}

MockDispatcher is a mock type for use in tests.

func (*MockDispatcher) Dispatch

func (dispatcher *MockDispatcher) Dispatch()

func (*MockDispatcher) Put

func (dispatcher *MockDispatcher) Put(message []byte) bool

type Service

type Service interface {
	CreateBatch() Batch
	Send(Batch) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL