eventstream

package module
v0.5.4
Published: Apr 2, 2023 License: Apache-2.0 Imports: 12 Imported by: 0

README

Eventstream message pipeline service


Eventstream is a pipeline service for storing and re-sending events inside the system.

go get -v -u github.com/geniusrabbit/eventstream/cmd/eventstream

Run the eventstream service in Docker

docker run -d -it --rm -v ./custom.config.hcl:/config.hcl \
  geniusrabbit/eventstream

Source list

  • Kafka
  • NATS & NATS stream
  • Redis stream

Storage list

  • ClickHouse
  • Vertica
  • Kafka
  • NATS
  • Redis stream

Config example

Two config file formats are supported: YAML and HCL.

stores {
  clickhouse_1 {
    connect = "{{@env:CLICKHOUSE_STORE_CONNECT}}"
    buffer = 1000
    init_query = [<<Q
      CREATE TABLE IF NOT EXISTS stat.testlog (
         timestamp        DateTime
       , datemark         Date default toDate(timestamp)
       , service          String
       , msg              String
       , error            String
       , created_at       DateTime default now()
      ) Engine=Memory COMMENT 'The test table';
    Q]
  }
  kafka_1 {
    connect = "{{@env:KAFKA_EVENTS_CONNECT}}"
  }
}

// A source can be any supported stream service, such as Kafka, NATS, etc.
sources {
  nats_1 {
    connect = "{{@env:NATS_SOURCE_CONNECT}}"
    format  = "json"
  }
}

// Streams are pipelines that connect a source with a destination store
streams {
  log_1 {
    store  = "clickhouse_1"
    source = "nats_1"
    target = "testlog"
    // Optional if the field names in the target table and in the message are the same
    // Transforms into:
    //   INSERT INTO testlog (service, msg, error, timestamp) VALUES($srv, $msg, $err, @toDateTime($timestamp))
    fields = "service=srv,msg,error=err,timestamp=@toDateTime({{timestamp:date}})"
    where  = "srv == \"main\""
    metrics = [
      {
        name = "log.counter"
        type = "counter"
        tags {
          server  = "{{srv}}"
        }
      }
    ]
  }
  kafka_retranslate {
    store  = "kafka_1"
    source = "nats_1"
    targets = [
      {
        fields = {
          server = "{{srv}}"
          timestamp = "{{timestamp}}"
        }
        where = "type = \"statistic\""
      }
    ]
    where = "srv = \"events\""
  }
}
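
The where conditions above filter messages before they reach the store; the TODO list below notes they are based on github.com/Knetic/govaluate expressions. A minimal standalone sketch of how such a filter evaluates against message fields (an illustration, not the service's internal API):

package main

import (
	"fmt"

	"github.com/Knetic/govaluate"
)

func main() {
	// The stream filter from the config above as a govaluate expression.
	expr, err := govaluate.NewEvaluableExpression(`srv == "main"`)
	if err != nil {
		panic(err)
	}

	// Message fields are matched against the expression by name.
	msg := map[string]interface{}{"srv": "main", "msg": "started", "err": ""}
	ok, err := expr.Evaluate(msg)
	if err != nil {
		panic(err)
	}
	fmt.Println("accepted:", ok) // accepted: true
}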

Metrics

Metrics help analyze events during processing and monitor the state of streams. Every stream can emit metrics, declared with the metrics keyword.

Example:

metrics = [
  {
    name = "log.counter"
    type = "counter"
    tags { server = "{{srv}}" }
  },
  {
    name = "actions.counter"
    type = "counter"
    tags { action = "{{action}}" }
  },
  {...}
]

All metrics are exposed at the /metrics URL in the Prometheus format. To activate metrics, define the profile connection port:

SERVER_PROFILE_MODE=net
SERVER_PROFILE_LISTEN=:6060
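
With the profile listener enabled, the Prometheus endpoint can also be checked programmatically; a minimal sketch, assuming the service is reachable on localhost:6060 as configured above:

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumes SERVER_PROFILE_LISTEN=:6060 as in the example above.
	resp, err := http.Get("http://localhost:6060/metrics")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s", body) // Prometheus text exposition format
}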

Health check

curl "http://hostname:port/health-check"
{"status":"OK"}

TODO

  • Add processing of custom error metrics
  • Add MySQL database storage
  • Add PostgreSQL database storage
  • Add MongoDB database storage
  • Add Redis database storage
  • Prepare eventstream as a framework extension
  • Add Kafka stream writer support
  • Add NATS stream writer support
  • Add Redis stream source/storage support
  • Add RabbitMQ stream source/storage support
  • Add health check API
  • Add customizable prometheus metrics
  • Add 'where' stream condition (http://github.com/Knetic/govaluate)
  • Ack messages only on success
  • Buffer all data until it is stored
  • Add HCL config support

Documentation

Index

Constants

const (
	FieldTypeString     = message.FieldTypeString
	FieldTypeFixed      = message.FieldTypeFixed
	FieldTypeUUID       = message.FieldTypeUUID
	FieldTypeInt        = message.FieldTypeInt
	FieldTypeInt32      = message.FieldTypeInt32
	FieldTypeInt8       = message.FieldTypeInt8
	FieldTypeUint       = message.FieldTypeUint
	FieldTypeUint32     = message.FieldTypeUint32
	FieldTypeUint8      = message.FieldTypeUint8
	FieldTypeFloat      = message.FieldTypeFloat
	FieldTypeBoolean    = message.FieldTypeBoolean
	FieldTypeIP         = message.FieldTypeIP
	FieldTypeDate       = message.FieldTypeDate
	FieldTypeUnixnano   = message.FieldTypeUnixnano
	FieldTypeArrayInt32 = message.FieldTypeArrayInt32
	FieldTypeArrayInt64 = message.FieldTypeArrayInt64
)

Field scalar types enum

Variables

var EmptyMessage = message.EmptyMessage

Functions

This section is empty.

Types

type FieldType

type FieldType = message.FieldType

func TypeByString

func TypeByString(t string) FieldType

TypeByString returns the field type for the given type name
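
A brief sketch of resolving a configured type name into its FieldType constant; the exact set of accepted names is defined in the message package, so "uuid" here is only an illustrative assumption:

package main

import (
	"fmt"

	"github.com/geniusrabbit/eventstream"
)

func main() {
	// Resolve a field type declared as a string (e.g. in a stream's
	// field mapping) into the FieldType enum. The accepted names live
	// in the message package; "uuid" is an illustrative guess.
	switch ft := eventstream.TypeByString("uuid"); ft {
	case eventstream.FieldTypeUUID:
		fmt.Println("UUID column")
	case eventstream.FieldTypeString:
		fmt.Println("string column")
	default:
		fmt.Println("other field type:", ft)
	}
}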

type Formater

type Formater = message.Formater

type MapMessage added in v0.5.2

type MapMessage = message.MapMessage

type Message

type Message = message.Message

func MessageDecode

func MessageDecode(data []byte, converter unmarshalel) (msg Message, err error)

MessageDecode decodes a message from raw bytes using the given converter
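
The converter argument has the unexported unmarshalel type; assuming it is satisfied by a json.Unmarshal-style function (func([]byte, any) error — check the message package for the actual constraint), decoding might look like this sketch:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/geniusrabbit/eventstream"
)

func main() {
	raw := []byte(`{"srv":"main","msg":"started","timestamp":1680393600}`)

	// Assumption: the unmarshalel converter accepts a json.Unmarshal-like
	// function; the real constraint is defined in the message package.
	msg, err := eventstream.MessageDecode(raw, json.Unmarshal)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", msg)
}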

type Sourcer

type Sourcer interface {
	// Close extension
	io.Closer

	// Subscribe adds new streams to the data processing.
	// All subscribed streams receive the same data messages.
	Subscribe(ctx context.Context, streams ...Streamer) error

	// Start runs the observer that writes incoming data into the subscribed streams
	Start(ctx context.Context) error
}

Sourcer describes the input stream interface. A source accepts messages from a queue pipeline such as Kafka, NATS, or RabbitMQ and sends them one by one into the stream processor.
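
As a sketch, a custom source only needs to satisfy the three methods above. Message dispatch into the subscribed streams is elided here, since writing is defined on the Streamer side in the stream package:

package mysource

import (
	"context"
	"sync"

	"github.com/geniusrabbit/eventstream"
)

// StaticSource is an illustrative Sourcer implementation that only keeps
// track of its subscribed streams. A real source would consume a queue
// such as Kafka or NATS and push every message into each stream.
type StaticSource struct {
	mx      sync.Mutex
	streams []eventstream.Streamer
}

// Subscribe registers streams that should receive the source's messages.
func (s *StaticSource) Subscribe(ctx context.Context, streams ...eventstream.Streamer) error {
	s.mx.Lock()
	defer s.mx.Unlock()
	s.streams = append(s.streams, streams...)
	return nil
}

// Start blocks until the context is cancelled; a real implementation
// would read the queue and fan messages out to s.streams here.
func (s *StaticSource) Start(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// Close satisfies io.Closer.
func (s *StaticSource) Close() error { return nil }

// Compile-time check that StaticSource implements eventstream.Sourcer.
var _ eventstream.Sourcer = (*StaticSource)(nil)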

type StorageConfig added in v0.2.1

type StorageConfig struct {
	Debug   bool
	Connect string
	Driver  string
	Buffer  uint
	Raw     json.RawMessage
}

StorageConfig of the storage

func (*StorageConfig) Decode added in v0.2.1

func (c *StorageConfig) Decode(v any) error

Decode raw data to the target object

func (*StorageConfig) UnmarshalJSON added in v0.2.1

func (c *StorageConfig) UnmarshalJSON(data []byte) (err error)

UnmarshalJSON data

func (*StorageConfig) Validate added in v0.2.1

func (c *StorageConfig) Validate() error

Validate config
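
A sketch of loading a single storage definition from JSON and pulling driver-specific options out of it. The lowercase key names and the init_query field are assumptions about the config layout, not a documented contract:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/geniusrabbit/eventstream"
)

func main() {
	// One storage section as it might appear in a JSON config
	// (key names are assumed, not documented).
	raw := []byte(`{
		"driver":  "clickhouse",
		"connect": "clickhouse://127.0.0.1:9000/stat",
		"buffer":  1000,
		"init_query": ["SELECT 1"]
	}`)

	var cfg eventstream.StorageConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	if err := cfg.Validate(); err != nil {
		panic(err)
	}

	// Decode maps the raw, driver-specific part of the config onto a
	// target structure; init_query is a hypothetical option here.
	var extra struct {
		InitQuery []string `json:"init_query"`
	}
	if err := cfg.Decode(&extra); err != nil {
		panic(err)
	}
	fmt.Println(cfg.Driver, cfg.Connect, extra.InitQuery)
}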

type Storager

type Storager interface {
	// Closer extension of the interface
	io.Closer

	// Stream returns a new stream writer for the specific configuration options
	Stream(opts ...any) (Streamer, error)
}

Storager describes the interaction with a storage backend. The storage creates new stream interfaces to process data from sources.

type StreamConfig added in v0.2.1

type StreamConfig = stream.Config

StreamConfig of the stream

type Streamer

type Streamer = stream.Streamer

Streamer is the data-processing interface; it describes the basic methods of a data pipeline

func NewStreamWrapper

func NewStreamWrapper(stream Streamer, where condition.Condition, metrics metrics.Metricer) Streamer

NewStreamWrapper wraps the stream with condition and metrics support

Directories

Path       Synopsis
cmd
examples
internal
mocks      Package mocks is a generated GoMock package.
sql
utils
