
package pulsar

import ""


Package Files

c_client.go c_consumer.go c_error.go c_message.go c_producer.go c_reader.go client.go consumer.go error.go logger.go message.go pointer.go producer.go reader.go

type Authentication

type Authentication interface{}

Opaque interface that represents the authentication credentials

func NewAuthenticationAthenz

func NewAuthenticationAthenz(authParams string) Authentication

Create new Athenz Authentication provider with configuration in JSON form

func NewAuthenticationTLS

func NewAuthenticationTLS(certificatePath string, privateKeyPath string) Authentication

Create new Authentication provider with specified TLS certificate and private key

type Client

type Client interface {
    // Create the producer instance
    // This method will block until the producer is created successfully
    CreateProducer(ProducerOptions) (Producer, error)

    // Create a `Consumer` by subscribing to a topic.
    // If the subscription does not exist, a new subscription will be created and all messages published after the
    // creation will be retained until acknowledged, even if the consumer is not connected
    Subscribe(ConsumerOptions) (Consumer, error)

    // Create a Reader instance.
    // This method will block until the reader is created successfully.
    CreateReader(ReaderOptions) (Reader, error)

    // Close the Client and free associated resources
    Close() error
}

func NewClient

func NewClient(options ClientOptions) (Client, error)

type ClientOptions

type ClientOptions struct {
    // Configure the service URL for the Pulsar service.
    // This parameter is required
    URL string

    // Number of threads to be used for handling connections to brokers (default: 1 thread)
    IOThreads int

    // Set the operation timeout (default: 30 seconds)
    // Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the
    // operation will be marked as failed
    OperationTimeoutSeconds time.Duration

    // Set the number of threads to be used for message listeners (default: 1 thread)
    MessageListenerThreads int

    // Number of concurrent lookup requests allowed on each broker connection, to prevent overloading the broker
    // (default: 5000). Configure a higher value only when the client needs to produce or subscribe on
    // thousands of topics
    ConcurrentLookupRequests int

    // Provide a custom logger implementation where all Pulsar library info/warn/error messages will be routed
    // By default, log messages will be printed on standard output. By passing a logger function, application
    // can determine how to print logs. This function will be called each time the Pulsar client library wants
    // to write any logs.
    Logger func(level LoggerLevel, file string, line int, message string)

    // Set the path to the trusted TLS certificate file
    TLSTrustCertsFilePath string

    // Configure whether the Pulsar client accepts untrusted TLS certificates from the broker (default: false)
    TLSAllowInsecureConnection bool

    // Configure the authentication provider. (default: no authentication)
    // Example: `Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem")`
    Authentication

    // Set the interval between each stats report (default: 60 seconds). Stats are activated when
    // StatsIntervalInSeconds is positive; it should be set to at least 1 second
    StatsIntervalInSeconds int
}

Builder interface that is used to construct a Pulsar Client instance.
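As a minimal sketch of the `Logger` hook in `ClientOptions`, the following builds a function with the documented signature that drops records below a chosen level. `LoggerLevel` is redeclared locally so the snippet compiles without the client library. The `INFO`, `WARN` and `ERROR` names, and the helpers `formatLine` and `newLevelLogger`, are assumptions made for illustration and are not taken from the listing.

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// LoggerLevel is a local stand-in for pulsar.LoggerLevel.
// Only DEBUG appears in the listing; the other names are assumed.
type LoggerLevel int

const (
	DEBUG LoggerLevel = iota
	INFO
	WARN
	ERROR
)

// formatLine renders one log record (hypothetical helper).
func formatLine(level LoggerLevel, file string, line int, message string) string {
	return fmt.Sprintf("level=%d %s:%d %s", int(level), file, line, message)
}

// newLevelLogger returns a function matching the ClientOptions.Logger
// signature that writes records at or above min to w.
func newLevelLogger(w io.Writer, min LoggerLevel) func(LoggerLevel, string, int, string) {
	return func(level LoggerLevel, file string, line int, message string) {
		if level < min {
			return // below the threshold: drop the record
		}
		fmt.Fprintln(w, formatLine(level, file, line, message))
	}
}

func main() {
	logger := newLevelLogger(os.Stderr, WARN)
	logger(DEBUG, "client.cc", 10, "dropped")
	logger(ERROR, "client.cc", 42, "connection refused")
}
```

With the real package, the function returned by `newLevelLogger` would be assigned to the `Logger` field of `ClientOptions`.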

type CompressionType

type CompressionType int
const (
    NoCompression CompressionType = 0
    LZ4           CompressionType = 1
    ZLib          CompressionType = 2
)

type Consumer

type Consumer interface {
    // Get the topic for the consumer
    Topic() string

    // Get the subscription name for the consumer
    Subscription() string

    // Unsubscribe the consumer
    Unsubscribe() error

    // Receives a single message.
    // This call blocks until a message is available.
    Receive(context.Context) (Message, error)

    // Ack the consumption of a single message
    Ack(Message) error

    // Ack the consumption of a single message, identified by its MessageID
    AckID(MessageID) error

    // Ack the reception of all the messages in the stream up to (and including) the provided message.
    // This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
    // re-delivered to this consumer.
    // Cumulative acknowledgement cannot be used when the subscription type is set to `Shared`.
    // It's equivalent to calling asyncAcknowledgeCumulative(Message) and waiting for the callback to be triggered.
    AckCumulative(Message) error

    // Ack the reception of all the messages in the stream up to (and including) the provided message.
    // This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
    // re-delivered to this consumer.
    // Cumulative acknowledgement cannot be used when the subscription type is set to `Shared`.
    // It's equivalent to calling asyncAcknowledgeCumulative(MessageID) and waiting for the callback to be triggered.
    AckCumulativeID(MessageID) error

    // Close the consumer and stop the broker from pushing more messages
    Close() error

    // Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
    // active for the given topic. In Shared mode, the consumer's messages to be redelivered are distributed across all
    // the connected consumers. This is a non-blocking call and doesn't throw an exception. In case the connection
    // breaks, the messages are redelivered after reconnect.

An interface that abstracts behavior of Pulsar's consumer
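The receive-then-acknowledge cycle described by the `Receive` and `Ack` comments can be sketched as below. To keep the snippet runnable without the client library, `Message` and `Consumer` are declared locally as small subsets of the documented interfaces, and `fakeMessage`, `fakeConsumer`, `drain` and `runDemo` are hypothetical names introduced only for this illustration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Message and Consumer are local subsets of the documented interfaces.
type Message interface{ Payload() []byte }

type Consumer interface {
	Receive(context.Context) (Message, error)
	Ack(Message) error
}

// fakeMessage and fakeConsumer are in-memory stand-ins (hypothetical).
type fakeMessage []byte

func (m fakeMessage) Payload() []byte { return m }

type fakeConsumer struct {
	queue []Message
	acked int
}

// Receive returns the next queued message, or blocks until ctx is done.
func (c *fakeConsumer) Receive(ctx context.Context) (Message, error) {
	if len(c.queue) == 0 {
		<-ctx.Done()
		return nil, ctx.Err()
	}
	m := c.queue[0]
	c.queue = c.queue[1:]
	return m, nil
}

func (c *fakeConsumer) Ack(Message) error { c.acked++; return nil }

// drain receives until Receive fails, acknowledges every message that
// handle accepts, and returns the number of acknowledged messages.
func drain(ctx context.Context, c Consumer, handle func(Message) error) int {
	handled := 0
	for {
		msg, err := c.Receive(ctx)
		if err != nil {
			return handled // context expired or consumer closed
		}
		if handle(msg) != nil {
			continue // leave the message unacknowledged
		}
		if c.Ack(msg) == nil {
			handled++
		}
	}
}

// runDemo drains a three-message fake consumer with a short deadline.
func runDemo() (handled, acked int) {
	c := &fakeConsumer{queue: []Message{fakeMessage("a"), fakeMessage("b"), fakeMessage("c")}}
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	handled = drain(ctx, c, func(Message) error { return nil })
	return handled, c.acked
}

func main() {
	handled, acked := runDemo()
	fmt.Println(handled, acked)
}
```

Passing a context with a deadline is what lets the loop end; with `context.Background()` the blocking `Receive` would wait indefinitely once the queue is empty.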

type ConsumerMessage

type ConsumerMessage struct {

Pair of a Consumer and Message

type ConsumerOptions

type ConsumerOptions struct {
    // Specify the topic this consumer will subscribe on.
    // This argument is required when subscribing
    Topic string

    // Specify the subscription name for this consumer
    // This argument is required when subscribing
    SubscriptionName string

    // Set the timeout for unacked messages
    // Messages not acknowledged within the given time will be replayed by the broker to the same or a different consumer
    // Default is 0, which means messages are not replayed based on ack time
    AckTimeout time.Duration

    // Select the subscription type to be used when subscribing to the topic.
    // Default is `Exclusive`
    Type SubscriptionType

    // Sets a `MessageChannel` for the consumer
    // When a message is received, it will be pushed to the channel for consumption
    MessageChannel chan ConsumerMessage

    // Sets the size of the consumer receive queue.
    // The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
    // application calls `Consumer.Receive()`. Using a higher value could potentially increase the consumer
    // throughput at the expense of bigger memory utilization.
    // Default value is `1000` messages and should be good for most use cases.
    // Set to -1 to disable prefetching in consumer
    ReceiverQueueSize int

    // Set the max total receiver queue size across partitions.
    // This setting will be used to reduce the receiver queue size for individual partitions
    // (`ReceiverQueueSize`) if the total exceeds this value (default: 50000).
    MaxTotalReceiverQueueSizeAcrossPartitions int

    // Set the consumer name.
    Name string
}

ConsumerOptions is used to configure and create instances of Consumer
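The interaction between `ReceiverQueueSize` and `MaxTotalReceiverQueueSizeAcrossPartitions` can be illustrated with a small calculation. The rule used here, capping each partition at an equal share of the total, is an assumption made for the sake of the example; the listing only says that the per-partition size is reduced when the total would be exceeded. `perPartitionQueueSize` is a hypothetical helper.

```go
package main

import "fmt"

// perPartitionQueueSize caps the per-partition receive queue so that the
// sum across partitions stays within maxTotal. The exact rule applied by
// the client is an assumption here.
func perPartitionQueueSize(receiverQueueSize, maxTotal, partitions int) int {
	if partitions <= 0 {
		return receiverQueueSize // not a partitioned topic
	}
	if share := maxTotal / partitions; share < receiverQueueSize {
		return share
	}
	return receiverQueueSize
}

func main() {
	// With the documented defaults (1000 and 50000):
	fmt.Println(perPartitionQueueSize(1000, 50000, 10))  // 1000
	fmt.Println(perPartitionQueueSize(1000, 50000, 200)) // 250
}
```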

type Error

type Error struct {
    // contains filtered or unexported fields
}

func (*Error) Error

func (e *Error) Error() string

func (*Error) Result

func (e *Error) Result() Result

type HashingScheme

type HashingScheme int
const (
    JavaStringHash HashingScheme = 0 // Java String.hashCode() equivalent
    Murmur3_32Hash HashingScheme = 1 // Use Murmur3 hashing function
    BoostHash      HashingScheme = 2 // C++ based boost::hash
)
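To make the `JavaStringHash` description concrete, here is a sketch of Java's `String.hashCode()` in Go: the hash is `31*h + c` accumulated over the UTF-16 code units with 32-bit wraparound. The `partitionFor` helper, including the way it clears the sign bit before taking the modulo, is an assumption for illustration and is not taken from the client source.

```go
package main

import (
	"fmt"
	"unicode/utf16"
)

// javaStringHash reproduces Java's String.hashCode():
// h = 31*h + c over the UTF-16 code units, with 32-bit wraparound.
func javaStringHash(s string) int32 {
	var h int32
	for _, u := range utf16.Encode([]rune(s)) {
		h = 31*h + int32(u)
	}
	return h
}

// partitionFor maps a key to a partition index (hypothetical helper).
// numPartitions must be positive. Clearing the sign bit before the
// modulo is an assumption, not the client's documented behavior.
func partitionFor(key string, numPartitions int) int {
	h := uint32(javaStringHash(key)) & 0x7fffffff
	return int(h % uint32(numPartitions))
}

func main() {
	fmt.Println(javaStringHash("hello")) // 99162322
	fmt.Println(partitionFor("hello", 4))
}
```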

type LoggerLevel

type LoggerLevel int
const (
    DEBUG LoggerLevel = iota

func (LoggerLevel) String

func (l LoggerLevel) String() string

type Message

type Message interface {
    // Return the properties attached to the message.
    // Properties are application defined key/value pairs that will be attached to the message
    Properties() map[string]string

    // Get the payload of the message
    Payload() []byte

    // Get the unique message ID associated with this message.
    // The message ID can be used to uniquely refer to a message without having to keep the entire payload in memory.
    ID() MessageID

    // Get the publish time of this message. The publish time is the timestamp at which the client published the message.
    PublishTime() time.Time

    // Get the event time associated with this message. It is typically set by the applications via
    // `ProducerMessage.EventTime`.
    // If there isn't any event time associated with this event, it will be nil.
    EventTime() *time.Time

    // Get the key of the message, if any
    Key() string
}

type MessageID

type MessageID interface {
    // Serialize the message id into a sequence of bytes that can be stored somewhere else
    Serialize() []byte
}

Identifier for a particular message

var (
    // MessageID that points to the earliest message available in a topic
    EarliestMessage MessageID = earliestMessageID()

    // MessageID that points to the latest message
    LatestMessage MessageID = latestMessageID()
)

func DeserializeMessageID

func DeserializeMessageID(data []byte) MessageID

Reconstruct a MessageID object from its serialized representation

type MessageRoutingMode

type MessageRoutingMode int
const (
    // Publish messages across all partitions in round-robin.
    RoundRobinDistribution MessageRoutingMode = 0

    // The producer will choose one single partition and publish all the messages into that partition
    UseSinglePartition MessageRoutingMode = 1

    // Use custom message router implementation that will be called to determine the partition for a particular message.
    CustomPartition MessageRoutingMode = 2
)
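For `CustomPartition`, the router is a function of the shape `func(Message, TopicMetadata) int`, as shown in `ProducerOptions.MessageRouter`. A minimal sketch of such a router follows. It hashes the message key with FNV-1a from the Go standard library, which is a swapped-in choice and not one of the package's `HashingScheme` values. `Message` and `TopicMetadata` are redeclared locally as subsets of the documented interfaces, and `keyedMessage`, `fixedTopic`, `routeByKey` and `demoPartition` are hypothetical names.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Message and TopicMetadata are local subsets of the documented interfaces.
type Message interface{ Key() string }

type TopicMetadata interface{ NumPartitions() int }

// keyedMessage and fixedTopic are stand-ins for testing (hypothetical).
type keyedMessage string

func (m keyedMessage) Key() string { return string(m) }

type fixedTopic int

func (t fixedTopic) NumPartitions() int { return int(t) }

// routeByKey has the shape func(Message, TopicMetadata) int and sends
// messages with the same key to the same partition.
func routeByKey(msg Message, meta TopicMetadata) int {
	n := meta.NumPartitions()
	if n <= 0 {
		return 0 // nothing to choose from
	}
	h := fnv.New32a()
	h.Write([]byte(msg.Key()))
	return int(h.Sum32() % uint32(n))
}

// demoPartition exercises the router with plain values.
func demoPartition(key string, partitions int) int {
	return routeByKey(keyedMessage(key), fixedTopic(partitions))
}

func main() {
	fmt.Println(demoPartition("order-42", 8) == demoPartition("order-42", 8)) // true
}
```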

type Producer

type Producer interface {
    // Return the topic to which the producer is publishing
    Topic() string

    // Return the producer name, which may have been assigned by the system or specified by the client
    Name() string

    // Send a message
    // This call blocks until the message is successfully acknowledged by the Pulsar broker.
    // Example:
    // producer.Send(ctx, pulsar.ProducerMessage{ Payload: myPayload })
    Send(context.Context, ProducerMessage) error

    // Send a message in asynchronous mode
    // The callback will report back the message being published and
    // the eventual error in publishing
    SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error))

    // Close the producer and release the allocated resources
    // No more writes will be accepted from this producer. Waits until all pending write requests are persisted. In case
    // of errors, pending writes will not be retried.
    Close() error
}

The producer is used to publish messages on a topic
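Since `SendAsync` reports its outcome through a callback, a caller that needs to know when a batch of sends has finished has to wait for the callbacks itself. One way to do that, sketched under the assumption that callbacks may run on other goroutines, is a `sync.WaitGroup` plus a mutex-protected counter. `ProducerMessage` and `Producer` are redeclared locally as subsets of the documented types; `fakeProducer`, `sendAll` and `runDemo` are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// ProducerMessage and Producer are local subsets of the documented types.
type ProducerMessage struct{ Payload []byte }

type Producer interface {
	SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error))
}

// fakeProducer rejects empty payloads and accepts everything else
// (hypothetical stand-in that invokes callbacks on new goroutines).
type fakeProducer struct{}

func (fakeProducer) SendAsync(_ context.Context, m ProducerMessage, cb func(ProducerMessage, error)) {
	go func() {
		if len(m.Payload) == 0 {
			cb(m, errors.New("empty payload"))
			return
		}
		cb(m, nil)
	}()
}

// sendAll publishes every payload asynchronously, waits for all
// callbacks, and returns the number of failed sends.
func sendAll(ctx context.Context, p Producer, payloads [][]byte) int {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		failed int
	)
	for _, payload := range payloads {
		wg.Add(1)
		p.SendAsync(ctx, ProducerMessage{Payload: payload}, func(_ ProducerMessage, err error) {
			defer wg.Done()
			if err != nil {
				mu.Lock()
				failed++
				mu.Unlock()
			}
		})
	}
	wg.Wait()
	return failed
}

// runDemo sends three payloads, one of which the fake producer rejects.
func runDemo() int {
	return sendAll(context.Background(), fakeProducer{}, [][]byte{[]byte("a"), nil, []byte("c")})
}

func main() {
	fmt.Println(runDemo()) // 1
}
```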

type ProducerMessage

type ProducerMessage struct {
    // Payload for the message
    Payload []byte

    // Sets the key of the message for routing policy
    Key string

    // Attach application defined properties on the message
    Properties map[string]string

    // Set the event time for a given message
    EventTime time.Time

    // Override the replication clusters for this message.
    ReplicationClusters []string
}

type ProducerOptions

type ProducerOptions struct {
    // Specify the topic this producer will be publishing on.
    // This argument is required when constructing the producer.
    Topic string

    // Specify a name for the producer
    // If not assigned, the system will generate a globally unique name which can be accessed with
    // Producer.Name().
    // When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
    // across all Pulsar clusters. Brokers will enforce that only a single producer with a given name can be publishing on
    // a topic.
    Name string

    // Set the send timeout (default: 30 seconds)
    // If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
    // Setting the timeout to -1, will set the timeout to infinity, which can be useful when using Pulsar's message
    // deduplication feature.
    SendTimeout time.Duration

    // Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
    // When the queue is full, by default, all calls to Producer.Send() and Producer.SendAsync() will fail
    // unless `BlockIfQueueFull` is set to true.
    MaxPendingMessages int

    // Set the number of max pending messages across all the partitions
    // This setting will be used to lower the max pending messages for each partition
    // (`MaxPendingMessages`) if the total exceeds the configured value.
    MaxPendingMessagesAcrossPartitions int

    // Set whether the `Producer.Send()` and `Producer.SendAsync()` operations should block when the outgoing
    // message queue is full. Default is `false`. If set to `false`, send operations will immediately fail with
    // `ProducerQueueIsFull` when there is no space left in the pending queue.
    BlockIfQueueFull bool

    // Set the message routing mode for the partitioned producer.
    // Default routing mode is round-robin routing.
    // This logic is applied when the application does not set `ProducerMessage.Key` on a
    // particular message.

    // Change the `HashingScheme` used to choose the partition on which to publish a particular message.
    // Standard hashing functions available are:
    //  - `JavaStringHash` : Java String.hashCode() equivalent
    //  - `Murmur3_32Hash` : Use Murmur3 hashing function.
    //  - `BoostHash`      : C++ based boost::hash
    // Default is `JavaStringHash`.

    // Set the compression type for the producer.
    // By default, message payloads are not compressed. Supported compression types are:
    //  - LZ4
    //  - ZLIB

    // Set a custom message routing policy by passing an implementation of MessageRouter
    // The router is a function that given a particular message and the topic metadata, returns the
    // partition index where the message should be routed to
    MessageRouter func(Message, TopicMetadata) int

    // Control whether automatic batching of messages is enabled for the producer. Default: false [No batching]
    // When batching is enabled, multiple calls to Producer.SendAsync can result in a single batch being sent to the
    // broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
    // messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or
    // contents.
    // When enabled default batch delay is set to 1 ms and default batch size is 1000 messages
    Batching bool

    // Set the time period within which the messages sent will be batched (default: 10ms) if batching is
    // enabled. If set to a non-zero value, messages will be queued until this time interval has elapsed or until
    // the `BatchingMaxMessages` threshold is reached
    BatchingMaxPublishDelay time.Duration

    // Set the maximum number of messages permitted in a batch. (default: 1000) If set to a value greater than 1,
    // messages will be queued until this threshold is reached or batch interval has elapsed
    BatchingMaxMessages uint
}
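The two batching limits, `BatchingMaxPublishDelay` and `BatchingMaxMessages`, describe a flush-on-count-or-timeout policy. The sketch below shows that policy in plain Go over a channel of payloads. It is only an illustration of the idea and makes no claim about how the client library implements batching; `batch` and `batchSizes` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// batch groups payloads read from in and calls flush when maxMessages
// payloads are queued or maxDelay has passed since the first queued one.
// It returns after in is closed and the remaining payloads are flushed.
func batch(in <-chan []byte, maxMessages int, maxDelay time.Duration, flush func([][]byte)) {
	var (
		pending [][]byte
		timer   *time.Timer
		expired <-chan time.Time // nil while nothing is queued
	)
	emit := func() {
		if timer != nil {
			timer.Stop()
			timer, expired = nil, nil
		}
		if len(pending) > 0 {
			flush(pending)
			pending = nil
		}
	}
	for {
		select {
		case p, ok := <-in:
			if !ok {
				emit()
				return
			}
			if len(pending) == 0 {
				timer = time.NewTimer(maxDelay)
				expired = timer.C
			}
			pending = append(pending, p)
			if len(pending) >= maxMessages {
				emit() // count threshold reached
			}
		case <-expired:
			emit() // delay elapsed
		}
	}
}

// batchSizes feeds n one-byte payloads through batch with a delay long
// enough that only the count threshold and channel close trigger flushes.
func batchSizes(n, maxMessages int) []int {
	in := make(chan []byte, n)
	for i := 0; i < n; i++ {
		in <- []byte{byte(i)}
	}
	close(in)
	var sizes []int
	batch(in, maxMessages, time.Hour, func(b [][]byte) { sizes = append(sizes, len(b)) })
	return sizes
}

func main() {
	fmt.Println(batchSizes(5, 2)) // [2 2 1]
}
```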

type Reader

type Reader interface {
    // The topic from which this reader is reading
    Topic() string

    // Read the next message in the topic, blocking until a message is available
    Next(context.Context) (Message, error)

    // Close the reader and stop the broker from pushing more messages
    Close() error
}

A Reader can be used to scan through all the messages currently available in a topic.

type ReaderMessage

type ReaderMessage struct {

type ReaderOptions

type ReaderOptions struct {
    // Specify the topic this reader will read from.
    // This argument is required when constructing the reader.
    Topic string

    // Set the reader name.
    Name string

    // The initial reader positioning is done by specifying a message id. The options are:
    //  * `pulsar.EarliestMessage` : Start reading from the earliest message available in the topic
    //  * `pulsar.LatestMessage` : Start reading from the end of the topic, only getting messages published after the
    //                           reader was created
    //  * `MessageID` : Start reading from a particular message id, the reader will position itself on that
    //                  specific position. The first message to be read will be the message next to the specified
    //                  messageID
    StartMessageID MessageID

    // Sets a `MessageChannel` for the reader
    // When a message is received, it will be pushed to the channel for consumption
    MessageChannel chan ReaderMessage

    // Sets the size of the reader receive queue.
    // The receive queue controls how many messages can be accumulated by the Reader before the
    // application calls Reader.Next(). Using a higher value could potentially increase the reader
    // throughput at the expense of bigger memory utilization.
    // Default value is `1000` messages and should be good for most use cases.
    ReceiverQueueSize int

    // Set the subscription role prefix. The default prefix is "reader".
    SubscriptionRolePrefix string
}

type Result

type Result int
const (
    UnknownError                          Result = 1  // Unknown error happened on broker
    InvalidConfiguration                  Result = 2  // Invalid configuration
    TimeoutError                          Result = 3  // Operation timed out
    LookupError                           Result = 4  // Broker lookup failed
    ConnectError                          Result = 5  // Failed to connect to broker
    ReadError                             Result = 6  // Failed to read from socket
    AuthenticationError                   Result = 7  // Authentication failed on broker
    AuthorizationError                    Result = 8  // Client is not authorized to create producer/consumer
    ErrorGettingAuthenticationData        Result = 9  // Client cannot find authorization data
    BrokerMetadataError                   Result = 10 // Broker failed in updating metadata
    BrokerPersistenceError                Result = 11 // Broker failed to persist entry
    ChecksumError                         Result = 12 // Corrupt message checksum failure
    ConsumerBusy                          Result = 13 // Exclusive consumer is already connected
    NotConnectedError                     Result = 14 // Producer/Consumer is not currently connected to broker
    AlreadyClosedError                    Result = 15 // Producer/Consumer is already closed and not accepting any operation
    InvalidMessage                        Result = 16 // Error in publishing an already used message
    ConsumerNotInitialized                Result = 17 // Consumer is not initialized
    ProducerNotInitialized                Result = 18 // Producer is not initialized
    TooManyLookupRequestException         Result = 19 // Too Many concurrent LookupRequest
    InvalidTopicName                      Result = 20 // Invalid topic name
    InvalidUrl                            Result = 21 // Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor)
    ServiceUnitNotReady                   Result = 22 // Service Unit unloaded between client did lookup and producer/consumer got created
    OperationNotSupported                 Result = 23
    ProducerBlockedQuotaExceededError     Result = 24 // Producer is blocked
    ProducerBlockedQuotaExceededException Result = 25 // Producer is getting exception
    ProducerQueueIsFull                   Result = 26 // Producer queue is full
    MessageTooBig                         Result = 27 // Trying to send a message exceeding the max size
    TopicNotFound                         Result = 28 // Topic not found
    SubscriptionNotFound                  Result = 29 // Subscription not found
    ConsumerNotFound                      Result = 30 // Consumer not found
    UnsupportedVersionError               Result = 31 // Error when an older client/version doesn't support a required feature
    TopicTerminated                       Result = 32 // Topic was already terminated
    CryptoError                           Result = 33 // Error when crypto operation fails
)

func (Result) String

func (r Result) String() string
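The `Result` codes can drive retry decisions once the error has been unwrapped to `*Error`. The following sketch shows one way to do that with `errors.As` (Go 1.13 or later). Several points are assumptions: that the client's methods return `*Error` values, the internal fields of the local `Error` stand-in, and above all the choice of which codes count as transient, which is a judgment call for this example and not something the listing states. `isTransient` and `classify` are hypothetical helpers; the numeric values of the constants are copied from the listing.

```go
package main

import (
	"errors"
	"fmt"
)

// Result mirrors a few of the documented codes.
type Result int

const (
	TimeoutError        Result = 3
	ConnectError        Result = 5
	NotConnectedError   Result = 14
	ServiceUnitNotReady Result = 22
	TopicNotFound       Result = 28
)

// Error is a local stand-in for pulsar.Error; its fields are hypothetical.
type Error struct {
	msg    string
	result Result
}

func (e *Error) Error() string  { return e.msg }
func (e *Error) Result() Result { return e.result }

// isTransient reports whether err wraps an *Error whose Result suggests
// that retrying might succeed. The selection of codes is an assumption.
func isTransient(err error) bool {
	var pe *Error
	if !errors.As(err, &pe) {
		return false
	}
	switch pe.Result() {
	case TimeoutError, ConnectError, NotConnectedError, ServiceUnitNotReady:
		return true
	}
	return false
}

// classify wraps a Result in an error chain and classifies it.
func classify(r Result) bool {
	return isTransient(fmt.Errorf("send failed: %w", &Error{msg: "stub", result: r}))
}

func main() {
	fmt.Println(classify(TimeoutError))  // true
	fmt.Println(classify(TopicNotFound)) // false
}
```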

type SubscriptionType

type SubscriptionType int

Types of subscription supported by Pulsar

const (
    // There can be only 1 consumer on the same topic with the same subscription name
    Exclusive SubscriptionType = 0

    // Multiple consumers will be able to use the same subscription name and the messages will be dispatched according to
    // a round-robin rotation between the connected consumers
    Shared SubscriptionType = 1

    // Multiple consumers will be able to use the same subscription name but only 1 consumer will receive the messages.
    // If that consumer disconnects, one of the other connected consumers will start receiving messages.
    Failover SubscriptionType = 2
)
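The dispatch rules for the three subscription types can be made concrete with a toy simulation. This is purely illustrative: it models only what the comments above describe (a single consumer for `Exclusive`, round-robin for `Shared`, one active consumer for `Failover`) and says nothing about how the broker picks the active consumer or orders deliveries in practice. `assign` is a hypothetical helper, and consumer index 0 is assumed to be the active one.

```go
package main

import (
	"errors"
	"fmt"
)

// SubscriptionType mirrors the documented constants.
type SubscriptionType int

const (
	Exclusive SubscriptionType = 0
	Shared    SubscriptionType = 1
	Failover  SubscriptionType = 2
)

// assign returns, for each message, the index of the connected consumer
// that would receive it under the documented semantics.
func assign(t SubscriptionType, consumers, messages int) ([]int, error) {
	if consumers < 1 {
		return nil, errors.New("no connected consumers")
	}
	if t == Exclusive && consumers > 1 {
		return nil, errors.New("exclusive subscription allows a single consumer")
	}
	out := make([]int, messages)
	for i := range out {
		if t == Shared {
			out[i] = i % consumers // round-robin across consumers
		}
		// Exclusive and Failover: consumer 0 receives every message.
	}
	return out, nil
}

func main() {
	shared, _ := assign(Shared, 3, 4)
	failover, _ := assign(Failover, 3, 2)
	_, err := assign(Exclusive, 2, 1)
	fmt.Println(shared, failover, err != nil) // [0 1 2 0] [0 0] true
}
```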

type TopicMetadata

type TopicMetadata interface {
    // Get the number of partitions for the specific topic
    NumPartitions() int
}

Package pulsar imports 10 packages. Updated 2018-06-27.