substation

package module
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 4, 2024 License: MIT Imports: 6 Imported by: 0

README

Substation

Substation Banner

Substation is a cloud-native, event-driven data pipeline toolkit built for security teams.

Releases   |   Docs   |   Quickstart   |   Announcement Post (2022)

At a Glance

Substation is inspired by data pipeline systems like Logstash and Fluentd, but is built for modern security teams:

  • Extensible Data Processing: Build data processing pipeline systems and microservices using out-of-the-box applications and 100+ data transformation functions, or create your own written in Go.
  • Route Data Across the Cloud: Conditionally route data to, from, and between AWS cloud services, including S3, Kinesis, SQS, and Lambda, or to any HTTP endpoint.
  • Bring Your Own Schema: Format, normalize, and enrich event logs to comply with the Elastic Common Schema (ECS), Open Cybersecurity Schema Framework (OCSF), or any other schema.
  • Unlimited Data Enrichment: Use external APIs to enrich event logs affordably and at scale with enterprise and threat intelligence, or build a microservice that reduces spend in expensive security APIs.
  • No Servers, No Maintenance: Deploys as a serverless application in your AWS account, launches in minutes using Terraform, and requires no maintenance after deployment.
  • Runs Almost Anywhere: Create applications that run on most platforms supported by Go and transform data consistently across laptops, servers, containers, and serverless functions.
  • High Performance, Low Cost: Transform 100,000+ events per second while keeping cloud costs as low as a few cents per GB. Vendor solutions, like Cribl and Datadog, can cost up to 10x more.

All of these data pipeline and microservice systems, and many more, can be built with Substation:

Example Substation architectures

Getting Started

You can run Substation on these platforms:

The project includes a Makefile that simplifies local development and test deployments. To test the system in AWS, run this from the project root:

# Checks that dependencies are installed and environment variables are set.
make check
# Deploys Substation to AWS. This deploys the Kinesis Time Travel example 
# and writes data to the Kinesis stream.
make test-aws

The AWS examples folder contains reusable deployment patterns that demonstrate best practices for managing the system using Terraform and Jsonnet. Deploy them using these commands:

make check
# Builds dependencies required for AWS deployments.
make build-aws
# Deploys the DynamoDB Change Data Capture example.
make deploy-aws DEPLOYMENT_DIR=examples/terraform/aws/dynamodb/cdc AWS_APPCONFIG_ENV=example

We do not recommend managing cloud deployments from a local machine using the Makefile. Production deployments should use a CI/CD pipeline with a remote state backend to manage infrastructure.

Transforming Event Logs

Substation excels at formatting, normalizing, and enriching event logs. For example, Zeek connection logs can be transformed to comply with the Elastic Common Schema:

