pulsar

package
v0.1.0-util Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2020 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrConsumerClosed = errors.New("consumer closed")

Functions

This section is empty.

Types

type Authentication

type Authentication interface{}

Opaque interface that represents the authentication credentials

func NewAuthentication

func NewAuthentication(name string, params string) (Authentication, error)

func NewAuthenticationAthenz

func NewAuthenticationAthenz(authParams map[string]string) Authentication

func NewAuthenticationTLS

func NewAuthenticationTLS(certificatePath string, privateKeyPath string) Authentication

Create new Authentication provider with specified TLS certificate and private key

func NewAuthenticationToken

func NewAuthenticationToken(token string) Authentication

Create new Authentication provider with specified auth token

func NewAuthenticationTokenFromFile

func NewAuthenticationTokenFromFile(tokenFilePath string) Authentication

Create new Authentication provider with specified auth token from a file

func NewAuthenticationTokenFromSupplier

func NewAuthenticationTokenFromSupplier(tokenSupplier func() (string, error)) Authentication

NewAuthenticationTokenFromSupplier returns a token auth provider that gets the token data from a user supplied function. The function is invoked each time the client library needs to use a token in talking with Pulsar brokers

type Client

type Client interface {
	// Create the producer instance
	// This method will block until the producer is created successfully
	CreateProducer(ProducerOptions) (Producer, error)

	// Create a `Consumer` by subscribing to a topic.
	//
	// If the subscription does not exist, a new subscription will be created and all messages published after the
	// creation will be retained until acknowledged, even if the consumer is not connected
	Subscribe(ConsumerOptions) (Consumer, error)

	// Create a Reader instance.
	// This method will block until the reader is created successfully.
	CreateReader(ReaderOptions) (Reader, error)

	// Fetch the list of partitions for a given topic
	//
	// If the topic is partitioned, this will return a list of partition names.
	// If the topic is not partitioned, the returned list will contain the topic
	// name itself.
	//
	// This can be used to discover the partitions and create {@link Reader},
	// {@link Consumer} or {@link Producer} instances directly on a particular partition.
	TopicPartitions(topic string) ([]string, error)

	// Close the Client and free associated resources
	Close()
}

func NewClient

func NewClient(options ClientOptions) (Client, error)

type ClientOptions

type ClientOptions struct {
	// Configure the service URL for the Pulsar service.
	// This parameter is required
	URL string

	// Timeout for the establishment of a TCP connection (default: 30 seconds)
	ConnectionTimeout time.Duration

	// Set the operation timeout (default: 30 seconds)
	// Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the
	// operation will be marked as failed
	OperationTimeout time.Duration

	// Configure the authentication provider. (default: no authentication)
	// Example: `Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem")`
	Authentication

	// Set the path to the trusted TLS certificate file
	TLSTrustCertsFilePath string

	// Configure whether the Pulsar client accept untrusted TLS certificate from broker (default: false)
	TLSAllowInsecureConnection bool

	// Configure whether the Pulsar client verify the validity of the host name from broker (default: false)
	TLSValidateHostname bool
}

Builder interface that is used to construct a Pulsar Client instance.

type CompressionType

type CompressionType int
const (
	NoCompression CompressionType = iota
	LZ4
	ZLib
	ZSTD
)

type Consumer

type Consumer interface {
	// Subscription get a subscription for the consumer
	Subscription() string

	// Unsubscribe the consumer
	Unsubscribe() error

	// Receive a single message.
	// This calls blocks until a message is available.
	Receive(context.Context) (Message, error)

	// Chan returns a channel to consume messages from
	Chan() <-chan ConsumerMessage

	// Ack the consumption of a single message
	Ack(Message)

	// AckID the consumption of a single message, identified by its MessageID
	AckID(MessageID)

	// Acknowledge the failure to process a single message.
	//
	// When a message is "negatively acked" it will be marked for redelivery after
	// some fixed delay. The delay is configurable when constructing the consumer
	// with ConsumerOptions.NAckRedeliveryDelay .
	//
	// This call is not blocking.
	Nack(Message)

	// Acknowledge the failure to process a single message.
	//
	// When a message is "negatively acked" it will be marked for redelivery after
	// some fixed delay. The delay is configurable when constructing the consumer
	// with ConsumerOptions.NackRedeliveryDelay .
	//
	// This call is not blocking.
	NackID(MessageID)

	// Close the consumer and stop the broker to push more messages
	Close()

	// Reset the subscription associated with this consumer to a specific message id.
	// The message id can either be a specific message or represent the first or last messages in the topic.
	//
	// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
	//       seek() on the individual partitions.
	Seek(MessageID) error

	// Reset the subscription associated with this consumer to a specific message publish time.
	//
	// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
	// the individual partitions.
	//
	// @param timestamp
	//            the message publish time where to reposition the subscription
	//
	SeekByTime(time time.Time) error
}

Consumer is an interface that abstracts behavior of Pulsar's consumer

type ConsumerMessage

type ConsumerMessage struct {
	Consumer
	Message
}

Pair of a Consumer and Message

type ConsumerOptions

type ConsumerOptions struct {
	// Specify the topic this consumer will subscribe on.
	// Either a topic, a list of topics or a topics pattern are required when subscribing
	Topic string

	// Specify a list of topics this consumer will subscribe on.
	// Either a topic, a list of topics or a topics pattern are required when subscribing
	Topics []string

	// Specify a regular expression to subscribe to multiple topics under the same namespace.
	// Either a topic, a list of topics or a topics pattern are required when subscribing
	TopicsPattern string

	// Specify the interval in which to poll for new partitions or new topics if using a TopicsPattern.
	AutoDiscoveryPeriod time.Duration

	// Specify the subscription name for this consumer
	// This argument is required when subscribing
	SubscriptionName string

	// Attach a set of application defined properties to the consumer
	// This properties will be visible in the topic stats
	Properties map[string]string

	// Select the subscription type to be used when subscribing to the topic.
	// Default is `Exclusive`
	Type SubscriptionType

	// InitialPosition at which the cursor will be set when subscribe
	// Default is `Latest`
	SubscriptionInitialPosition

	// Configuration for Dead Letter Queue consumer policy.
	// eg. route the message to topic X after N failed attempts at processing it
	// By default is nil and there's no DLQ
	DLQ *DLQPolicy

	// Sets a `MessageChannel` for the consumer
	// When a message is received, it will be pushed to the channel for consumption
	MessageChannel chan ConsumerMessage

	// Sets the size of the consumer receive queue.
	// The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
	// application calls `Consumer.receive()`. Using a higher value could potentially increase the consumer
	// throughput at the expense of bigger memory utilization.
	// Default value is `1000` messages and should be good for most use cases.
	// Set to -1 to disable prefetching in consumer
	ReceiverQueueSize int

	// The delay after which to redeliver the messages that failed to be
	// processed. Default is 1min. (See `Consumer.Nack()`)
	NackRedeliveryDelay time.Duration

	// Set the consumer name.
	Name string

	// If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
	// of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
	// each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
	// point, the messages will be sent as normal.
	//
	// ReadCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer (i.e.
	//  failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a
	//  shared subscription, will lead to the subscription call throwing a PulsarClientException.
	ReadCompacted bool

	// Mark the subscription as replicated to keep it in sync across clusters
	ReplicateSubscriptionState bool
}

ConsumerOptions is used to configure and create instances of Consumer

type DLQPolicy

type DLQPolicy struct {
	// Maximum number of times that a message will be delivered before being sent to the dead letter queue.
	MaxDeliveries uint32

	// Name of the topic where the failing messages will be sent.
	Topic string
}

Configuration for Dead Letter Queue consumer policy

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error implement error interface, composed of two parts: msg and result.

func (*Error) Error

func (e *Error) Error() string

func (*Error) Result

func (e *Error) Result() Result

type HashingScheme

type HashingScheme int
const (
	// JavaStringHash and Java String.hashCode() equivalent
	JavaStringHash HashingScheme = iota
	// Murmur3_32Hash use Murmur3 hashing function
	Murmur3_32Hash
)

type Message

type Message interface {
	// Topic get the topic from which this message originated from
	Topic() string

	// Properties are application defined key/value pairs that will be attached to the message.
	// Return the properties attached to the message.
	Properties() map[string]string

	// Payload get the payload of the message
	Payload() []byte

	// ID get the unique message ID associated with this message.
	// The message id can be used to univocally refer to a message without having the keep the entire payload in memory.
	ID() MessageID

	// PublishTime get the publish time of this message. The publish time is the timestamp that a client
	// publish the message.
	PublishTime() time.Time

	// EventTime get the event time associated with this message. It is typically set by the applications via
	// `ProducerMessage.EventTime`.
	// If EventTime is 0, it means there isn't any event time associated with this message.
	EventTime() time.Time

	// Key get the key of the message, if any
	Key() string

	// Get message redelivery count, redelivery count maintain in pulsar broker. When client nack acknowledge messages,
	// broker will dispatch message again with message redelivery count in CommandMessage defined.
	//
	// Message redelivery increases monotonically in a broker, when topic switch ownership to a another broker
	// redelivery count will be recalculated.
	RedeliveryCount() uint32

	// Check whether the message is replicated from other cluster.
	IsReplicated() bool

	// Get name of cluster, from which the message is replicated.
	GetReplicatedFrom() string
}

Message abstraction used in Pulsar

type MessageID

type MessageID interface {
	// Serialize the message id into a sequence of bytes that can be stored somewhere else
	Serialize() []byte
}

MessageID identifier for a particular message

func DeserializeMessageID

func DeserializeMessageID(data []byte) (MessageID, error)

DeserializeMessageID reconstruct a MessageID object from its serialized representation

func EarliestMessageID

func EarliestMessageID() MessageID

EarliestMessageID returns a messageID that points to the earliest message available in a topic

func LatestMessageID

func LatestMessageID() MessageID

LatestMessage returns a messageID that points to the latest message

type Producer

type Producer interface {
	// Topic return the topic to which producer is publishing to
	Topic() string

	// Name return the producer name which could have been assigned by the system or specified by the client
	Name() string

	// Send a message
	// This call will be blocking until is successfully acknowledged by the Pulsar broker.
	// Example:
	// producer.Send(ctx, pulsar.ProducerMessage{ Payload: myPayload })
	Send(context.Context, *ProducerMessage) (MessageID, error)

	// SendAsync a message in asynchronous mode
	// This call is blocked when the `event channel` becomes full (default: 10) or the
	// `maxPendingMessages` becomes full (default: 1000)
	// The callback will report back the message being published and
	// the eventual error in publishing
	SendAsync(context.Context, *ProducerMessage, func(MessageID, *ProducerMessage, error))

	// LastSequenceID get the last sequence id that was published by this producer.
	// This represent either the automatically assigned or custom sequence id (set on the ProducerMessage) that
	// was published and acknowledged by the broker.
	// After recreating a producer with the same producer name, this will return the last message that was
	// published in the previous producer session, or -1 if there no message was ever published.
	// return the last sequence id published by this producer.
	LastSequenceID() int64

	// Flush all the messages buffered in the client and wait until all messages have been successfully
	// persisted.
	Flush() error

	// Close the producer and releases resources allocated
	// No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
	// of errors, pending writes will not be retried.
	Close()
}

Producer is used to publish messages on a topic

type ProducerMessage

type ProducerMessage struct {
	// Payload for the message
	Payload []byte

	// Key sets the key of the message for routing policy
	Key string

	// Properties attach application defined properties on the message
	Properties map[string]string

	// EventTime set the event time for a given message
	// By default, messages don't have an event time associated, while the publish
	// time will be be always present.
	// Set the event time to a non-zero timestamp to explicitly declare the time
	// that the event "happened", as opposed to when the message is being published.
	EventTime time.Time

	// ReplicationClusters override the replication clusters for this message.
	ReplicationClusters []string

	// SequenceID set the sequence id to assign to the current message
	SequenceID *int64

	// Request to deliver the message only after the specified relative delay.
	// Note: messages are only delivered with delay when a consumer is consuming
	//     through a `SubscriptionType=Shared` subscription. With other subscription
	//     types, the messages will still be delivered immediately.
	DeliverAfter time.Duration

	// Deliver the message only at or after the specified absolute timestamp.
	// Note: messages are only delivered with delay when a consumer is consuming
	//     through a `SubscriptionType=Shared` subscription. With other subscription
	//     types, the messages will still be delivered immediately.
	DeliverAt time.Time
}

ProducerMessage abstraction used in Pulsar producer

type ProducerOptions

type ProducerOptions struct {
	// Topic specify the topic this producer will be publishing on.
	// This argument is required when constructing the producer.
	Topic string

	// Name specify a name for the producer
	// If not assigned, the system will generate a globally unique name which can be access with
	// Producer.ProducerName().
	// When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
	// across all Pulsar's clusters. Brokers will enforce that only a single producer a given name can be publishing on
	// a topic.
	Name string

	// Properties attach a set of application defined properties to the producer
	// This properties will be visible in the topic stats
	Properties map[string]string

	// MaxPendingMessages set the max size of the queue holding the messages pending to receive an
	// acknowledgment from the broker.
	MaxPendingMessages int

	// HashingScheme change the `HashingScheme` used to chose the partition on where to publish a particular message.
	// Standard hashing functions available are:
	//
	//  - `JavaStringHash` : Java String.hashCode() equivalent
	//  - `Murmur3_32Hash` : Use Murmur3 hashing function.
	// 		https://en.wikipedia.org/wiki/MurmurHash">https://en.wikipedia.org/wiki/MurmurHash
	//
	// Default is `JavaStringHash`.
	HashingScheme

	// CompressionType set the compression type for the producer.
	// By default, message payloads are not compressed. Supported compression types are:
	//  - LZ4
	//  - ZLIB
	//  - ZSTD
	//
	// Note: ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
	// release in order to be able to receive messages compressed with ZSTD.
	CompressionType

	// MessageRouter set a custom message routing policy by passing an implementation of MessageRouter
	// The router is a function that given a particular message and the topic metadata, returns the
	// partition index where the message should be routed to
	MessageRouter func(*ProducerMessage, TopicMetadata) int

	// DisableBatching control whether automatic batching of messages is enabled for the producer. By default batching
	// is enabled.
	// When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the
	// broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
	// messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or
	// contents.
	// When enabled default batch delay is set to 1 ms and default batch size is 1000 messages
	// Setting `DisableBatching: true` will make the producer to send messages individually
	DisableBatching bool

	// BatchingMaxPublishDelay set the time period within which the messages sent will be batched (default: 10ms)
	// if batch messages are enabled. If set to a non zero value, messages will be queued until this time
	// interval or until
	BatchingMaxPublishDelay time.Duration

	// BatchingMaxMessages set the maximum number of messages permitted in a batch. (default: 1000)
	// If set to a value greater than 1, messages will be queued until this threshold is reached or
	// batch interval has elapsed.
	BatchingMaxMessages uint
}

type Reader

type Reader interface {
	// Topic from which this reader is reading from
	Topic() string

	// Next read the next message in the topic, blocking until a message is available
	Next(context.Context) (Message, error)

	// HasNext check if there is any message available to read from the current position
	HasNext() bool

	// Close the reader and stop the broker to push more messages
	Close()
}

Reader can be used to scan through all the messages currently available in a topic.

type ReaderMessage

type ReaderMessage struct {
	Reader
	Message
}

ReaderMessage package Reader and Message as a struct to use

type ReaderOptions

type ReaderOptions struct {
	// Topic specify the topic this consumer will subscribe on.
	// This argument is required when constructing the reader.
	Topic string

	// Name set the reader name.
	Name string

	// Attach a set of application defined properties to the reader
	// This properties will be visible in the topic stats
	Properties map[string]string

	// StartMessageID initial reader positioning is done by specifying a message id. The options are:
	//  * `pulsar.EarliestMessage` : Start reading from the earliest message available in the topic
	//  * `pulsar.LatestMessage` : Start reading from the end topic, only getting messages published after the
	//                           reader was created
	//  * `MessageID` : Start reading from a particular message id, the reader will position itself on that
	//                  specific position. The first message to be read will be the message next to the specified
	//                  messageID
	StartMessageID MessageID

	// If true, the reader will start at the `StartMessageID`, included.
	// Default is `false` and the reader will start from the "next" message
	StartMessageIDInclusive bool

	// MessageChannel sets a `MessageChannel` for the consumer
	// When a message is received, it will be pushed to the channel for consumption
	MessageChannel chan ReaderMessage

	// ReceiverQueueSize sets the size of the consumer receive queue.
	// The consumer receive queue controls how many messages can be accumulated by the Reader before the
	// application calls Reader.readNext(). Using a higher value could potentially increase the consumer
	// throughput at the expense of bigger memory utilization.
	// Default value is {@code 1000} messages and should be good for most use cases.
	ReceiverQueueSize int

	// SubscriptionRolePrefix set the subscription role prefix. The default prefix is "reader".
	SubscriptionRolePrefix string

	// If enabled, the reader will read messages from the compacted topic rather than reading the full message backlog
	// of the topic. This means that, if the topic has been compacted, the reader will only see the latest value for
	// each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
	// point, the messages will be sent as normal.
	//
	// ReadCompacted can only be enabled when reading from a persistent topic. Attempting to enable it on non-persistent
	// topics will lead to the reader create call throwing a PulsarClientException.
	ReadCompacted bool
}

ReaderOptions abstraction Reader options to use.

type Result

type Result int

Result used to represent pulsar processing is an alias of type int.

const (
	// ResultOk means no errors
	ResultOk Result = iota
	// ResultUnknownError means unknown error happened on broker
	ResultUnknownError
	// ResultInvalidConfiguration means invalid configuration
	ResultInvalidConfiguration
	// ResultTimeoutError means operation timed out
	ResultTimeoutError
	// ResultLookupError means broker lookup failed
	ResultLookupError
	// ResultInvalidTopicName means invalid topic name
	ResultInvalidTopicName
	// ResultConnectError means failed to connect to broker
	ResultConnectError

	// ReadError means failed to read from socket
	//ReadError                      Result = 6
	// AuthenticationError means authentication failed on broker
	//AuthenticationError            Result = 7
	//AuthorizationError             Result = 8
	//ErrorGettingAuthenticationData Result = 9  // Client cannot find authorization data
	//BrokerMetadataError            Result = 10 // Broker failed in updating metadata
	//BrokerPersistenceError         Result = 11 // Broker failed to persist entry
	//ChecksumError                  Result = 12 // Corrupt message checksum failure
	// ConsumerBusy means Exclusive consumer is already connected
	ConsumerBusy Result = 13
	//NotConnectedError              Result = 14 // Producer/Consumer is not currently connected to broker
	//AlreadyClosedError             Result = 15 // Producer/Consumer is already closed and not accepting any operation
	//InvalidMessage                 Result = 16 // Error in publishing an already used message
	//ConsumerNotInitialized         Result = 17 // Consumer is not initialized
	//ProducerNotInitialized         Result = 18 // Producer is not initialized
	//TooManyLookupRequestException  Result = 19 // Too Many concurrent LookupRequest
	// InvalidUrl means Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor)
	//InvalidUrl                            Result = 21
	// ServiceUnitNotReady unloaded between client did lookup and producer/consumer got created
	//ServiceUnitNotReady                   Result = 22
	//OperationNotSupported                 Result = 23
	//ProducerBlockedQuotaExceededError     Result = 24 // Producer is blocked
	//ProducerBlockedQuotaExceededException Result = 25 // Producer is getting exception
	//ProducerQueueIsFull                   Result = 26 // Producer queue is full
	//MessageTooBig                         Result = 27 // Trying to send a messages exceeding the max size
	TopicNotFound        Result = 28 // Topic not found
	SubscriptionNotFound Result = 29 // Subscription not found

)

type SubscriptionInitialPosition

type SubscriptionInitialPosition int
const (
	// Latest position which means the start consuming position will be the last message
	SubscriptionPositionLatest SubscriptionInitialPosition = iota

	// Earliest position which means the start consuming position will be the first message
	SubscriptionPositionEarliest
)

type SubscriptionType

type SubscriptionType int

SubscriptionType of subscription supported by Pulsar

const (
	// Exclusive there can be only 1 consumer on the same topic with the same subscription name
	Exclusive SubscriptionType = iota

	// Shared subscription mode, multiple consumer will be able to use the same subscription name
	// and the messages will be dispatched according to
	// a round-robin rotation between the connected consumers
	Shared

	// Failover subscription mode, multiple consumer will be able to use the same subscription name
	// but only 1 consumer will receive the messages.
	// If that consumer disconnects, one of the other connected consumers will start receiving messages.
	Failover

	// KeyShared subscription mode, multiple consumer will be able to use the same
	// subscription and all messages with the same key will be dispatched to only one consumer
	KeyShared
)

type TopicMetadata

type TopicMetadata interface {
	// NumPartitions get the number of partitions for the specific topic
	NumPartitions() uint32
}

TopicMetadata is a interface of topic metadata

Directories

Path Synopsis
pb

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL