rlogs

package module
v0.1.9
Published: Oct 16, 2020 License: BSD-3-Clause Imports: 12 Imported by: 2

README

rlogs: A framework to load remote log files in Go

rlogs is a framework to download and parse log files stored on remote object storage (currently only AWS S3 is supported). Sending log files to highly available and scalable object storage such as AWS S3 is a sound architecture: object storage does not care about the schema of the logs, so logs can be stored easily. However, users and systems that want to leverage the stored logs must parse them first, which means the log schemas have to be managed. This framework supports that task.

Getting Started

package main

import (
	"fmt"
	"log"

	// NOTE: the import paths assume the module path github.com/m-mizutani/rlogs.
	"github.com/m-mizutani/rlogs"
	"github.com/m-mizutani/rlogs/parser"
)

func main() {
	pipeline := rlogs.Pipeline{
		Psr: &parser.JSON{
			Tag:             "ts",
			TimestampField:  rlogs.String("ts"),
			TimestampFormat: rlogs.String("2006-01-02T15:04:05"),
		},
		Ldr: &rlogs.S3LineLoader{},
	}

	reader := rlogs.NewReader([]*rlogs.LogEntry{
		{
			Pipe: pipeline,
			Src: &rlogs.AwsS3LogSource{
				Region: "ap-northeast-1",
				Bucket: "your-bucket",
				Key:    "http/",
			},
		},
	})

	// s3://your-bucket/http/log.json contains the following lines:
	// {"ts":"2019-10-10T10:00:00","src":"10.1.2.3","port":34567,"path":"/hello"}
	// {"ts":"2019-10-10T10:00:02","src":"10.2.3.4","port":45678,"path":"/world"}

	// The Region, Bucket and Key must match a registered LogSource.
	ch := reader.Read(&rlogs.AwsS3LogSource{
		Region: "ap-northeast-1",
		Bucket: "your-bucket",
		Key:    "http/log.json",
	})

	for q := range ch {
		if q.Error != nil {
			log.Fatal(q.Error)
		}
		values := q.Log.Values.(map[string]interface{})
		fmt.Printf("[log] tag=%s time=%s src=%v\n", q.Log.Tag, q.Log.Timestamp, values["src"])
	}
	// Output:
	// [log] tag=ts time=2019-10-10 10:00:00 +0000 UTC src=10.1.2.3
	// [log] tag=ts time=2019-10-10 10:00:02 +0000 UTC src=10.2.3.4
}

Usage

Reader

A basic Reader is provided for now. The Reader holds a slice of rlogs.LogEntry, each of which has a LogSource and a Pipeline (Parser and Loader). When Read(LogSource) is called, the reader checks the given LogSource against each entry's LogSource one by one. If an entry matches, the S3 object is downloaded and parsed with the Loader and Parser of that entry.

Loader
  • S3LineLoader: Downloads an AWS S3 object and splits the file line by line
  • S3FileLoader: Downloads an AWS S3 object and passes the whole object data to the Parser directly
Parser

The following parsers are available in this package; a sketch of registering them with their required loaders follows the list.

  • JSON: Generic JSON parser. A timestamp field name and time format are required as arguments.
  • VpcFlowLogs: Parses VPC flow log S3 objects put directly by VPC Flow Logs. The parser requires S3LineLoader.
  • CloudTrail: Parses CloudTrail S3 object logs put directly by CloudTrail. The parser requires S3FileLoader.
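
As a sketch, registering these parsers together with their required loaders might look like the following (imports as in Getting Started; the bucket and key prefixes are placeholders, and whether parser.VpcFlowLogs and parser.CloudTrail can be constructed as zero values is an assumption based on the parser.JSON usage above):

reader := rlogs.NewReader([]*rlogs.LogEntry{
	{
		Src: &rlogs.AwsS3LogSource{Region: "ap-northeast-1", Bucket: "your-bucket", Key: "vpc-flow-logs/"},
		Pipe: rlogs.Pipeline{
			Psr: &parser.VpcFlowLogs{}, // line-delimited records
			Ldr: &rlogs.S3LineLoader{},
		},
	},
	{
		Src: &rlogs.AwsS3LogSource{Region: "ap-northeast-1", Bucket: "your-bucket", Key: "cloudtrail/"},
		Pipe: rlogs.Pipeline{
			Psr: &parser.CloudTrail{}, // whole-file JSON objects
			Ldr: &rlogs.S3FileLoader{},
		},
	},
})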

License

BSD-3-Clause

Documentation

Constants

const Version = "v0.1.1"

Version of the rlogs package

Variables

var Logger = logrus.New()

Logger is a logrus-based logger, exposed so that it can also be controlled from outside the package.
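
Since Logger is a standard logrus logger, it can be tuned from the application; a minimal sketch (requires importing github.com/sirupsen/logrus):

// Reduce rlogs' own logging to warnings and above, and emit JSON.
rlogs.Logger.SetLevel(logrus.WarnLevel)
rlogs.Logger.SetFormatter(&logrus.JSONFormatter{})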

var NewS3Client = newAwsS3Client

NewS3Client is the constructor of the AWS S3 client. It can be replaced for testing.

Functions

func String

func String(s string) *string

String converts a string into a string pointer.

Types

type AwsS3LogSource

type AwsS3LogSource struct {
	Region string // required
	Bucket string // required
	Key    string // required
}

AwsS3LogSource indicates the location of an AWS S3 object.

func (*AwsS3LogSource) Contains

func (x *AwsS3LogSource) Contains(src LogSource) bool

Contains checks whether src is included in this AwsS3LogSource.
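
Judging from the Getting Started example, a registered Key behaves as a prefix: an entry with Key "http/" matches the object "http/log.json". A minimal usage sketch (the exact matching rule is not documented on this page):

registered := &rlogs.AwsS3LogSource{
	Region: "ap-northeast-1",
	Bucket: "your-bucket",
	Key:    "http/", // used as a key prefix
}
object := &rlogs.AwsS3LogSource{
	Region: "ap-northeast-1",
	Bucket: "your-bucket",
	Key:    "http/log.json",
}
fmt.Println(registered.Contains(object)) // expected: true, as in Getting Started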

type Loader

type Loader interface {
	Load(src LogSource) chan *MessageQueue
}

Loader downloads an object from cloud object storage and creates MessageQueue(s).
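
A minimal sketch of a custom Loader, for illustration only: memoryLoader is a hypothetical type that serves line-delimited data from memory instead of S3 (imports bytes and rlogs). It assumes a Loader closes its channel when done; only the Loader interface and the MessageQueue fields documented below come from this page.

type memoryLoader struct {
	data []byte // hypothetical in-memory "object"
}

func (x *memoryLoader) Load(src rlogs.LogSource) chan *rlogs.MessageQueue {
	ch := make(chan *rlogs.MessageQueue)
	go func() {
		defer close(ch) // assumption: the Loader closes its own channel
		for seq, line := range bytes.Split(x.data, []byte("\n")) {
			if len(line) == 0 {
				continue // skip empty lines
			}
			ch <- &rlogs.MessageQueue{Raw: line, Seq: seq, Src: src}
		}
	}()
	return ch
}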

type LogEntry

type LogEntry struct {
	Src  LogSource
	Pipe Pipeline
}

LogEntry is a pair of a LogSource and a Pipeline (which bundles the Loader and Parser).

type LogQueue

type LogQueue struct {
	Log   *LogRecord
	Error error
}

LogQueue is the message queue between the Reader and the main procedure. It has fields for both a LogRecord and an Error, but only one of them should be set.

type LogRecord

type LogRecord struct {
	// Tag indicates log type (log schema)
	Tag string
	// Timestamp comes from log data
	Timestamp time.Time
	// Raw is raw log data
	Raw []byte
	// Values is parsed log data
	Values interface{}
	// Sequence number in log object
	Seq int
	// Log source location
	Src LogSource
}

LogRecord holds not only the original (raw) log message but also its parsed metadata.

type LogSource

type LogSource interface {
	Contains(src LogSource) bool
}

LogSource indicates the location of log object data. Only AWS S3 is supported for now.
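
Other storages could be described by implementing this interface; a hypothetical sketch (localPrefixSource is not part of the package; imports strings and rlogs):

// localPrefixSource identifies files under a local path prefix.
type localPrefixSource struct {
	path string
}

// Contains reports whether src points inside this source's path prefix.
func (x *localPrefixSource) Contains(src rlogs.LogSource) bool {
	other, ok := src.(*localPrefixSource)
	if !ok {
		return false
	}
	return strings.HasPrefix(other.path, x.path)
}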

type MessageQueue

type MessageQueue struct {
	Error error
	Raw   []byte
	Seq   int
	Src   LogSource
}

MessageQueue is a queue carrying the raw log message and its sequence number between Loader and Parser.

type Parser

type Parser interface {
	Parse(msg *MessageQueue) ([]*LogRecord, error)
}

Parser converts a raw log message into LogRecord(s).
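
A minimal sketch of a custom Parser, for illustration only: tsvParser is a hypothetical type that parses lines of the form "<timestamp>\t<message>" (imports fmt, strings, time and rlogs). Only the Parse signature and the LogRecord fields documented above come from this page.

type tsvParser struct{}

func (x *tsvParser) Parse(msg *rlogs.MessageQueue) ([]*rlogs.LogRecord, error) {
	parts := strings.SplitN(string(msg.Raw), "\t", 2)
	if len(parts) != 2 {
		return nil, fmt.Errorf("invalid TSV line: %s", string(msg.Raw))
	}

	ts, err := time.Parse("2006-01-02T15:04:05", parts[0])
	if err != nil {
		return nil, err
	}

	return []*rlogs.LogRecord{{
		Tag:       "tsv",
		Timestamp: ts,
		Raw:       msg.Raw,
		Values:    map[string]interface{}{"message": parts[1]},
		Seq:       msg.Seq,
		Src:       msg.Src,
	}}, nil
}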

type Pipeline added in v0.1.1

type Pipeline struct {
	Ldr       Loader
	Psr       Parser
	QueueSize int
}

Pipeline is a pair of Parser and Loader.

func (*Pipeline) Run added in v0.1.1

func (x *Pipeline) Run(src LogSource, ch chan *LogQueue)

Run of Pipeline downloads the object, parses it, and sends the results to ch.

type Reader

type Reader struct {
	LogEntries []*LogEntry
	QueueSize  int
}

Reader provides a basic, structured reader with a naive implementation.

Example
// To avoid accessing actual S3, inject a dummy S3 client.
// (dummyS3ClientForReader is defined alongside this example and not shown here.)
dummy := dummyS3ClientForReader{}
rlogs.InjectNewS3Client(&dummy)
defer rlogs.FixNewS3Client()

// Build the pipeline and reader as in Getting Started.
pipeline := rlogs.Pipeline{
	Psr: &parser.JSON{
		Tag:             "ts",
		TimestampField:  rlogs.String("ts"),
		TimestampFormat: rlogs.String("2006-01-02T15:04:05"),
	},
	Ldr: &rlogs.S3LineLoader{},
}

reader := rlogs.NewReader([]*rlogs.LogEntry{
	{
		Pipe: pipeline,
		Src: &rlogs.AwsS3LogSource{
			Region: "ap-northeast-1",
			Bucket: "your-bucket",
			Key:    "http/",
		},
	},
})

// s3://your-bucket/http/log.json contains the following lines:
// {"ts":"2019-10-10T10:00:00","src":"10.1.2.3","port":34567,"path":"/hello"}
// {"ts":"2019-10-10T10:00:02","src":"10.2.3.4","port":45678,"path":"/world"}

ch := reader.Read(&rlogs.AwsS3LogSource{
	Region: "ap-northeast-1",
	Bucket: "your-bucket",
	Key:    "http/log.json",
})

for q := range ch {
	if q.Error != nil {
		log.Fatal(q.Error)
	}
	values := q.Log.Values.(map[string]interface{})
	fmt.Printf("[log] tag=%s time=%s src=%v\n", q.Log.Tag, q.Log.Timestamp, values["src"])
}
Output:

[log] tag=ts time=2019-10-10 10:00:00 +0000 UTC src=10.1.2.3
[log] tag=ts time=2019-10-10 10:00:02 +0000 UTC src=10.2.3.4

func NewReader added in v0.1.1

func NewReader(entries []*LogEntry) *Reader

NewReader is the constructor of Reader.

func (*Reader) Read

func (x *Reader) Read(src LogSource) chan *LogQueue

type S3FileLoader

type S3FileLoader struct{}

S3FileLoader is for whole-file data (not line-delimited) on AWS S3.

func (*S3FileLoader) Load

func (x *S3FileLoader) Load(src LogSource) chan *MessageQueue

Load of S3FileLoader reads a log object as one log message.

type S3LineLoader

type S3LineLoader struct {
	ScanBufferSize  int
	ScanBufferLimit int
}

S3LineLoader is for line-delimited log files on AWS S3.

func (*S3LineLoader) Load

func (x *S3LineLoader) Load(src LogSource) chan *MessageQueue

Load of S3LineLoader reads a log object line by line
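
When log objects contain very long lines, the scan buffer fields can be enlarged. A minimal sketch; that the two fields control the initial scan buffer size and the maximum line length is an assumption based on the field names (by analogy with bufio.Scanner), and the values are arbitrary:

loader := &rlogs.S3LineLoader{
	ScanBufferSize:  1024 * 1024,      // assumed: initial buffer size in bytes
	ScanBufferLimit: 16 * 1024 * 1024, // assumed: maximum line length in bytes
}
pipeline := rlogs.Pipeline{
	Psr: &parser.JSON{
		Tag:             "ts",
		TimestampField:  rlogs.String("ts"),
		TimestampFormat: rlogs.String("2006-01-02T15:04:05"),
	},
	Ldr: loader,
}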
