consumer

package
v1.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 7, 2022 License: Apache-2.0 Imports: 16 Imported by: 4

Documentation

Overview

Package consumer provides a message consumer (listener, message handler, and options) and includes generated GoMock mocks for its interfaces.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewMessageHandler added in v0.5.0

func NewMessageHandler(mpFactory MessageProcessorFactory, opts Options) server.Handler

NewMessageHandler creates a new server handler backed by the given MessageProcessorFactory.

Types

type Configuration

// Configuration holds the consumer settings that can be loaded from YAML.
// NOTE(review): every field is a pointer, presumably so that unset values
// fall back to defaults in NewOptions — confirm against the implementation.
type Configuration struct {
	// Encoder is the configuration for the encoder.
	Encoder                   *proto.Configuration      `yaml:"encoder"`
	// Decoder is the configuration for the decoder.
	Decoder                   *proto.Configuration      `yaml:"decoder"`
	// MessagePool is the configuration for the message pool.
	MessagePool               *MessagePoolConfiguration `yaml:"messagePool"`
	// AckFlushInterval is the ack flush interval.
	AckFlushInterval          *time.Duration            `yaml:"ackFlushInterval"`
	// AckBufferSize is the ack buffer size.
	AckBufferSize             *int                      `yaml:"ackBufferSize"`
	// ConnectionWriteBufferSize is the connection write buffer size.
	ConnectionWriteBufferSize *int                      `yaml:"connectionWriteBufferSize"`
	// ConnectionReadBufferSize is the connection read buffer size.
	ConnectionReadBufferSize  *int                      `yaml:"connectionReadBufferSize"`
	// ConnectionWriteTimeout is the timeout for writing to the connection.
	ConnectionWriteTimeout    *time.Duration            `yaml:"connectionWriteTimeout"`
}

Configuration configures the consumer options.

func (*Configuration) NewOptions

func (c *Configuration) NewOptions(iOpts instrument.Options) Options

NewOptions creates consumer options.

type ConsumeFn

type ConsumeFn func(c Consumer)

ConsumeFn processes the consumer. This is useful when the user wants to reuse resources across messages received on the same consumer, or wants finer-grained control over how messages are read from the consumer.

type Consumer

// Consumer receives messages from a connection.
type Consumer interface {
	// Message waits for and returns the next message received.
	Message() (Message, error)

	// Init initializes the consumer.
	Init()

	// Close closes the consumer.
	Close()
}

Consumer receives messages from a connection.

type Listener

// Listener is a consumer listener based on a network address.
type Listener interface {
	// Accept waits for and returns the next connection-based consumer.
	Accept() (Consumer, error)

	// Close closes the listener.
	// Any blocked Accept operations will be unblocked and return errors.
	Close() error

	// Addr returns the listener's network address.
	Addr() net.Addr
}

Listener is a consumer listener based on a network address.

func NewListener

func NewListener(addr string, opts Options) (Listener, error)

NewListener creates a consumer listener.

type Message

// Message carries the data that needs to be processed.
type Message interface {
	// Bytes returns the bytes of the message.
	Bytes() []byte

	// Ack acknowledges the message.
	Ack()

	// ShardID returns the shard ID of the Message.
	ShardID() uint64

	// SentAtNanos returns when the producer sent the Message.
	// NOTE(review): presumably a Unix timestamp in nanoseconds — confirm.
	SentAtNanos() uint64
}

Message carries the data that needs to be processed.

type MessagePoolConfiguration added in v0.5.0

// MessagePoolConfiguration holds the message pool configuration options.
type MessagePoolConfiguration struct {
	// Size is the size of the pool.
	Size pool.Size `yaml:"size"`

	// Watermark is the object pool watermark configuration.
	Watermark pool.WatermarkConfiguration `yaml:"watermark"`

	// MaxBufferReuseSize specifies the maximum size of a buffer that can
	// be reused and pooled. A buffer larger than this is discarded after
	// use. Zero specifies no limit.
	MaxBufferReuseSize int `yaml:"maxBufferReuseSize"`
}

MessagePoolConfiguration holds the message pool configuration options, which extend the default object pool configuration.

func (MessagePoolConfiguration) NewOptions added in v0.5.0

NewOptions creates message pool options.

type MessagePoolOptions added in v0.5.0

// MessagePoolOptions are options to use when creating the message pool.
type MessagePoolOptions struct {
	// PoolOptions are the object pool options used for the message pool.
	PoolOptions pool.ObjectPoolOptions

	// MaxBufferReuseSize specifies the maximum size of a buffer that can
	// be reused and pooled. A buffer larger than this is discarded after
	// use. Zero specifies no limit.
	MaxBufferReuseSize int
}

MessagePoolOptions are options to use when creating the message pool.

type MessageProcessor added in v0.5.0

// MessageProcessor processes messages.
type MessageProcessor interface {
	// Process processes the given message.
	Process(m Message)
	// Close closes the processor.
	Close()
}

MessageProcessor processes the message. When a MessageProcessor is set in the server, it is called to process every message received.

func NewNoOpMessageProcessor added in v1.4.0

func NewNoOpMessageProcessor() MessageProcessor

NewNoOpMessageProcessor creates a new MessageProcessor that does nothing.

type MessageProcessorFactory added in v1.4.1

// MessageProcessorFactory creates MessageProcessors.
type MessageProcessorFactory interface {
	// Create returns a MessageProcessor.
	Create() MessageProcessor
	// Close closes the factory.
	Close()
}

MessageProcessorFactory creates MessageProcessors.

func NewMessageProcessorFactory added in v1.4.1

func NewMessageProcessorFactory(fn func() MessageProcessor) MessageProcessorFactory

