core

package
v0.0.0-...-7bbab05
Warning

This package is not in the latest version of its module.

Published: Mar 14, 2024 License: Apache-2.0 Imports: 9 Imported by: 3

Documentation

Constants

const (
	// Resize is the signal used to trigger a resize of a running channel
	Resize ControlSignal = 1
	// Stop is the signal used to stop Dmux
	Stop ControlSignal = 2

	// Sucess is the response code for a ControlSignal action that succeeded
	Sucess uint8 = 1
	// Failed is the response code for a ControlSignal action that failed
	Failed uint8 = 2
)
const SidelineMessage string = "sidelining the message"

Variables

This section is empty.

Functions

func Contains

func Contains(a []int, x int) bool

Contains reports whether the integer x is present in the slice a.
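The check described above amounts to a linear scan. The following is a minimal sketch of that idea; `containsSketch` is a hypothetical stand-in written for illustration and is not the package's source.

```go
package main

import "fmt"

// containsSketch is a hypothetical stand-in for Contains: it reports
// whether x occurs anywhere in a.
func containsSketch(a []int, x int) bool {
	for _, v := range a {
		if v == x {
			return true
		}
	}
	return false
}

func main() {
	codes := []int{500, 502, 503}
	fmt.Println(containsSketch(codes, 502)) // true
	fmt.Println(containsSketch(codes, 404)) // false
}
```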

func Encode

func Encode(data [][]byte) []byte

Encode converts a two-dimensional byte slice into a single byte slice. The encoding scheme is: the first 4 bytes hold the batch size; then, for every entry in the batch, 4 bytes hold the data size, followed by the data bytes.
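The layout described for Encode can be sketched as follows. `encodeSketch` is a hypothetical re-implementation, and big-endian byte order is an assumption, since the documentation does not state which order the package uses.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeSketch is a hypothetical re-implementation of the layout described
// for Encode. Big-endian byte order is an assumption.
func encodeSketch(data [][]byte) []byte {
	out := make([]byte, 4)
	binary.BigEndian.PutUint32(out, uint32(len(data))) // batch size
	for _, d := range data {
		var size [4]byte
		binary.BigEndian.PutUint32(size[:], uint32(len(d))) // data size
		out = append(out, size[:]...)
		out = append(out, d...) // data bytes
	}
	return out
}

func main() {
	enc := encodeSketch([][]byte{[]byte("ab"), []byte("c")})
	fmt.Println(len(enc)) // 4 + (4+2) + (4+1) = 15
	fmt.Println(enc)
}
```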

func EncodePayload

func EncodePayload(key []byte, offset int64, data []byte) []byte

EncodePayload converts a payload byte slice into a single byte slice that also carries the key and offset. The encoding scheme is: the first 4 bytes hold the data size, the next 8 bytes hold the offset, the next 4 bytes hold the key length, the next n bytes hold the key, and the data bytes follow.
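A sketch of the payload layout described above, with a matching decoder to show that the fields can be recovered. Both functions are hypothetical, and big-endian byte order is an assumption rather than something the documentation states.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodePayloadSketch is a hypothetical re-implementation of the layout
// described for EncodePayload. Big-endian byte order is an assumption.
func encodePayloadSketch(key []byte, offset int64, data []byte) []byte {
	out := make([]byte, 16, 16+len(key)+len(data))
	binary.BigEndian.PutUint32(out[0:4], uint32(len(data)))  // data size
	binary.BigEndian.PutUint64(out[4:12], uint64(offset))    // offset
	binary.BigEndian.PutUint32(out[12:16], uint32(len(key))) // key length
	out = append(out, key...)
	out = append(out, data...)
	return out
}

// decodePayloadSketch reverses encodePayloadSketch. It assumes b is a
// complete, well-formed payload and does no bounds checking.
func decodePayloadSketch(b []byte) (key []byte, offset int64, data []byte) {
	dataLen := binary.BigEndian.Uint32(b[0:4])
	offset = int64(binary.BigEndian.Uint64(b[4:12]))
	keyLen := binary.BigEndian.Uint32(b[12:16])
	key = b[16 : 16+keyLen]
	data = b[16+keyLen : 16+keyLen+dataLen]
	return key, offset, data
}

func main() {
	enc := encodePayloadSketch([]byte("k1"), 42, []byte("hello"))
	key, offset, data := decodePayloadSketch(enc)
	fmt.Println(len(enc))                          // 16 + 2 + 5 = 23
	fmt.Printf("%s %d %s\n", key, offset, data)    // k1 42 hello
}
```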

func EncodeV2

func EncodeV2(partition int, data [][]byte) []byte

EncodeV2 converts a two-dimensional byte slice into a single byte slice. The encoding scheme is: the first 4 bytes hold the partition number and the next 4 bytes hold the batch size.

Types

type ChannelObject

type ChannelObject struct {
	Msg      interface{}
	Sideline Sideline
	Version  int32
}

type ControlMsg

type ControlMsg struct {
	// contains filtered or unexported fields
}

ControlMsg is the struct passed to the Dmux control channel to enable it to perform admin operations such as Resize.

type ControlSignal

type ControlSignal uint8

ControlSignal is the type of the constants used to pass control signals to DefaultDmux.

type Distributor

type Distributor interface {
	// Distribute takes the incoming data and the number of outbound channels
	// and returns the index of the channel selected for this message
	Distribute(data interface{}, size int) int
}

Distributor abstracts the logic that distributes load from Source to Sink. Clients can use HashDistributor or RoundRobinDistributor, or write their own distribution logic.
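As an illustration of writing custom distribution logic, here is a minimal sketch. The `Distributor` interface is copied locally so the example compiles on its own, and `firstByteDistributor` is a hypothetical type invented for this example.

```go
package main

import "fmt"

// Distributor mirrors the interface documented above so that the sketch
// compiles on its own.
type Distributor interface {
	Distribute(data interface{}, size int) int
}

// firstByteDistributor is a hypothetical custom Distributor: it routes
// []byte messages by their first byte and sends everything else to index 0.
type firstByteDistributor struct{}

func (firstByteDistributor) Distribute(data interface{}, size int) int {
	b, ok := data.([]byte)
	if !ok || len(b) == 0 || size <= 0 {
		return 0
	}
	return int(b[0]) % size
}

func main() {
	var d Distributor = firstByteDistributor{}
	fmt.Println(d.Distribute([]byte("a"), 4)) // 'a' is 97, and 97 % 4 = 1
	fmt.Println(d.Distribute("not bytes", 4)) // 0
}
```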

func GetDistribution

func GetDistribution(distributorType DistributorType, h Hasher) Distributor

GetDistribution returns the Distributor that corresponds to distributorType.

func GetHashDistribution

func GetHashDistribution(h Hasher) Distributor

GetHashDistribution returns the hashDistributor implementation of the Distributor interface, which provides consistent hash-based routing in Dmux from Source to Sink. The client must implement Hasher and pass it as the argument to this function.
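One way a hash-based Distributor could use a client-supplied Hasher is sketched below. Reducing the hash with a modulo is an assumption about the approach, not a statement of how hashDistributor is implemented; `hashDistributorSketch` and `lengthHasher` are hypothetical names.

```go
package main

import "fmt"

// Hasher and Distributor mirror the interfaces documented on this page so
// that the sketch compiles on its own.
type Hasher interface {
	ComputeHash(data interface{}) int
}

type Distributor interface {
	Distribute(data interface{}, size int) int
}

// hashDistributorSketch is a hypothetical Distributor that maps the hash
// of a message onto one of size outbound channels.
type hashDistributorSketch struct{ h Hasher }

func (d hashDistributorSketch) Distribute(data interface{}, size int) int {
	idx := d.h.ComputeHash(data) % size
	if idx < 0 {
		idx += size // keep the index in [0, size) for negative hashes
	}
	return idx
}

// lengthHasher is a toy Hasher that uses the length of a string key.
type lengthHasher struct{}

func (lengthHasher) ComputeHash(data interface{}) int {
	s, _ := data.(string)
	return len(s)
}

func main() {
	var d Distributor = hashDistributorSketch{h: lengthHasher{}}
	fmt.Println(d.Distribute("order-1", 4)) // length is 7, and 7 % 4 = 3
	fmt.Println(d.Distribute("order-1", 4)) // same key, same index: 3
}
```

The property that matters for routing is the second call: the same key always maps to the same index for a fixed size.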

func GetRoundRobinDistribution

func GetRoundRobinDistribution() Distributor

GetRoundRobinDistribution returns the roundRobinDistributor implementation of the Distributor interface.
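Round-robin distribution ignores the message and cycles through the channel indices. A minimal sketch, assuming an atomic counter is used so that concurrent callers are safe; `roundRobinSketch` is hypothetical and does not claim to match roundRobinDistributor.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobinSketch is a hypothetical Distributor that hands out channel
// indices in a repeating 0, 1, ..., size-1 cycle.
type roundRobinSketch struct{ next uint64 }

func (r *roundRobinSketch) Distribute(data interface{}, size int) int {
	n := atomic.AddUint64(&r.next, 1) - 1 // value before the increment
	return int(n % uint64(size))
}

func main() {
	r := &roundRobinSketch{}
	for i := 0; i < 5; i++ {
		fmt.Print(r.Distribute(nil, 3), " ")
	}
	fmt.Println() // indices cycle 0 1 2 0 1
}
```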

type DistributorType

type DistributorType string

DistributorType determines which distribution strategy is used.

const (
	//HashDistributor will distribute based on the key hash
	HashDistributor DistributorType = "Hash"
	//RoundRobinDistributor will distribute in round-robin fashion
	RoundRobinDistributor DistributorType = "RoundRobin"
)

type Dmux

type Dmux struct {
	// contains filtered or unexported fields
}

Dmux is the struct that enables size-based demultiplexing for Source to Sink connections. TODO: restrict size to powers of 2 for better optimization of the modulo.
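The optimization the TODO refers to is a general one: when size is a power of two, `h % size` for a non-negative `h` equals `h & (size-1)`. The sketch below illustrates the identity; the helper names are hypothetical and nothing here is taken from the package.

```go
package main

import "fmt"

// isPowerOfTwo reports whether n is a positive power of two.
func isPowerOfTwo(n int) bool {
	return n > 0 && n&(n-1) == 0
}

// fastMod returns h % size for non-negative h, assuming size is a power
// of two. The result is wrong for any other size.
func fastMod(h, size int) int {
	return h & (size - 1)
}

func main() {
	size := 8
	fmt.Println(isPowerOfTwo(size))         // true
	fmt.Println(isPowerOfTwo(6))            // false
	fmt.Println(fastMod(29, size), 29%size) // 5 5
}
```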

func GetDmux

func GetDmux(conf DmuxConf, d Distributor) *Dmux

GetDmux is the public function used to get an instance of the Dmux struct.

func (*Dmux) Await

func (d *Dmux) Await(duration time.Duration)

Await was added to enable testing when using a bounded source.

func (*Dmux) ConnectWithSideline

func (d *Dmux) ConnectWithSideline(source Source, sink Sink, sidelineImpl sideline_module.CheckMessageSideline, optionalParams DmuxOptionalParams)

ConnectWithSideline holds the Dmux logic used to connect a Source to a Sink with sideline support.

func (*Dmux) Join

func (d *Dmux) Join()

Join is used to make the main routine sleep forever.

func (*Dmux) Resize

func (d *Dmux) Resize(size int)

Resize is used to resize a running Dmux.

func (*Dmux) Stop

func (d *Dmux) Stop()

Stop is used to gracefully stop a running Dmux.

type DmuxConf

type DmuxConf struct {
	Size            int             `json:"size"`
	SourceQSize     int             `json:"source_queue_size"`
	SinkQSize       int             `json:"sink_queue_size"`
	DistributorType DistributorType `json:"distributor_type"`
	BatchSize       int             `json:"batch_size"`
	Version         int             `json:"version"`
	Sideline        Sideline        `json:"sideline"`
}

DmuxConf holds configuration parameters for Dmux
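The struct tags above determine the JSON keys of a configuration file. The sketch below decodes a sample document into local copies of DmuxConf and Sideline; the types are duplicated only so the example is self-contained, and every value in `sampleConf` is made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented types, so the sketch compiles on its own.
type DistributorType string

type Sideline struct {
	Retries               int         `json:"retries"`
	SidelineResponseCodes []int       `json:"sidelineResponseCodes"`
	ConsumerGroupName     string      `json:"consumerGroupName"`
	ClusterName           string      `json:"clusterName"`
	ConnectionType        string      `json:"type"`
	SidelineMeta          interface{} `json:"sidelineMeta"`
}

type DmuxConf struct {
	Size            int             `json:"size"`
	SourceQSize     int             `json:"source_queue_size"`
	SinkQSize       int             `json:"sink_queue_size"`
	DistributorType DistributorType `json:"distributor_type"`
	BatchSize       int             `json:"batch_size"`
	Version         int             `json:"version"`
	Sideline        Sideline        `json:"sideline"`
}

// sampleConf holds illustrative values only.
const sampleConf = `{
  "size": 4,
  "source_queue_size": 100,
  "sink_queue_size": 100,
  "distributor_type": "Hash",
  "batch_size": 1,
  "version": 1,
  "sideline": {"retries": 3, "sidelineResponseCodes": [500, 503]}
}`

// parseConf decodes a JSON document into a DmuxConf.
func parseConf(raw string) (DmuxConf, error) {
	var conf DmuxConf
	err := json.Unmarshal([]byte(raw), &conf)
	return conf, err
}

func main() {
	conf, err := parseConf(sampleConf)
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println(conf.Size, conf.DistributorType, conf.Sideline.Retries) // 4 Hash 3
}
```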

type DmuxOptionalParams

type DmuxOptionalParams struct {
	EnableDebugLog bool
}

type Duration

type Duration struct {
	time.Duration
}

Duration embeds time.Duration. It was added so that time values in JSON parse into valid Go durations.

func (Duration) MarshalJSON

func (d Duration) MarshalJSON() ([]byte, error)

MarshalJSON implements the encoding/json marshalling hook to serialize this type to a JSON string.

func (*Duration) UnmarshalJSON

func (d *Duration) UnmarshalJSON(b []byte) error

UnmarshalJSON implements the encoding/json unmarshalling hook to deserialize JSON into a Duration.

type Hasher

type Hasher interface {
	// ComputeHash has to be implemented by the client to define how to
	// distribute messages when using HashDistributor
	ComputeHash(data interface{}) int
}

Hasher is the interface a client implements to define how its data is interpreted and to compute and return the right hash int for it.
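A client-side Hasher could look like the sketch below, which hashes string or []byte keys with FNV-1a from the standard library. `keyHasher` is a hypothetical type, and the choice of FNV-1a and of returning 0 for unknown types are assumptions made for the example.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// keyHasher is a hypothetical Hasher implementation for string and []byte
// messages. Any other message type hashes to 0.
type keyHasher struct{}

func (keyHasher) ComputeHash(data interface{}) int {
	var key []byte
	switch v := data.(type) {
	case string:
		key = []byte(v)
	case []byte:
		key = v
	default:
		return 0
	}
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	// Mask the top bit so the result is non-negative even where int is 32 bits.
	return int(h.Sum32() & 0x7fffffff)
}

func main() {
	var h keyHasher
	a := h.ComputeHash("user-42")
	b := h.ComputeHash([]byte("user-42"))
	fmt.Println(a == b) // true: the same key bytes give the same hash
	fmt.Println(a >= 0) // true
}
```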

type ResizeMeta

type ResizeMeta struct {
	// contains filtered or unexported fields
}

ResizeMeta is the struct used to define resize value which is used when Dmux is resizing

type ResponseMsg

type ResponseMsg struct {
	// contains filtered or unexported fields
}

ResponseMsg is used by a running Dmux instance to respond to the client.

type Sideline

type Sideline struct {
	Retries               int         `json:"retries"`
	SidelineResponseCodes []int       `json:"sidelineResponseCodes"`
	ConsumerGroupName     string      `json:"consumerGroupName"`
	ClusterName           string      `json:"clusterName"`
	ConnectionType        string      `json:"type"`
	SidelineMeta          interface{} `json:"sidelineMeta"`
}

Sideline holds config parameters for sideline

type Sink

type Sink interface {
	// Clone is expected to return an instance of Sink. If the Sink is stateless,
	// it can return a reference to itself. If the Sink is stateful, it is a good
	// idea to create a new instance of Sink.
	Clone() Sink

	// Consume receives the message as an interface{}.
	//TODO currently this method does not return error, need to solve for error
	// handling
	Consume(msg interface{}, retries int, sidelineResponseCodes []int) error

	// BatchConsume is invoked if batch_size is configured
	BatchConsume(msg []interface{}, version int)
}

Sink is the interface implemented by the output sink of a Dmux operation.
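The sketch below shows what a stateful Sink might look like, with Clone returning a fresh instance as the comment on Clone suggests. The `Sink` interface is copied locally so the example compiles alone; `memorySink` is hypothetical, and it ignores retries, sidelineResponseCodes and version because their exact semantics are not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// Sink mirrors the interface documented above so the sketch compiles alone.
type Sink interface {
	Clone() Sink
	Consume(msg interface{}, retries int, sidelineResponseCodes []int) error
	BatchConsume(msg []interface{}, version int)
}

// memorySink is a hypothetical stateful Sink that records what it receives.
type memorySink struct {
	seen []interface{}
}

// Clone returns a new instance because memorySink holds per-instance state.
func (s *memorySink) Clone() Sink {
	return &memorySink{}
}

// Consume stores a single message and rejects nil.
func (s *memorySink) Consume(msg interface{}, retries int, sidelineResponseCodes []int) error {
	if msg == nil {
		return errors.New("nil message")
	}
	s.seen = append(s.seen, msg)
	return nil
}

// BatchConsume stores every message of the batch.
func (s *memorySink) BatchConsume(msg []interface{}, version int) {
	s.seen = append(s.seen, msg...)
}

func main() {
	base := &memorySink{}
	worker := base.Clone()
	if err := worker.Consume("a", 0, nil); err != nil {
		fmt.Println("consume failed:", err)
	}
	worker.BatchConsume([]interface{}{"b", "c"}, 1)
	fmt.Println(len(worker.(*memorySink).seen), len(base.seen)) // 3 0
}
```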

type Source

type Source interface {
	// Generate takes the output channel to which it writes data. The
	// implementation can write to it using multiple goroutines.
	// This method is not expected to return; it is run in a separate goroutine.
	Generate(out chan<- interface{})
	// Stop is used to trigger a graceful stop of the Source
	Stop()
	// GetKey Method to get key for a message
	GetKey(msg interface{}) []byte
	// GetPartition Method to get partition for a message
	GetPartition(msg interface{}) int32
	// GetValue Method to get value for a message
	GetValue(msg interface{}) []byte
	// GetOffset Method to get offsets
	GetOffset(msg interface{}) int64
}

Source is the interface implemented by the input source to the Dmux.
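For testing, a bounded Source backed by an in-memory slice can be handy. The sketch below is one possible shape: the `Source` interface is copied locally, and `record`, `sliceSource` and `newSliceSource` are hypothetical names. Unlike a long-running source, this Generate returns once the slice is exhausted, and it leaves the output channel open since the documentation does not say who closes it.

```go
package main

import "fmt"

// Source mirrors the interface documented above so the sketch compiles alone.
type Source interface {
	Generate(out chan<- interface{})
	Stop()
	GetKey(msg interface{}) []byte
	GetPartition(msg interface{}) int32
	GetValue(msg interface{}) []byte
	GetOffset(msg interface{}) int64
}

// record is a hypothetical message type used only by this sketch.
type record struct {
	key, value []byte
	partition  int32
	offset     int64
}

// sliceSource is a hypothetical bounded Source that emits fixed records.
type sliceSource struct {
	records []record
	stop    chan struct{}
}

var _ Source = (*sliceSource)(nil) // compile-time interface check

func newSliceSource(records []record) *sliceSource {
	return &sliceSource{records: records, stop: make(chan struct{})}
}

// Generate writes each record to out until done or until Stop is called.
func (s *sliceSource) Generate(out chan<- interface{}) {
	for _, r := range s.records {
		select {
		case out <- r:
		case <-s.stop:
			return
		}
	}
}

// Stop signals Generate to return. It must be called at most once.
func (s *sliceSource) Stop() { close(s.stop) }

func (s *sliceSource) GetKey(msg interface{}) []byte      { return msg.(record).key }
func (s *sliceSource) GetPartition(msg interface{}) int32 { return msg.(record).partition }
func (s *sliceSource) GetValue(msg interface{}) []byte    { return msg.(record).value }
func (s *sliceSource) GetOffset(msg interface{}) int64    { return msg.(record).offset }

func main() {
	src := newSliceSource([]record{
		{key: []byte("k1"), value: []byte("v1"), partition: 0, offset: 10},
		{key: []byte("k2"), value: []byte("v2"), partition: 1, offset: 11},
	})
	out := make(chan interface{})
	go src.Generate(out)
	for i := 0; i < 2; i++ {
		msg := <-out
		fmt.Printf("%s %s %d %d\n", src.GetKey(msg), src.GetValue(msg),
			src.GetPartition(msg), src.GetOffset(msg))
	}
	src.Stop()
}
```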
