Documentation ¶
Index ¶
- type Authentication
- type Client
- type ClientOptions
- type CompressionType
- type Consumer
- type ConsumerMessage
- type ConsumerOptions
- type Error
- type HashingScheme
- type LoggerLevel
- type Message
- type MessageID
- type MessageRoutingMode
- type Producer
- type ProducerMessage
- type ProducerOptions
- type Reader
- type ReaderMessage
- type ReaderOptions
- type Result
- type SubscriptionType
- type TopicMetadata
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Authentication ¶
type Authentication interface{}
Opaque interface that represents the authentication credentials
func NewAuthenticationAthenz ¶
func NewAuthenticationAthenz(authParams string) Authentication
Create new Athenz Authentication provider with configuration in JSON form
func NewAuthenticationTLS ¶
func NewAuthenticationTLS(certificatePath string, privateKeyPath string) Authentication
Create new Authentication provider with specified TLS certificate and private key
type Client ¶
type Client interface {
    // Create the producer instance
    // This method will block until the producer is created successfully
    CreateProducer(ProducerOptions) (Producer, error)

    // Create a `Consumer` by subscribing to a topic.
    //
    // If the subscription does not exist, a new subscription will be created and all messages published after the
    // creation will be retained until acknowledged, even if the consumer is not connected
    Subscribe(ConsumerOptions) (Consumer, error)

    // Create a Reader instance.
    // This method will block until the reader is created successfully.
    CreateReader(ReaderOptions) (Reader, error)

    // Close the Client and free associated resources
    Close() error
}
func NewClient ¶
func NewClient(options ClientOptions) (Client, error)
type ClientOptions ¶
type ClientOptions struct {
    // Configure the service URL for the Pulsar service.
    // This parameter is required
    URL string

    // Number of threads to be used for handling connections to brokers (default: 1 thread)
    IOThreads int

    // Set the operation timeout (default: 30 seconds)
    // Producer-create, subscribe and unsubscribe operations will be retried until this interval elapses, after
    // which the operation will be marked as failed
    OperationTimeoutSeconds time.Duration

    // Set the number of threads to be used for message listeners (default: 1 thread)
    MessageListenerThreads int

    // Number of concurrent lookup requests allowed on each broker connection, to prevent overloading the broker
    // (default: 5000). Configure a higher value only if the client needs to produce to or subscribe on
    // thousands of topics.
    ConcurrentLookupRequests int

    // Provide a custom logger implementation where all Pulsar library info/warn/error messages will be routed.
    // By default, log messages will be printed on standard output. By passing a logger function, the application
    // can determine how to print logs. This function will be called each time the Pulsar client library wants
    // to write any logs.
    Logger func(level LoggerLevel, file string, line int, message string)

    // Set the path to the trusted TLS certificate file
    TLSTrustCertsFilePath string

    // Configure whether the Pulsar client accepts untrusted TLS certificates from the broker (default: false)
    TLSAllowInsecureConnection bool

    // Configure the authentication provider. (default: no authentication)
    // Example: `Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem")`
    Authentication

    // Set the interval between stats reports (default: 60 seconds). Stats are activated by a positive
    // StatsIntervalInSeconds, which should be set to at least 1 second
    StatsIntervalInSeconds int
}
Builder interface that is used to construct a Pulsar Client instance.
type CompressionType ¶
type CompressionType int
const (
    NoCompression CompressionType = 0
    LZ4           CompressionType = 1
    ZLib          CompressionType = 2
)
type Consumer ¶
type Consumer interface {
    // Get the topic for the consumer
    Topic() string

    // Get the subscription for the consumer
    Subscription() string

    // Unsubscribe the consumer
    Unsubscribe() error

    // Receives a single message.
    // This call blocks until a message is available.
    Receive(context.Context) (Message, error)

    // Ack the consumption of a single message
    Ack(Message) error

    // Ack the consumption of a single message, identified by its MessageID
    AckID(MessageID) error

    // Ack the reception of all the messages in the stream up to (and including) the provided message.
    // This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
    // re-delivered to this consumer.
    //
    // Cumulative acknowledge cannot be used when the subscription type is set to Shared.
    //
    // It's equivalent to calling asyncAcknowledgeCumulative(Message) and waiting for the callback to be triggered.
    AckCumulative(Message) error

    // Ack the reception of all the messages in the stream up to (and including) the message with the provided
    // MessageID. This method will block until the acknowledge has been sent to the broker. After that, the messages
    // will not be re-delivered to this consumer.
    //
    // Cumulative acknowledge cannot be used when the subscription type is set to Shared.
    //
    // It's equivalent to calling asyncAcknowledgeCumulative(MessageID) and waiting for the callback to be triggered.
    AckCumulativeID(MessageID) error

    // Close the consumer and stop the broker from pushing more messages
    Close() error

    // Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
    // active for the given topic. In Shared mode, the consumer's messages to be redelivered are distributed across
    // all the connected consumers. This is a non-blocking call and doesn't throw an exception. In case the
    // connection breaks, the messages are redelivered after reconnect.
    RedeliverUnackedMessages()
}
An interface that abstracts behavior of Pulsar's consumer
type ConsumerMessage ¶
Pair of a Consumer and Message
type ConsumerOptions ¶
type ConsumerOptions struct {
    // Specify the topic this consumer will subscribe on.
    // Either a topic, a list of topics or a topics pattern is required when subscribing
    Topic string

    // Specify a list of topics this consumer will subscribe on.
    // Either a topic, a list of topics or a topics pattern is required when subscribing
    Topics []string

    // Specify a regular expression to subscribe to multiple topics under the same namespace.
    // Either a topic, a list of topics or a topics pattern is required when subscribing
    TopicsPattern string

    // Specify the subscription name for this consumer
    // This argument is required when subscribing
    SubscriptionName string

    // Attach a set of application defined properties to the consumer
    // These properties will be visible in the topic stats
    Properties map[string]string

    // Set the timeout for unacked messages
    // Messages not acknowledged within the given time will be replayed by the broker to the same or a different
    // consumer. Default is 0, which means messages are not replayed based on ack time
    AckTimeout time.Duration

    // Select the subscription type to be used when subscribing to the topic.
    // Default is `Exclusive`
    Type SubscriptionType

    // Sets a `MessageChannel` for the consumer
    // When a message is received, it will be pushed to the channel for consumption
    MessageChannel chan ConsumerMessage

    // Sets the size of the consumer receive queue.
    // The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
    // application calls `Consumer.Receive()`. Using a higher value could potentially increase the consumer
    // throughput at the expense of bigger memory utilization.
    // Default value is `1000` messages and should be good for most use cases.
    // Set to -1 to disable prefetching in consumer
    ReceiverQueueSize int

    // Set the max total receiver queue size across partitions.
    // This setting will be used to reduce the receiver queue size for individual partitions
    // (`ReceiverQueueSize`) if the total exceeds this value (default: 50000).
    MaxTotalReceiverQueueSizeAcrossPartitions int

    // Set the consumer name.
    Name string

    // If enabled, the consumer will read messages from the compacted topic rather than reading the full message
    // backlog of the topic. This means that, if the topic has been compacted, the consumer will only see the latest
    // value for each key in the topic, up until the point in the topic message backlog that has been compacted.
    // Beyond that point, the messages will be sent as normal.
    //
    // ReadCompacted can only be enabled on subscriptions to persistent topics which have a single active consumer
    // (i.e. failover or exclusive subscriptions). Attempting to enable it on subscriptions to non-persistent topics
    // or on a shared subscription will cause the subscribe call to fail.
    ReadCompacted bool
}
ConsumerBuilder is used to configure and create instances of Consumer
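The comments above mark a topic selector and `SubscriptionName` as required. A pre-flight check along those lines might look like the sketch below. It is only an illustration: `consumerOptions` is a local stand-in holding the four relevant fields, `validate` is a hypothetical helper, and the library may enforce these requirements differently (for example, it may reject options that set more than one selector).

```go
package main

import (
    "errors"
    "fmt"
)

// consumerOptions is a local stand-in (assumption) with only the fields
// this check reads from the documented ConsumerOptions struct.
type consumerOptions struct {
    Topic            string
    Topics           []string
    TopicsPattern    string
    SubscriptionName string
}

// validate reports the first missing required setting, or nil.
func validate(o consumerOptions) error {
    if o.Topic == "" && len(o.Topics) == 0 && o.TopicsPattern == "" {
        return errors.New("one of Topic, Topics or TopicsPattern is required")
    }
    if o.SubscriptionName == "" {
        return errors.New("SubscriptionName is required")
    }
    return nil
}

func main() {
    // Missing subscription name: validate returns an error.
    fmt.Println(validate(consumerOptions{Topic: "my-topic"}))
    // Topic and subscription name both set: validate returns nil.
    fmt.Println(validate(consumerOptions{Topic: "my-topic", SubscriptionName: "my-sub"}))
}
```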
type HashingScheme ¶
type HashingScheme int
const (
    JavaStringHash HashingScheme = 0 // Java String.hashCode() equivalent
    Murmur3_32Hash HashingScheme = 1 // Use Murmur3 hashing function
    BoostHash      HashingScheme = 2 // C++ based boost::hash
)
type LoggerLevel ¶
type LoggerLevel int
const (
    DEBUG LoggerLevel = iota
    INFO
    WARN
    ERROR
)
func (LoggerLevel) String ¶
func (l LoggerLevel) String() string
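A function with the signature of the `ClientOptions.Logger` field could be sketched as follows. `LoggerLevel` and its constants are redeclared locally so the example is self-contained; the strings returned by this `String` method are an assumption and may differ from what the library returns. `formatLog` and `stderrLogger` are hypothetical names.

```go
package main

import (
    "fmt"
    "os"
)

// LoggerLevel is a local stand-in that mirrors the documented constants.
type LoggerLevel int

const (
    DEBUG LoggerLevel = iota
    INFO
    WARN
    ERROR
)

// String returns a label for the level. The exact labels are an assumption.
func (l LoggerLevel) String() string {
    switch l {
    case DEBUG:
        return "DEBUG"
    case INFO:
        return "INFO"
    case WARN:
        return "WARN"
    case ERROR:
        return "ERROR"
    }
    return "UNKNOWN"
}

// formatLog renders one log record as a single line.
func formatLog(level LoggerLevel, file string, line int, message string) string {
    return fmt.Sprintf("[%s] %s:%d %s", level, file, line, message)
}

// stderrLogger matches the Logger field's signature. It drops records below
// WARN and writes the rest to standard error.
func stderrLogger(level LoggerLevel, file string, line int, message string) {
    if level < WARN {
        return
    }
    fmt.Fprintln(os.Stderr, formatLog(level, file, line, message))
}

func main() {
    stderrLogger(INFO, "client.go", 10, "connected")        // filtered out
    stderrLogger(ERROR, "client.go", 42, "connection lost") // written to stderr
}
```

With the real package, such a function would be assigned to the `Logger` field of `ClientOptions` before calling `NewClient`.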
type Message ¶
type Message interface {
    // Return the properties attached to the message.
    // Properties are application defined key/value pairs that will be attached to the message
    Properties() map[string]string

    // Get the payload of the message
    Payload() []byte

    // Get the unique message ID associated with this message.
    // The message id can be used to unambiguously refer to a message without having to keep the entire payload
    // in memory.
    ID() MessageID

    // Get the publish time of this message. The publish time is the timestamp at which a client published the
    // message.
    PublishTime() time.Time

    // Get the event time associated with this message. It is typically set by the applications via
    // `ProducerMessage.EventTime`.
    // If there isn't any event time associated with this event, it will be nil.
    EventTime() *time.Time

    // Get the key of the message, if any
    Key() string
}
type MessageID ¶
type MessageID interface {
    // Serialize the message id into a sequence of bytes that can be stored somewhere else
    Serialize() []byte
}
Identifier for a particular message
func DeserializeMessageID ¶
Reconstruct a MessageID object from its serialized representation
type MessageRoutingMode ¶
type MessageRoutingMode int
const (
    // Publish messages across all partitions in round-robin.
    RoundRobinDistribution MessageRoutingMode = 0

    // The producer will choose one single partition and publish all the messages into that partition
    UseSinglePartition MessageRoutingMode = 1

    // Use custom message router implementation that will be called to determine the partition for a particular
    // message.
    CustomPartition MessageRoutingMode = 2
)
type Producer ¶
type Producer interface {
    // Return the topic to which the producer is publishing
    Topic() string

    // Return the producer name, which could have been assigned by the system or specified by the client
    Name() string

    // Send a message
    // This call blocks until the message is successfully acknowledged by the Pulsar broker.
    // Example:
    // producer.Send(ctx, pulsar.ProducerMessage{ Payload: myPayload })
    Send(context.Context, ProducerMessage) error

    // Send a message in asynchronous mode
    // The callback will report back the message being published and
    // the eventual error in publishing
    SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error))

    // Close the producer and release the allocated resources
    // No more writes will be accepted from this producer. Waits until all pending write requests are persisted.
    // In case of errors, pending writes will not be retried.
    Close() error
}
The producer is used to publish messages on a topic
type ProducerMessage ¶
type ProducerMessage struct {
    // Payload for the message
    Payload []byte

    // Sets the key of the message for routing policy
    Key string

    // Attach application defined properties on the message
    Properties map[string]string

    // Set the event time for a given message
    EventTime time.Time

    // Override the replication clusters for this message.
    ReplicationClusters []string
}
type ProducerOptions ¶
type ProducerOptions struct {
    // Specify the topic this producer will be publishing on.
    // This argument is required when constructing the producer.
    Topic string

    // Specify a name for the producer
    // If not assigned, the system will generate a globally unique name which can be accessed with
    // Producer.Name().
    // When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
    // across all Pulsar's clusters. Brokers will enforce that only a single producer with a given name can be
    // publishing on a topic.
    Name string

    // Attach a set of application defined properties to the producer
    // These properties will be visible in the topic stats
    Properties map[string]string

    // Set the send timeout (default: 30 seconds)
    // If a message is not acknowledged by the server before the send timeout expires, an error will be reported.
    // Setting the timeout to -1 will set the timeout to infinity, which can be useful when using Pulsar's message
    // deduplication feature.
    SendTimeout time.Duration

    // Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
    // When the queue is full, by default, all calls to Producer.Send() and Producer.SendAsync() will fail
    // unless `BlockIfQueueFull` is set to true.
    MaxPendingMessages int

    // Set the number of max pending messages across all the partitions
    // This setting will be used to lower the max pending messages for each partition
    // (`MaxPendingMessages`) if the total exceeds the configured value.
    MaxPendingMessagesAcrossPartitions int

    // Set whether the `Producer.Send()` and `Producer.SendAsync()` operations should block when the outgoing
    // message queue is full. Default is `false`. If set to `false`, send operations will immediately fail with
    // `ProducerQueueIsFull` when there is no space left in the pending queue.
    BlockIfQueueFull bool

    // Set the message routing mode for the partitioned producer.
    // Default routing mode is round-robin routing.
    //
    // This logic is applied when the application is not setting a key (`ProducerMessage.Key`) on a
    // particular message.
    MessageRoutingMode

    // Change the `HashingScheme` used to choose the partition on which to publish a particular message.
    // Standard hashing functions available are:
    //
    //  - `JavaStringHash` : Java String.hashCode() equivalent
    //  - `Murmur3_32Hash` : Use Murmur3 hashing function.
    //    https://en.wikipedia.org/wiki/MurmurHash
    //  - `BoostHash` : C++ based boost::hash
    //
    // Default is `JavaStringHash`.
    HashingScheme

    // Set the compression type for the producer.
    // By default, message payloads are not compressed. Supported compression types are:
    //  - LZ4
    //  - ZLIB
    CompressionType

    // Set a custom message routing policy by passing an implementation of MessageRouter
    // The router is a function that, given a particular message and the topic metadata, returns the
    // partition index where the message should be routed to
    MessageRouter func(Message, TopicMetadata) int

    // Control whether automatic batching of messages is enabled for the producer. Default: false [No batching]
    //
    // When batching is enabled, multiple calls to Producer.SendAsync can result in a single batch being sent to the
    // broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
    // messages will be compressed at the batch level, leading to a much better compression ratio for similar
    // headers or contents.
    //
    // When enabled default batch delay is set to 1 ms and default batch size is 1000 messages
    Batching bool

    // Set the time period within which the messages sent will be batched (default: 10ms) if batch messages are
    // enabled. If set to a non zero value, messages will be queued until this time interval or until
    BatchingMaxPublishDelay time.Duration

    // Set the maximum number of messages permitted in a batch. (default: 1000) If set to a value greater than 1,
    // messages will be queued until this threshold is reached or batch interval has elapsed
    BatchingMaxMessages uint
}
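A `MessageRouter` is a function from a message and the topic metadata to a partition index. The sketch below shows one possible router that maps equal keys to the same partition. It is an illustration under stated assumptions: `Message` and `TopicMetadata` are local stand-ins limited to the methods used here, the FNV-1a hash is a choice made for this example and is not one of the library's `HashingScheme` values, and `keyedMessage`, `fixedPartitions` and `keyHashRouter` are hypothetical names.

```go
package main

import (
    "fmt"
    "hash/fnv"
)

// Message and TopicMetadata are local stand-ins (assumptions) mirroring
// only the methods this router calls.
type Message interface {
    Key() string
}

type TopicMetadata interface {
    NumPartitions() int
}

// keyedMessage and fixedPartitions are hypothetical test doubles.
type keyedMessage string

func (m keyedMessage) Key() string { return string(m) }

type fixedPartitions int

func (p fixedPartitions) NumPartitions() int { return int(p) }

// keyHashRouter returns a partition index in [0, NumPartitions) derived
// from an FNV-1a hash of the message key, so equal keys share a partition.
func keyHashRouter(msg Message, meta TopicMetadata) int {
    n := meta.NumPartitions()
    if n <= 1 {
        return 0
    }
    h := fnv.New32a()
    h.Write([]byte(msg.Key())) // Write on a hash.Hash never returns an error
    return int(h.Sum32() % uint32(n))
}

func main() {
    meta := fixedPartitions(4)
    for _, k := range []string{"order-1", "order-2", "order-1"} {
        fmt.Printf("%s -> partition %d\n", k, keyHashRouter(keyedMessage(k), meta))
    }
}
```

Per the `MessageRoutingMode` constants, a custom router of this kind is the implementation that `CustomPartition` refers to.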
type Reader ¶
type Reader interface {
    // The topic from which this reader is reading
    Topic() string

    // Read the next message in the topic, blocking until a message is available
    Next(context.Context) (Message, error)

    // Check if there is any message available to read from the current position
    HasNext() (bool, error)

    // Close the reader and stop the broker from pushing more messages
    Close() error
}
A Reader can be used to scan through all the messages currently available in a topic.
type ReaderMessage ¶
type ReaderOptions ¶
type ReaderOptions struct {
    // Specify the topic this reader will read from.
    // This argument is required when constructing the reader.
    Topic string

    // Set the reader name.
    Name string

    // The initial reader positioning is done by specifying a message id. The options are:
    //  * `pulsar.EarliestMessage` : Start reading from the earliest message available in the topic
    //  * `pulsar.LatestMessage` : Start reading from the end of the topic, only getting messages published after
    //                   the reader was created
    //  * `MessageID` : Start reading from a particular message id, the reader will position itself on that
    //                   specific position. The first message to be read will be the message next to the specified
    //                   messageID
    StartMessageID MessageID

    // Sets a `MessageChannel` for the reader
    // When a message is received, it will be pushed to the channel for consumption
    MessageChannel chan ReaderMessage

    // Sets the size of the reader receive queue.
    // The receive queue controls how many messages can be accumulated by the Reader before the
    // application calls Reader.Next(). Using a higher value could potentially increase the reader
    // throughput at the expense of bigger memory utilization.
    //
    // Default value is `1000` messages and should be good for most use cases.
    ReceiverQueueSize int

    // Set the subscription role prefix. The default prefix is "reader".
    SubscriptionRolePrefix string

    // If enabled, the reader will read messages from the compacted topic rather than reading the full message
    // backlog of the topic. This means that, if the topic has been compacted, the reader will only see the latest
    // value for each key in the topic, up until the point in the topic message backlog that has been compacted.
    // Beyond that point, the messages will be sent as normal.
    //
    // ReadCompacted can only be enabled when reading from a persistent topic. Attempting to enable it on
    // non-persistent topics will cause the reader create call to fail.
    ReadCompacted bool
}
type Result ¶
type Result int
const (
    UnknownError                          Result = 1  // Unknown error happened on broker
    InvalidConfiguration                  Result = 2  // Invalid configuration
    TimeoutError                          Result = 3  // Operation timed out
    LookupError                           Result = 4  // Broker lookup failed
    ConnectError                          Result = 5  // Failed to connect to broker
    ReadError                             Result = 6  // Failed to read from socket
    AuthenticationError                   Result = 7  // Authentication failed on broker
    AuthorizationError                    Result = 8  // Client is not authorized to create producer/consumer
    ErrorGettingAuthenticationData        Result = 9  // Client cannot find authorization data
    BrokerMetadataError                   Result = 10 // Broker failed in updating metadata
    BrokerPersistenceError                Result = 11 // Broker failed to persist entry
    ChecksumError                         Result = 12 // Corrupt message checksum failure
    ConsumerBusy                          Result = 13 // Exclusive consumer is already connected
    NotConnectedError                     Result = 14 // Producer/Consumer is not currently connected to broker
    AlreadyClosedError                    Result = 15 // Producer/Consumer is already closed and not accepting any operation
    InvalidMessage                        Result = 16 // Error in publishing an already used message
    ConsumerNotInitialized                Result = 17 // Consumer is not initialized
    ProducerNotInitialized                Result = 18 // Producer is not initialized
    TooManyLookupRequestException         Result = 19 // Too many concurrent lookup requests
    InvalidTopicName                      Result = 20 // Invalid topic name
    InvalidUrl                            Result = 21 // Client initialized with invalid broker URL (VIP URL passed to client constructor)
    ServiceUnitNotReady                   Result = 22 // Service unit unloaded between client lookup and producer/consumer creation
    OperationNotSupported                 Result = 23
    ProducerBlockedQuotaExceededError     Result = 24 // Producer is blocked
    ProducerBlockedQuotaExceededException Result = 25 // Producer is getting exception
    ProducerQueueIsFull                   Result = 26 // Producer queue is full
    MessageTooBig                         Result = 27 // Trying to send a message exceeding the max size
    TopicNotFound                         Result = 28 // Topic not found
    SubscriptionNotFound                  Result = 29 // Subscription not found
    ConsumerNotFound                      Result = 30 // Consumer not found
    UnsupportedVersionError               Result = 31 // Error when an older client/version doesn't support a required feature
    TopicTerminated                       Result = 32 // Topic was already terminated
    CryptoError                           Result = 33 // Error when crypto operation fails
)
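Applications often branch on the result code to decide whether an operation is worth retrying. The sketch below shows that idea only: `Result` and the five constants are redeclared locally with the values listed above, `isRetryable` is a hypothetical helper, and the choice of which codes count as transient is an illustrative policy, not something the library defines.

```go
package main

import "fmt"

// Result is a local stand-in; the constants reuse the documented values.
type Result int

const (
    TimeoutError        Result = 3
    ConnectError        Result = 5
    AuthenticationError Result = 7
    ProducerQueueIsFull Result = 26
    MessageTooBig       Result = 27
)

// isRetryable reports whether this example policy treats the code as
// transient. The selection below is an assumption made for illustration.
func isRetryable(r Result) bool {
    switch r {
    case TimeoutError, ConnectError, ProducerQueueIsFull:
        return true
    }
    return false
}

func main() {
    for _, r := range []Result{TimeoutError, AuthenticationError, MessageTooBig} {
        fmt.Printf("result %d retryable: %t\n", r, isRetryable(r))
    }
}
```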
type SubscriptionType ¶
type SubscriptionType int
Types of subscription supported by Pulsar
const (
    // There can be only 1 consumer on the same topic with the same subscription name
    Exclusive SubscriptionType = 0

    // Multiple consumers will be able to use the same subscription name and the messages will be dispatched
    // according to a round-robin rotation between the connected consumers
    Shared SubscriptionType = 1

    // Multiple consumers will be able to use the same subscription name but only 1 consumer will receive the
    // messages. If that consumer disconnects, one of the other connected consumers will start receiving messages.
    Failover SubscriptionType = 2
)
type TopicMetadata ¶
type TopicMetadata interface {
    // Get the number of partitions for the specific topic
    NumPartitions() int
}