NewMessageProcessorFactory returns a MessageProcessorFactory that creates a new MessageProcessor for every call to Create.

func SingletonMessageProcessor added in v1.4.0

func SingletonMessageProcessor(mp MessageProcessor) MessageProcessorFactory

SingletonMessageProcessor returns a MessageProcessorFactory that shares the same MessageProcessor for all users. The MessageProcessor is closed when the factory is closed.

type MockMessage

type MockMessage struct {
	// contains filtered or unexported fields
}

MockMessage is a mock of Message interface.

func NewMockMessage

func NewMockMessage(ctrl *gomock.Controller) *MockMessage

NewMockMessage creates a new mock instance.

func (*MockMessage) Ack

func (m *MockMessage) Ack()

Ack mocks base method.

func (*MockMessage) Bytes

func (m *MockMessage) Bytes() []byte

Bytes mocks base method.

func (*MockMessage) EXPECT

func (m *MockMessage) EXPECT() *MockMessageMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockMessage) SentAtNanos added in v1.4.0

func (m *MockMessage) SentAtNanos() uint64

SentAtNanos mocks base method.

func (*MockMessage) ShardID added in v1.2.0

func (m *MockMessage) ShardID() uint64

ShardID mocks base method.

type MockMessageMockRecorder

type MockMessageMockRecorder struct {
	// contains filtered or unexported fields
}

MockMessageMockRecorder is the mock recorder for MockMessage.

func (*MockMessageMockRecorder) Ack

func (mr *MockMessageMockRecorder) Ack() *gomock.Call

Ack indicates an expected call of Ack.

func (*MockMessageMockRecorder) Bytes

func (mr *MockMessageMockRecorder) Bytes() *gomock.Call

Bytes indicates an expected call of Bytes.

func (*MockMessageMockRecorder) SentAtNanos added in v1.4.0

func (mr *MockMessageMockRecorder) SentAtNanos() *gomock.Call

SentAtNanos indicates an expected call of SentAtNanos.

func (*MockMessageMockRecorder) ShardID added in v1.2.0

func (mr *MockMessageMockRecorder) ShardID() *gomock.Call

ShardID indicates an expected call of ShardID.

type MockMessageProcessor added in v0.5.0

type MockMessageProcessor struct {
	// contains filtered or unexported fields
}

MockMessageProcessor is a mock of MessageProcessor interface.

func NewMockMessageProcessor added in v0.5.0

func NewMockMessageProcessor(ctrl *gomock.Controller) *MockMessageProcessor

NewMockMessageProcessor creates a new mock instance.

func (*MockMessageProcessor) Close added in v0.5.0

func (m *MockMessageProcessor) Close()

Close mocks base method.

func (*MockMessageProcessor) EXPECT added in v0.5.0

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockMessageProcessor) Process added in v0.5.0

func (m *MockMessageProcessor) Process(arg0 Message)

Process mocks base method.

type MockMessageProcessorMockRecorder added in v0.5.0

type MockMessageProcessorMockRecorder struct {
	// contains filtered or unexported fields
}

MockMessageProcessorMockRecorder is the mock recorder for MockMessageProcessor.

func (*MockMessageProcessorMockRecorder) Close added in v0.5.0

Close indicates an expected call of Close.

func (*MockMessageProcessorMockRecorder) Process added in v0.5.0

func (mr *MockMessageProcessorMockRecorder) Process(arg0 interface{}) *gomock.Call

Process indicates an expected call of Process.

type Options

// Options configures the consumer listener. Each setter returns an Options
// value, so calls can be chained.
type Options interface {
	// EncoderOptions returns the options for Encoder.
	EncoderOptions() proto.Options

	// SetEncoderOptions sets the options for Encoder.
	SetEncoderOptions(value proto.Options) Options

	// DecoderOptions returns the options for Decoder.
	DecoderOptions() proto.Options

	// SetDecoderOptions sets the options for Decoder.
	SetDecoderOptions(value proto.Options) Options

	// MessagePoolOptions returns the options for message pool.
	MessagePoolOptions() MessagePoolOptions

	// SetMessagePoolOptions sets the options for message pool.
	SetMessagePoolOptions(value MessagePoolOptions) Options

	// AckFlushInterval returns the ack flush interval.
	AckFlushInterval() time.Duration

	// SetAckFlushInterval sets the ack flush interval.
	SetAckFlushInterval(value time.Duration) Options

	// AckBufferSize returns the ack buffer size.
	AckBufferSize() int

	// SetAckBufferSize sets the ack buffer size.
	SetAckBufferSize(value int) Options

	// ConnectionWriteBufferSize returns the size of the connection write buffer.
	ConnectionWriteBufferSize() int

	// SetConnectionWriteBufferSize sets the size of the connection write buffer.
	SetConnectionWriteBufferSize(value int) Options

	// ConnectionReadBufferSize returns the size of the connection read buffer.
	ConnectionReadBufferSize() int

	// SetConnectionReadBufferSize sets the size of the connection read buffer.
	SetConnectionReadBufferSize(value int) Options

	// ConnectionWriteTimeout returns the timeout for writing to the connection.
	ConnectionWriteTimeout() time.Duration

	// SetConnectionWriteTimeout sets the write timeout for the connection.
	SetConnectionWriteTimeout(value time.Duration) Options

	// InstrumentOptions returns the instrument options.
	InstrumentOptions() instrument.Options

	// SetInstrumentOptions sets the instrument options.
	SetInstrumentOptions(value instrument.Options) Options
}

Options configures the consumer listener.

func NewOptions

func NewOptions() Options

NewOptions creates a new Options.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL