models

package
v0.0.0-...-cbcb865 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 16, 2021 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildMergedS3ObjectKey

func BuildMergedS3ObjectKey(prefix, schema, partition, chunkKey string) string

BuildMergedS3ObjectKey creates the S3 key of a merged object from the given parameters.

Types

type AthenaTableName

// AthenaTableName is a string type that indicates an Athena table name; the
// same value is also used as a directory name.
type AthenaTableName string

AthenaTableName indicates a table name and the matching directory name.

// Athena table names, which double as directory names.
const (
	// AthenaTableIndex is table name for index objects and directory name
	AthenaTableIndex AthenaTableName = "indices"
	// AthenaTableMessage is table name for message objects and directory name
	//
	// NOTE(review): unlike AthenaTableIndex, this constant has no explicit
	// type, so it is an untyped string constant rather than an
	// AthenaTableName. Confirm whether that is intentional before adding the
	// type: callers may currently assign it directly to a plain string.
	AthenaTableMessage = "messages"
)

type Chunk

// Chunk is a DynamoDB-backed record; every field carries a `dynamo` tag that
// names the attribute it is stored under.
// NOTE(review): presumably a Chunk groups the records that are later merged
// into one object — confirm against the code that writes it.
type Chunk struct {
	Schema    string   `dynamo:"schema"`
	// RecordIDs is stored as a DynamoDB set (the ",set" tag option).
	RecordIDs []string `dynamo:"record_ids,set"`
	TotalSize int64    `dynamo:"total_size"`
	Partition string   `dynamo:"partition"`
	// NOTE(review): presumably a unix timestamp; unit is not visible here.
	CreatedAt int64    `dynamo:"created_at"`
	ChunkKey  string   `dynamo:"chunk_key"`
	Freezed   bool     `dynamo:"freezed"`

	// For DynamoDB
	// PK and SK are stored under the "pk" and "sk" attributes
	// (presumably partition key and sort key — TODO confirm).
	PK string `dynamo:"pk"`
	SK string `dynamo:"sk"`
}

func NewChunkFromDynamoEvent

func NewChunkFromDynamoEvent(image map[string]events.DynamoDBAttributeValue) (*Chunk, error)

NewChunkFromDynamoEvent builds Chunk by DynamoDBAttributeValue

type ComposeQueue

// ComposeQueue is the JSON message sent by the indexer and received by the
// composer. All fields are serialized under the snake_case names given in
// their `json` tags.
type ComposeQueue struct {
	RecordID  string   `json:"record_id"`
	S3Object  S3Object `json:"s3_object"`
	// NOTE(review): presumably the size of S3Object in bytes — confirm.
	Size      int64    `json:"size"`
	Schema    string   `json:"schema"`
	Partition string   `json:"partition"`
}

ComposeQueue is sent by indexer and received by composer

type IndexRecord

// IndexRecord is one row of the inverted index of log files on the S3 bucket.
// Each field declares its Parquet column (name, type and, for the UTF8
// columns, PLAIN_DICTIONARY encoding) alongside matching json and msgpack
// names.
type IndexRecord struct {
	Tag string `parquet:"name=tag, type=UTF8, encoding=PLAIN_DICTIONARY" json:"tag" msgpack:"tag"`
	// Timestamp is unixtime (second) of original log.
	Timestamp int64  `parquet:"name=timestamp, type=INT64" json:"timestamp" msgpack:"timestamp"`
	Field     string `parquet:"name=field, type=UTF8, encoding=PLAIN_DICTIONARY" json:"field" msgpack:"field"`
	Term      string `parquet:"name=term, type=UTF8, encoding=PLAIN_DICTIONARY" json:"term" msgpack:"term"`
	// ObjectID and Seq use the same column names as MessageRecord's
	// object_id and seq columns (presumably to join the two — TODO confirm).
	ObjectID  int64  `parquet:"name=object_id, type=INT64" json:"object_id" msgpack:"object_id"`
	Seq       int32  `parquet:"name=seq, type=INT32" json:"seq" msgpack:"seq"`
}

IndexRecord is used for inverted index of log files on S3 bucket.

type LogQueue

// LogQueue is used in the indexer. It has no serialization tags.
// NOTE(review): the Err field suggests items are passed through a channel and
// carry either a log entry or an error — confirm against the indexer.
type LogQueue struct {
	Err       error
	Timestamp time.Time
	Tag       string
	Message   string
	Value     interface{}
	Seq       int32
	// Src is an S3Object (presumably the object the log was read from).
	Src       S3Object
}

LogQueue is used in indexer

type MergeQueue

// MergeQueue specifies the source records to be merged and the destination
// object location. It is serialized as JSON using the snake_case names in the
// field tags.
type MergeQueue struct {
	Schema    ParquetSchemaName `json:"schema"`
	TotalSize int64             `json:"total_size"`
	RecordIDs []string          `json:"record_ids"`
	// DstObject is the S3 location of the destination object.
	DstObject S3Object          `json:"dst_object"`
}

MergeQueue specifies the source object locations to be merged and the destination object location.

type MessageRecord

// MessageRecord stores the original log message encoded as JSON. Each field
// declares its Parquet column alongside matching json and msgpack names.
type MessageRecord struct {
	// Timestamp is unixtime (second) of original log.
	Timestamp int64  `parquet:"name=timestamp, type=INT64" json:"timestamp" msgpack:"timestamp"`
	ObjectID  int64  `parquet:"name=object_id, type=INT64" json:"object_id" msgpack:"object_id"`
	Seq       int32  `parquet:"name=seq, type=INT32" json:"seq" msgpack:"seq"`
	// Message is written as a UTF8 column with PLAIN_DICTIONARY encoding.
	Message   string `parquet:"name=message, type=UTF8, encoding=PLAIN_DICTIONARY" json:"message" msgpack:"message"`
}

MessageRecord stores original log message that is encoded to JSON.

type ParquetSchemaName

// ParquetSchemaName is a string type that identifies a Parquet schema by name.
type ParquetSchemaName string

ParquetSchemaName identifies schema name

// Known Parquet schema names. Both constants are explicitly typed as
// ParquetSchemaName.
const (
	// ParquetSchemaIndex indicates IndexRecord parquet schema
	ParquetSchemaIndex ParquetSchemaName = "index"
	// ParquetSchemaMessage indicates MessageRecord
	ParquetSchemaMessage ParquetSchemaName = "message"
)

type PartitionQueue

// PartitionQueue holds the arguments given to the partitioner to add a new
// partition. It is serialized as JSON using the names in the field tags.
type PartitionQueue struct {
	Location  string            `json:"location"`
	TableName string            `json:"table_name"`
	// Keys maps string to string (presumably partition name to partition
	// value, as RawObject.PartitionKeys returns — TODO confirm).
	Keys      map[string]string `json:"keys"`
}

PartitionQueue holds the arguments passed to the partitioner to add a new partition.

type RawObject

// RawObject is converted from an original log message but not yet merged. It
// is used to indicate the S3 path and the Athena partition of the object.
type RawObject struct {
	// DataSize is the only exported field.
	// NOTE(review): presumably the size of the object's data in bytes — confirm.
	DataSize int64
	// contains filtered or unexported fields
}

RawObject is converted from original log message, but not merged. File format of the object is not defined and any encoding is acceptable by DumpService. This structure is used to indicate path of S3 and partition for Athena.

func NewRawObject

func NewRawObject(prefix *RawObjectPrefix, ext string) *RawObject

NewRawObject is the constructor of RawObject. *ext* is the extension of the object.

func (*RawObject) Object

func (x *RawObject) Object() *S3Object

Object returns the *S3Object for this RawObject.

func (*RawObject) Partition

func (x *RawObject) Partition() string

Partition returns a part of path

func (*RawObject) PartitionKeys