Raw Event Transformed Event
{
  "ts": 1591367999.430166,
  "uid": "C5bLoe2Mvxqhawzqqd",
  "id.orig_h": "192.168.4.76",
  "id.orig_p": 46378,
  "id.resp_h": "31.3.245.133",
  "id.resp_p": 80,
  "proto": "tcp",
  "service": "http",
  "duration": 0.25411510467529297,
  "orig_bytes": 77,
  "resp_bytes": 295,
  "conn_state": "SF",
  "missed_bytes": 0,
  "history": "ShADadFf",
  "orig_pkts": 6,
  "orig_ip_bytes": 397,
  "resp_pkts": 4,
  "resp_ip_bytes": 511
}
{
  "event": {
    "original": {
      "ts": 1591367999.430166,
      "uid": "C5bLoe2Mvxqhawzqqd",
      "id.orig_h": "192.168.4.76",
      "id.orig_p": 46378,
      "id.resp_h": "31.3.245.133",
      "id.resp_p": 80,
      "proto": "tcp",
      "service": "http",
      "duration": 0.25411510467529297,
      "orig_bytes": 77,
      "resp_bytes": 295,
      "conn_state": "SF",
      "missed_bytes": 0,
      "history": "ShADadFf",
      "orig_pkts": 6,
      "orig_ip_bytes": 397,
      "resp_pkts": 4,
      "resp_ip_bytes": 511
    },
    "hash": "af70ea0b38e1fb529e230d3eca6badd54cd6a080d7fcb909cac4ee0191bb788f",
    "created": "2022-12-30T17:20:41.027505Z",
    "id": "C5bLoe2Mvxqhawzqqd",
    "kind": "event",
    "category": [
      "network"
    ],
    "action": "network-connection",
    "outcome": "success",
    "duration": 254115104.675293
  },
  "@timestamp": "2020-06-05T14:39:59.430166Z",
  "client": {
    "address": "192.168.4.76",
    "ip": "192.168.4.76",
    "port": 46378,
    "packets": 6,
    "bytes": 77
  },
  "server": {
    "address": "31.3.245.133",
    "ip": "31.3.245.133",
    "port": 80,
    "packets": 4,
    "bytes": 295,
    "domain": "h31-3-245-133.host.redstation.co.uk",
    "top_level_domain": "co.uk",
    "subdomain": "h31-3-245-133.host",
    "registered_domain": "redstation.co.uk",
    "as": {
      "number": 20860,
      "organization": {
        "name": "Iomart Cloud Services Limited"
      }
    },
    "geo": {
      "continent_name": "Europe",
      "country_name": "United Kingdom",
      "city_name": "Manchester",
      "location": {
        "latitude": 53.5039,
        "longitude": -2.1959
      },
      "accuracy": 1000
    }
  },
  "network": {
    "protocol": "tcp",
    "bytes": 372,
    "packets": 10,
    "direction": "outbound"
  }
}

Routing Data

Substation can route data to several destinations from a single process and, unlike most other data pipeline systems, data transformation and routing are functionally equivalent -- this means that data can be transformed or routed in any order.

In this configuration, data is:

  • Written to AWS S3
  • Printed to stdout
  • Conditionally dropped (filtered, removed)
  • Sent to an HTTPS endpoint
// The input is a JSON array of objects, such as:
// [
//   { "field1": "a", "field2": 1, "field3": true },
//   { "field1": "b", "field2": 2, "field3": false },
//   ...
// ]
local sub = import 'substation.libsonnet';

// This filters events based on the value of field3.
local is_false = sub.cnd.str.eq(settings={ object: { source_key: 'field3' }, value: 'false' });

{
  transforms: [
    // Pre-transformed data is written to an object in AWS S3 for long-term storage.
    sub.tf.send.aws.s3(settings={ bucket_name: 'example-bucket-name' }),
    // The JSON array is split into individual events that go through 
    // the remaining transforms. Each event is printed to stdout.
    sub.tf.agg.from.array(),
    sub.tf.send.stdout(),
    // Events where field3 is false are removed from the pipeline.
    sub.pattern.tf.conditional(condition=is_false, transform=sub.tf.util.drop()),
    // The remaining events are sent to an HTTPS endpoint.
    sub.tf.send.http.post(settings={ url: 'https://example-http-endpoint.com' }),
  ],
}

Alternatively, the data can be conditionally routed to different destinations:

local sub = import 'substation.libsonnet';

{
  transforms: [
    // If field3 is false, then the event is sent to an HTTPS endpoint; otherwise,
    // the event is written to an object in AWS S3.
    sub.tf.meta.switch(settings={ cases: [
      {
        condition: sub.cnd.any(sub.cnd.str.eq(settings={ object: { source_key: 'field3' }, value: 'false' })),
        transform: sub.tf.send.http.post(settings={ url: 'https://example-http-endpoint.com' }),
      },
      {
        transform: sub.tf.send.aws.s3(settings={ bucket_name: 'example-bucket-name' }),
      },
    ] }),
    // The event is always available to any remaining transforms.
    sub.tf.send.stdout(),
  ],
}

Configuring Applications

Substation applications run almost anywhere (laptops, servers, containers, serverless functions) and all transform functions behave identically regardless of where they are run. This makes it easy to develop configuration changes locally, validate them in a build (CI/CD) pipeline, and run integration tests in a staging environment before deploying to production.

Configurations are written in Jsonnet and can be expressed as functional code, simplifying version control and making it easy to build custom data processing libraries. For power users, configurations also have abbreviations that make them easier to write. Compare the configuration below to similar configurations for Logstash and Fluentd:

