Documentation ¶
Index ¶
- Variables
- type AckMessage
- func (a *AckMessage) BuildAckID() string
- func (a *AckMessage) HasHitDeadline() bool
- func (a *AckMessage) MatchesOriginatingMessageServer() bool
- func (a *AckMessage) String() string
- func (a *AckMessage) ToTopicPartition() TopicPartition
- func (a *AckMessage) WithContext(ctx context.Context) *AckMessage
- type AcknowledgeRequest
- type BasicImplementation
- func (s *BasicImplementation) Acknowledge(ctx context.Context, req *AckMessage, errChan chan error)
- func (s *BasicImplementation) CanConsumeMore() bool
- func (s *BasicImplementation) EvictUnackedMessagesPastDeadline(ctx context.Context, errChan chan error)
- func (s *BasicImplementation) GetConsumedMessagesStats() map[string]interface{}
- func (s *BasicImplementation) GetConsumerLag() map[string]uint64
- func (s *BasicImplementation) GetSubscriberID() string
- func (s *BasicImplementation) GetSubscription() *subscription.Model
- func (s *BasicImplementation) ModAckDeadline(ctx context.Context, req *ModAckMessage, errChan chan error)
- func (s *BasicImplementation) Pull(ctx context.Context, req *PullRequest, responseChan chan *metrov1.PullResponse, ...)
- type ConsumptionMetadata
- type Core
- type IAckMessage
- type IConsumer
- type ICore
- type ISubscriber
- type Implementation
- type ModAckMessage
- type ModifyAckDeadlineRequest
- type OrderedConsumptionMetadata
- type OrderedImplementation
- func (s *OrderedImplementation) Acknowledge(ctx context.Context, req *AckMessage, errChan chan error)
- func (s *OrderedImplementation) CanConsumeMore() bool
- func (s *OrderedImplementation) EvictUnackedMessagesPastDeadline(ctx context.Context, errChan chan error)
- func (s *OrderedImplementation) GetConsumedMessagesStats() map[string]interface{}
- func (s *OrderedImplementation) GetConsumerLag() map[string]uint64
- func (s *OrderedImplementation) GetSubscriberID() string
- func (s *OrderedImplementation) GetSubscription() *subscription.Model
- func (s *OrderedImplementation) ModAckDeadline(ctx context.Context, req *ModAckMessage, errChan chan error)
- func (s *OrderedImplementation) Pull(ctx context.Context, req *PullRequest, responseChan chan *metrov1.PullResponse, ...)
- type OrderingSequenceManager
- type PullRequest
- type Subscriber
- func (s *Subscriber) GetAckChannel() chan *AckMessage
- func (s *Subscriber) GetErrorChannel() chan error
- func (s *Subscriber) GetID() string
- func (s *Subscriber) GetModAckChannel() chan *ModAckMessage
- func (s *Subscriber) GetRequestChannel() chan *PullRequest
- func (s *Subscriber) GetResponseChannel() chan *metrov1.PullResponse
- func (s *Subscriber) GetSubscriptionName() string
- func (s *Subscriber) Run(ctx context.Context)
- func (s *Subscriber) Stop()
- type TopicPartition
Constants ¶
This section is empty.
Variables ¶
var ( // ErrIllegalPartitionValue is thrown when partition value of AckID is not in expected state ErrIllegalPartitionValue = errors.New("partition value cannot be less than 0") // ErrIllegalOffsetValue is thrown when offset value of AckID is not in expected state ErrIllegalOffsetValue = errors.New("offset value cannot be less than 0") // ErrIllegalDeadlineValue is thrown when deadline value of AckID is not in expected state ErrIllegalDeadlineValue = errors.New("deadline value cannot be less than 0") // ErrInvalidAckID is thrown when ackID is not in the expected format ErrInvalidAckID = merror.Newf(merror.InvalidArgument, "AckID received is not in expected format") )
Functions ¶
This section is empty.
Types ¶
type AckMessage ¶
type AckMessage struct { ServerAddress string SubscriberID string Topic string Partition int32 Offset int32 MessageID string Deadline int32 AckID string // contains filtered or unexported fields }
AckMessage ...
func (*AckMessage) MatchesOriginatingMessageServer ¶
func (a *AckMessage) MatchesOriginatingMessageServer() bool
MatchesOriginatingMessageServer ...
func (*AckMessage) String ¶
func (a *AckMessage) String() string
func (*AckMessage) ToTopicPartition ¶
func (a *AckMessage) ToTopicPartition() TopicPartition
ToTopicPartition ...
func (*AckMessage) WithContext ¶
func (a *AckMessage) WithContext(ctx context.Context) *AckMessage
WithContext can be used to set the current context to the request
type AcknowledgeRequest ¶
type AcknowledgeRequest struct {
AckIDs []string
}
AcknowledgeRequest ...
func (*AcknowledgeRequest) IsEmpty ¶
func (ar *AcknowledgeRequest) IsEmpty() bool
IsEmpty returns true if its an empty request
type BasicImplementation ¶
type BasicImplementation struct {
// contains filtered or unexported fields
}
BasicImplementation provides implementation of subscriber functionalities
func (*BasicImplementation) Acknowledge ¶
func (s *BasicImplementation) Acknowledge(ctx context.Context, req *AckMessage, errChan chan error)
Acknowledge acknowledges a message pulled for delivery
func (*BasicImplementation) CanConsumeMore ¶
func (s *BasicImplementation) CanConsumeMore() bool
CanConsumeMore looks at sum of all consumed messages in all the active topic partitions and checks threshold
func (*BasicImplementation) EvictUnackedMessagesPastDeadline ¶
func (s *BasicImplementation) EvictUnackedMessagesPastDeadline(ctx context.Context, errChan chan error)
EvictUnackedMessagesPastDeadline evicts messages past ack deadline
func (*BasicImplementation) GetConsumedMessagesStats ¶
func (s *BasicImplementation) GetConsumedMessagesStats() map[string]interface{}
GetConsumedMessagesStats ...
func (*BasicImplementation) GetConsumerLag ¶
func (s *BasicImplementation) GetConsumerLag() map[string]uint64
GetConsumerLag returns perceived lag for the gievn Subscriber
func (*BasicImplementation) GetSubscriberID ¶
func (s *BasicImplementation) GetSubscriberID() string
GetSubscriberID ...
func (*BasicImplementation) GetSubscription ¶
func (s *BasicImplementation) GetSubscription() *subscription.Model
GetSubscription ...
func (*BasicImplementation) ModAckDeadline ¶
func (s *BasicImplementation) ModAckDeadline(ctx context.Context, req *ModAckMessage, errChan chan error)
ModAckDeadline modifies ack deadline
func (*BasicImplementation) Pull ¶
func (s *BasicImplementation) Pull(ctx context.Context, req *PullRequest, responseChan chan *metrov1.PullResponse, errChan chan error)
Pull pulls message from the broker and publishes it into the response channel
type ConsumptionMetadata ¶
type ConsumptionMetadata struct {
// contains filtered or unexported fields
}
ConsumptionMetadata ...
func NewConsumptionMetadata ¶
func NewConsumptionMetadata() *ConsumptionMetadata
NewConsumptionMetadata ...
func (*ConsumptionMetadata) Store ¶
func (cm *ConsumptionMetadata) Store(msg messagebroker.ReceivedMessage, deadline int64)
Store updates all the internal data structures with the consumed message metadata
type Core ¶
type Core struct {
// contains filtered or unexported fields
}
Core implements ICore
func (*Core) NewSubscriber ¶
func (c *Core) NewSubscriber(ctx context.Context, subscriberID string, subscription *subscription.Model, timeoutInMs int, maxOutstandingMessages int64, maxOutstandingBytes int64, requestCh chan *PullRequest, ackCh chan *AckMessage, modAckCh chan *ModAckMessage) (ISubscriber, error)
NewSubscriber initiates a new subscriber for a given topic
type IAckMessage ¶
type IAckMessage interface { BuildAckID() string // return true if ack_id has originated from this server MatchesOriginatingMessageServer() bool }
IAckMessage ...
func NewAckMessage ¶
func NewAckMessage(subscriberID, topic string, partition, offset, deadline int32, messageID string) (IAckMessage, error)
NewAckMessage ...
type IConsumer ¶
type IConsumer interface { PauseConsumer(ctx context.Context) error IsPaused(ctx context.Context) bool ResumeConsumer(ctx context.Context) error PausePrimaryConsumer(ctx context.Context) error IsPrimaryPaused(ctx context.Context) bool ResumePrimaryConsumer(ctx context.Context) error ReceiveMessages(ctx context.Context, maxMessages int32) (*messagebroker.GetMessagesFromTopicResponse, error) GetTopicMetadata(ctx context.Context, req messagebroker.GetTopicMetadataRequest) (messagebroker.GetTopicMetadataResponse, error) CommitByPartitionAndOffset(ctx context.Context, req messagebroker.CommitOnTopicRequest) (messagebroker.CommitOnTopicResponse, error) GetConsumerLag(ctx context.Context) (map[string]uint64, error) Close(ctx context.Context) error }
IConsumer a wrapper over consumer
func NewConsumerManager ¶
func NewConsumerManager(ctx context.Context, bs brokerstore.IBrokerStore, brokerTimeout int, subscriberID string, subscriptionName, topicName, retryTopicName string) (IConsumer, error)
NewConsumerManager ...
type ICore ¶
type ICore interface { NewSubscriber(ctx context.Context, id string, subscription *subscription.Model, timeoutInMs int, maxOutstandingMessages int64, maxOutstandingBytes int64, requestCh chan *PullRequest, ackCh chan *AckMessage, modAckCh chan *ModAckMessage) (ISubscriber, error) }
ICore is interface over subscribers core
func NewCore ¶
func NewCore(bs brokerstore.IBrokerStore, subscriptionCore subscription.ICore, offsetCore offset.ICore, ch cache.ICache, topicCore topic.ICore) ICore
NewCore returns a new subscriber core
type ISubscriber ¶
type ISubscriber interface { GetID() string GetSubscriptionName() string GetResponseChannel() chan *metrov1.PullResponse GetRequestChannel() chan *PullRequest GetAckChannel() chan *AckMessage GetModAckChannel() chan *ModAckMessage GetErrorChannel() chan error Stop() Run(ctx context.Context) }
ISubscriber is interface over high level subscriber
type Implementation ¶
type Implementation interface { GetSubscription() *subscription.Model GetSubscriberID() string GetConsumedMessagesStats() map[string]interface{} GetConsumerLag() map[string]uint64 Pull(ctx context.Context, req *PullRequest, responseChan chan *metrov1.PullResponse, errChan chan error) Acknowledge(ctx context.Context, req *AckMessage, errChan chan error) ModAckDeadline(ctx context.Context, req *ModAckMessage, errChan chan error) EvictUnackedMessagesPastDeadline(ctx context.Context, errChan chan error) CanConsumeMore() bool }
Implementation is an interface abstracting different types of subscribers
type ModAckMessage ¶
type ModAckMessage struct { AckMessage *AckMessage // contains filtered or unexported fields }
ModAckMessage ...
func NewModAckMessage ¶
func NewModAckMessage(ackMessage *AckMessage, ackDeadline int32) *ModAckMessage
NewModAckMessage ...
func (*ModAckMessage) WithContext ¶
func (a *ModAckMessage) WithContext(ctx context.Context) *ModAckMessage
WithContext can be used to set the current context to the request
type ModifyAckDeadlineRequest ¶
type ModifyAckDeadlineRequest struct { // The initial ACK deadline given to messages is 10s // https://godoc.org/cloud.google.com/go/pubsub#hdr-Deadlines ModifyDeadlineSeconds []int32 ModifyDeadlineAckIDs []string }
ModifyAckDeadlineRequest ...
func (*ModifyAckDeadlineRequest) IsEmpty ¶
func (mr *ModifyAckDeadlineRequest) IsEmpty() bool
IsEmpty returns true if its an empty request
type OrderedConsumptionMetadata ¶
type OrderedConsumptionMetadata struct { ConsumptionMetadata // contains filtered or unexported fields }
OrderedConsumptionMetadata holds consumption metadata for ordered consumer
func NewOrderedConsumptionMetadata ¶
func NewOrderedConsumptionMetadata() *OrderedConsumptionMetadata
NewOrderedConsumptionMetadata ...
func (*OrderedConsumptionMetadata) Store ¶
func (cm *OrderedConsumptionMetadata) Store(msg messagebroker.ReceivedMessage, deadline int64)
Store updates all the internal data structures with the consumed message metadata
type OrderedImplementation ¶
type OrderedImplementation struct {
// contains filtered or unexported fields
}
OrderedImplementation implements a subscriber that delivers messages in order
func (*OrderedImplementation) Acknowledge ¶
func (s *OrderedImplementation) Acknowledge(ctx context.Context, req *AckMessage, errChan chan error)
Acknowledge acknowleges a pulled message
func (*OrderedImplementation) CanConsumeMore ¶
func (s *OrderedImplementation) CanConsumeMore() bool
CanConsumeMore looks at sum of all consumed messages in all the active topic partitions and checks threshold
func (*OrderedImplementation) EvictUnackedMessagesPastDeadline ¶
func (s *OrderedImplementation) EvictUnackedMessagesPastDeadline(ctx context.Context, errChan chan error)
EvictUnackedMessagesPastDeadline evicts messages past acknowledgement deadline
func (*OrderedImplementation) GetConsumedMessagesStats ¶
func (s *OrderedImplementation) GetConsumedMessagesStats() map[string]interface{}
GetConsumedMessagesStats ...
func (*OrderedImplementation) GetConsumerLag ¶
func (s *OrderedImplementation) GetConsumerLag() map[string]uint64
GetConsumerLag returns perceived lag for the given Subscriber
func (*OrderedImplementation) GetSubscriberID ¶
func (s *OrderedImplementation) GetSubscriberID() string
GetSubscriberID ...
func (*OrderedImplementation) GetSubscription ¶
func (s *OrderedImplementation) GetSubscription() *subscription.Model
GetSubscription ...
func (*OrderedImplementation) ModAckDeadline ¶
func (s *OrderedImplementation) ModAckDeadline(ctx context.Context, req *ModAckMessage, errChan chan error)
ModAckDeadline modifies the acknowledgement deadline of message
func (*OrderedImplementation) Pull ¶
func (s *OrderedImplementation) Pull(ctx context.Context, req *PullRequest, responseChan chan *metrov1.PullResponse, errChan chan error)
Pull pulls a message from broker and publishes onto the response channel
type OrderingSequenceManager ¶
type OrderingSequenceManager interface { GetOrderedSequenceNum(ctx context.Context, sub *subscription.Model, message messagebroker.ReceivedMessage) (*sequencePair, error) SetOrderedSequenceNum(ctx context.Context, sub *subscription.Model, partition int32, orderingKey string, sequenceNum int32) error GetLastSequenceStatus(ctx context.Context, sub *subscription.Model, partition int32, orderingKey string) (*lastSequenceStatus, error) SetLastSequenceStatus(ctx context.Context, sub *subscription.Model, partition int32, orderingKey string, status *lastSequenceStatus) error GetLastMessageSequenceNum(ctx context.Context, sub *subscription.Model, partition int32, orderingKey string) (int32, error) DeleteSequence(ctx context.Context, sub *subscription.Model, partition int32, orderingKey string) error }
OrderingSequenceManager is an interface for managing ordering sequences
func NewOffsetSequenceManager ¶
func NewOffsetSequenceManager(ctx context.Context, offsetCore offset.ICore) OrderingSequenceManager
NewOffsetSequenceManager returns an offset based sequence manager
type PullRequest ¶
type PullRequest struct { MaxNumOfMessages int32 // contains filtered or unexported fields }
PullRequest ...
func (*PullRequest) WithContext ¶
func (req *PullRequest) WithContext(ctx context.Context) *PullRequest
WithContext can be used to set the current context to the request
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber consumes messages from a topic
func (*Subscriber) GetAckChannel ¶
func (s *Subscriber) GetAckChannel() chan *AckMessage
GetAckChannel returns the chan from where ack is received
func (*Subscriber) GetErrorChannel ¶
func (s *Subscriber) GetErrorChannel() chan error
GetErrorChannel returns the channel where error is written
func (*Subscriber) GetModAckChannel ¶
func (s *Subscriber) GetModAckChannel() chan *ModAckMessage
GetModAckChannel returns the chan where mod ack is written
func (*Subscriber) GetRequestChannel ¶
func (s *Subscriber) GetRequestChannel() chan *PullRequest
GetRequestChannel returns the chan from where request is received
func (*Subscriber) GetResponseChannel ¶
func (s *Subscriber) GetResponseChannel() chan *metrov1.PullResponse
GetResponseChannel returns the chan where response is written
func (*Subscriber) GetSubscriptionName ¶
func (s *Subscriber) GetSubscriptionName() string
GetSubscriptionName ...
type TopicPartition ¶
type TopicPartition struct {
// contains filtered or unexported fields
}
TopicPartition ...
func NewTopicPartition ¶
func NewTopicPartition(topic string, partition int32) TopicPartition
NewTopicPartition ...
func (TopicPartition) String ¶
func (tp TopicPartition) String() string