Documentation ¶
Overview ¶
Package pulsar implements a Apache Pulsar Client.
Index ¶
- Constants
- Variables
- type Client
- func (c *Client) Close() error
- func (c *Client) CloseConsumer(consumerID uint64) error
- func (c *Client) CloseProducer(producerID uint64) error
- func (c *Client) Dial(ctx context.Context) error
- func (c *Client) NewConsumer(ctx context.Context, config ConsumerConfig) (Consumer, error)
- func (c *Client) NewProducer(ctx context.Context, config ProducerConfig) (*Producer, error)
- func (c *Client) Topics(namespace string) ([]*Topic, error)
- type ClientOption
- type Consumer
- type ConsumerConfig
- type InitialPosition
- type Logger
- type Message
- type MessageID
- type Producer
- type ProducerConfig
- type SubscriptionType
- type Topic
Constants ¶
const ( ExclusiveSubscription = SubscriptionType(pb.CommandSubscribe_Exclusive) )
Subscription type options.
const ( // LatestPosition starts reading from the topic end, only getting // messages published after the reader was created. LatestPosition = InitialPosition(pb.CommandSubscribe_Latest) // EarliestPosition starts reading from the earliest message // available in the topic. EarliestPosition = InitialPosition(pb.CommandSubscribe_Earliest) )
Subscription initial position options.
const (
DefaultNamespace = "default"
)
...
Variables ¶
var ErrNetClosing = errors.New("use of closed network connection")
ErrNetClosing is returned when a network descriptor is used after it has been closed.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client implements a Pulsar client.
func NewClient ¶
func NewClient(serverURL string, opts ...ClientOption) (*Client, error)
NewClient creates a new Pulsar client.
func (*Client) CloseConsumer ¶
CloseConsumer closes a specific consumer.
func (*Client) CloseProducer ¶
CloseProducer closes a specific producer.
func (*Client) Dial ¶
Dial connects to the Pulsar server. This needs to be called before a Consumer or Producer can be created.
func (*Client) NewConsumer ¶
NewConsumer creates a new Consumer, returning after the connection has been made.
func (*Client) NewProducer ¶
NewProducer creates a new Producer, returning after the connection has been made.
type Consumer ¶
type Consumer interface { // Close closes the subscription and unregisters from the Client. Close() error AckMessage(*Message) error // ReadMessage reads and return the next message from the Pulsar. ReadMessage(context.Context) (*Message, error) SeekMessage(*Message) error // HasNext returns whether there is a message available to read HasNext() bool // LastMessageID returns the last message ID of the topic. // If the topic is empty, EntryId will be math.MaxUint64 LastMessageID() (*MessageID, error) }
Consumer provides a high-level API for consuming messages from Pulsar.
type ConsumerConfig ¶
type ConsumerConfig struct { // The topic name to read messages from. Topic string // A regular expression for topics to read messages from. TopicPattern string // Interval in ms in which the client checks for topic changes // that match the set topic pattern and updates the subscriptions. // Default is 30000 TopicPatternDiscoveryInterval int // A unique name for the subscription. If not specified, a random name // will be used. Subscription string // A unique name for the Consumer. If not specified, a random name // will be used. Name string // Select the subscription type to be used when subscribing to the topic. // Default is `Subscribe_Exclusive` Type SubscriptionType // Signal whether the subscription will initialize on latest // or earliest position. InitialPosition InitialPosition // If specified, the subscription will position the cursor // on the particular message id and will send messages from // that point. StartMessageID []byte // Include the message StartMessageID in the read messages. // If StartMessageID is not set but InitialPosition is set // to LatestPosition, the latest message ID of the topic // will be sent. StartMessageIDInclusive bool // Signal whether the subscription should be backed by a // durable cursor or not. For Readers, set to false, for // Consumers set Durable to true and specify a Subscription. // If Durable is true, StartMessageID will be ignored, as it // will be determined by the broker. Durable bool // If true, the subscribe operation will cause a topic to be // created if it does not exist already (and if topic auto-creation // is allowed by broker. // If false, the subscribe operation will fail if the topic // does not exist. ForceTopicCreation bool // MessageChannel sets a channel that receives all messages that the // consumer receives. If not set, a default channel for 1000 messages // will be created. MessageChannel chan *Message }
ConsumerConfig is a configuration object used to create new instances of Consumer.
func (*ConsumerConfig) Validate ¶
func (config *ConsumerConfig) Validate() error
Validate method validates the config properties.
type Message ¶
type Message struct { Body []byte Topic string ID *MessageID // contains filtered or unexported fields }
Message is a data structure representing Pulsar messages.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer provides a high-level API for sending messages to Pulsar.
func (*Producer) WriteMessage ¶
WriteMessage puts the message into the message queue, blocks until the message has been sent and an acknowledgement message is received from Pulsar.
func (*Producer) WriteMessageAsync ¶
WriteMessageAsync puts the message into the message queue. If the message queue is full, this function will block until it can write to the queue. The queue size can be specified in the Producer options.
type ProducerConfig ¶
type ProducerConfig struct { // The topic to write messages to. Topic string // The name of the producer. Name string // Limit on how many messages will be buffered before being sent as a batch. // // The default is a batch size of 100 messages. BatchSize int // Time limit on how often a batch that is not full yet will be flushed and // sent to Pulsar. // // The default is to flush every second. BatchTimeout time.Duration // Capacity of the internal producer message queue. // // The default is to use a queue capacity of 1000 messages. QueueCapacity int }
ProducerConfig is a configuration object used to create new instances of Producer.
func (*ProducerConfig) Validate ¶
func (config *ProducerConfig) Validate() error
Validate method validates the config properties.