Substation Logstash Fluentd
local sub = import 'substation.libsonnet';

{
  transforms: [
    sub.tf.obj.cp(
      settings={ object: { source_key: 'src_field_1', target_key: 'dest_field_1' } }
    ),
    sub.tf.obj.cp({ obj: { src: 'src_field_2', trg: 'dest_field_2' } }),
    sub.tf.send.stdout(),
    sub.tf.send.http.post(
      settings={ url: 'https://example-http-endpoint.com' }
    ),
  ],
}
input {
  file {
    path => "/path/to/your/file.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => "json"
  }
}

filter {
  json {
    source => "message"
  }

  mutate {
    copy => { "src_field_1" => "dest_field_1" }
    copy => { "src_field_2" => "dest_field_2" }
  }
}

output {
  stdout {
    codec => rubydebug
  }

  http {
    url => "https://example-http-endpoint.com"
    http_method => "post"
    format => "json"
  }
}
<source>
  @type tail
  path /path/to/your/file.log
  pos_file /dev/null
  tag file.log
  format json
</source>

<filter file.log>
  @type record_transformer
  enable_ruby
  <record>
    dest_field_1 ${record['src_field_1']}
    dest_field_2 ${record['src_field_2']}
  </record>
</filter>

<match file.log>
  @type copy
  <store>
    @type stdout
  </store>
  <store>
    @type http
    url https://example-http-endpoint.com
    http_method post
    <format>
      @type json
    </format>
  </store>
</match>

Deploying to AWS

Substation includes Terraform modules for securely deploying data pipelines and microservices in AWS. These modules are designed for ease of use, but are also flexible enough to support managing complex systems. This configuration deploys a data pipeline that is capable of receiving data from API Gateway and storing it in an S3 bucket:

resources.tf node.tf
# These resources are deployed once and are used by all Substation infrastructure.

# Substation resources can be encrypted using a customer-managed KMS key.
module "kms" {
  source = "build/terraform/aws/kms"

  config = {
    name   = "alias/substation"
  }
}

# Substation typically uses AppConfig to manage configuration files, but
# configurations can also be loaded from an S3 URI or an HTTP endpoint.
module "appconfig" {
  source = "build/terraform/aws/appconfig"

  config = {
    name = "substation"
    environments = [{
        name = "example"
    }]
  }
}

module "ecr" {
  source = "build/terraform/aws/ecr"
  kms    = module.kms

  config = {
    name         = "substation"
    force_delete = true
  }
}

resource "random_uuid" "s3" {}

module "s3" {
  source = "build/terraform/aws/s3"
  kms    = module.kms

  config = {
    # Bucket name is randomized to avoid collisions.
    name = "${random_uuid.s3.result}-substation"
  }

  # Access is granted by providing the role name of a
  # resource. This access applies least privilege and
  # grants access to dependent resources, such as KMS.
  access = [
    # Lambda functions create unique roles that are
    # used to access resources.
    module.node.role.name,
  ]
}
# Deploys an unauthenticated API Gateway that forwards data to the node.
module "node_gateway" {
  source = "build/terraform/aws/api_gateway/lambda"
  lambda = module.node

  config = {
    name = "node_gateway"
  }

  depends_on = [
    module.node
  ]
}

module "node" {
  source = "build/terraform/aws/lambda"
  kms       = module.kms  # Optional
  appconfig = module.appconfig  # Optional

  config = {
    name        = "node"
    description = "Substation node that writes data to S3."
    image_uri   = "${module.ecr.url}:latest"
    image_arm   = true

    env = {
      "SUBSTATION_CONFIG" : "https://localhost:2772/applications/substation/environments/example/configurations/node"
      "SUBSTATION_DEBUG" : true
      # This Substation node will ingest data from API Gateway. More nodes can be 
      # deployed to ingest data from other sources, such as Kinesis or SQS.
      "SUBSTATION_LAMBDA_HANDLER" : "AWS_API_GATEWAY"
    }
  }

  depends_on = [
    module.appconfig.name,
    module.ecr.url,
  ]
}

Licensing

Substation and its associated code is released under the terms of the MIT License.

Documentation

Overview

Example (SubstationCustomConfig)
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/brexhq/substation"
)

// Custom applications should embed the Substation configuration and
// add additional configuration options.
type customConfig struct {
	substation.Config

	Auth struct {
		Username string `json:"username"`

		Password string `json:"password"`
	} `json:"auth"`
}

// String returns an example string representation of the custom configuration.
func (c customConfig) String() string {
	return fmt.Sprintf("%s:%s", c.Auth.Username, c.Auth.Password)
}

func main() {
	// Substation applications rely on a context for cancellation and timeouts.
	ctx := context.Background()

	// Define and load the custom configuration. This config includes a username
	// and password for authentication.
	conf := []byte(`
		{
			"transforms":[
				{"type":"object_copy","settings":{"object":{"source_key":"a","target_key":"c"}}},
				{"type":"send_stdout"}
			],
			"auth":{
				"username":"foo",
				"password":"bar"
			}
		}
	`)

	cfg := customConfig{}
	if err := json.Unmarshal(conf, &cfg); err != nil {
		// Handle error.
		panic(err)
	}

	// Create a new Substation instance from the embedded configuration.
	sub, err := substation.New(ctx, cfg.Config)
	if err != nil {
		// Handle error.
		panic(err)
	}

	// Print the Substation configuration.
	fmt.Println(sub)

	// Print the custom configuration.
	fmt.Println(cfg)

}
Output:

{"transforms":[{"type":"object_copy","settings":{"object":{"source_key":"a","target_key":"c"}}},{"type":"send_stdout","settings":null}]}
foo:bar
Example (SubstationCustomTransforms)
package main

import (
	"context"
	"encoding/json"

	"github.com/brexhq/substation"
	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/message"
	"github.com/brexhq/substation/transform"
)

func main() {
	// Substation applications rely on a context for cancellation and timeouts.
	ctx := context.Background()

	// Define and load the configuration. This config includes a transform that
	// is not part of the standard Substation package.
	conf := []byte(`
		{
			"transforms":[
				{"type":"utility_duplicate"},
				{"type":"send_stdout"}
			]
		}
	`)

	cfg := substation.Config{}
	if err := json.Unmarshal(conf, &cfg); err != nil {
		// Handle error.
		panic(err)
	}

	// Create a new Substation instance with a custom transform factory for loading
	// the custom transform.
	sub, err := substation.New(ctx, cfg, substation.WithTransformFactory(customFactory))
	if err != nil {
		// Handle error.
		panic(err)
	}

	msg := []*message.Message{
		message.New().SetData([]byte(`{"a":"b"}`)),
		message.New().AsControl(),
	}

	// Transform the group of messages. In this example, results are not used.
	if _, err := sub.Transform(ctx, msg...); err != nil {
		// Handle error.
		panic(err)
	}

}

// customFactory is used in the custom transform example to load the custom transform.
func customFactory(ctx context.Context, cfg config.Config) (transform.Transformer, error) {
	switch cfg.Type {

	case "utility_duplicate":
		return &utilityDuplicate{Count: 1}, nil
	}

	return transform.New(ctx, cfg)
}

// Duplicates a message.
type utilityDuplicate struct {
	Count int `json:"count"`
}

func (t *utilityDuplicate) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) {

	if msg.IsControl() {
		return []*message.Message{msg}, nil
	}

	output := []*message.Message{msg}
	for i := 0; i < t.Count; i++ {
		output = append(output, msg)
	}

	return output, nil
}
Output:

{"a":"b"}
{"a":"b"}

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithTransformFactory

func WithTransformFactory(fac transform.Factory) func(*Substation)

WithTransformFactory implements a custom transform factory.

Types

type Config

type Config struct {
	// Transforms contains a list of data transformatons that are executed.
	Transforms []config.Config `json:"transforms"`
}

Config is the core configuration for the application. Custom applications should embed this and add additional configuration options.

type Substation

type Substation struct {
	// contains filtered or unexported fields
}

Substation provides access to data transformation functions.

Example
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/brexhq/substation"
	"github.com/brexhq/substation/message"
)

func main() {
	// Substation applications rely on a context for cancellation and timeouts.
	ctx := context.Background()

	// Define a configuration. For native Substation applications, this is managed by Jsonnet.
	//
	// This example copies an object's value and prints the data to stdout.
	conf := []byte(`
		{
			"transforms":[
				{"type":"object_copy","settings":{"object":{"source_key":"a","target_key":"c"}}},
				{"type":"send_stdout"}
			]
		}
	`)

	cfg := substation.Config{}
	if err := json.Unmarshal(conf, &cfg); err != nil {
		// Handle error.
		panic(err)
	}

	// Create a new Substation instance.
	sub, err := substation.New(ctx, cfg)
	if err != nil {
		// Handle error.
		panic(err)
	}

	// Print the Substation configuration.
	fmt.Println(sub)

	// Substation instances process data defined as a Message. Messages can be processed
	// individually or in groups. This example processes multiple messages as a group.
	msg := []*message.Message{
		// The first message is a data message. Only data messages are transformed.
		message.New().SetData([]byte(`{"a":"b"}`)),
		// The second message is a ctrl message. ctrl messages flush the pipeline.
		message.New().AsControl(),
	}

	// Transform the group of messages. In this example, results are not used.
	if _, err := sub.Transform(ctx, msg...); err != nil {
		// Handle error.
		panic(err)
	}

}
Output:

{"transforms":[{"type":"object_copy","settings":{"object":{"source_key":"a","target_key":"c"}}},{"type":"send_stdout","settings":null}]}
{"a":"b","c":"b"}

func New

func New(ctx context.Context, cfg Config, opts ...func(*Substation)) (*Substation, error)

New returns a new Substation instance.

func (*Substation) String

func (s *Substation) String() string

String returns a JSON representation of the configuration.

func (*Substation) Transform

func (s *Substation) Transform(ctx context.Context, msg ...*message.Message) ([]*message.Message, error)

Transform runs the configured data transformation functions on the provided messages.

This is safe to use concurrently.

Directories

Path Synopsis
cmd
development/benchmark/substation
Benchmarks the performance of Substation by sending a configurable number of events through the system and reporting the total time taken, the number of events sent, the amount of data sent, and the rate of events and data sent per second.
Benchmarks the performance of Substation by sending a configurable number of events through the system and reporting the total time taken, the number of events sent, the amount of data sent, and the rate of events and data sent per second.
Package condition provides functions for evaluating data.
Package condition provides functions for evaluating data.
Package config provides structures for building configurations.
Package config provides structures for building configurations.
examples
internal
aws
aws/appconfig
package appconfig provides functions for interacting with AWS AppConfig.
package appconfig provides functions for interacting with AWS AppConfig.
aws/s3manager
package s3manager provides methods and functions for downloading and uploading objects in AWS S3.
package s3manager provides methods and functions for downloading and uploading objects in AWS S3.
config
package config provides configuration types and functions for Substation.
package config provides configuration types and functions for Substation.
file
package file provides functions that can be used to retrieve files from local and remote locations.
package file provides functions that can be used to retrieve files from local and remote locations.
kv
log
Package log wraps logrus and provides global logging only debug logging should be used in condition/, process/, and internal/ to reduce the likelihood of corrupting output for apps debug and info logging can be used in cmd/
Package log wraps logrus and provides global logging only debug logging should be used in condition/, process/, and internal/ to reduce the likelihood of corrupting output for apps debug and info logging can be used in cmd/
media
package media provides capabilities for inspecting the content of data and identifying its media (Multipurpose Internet Mail Extensions, MIME) type.
package media provides capabilities for inspecting the content of data and identifying its media (Multipurpose Internet Mail Extensions, MIME) type.
secrets
Package secrets provides functions for retrieving local and remote secrets and interpolating them into configuration files.
Package secrets provides functions for retrieving local and remote secrets and interpolating them into configuration files.
Package message provides functions for managing data used by conditions and transforms.
Package message provides functions for managing data used by conditions and transforms.
Package transform provides functions for transforming messages.
Package transform provides functions for transforming messages.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL