forkjoin

package module
v0.0.0-...-b46bbe4 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 26, 2023 License: MIT Imports: 6 Imported by: 0

README

forkjoin

The library implements a fork (fan-out) and join (fan-in) pattern using goroutines.

Fork Join Design

  1. Multiplexer spawns N goroutines for the N workers added through the AddWorker method on the Multiplexer
  2. Multiplexer's model is request/response; it returns only one response from the worker
  3. Each worker needs to
    • Implement the Worker interface and return its result on the result channel. The heartbeat is managed for the worker
    • Exit its work on a signal from the Manager on the done channel
    • Implement only the actual work
  4. A worker (goroutine) is considered unhealthy if its heartbeat is delayed by more than two seconds, and it is then restarted

Using fork join

  1. As a Go library
  2. As a gRPC streaming service

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func SendPulse

func SendPulse(ctx context.Context, pulseStream chan Heartbeat, pulseInterval time.Duration)

SendPulse is to be called by a worker to send heartbeats back to the manager.

TODO: how can a worker that this library has no control over be made to send pulses?

Types

type BaseWorker

type BaseWorker struct {
	ActiveDealine int32
}

type EventCode

type EventCode int32

EventCode is the code used in gRPC error responses

const (
	InternalError EventCode = iota
	RequestError
	ResponseError
	ConnectionError
	ConcurrencyContextError
	RequestAborted
	AuthenticationError
	RequestInfo
	Info
	HeartBeatInfo
)

Error codes for gRPC error responses

type FJError

type FJError struct {
	Code       EventCode
	Inner      error
	Message    string
	StackTrace string
	Misc       map[string]interface{}
}

FJError is the error reported by ForkJoin

type Heartbeat

type Heartbeat struct {
	// contains filtered or unexported fields
}

type LogEvent

type LogEvent struct {
	// contains filtered or unexported fields
}

LogEvent stores a log message

type Multiplexer

type Multiplexer struct {
	// contains filtered or unexported fields
}

Multiplexer starts N goroutines for N dispatchers

func NewMultiplexer

func NewMultiplexer() Multiplexer

NewMultiplexer creates a new basic multiplexer

func (*Multiplexer) AddWorker

func (m *Multiplexer) AddWorker(w Worker)

AddWorker adds a worker to the multiplexer; requests are multiplexed across the N added workers

func (*Multiplexer) Multiplex

func (m *Multiplexer) Multiplex(ctx context.Context, x interface{}) <-chan Result

Multiplex starts the N goroutines configured in []config

type Result

type Result struct {
	ID  string
	X   interface{}
	Err *FJError
}

Result is returned by checks and carries the result
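
A caller reading the channel returned by Multiplex will typically separate successes from failures by checking `Err`. The sketch below shows one way to do that. The types are trimmed local copies of the documented ones (only some fields and constants are reproduced), and `split` is a hypothetical helper, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Trimmed local copies of the documented types, for illustration only.
type EventCode int32

const (
	InternalError EventCode = iota
	RequestError
	ResponseError
)

type FJError struct {
	Code    EventCode
	Inner   error
	Message string
}

type Result struct {
	ID  string
	X   interface{}
	Err *FJError
}

// split drains the results channel and groups results by outcome:
// values keyed by ID when Err is nil, errors keyed by ID otherwise.
func split(results <-chan Result) (ok map[string]interface{}, failed map[string]*FJError) {
	ok = make(map[string]interface{})
	failed = make(map[string]*FJError)
	for r := range results {
		if r.Err != nil {
			failed[r.ID] = r.Err
			continue
		}
		ok[r.ID] = r.X
	}
	return ok, failed
}

func main() {
	ch := make(chan Result, 2)
	ch <- Result{ID: "w1", X: 42}
	ch <- Result{ID: "w2", Err: &FJError{Code: ResponseError, Inner: errors.New("timeout"), Message: "no reply"}}
	close(ch)

	ok, failed := split(ch)
	fmt.Println("succeeded:", len(ok), "failed:", len(failed))
	for id, e := range failed {
		fmt.Printf("%s: code=%d message=%q inner=%v\n", id, e.Code, e.Message, e.Inner)
	}
}
```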

type StandardLogger

type StandardLogger struct {
	*logrus.Logger
}

StandardLogger enforces specific log message formats

func NewLogger

func NewLogger() *StandardLogger

func (*StandardLogger) LogAbortedRequest

func (l *StandardLogger) LogAbortedRequest(requestID, messageID, message string)

func (*StandardLogger) LogAuthenticationError

func (l *StandardLogger) LogAuthenticationError(requestID, messageID, message string)

func (*StandardLogger) LogHeartBeatMsg

func (l *StandardLogger) LogHeartBeatMsg(requestID, messageID, message string)

func (*StandardLogger) LogInfo

func (l *StandardLogger) LogInfo(requestID, message string)

func (*StandardLogger) LogInvalidRequest

func (l *StandardLogger) LogInvalidRequest(requestID, messageID, message string)

func (*StandardLogger) LogRequestDispatchError

func (l *StandardLogger) LogRequestDispatchError(requestID, messageID, message string)

func (*StandardLogger) LogRequestInfo

func (l *StandardLogger) LogRequestInfo(requestID, message string)

TODO: have one info or request and one generic

func (*StandardLogger) LogResponseError

func (l *StandardLogger) LogResponseError(requestID, messageID, message string)

type Worker

type Worker interface {
	Work(ctx context.Context, x interface{}) (<-chan Result, <-chan Heartbeat)
	ActiveDeadLineSeconds() uint32
}

Worker implements the work to be done and exits on a signal from the done channel

Directories

Path Synopsis
cmd
tls
