package scheduler

v0.0.0-...-f0734c7
Published: Apr 6, 2022 License: MIT Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type PublishScheduler

type PublishScheduler struct {
	// Settings passed down to each bundler that gets created.
	DelayThreshold time.Duration
	// Once a bundle has this many items, handle the bundle. Since only one
	// item at a time is added to a bundle, no bundle will exceed this
	// threshold, so it also serves as a limit. The default is
	// DefaultBundleCountThreshold.
	BundleCountThreshold int
	// Once the number of bytes in the current bundle reaches this threshold,
	// handle the bundle. The default is DefaultBundleByteThreshold. This
	// triggers handling, but does not cap the total size of a bundle.
	BundleByteThreshold int
	// The maximum size of a bundle, in bytes. Zero means unlimited.
	BundleByteLimit int
	// The maximum number of bytes that the Bundler will keep in memory before
	// returning ErrOverflow. The default is DefaultBufferedByteLimit.
	BufferedByteLimit int
	// contains filtered or unexported fields
}

PublishScheduler is a scheduler designed for Pub/Sub's Publish flow. It bundles items before handling them. All items in this PublishScheduler use the same handler.

Each item is added with a given key. Items added to the empty string key are handled in random order. Items added to any other key are handled sequentially.

func NewPublishScheduler

func NewPublishScheduler(workers int, handle func(bundle interface{})) *PublishScheduler

NewPublishScheduler returns a new PublishScheduler.

The workers arg is the number of goroutines that operate on the queue of work. A reasonably large number of workers is highly recommended. If the workers arg is 0, then a healthy default of 10 workers is used.

The scheduler does not use a parent context. If it did, canceling that context would immediately stop the scheduler without waiting for undelivered messages.

The scheduler should be stopped only with FlushAndStop.
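A minimal end-to-end sketch of the Publish flow, under assumptions this page does not state: the scheduler package and fmt are imported, items are added as strings, and the handler receives the bundle as a slice of the added item type (an assumption about the underlying bundler). The keys, payloads, and handler body are illustrative only.

s := scheduler.NewPublishScheduler(0, func(bundle interface{}) {
	// Assumption: the bundle arrives as a slice of the items added under one key.
	for _, msg := range bundle.([]string) {
		fmt.Println("publish:", msg)
	}
})

// Items added under the same non-empty key are handled sequentially.
_ = s.Add("ordering-key-A", "first", len("first"))
_ = s.Add("ordering-key-A", "second", len("second"))

// Items added under the empty key are handled in random order.
_ = s.Add("", "unordered", len("unordered"))

// FlushAndStop blocks until every buffered item has been handled.
s.FlushAndStop()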

func (*PublishScheduler) Add

func (s *PublishScheduler) Add(key string, item interface{}, size int) error

Add adds an item to the scheduler at a given key.

Add never blocks. Buffering happens in the scheduler's publishers. There is no flow control.

Since ordered keys require only a single outstanding RPC at once, it is possible to send ordered key messages to Topic.Publish (and subsequently to PublishScheduler.Add) faster than the bundler can publish them to the Pub/Sub service, resulting in a backed up queue of Pub/Sub bundles. Each item in the bundler queue is a goroutine.
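Because Add reports problems through its error return rather than by blocking, callers typically check it per item. A hedged sketch, continuing the example above; the key and payload are placeholders, and the exact failure conditions are an assumption rather than something this page specifies:

payload := "hello"
if err := s.Add("ordering-key-A", payload, len(payload)); err != nil {
	// Assumption: Add can fail, for example once FlushAndStop has been
	// called or when the key's bundler has previously errored.
	log.Printf("scheduler.Add: %v", err)
}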

func (*PublishScheduler) FlushAndStop

func (s *PublishScheduler) FlushAndStop()

FlushAndStop begins flushing items from bundlers and from the scheduler. It blocks until all items have been flushed.

func (*PublishScheduler) IsPaused

func (s *PublishScheduler) IsPaused(orderingKey string) bool

IsPaused checks if the bundler associated with an ordering key is paused.

func (*PublishScheduler) Pause

func (s *PublishScheduler) Pause(orderingKey string)

Pause pauses the bundler associated with the provided ordering key, preventing it from accepting new messages. Any outstanding messages that haven't been published will error. If orderingKey is empty, this is a no-op.

func (*PublishScheduler) Resume

func (s *PublishScheduler) Resume(orderingKey string)

Resume resumes accepting messages with the provided ordering key.
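How the three ordering-key controls fit together, as a hedged sketch; the key name is a placeholder and the scheduler s is assumed to come from NewPublishScheduler as in the earlier example:

key := "ordering-key-A"

// Stop accepting new messages for this key; per the Pause doc, outstanding
// unpublished messages for the key will error.
s.Pause(key)

if s.IsPaused(key) {
	// ... handle or drop the failed messages for this key ...

	// Resume accepting messages for the key once it is safe to do so.
	s.Resume(key)
}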

type ReceiveScheduler

type ReceiveScheduler struct {
	// contains filtered or unexported fields
}

ReceiveScheduler is a scheduler designed for Pub/Sub's Receive flow.

Each item is added with a given key. Items added to the empty string key are handled in random order. Items added to any other key are handled sequentially.

func NewReceiveScheduler

func NewReceiveScheduler(workers int) *ReceiveScheduler

NewReceiveScheduler creates a new ReceiveScheduler.

The workers arg is the number of concurrent calls to handle. If the workers arg is 0, then a healthy default of 10 workers is used. If less than 0, this will be set to a large number, similar to PublishScheduler's handler limit.

func (*ReceiveScheduler) Add

func (s *ReceiveScheduler) Add(key string, item interface{}, handle func(item interface{})) error

Add adds the item to be handled. Add may block.

Buffering happens above the ReceiveScheduler in the form of a flow controller that requests batches of messages to pull. A backed up ReceiveScheduler.Add call causes pushback to the Pub/Sub service (fewer Receive calls on the long-lived stream), which keeps the memory footprint stable.

func (*ReceiveScheduler) Shutdown

func (s *ReceiveScheduler) Shutdown()

Shutdown begins flushing messages and stops accepting new Add calls. Shutdown does not block or wait for all messages to be flushed.
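A minimal sketch of the Receive flow, assuming string items and an illustrative per-item handler; in real use the flow controller above the scheduler decides what gets added, and a caller would coordinate completion rather than relying on Shutdown to wait:

r := scheduler.NewReceiveScheduler(0)

// Items under the same non-empty key are handled one after another;
// items under the empty key are handled in random order.
for i := 0; i < 3; i++ {
	msg := fmt.Sprintf("message-%d", i)
	_ = r.Add("ordering-key-A", msg, func(item interface{}) {
		fmt.Println("received:", item.(string))
	})
}

// Shutdown stops new Add calls and begins flushing; it does not block.
r.Shutdown()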

