vpcflow

v0.0.0-...-069fac2

This package is not in the latest version of its module.
Published: Oct 11, 2019 License: Apache-2.0 Imports: 17 Imported by: 4

README

go-vpc - Tools for working with AWS VPC Flow Logs


Status: Incubation

Overview

AWS VPC Flow Logs are a data source a team can use to detect anomalies in connection patterns, spot the use of non-standard ports, or view the interconnections between systems. To assist in consuming and analyzing these logs, go-vpc provides the following features:

  • extract flow logs from an S3 bucket
  • filter log files by their metadata to keep only those of interest
  • compact the data into a digest for a particular network interface
  • convert data in the AWS VPC Flow Log format into a DOT graph to visualize the nodes and edges of a network

Usage

To get started, configure your AWS account to publish flow logs to S3.

This project provides an iterator interface for the objects in an S3 bucket, and chains io.ReadCloser streams together to give access to the bucket data.

Iterating over bucket objects

To iterate over the objects in an S3 bucket, use vpcflow.BucketStateIterator. It iterates over the objects and provides metadata about each log file, such as its account, region, and timestamp.

bucketIter := &vpcflow.BucketStateIterator{
	Bucket: bucket,
	Queue:  client,
}
for bucketIter.Iterate() {
	logFile := bucketIter.Current()
	...
}
err := bucketIter.Close()
// check error

To focus on a subset of the data in your bucket, apply a prefix filter to the iterator. Only objects whose keys begin with this prefix are returned. By default, the iterator visits every object in the bucket.

bucketIter := &vpcflow.BucketStateIterator{
	Bucket: bucket,
	Queue:  client,
	Prefix: "AWSLogs/123456789123/vpcflowlogs/us-west-2/2018/10/15",
}

Filtering bucket objects

Not every object in the S3 bucket is likely to be of interest. The vpcflow.BucketFilter decorator wraps an iterator and filters out log files that are not relevant.

bucketIter := &vpcflow.BucketStateIterator{
	Bucket: bucket,
	Queue:  client,
}
filterIter := &vpcflow.BucketFilter{
	BucketIterator: bucketIter,
	Filter: vpcflow.LogFileTimeFilter{
		Start: start,
		End:   stop,
	},
}
for filterIter.Iterate() {
	logFile := filterIter.Current()
	...
}
err := bucketIter.Close()

Reading Log File contents

To consume the contents of the log files, use vpcflow.BucketIteratorReader, which converts the iterator into a consumable stream of log file contents. Initialize the reader with an iterator to consume from and a FetchPolicy. The built-in fetch policy, created with vpcflow.NewPrefetchPolicy, produces a vpcflow.FileManager that eagerly fetches file contents before they are needed.

bucketIter := &vpcflow.BucketStateIterator{
	Bucket: bucket,
	Queue:  client,
}
filterIter := &vpcflow.BucketFilter{
	BucketIterator: bucketIter,
	Filter: vpcflow.LogFileTimeFilter{
		Start: start,
		End:   stop,
	},
}
readerIter := &vpcflow.BucketIteratorReader{
	BucketIterator: filterIter,
	FetchPolicy:    vpcflow.NewPrefetchPolicy(client, maxBytes, concurrency),
}
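Conceptually, BucketIteratorReader reads the files produced by the iterator as though they were one continuous file, much like the standard library's io.MultiReader. The sketch below uses io.MultiReader over in-memory strings to illustrate that behavior; `concatBodies` is a hypothetical helper, and this is an analogy only, not how vpcflow fetches or buffers S3 objects.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// concatBodies joins several file bodies into one stream with io.MultiReader,
// which reads each reader to EOF in turn, and reads that stream to the end.
func concatBodies(bodies ...string) (string, error) {
	readers := make([]io.Reader, 0, len(bodies))
	for _, b := range bodies {
		readers = append(readers, strings.NewReader(b))
	}
	out, err := io.ReadAll(io.MultiReader(readers...))
	return string(out), err
}

func main() {
	out, err := concatBodies(
		"first file line 1\nfirst file line 2\n",
		"second file line 1\n",
	)
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}
```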

Digesting multiple log files

To compact log files, use a vpcflow.Digester such as vpcflow.ReaderDigester. It aggregates all traffic between two nodes that has the same destination port, protocol, and action into a single log line. Note that this loses some data, specifically the ephemeral source port, which is reported as 0.

d := &vpcflow.ReaderDigester{Reader: readerIter}
reader, err := d.Digest()

To illustrate, the following lines:

2 123456789010 eni-abc123de 172.31.16.139 172.31.16.21 20641 80 6 20 1000 1418530010 1418530070 ACCEPT OK
2 123456789010 eni-abc123de 172.31.16.139 172.31.16.21 20541 80 6 20 1000 1518530010 1518530070 ACCEPT OK
2 123456789010 eni-abc123de 172.31.16.139 172.31.16.21 20441 80 6 20 1000 1618530010 1618530070 ACCEPT OK
2 123456789010 eni-abc123de 172.31.16.139 172.31.16.21 20341 80 6 20 1000 1718530010 1718530070 REJECT OK
2 123456789010 eni-abc123de 172.31.16.139 172.31.16.21 20241 80 6 20 1000 1818530010 1818530070 ACCEPT OK

would become:

2 123456789010 eni-abc123de 172.31.16.139 172.31.16.21 0 80 6 20 1000 1418530010 1818530070 REJECT OK
2 123456789010 eni-abc123de 172.31.16.139 172.31.16.21 0 80 6 80 4000 1418530010 1818530070 ACCEPT OK

Converting to DOT

The vpcflow.DOTConverter function converts data in the AWS VPC Flow Log file format into a DOT graph representation. This is useful for visualizing the nodes and edges of a network.

d := &vpcflow.ReaderDigester{Reader: readerIter}
digested, err := d.Digest()
// check error
converted, err := vpcflow.DOTConverter(digested)
// check error; the caller must close converted when done reading

Contributing

License

This project is licensed under Apache 2.0. See LICENSE.txt for details.

Contributing Agreement

Atlassian requires signing a contributor's agreement before we can accept a patch. If you are an individual you can fill out the individual CLA. If you are contributing on behalf of your company then please fill out the corporate CLA.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func DOTConverter

func DOTConverter(r io.ReadCloser) (io.ReadCloser, error)

DOTConverter takes as input a single AWS VPC Flow Log file, or a digest of VPC Flow Logs, and converts the data into a DOT graph. The input ReadCloser is closed after conversion; the caller should close the output ReadCloser when done reading.

func NewPrefetchPolicy

func NewPrefetchPolicy(q s3iface.S3API, maxBytes int64, maxConcurrent int) func(BucketIterator) FileManager

NewPrefetchPolicy implements the signature required by BucketIteratorReader.FetchPolicy by producing a FileManager that pre-fetches content before it is requested. This can dramatically speed up reading, but maxBytes and maxConcurrent must be tuned to the memory and concurrency limits of the system.

Types

type BucketFilter

type BucketFilter struct {
	Filter LogFileFilter
	BucketIterator
}

BucketFilter is a BucketIterator wrapper that drops anything that fails the filter check.

func (*BucketFilter) Iterate

func (it *BucketFilter) Iterate() bool

Iterate will consume from the wrapped iterator until an element is found that passes the filter.

type BucketIterator

type BucketIterator interface {
	// Iterate pushes the cursor one record forward such that
	// the current value is fetched when calling Current().
	// This method should return false after all records have
	// been iterated over or an error is encountered attempting
	// to fetch records.
	Iterate() bool
	// Get the current value of the iterator.
	Current() LogFile
	// Close cleans up any resources used by the iterator and
	// returns an error, if any, that caused iterations to stop.
	Close() error
}

BucketIterator scans an S3 bucket and converts AWS API responses to LogFile records.

type BucketIteratorReader

type BucketIteratorReader struct {
	BucketIterator BucketIterator
	FetchPolicy    func(BucketIterator) FileManager
	// contains filtered or unexported fields
}

BucketIteratorReader converts an implementation of the BucketIterator interface into an io.ReadCloser that acts as a continuous stream of data from all the files returned by the iterator.

func (*BucketIteratorReader) Close

func (r *BucketIteratorReader) Close() error

Close the reader and the underlying BucketIterator. The reader may not be used again after calling Close().

func (*BucketIteratorReader) Read

func (r *BucketIteratorReader) Read(b []byte) (int, error)

Read from files produced by the iterator as though they are one, continuous file.

type BucketStateIterator

type BucketStateIterator struct {
	Bucket string
	Prefix string

	Queue s3iface.S3API
	// contains filtered or unexported fields
}

BucketStateIterator holds the current state of the iterator.

func (BucketStateIterator) Close

func (iter BucketStateIterator) Close() error

Close cleans up any resources used by the iterator and returns an error, if any, that caused iterations to stop.

func (*BucketStateIterator) Current

func (iter *BucketStateIterator) Current() LogFile

Current gets the current value of the iterator.

func (*BucketStateIterator) Iterate

func (iter *BucketStateIterator) Iterate() bool

Iterate pushes the cursor one record forward such that the current value is fetched when calling Current(). This method should return false after all records have been iterated over or an error is encountered attempting to fetch records.

type Converter

type Converter func(io.ReadCloser) (io.ReadCloser, error)

Converter is a function type that converts the input data into a different format, made available through the output io.ReadCloser.

type Digester

type Digester interface {
	Digest() (io.ReadCloser, error)
}

Digester digests input data and returns an io.ReadCloser from which the compacted data can be read.

type FileManager

type FileManager interface {
	// Get a consumable reader for the next object from the manager.
	Get() (io.Reader, error)
	// Put a consumed reader back in the manager for cleanup.
	Put(io.Reader)
}

FileManager is the binding between an S3 file download strategy and the BucketIteratorReader. It will be called by the BucketIteratorReader to fetch a new S3 Object reader as well as to return consumed readers for cleanup.

type LogFile

type LogFile struct {
	// Bucket is the S3 bucket in which the logs are stored.
	Bucket string
	// Key is the "key" value used by ListObjects and GetObject.
	Key string
	// Account is the AWS account ID extracted from the log path.
	Account string
	// Region is the AWS region extracted from the log path.
	Region string
	// Timestamp is the value from the log file name.
	Timestamp time.Time
	// FlowLogID is the key for the VPC log resource in AWS.
	FlowLogID string
	// Hash is the checksum value extracted from the log file name.
	Hash string
	// Size of the file containing the logs.
	Size int64
}

LogFile is a structured representation of a VPC Flow log file. It should contain enough data that a consumer could fetch the file contents.

type LogFileAccountFilter

type LogFileAccountFilter struct {
	Account map[string]bool
}

LogFileAccountFilter reduces the set to only those from a particular set of accounts.

func (LogFileAccountFilter) FilterLogFile

func (f LogFileAccountFilter) FilterLogFile(lf LogFile) bool

FilterLogFile compares against the set of allowed accounts.

type LogFileFilter

type LogFileFilter interface {
	FilterLogFile(LogFile) bool
}

LogFileFilter is used to inspect LogFile instances and determine whether they should be emitted from a BucketIterator.

type LogFileRegionFilter

type LogFileRegionFilter struct {
	Region map[string]bool
}

LogFileRegionFilter reduces the set to only those from a particular set of regions.

func (LogFileRegionFilter) FilterLogFile

func (f LogFileRegionFilter) FilterLogFile(lf LogFile) bool

FilterLogFile compares against the set of allowed regions.

type LogFileTimeFilter

type LogFileTimeFilter struct {
	Start time.Time
	End   time.Time
}

LogFileTimeFilter applies an inclusive start/end time bound to all files.

func (LogFileTimeFilter) FilterLogFile

func (f LogFileTimeFilter) FilterLogFile(lf LogFile) bool

FilterLogFile applies the time bound checks.

type MultiLogFileFilter

type MultiLogFileFilter []LogFileFilter

MultiLogFileFilter composes any number of filters into a single filter that returns false on the first failed filter or true if all filters pass.

func (MultiLogFileFilter) FilterLogFile

func (f MultiLogFileFilter) FilterLogFile(lf LogFile) bool

FilterLogFile executes all enclosed filters until one of them returns false.

type PrefetchFileManager

type PrefetchFileManager struct {
	// Queue is any implementation of the S3API and will be used
	// to download the individual object contents.
	Queue s3iface.S3API
	// BucketIterator is the source from which the manager will
	// pull when deciding what to prefetch.
	BucketIterator BucketIterator
	// Lock is used to control concurrency of downloads.
	// Using a sync.Mutex will result in sequential downloads
	// while using the Semaphore will allow up to N concurrent
	// downloads in the background.
	Lock sync.Locker
	// MaxBytes is used to control the amount of data fetched
	// into memory from S3. While the Lock attribute controls
	// the maximum concurrent downloads, this attribute controls
	// how much data is actually pre-fetched. Ideally, this value
	// is large enough to hold several individual objects in order
	// to allow for actual prefetching of data. In the event that
	// prefetching the next object would put the buffer over the
	// limit, prefetching will stop until the buffer is drained
	// enough to contain the next file.
	//
	// One exception to this is when the buffer is empty and the next
	// file is still larger, on its own, than the max bytes. In this
	// case, the prefetcher will still download the file but will then
	// wait for the buffer to drain before downloading the next file.
	// This means that if MaxBytes is set to less than the average
	// file size, the prefetcher may degenerate into sequential
	// downloads.
	MaxBytes int64
	// Ready is the channel/buffer on which downloaded files are placed
	// while awaiting consumption. This channel may be given a buffer
	// size or be blocking.
	Ready chan io.Reader
	// contains filtered or unexported fields
}

PrefetchFileManager implements the FileManager interface by eagerly fetching content in the background to maximize the chances of having a reader ready whenever a consumer asks for one. Typically, this is created with the NewPrefetchPolicy method and used as the FetchPolicy for the BucketIteratorReader.
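The MaxBytes field comment describes an admission rule: prefetch the next object only if it fits under the limit, except that an empty buffer always admits one file. A hypothetical `canPrefetch` function restating that rule (an illustration of the documented behavior, not vpcflow's code):

```go
package main

import "fmt"

// canPrefetch is a hypothetical restatement of the MaxBytes rule described
// in the PrefetchFileManager field comments, not vpcflow's code.
// buffered is the number of bytes already prefetched and not yet consumed;
// next is the size of the next object.
func canPrefetch(buffered, next, maxBytes int64) bool {
	if buffered == 0 {
		// Exception: an empty buffer always admits the next file, even if
		// that file alone exceeds maxBytes.
		return true
	}
	return buffered+next <= maxBytes
}

func main() {
	const maxBytes = 100
	fmt.Println(canPrefetch(40, 60, maxBytes)) // true: fits exactly
	fmt.Println(canPrefetch(50, 60, maxBytes)) // false: would exceed the limit
	fmt.Println(canPrefetch(0, 250, maxBytes)) // true: empty-buffer exception
}
```

When maxBytes is smaller than the typical file, only the empty-buffer branch ever admits a file, which is the degeneration into sequential downloads that the field comment warns about.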

func (*PrefetchFileManager) Get

func (f *PrefetchFileManager) Get() (io.Reader, error)

Get a prefetched file. If prefetch is lagging behind then this call will block until a file is available. If any error was encountered since the last call to Get then it is returned.

func (*PrefetchFileManager) Prefetch

func (f *PrefetchFileManager) Prefetch()

Prefetch starts a loop that consumes from the attached BucketIterator and attempts to load that content before it is needed.

func (*PrefetchFileManager) Put

func (f *PrefetchFileManager) Put(r io.Reader)

Put returns a file to the manager for cleanup.

type ReaderDigester

type ReaderDigester struct {
	Reader io.ReadCloser
}

ReaderDigester is responsible for compacting multiple VPC flow log lines into fewer, summarized lines.

func (*ReaderDigester) Digest

func (d *ReaderDigester) Digest() (io.ReadCloser, error)

Digest reads from the given io.Reader and compacts multiple VPC flow log lines, producing a digest of the material that is made available via the resulting io.ReadCloser. A digest is created by squashing "stable" values together and aggregating more volatile values. The stable values are srcaddr, dstaddr, dstport, protocol, and action. These change less frequently than the volatile values, such as srcport, start, end, log-status, bytes, and packets, which for the most part change with every entry even when the stable values are exactly the same.

type Semaphore

type Semaphore struct {
	C chan interface{}
}

Semaphore implements the sync.Locker interface to help with concurrency control for fetching files. The buffer size of C defines the max concurrent callers of Lock.

func (Semaphore) Lock

func (c Semaphore) Lock()

Lock attempts to acquire the semaphore. If the limit has not yet been reached then the call returns immediately. If the limit is reached then this call blocks until the number of concurrent lock holders crosses back under the limit.

func (Semaphore) Unlock

func (c Semaphore) Unlock()

Unlock indicates that the caller no longer needs space in the semaphore. This must be called at the end of a critical section just like any other Locker implementation.