func (x *RawObject) PartitionKeys() map[string]string

PartitionKeys returns map of partition name and value

func (*RawObject) PartitionPath

func (x *RawObject) PartitionPath() string

PartitionPath returns the S3 path to the top of the partition. The path includes the s3:// prefix and the bucket name, e.g. s3://your-bucket/prefix/indices/dt=2020-01-02-03/

func (*RawObject) Schema

func (x *RawObject) Schema() string

Schema returns schema name as string type

func (*RawObject) TableName

func (x *RawObject) TableName() string

TableName returns Athena table name as string type

type RawObjectPrefix

// RawObjectPrefix holds the base information shared by the RawObjects that
// are generated from one original log object. It exposes no exported fields;
// use NewRawObjectPrefix to construct it.
type RawObjectPrefix struct {
	// contains filtered or unexported fields
}

RawObjectPrefix is the basic information of a RawObject. Basically a RawObject has a one-to-one relationship with an original log S3 object. However, sometimes multiple RawObjects are generated from one original log object because the object is too large. (A large object cannot be converted to a Parquet file because of an OOM error.) For that reason the base parameters are kept independent of RawObject.

func NewRawObjectPrefix

func NewRawObjectPrefix(schema ParquetSchemaName, base, src S3Object, ts time.Time) *RawObjectPrefix

NewRawObjectPrefix is the constructor of RawObjectPrefix. *base* must have the destination S3 bucket and prefix. *src* indicates the S3 object of the original logs. *ts* is the log timestamp used to identify the partition.

func (*RawObjectPrefix) Key

func (x *RawObjectPrefix) Key() string

Key of RawObjectPrefix returns unique key for RawObject

func (*RawObjectPrefix) Schema

func (x *RawObjectPrefix) Schema() ParquetSchemaName

Schema is getter of schema

type Record

// Record is the interface for Index and Message records. It is the empty
// interface, so any value satisfies it; callers must type-assert to recover
// the concrete record type.
type Record interface{}

Record is interface of Index and Message records

type RecordQueue

// RecordQueue is used for RecordService.Load.
// NOTE(review): presumably each item carries either a batch of Records or a
// non-nil Err — confirm against RecordService.Load.
type RecordQueue struct {
	Err     error
	Records []Record
}

RecordQueue is used for RecordService.Load

type S3Object

// S3Object identifies an S3 object by region, bucket and key. Each field has
// both a `json` name and an "s3_"-prefixed `dynamo` attribute name.
type S3Object struct {
	Region string `json:"region" dynamo:"s3_region"`
	Bucket string `json:"bucket" dynamo:"s3_bucket"`
	Key    string `json:"key" dynamo:"s3_key"`
}

func DecodeS3Object

func DecodeS3Object(raw string) (*S3Object, error)

func NewS3Object

func NewS3Object(region, bucket, key string) S3Object

func NewS3ObjectFromRecord

func NewS3ObjectFromRecord(record events.S3EventRecord) S3Object

func (*S3Object) AppendKey

func (x *S3Object) AppendKey(postfix string) *S3Object

func (*S3Object) Encode

func (x *S3Object) Encode() string

func (*S3Object) Path

func (x *S3Object) Path() string

Path returns full path by s3 bucket name and key. e.g.) s3://your-bucket/some/key

type S3Objects

// S3Objects is a collection of S3Object values built with NewS3Objects,
// extended with Append and read back with Export.
type S3Objects struct {
	// Raw is the only exported field and is serialized as JSON "raw".
	// NOTE(review): presumably an encoded form of the contained objects —
	// confirm against Append/Export.
	Raw string `json:"raw"`
	// contains filtered or unexported fields
}

func NewS3Objects

func NewS3Objects(objects []*S3Object) (*S3Objects, error)

func (*S3Objects) Append

func (x *S3Objects) Append(objects ...*S3Object) error

func (*S3Objects) Export

func (x *S3Objects) Export() ([]*S3Object, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL