subscriber

package
v0.0.0-...-e2755d2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 24, 2022 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrIllegalPartitionValue is thrown when partition value of AckID is not in expected state
	ErrIllegalPartitionValue = errors.New("partition value cannot be less than 0")

	// ErrIllegalOffsetValue is thrown when offset value of AckID is not in expected state
	ErrIllegalOffsetValue = errors.New("offset value cannot be less than 0")

	// ErrIllegalDeadlineValue is thrown when deadline value of AckID is not in expected state
	ErrIllegalDeadlineValue = errors.New("deadline value cannot be less than 0")

	// ErrInvalidAckID is thrown when ackID is not in the expected format
	ErrInvalidAckID = merror.Newf(merror.InvalidArgument, "AckID received is not in expected format")
)

Functions

This section is empty.

Types

type AckMessage

type AckMessage struct {
	ServerAddress string
	SubscriberID  string
	Topic         string
	Partition     int32
	Offset        int32
	MessageID     string
	Deadline      int32
	AckID         string
	// contains filtered or unexported fields
}

AckMessage ...

func ParseAckID

func ParseAckID(ackID string) (*AckMessage, error)

ParseAckID ...

func (*AckMessage) BuildAckID

func (a *AckMessage) BuildAckID() string

BuildAckID ...

func (*AckMessage) HasHitDeadline

func (a *AckMessage) HasHitDeadline() bool

HasHitDeadline ...

func (*AckMessage) MatchesOriginatingMessageServer

func (a *AckMessage) MatchesOriginatingMessageServer() bool

MatchesOriginatingMessageServer ...

func (*AckMessage) String

func (a *AckMessage) String() string

func (*AckMessage) ToTopicPartition

func (a *AckMessage) ToTopicPartition() TopicPartition

ToTopicPartition ...

func (*AckMessage) WithContext

func (a *AckMessage) WithContext(ctx context.Context) *AckMessage

WithContext can be used to set the current context to the request

type AcknowledgeRequest

type AcknowledgeRequest struct {
	AckIDs []string
}

AcknowledgeRequest ...

func (*AcknowledgeRequest) IsEmpty

func (ar *AcknowledgeRequest) IsEmpty() bool

IsEmpty returns true if it's an empty request

type BasicImplementation

type BasicImplementation struct {
	// contains filtered or unexported fields
}

BasicImplementation provides implementation of subscriber functionalities

func (*BasicImplementation) Acknowledge

func (s *BasicImplementation) Acknowledge(ctx context.Context, req *AckMessage, errChan chan error)

Acknowledge acknowledges a message pulled for delivery

func (*BasicImplementation) CanConsumeMore

func (s *BasicImplementation) CanConsumeMore() bool

CanConsumeMore looks at sum of all consumed messages in all the active topic partitions and checks threshold

func (*BasicImplementation) EvictUnackedMessagesPastDeadline

func (s *BasicImplementation) EvictUnackedMessagesPastDeadline(ctx context.Context, errChan chan error)

EvictUnackedMessagesPastDeadline evicts messages past ack deadline

func (*BasicImplementation) GetConsumedMessagesStats

func (s *BasicImplementation) GetConsumedMessagesStats() map[string]interface{}

GetConsumedMessagesStats ...

func (*BasicImplementation) GetConsumerLag

func (s *BasicImplementation) GetConsumerLag() map[string]uint64

GetConsumerLag returns perceived lag for the given Subscriber

func (*BasicImplementation) GetSubscriberID

func (s *BasicImplementation) GetSubscriberID() string

GetSubscriberID ...

func (*BasicImplementation) GetSubscription

func (s *BasicImplementation) GetSubscription() *subscription.Model

GetSubscription ...

func (*BasicImplementation) ModAckDeadline

func (s *BasicImplementation) ModAckDeadline(ctx context.Context, req *ModAckMessage, errChan chan error)

ModAckDeadline modifies ack deadline

func (*BasicImplementation) Pull

func (s *BasicImplementation) Pull(ctx context.Context, req *PullRequest, responseChan chan *metrov1.PullResponse, errChan chan error)

Pull pulls message from the broker and publishes it into the response channel

type ConsumptionMetadata

type ConsumptionMetadata struct {
	// contains filtered or unexported fields
}

ConsumptionMetadata ...

func NewConsumptionMetadata

func NewConsumptionMetadata() *ConsumptionMetadata

NewConsumptionMetadata ...

func (*ConsumptionMetadata) Store

func (cm *ConsumptionMetadata) Store(msg messagebroker.ReceivedMessage, deadline int64)

Store updates all the internal data structures with the consumed message metadata

type Core

type Core struct {
	// contains filtered or unexported fields
}

Core implements ICore

func (*Core) NewSubscriber

func (c *Core) NewSubscriber(ctx context.Context,
	subscriberID string,
	subscription *subscription.Model,
	timeoutInMs int,
	maxOutstandingMessages int64,
	maxOutstandingBytes int64,
	requestCh chan *PullRequest,
	ackCh chan *AckMessage,
	modAckCh chan *ModAckMessage) (ISubscriber, error)

NewSubscriber initiates a new subscriber for a given topic

type IAckMessage

type IAckMessage interface {
	BuildAckID() string

	// return true if ack_id has originated from this server
	MatchesOriginatingMessageServer() bool
}

IAckMessage ...

func NewAckMessage

func NewAckMessage(subscriberID, topic string, partition, offset, deadline int32, messageID string) (IAckMessage, error)

NewAckMessage ...

type IConsumer

type IConsumer interface {
	PauseConsumer(ctx context.Context) error
	IsPaused(ctx context.Context) bool
	ResumeConsumer(ctx context.Context) error

	PausePrimaryConsumer(ctx context.Context) error
	IsPrimaryPaused(ctx context.Context) bool
	ResumePrimaryConsumer(ctx context.Context) error

	ReceiveMessages(ctx context.Context, maxMessages int32) (*messagebroker.GetMessagesFromTopicResponse, error)
	GetTopicMetadata(ctx context.Context, req messagebroker.GetTopicMetadataRequest) (messagebroker.GetTopicMetadataResponse, error)
	CommitByPartitionAndOffset(ctx context.Context, req messagebroker.CommitOnTopicRequest) (messagebroker.CommitOnTopicResponse, error)
	GetConsumerLag(ctx context.Context) (map[string]uint64, error)

	Close(ctx context.Context) error
}

IConsumer a wrapper over consumer

func NewConsumerManager

func NewConsumerManager(ctx context.Context, bs brokerstore.IBrokerStore, brokerTimeout int, subscriberID string, subscriptionName,
	topicName, retryTopicName string) (IConsumer, error)

NewConsumerManager ...

type ICore

type ICore interface {
	NewSubscriber(ctx context.Context, id string, subscription *subscription.Model, timeoutInMs int, maxOutstandingMessages int64, maxOutstandingBytes int64,
		requestCh chan *PullRequest, ackCh chan *AckMessage, modAckCh chan *ModAckMessage) (ISubscriber, error)
}

ICore is interface over subscribers core

func NewCore

func NewCore(bs brokerstore.IBrokerStore, subscriptionCore subscription.ICore, offsetCore offset.ICore, ch cache.ICache, topicCore topic.ICore) ICore

NewCore returns a new subscriber core

type ISubscriber

type ISubscriber interface {
	GetID() string
	GetSubscriptionName() string
	GetResponseChannel() chan *metrov1.PullResponse
	GetRequestChannel() chan *PullRequest
	GetAckChannel() chan *AckMessage
	GetModAckChannel() chan *ModAckMessage
	GetErrorChannel() chan error
	Stop()
	Run(ctx context.Context)
}

ISubscriber is interface over high level subscriber

type Implementation

type Implementation interface {
	GetSubscription() *subscription.Model
	GetSubscriberID() string
	GetConsumedMessagesStats() map[string]interface{}
	GetConsumerLag() map[string]uint64

	Pull(ctx context.Context, req *PullRequest, responseChan chan *metrov1.PullResponse, errChan chan error)
	Acknowledge(ctx context.Context, req *AckMessage, errChan chan error)
	ModAckDeadline(ctx context.Context, req *ModAckMessage, errChan chan error)
	EvictUnackedMessagesPastDeadline(ctx context.Context, errChan chan error)

	CanConsumeMore() bool
}

Implementation is an interface abstracting different types of subscribers

type ModAckMessage

type ModAckMessage struct {
	AckMessage *AckMessage
	// contains filtered or unexported fields
}

ModAckMessage ...

func NewModAckMessage

func NewModAckMessage(ackMessage *AckMessage, ackDeadline int32) *ModAckMessage

NewModAckMessage ...

func (*ModAckMessage) String

func (a *ModAckMessage) String() string

String ...

func (*ModAckMessage) WithContext

func (a *ModAckMessage) WithContext(ctx context.Context) *ModAckMessage

WithContext can be used to set the current context to the request

type ModifyAckDeadlineRequest

type ModifyAckDeadlineRequest struct {
	// The initial ACK deadline given to messages is 10s
	// https://godoc.org/cloud.google.com/go/pubsub#hdr-Deadlines
	ModifyDeadlineSeconds []int32
	ModifyDeadlineAckIDs  []string
}

ModifyAckDeadlineRequest ...

func (*ModifyAckDeadlineRequest) IsEmpty

func (mr *ModifyAckDeadlineRequest) IsEmpty() bool

IsEmpty returns true if it's an empty request

type OrderedConsumptionMetadata

type OrderedConsumptionMetadata struct {
	ConsumptionMetadata
	// contains filtered or unexported fields
}

OrderedConsumptionMetadata holds consumption metadata for ordered consumer

func NewOrderedConsumptionMetadata

func NewOrderedConsumptionMetadata() *OrderedConsumptionMetadata

NewOrderedConsumptionMetadata ...

func (*OrderedConsumptionMetadata) Store

Store updates all the internal data structures with the consumed message metadata

type OrderedImplementation

type OrderedImplementation struct {
	// contains filtered or unexported fields
}

OrderedImplementation implements a subscriber that delivers messages in order

func (*OrderedImplementation) Acknowledge

func (s *OrderedImplementation) Acknowledge(ctx context.Context, req *AckMessage, errChan chan error)

Acknowledge acknowledges a pulled message

func (*OrderedImplementation) CanConsumeMore

func (s *OrderedImplementation) CanConsumeMore() bool

CanConsumeMore looks at sum of all consumed messages in all the active topic partitions and checks threshold

func (*OrderedImplementation) EvictUnackedMessagesPastDeadline

func (s *OrderedImplementation) EvictUnackedMessagesPastDeadline(ctx context.Context, errChan chan error)

EvictUnackedMessagesPastDeadline evicts messages past acknowledgement deadline

func (*OrderedImplementation) GetConsumedMessagesStats

func (s *OrderedImplementation) GetConsumedMessagesStats() map[string]interface{}

GetConsumedMessagesStats ...

func (*OrderedImplementation) GetConsumerLag

func (s *OrderedImplementation) GetConsumerLag() map[string]uint64

GetConsumerLag returns perceived lag for the given Subscriber

func (*OrderedImplementation) GetSubscriberID

func (s *OrderedImplementation) GetSubscriberID() string

GetSubscriberID ...

func (*OrderedImplementation) GetSubscription

func (s *OrderedImplementation) GetSubscription() *subscription.Model

GetSubscription ...

func (*OrderedImplementation) ModAckDeadline

func (s *OrderedImplementation) ModAckDeadline(ctx context.Context, req *ModAckMessage, errChan chan error)

ModAckDeadline modifies the acknowledgement deadline of message

func (*OrderedImplementation) Pull

func (s *OrderedImplementation) Pull(ctx context.Context, req *PullRequest, responseChan chan *metrov1.PullResponse, errChan chan error)

Pull pulls a message from broker and publishes onto the response channel

type OrderingSequenceManager

type OrderingSequenceManager interface {
	GetOrderedSequenceNum(ctx context.Context, sub *subscription.Model, message messagebroker.ReceivedMessage) (*sequencePair, error)
	SetOrderedSequenceNum(ctx context.Context, sub *subscription.Model, partition int32, orderingKey string, sequenceNum int32) error

	GetLastSequenceStatus(ctx context.Context, sub *subscription.Model, partition int32, orderingKey string) (*lastSequenceStatus, error)
	SetLastSequenceStatus(ctx context.Context, sub *subscription.Model, partition int32, orderingKey string, status *lastSequenceStatus) error

	GetLastMessageSequenceNum(ctx context.Context, sub *subscription.Model, partition int32, orderingKey string) (int32, error)

	DeleteSequence(ctx context.Context, sub *subscription.Model, partition int32, orderingKey string) error
}

OrderingSequenceManager is an interface for managing ordering sequences

func NewOffsetSequenceManager

func NewOffsetSequenceManager(ctx context.Context, offsetCore offset.ICore) OrderingSequenceManager

NewOffsetSequenceManager returns an offset based sequence manager

type PullRequest

type PullRequest struct {
	MaxNumOfMessages int32
	// contains filtered or unexported fields
}

PullRequest ...

func (*PullRequest) WithContext

func (req *PullRequest) WithContext(ctx context.Context) *PullRequest

WithContext can be used to set the current context to the request

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber consumes messages from a topic

func (*Subscriber) GetAckChannel

func (s *Subscriber) GetAckChannel() chan *AckMessage

GetAckChannel returns the chan from where ack is received

func (*Subscriber) GetErrorChannel

func (s *Subscriber) GetErrorChannel() chan error

GetErrorChannel returns the channel where error is written

func (*Subscriber) GetID

func (s *Subscriber) GetID() string

GetID ...

func (*Subscriber) GetModAckChannel

func (s *Subscriber) GetModAckChannel() chan *ModAckMessage

GetModAckChannel returns the chan where mod ack is written

func (*Subscriber) GetRequestChannel

func (s *Subscriber) GetRequestChannel() chan *PullRequest

GetRequestChannel returns the chan from where request is received

func (*Subscriber) GetResponseChannel

func (s *Subscriber) GetResponseChannel() chan *metrov1.PullResponse

GetResponseChannel returns the chan where response is written

func (*Subscriber) GetSubscriptionName

func (s *Subscriber) GetSubscriptionName() string

GetSubscriptionName ...

func (*Subscriber) Run

func (s *Subscriber) Run(ctx context.Context)

Run loop

func (*Subscriber) Stop

func (s *Subscriber) Stop()

Stop the subscriber

type TopicPartition

type TopicPartition struct {
	// contains filtered or unexported fields
}

TopicPartition ...

func NewTopicPartition

func NewTopicPartition(topic string, partition int32) TopicPartition

NewTopicPartition ...

func (TopicPartition) String

func (tp TopicPartition) String() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL