pulsar

package
v0.5.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 17, 2023 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithDeliverAfter

func WithDeliverAfter(deliverAfter time.Duration) queue.CallOptions

func WithDeliverAt

func WithDeliverAt(deliverAt time.Time) queue.CallOptions

func WithMessage

func WithMessage(message Message) queue.CallOptions

func WithOrderingKey

func WithOrderingKey(OrderingKey string) queue.CallOptions

func WithSync

func WithSync() queue.CallOptions

Types

type Conf

// Conf is the configuration value accepted by NewQueue and MustNewQueue.
//
// Fields, as far as this declaration shows:
//   - Brokers: a list of broker address strings (the expected address format
//     is not visible here — confirm against NewQueue).
//   - Topic: the topic name.
//   - SubscriptionName: the subscription name.
//   - Conns: an int whose struct tag declares a default of 1; presumably a
//     connection count — TODO confirm.
//   - Processors: an int whose struct tag declares a default of 8; presumably
//     a count of concurrent message processors — TODO confirm.
type Conf struct {
	Brokers          []string
	Topic            string
	SubscriptionName string
	Conns            int `json:",default=1"`
	Processors       int `json:",default=8"`
}

type Message

// Message carries a message body together with its per-message settings
// (routing key, ordering key, properties, timing and replication options).
// A Message is turned into a queue.CallOptions value by WithMessage.
type Message struct {
	// Payload for the message
	Payload []byte

	// Value and Payload are mutually exclusive; `Value interface{}` is for schema messages.
	Value interface{}

	// Key sets the key of the message for routing policy
	Key string

	// OrderingKey sets the ordering key of the message
	OrderingKey string

	// Properties attach application defined properties on the message
	Properties map[string]string

	// EventTime sets the event time for a given message.
	// By default, messages don't have an event time associated, while the publish
	// time will always be present.
	// Set the event time to a non-zero timestamp to explicitly declare the time
	// that the event "happened", as opposed to when the message is being published.
	EventTime time.Time

	// ReplicationClusters override the replication clusters for this message.
	ReplicationClusters []string

	// DisableReplication disables the replication for this message
	DisableReplication bool

	// SequenceID sets the sequence id to assign to the current message
	SequenceID *int64

	// DeliverAfter requests to deliver the message only after the specified relative delay.
	// Note: messages are only delivered with delay when a consumer is consuming
	//     through a `SubscriptionType=Shared` subscription. With other subscription
	//     types, the messages will still be delivered immediately.
	DeliverAfter time.Duration

	// DeliverAt delivers the message only at or after the specified absolute timestamp.
	// Note: messages are only delivered with delay when a consumer is consuming
	//     through a `SubscriptionType=Shared` subscription. With other subscription
	//     types, the messages will still be delivered immediately.
	DeliverAt time.Time
}

type PushOption

// PushOption is a function that receives a pointer to the package's unexported
// chunkOptions. Values of this type are returned by WithChunkSize and
// WithFlushInterval and are accepted by NewPusher as variadic arguments.
type PushOption func(options *chunkOptions)

func WithChunkSize

func WithChunkSize(chunkSize int) PushOption

func WithFlushInterval

func WithFlushInterval(interval time.Duration) PushOption

type Pusher

// Pusher is the type returned by NewPusher. It exposes the Push, Name and
// Close methods; all of its fields are unexported.
type Pusher struct {
	// contains filtered or unexported fields
}

func NewPusher

func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher

func (*Pusher) Close

func (p *Pusher) Close() error

func (*Pusher) Name

func (p *Pusher) Name() string

func (*Pusher) Push

func (p *Pusher) Push(ctx context.Context, k, v []byte, opts ...queue.CallOptions) (
	interface{}, error)

type QueueOption

// QueueOption is a function that receives a pointer to the package's unexported
// queueOptions. Values of this type are returned by WithCommitInterval,
// WithMaxWait, WithMetrics and WithQueueCapacity, and are accepted by NewQueue
// and MustNewQueue as variadic arguments.
type QueueOption func(*queueOptions)

func WithCommitInterval

func WithCommitInterval(interval time.Duration) QueueOption

func WithMaxWait

func WithMaxWait(wait time.Duration) QueueOption

func WithMetrics

func WithMetrics(metrics *stat.Metrics) QueueOption

func WithQueueCapacity

func WithQueueCapacity(queueCapacity int) QueueOption

type Queues

// Queues is the type returned (as *Queues) by NewQueue and MustNewQueue. It
// exposes the Start and Stop methods; all of its fields are unexported.
//
// NOTE(review): Start and Stop are declared on the value receiver (q Queues)
// while the constructors return *Queues, so each call receives a copy of the
// struct — confirm this is intended given the hidden fields.
type Queues struct {
	// contains filtered or unexported fields
}

func MustNewQueue

func MustNewQueue(c Conf, handler queue.Consumer, opts ...QueueOption) *Queues

func NewQueue

func NewQueue(c Conf, handler queue.Consumer, opts ...QueueOption) (*Queues, error)

func (Queues) Start

func (q Queues) Start()

func (Queues) Stop

func (q Queues) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL