stream

v1.0.18
Published: Apr 19, 2024 License: MIT Imports: 12 Imported by: 0

README

Vulcan Stream

Vulcan Stream provides a channel of communication between Vulcan Scan Engine and the Vulcan Agents.

Vulcan Scan Engine requires broadcast communication with Vulcan Agents in order to manage the Agent pool and control checks in execution. Because Agents might not be reachable from the internet, the Stream provides a websocket stream that Agents connect to in order to receive input from the Scan Engine.

Requirements

Vulcan Stream works on top of two main services:

Constraints

The current implementation of vulcan-stream must be deployed as a single instance. This follows from a design decision to keep a local in-memory cache that speeds up requests to the checks endpoint. Vulcan Agents have to query this endpoint before executing each check, so the cache maximizes their performance.
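
To illustrate why a per-instance cache implies a single-instance deployment, here is a minimal sketch. It is not the vulcan-stream implementation: the sharedDB and cachedStore types and their methods are hypothetical names made up for this example. Two instances share one backing store, but each answers reads from its own cache, so an abort written through one instance is not visible through the other.

```go
package main

import (
	"fmt"
	"sync"
)

// sharedDB is a hypothetical stand-in for the database shared by all instances.
type sharedDB struct {
	mu     sync.Mutex
	checks map[string]struct{}
}

func (db *sharedDB) add(ids []string) {
	db.mu.Lock()
	defer db.mu.Unlock()
	for _, id := range ids {
		db.checks[id] = struct{}{}
	}
}

func (db *sharedDB) list() []string {
	db.mu.Lock()
	defer db.mu.Unlock()
	ids := make([]string, 0, len(db.checks))
	for id := range db.checks {
		ids = append(ids, id)
	}
	return ids
}

// cachedStore is a hypothetical service instance. It loads the aborted checks
// once at startup and afterwards serves reads from its local cache only.
type cachedStore struct {
	mu    sync.RWMutex
	db    *sharedDB
	cache map[string]struct{}
}

func newCachedStore(db *sharedDB) *cachedStore {
	s := &cachedStore{db: db, cache: map[string]struct{}{}}
	for _, id := range db.list() {
		s.cache[id] = struct{}{}
	}
	return s
}

// abort writes through to the shared DB and updates this instance's cache only.
func (s *cachedStore) abort(ids ...string) {
	s.db.add(ids)
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, id := range ids {
		s.cache[id] = struct{}{}
	}
}

// isAborted answers from the local cache without touching the shared DB.
func (s *cachedStore) isAborted(id string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	_, ok := s.cache[id]
	return ok
}

func main() {
	db := &sharedDB{checks: map[string]struct{}{}}
	a, b := newCachedStore(db), newCachedStore(db)

	a.abort("check-1")
	fmt.Println(a.isAborted("check-1")) // true
	fmt.Println(b.isAborted("check-1")) // false: b's cache is stale
}
```

With a single instance there is only one cache, so this kind of divergence cannot occur.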

API

Vulcan Stream exposes two endpoints: one to abort checks and one to retrieve the list of aborted checks.

Abort checks:

curl -X POST https://stream.vulcan.com/abort -H "Content-Type: application/json" -d '{"checks": ["<check_id1>", "<check_id2>", ... ]}'
->
<-
200 OK

Get checks:

curl -X GET https://stream.vulcan.com/checks
->
<-
200 OK 
["<check_id1>", "<check_id2>", ...]
...
Build & Run

Two binaries are provided:

  • vulcan-stream
  • vulcan-stream-test-client

The following steps assume that Docker is installed on your machine and that no services are listening on ports 6379 or 8080.

Run vulcan-stream:

go get -x github.com/adevinta/vulcan-stream/cmd/vulcan-stream

docker run -d -p 6379:6379 redis

vulcan-stream ${GOPATH}/src/github.com/adevinta/vulcan-stream/_resources/config/local.toml

Run vulcan-stream websocket client integration test:

go get -x github.com/adevinta/vulcan-stream/cmd/vulcan-stream-test-client

vulcan-stream-test-client ${GOPATH}/src/github.com/adevinta/vulcan-stream/_resources/config/local.toml

Or, connect to the stream and push some messages:

curl --include --no-buffer --header "Connection: Upgrade" --header "Upgrade: websocket" \
        --header "Host: localhost:8080" --header "Origin: http://localhost:8080" \
        --header "Sec-WebSocket-Key: SGVsbG8sIHdvcmxkIQ==" --header "Sec-WebSocket-Version: 13" \
        "http://localhost:8080/stream" &

curl -X POST http://localhost:8080/abort -H 'Content-Type: application/json' -d '{"checks": ["00000000-0000-0000-0000-000000000000"]}'
Configure

You can view and modify the Vulcan Stream configuration as required:

_resources/config/local.toml

Docker execute

These are the variables you have to set up:

Variable                        Description                               Sample
PORT                            HTTP listen port                          8080
LOG_LEVEL                                                                 DEBUG
REDIS_(HOST|PORT|USR|PWD|DB)    Redis variables
REDIS_TTL                       TTL to apply for aborted check entries    7 days

docker build . -t vs

# Use the default config.toml customized with env variables.
docker run --env-file ./local.env vs

# Use custom config.toml
docker run -v `pwd`/custom.toml:/app/config.toml vs

Documentation

Functions

func NewLogger

func NewLogger(lc LoggerConfig) (logrus.FieldLogger, *os.File, error)

NewLogger provides a logrus FieldLogger.

Types

type API

type API struct {
	// contains filtered or unexported fields
}

API represents the stream REST API.

func NewAPI

func NewAPI(port int, sender *Sender, storage Storage, logger logrus.FieldLogger,
	metrics metrics.Client) *API

NewAPI builds a new stream API.

func (*API) Start

func (a *API) Start()

Start starts the stream API.

type APIConfig

type APIConfig struct {
	Port int
}

APIConfig represents the config necessary for the stream API.

type AbortRequest

type AbortRequest struct {
	Checks []string `json:"checks"`
}

AbortRequest represents the body for an abort checks request.

type LoggerConfig

type LoggerConfig struct {
	LogFile  string
	LogLevel string
}

LoggerConfig defines required Vulcan Logger configuration.

type Message

type Message struct {
	CheckID string `json:"check_id,omitempty"`
	AgentID string `json:"agent_id,omitempty"`
	ScanID  string `json:"scan_id,omitempty"`
	Action  string `json:"action"`
}

Message describes a stream message.

type RedisConfig

type RedisConfig struct {
	Host string
	Port int
	Usr  string
	Pwd  string
	DB   int
	TTL  int
}

RedisConfig specifies the required config for RedisStorage.

type RedisDB

type RedisDB struct {
	// contains filtered or unexported fields
}

RedisDB is the implementation of a RemoteDB for a Redis database.

func NewRedisDB

func NewRedisDB(c RedisConfig) *RedisDB

NewRedisDB builds a new redis DB connector.

func (*RedisDB) GetChecks

func (r *RedisDB) GetChecks(ctx context.Context) ([]string, error)

GetChecks returns checks stored in redis.

func (*RedisDB) SetChecks

func (r *RedisDB) SetChecks(ctx context.Context, checks []string) error

SetChecks sets input checks in redis as a single transaction.

type RemoteDB

type RemoteDB interface {
	GetChecks(ctx context.Context) ([]string, error)
	SetChecks(ctx context.Context, checks []string) error
}

RemoteDB represents the interface used to interact with a remote DB.

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

Sender defines a websocket event server.

func NewSender

func NewSender(l logrus.FieldLogger, c SenderConfig) *Sender

NewSender creates a Vulcan Stream sender instance.

func (*Sender) Broadcast

func (s *Sender) Broadcast(msg Message)

Broadcast emits msg to the specified Stream channel.

func (*Sender) HandleConn

func (s *Sender) HandleConn(w http.ResponseWriter, r *http.Request)

HandleConn handles a connection to the sender's websocket topic.

func (*Sender) Start

func (s *Sender) Start()

Start initializes a websocket event server instance with the provided configuration.

type SenderConfig

type SenderConfig struct {
	HTTPStream   string
	PingInterval time.Duration
}

SenderConfig defines the configuration required by the Vulcan websocket event server.

type Storage

type Storage interface {
	GetAbortedChecks(ctx context.Context) ([]string, error)
	AddAbortedChecks(ctx context.Context, checks []string) error
}

Storage represents the stream storage for aborted checks.

func NewStorage

func NewStorage(db RemoteDB, logger log.FieldLogger) (Storage, error)

NewStorage builds a new Storage.

Directories

Path Synopsis
cmd
