relay

package
v1.16.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2023 License: MIT Imports: 39 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultNumWorkers is the number of goroutine relay workers to launch
	DefaultNumWorkers = 10

	// QueueFlushInterval is how often to flush messages to GRPC collector if we don't reach the batch size
	QueueFlushInterval = 10 * time.Second

	// DefaultBatchSize is the number of messages to send to GRPC collector in each batch
	DefaultBatchSize = 100 // number of messages to batch

	// MaxGRPCRetries is the number of times we will attempt a GRPC call before giving up
	MaxGRPCRetries = 5

	// MaxGRPCMessageSize is the maximum message size for GRPC client in bytes
	MaxGRPCMessageSize = 1024 * 1024 * 100 // 100MB

	// GRPCRetrySleep determines how long we sleep between GRPC call retries
	GRPCRetrySleep = time.Second * 5
)

Variables

View Source
var (
	ErrNSQMissingChannel = errors.New("NSQ channel cannot be empty")
	ErrNSQMissingTopic   = errors.New("NSQ topic cannot be empty")
)
View Source
var (
	ErrMissingConfig             = errors.New("Relay config cannot be nil")
	ErrMissingToken              = errors.New("Token cannot be empty")
	ErrMissingServiceShutdownCtx = errors.New("ServiceShutdownCtx cannot be nil")
	ErrMissingGRPCAddress        = errors.New("GRPCAddress cannot be empty")
	ErrMissingRelayCh            = errors.New("RelayCh cannot be nil")
	ErrMissingMessage            = errors.New("msg cannot be nil")
	ErrMissingMessageValue       = errors.New("msg.Value cannot be nil")
	ErrMissingMessageOptions     = errors.New("msg.Options cannot be nil")
)
View Source
var (
	ErrMissingID         = errors.New("missing ID in relay message")
	ErrMissingKeyName    = errors.New("missing Key in relay message")
	ErrMissingStreamName = errors.New("missing Stream in relay message")
)

Functions

func NewConnection

func NewConnection(address, token string, timeout time.Duration, disableTLS, noCtx bool) (*grpc.ClientConn, context.Context, error)

func TestConnection

func TestConnection(cfg *Config) error

Types

type Config

type Config struct {
	Token              string
	GRPCAddress        string
	NumWorkers         int32
	BatchSize          int32
	RelayCh            chan interface{}
	ErrorCh            chan *records.ErrorRecord
	DisableTLS         bool
	Timeout            time.Duration // general grpc timeout (used for all grpc calls)
	Type               string
	ServiceShutdownCtx context.Context
	MainShutdownFunc   context.CancelFunc
	MainShutdownCtx    context.Context
}

type IRelayBackend added in v0.29.0

type IRelayBackend interface {
	Relay() error
}

type Relay

type Relay struct {
	*Config
	Workers      map[int32]struct{}
	WorkersMutex *sync.RWMutex
	// contains filtered or unexported fields
}

func New

func New(relayCfg *Config) (*Relay, error)

New creates a new instance of the Relay

func (*Relay) CallWithRetry added in v0.16.0

func (r *Relay) CallWithRetry(ctx context.Context, method string, publish func(ctx context.Context) error) error

CallWithRetry will retry a GRPC call until it succeeds or reaches a maximum number of retries defined by MaxGRPCRetries

func (*Relay) Run

func (r *Relay) Run(id int32, conn *grpc.ClientConn, outboundCtx, shutdownCtx context.Context)

Run is a GRPC worker that runs as a goroutine. outboundCtx is used for sending GRPC requests as it will contain metadata, specifically "batch-token". shutdownCtx is passed from the main plumber app to shut down workers TODO: This should also read from errorCh

func (*Relay) StartWorkers

func (r *Relay) StartWorkers(shutdownCtx context.Context) error

func (*Relay) WaitForShutdown added in v0.29.0

func (r *Relay) WaitForShutdown()

WaitForShutdown will wait for service shutdown context to be canceled. It will then start constantly polling the workers map until it is empty. When the workers map is empty, MainShutdownFunc() is called allowing the application to exit gracefully and ensuring we have sent all relay messages to the grpc-collector

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL