gosd

v2.0.0
Published: Aug 17, 2022 | License: MIT | Imports: 5 | Imported by: 0

README

gosd

go-schedulable-dispatcher (gosd) is a library for scheduling when to dispatch a message to a channel.

Implementation

The implementation provides an interactive API for scheduling messages with a dispatcher. Messages are ingested and processed into a heap-based priority queue. By default, order is not guaranteed when messages share the same scheduled time; this can be changed through the config (GuaranteeOrder). Guaranteeing order makes performance slightly worse, so if strict ordering isn't critical to your application, keep the default setting.
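As a rough illustration of the ordering trade-off described above, the sketch below builds a min-heap with Go's container/heap and breaks ties on the scheduled time with an ingestion sequence number. It is not gosd's actual implementation; the names item, queue, ordered, and popOrder are hypothetical and exist only for this example.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// item is a hypothetical queue entry: a payload, its scheduled time, and
// the position at which it was ingested.
type item struct {
	at      time.Time
	seq     uint64
	payload string
}

// queue implements heap.Interface as a min-heap keyed on the scheduled time.
type queue struct {
	items   []item
	ordered bool // when true, ties on the scheduled time fall back to seq
}

func (q *queue) Len() int { return len(q.items) }

func (q *queue) Less(i, j int) bool {
	a, b := q.items[i], q.items[j]
	if q.ordered && a.at.Equal(b.at) {
		return a.seq < b.seq // extra comparison paid only for the guarantee
	}
	return a.at.Before(b.at)
}

func (q *queue) Swap(i, j int) { q.items[i], q.items[j] = q.items[j], q.items[i] }

func (q *queue) Push(x any) { q.items = append(q.items, x.(item)) }

func (q *queue) Pop() any {
	n := len(q.items)
	it := q.items[n-1]
	q.items = q.items[:n-1]
	return it
}

// popOrder schedules every payload for the same instant and returns the
// order in which an ordered queue releases them.
func popOrder(payloads []string) []string {
	q := &queue{ordered: true}
	at := time.Date(2022, 8, 17, 0, 0, 0, 0, time.UTC)
	for i, p := range payloads {
		heap.Push(q, item{at: at, seq: uint64(i), payload: p})
	}
	out := make([]string, 0, len(payloads))
	for q.Len() > 0 {
		out = append(out, heap.Pop(q).(item).payload)
	}
	return out
}

func main() {
	fmt.Println(popOrder([]string{"a", "b", "c", "d"})) // [a b c d]
}
```

With ordered set to false, the heap compares scheduled times only, so items that share a time may be released in any order. The sequence number is one common way to get a stable order out of a binary heap, at the cost of an extra field and comparison per item.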

Example

// create instance of dispatcher
dispatcher, err := gosd.NewDispatcher[string](&gosd.DispatcherConfig{
    IngressChannelSize:  100,
    DispatchChannelSize: 100,
    MaxMessages:         100,
    GuaranteeOrder:      false,
})
checkErr(err)

// spawn process
go dispatcher.Start()

// schedule a message
dispatcher.IngressChannel() <- &gosd.ScheduledMessage[string]{
    At:      time.Now().Add(1 * time.Second),
    Message: "Hello World in 1 second!",
}

// wait for the message
msg := <-dispatcher.DispatchChannel()

fmt.Println(msg)
// Hello World in 1 second!

// shut down without a deadline
checkErr(dispatcher.Shutdown(context.Background(), false))

More examples are available in the examples directory.

Benchmarking

Tested with Go 1.19 and 1000 messages per iteration.

goos: windows
goarch: amd64
pkg: github.com/alexsniffin/gosd/v2
cpu: Intel(R) Core(TM) i7-8700K CPU @ 3.70GHz
Benchmark_integration_unordered-12                           307           3690528 ns/op
Benchmark_integration_unorderedSmallBuffer-12                274           4120104 ns/op
Benchmark_integration_unorderedSmallHeap-12                  348           3452703 ns/op
Benchmark_integration_ordered-12                             135           8650709 ns/op
Benchmark_integration_orderedSmallBuffer-12                  207           5867338 ns/op
Benchmark_integration_orderedSmallHeap-12                    350           3592990 ns/op
Benchmark_integration_orderedSameTime-12                     133           8909311 ns/op
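
To read these figures per message, one option is to divide ns/op by the 1000 messages in each iteration. The sketch below does that for the unordered and ordered runs; it assumes that one benchmark op covers a full batch of 1000 messages and that cost is spread evenly across them. The helper perMessage is hypothetical and not part of gosd.

```go
package main

import "fmt"

// perMessage converts a ns/op figure into nanoseconds per message, given
// how many messages each benchmark iteration processes.
func perMessage(nsPerOp, messagesPerOp int64) float64 {
	return float64(nsPerOp) / float64(messagesPerOp)
}

func main() {
	const messages = 1000 // messages per iteration, as stated above

	unordered := perMessage(3690528, messages) // Benchmark_integration_unordered
	ordered := perMessage(8650709, messages)   // Benchmark_integration_ordered

	fmt.Printf("unordered: %.0f ns/message\n", unordered)
	fmt.Printf("ordered:   %.0f ns/message\n", ordered)
	fmt.Printf("ratio:     %.2fx\n", ordered/unordered)
}
```

Under those assumptions, the unordered run works out to roughly 3.7 µs per message and the ordered run to roughly 8.7 µs, a factor of about 2.3 on this machine.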

Documentation

Types

type Dispatcher

type Dispatcher[T any] struct {
	// contains filtered or unexported fields
}

Dispatcher processes the ingress and dispatching of scheduled messages.

func NewDispatcher

func NewDispatcher[T any](config *DispatcherConfig) (*Dispatcher[T], error)

NewDispatcher creates a new instance of a Dispatcher.

func (*Dispatcher[T]) DispatchChannel

func (d *Dispatcher[T]) DispatchChannel() <-chan T

DispatchChannel returns a receive-only channel of type `T`.

func (*Dispatcher[T]) IngressChannel

func (d *Dispatcher[T]) IngressChannel() chan<- *ScheduledMessage[T]

IngressChannel returns a send-only channel of `*ScheduledMessage[T]`.

func (*Dispatcher[T]) Pause

func (d *Dispatcher[T]) Pause() error

Pause sets the state of the Dispatcher to stop processing messages and closes the main process loop.

func (*Dispatcher[T]) Resume

func (d *Dispatcher[T]) Resume() error

Resume sets the state of the Dispatcher to start processing messages again, restarts the timer for the last message being processed, and blocks.

func (*Dispatcher[T]) Shutdown

func (d *Dispatcher[T]) Shutdown(ctx context.Context, drainImmediately bool) error

Shutdown attempts to shut down the Dispatcher within the context deadline; if the deadline passes first, the process is terminated ungracefully.

If drainImmediately is true, then all messages will be dispatched immediately regardless of the schedule set. Order can be lost if new messages are still being ingested.
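
The difference between the two drain modes can be sketched with the standard library alone. The example below is a hypothetical model, not gosd's code: the scheduled type and the drain and drainDemo functions are made up for illustration. It delivers pending messages either right away or at their scheduled time, and gives up when the context ends.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"time"
)

// scheduled is a hypothetical stand-in for a message with a due time.
type scheduled struct {
	at  time.Time
	msg string
}

// drain sorts pending by due time (in place) and sends each message to out.
// When immediately is false it waits until each message is due. It returns
// ctx.Err() if the context ends before everything has been delivered.
func drain(ctx context.Context, pending []scheduled, out chan<- string, immediately bool) error {
	sort.Slice(pending, func(i, j int) bool { return pending[i].at.Before(pending[j].at) })
	for _, m := range pending {
		if !immediately {
			timer := time.NewTimer(time.Until(m.at))
			select {
			case <-timer.C:
			case <-ctx.Done():
				timer.Stop()
				return ctx.Err()
			}
		}
		select {
		case out <- m.msg:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// drainDemo drains two messages that are due far in the future under a
// 50ms deadline and reports how many were delivered.
func drainDemo(immediately bool) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	pending := []scheduled{
		{at: time.Now().Add(2 * time.Hour), msg: "later"},
		{at: time.Now().Add(time.Hour), msg: "sooner"},
	}
	out := make(chan string, len(pending))
	err := drain(ctx, pending, out, immediately)
	return len(out), err
}

func main() {
	fmt.Println(drainDemo(true))  // both messages delivered, nil error
	fmt.Println(drainDemo(false)) // nothing delivered, deadline exceeded
}
```

In this model, draining immediately ignores the schedule and finishes well within the deadline, while waiting on the schedule runs into the deadline and returns the context's error with the messages still undelivered.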

func (*Dispatcher[T]) Start

func (d *Dispatcher[T]) Start() error

Start initializes the processing of scheduled messages and blocks.

type DispatcherConfig

type DispatcherConfig struct {
	IngressChannelSize  int
	DispatchChannelSize int
	MaxMessages         int
	GuaranteeOrder      bool
}

DispatcherConfig is the configuration for creating an instance of a Dispatcher.

type ScheduledMessage

type ScheduledMessage[T any] struct {
	At      time.Time
	Message T
}

ScheduledMessage is a message to schedule through the Dispatcher's ingress channel. `At` is when the message will be dispatched. `Message` is the content of the message.

Directories

Path Synopsis
examples
