core

package
v1.4.2
Published: Dec 1, 2023 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Constants

const (
	RootPath    = "/"
	VarzPath    = "/varz"
	HealthzPath = "/healthz"
)

HTTP endpoints
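
As a rough illustration of how these paths might be used by a monitoring client, the sketch below polls the health endpoint. The constants are copied from the block above; `isHealthy` and `newStubServer` are hypothetical helpers, and the stub server only stands in for the bridge's monitoring server so the example can run on its own. It assumes the base URL has no trailing slash.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Path constants copied from the package's const block.
const (
	RootPath    = "/"
	VarzPath    = "/varz"
	HealthzPath = "/healthz"
)

// isHealthy (hypothetical helper) reports whether GET baseURL+HealthzPath
// answers with status 200.
func isHealthy(client *http.Client, baseURL string) (bool, error) {
	resp, err := client.Get(baseURL + HealthzPath)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()
	return resp.StatusCode == http.StatusOK, nil
}

// newStubServer (hypothetical) stands in for the bridge's monitoring server.
func newStubServer() *httptest.Server {
	mux := http.NewServeMux()
	mux.HandleFunc(HealthzPath, func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	return httptest.NewServer(mux)
}

func main() {
	srv := newStubServer()
	defer srv.Close()

	ok, err := isHealthy(srv.Client(), srv.URL)
	fmt.Println(ok, err)
}
```

Against a running bridge, the base URL would presumably come from GetMonitoringRootURL.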

Variables

var Version = "0.0-dev"

Version specifies the command version. This should be set at compile time.
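
One common way to set a string variable at compile time is the linker's `-X` flag. The sketch below uses a local `Version` in package `main`; for this package the flag would need the package's full import path instead of `main`, which is not shown here. `versionString` is a hypothetical helper.

```go
package main

import "fmt"

// Version mirrors the package-level variable; the default marks a
// development build.
var Version = "0.0-dev"

// versionString (hypothetical helper) formats the version for a banner.
func versionString() string {
	return "bridge version " + Version
}

func main() {
	// Build with, for example:
	//   go build -ldflags "-X main.Version=1.4.2"
	// to replace the default value at link time.
	fmt.Println(versionString())
}
```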

Functions

This section is empty.

Types

type Bin

type Bin struct {
	Value float64 `json:"v"`
	Count float64 `json:"c"`
}

Bin holds a float64 value and count

type BridgeConnector

type BridgeConnector struct {
	sync.Mutex
	// contains filtered or unexported fields
}

BridgeConnector is the base type used for connectors so that they can share code. The config, bridge and stats are all fixed at creation, so no lock is required on the connector at this level. The stats do keep a lock to protect their data. The connector has a lock for use by composing types to protect themselves during start/shutdown.

func (*BridgeConnector) CheckConnections

func (conn *BridgeConnector) CheckConnections() error

CheckConnections is a no-op, designed for overriding. It is called when NATS or STAN goes down; the connector should return an error if it has to be shut down.

func (*BridgeConnector) ID

func (conn *BridgeConnector) ID() string

ID returns the id from the stats

func (*BridgeConnector) Shutdown

func (conn *BridgeConnector) Shutdown() error

Shutdown is a no-op, designed for overriding

func (*BridgeConnector) Start

func (conn *BridgeConnector) Start() error

Start is a no-op, designed for overriding

func (*BridgeConnector) Stats

func (conn *BridgeConnector) Stats() ConnectorStats

Stats returns a copy of the current stats for this connector

func (*BridgeConnector) String

func (conn *BridgeConnector) String() string

String returns the name passed into init
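
The "no-op, designed for overriding" pattern relies on Go struct embedding: a concrete connector embeds the base type and redefines only the methods it needs. The sketch below shows the idea with local stand-ins. `baseConnector` and `demoConnector` are hypothetical and are not the package's types.

```go
package main

import (
	"errors"
	"fmt"
)

// baseConnector is a local stand-in for BridgeConnector: every lifecycle
// method is a no-op.
type baseConnector struct{ name string }

func (b *baseConnector) Start() error            { return nil }
func (b *baseConnector) Shutdown() error         { return nil }
func (b *baseConnector) CheckConnections() error { return nil }
func (b *baseConnector) String() string          { return b.name }

// demoConnector (hypothetical) embeds the base and overrides only what it
// needs; the remaining methods are promoted from baseConnector.
type demoConnector struct {
	baseConnector
	upstreamUp bool
}

// CheckConnections shadows the promoted no-op.
func (d *demoConnector) CheckConnections() error {
	if !d.upstreamUp {
		return errors.New("upstream connection is down")
	}
	return nil
}

func main() {
	c := &demoConnector{baseConnector: baseConnector{name: "demo"}}
	fmt.Println(c.String(), c.Start(), c.CheckConnections())
}
```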

type BridgeStats

type BridgeStats struct {
	StartTime    int64            `json:"start_time"`
	ServerTime   int64            `json:"current_time"`
	UpTime       string           `json:"uptime"`
	RequestCount int64            `json:"request_count"`
	Connections  []ConnectorStats `json:"connectors"`
	HTTPRequests map[string]int64 `json:"http_requests"`
}

BridgeStats wraps the current status of the bridge and all of its connectors
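
The JSON tags suggest how a client could decode a serialized BridgeStats value. The sketch below mirrors the struct locally (with only a subset of the connector fields) and parses a made-up payload. Whether any endpoint returns exactly this shape is an assumption; `parseStats` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// connectorStats mirrors a subset of the ConnectorStats JSON tags.
type connectorStats struct {
	Name       string `json:"name"`
	Connected  bool   `json:"connected"`
	MessagesIn int64  `json:"msg_in"`
}

// bridgeStats mirrors the BridgeStats JSON tags.
type bridgeStats struct {
	StartTime    int64            `json:"start_time"`
	ServerTime   int64            `json:"current_time"`
	UpTime       string           `json:"uptime"`
	RequestCount int64            `json:"request_count"`
	Connections  []connectorStats `json:"connectors"`
	HTTPRequests map[string]int64 `json:"http_requests"`
}

// parseStats (hypothetical helper) decodes a JSON document into bridgeStats.
func parseStats(data []byte) (bridgeStats, error) {
	var s bridgeStats
	err := json.Unmarshal(data, &s)
	return s, err
}

func main() {
	// Made-up payload for illustration only.
	payload := []byte(`{"uptime":"1m0s","request_count":3,` +
		`"connectors":[{"name":"demo","connected":true,"msg_in":3}],` +
		`"http_requests":{"/varz":1}}`)

	s, err := parseStats(payload)
	fmt.Println(s.UpTime, len(s.Connections), err)
}
```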

type Connector

type Connector interface {
	Start() error
	Shutdown() error

	CheckConnections() error

	String() string
	ID() string

	Stats() ConnectorStats
}

Connector is the abstraction for all of the bridge connector types

func CreateConnector

func CreateConnector(config conf.ConnectorConfig, bridge *NATSKafkaBridge) (Connector, error)

CreateConnector builds a connector from the supplied configuration

func NewJetStream2KafkaConnector added in v1.0.0

func NewJetStream2KafkaConnector(bridge *NATSKafkaBridge, config conf.ConnectorConfig) Connector

NewJetStream2KafkaConnector creates a new JetStream to Kafka connector

func NewKafka2JetStreamConnector added in v1.0.0

func NewKafka2JetStreamConnector(bridge *NATSKafkaBridge, config conf.ConnectorConfig) Connector

NewKafka2JetStreamConnector creates a new Kafka to JetStream connector

func NewKafka2NATSConnector

func NewKafka2NATSConnector(bridge *NATSKafkaBridge, config conf.ConnectorConfig) Connector

NewKafka2NATSConnector creates a new Kafka to NATS connector

func NewKafka2StanConnector

func NewKafka2StanConnector(bridge *NATSKafkaBridge, config conf.ConnectorConfig) Connector

NewKafka2StanConnector creates a new Kafka to STAN connector

func NewNATS2KafkaConnector

func NewNATS2KafkaConnector(bridge *NATSKafkaBridge, config conf.ConnectorConfig) Connector

NewNATS2KafkaConnector creates a new NATS to Kafka connector

func NewStan2KafkaConnector

func NewStan2KafkaConnector(bridge *NATSKafkaBridge, config conf.ConnectorConfig) Connector

NewStan2KafkaConnector creates a new STAN to Kafka connector

type ConnectorStats

type ConnectorStats struct {
	Name          string  `json:"name"`
	ID            string  `json:"id"`
	Connected     bool    `json:"connected"`
	Connects      int64   `json:"connects"`
	Disconnects   int64   `json:"disconnects"`
	BytesIn       int64   `json:"bytes_in"`
	BytesOut      int64   `json:"bytes_out"`
	MessagesIn    int64   `json:"msg_in"`
	MessagesOut   int64   `json:"msg_out"`
	RequestCount  int64   `json:"count"`
	MovingAverage float64 `json:"rma"`
	Quintile50    float64 `json:"q50"`
	Quintile75    float64 `json:"q75"`
	Quintile90    float64 `json:"q90"`
	Quintile95    float64 `json:"q95"`
}

ConnectorStats captures the statistics for a single connector. Times are in nanoseconds. Use a holder to get the protection of a lock and to fill in the quantiles.

type ConnectorStatsHolder

type ConnectorStatsHolder struct {
	sync.Mutex
	// contains filtered or unexported fields
}

ConnectorStatsHolder provides a lock and histogram for a connector to update its stats. The holder's Stats() method should be used to get the current values.

func NewConnectorStatsHolder

func NewConnectorStatsHolder(name string, id string) *ConnectorStatsHolder

NewConnectorStatsHolder creates an empty stats holder, and initializes the request time histogram

func (*ConnectorStatsHolder) AddConnect

func (stats *ConnectorStatsHolder) AddConnect()

AddConnect updates the connects field. It locks/unlocks the stats.

func (*ConnectorStatsHolder) AddDisconnect

func (stats *ConnectorStatsHolder) AddDisconnect()

AddDisconnect updates the disconnects field. It locks/unlocks the stats.

func (*ConnectorStatsHolder) AddMessageIn

func (stats *ConnectorStatsHolder) AddMessageIn(bytes int64)

AddMessageIn updates the messages in and bytes in fields. It locks/unlocks the stats.

func (*ConnectorStatsHolder) AddMessageOut

func (stats *ConnectorStatsHolder) AddMessageOut(bytes int64)

AddMessageOut updates the messages out and bytes out fields. It locks/unlocks the stats.

func (*ConnectorStatsHolder) AddRequest

func (stats *ConnectorStatsHolder) AddRequest(bytesIn int64, bytesOut int64, reqTime time.Duration)

AddRequest groups AddMessageIn, AddMessageOut and AddRequestTime into a single call to reduce locking requirements. It locks/unlocks the stats.

func (*ConnectorStatsHolder) AddRequestTime

func (stats *ConnectorStatsHolder) AddRequestTime(reqTime time.Duration)

AddRequestTime registers a time, updating the request count, RMA and histogram. It locks/unlocks the stats. For information on the running moving average, see https://en.wikipedia.org/wiki/Moving_average

func (*ConnectorStatsHolder) ID

func (stats *ConnectorStatsHolder) ID() string

ID returns the ID the holder was created with

func (*ConnectorStatsHolder) Name

func (stats *ConnectorStatsHolder) Name() string

Name returns the name the holder was created with

func (*ConnectorStatsHolder) Stats

func (stats *ConnectorStatsHolder) Stats() ConnectorStats

Stats updates the quantiles and returns a copy of the stats. It locks/unlocks the stats.

type Flags

type Flags struct {
	ConfigFile string

	Debug           bool
	Verbose         bool
	DebugAndVerbose bool
}

Flags defines the various flags you can call the bridge server with. These are used in main and passed down to the server code to process.

type Histogram

type Histogram struct {
	Bins    []Bin  `json:"bins"`
	MaxBins int    `json:"max"`
	Total   uint64 `json:"total"`
}

Histogram stores N bins using the streaming approximate histogram approach. The histogram is not thread safe.

func NewHistogram

func NewHistogram(n int) *Histogram

NewHistogram returns a new Histogram with a maximum of n bins.

There is no "optimal" bin count, but somewhere between 20 and 80 bins should be sufficient.

func (*Histogram) Add

func (h *Histogram) Add(n float64)

Add a value to the histogram, creating a bucket if necessary
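
For readers unfamiliar with streaming approximate histograms, the usual idea is to insert each value as its own bin and, once the bin limit is exceeded, merge the two closest neighbours into their count-weighted mean. The sketch below shows that general technique with local types. It is not taken from the package source, and it assumes `maxBins` is at least 1.

```go
package main

import (
	"fmt"
	"sort"
)

type bin struct {
	value float64
	count float64
}

// histogram is a local stand-in, not the package's Histogram type.
type histogram struct {
	bins    []bin // kept sorted by value
	maxBins int   // assumed to be >= 1
	total   uint64
}

// add inserts v in sorted position (or bumps an exact match), then merges
// two bins if the limit is exceeded.
func (h *histogram) add(v float64) {
	h.total++
	i := sort.Search(len(h.bins), func(i int) bool { return h.bins[i].value >= v })
	if i < len(h.bins) && h.bins[i].value == v {
		h.bins[i].count++
		return
	}
	h.bins = append(h.bins, bin{})
	copy(h.bins[i+1:], h.bins[i:])
	h.bins[i] = bin{value: v, count: 1}
	if len(h.bins) > h.maxBins {
		h.mergeClosest()
	}
}

// mergeClosest replaces the adjacent pair with the smallest gap by a single
// bin at their count-weighted mean.
func (h *histogram) mergeClosest() {
	best := 0
	for i := 1; i < len(h.bins)-1; i++ {
		if h.bins[i+1].value-h.bins[i].value < h.bins[best+1].value-h.bins[best].value {
			best = i
		}
	}
	a, b := h.bins[best], h.bins[best+1]
	n := a.count + b.count
	h.bins[best] = bin{value: (a.value*a.count + b.value*b.count) / n, count: n}
	h.bins = append(h.bins[:best+1], h.bins[best+2:]...)
}

func main() {
	h := &histogram{maxBins: 3}
	for _, v := range []float64{1, 2, 10, 20} {
		h.add(v)
	}
	fmt.Println(h.bins, h.total)
}
```

With a limit of three bins, adding 1, 2, 10 and 20 forces one merge, and the two closest values (1 and 2) collapse into a single bin at 1.5 with a count of 2.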

func (*Histogram) Count

func (h *Histogram) Count() float64

Count returns the total number of entries in the histogram

func (*Histogram) Mean

func (h *Histogram) Mean() float64

Mean returns the sample mean of the distribution

func (*Histogram) Quantile

func (h *Histogram) Quantile(q float64) float64

Quantile returns the value for the bin at the provided quantile. This is "approximate" in the sense that the bin may straddle the quantile value.
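
One way to read "the value for the bin at the provided quantile" is to walk the sorted bins, accumulate their counts, and return the first bin whose cumulative count reaches q times the total. The sketch below implements that reading with local types. The package's exact rule may differ, and the boolean result is an addition for the empty case.

```go
package main

import "fmt"

type bin struct {
	value float64
	count float64
}

// quantile returns the value of the first bin whose cumulative count reaches
// q*total. The second result is false when no bin qualifies (for example,
// when bins is empty).
func quantile(bins []bin, q float64) (float64, bool) {
	var total float64
	for _, b := range bins {
		total += b.count
	}
	target := q * total
	var seen float64
	for _, b := range bins {
		seen += b.count
		if seen >= target {
			return b.value, true
		}
	}
	return 0, false
}

func main() {
	bins := []bin{{value: 1.5, count: 2}, {value: 10, count: 1}, {value: 20, count: 1}}
	for _, q := range []float64{0.5, 0.75, 0.9} {
		v, ok := quantile(bins, q)
		fmt.Println(q, v, ok)
	}
}
```

The result is always the value of some bin, so it can sit on either side of the true quantile.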

func (*Histogram) Scale

func (h *Histogram) Scale(s float64)

Scale scales the buckets by s. This is useful for requests or other values that may be in large numbers, such as nanoseconds.
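
A plausible reading of scaling is multiplying every bin value by s while leaving the counts alone, for example to turn nanoseconds into milliseconds. The sketch below shows that reading with a local `bin` type; it is an assumption, not the package's code.

```go
package main

import "fmt"

type bin struct {
	value float64
	count float64
}

// scale multiplies every bin value by s in place; counts are left untouched.
func scale(bins []bin, s float64) {
	for i := range bins {
		bins[i].value *= s
	}
}

func main() {
	// Made-up bins holding request times in nanoseconds.
	bins := []bin{{value: 2_000_000, count: 3}, {value: 5_000_000, count: 1}}
	scale(bins, 1e-6) // nanoseconds to milliseconds
	fmt.Println(bins)
}
```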

type JetStream2KafkaConnector added in v1.0.0

type JetStream2KafkaConnector struct {
	BridgeConnector
	// contains filtered or unexported fields
}

JetStream2KafkaConnector connects a JetStream stream to Kafka

func (*JetStream2KafkaConnector) CheckConnections added in v1.0.0

func (conn *JetStream2KafkaConnector) CheckConnections() error

CheckConnections checks the NATS/STAN connection and reports an error if it is down

func (*JetStream2KafkaConnector) Shutdown added in v1.0.0

func (conn *JetStream2KafkaConnector) Shutdown() error

Shutdown the connector

func (*JetStream2KafkaConnector) Start added in v1.0.0

func (conn *JetStream2KafkaConnector) Start() error

Start the connector

type Kafka2JetStreamConnector added in v1.0.0

type Kafka2JetStreamConnector struct {
	BridgeConnector
	// contains filtered or unexported fields
}

Kafka2JetStreamConnector connects a Kafka topic to JetStream

func (*Kafka2JetStreamConnector) CheckConnections added in v1.0.0

func (conn *Kafka2JetStreamConnector) CheckConnections() error

CheckConnections checks the NATS/STAN connection and reports an error if it is down

func (*Kafka2JetStreamConnector) Shutdown added in v1.0.0

func (conn *Kafka2JetStreamConnector) Shutdown() error

Shutdown the connector

func (*Kafka2JetStreamConnector) Start added in v1.0.0

func (conn *Kafka2JetStreamConnector) Start() error

Start the connector

type Kafka2NATSConnector

type Kafka2NATSConnector struct {
	BridgeConnector
	// contains filtered or unexported fields
}

Kafka2NATSConnector connects a Kafka topic to a NATS subject

func (*Kafka2NATSConnector) CheckConnections

func (conn *Kafka2NATSConnector) CheckConnections() error

CheckConnections checks the NATS/STAN connection and reports an error if it is down

func (*Kafka2NATSConnector) Shutdown

func (conn *Kafka2NATSConnector) Shutdown() error

Shutdown the connector

func (*Kafka2NATSConnector) Start

func (conn *Kafka2NATSConnector) Start() error

Start the connector

type Kafka2StanConnector

type Kafka2StanConnector struct {
	BridgeConnector
	// contains filtered or unexported fields
}

Kafka2StanConnector connects a Kafka topic to a NATS streaming channel

func (*Kafka2StanConnector) CheckConnections

func (conn *Kafka2StanConnector) CheckConnections() error

CheckConnections checks the NATS/STAN connection and reports an error if it is down

func (*Kafka2StanConnector) Shutdown

func (conn *Kafka2StanConnector) Shutdown() error

Shutdown the connector

func (*Kafka2StanConnector) Start

func (conn *Kafka2StanConnector) Start() error

Start the connector

type NATS2KafkaConnector

type NATS2KafkaConnector struct {
	BridgeConnector
	// contains filtered or unexported fields
}

NATS2KafkaConnector connects a NATS subject to a Kafka topic

func (*NATS2KafkaConnector) CheckConnections

func (conn *NATS2KafkaConnector) CheckConnections() error

CheckConnections checks the NATS/STAN connection and reports an error if it is down

func (*NATS2KafkaConnector) Shutdown

func (conn *NATS2KafkaConnector) Shutdown() error

Shutdown the connector

func (*NATS2KafkaConnector) Start

func (conn *NATS2KafkaConnector) Start() error

Start the connector

type NATSCallback

type NATSCallback func(msg kafka.Message) error

NATSCallback is used by conn-nats connectors in a conn library callback. The lock will be held by the caller!

type NATSKafkaBridge

type NATSKafkaBridge struct {
	sync.Mutex
	// contains filtered or unexported fields
}

NATSKafkaBridge is the core structure for the server.

func NewNATSKafkaBridge

func NewNATSKafkaBridge() *NATSKafkaBridge

NewNATSKafkaBridge creates a new bridge server with a default logger

func (*NATSKafkaBridge) ApplyConfigFile

func (server *NATSKafkaBridge) ApplyConfigFile(configFile string) error

ApplyConfigFile applies the config file to the server's config

func (*NATSKafkaBridge) CheckJetStream added in v1.0.0

func (server *NATSKafkaBridge) CheckJetStream() bool

CheckJetStream returns true if the bridge is connected to JetStream

func (*NATSKafkaBridge) CheckNATS

func (server *NATSKafkaBridge) CheckNATS() bool

CheckNATS returns true if the bridge is connected to nats

func (*NATSKafkaBridge) CheckStan

func (server *NATSKafkaBridge) CheckStan() bool

CheckStan returns true if the bridge is connected to stan

func (*NATSKafkaBridge) ConnectorError

func (server *NATSKafkaBridge) ConnectorError(connector Connector, err error)

ConnectorError is called by a connector if it has a failure that requires a reconnect

func (*NATSKafkaBridge) FatalError

func (server *NATSKafkaBridge) FatalError(format string, args ...interface{})

FatalError stops the server, prints the messages and exits

func (*NATSKafkaBridge) GetMonitoringRootURL

func (server *NATSKafkaBridge) GetMonitoringRootURL() string

GetMonitoringRootURL returns the protocol://host:port for the monitoring server, useful for testing

func (*NATSKafkaBridge) HandleHealthz

func (server *NATSKafkaBridge) HandleHealthz(w http.ResponseWriter, r *http.Request)

HandleHealthz returns status 200.

func (*NATSKafkaBridge) HandleRoot

func (server *NATSKafkaBridge) HandleRoot(w http.ResponseWriter, r *http.Request)

HandleRoot will show basic info and links to other handlers.

func (*NATSKafkaBridge) HandleVarz

func (server *NATSKafkaBridge) HandleVarz(w http.ResponseWriter, r *http.Request)

HandleVarz returns statistics about the server.

func (*NATSKafkaBridge) InitializeFromConfig

func (server *NATSKafkaBridge) InitializeFromConfig(config conf.NATSKafkaBridgeConfig) error

InitializeFromConfig initializes the server's configuration to an existing config object, useful for tests. It does not change the config at all; use DefaultServerConfig() to create a default config.

func (*NATSKafkaBridge) InitializeFromFlags

func (server *NATSKafkaBridge) InitializeFromFlags(flags Flags) error

InitializeFromFlags is called from main to configure the server. The server will decide what needs to happen based on the flags. On reload the same flags are passed.

func (*NATSKafkaBridge) JetStream added in v1.0.0

func (server *NATSKafkaBridge) JetStream() nats.JetStreamContext

JetStream hosts a shared JetStream connection for the connectors

func (*NATSKafkaBridge) Logger

func (server *NATSKafkaBridge) Logger() logging.Logger

Logger hosts a shared logger

func (*NATSKafkaBridge) NATS

func (server *NATSKafkaBridge) NATS() *nats.Conn

NATS hosts a shared nats connection for the connectors

func (*NATSKafkaBridge) SafeStats

func (server *NATSKafkaBridge) SafeStats() BridgeStats

SafeStats grabs the lock then calls stats(), useful for tests

func (*NATSKafkaBridge) Stan

func (server *NATSKafkaBridge) Stan() stan.Conn

Stan hosts a shared streaming connection for the connectors

func (*NATSKafkaBridge) Start

func (server *NATSKafkaBridge) Start() error

Start starts the server. It will lock the server and assumes the config is loaded.

func (*NATSKafkaBridge) Stop

func (server *NATSKafkaBridge) Stop()

Stop stops the bridge server

func (*NATSKafkaBridge) StopMonitoring

func (server *NATSKafkaBridge) StopMonitoring() error

StopMonitoring shuts down the HTTP server used for monitoring. It expects the lock to be held.

type ShutdownCallback

type ShutdownCallback func() error

ShutdownCallback is returned when setting up a callback or polling so the connector can shut it down

type Stan2KafkaConnector

type Stan2KafkaConnector struct {
	BridgeConnector
	// contains filtered or unexported fields
}

Stan2KafkaConnector connects a STAN channel to Kafka

func (*Stan2KafkaConnector) CheckConnections

func (conn *Stan2KafkaConnector) CheckConnections() error

CheckConnections checks the NATS/STAN connection and reports an error if it is down

func (*Stan2KafkaConnector) Shutdown

func (conn *Stan2KafkaConnector) Shutdown() error

Shutdown the connector

func (*Stan2KafkaConnector) Start

func (conn *Stan2KafkaConnector) Start() error

Start the connector
