pulsar

package module
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 2, 2021 License: MIT Imports: 22 Imported by: 0

README

Apache Pulsar Golang Client Library go.dev reference Go Report Card

A Go client library for the Apache Pulsar project.

Benefits over other Pulsar Go libraries

  • Faster message processing
  • Pure Golang, works without use of Cgo
  • Idiomatic and cleaner Go
  • Better stability
  • Higher test coverage
  • Pluggable logger interface

Status

The library is in an early state of development, the API is not stable yet. Any help or input is welcome.

Alternative libraries

Documentation

Overview

Package pulsar implements a Apache Pulsar Client.

Index

Constants

View Source
const (
	ExclusiveSubscription = SubscriptionType(pb.CommandSubscribe_Exclusive)
	SharedSubscription    = SubscriptionType(pb.CommandSubscribe_Shared)
)

Subscription type options.

View Source
const (
	// LatestPosition starts reading from the topic end, only getting
	// messages published after the reader was created.
	LatestPosition = InitialPosition(pb.CommandSubscribe_Latest)
	// EarliestPosition starts reading from the earliest message
	// available in the topic.
	EarliestPosition = InitialPosition(pb.CommandSubscribe_Earliest)
)

Subscription initial position options.

View Source
const (
	DefaultNamespace = "default"
)

...

Variables

View Source
var ErrNetClosing = errors.New("use of closed network connection")

ErrNetClosing is returned when a network descriptor is used after it has been closed.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client implements a Pulsar client.

func NewClient

func NewClient(serverURL string, opts ...ClientOption) (*Client, error)

NewClient creates a new Pulsar client.

func (*Client) Close

func (c *Client) Close() error

Close closes all consumers, producers and the client connection.

func (*Client) CloseConsumer

func (c *Client) CloseConsumer(consumerID uint64) error

CloseConsumer closes a specific consumer.

func (*Client) CloseProducer

func (c *Client) CloseProducer(producerID uint64) error

CloseProducer closes a specific producer.

func (*Client) Dial

func (c *Client) Dial(ctx context.Context) error

Dial connects to the Pulsar server. This needs to be called before a Consumer or Producer can be created.

func (*Client) NewConsumer

func (c *Client) NewConsumer(ctx context.Context, config ConsumerConfig) (Consumer, error)

NewConsumer creates a new Consumer, returning after the connection has been made.

func (*Client) NewProducer

func (c *Client) NewProducer(ctx context.Context, config ProducerConfig) (*Producer, error)

NewProducer creates a new Producer, returning after the connection has been made.

func (*Client) Topics added in v0.1.1

func (c *Client) Topics(namespace string) ([]*Topic, error)

Topics returns the topics of a namespace. Defaults to DefaultNamespace if no namespace is given.

type ClientOption

type ClientOption func(*clientConfig)

ClientOption ...

func WithLogger

func WithLogger(logger Logger) ClientOption

WithLogger sets a custom logger.

type Consumer

type Consumer interface {
	// Close closes the subscription and unregisters from the Client.
	Close() error

	AckMessage(*Message) error
	// ReadMessage reads and return the next message from the Pulsar.
	ReadMessage(context.Context) (*Message, error)
	SeekMessage(*Message) error
	// HasNext returns whether there is a message available to read
	HasNext() bool

	// LastMessageID returns the last message ID of the topic.
	// If the topic is empty, EntryId will be math.MaxUint64
	LastMessageID() (*MessageID, error)
}

Consumer provides a high-level API for consuming messages from Pulsar.

type ConsumerConfig

type ConsumerConfig struct {
	// The topic name to read messages from.
	Topic string

	// A regular expression for topics to read messages from.
	TopicPattern string

	// Interval in ms in which the client checks for topic changes
	// that match the set topic pattern and updates the subscriptions.
	// Default is 30000
	TopicPatternDiscoveryInterval int

	// A unique name for the subscription. If not specified, a random name
	// will be used.
	Subscription string

	// A unique name for the Consumer. If not specified, a random name
	// will be used.
	Name string

	// Select the subscription type to be used when subscribing to the topic.
	// Default is `Subscribe_Exclusive`
	Type SubscriptionType

	// Signal whether the subscription will initialize on latest
	// or earliest position.
	InitialPosition InitialPosition

	// If specified, the subscription will position the cursor
	// on the particular message id and will send messages from
	// that point.
	StartMessageID []byte

	// Include the message StartMessageID in the read messages.
	// If StartMessageID is not set but InitialPosition is set
	// to LatestPosition, the latest message ID of the topic
	// will be sent.
	StartMessageIDInclusive bool

	// Signal whether the subscription should be backed by a
	// durable cursor or not. For Readers, set to false, for
	// Consumers set Durable to true and specify a Subscription.
	// If Durable is true, StartMessageID will be ignored, as it
	// will be determined by the broker.
	Durable bool

	// If true, the subscribe operation will cause a topic to be
	// created if it does not exist already (and if topic auto-creation
	// is allowed by broker.
	// If false, the subscribe operation will fail if the topic
	// does not exist.
	ForceTopicCreation bool

	// MessageChannel sets a channel that receives all messages that the
	// consumer receives. If not set, a default channel for 1000 messages
	// will be created.
	MessageChannel chan *Message
}

ConsumerConfig is a configuration object used to create new instances of Consumer.

func (*ConsumerConfig) Validate

func (config *ConsumerConfig) Validate() error

Validate method validates the config properties.

type InitialPosition

type InitialPosition pb.CommandSubscribe_InitialPosition

InitialPosition ...

type Logger

type Logger interface {
	Printf(format string, v ...interface{})
}

Logger ...

type Message

type Message struct {
	Body  []byte
	Topic string
	ID    *MessageID
	// contains filtered or unexported fields
}

Message is a data structure representing Pulsar messages.

type MessageID

type MessageID pb.MessageIdData

MessageID contains the ID of a message.

func (*MessageID) Marshal

func (id *MessageID) Marshal() ([]byte, error)

Marshal the ID.

func (*MessageID) Unmarshal

func (id *MessageID) Unmarshal(b []byte) error

Unmarshal the ID.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer provides a high-level API for sending messages to Pulsar.

func (*Producer) Close

func (p *Producer) Close() error

Close stops writing messages and unregisters from the Client.

func (*Producer) WriteMessage

func (p *Producer) WriteMessage(ctx context.Context, msg []byte) (*MessageID, error)

WriteMessage puts the message into the message queue, blocks until the message has been sent and an acknowledgement message is received from Pulsar.

func (*Producer) WriteMessageAsync

func (p *Producer) WriteMessageAsync(ctx context.Context, msg []byte) error

WriteMessageAsync puts the message into the message queue. If the message queue is full, this function will block until it can write to the queue. The queue size can be specified in the Producer options.

type ProducerConfig

type ProducerConfig struct {
	// The topic to write messages to.
	Topic string

	// The name of the producer.
	Name string

	// Limit on how many messages will be buffered before being sent as a batch.
	//
	// The default is a batch size of 100 messages.
	BatchSize int

	// Time limit on how often a batch that is not full yet will be flushed and
	// sent to Pulsar.
	//
	// The default is to flush every second.
	BatchTimeout time.Duration

	// Capacity of the internal producer message queue.
	//
	// The default is to use a queue capacity of 1000 messages.
	QueueCapacity int
}

ProducerConfig is a configuration object used to create new instances of Producer.

func (*ProducerConfig) Validate

func (config *ProducerConfig) Validate() error

Validate method validates the config properties.

type SubscriptionType

type SubscriptionType pb.CommandSubscribe_SubType

SubscriptionType ...

type Topic added in v0.1.1

type Topic struct {
	Domain       string
	Tenant       string
	Namespace    string
	LocalName    string
	CompleteName string
}

Topic represents a Pulsar Topic.

func NewTopic added in v0.1.1

func NewTopic(name string) (*Topic, error)

NewTopic creates a new topic struct from the given topic name. The topic name can be in short form or a fully qualified topic name.

Directories

Path Synopsis
Package pulsar_proto contains the Apache Pulsar Proto definitions.
Package pulsar_proto contains the Apache Pulsar Proto definitions.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL