shoveler

Version: v1.4.0
Published: Feb 29, 2024 License: Apache-2.0 Imports: 25 Imported by: 1

README

XRootD Monitoring Shoveler

This shoveler gathers UDP monitoring messages from XRootD servers and sends them to a reliable message bus.

graph LR
  subgraph Site
    subgraph Node 1
    node1[XRootD] -- UDP --> shoveler1{Shoveler};
    end
    subgraph Node 2
    node2[XRootD] -- UDP --> shoveler1{Shoveler};
    end
  end;
  subgraph OSG Operations
  shoveler1 -- TCP/TLS --> C[Message Bus];
  C -- Raw --> D[XRootD Collector];
  D -- Summary --> C;
  C --> E[(Storage)];
  style shoveler1 font-weight:bolder,stroke-width:4px,stroke:#E74C3C,font-size:4em,color:#E74C3C
  end;

Getting Started

Requirements
  1. A UDP port reachable from the XRootD servers (default: 9993). The port does not need to be open to the public internet, only to the XRootD servers.
  2. Outgoing network access to connect to the message bus.
  3. Disk space for a persistent message queue if the shoveler is disconnected from the message bus. Calculations have shown production servers generate <30 MB of data a day.

The shoveler can run on a dedicated server or on a shared server, and it does not require many resources. For example, a shoveler serving 12 production XRootD servers can be expected to consume 10-50 MB of RAM and a small fraction of a CPU.

⚙ Installation

Binaries and packages are provided in the latest GitHub releases.

Configuration

The shoveler will read from:

  1. Configuration file
  2. Environment variables
  3. Command line arguments

An example configuration file, config.yaml, is in the repo. Each variable in the configuration file has a corresponding environment variable, listed below. The environment variables are useful for deployment in Docker or Kubernetes. By default, the config is stored in /etc/xrootd-monitoring-shoveler.

When running as a daemon, environment variables can still be used for configuration. The service reads them from /etc/sysconfig/xrootd-monitoring-shoveler.

Environment variables:

  • SHOVELER_MQ
  • SHOVELER_AMQP_TOKEN_LOCATION
  • SHOVELER_AMQP_URL
  • SHOVELER_AMQP_EXCHANGE
  • SHOVELER_LISTEN_PORT
  • SHOVELER_LISTEN_IP
  • SHOVELER_VERIFY
  • SHOVELER_QUEUE_DIRECTORY
  • SHOVELER_STOMP_USER
  • SHOVELER_STOMP_PASSWORD
  • SHOVELER_STOMP_URL
  • SHOVELER_STOMP_TOPIC
  • SHOVELER_STOMP_CERT
  • SHOVELER_STOMP_CERT_KEY
  • SHOVELER_METRICS_PORT
  • SHOVELER_METRICS_ENABLE
  • SHOVELER_MAP_ALL
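
As a rough illustration of how an environment variable can override a default, the sketch below reads SHOVELER_LISTEN_PORT and falls back to 9993 when it is unset. The function name listenPort and the injected lookup parameter are hypothetical, introduced only for this example; this is not the package's ReadConfig implementation.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// defaultListenPort is the default UDP port mentioned in the requirements.
const defaultListenPort = 9993

// listenPort is a hypothetical helper: it resolves the listen port from
// SHOVELER_LISTEN_PORT using the supplied lookup function (os.LookupEnv in
// production) and falls back to the default when the variable is unset.
func listenPort(lookup func(string) (string, bool)) (int, error) {
	raw, ok := lookup("SHOVELER_LISTEN_PORT")
	if !ok || raw == "" {
		return defaultListenPort, nil
	}
	port, err := strconv.Atoi(raw)
	if err != nil {
		return 0, fmt.Errorf("invalid SHOVELER_LISTEN_PORT %q: %w", raw, err)
	}
	return port, nil
}

func main() {
	port, err := listenPort(os.LookupEnv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	fmt.Println("listening on UDP port", port)
}
```

Passing the lookup function in keeps the helper easy to exercise without touching the real process environment.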

Message Bus Credentials

When AMQP is the protocol used to connect, the shoveler uses a JWT to authorize with the message bus. The token will eventually be issued by an automated process, but for now, long-lived tokens are issued to sites.

If STOMP is the selected protocol instead, a user and password must be provided when configuring the shoveler.

Packet Verification

If the verify option or the SHOVELER_VERIFY environment variable is set to true (the default), the shoveler performs a simple check that incoming UDP packets conform to the XRootD monitoring packet format.
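
A minimal sketch of what such a check could look like, assuming it compares the length field in the 8-byte XRootD monitoring header (bytes 2-3, network byte order) against the size of the received datagram. The function name looksLikeXRootD is hypothetical, and the package's own VerifyPacket may apply different or additional rules.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// headerLen is the size of the XRootD monitoring header:
// code (1) + pseq (1) + plen (2) + server start (4) = 8 bytes.
const headerLen = 8

// looksLikeXRootD is a hypothetical check: the packet must hold a full
// header, and the header's length field must equal the datagram size.
func looksLikeXRootD(packet []byte) bool {
	if len(packet) < headerLen {
		return false
	}
	plen := binary.BigEndian.Uint16(packet[2:4])
	return int(plen) == len(packet)
}

func main() {
	// An 8-byte packet whose length field says 8.
	good := []byte{'f', 0, 0, 8, 0, 0, 0, 0}
	fmt.Println(looksLikeXRootD(good))
	// A truncated packet is rejected.
	fmt.Println(looksLikeXRootD(good[:5]))
}
```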

IP Mapping

When the shoveler runs on the same node as the XRootD server, or in the same private network, the source IP of the incoming XRootD packets may be the private IP address rather than the public one. The public IP address is used for reverse DNS lookup when summarizing the records. You can map incoming IP addresses to other addresses with the map configuration value.

To map all incoming messages to a single IP:

map:
  all: <ip address>

Alternatively, set the environment variable SHOVELER_MAP_ALL=<ip address>.

To map multiple IP addresses, the config file would be:

map:
   <ip address>: <ip address>
   <ip address>: <ip address>
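
The lookup this configuration implies can be sketched as follows. This is an illustration only: the helper name mapRemoteIP is hypothetical, and the assumption that the "all" mapping takes precedence over per-address entries is not confirmed by the source.

```go
package main

import "fmt"

// mapRemoteIP is a hypothetical helper: it returns the address that should
// be reported for an incoming packet. mapAll mirrors the "all" setting and
// ipMap mirrors the per-address entries of the map configuration value.
func mapRemoteIP(ip, mapAll string, ipMap map[string]string) string {
	if mapAll != "" {
		// Assumption: a catch-all mapping overrides everything else.
		return mapAll
	}
	if mapped, ok := ipMap[ip]; ok {
		return mapped
	}
	// Unmapped addresses are reported unchanged.
	return ip
}

func main() {
	ipMap := map[string]string{"10.0.0.5": "192.0.2.10"}
	fmt.Println(mapRemoteIP("10.0.0.5", "", ipMap))
	fmt.Println(mapRemoteIP("10.0.0.6", "", ipMap))
	fmt.Println(mapRemoteIP("10.0.0.6", "192.0.2.99", ipMap))
}
```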
   

Running the Shoveler

The shoveler is a statically linked binary, distributed as an RPM and published to Docker Hub and OSG's container hub. You will need to edit config.yaml before starting.

Install the RPM from the latest release, then start the systemd service with:

systemctl start xrootd-monitoring-shoveler.service

From Docker, you can start the container from the OSG hub with the following command.

docker run -v "$(pwd)/config.yaml:/etc/xrootd-monitoring-shoveler/config.yaml" hub.opensciencegrid.org/opensciencegrid/xrootd-monitoring-shoveler

🧭 Design

Queue Design

The shoveler receives UDP packets and stores them in a queue before sending them to the message bus. Up to 100 messages are held in memory. Once the number of in-memory messages exceeds 100, further messages are written to disk under the directory configured by SHOVELER_QUEUE_DIRECTORY (env) or queue_directory (yaml). A good default is /var/spool/xrootd-monitoring-shoveler/queue. Do not use /var/run or /tmp, as these directories are not persistent and may be cleaned regularly by tooling such as systemd-tmpfiles. The on-disk queue persists across shoveler restarts.

The queue length can be monitored through the Prometheus metric shoveler_queue_size.
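
The spill-over behaviour can be sketched with a toy queue. This is not the package's ConfirmationQueue: the type spillQueue is hypothetical, a second slice stands in for the on-disk segment files, Dequeue returns an error instead of blocking, and the refill policy based on a low water mark of 50 is an assumption drawn from the exported MaxInMemory and LowWaterMark variables.

```go
package main

import (
	"errors"
	"fmt"
)

const (
	maxInMemory  = 100 // messages kept in memory before spilling
	lowWaterMark = 50  // refill memory from "disk" below this level
)

var errEmpty = errors.New("queue is empty")

// spillQueue is a hypothetical FIFO queue. The disk slice is a stand-in
// for persistent on-disk storage.
type spillQueue struct {
	memory [][]byte
	disk   [][]byte
}

// Enqueue keeps FIFO order: once anything has spilled, new messages must
// also go to "disk" so they are not delivered ahead of older ones.
func (q *spillQueue) Enqueue(msg []byte) {
	if len(q.disk) > 0 || len(q.memory) >= maxInMemory {
		q.disk = append(q.disk, msg)
		return
	}
	q.memory = append(q.memory, msg)
}

// Dequeue returns the oldest message, refilling memory from "disk" when
// the in-memory portion drops below the low water mark.
func (q *spillQueue) Dequeue() ([]byte, error) {
	if len(q.memory) == 0 && len(q.disk) == 0 {
		return nil, errEmpty
	}
	if len(q.memory) < lowWaterMark && len(q.disk) > 0 {
		n := maxInMemory - len(q.memory)
		if n > len(q.disk) {
			n = len(q.disk)
		}
		q.memory = append(q.memory, q.disk[:n]...)
		q.disk = q.disk[n:]
	}
	msg := q.memory[0]
	q.memory = q.memory[1:]
	return msg, nil
}

// Size reports the total number of queued messages.
func (q *spillQueue) Size() int { return len(q.memory) + len(q.disk) }

func main() {
	q := &spillQueue{}
	for i := 0; i < 150; i++ {
		q.Enqueue([]byte{byte(i)})
	}
	fmt.Println("in memory:", len(q.memory), "spilled:", len(q.disk), "total:", q.Size())
}
```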

⚠ License

Distributed under the Apache 2.0 License. See LICENSE.txt for more information.

💎 Acknowledgements

This project is supported by the National Science Foundation under Cooperative Agreements OAC-2030508 and OAC-1836650.

Documentation

Variables

var (
	ShovelerVersion string
	ShovelerCommit  string
	ShovelerDate    string
	ShovelerBuiltBy string
)
var (
	PacketsReceived = promauto.NewCounter(prometheus.CounterOpts{
		Name: "shoveler_packets_received",
		Help: "The total number of packets received",
	})

	ValidationsFailed = promauto.NewCounter(prometheus.CounterOpts{
		Name: "shoveler_validations_failed",
		Help: "The total number of packets that failed validation",
	})

	RabbitmqReconnects = promauto.NewCounter(prometheus.CounterOpts{
		Name: "shoveler_rabbitmq_reconnects",
		Help: "The total number of reconnections to rabbitmq bus",
	})

	QueueSize = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "shoveler_queue_size",
		Help: "The number of messages in the queue",
	})
)
var (
	ErrEmpty     = errors.New("queue is empty")
	MaxInMemory  = 100
	LowWaterMark = 50
)

Functions

func CheckTokenFile

func CheckTokenFile(config *Config, tokenAge time.Time, triggerReconnect chan<- bool)

Listen to the channel for messages

func GetStompConnection

func GetStompConnection(session *StompSession) (*stomp.Conn, error)

func ItemBuilder

func ItemBuilder() interface{}

ItemBuilder creates a new item and returns a pointer to it. This is used when we load a segment of the queue from disk.

func PackageUdp

func PackageUdp(packet []byte, remote *net.UDPAddr, config *Config) []byte

func SetLogger added in v1.3.0

func SetLogger(logger logrus.FieldLogger)

func StartAMQP

func StartAMQP(config *Config, queue *ConfirmationQueue)

This should run in a new goroutine.

func StartMetrics

func StartMetrics(metricsPort int)

func StartStomp

func StartStomp(config *Config, queue *ConfirmationQueue)

func VerifyPacket

func VerifyPacket(packet []byte) bool

VerifyPacket verifies that the packet matches the expected format from XRootD.

Types

type Config

type Config struct {
	MQ            string   // Which technology to use for the MQ connection
	AmqpURL       *url.URL // AMQP URL (password comes from the token)
	AmqpExchange  string   // Exchange to shovel messages
	AmqpToken     string   // File location of the token
	ListenPort    int
	ListenIp      string
	DestUdp       []string
	Debug         bool
	Verify        bool
	StompUser     string
	StompPassword string
	StompURL      *url.URL
	StompTopic    string
	Metrics       bool
	MetricsPort   int
	StompCert     string
	StompCertKey  string
	QueueDir      string
	IpMapAll      string
	IpMap         map[string]string
}

func (*Config) ReadConfig

func (c *Config) ReadConfig()

type ConfirmationQueue

type ConfirmationQueue struct {
	// contains filtered or unexported fields
}

func NewConfirmationQueue

func NewConfirmationQueue(config *Config) *ConfirmationQueue

NewConfirmationQueue returns an initialized list.

func (*ConfirmationQueue) Close

func (cq *ConfirmationQueue) Close() error

Close closes the on-disk files.

func (*ConfirmationQueue) Dequeue

func (cq *ConfirmationQueue) Dequeue() ([]byte, error)

Dequeue blocks until a message is available and returns it.

func (*ConfirmationQueue) Enqueue

func (cq *ConfirmationQueue) Enqueue(msg []byte)

Enqueue adds the message to the queue.

func (*ConfirmationQueue) Init

func (cq *ConfirmationQueue) Init(config *Config) *ConfirmationQueue

Init initializes the queue

func (*ConfirmationQueue) Size

func (cq *ConfirmationQueue) Size() int

type Header

type Header struct {
	Code        byte
	Pseq        uint8
	Plen        uint16
	ServerStart int32
}

Header is the XRootD monitoring packet header: 1 + 1 + 2 + 4 = 8 bytes.
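
Because every field has a fixed size, the header can be decoded straight from the first 8 bytes of a packet. The sketch below redeclares the struct locally and uses a hypothetical decodeHeader helper; it assumes network (big-endian) byte order for the multi-byte fields and is not taken from the package's source.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// Header mirrors the struct documented above.
type Header struct {
	Code        byte
	Pseq        uint8
	Plen        uint16
	ServerStart int32
}

// decodeHeader is a hypothetical helper that reads the fixed-size header
// from the start of a packet. It returns an error if the packet is shorter
// than 8 bytes.
func decodeHeader(packet []byte) (Header, error) {
	var h Header
	err := binary.Read(bytes.NewReader(packet), binary.BigEndian, &h)
	return h, err
}

func main() {
	packet := []byte{'d', 7, 0x00, 0x10, 0x00, 0x00, 0x00, 0x01}
	h, err := decodeHeader(packet)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("code=%c pseq=%d plen=%d start=%d\n", h.Code, h.Pseq, h.Plen, h.ServerStart)
}
```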

type Message

type Message struct {
	Remote          string `json:"remote"`
	ShovelerVersion string `json:"version"`
	Data            string `json:"data"`
}
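
To show how a raw UDP packet might end up in this JSON envelope, here is a minimal sketch. The helper packageUDP is hypothetical and is not the package's PackageUdp; in particular, encoding the binary payload as standard base64 in the data field is an assumption made for the example.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"net"
)

// Message mirrors the struct documented above.
type Message struct {
	Remote          string `json:"remote"`
	ShovelerVersion string `json:"version"`
	Data            string `json:"data"`
}

// packageUDP is a hypothetical helper that wraps a raw packet in the JSON
// envelope. The binary payload is base64-encoded (an assumption) so that it
// survives transport as a JSON string.
func packageUDP(packet []byte, remote, version string) ([]byte, error) {
	msg := Message{
		Remote:          remote,
		ShovelerVersion: version,
		Data:            base64.StdEncoding.EncodeToString(packet),
	}
	return json.Marshal(msg)
}

func main() {
	addr := &net.UDPAddr{IP: net.ParseIP("192.0.2.1"), Port: 9993}
	out, err := packageUDP([]byte("hi"), addr.String(), "1.4.0")
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(string(out))
}
```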

type MessageStruct

type MessageStruct struct {
	Message []byte
}

type Session

type Session struct {
	// contains filtered or unexported fields
}

Copied from the amqp documentation at: https://pkg.go.dev/github.com/streadway/amqp

func New

func New(url url.URL) *Session

New creates a new consumer state instance, and automatically attempts to connect to the server.

func (*Session) Close

func (session *Session) Close() error

Close will cleanly shutdown the channel and connection.

func (*Session) Push

func (session *Session) Push(exchange string, data []byte) error

Push will push data onto the queue and wait for a confirm. If no confirm is received within the resendTimeout, it re-sends the message, repeating until a confirm is received. This will block until the server sends a confirm. Errors are only returned if the push action itself fails, see UnsafePush.
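
The send-and-wait-for-confirm pattern described here can be sketched with plain channels. This is an illustration under stated assumptions, not the package's Session.Push: the function pushWithConfirm, its send callback, and the confirms channel are all hypothetical stand-ins for the AMQP publish call and the broker's confirmation stream.

```go
package main

import (
	"fmt"
	"time"
)

// pushWithConfirm is a hypothetical helper: it calls send, then waits for a
// positive confirmation. If none arrives within resendTimeout (or a negative
// one arrives), it sends again. It only returns an error when send fails.
func pushWithConfirm(send func() error, confirms <-chan bool, resendTimeout time.Duration) error {
	for {
		if err := send(); err != nil {
			return err
		}
		select {
		case ok := <-confirms:
			if ok {
				return nil
			}
			// Negative acknowledgement: fall through and resend.
		case <-time.After(resendTimeout):
			// No confirmation in time: resend.
		}
	}
}

func main() {
	confirms := make(chan bool, 1)
	attempts := 0
	send := func() error {
		attempts++
		if attempts == 3 {
			// Simulate the broker confirming only the third attempt.
			confirms <- true
		}
		return nil
	}
	err := pushWithConfirm(send, confirms, 10*time.Millisecond)
	fmt.Println("attempts:", attempts, "err:", err)
}
```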

func (*Session) UnsafePush

func (session *Session) UnsafePush(exchange string, data []byte) error

UnsafePush will push to the queue without checking for confirmation. It returns an error if it fails to connect. No guarantees are provided for whether the server will receive the message.

type StompSession

type StompSession struct {
	// contains filtered or unexported fields
}

func GetNewStompConnection

func GetNewStompConnection(username string, password string,
	stompUrl url.URL, topic string, stompCert string, stompCertKey string) *StompSession

func NewStompConnection

func NewStompConnection(username string, password string,
	stompUrl url.URL, topic string, cert ...tls.Certificate) *StompSession

Directories

Path Synopsis
cmd
