manage

package
v0.0.0-...-941bdb3
Published: Nov 13, 2020 License: Apache-2.0 Imports: 16 Imported by: 2

Documentation

Constants

This section is empty.

Variables

var ErrorInvalidSubMode = errors.New("invalid subscription mode")

ErrorInvalidSubMode indicates that SubscriptionMode is not one of SubscriptionModeExclusive, SubscriptionModeShard, or SubscriptionModeFailover.
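A minimal sketch of how a caller might check for this sentinel error. The types below are local stand-ins that mirror the documented names, and validateSubMode is a hypothetical helper, not a function of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented names; not imported from the package.
type SubscriptionMode int

const (
	SubscriptionModeExclusive SubscriptionMode = iota + 1
	SubscriptionModeShard
	SubscriptionModeFailover
)

var ErrorInvalidSubMode = errors.New("invalid subscription mode")

// validateSubMode is hypothetical: it returns ErrorInvalidSubMode for any
// value outside the three documented modes.
func validateSubMode(m SubscriptionMode) error {
	switch m {
	case SubscriptionModeExclusive, SubscriptionModeShard, SubscriptionModeFailover:
		return nil
	default:
		return ErrorInvalidSubMode
	}
}

func main() {
	// The zero value is not a valid mode because the constants start at 1.
	err := validateSubMode(0)
	fmt.Println(errors.Is(err, ErrorInvalidSubMode))           // true
	fmt.Println(validateSubMode(SubscriptionModeShard) == nil) // true
}
```

Comparing with errors.Is keeps the check working even if the error is wrapped on its way to the caller.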

Functions

This section is empty.

Types

type Client

type Client struct {
	C         *conn.Conn
	AsyncErrs utils.AsyncErrors

	Dispatcher *frame.Dispatcher

	Subscriptions *sub.Subscriptions
	Connector     *conn.Connector
	Pinger        *srv.Pinger
	Discoverer    *srv.Discoverer
	Pubsub        *sub.Pubsub
}

Client is a Pulsar client, capable of sending and receiving messages and managing the associated state.

func NewClient

func NewClient(cfg ClientConfig) (*Client, error)

NewClient returns a Pulsar client for the given configuration options.

func (*Client) Close

func (c *Client) Close() error

Close closes the connection. The channel returned from `Closed` will unblock. The client should no longer be used after calling Close.

func (*Client) Closed

func (c *Client) Closed() <-chan struct{}

Closed returns a channel that unblocks when the client's connection has been closed and is no longer usable. Users should monitor this channel and recreate the Client if closed. TODO: Rename to Done
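The monitor-and-recreate advice can be sketched as follows. fakeClient is a hypothetical stand-in that only reproduces the Close/Closed shape; a real caller would construct the replacement with NewClient.

```go
package main

import "fmt"

// fakeClient is a hypothetical stand-in exposing the same Close/Closed shape.
type fakeClient struct{ closed chan struct{} }

func newFakeClient() *fakeClient { return &fakeClient{closed: make(chan struct{})} }

// Close unblocks the Closed channel. Calling it twice would panic in this sketch.
func (c *fakeClient) Close() error { close(c.closed); return nil }

func (c *fakeClient) Closed() <-chan struct{} { return c.closed }

// superviseOnce blocks until the client's Closed channel unblocks,
// then returns a freshly created replacement.
func superviseOnce(c *fakeClient) *fakeClient {
	<-c.Closed()
	return newFakeClient()
}

func main() {
	c := newFakeClient()
	go c.Close() // simulate the connection going away
	next := superviseOnce(c)
	fmt.Println(next != c) // true
}
```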

func (*Client) Connect

func (c *Client) Connect(ctx context.Context, proxyBrokerURL string) (*api.CommandConnected, error)

Connect sends a Connect message to the Pulsar server, then waits for either a CONNECTED response or the context to time out. Connect should be called immediately after creating a client, before sending any other messages. The "auth method" is not set in the CONNECT message. See ConnectTLS for TLS auth method. The proxyBrokerURL may be blank, or it can be used to indicate that the client is connecting through a proxy server. See "Connection establishment" for more info: https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Connectionestablishment-6pslvw

func (*Client) ConnectTLS

func (c *Client) ConnectTLS(ctx context.Context, proxyBrokerURL string) (*api.CommandConnected, error)

ConnectTLS sends a Connect message to the Pulsar server, then waits for either a CONNECTED response or the context to time out. ConnectTLS should be called immediately after creating a client, before sending any other messages. The "auth method" is set to tls in the CONNECT message. The proxyBrokerURL may be blank, or it can be used to indicate that the client is connecting through a proxy server. See "Connection establishment" for more info: https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Connectionestablishment-6pslvw
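The "wait for a response or for the context to expire" behaviour shared by Connect and ConnectTLS can be illustrated with a small sketch. waitConnected and tryConnect are hypothetical helpers; the channel stands in for the arrival of a CONNECTED frame.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitConnected is hypothetical: it returns nil once connected is signalled,
// or the context's error if the context ends first.
func waitConnected(ctx context.Context, connected <-chan struct{}) error {
	select {
	case <-connected:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// tryConnect runs waitConnected under a timeout. respond controls whether
// the simulated server answers at all.
func tryConnect(timeoutMs int, respond bool) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()

	connected := make(chan struct{})
	if respond {
		close(connected) // simulated CONNECTED response
	}
	return waitConnected(ctx, connected)
}

func main() {
	fmt.Println(tryConnect(1000, true)) // <nil>

	// No response ever arrives, so the deadline wins.
	err := tryConnect(10, false)
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
```

In practice the caller would pass a context built with context.WithTimeout to Connect or ConnectTLS in the same way.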

func (*Client) LookupTopic

func (c *Client) LookupTopic(ctx context.Context, topic string, authoritative bool) (*api.CommandLookupTopicResponse, error)

LookupTopic returns metadata about the given topic. Topic lookup needs to be performed each time a client needs to create or reconnect a producer or a consumer. Lookup is used to discover which particular broker is serving the topic we are about to use.

The command has to be used on a connection that has already gone through the Connect / Connected initial handshake. See "Topic lookup" for more info: https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Topiclookup-rxds6i

func (*Client) NewExclusiveConsumer

func (c *Client) NewExclusiveConsumer(ctx context.Context, topic, subscriptionName string, earliest bool, queue chan msg.Message) (*sub.Consumer, error)

NewExclusiveConsumer creates a new exclusive consumer capable of reading messages from the given topic. See "Subscription modes" for more information: https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl

func (*Client) NewFailoverConsumer

func (c *Client) NewFailoverConsumer(ctx context.Context, topic, subscriptionName string, queue chan msg.Message) (*sub.Consumer, error)

NewFailoverConsumer creates a new failover consumer capable of reading messages from the given topic. See "Subscription modes" for more information: https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl

func (*Client) NewProducer

func (c *Client) NewProducer(ctx context.Context, topic, producerName string) (*pub.Producer, error)

NewProducer creates a new producer capable of sending messages to the given topic.

func (*Client) NewSharedConsumer

func (c *Client) NewSharedConsumer(ctx context.Context, topic, subscriptionName string, queue chan msg.Message) (*sub.Consumer, error)

NewSharedConsumer creates a new shared consumer capable of reading messages from the given topic. See "Subscription modes" for more information: https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) error

Ping sends a PING message to the Pulsar server, then waits for either a PONG response or the context to time out.

type ClientConfig

type ClientConfig struct {
	Addr string // pulsar broker address. May start with pulsar://

	DialTimeout time.Duration // timeout to use when establishing TCP connection
	TLSConfig   *tls.Config   // TLS configuration. May be nil, in which case TLS will not be used
	Errs        chan<- error  // asynchronous errors will be sent here. May be nil

	PingFrequency         time.Duration // how often to PING server
	PingTimeout           time.Duration // how long to wait for PONG response
	ConnectTimeout        time.Duration // how long to wait for CONNECTED response
	InitialReconnectDelay time.Duration // how long to initially wait to reconnect Client
	MaxReconnectDelay     time.Duration // maximum time to wait to attempt to reconnect Client

	AuthMethod string
	AuthData   []byte
	// contains filtered or unexported fields
}

ClientConfig is used to configure a Pulsar client.
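InitialReconnectDelay and MaxReconnectDelay bound the wait between reconnect attempts. The sketch below shows one way such bounds could be applied. The doubling policy and the nextDelay helper are assumptions for illustration; the documentation does not say how the delay grows.

```go
package main

import (
	"fmt"
	"time"
)

// nextDelay returns the wait before the next reconnect attempt.
// Doubling is an assumption; the package may grow the delay differently.
func nextDelay(cur, initial, maxDelay time.Duration) time.Duration {
	if cur <= 0 {
		return initial // first attempt uses the initial delay
	}
	if next := cur * 2; next < maxDelay {
		return next
	}
	return maxDelay // never wait longer than the maximum
}

func main() {
	initial, maxDelay := 1*time.Second, 5*time.Second
	var d time.Duration
	for i := 0; i < 5; i++ {
		d = nextDelay(d, initial, maxDelay)
		fmt.Println(d) // 1s, 2s, 4s, 5s, 5s
	}
}
```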

func (ClientConfig) ConnAddr

func (c ClientConfig) ConnAddr() string

ConnAddr returns the address that should be used for the TCP connection. It returns phyAddr if set, otherwise Addr. This supports proxying through a broker, as determined during topic lookup.
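The fallback can be sketched as below. The config type is a hypothetical stand-in that models only the two address fields; treating phyAddr as a string that is empty when unset is an assumption, and the addresses are illustrative.

```go
package main

import "fmt"

// config is a hypothetical stand-in for ClientConfig.
type config struct {
	Addr    string // logical broker address
	phyAddr string // physical address from topic lookup; assumed empty when unset
}

// ConnAddr prefers phyAddr when it is set and falls back to Addr.
func (c config) ConnAddr() string {
	if c.phyAddr != "" {
		return c.phyAddr
	}
	return c.Addr
}

func main() {
	direct := config{Addr: "pulsar://broker-a:6650"}
	proxied := config{Addr: "pulsar://broker-a:6650", phyAddr: "pulsar://proxy:6650"}
	fmt.Println(direct.ConnAddr())  // pulsar://broker-a:6650
	fmt.Println(proxied.ConnAddr()) // pulsar://proxy:6650
}
```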

func (ClientConfig) SetDefaults

func (c ClientConfig) SetDefaults() ClientConfig

SetDefaults returns a modified config with appropriate zero values set to defaults.
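Because the receiver is a value, the method works on a copy and the caller must use the returned config. A minimal sketch of that pattern, with a hypothetical config type and illustrative default values (the package's real defaults are not listed here):

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical stand-in with two of the documented fields.
type config struct {
	DialTimeout   time.Duration
	PingFrequency time.Duration
}

// SetDefaults fills zero-valued fields on a copy and returns it.
// The 5s and 30s values are illustrative assumptions.
func (c config) SetDefaults() config {
	if c.DialTimeout <= 0 {
		c.DialTimeout = 5 * time.Second
	}
	if c.PingFrequency <= 0 {
		c.PingFrequency = 30 * time.Second
	}
	return c
}

func main() {
	orig := config{DialTimeout: time.Second}
	filled := orig.SetDefaults()

	// orig is unchanged; explicitly set fields survive in the copy.
	fmt.Println(orig.PingFrequency, filled.DialTimeout, filled.PingFrequency) // 0s 1s 30s
}
```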

type ClientPool

type ClientPool struct {
	// contains filtered or unexported fields
}

ClientPool provides a thread-safe store for ManagedClients, based on their address.

func NewClientPool

func NewClientPool() *ClientPool

NewClientPool initializes a ClientPool.

func (*ClientPool) ForTopic

func (m *ClientPool) ForTopic(ctx context.Context, cfg ClientConfig, topic string) (*ManagedClient, error)

ForTopic performs topic lookup for the given topic and returns the ManagedClient for the discovered topic information. See https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Topiclookup-6g0lo and incubator-pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java

func (*ClientPool) Get

func (m *ClientPool) Get(cfg ClientConfig) *ManagedClient

Get returns the ManagedClient for the given client configuration. First the cache is checked for an existing client. If one doesn't exist, a new one is created and cached, then returned.
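The check-then-create behaviour can be sketched with a mutex-guarded map. The pool and managedClient types are hypothetical stand-ins, and keying the cache by a plain address string is a simplification of the real configuration-based lookup.

```go
package main

import (
	"fmt"
	"sync"
)

// managedClient is a hypothetical stand-in for ManagedClient.
type managedClient struct{ addr string }

// pool caches one managedClient per address.
type pool struct {
	mu      sync.Mutex
	clients map[string]*managedClient
}

func newPool() *pool { return &pool{clients: make(map[string]*managedClient)} }

// Get returns the cached client for addr, creating and caching it on first use.
func (p *pool) Get(addr string) *managedClient {
	p.mu.Lock()
	defer p.mu.Unlock()

	if mc, ok := p.clients[addr]; ok {
		return mc
	}
	mc := &managedClient{addr: addr}
	p.clients[addr] = mc
	return mc
}

func main() {
	p := newPool()
	a1, a2, b := p.Get("broker-a:6650"), p.Get("broker-a:6650"), p.Get("broker-b:6650")
	fmt.Println(a1 == a2, a1 == b) // true false
}
```

Holding the lock across both the lookup and the insert prevents two goroutines from creating separate clients for the same address.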

func (*ClientPool) Partitions

type ConsumerConfig

type ConsumerConfig struct {
	ClientConfig

	Topic     string
	Name      string           // subscription name
	SubMode   SubscriptionMode // SubscriptionMode
	Earliest  bool             // if true, subscription cursor set to beginning
	QueueSize int              // number of messages to buffer before dropping messages

	NewConsumerTimeout    time.Duration // maximum duration to create Consumer, including topic lookup
	InitialReconnectDelay time.Duration // how long to initially wait to reconnect Consumer
	MaxReconnectDelay     time.Duration // maximum time to wait to attempt to reconnect Consumer
}

ConsumerConfig is used to configure a ManagedConsumer.
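ConsumerConfig embeds ClientConfig, and both declare InitialReconnectDelay and MaxReconnectDelay. Under Go's embedding rules the outer field shadows the promoted one. The sketch below uses trimmed local copies of the two structs to show which field each selector reaches; the address and topic values are illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// Trimmed local copies of the documented structs, for illustration only.
type ClientConfig struct {
	Addr                  string
	InitialReconnectDelay time.Duration
}

type ConsumerConfig struct {
	ClientConfig

	Topic                 string
	InitialReconnectDelay time.Duration
}

func main() {
	cfg := ConsumerConfig{
		ClientConfig: ClientConfig{
			Addr:                  "pulsar://localhost:6650",
			InitialReconnectDelay: 1 * time.Second, // client-level delay
		},
		Topic:                 "my-topic",
		InitialReconnectDelay: 2 * time.Second, // consumer-level delay
	}

	fmt.Println(cfg.Addr)                               // promoted from ClientConfig
	fmt.Println(cfg.InitialReconnectDelay)              // 2s: the outer field wins
	fmt.Println(cfg.ClientConfig.InitialReconnectDelay) // 1s: reached explicitly
}
```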

func (ConsumerConfig) SetDefaults

func (m ConsumerConfig) SetDefaults() ConsumerConfig

SetDefaults returns a modified config with appropriate zero values set to defaults.

type ManagedClient

type ManagedClient struct {
	// contains filtered or unexported fields
}

ManagedClient wraps a Client with re-connect and connection management logic.

func NewManagedClient

func NewManagedClient(cfg ClientConfig) *ManagedClient

NewManagedClient returns a ManagedClient for the given address. The Client will be created and monitored in the background.

func (*ManagedClient) Done

func (m *ManagedClient) Done() <-chan struct{}

Done returns a channel that unblocks when the ManagedClient has been closed.

func (*ManagedClient) Get

func (m *ManagedClient) Get(ctx context.Context) (*Client, error)

Get returns the managed Client in a thread-safe way. If the client is temporarily unavailable, Get will block until either it becomes available or the context expires.

There is no guarantee that the returned Client will be connected or stay connected.

func (*ManagedClient) Stop

func (m *ManagedClient) Stop() error

Stop closes the Client if possible, and/or stops it from re-connecting. The ManagedClient shouldn't be used after calling Stop.

type ManagedConsumer

type ManagedConsumer struct {
	// contains filtered or unexported fields
}

ManagedConsumer wraps a Consumer with reconnect logic.

func NewManagedConsumer

func NewManagedConsumer(cp *ClientPool, cfg ConsumerConfig) *ManagedConsumer

NewManagedConsumer returns an initialized ManagedConsumer. It will create and recreate a Consumer for the given discovery address and topic on a background goroutine.

func (*ManagedConsumer) Ack

func (m *ManagedConsumer) Ack(ctx context.Context, msg msg.Message) error

Ack acquires a consumer and sends an ACK message for the given message.

func (*ManagedConsumer) Close

func (m *ManagedConsumer) Close(ctx context.Context) error

Close closes the consumer.

func (*ManagedConsumer) Consumer

func (m *ManagedConsumer) Consumer(ctx context.Context) *sub.Consumer

func (*ManagedConsumer) ConsumerID

func (m *ManagedConsumer) ConsumerID(ctx context.Context) uint64

ConsumerID returns the current consumer's ID.

func (*ManagedConsumer) Monitor

func (m *ManagedConsumer) Monitor() func()

Monitor provides a scoped, deferrable lock: the returned function is meant to be deferred by the caller.
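A method with the signature `Monitor() func()` is commonly used as `defer m.Monitor()()`: the lock is taken when the defer statement runs, and the returned function releases it when the enclosing function returns. The sketch below shows that idiom on a hypothetical counter type; whether ManagedConsumer implements Monitor exactly this way is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical type guarded by a Monitor-style lock.
type counter struct {
	mu sync.Mutex
	n  int
}

// Monitor locks the mutex and returns the matching unlock function.
func (c *counter) Monitor() func() {
	c.mu.Lock()
	return c.mu.Unlock
}

// incr holds the lock for the whole function body.
func (c *counter) incr() {
	defer c.Monitor()() // Monitor() runs now; its result runs on return
	c.n++
}

// countConcurrently increments the counter from several goroutines.
func countConcurrently(workers int) int {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.incr()
		}()
	}
	wg.Wait()
	return c.n
}

func main() {
	fmt.Println(countConcurrently(100)) // 100
}
```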

func (*ManagedConsumer) Receive

func (m *ManagedConsumer) Receive(ctx context.Context) (msg.Message, error)

Receive returns a single Message, if available. A reasonable context should be provided that will be used to wait for an incoming message if none are available.

func (*ManagedConsumer) ReceiveAsync

func (m *ManagedConsumer) ReceiveAsync(ctx context.Context, msgs chan<- msg.Message) error

ReceiveAsync blocks until the context is done. It continuously reads messages from the consumer and sends them to the provided channel. It manages flow control internally based on the queue size.
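The blocking, forward-until-cancelled shape of ReceiveAsync can be sketched as below. forward and forwardDemo are hypothetical helpers, strings stand in for msg.Message, and the sketch does not model the flow control that ReceiveAsync performs.

```go
package main

import (
	"context"
	"fmt"
)

// forward copies messages from src to dst until ctx is done,
// then returns the context's error.
func forward(ctx context.Context, src <-chan string, dst chan<- string) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case m := <-src:
			// Also watch ctx while handing off, so a slow reader cannot
			// keep the loop alive after cancellation.
			select {
			case dst <- m:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}

// forwardDemo forwards three messages, cancels, and reports what happened.
func forwardDemo() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	src := make(chan string, 3)
	dst := make(chan string)
	src <- "a"
	src <- "b"
	src <- "c"

	errc := make(chan error, 1)
	go func() { errc <- forward(ctx, src, dst) }()

	n := 0
	for n < 3 {
		<-dst
		n++
	}
	cancel()
	return n, <-errc
}

func main() {
	fmt.Println(forwardDemo()) // 3 context canceled
}
```

Since the call blocks, a caller would typically run ReceiveAsync in its own goroutine and read from the channel elsewhere.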

func (*ManagedConsumer) RedeliverOverflow

func (m *ManagedConsumer) RedeliverOverflow(ctx context.Context) (int, error)

RedeliverOverflow sends a REDELIVER_UNACKNOWLEDGED_MESSAGES request for all messages that were dropped because the message buffer was full. Note that for all subscription types other than `shared`, _all_ unacknowledged messages will be redelivered. See https://github.com/apache/incubator-pulsar/issues/2003
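To illustrate what "dropped because the message buffer was full" can look like, here is a sketch of a bounded queue that counts overflowed messages. The buffer type and its methods are hypothetical. Treating the int returned by RedeliverOverflow as a count of dropped messages is an assumption, and the sketch is meant for use from a single goroutine.

```go
package main

import "fmt"

// buffer is a hypothetical bounded queue that counts dropped messages.
type buffer struct {
	queue    chan string
	overflow int
}

func newBuffer(size int) *buffer { return &buffer{queue: make(chan string, size)} }

// offer enqueues m without blocking; a full queue drops m and counts it.
func (b *buffer) offer(m string) bool {
	select {
	case b.queue <- m:
		return true
	default:
		b.overflow++
		return false
	}
}

// redeliverOverflow stands in for requesting redelivery: it reports how
// many messages were dropped and resets the counter.
func (b *buffer) redeliverOverflow() int {
	n := b.overflow
	b.overflow = 0
	return n
}

func main() {
	b := newBuffer(2)
	for _, m := range []string{"a", "b", "c", "d", "e"} {
		b.offer(m)
	}
	fmt.Println(len(b.queue), b.redeliverOverflow(), b.redeliverOverflow()) // 2 3 0
}
```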

func (*ManagedConsumer) RedeliverUnacknowledged

func (m *ManagedConsumer) RedeliverUnacknowledged(ctx context.Context) error

RedeliverUnacknowledged sends a REDELIVER_UNACKNOWLEDGED_MESSAGES request for all messages that have not been acked.

func (*ManagedConsumer) Unactive

func (m *ManagedConsumer) Unactive() bool

Unactive returns the consumer's Unactive flag.

func (*ManagedConsumer) Unsubscribe

func (m *ManagedConsumer) Unsubscribe(ctx context.Context) error

Unsubscribe unsubscribes the consumer from its topic.

type ManagedProducer

type ManagedProducer struct {
	ClientPool *ClientPool
	Cfg        ProducerConfig
	AsyncErrs  utils.AsyncErrors

	Mu       sync.RWMutex  // protects following
	Producer *pub.Producer // either producer is nil and wait isn't or vice versa
	Waitc    chan struct{} // if producer is nil, this will unblock when it's been re-set
}

ManagedProducer wraps a Producer with re-connect logic.

func NewManagedProducer

func NewManagedProducer(cp *ClientPool, cfg ProducerConfig) *ManagedProducer

NewManagedProducer returns an initialized ManagedProducer. It will create and re-create a Producer for the given discovery address and topic on a background goroutine.

func (*ManagedProducer) Close

func (m *ManagedProducer) Close(ctx context.Context) error

Close closes the producer.

func (*ManagedProducer) Monitor

func (m *ManagedProducer) Monitor() func()

Monitor provides a scoped, deferrable lock: the returned function is meant to be deferred by the caller.

func (*ManagedProducer) NewProducer

func (m *ManagedProducer) NewProducer(ctx context.Context) (*pub.Producer, error)

NewProducer attempts to create a Producer.

func (*ManagedProducer) Reconnect

func (m *ManagedProducer) Reconnect(initial bool) *pub.Producer

Reconnect blocks while a new Producer is created.

func (*ManagedProducer) Send

func (m *ManagedProducer) Send(ctx context.Context, payload []byte) (*api.CommandSendReceipt, error)

Send attempts to use the Producer's Send method if available. If not available, an error is returned.

func (*ManagedProducer) Set

func (m *ManagedProducer) Set(p *pub.Producer)

Set unblocks the "wait" channel (if not nil), and sets the producer under lock.

func (*ManagedProducer) Unset

func (m *ManagedProducer) Unset()

Unset creates the "wait" channel (if nil), and sets the producer to nil under lock.
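Set and Unset maintain the invariant noted on the struct fields: either the producer is non-nil or the wait channel is, never both. The sketch below reproduces that state machine on a hypothetical managed type and adds a get helper that waits on the channel; the helper and the timings are illustrative assumptions.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

type producer struct{ name string }

// managed is a hypothetical stand-in holding a producer or a wait channel.
type managed struct {
	mu       sync.RWMutex
	producer *producer
	waitc    chan struct{} // non-nil while producer is nil
}

func newManaged() *managed { return &managed{waitc: make(chan struct{})} }

// set stores p and unblocks anyone waiting on the wait channel.
func (m *managed) set(p *producer) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.producer = p
	if m.waitc != nil {
		close(m.waitc)
		m.waitc = nil
	}
}

// unset clears the producer and creates the wait channel if needed.
func (m *managed) unset() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.producer = nil
	if m.waitc == nil {
		m.waitc = make(chan struct{})
	}
}

// get blocks until a producer is set or ctx is done.
func (m *managed) get(ctx context.Context) (*producer, error) {
	for {
		m.mu.RLock()
		p, wait := m.producer, m.waitc
		m.mu.RUnlock()
		if p != nil {
			return p, nil
		}
		select {
		case <-wait: // set was called; loop to re-read the producer
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}

// demo sets a producer in the background, then unsets it and times out.
func demo() (string, bool) {
	m := newManaged()
	go func() {
		time.Sleep(10 * time.Millisecond)
		m.set(&producer{name: "p1"})
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	p, err := m.get(ctx)
	if err != nil {
		return "", false
	}

	m.unset()
	ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel2()
	_, err = m.get(ctx2)
	return p.name, errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(demo()) // p1 true
}
```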

type ProducerConfig

type ProducerConfig struct {
	ClientConfig

	Topic string
	Name  string

	NewProducerTimeout    time.Duration // maximum duration to create Producer, including topic lookup
	InitialReconnectDelay time.Duration // how long to initially wait to reconnect Producer
	MaxReconnectDelay     time.Duration // maximum time to wait to attempt to reconnect Producer
}

ProducerConfig is used to configure a ManagedProducer.

type SubscriptionMode

type SubscriptionMode int

SubscriptionMode represents Pulsar's three subscription modes.

const (
	// SubscriptionModeExclusive allows only one consumer to be bound to a subscription.
	// If more than one consumer attempts to subscribe to the topic in the same way,
	// the consumer will receive an error.
	SubscriptionModeExclusive SubscriptionMode = iota + 1 // 1

	// SubscriptionModeShard is the shared (round-robin) mode, in which
	// multiple consumers can be bound to the same subscription.
	// Messages are distributed across consumers in round-robin fashion,
	// and each message is delivered to only one consumer.
	// When a consumer disconnects, all messages sent to it
	// but not acknowledged are rescheduled and delivered to the remaining consumers.
	SubscriptionModeShard // 2

	// SubscriptionModeFailover allows multiple consumers to be bound to the same subscription.
	// Consumers are sorted in lexicographic order,
	// and the first consumer is initially the only one that receives messages.
	// This consumer is called the master consumer.
	// When the master consumer disconnects,
	// all messages (unacknowledged and subsequent) are delivered to the next consumer in the queue.
	SubscriptionModeFailover // 3
)
