scheduler

package
v1.2.9 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 1, 2022 License: BSD-3-Clause Imports: 15 Imported by: 0

README

Goka Scheduler

This tool implements a distributed scheduler fully based on goka.

Use it to delay or throttle events. The basic input is an "order", which is either a Delay or a Throttle. Each order contains a "payload", i.e. a target topic, key and message that are sent when the delay expires.

Delay

Sends the event after a certain delay. Each order is unique, so there is no throttling.

Throttle

Sends the event after a delay, but intermediate orders with the same key are ignored. A key is made of

  • target topic
  • target key
  • a variant that "seeds" the key, which avoids unwanted collisions and thus unwanted throttling of events.

There are two versions:

  • ThrottleFirst will send the first order's payload and will ignore following orders
  • ThrottleLast will send the last order's payload, so it will keep updating the target payload.
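
The difference between the two versions can be illustrated with a small stand-alone model. This is only a sketch of the behaviour described above, not the scheduler's implementation: the types throttleKey and order and both functions are hypothetical, and all orders are assumed to arrive within one throttle window.

```go
package main

import "fmt"

// throttleKey is a hypothetical stand-in for the key described above:
// target topic, target key, and variant.
type throttleKey struct {
	Topic   string
	Key     string
	Variant string
}

// order is a simplified order: a throttle key plus the payload to send.
type order struct {
	Key     throttleKey
	Payload string
}

// throttleFirst keeps the payload of the first order seen per key and
// ignores the following ones.
func throttleFirst(orders []order) map[throttleKey]string {
	out := make(map[throttleKey]string)
	for _, o := range orders {
		if _, pending := out[o.Key]; !pending {
			out[o.Key] = o.Payload
		}
	}
	return out
}

// throttleLast keeps updating the payload, so the last order per key wins.
func throttleLast(orders []order) map[throttleKey]string {
	out := make(map[throttleKey]string)
	for _, o := range orders {
		out[o.Key] = o.Payload
	}
	return out
}

func main() {
	k := throttleKey{Topic: "notifications", Key: "user-1"}
	seeded := throttleKey{Topic: "notifications", Key: "user-1", Variant: "email"}
	orders := []order{{k, "a"}, {k, "b"}, {seeded, "c"}}

	// prints "a b c": first wins, last wins, and the variant avoids a collision
	fmt.Println(throttleFirst(orders)[k], throttleLast(orders)[k], throttleFirst(orders)[seeded])
}
```

The third order shares topic and key with the first two but carries a different variant, so in this model it is not throttled.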

Testing

The scheduler cannot be tested using goka's built-in tester, so the tests require a running Kafka cluster instead. The easiest way is to start the cluster with docker-compose, as provided in goka/examples. Then run make full-test to include the integration tests that need Kafka.

Example

TODO

Client usage

TODO

Intervallers

Documentation

Constants

This section is empty.

Variables

var (
	OrderType_name = map[int32]string{
		0: "Invalid",
		1: "Delay",
		2: "ThrottleFirst",
		3: "ThrottleFirstReschedule",
	}
	OrderType_value = map[string]int32{
		"Invalid":                 0,
		"Delay":                   1,
		"ThrottleFirst":           2,
		"ThrottleFirstReschedule": 3,
	}
)

Enum value maps for OrderType.
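
As an illustration of how such value maps are typically used, the sketch below parses an order type from its name and rejects the Invalid default. The maps are local copies of the ones listed above, and parseOrderType is a hypothetical helper that is not part of this package.

```go
package main

import "fmt"

// Local copies of the enum value maps shown above, so the example is
// self-contained.
var orderTypeName = map[int32]string{
	0: "Invalid",
	1: "Delay",
	2: "ThrottleFirst",
	3: "ThrottleFirstReschedule",
}

var orderTypeValue = map[string]int32{
	"Invalid":                 0,
	"Delay":                   1,
	"ThrottleFirst":           2,
	"ThrottleFirstReschedule": 3,
}

// parseOrderType is a hypothetical helper: it maps a name to its numeric
// value and treats unknown names and the Invalid default as errors.
func parseOrderType(name string) (int32, error) {
	v, ok := orderTypeValue[name]
	if !ok || v == 0 {
		return 0, fmt.Errorf("unknown or invalid order type %q", name)
	}
	return v, nil
}

func main() {
	v, err := parseOrderType("ThrottleFirst")
	fmt.Println(v, orderTypeName[v], err) // prints "2 ThrottleFirst <nil>"
}
```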

var File_order_proto protoreflect.FileDescriptor

Functions

func NewOrderCodec

func NewOrderCodec() goka.Codec

NewOrderCodec creates a new codec for encoding/decoding an Order

func NewWaitCodec

func NewWaitCodec() goka.Codec

NewWaitCodec creates a new codec for encoding/decoding a Wait

Types

type Config

type Config struct {
	// contains filtered or unexported fields
}

Config configures the running scheduler

func NewConfig

func NewConfig() *Config

NewConfig creates a new Config.

func (*Config) WithMxExecuteRoundTrips

func (c *Config) WithMxExecuteRoundTrips(h Observe) *Config

WithMxExecuteRoundTrips sets an observer for measuring the number of round trips an order has done before finally being executed. This will usually be a histogram or summary

func (*Config) WithMxExecutionDropped

func (c *Config) WithMxExecutionDropped(cnt Count) *Config

WithMxExecutionDropped adds a counter to track dropped executions due to retention or rescheduling issues.

func (*Config) WithMxExecutionTimeDeviation

func (c *Config) WithMxExecutionTimeDeviation(h Observe) *Config

WithMxExecutionTimeDeviation sets an observer for measuring the seconds of deviation between planned execution and actual execution. This will usually be a histogram or summary. Times can also be negative in case there are no waiters small enough for the last iteration, which means the order will be executed before its actual deadline.

func (*Config) WithMxPlaceOrderLag

func (c *Config) WithMxPlaceOrderLag(o Observe) *Config

WithMxPlaceOrderLag sets an observer for measuring the lag in seconds for order placement

func (*Config) WithMxRescheduled

func (c *Config) WithMxRescheduled(cnt CountForType) *Config

WithMxRescheduled sets a counter for measuring the number of reschedules in total

func (*Config) WithMxThrottleDuplicate

func (c *Config) WithMxThrottleDuplicate(cnt Count) *Config

WithMxThrottleDuplicate sets a counter for measuring the number of duplicates/throttles for a throttling order

func (*Config) WithMxThrottleFirstRescheduled

func (c *Config) WithMxThrottleFirstRescheduled(cnt Count) *Config

WithMxThrottleFirstRescheduled sets a counter for measuring the number of reschedules for orders configured with ThrottleFirstReschedule

func (*Config) WithMxZombieEvicted

func (c *Config) WithMxZombieEvicted(cnt Count) *Config

WithMxZombieEvicted sets a counter for measuring the number of zombie orders being evicted.

func (*Config) WithOrderCatchupTimeout

func (c *Config) WithOrderCatchupTimeout(timeout time.Duration) *Config

WithOrderCatchupTimeout sets the timeout for catching up after restarting the scheduler with existing delay-orders

func (*Config) WithOrderZombieTTL

func (c *Config) WithOrderZombieTTL(ttl time.Duration) *Config

WithOrderZombieTTL sets the order zombie ttl

type Count

type Count func(float64)

Count allows adding values and calculating a rate

type CountForType

type CountForType func(string, float64)

CountForType allows counting events belonging to a specific type

type Emitter

type Emitter interface {
	Emit(key string, msg interface{}) (*goka.Promise, error)
	Finish() error
}

Emitter is a generic emitter, but probably a *goka.Emitter. Note that the emitter must be closed (by calling Finish) by its creator; the scheduler is not capable of closing the emitters, because it does not have an internal state.

type EmitterCreator

type EmitterCreator func(goka.Stream, goka.Codec) (Emitter, error)

EmitterCreator is a callback that will be used by the scheduler to create emitters on the fly while receiving orders.

type Observe

type Observe func(float64)

Observe allows observing multiple values, e.g. for a histogram or a summary
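
Since the metric types are plain function types, any metrics backend can be plugged in through closures. The sketch below redeclares the three types locally to stay self-contained and feeds a hypothetical in-memory collector. The metrics struct and its methods are illustrative only, and the mutex assumes the callbacks may be invoked from several goroutines.

```go
package main

import (
	"fmt"
	"sync"
)

// Local mirrors of the package's metric callback types.
type (
	Count        func(float64)
	CountForType func(string, float64)
	Observe      func(float64)
)

// metrics is a hypothetical in-memory collector used for illustration.
type metrics struct {
	mu       sync.Mutex
	total    float64
	byType   map[string]float64
	observed []float64
}

func newMetrics() *metrics {
	return &metrics{byType: make(map[string]float64)}
}

// count returns a Count that adds each value to a running total.
func (m *metrics) count() Count {
	return func(v float64) {
		m.mu.Lock()
		defer m.mu.Unlock()
		m.total += v
	}
}

// countForType returns a CountForType that keeps one total per type label.
func (m *metrics) countForType() CountForType {
	return func(typ string, v float64) {
		m.mu.Lock()
		defer m.mu.Unlock()
		m.byType[typ] += v
	}
}

// observe returns an Observe that records every observed value.
func (m *metrics) observe() Observe {
	return func(v float64) {
		m.mu.Lock()
		defer m.mu.Unlock()
		m.observed = append(m.observed, v)
	}
}

func main() {
	m := newMetrics()
	c, ct, o := m.count(), m.countForType(), m.observe()
	c(1)
	c(2)
	ct("Delay", 1)
	o(0.25)
	fmt.Println(m.total, m.byType["Delay"], len(m.observed)) // prints "3 1 1"
}
```

With the real package, closures of this shape would be passed to the Config methods, for example WithMxZombieEvicted for a Count or WithMxExecuteRoundTrips for an Observe.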

type Order

type Order struct {
	Payload   *Order_Payload `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"`
	OrderType OrderType      `` /* 143-byte string literal not displayed */
	DelayMs   int64          `protobuf:"varint,3,opt,name=delay_ms,json=delayMs,proto3" json:"delay_ms,omitempty"`
	// marks the next or last execution time of the order.
	ExecutionTime *timestamp.Timestamp `protobuf:"bytes,4,opt,name=execution_time,json=executionTime,proto3" json:"execution_time,omitempty"`
	NoCatchup     bool                 `protobuf:"varint,5,opt,name=no_catchup,json=noCatchup,proto3" json:"no_catchup,omitempty"`
	// contains filtered or unexported fields
}

func CreateDelayedOrderWithPrefix

func CreateDelayedOrderWithPrefix(targetKey []byte, targetTopic string, variant string, message []byte, orderType OrderType, delay time.Duration) (string, *Order, error)

CreateDelayedOrderWithPrefix creates an order to be sent to the scheduler. The order's destination time is defined by a delay, so there will be an implicit catchup.

func (*Order) Descriptor deprecated

func (*Order) Descriptor() ([]byte, []int)

Deprecated: Use Order.ProtoReflect.Descriptor instead.

func (*Order) GetDelayMs

func (x *Order) GetDelayMs() int64

func (*Order) GetExecutionTime

func (x *Order) GetExecutionTime() *timestamp.Timestamp

func (*Order) GetNoCatchup

func (x *Order) GetNoCatchup() bool

func (*Order) GetOrderType

func (x *Order) GetOrderType() OrderType

func (*Order) GetPayload

func (x *Order) GetPayload() *Order_Payload

func (*Order) ProtoMessage

func (*Order) ProtoMessage()

func (*Order) ProtoReflect

func (x *Order) ProtoReflect() protoreflect.Message

func (*Order) Reset

func (x *Order) Reset()

func (*Order) String

func (x *Order) String() string

type OrderClient

type OrderClient interface {
	// NewOrder creates a new order
	NewOrder(targetKey []byte, targetTopic string, variant string, message []byte, orderType OrderType, delay time.Duration) (string, *Order, error)

	// OrderEdge returns the group graph edge that client processors can use
	// to place orders into the scheduler. Like this:
	// goka.NewProcessor(..., goka.DefineGroup(..., om.OrderEdge()))
	OrderEdge() goka.Edge

	// OrderTopic returns the topic to be used for emitting orders to the scheduler.
	OrderTopic() goka.Stream
}

OrderClient helps communicating with a scheduler to place orders

func NewOrderClient

func NewOrderClient(topicPrefix string) OrderClient

type OrderCodec

type OrderCodec struct{}

OrderCodec marshals and unmarshals items of type Order

func (*OrderCodec) Decode

func (c *OrderCodec) Decode(data []byte) (interface{}, error)

Decode unmarshals an Order json into the struct.

func (*OrderCodec) Encode

func (c *OrderCodec) Encode(value interface{}) ([]byte, error)

Encode marshals an Order

type OrderType

type OrderType int32
const (
	// invalid to avoid default type misunderstandings
	OrderType_Invalid OrderType = 0
	// simple delay
	OrderType_Delay OrderType = 1
	// delay ignoring intermediate orders
	// Send the first order's value
	OrderType_ThrottleFirst OrderType = 2
	// delay taking the shortest delay while waiting for the event.
	// delays in future will be ignored and if the wait gets rescheduled,
	// the later executions will get dropped
	OrderType_ThrottleFirstReschedule OrderType = 3
)

func (OrderType) Descriptor

func (OrderType) Descriptor() protoreflect.EnumDescriptor

func (OrderType) Enum

func (x OrderType) Enum() *OrderType

func (OrderType) EnumDescriptor deprecated

func (OrderType) EnumDescriptor() ([]byte, []int)

Deprecated: Use OrderType.Descriptor instead.

func (OrderType) Number

func (x OrderType) Number() protoreflect.EnumNumber

func (OrderType) String

func (x OrderType) String() string

func (OrderType) Type

func (OrderType) Type() protoreflect.EnumType

type Order_Payload

type Order_Payload struct {
	Topic   string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"`
	Key     []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"`
	Message []byte `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"`
	// contains filtered or unexported fields
}

func (*Order_Payload) Descriptor deprecated

func (*Order_Payload) Descriptor() ([]byte, []int)

Deprecated: Use Order_Payload.ProtoReflect.Descriptor instead.

func (*Order_Payload) GetKey

func (x *Order_Payload) GetKey() []byte

func (*Order_Payload) GetMessage

func (x *Order_Payload) GetMessage() []byte

func (*Order_Payload) GetTopic

func (x *Order_Payload) GetTopic() string

func (*Order_Payload) ProtoMessage

func (*Order_Payload) ProtoMessage()

func (*Order_Payload) ProtoReflect

func (x *Order_Payload) ProtoReflect() protoreflect.Message

func (*Order_Payload) Reset

func (x *Order_Payload) Reset()

func (*Order_Payload) String

func (x *Order_Payload) String() string

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler implements a scheduler

Example
// create new scheduler config
cfg := NewConfig()
// e.g. replace selected metrics to be exported
cfg.WithMxZombieEvicted(func(value float64) {
	// handle metric
})

sched := CreateScheduler(cfg,
	[]time.Duration{
		1 * time.Hour,
		30 * time.Minute,
		1 * time.Minute,
		10 * time.Second,
		3 * time.Second,
	},
	func(topic goka.Stream, codec goka.Codec) (Emitter, error) {
		return goka.NewEmitter([]string{"localhost:9092"},
			topic,
			codec)
	},
	"my-scheduler", // some prefix if we have multiple schedulers running in the same kafka cluster
)

errg, ctx := multierr.NewErrGroup(context.Background())

for _, graph := range sched.CreateGraphs() {
	proc, err := goka.NewProcessor([]string{"localhost:9092"}, graph)
	if err != nil {
		log.Fatalf("Error creating processor for graph %#v: %v", graph, err)
	}
	errg.Go(func() error {
		return proc.Run(ctx)
	})
}

if err := errg.Wait().ErrorOrNil(); err != nil {
	log.Printf("Error running scheduler: %v", err)
}

if err := sched.Close(); err != nil {
	log.Printf("Error closing scheduler: %v", err)
}

func CreateScheduler

func CreateScheduler(config *Config, waitIntervals []time.Duration, creator EmitterCreator, topicPrefix string) *Scheduler

CreateScheduler creates a scheduler type that provides the group graphs for all included components. To stay independent of the processor-handling boilerplate code, creating the group graphs is sufficient. waitIntervals is a slice of time.Duration; a wait-processor is created for each wait interval. Note that each duration is truncated to milliseconds and the list is deduplicated automatically.

func (*Scheduler) Close

func (s *Scheduler) Close() error

Close finishes all created emitters. Note that the processors are not stopped, because they are started by the client.

func (*Scheduler) CreateGraphs

func (s *Scheduler) CreateGraphs() []*goka.GroupGraph

CreateGraphs creates the group graphs for all components used by the scheduler: one for the scheduler itself (which does the deduplication and stores the payload) and one for each waiter, i.e. one for each interval.

type Wait

type Wait struct {
	ExecutionTime  *timestamp.Timestamp `protobuf:"bytes,1,opt,name=execution_time,json=executionTime,proto3" json:"execution_time,omitempty"`
	EnterQueueTime *timestamp.Timestamp `protobuf:"bytes,3,opt,name=enter_queue_time,json=enterQueueTime,proto3" json:"enter_queue_time,omitempty"`
	Iterations     int32                `protobuf:"varint,2,opt,name=iterations,proto3" json:"iterations,omitempty"`
	// contains filtered or unexported fields
}

func (*Wait) Descriptor deprecated

func (*Wait) Descriptor() ([]byte, []int)

Deprecated: Use Wait.ProtoReflect.Descriptor instead.

func (*Wait) GetEnterQueueTime

func (x *Wait) GetEnterQueueTime() *timestamp.Timestamp

func (*Wait) GetExecutionTime

func (x *Wait) GetExecutionTime() *timestamp.Timestamp

func (*Wait) GetIterations

func (x *Wait) GetIterations() int32

func (*Wait) Inc

func (w *Wait) Inc() *Wait

Inc increases the iterations counter

func (*Wait) ProtoMessage

func (*Wait) ProtoMessage()

func (*Wait) ProtoReflect

func (x *Wait) ProtoReflect() protoreflect.Message

func (*Wait) Reset

func (x *Wait) Reset()

func (*Wait) String

func (x *Wait) String() string

type WaitCodec

type WaitCodec struct{}

WaitCodec marshals and unmarshals items of type Wait

func (*WaitCodec) Decode

func (c *WaitCodec) Decode(data []byte) (interface{}, error)

Decode unmarshals a Wait json into the struct.

func (*WaitCodec) Encode

func (c *WaitCodec) Encode(value interface{}) ([]byte, error)

Encode marshals a Wait
