engine

package
v0.11.0
Warning

This package is not in the latest version of its module.

Published: Apr 5, 2018 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package engine reads from any endpoint that provides expvar data and ships the data to Elasticsearch. You can inspect the metrics with Kibana.

Please refer to Go's expvar documentation for more information. Installation guides can be found on the GitHub page: https://github.com/arsham/expipe

At the heart of this package is the Engine. It acts as the glue between multiple Readers and a Recorder. Messages are transferred in a structure called DataContainer, which is a list of DataType objects.

Example configuration

Save it somewhere (let's call it expipe.yml for now):

settings:
    log_level: info

readers:                           # You can specify the applications you want to show the metrics
    FirstApp:                      # service name
        type: expvar               # the type of reader. More to come soon!
        type_name: AppVastic       # this will be the _type in elasticsearch
        endpoint: localhost:1234   # where the application is listening
        routepath: /debug/vars     # the path on which the app provides the metrics
        interval: 500ms            # every half a second, it will collect the metrics.
        timeout: 3s                # it gives up if the application has not responded within 3 seconds
        backoff: 10                # after the application fails to respond 10 times, it will stop reading from it
    AnotherApplication:
        type: expvar
        type_name: this_is_awesome
        endpoint: localhost:1235
        routepath: /metrics
        timeout: 13s
        backoff: 10

recorders:                         # This section is where the data will be shipped to
    main_elasticsearch:
        type: elasticsearch        # the type of recorder. More to come soon!
        endpoint: 127.0.0.1:9200
        index_name: expipe
        timeout: 8s
        backoff: 10
    the_other_elasticsearch:
        type: elasticsearch
        endpoint: 127.0.0.1:9201
        index_name: expipe
        timeout: 18s
        backoff: 10

routes:                            # You can specify metrics of which application will be recorded in which target
    route1:
        readers:
            - FirstApp
        recorders:
            - main_elasticsearch
    route2:
        readers:
            - FirstApp
            - AnotherApplication
        recorders:
            - main_elasticsearch
    route3:                      # Yes, you can have multiple!
        readers:
            - AnotherApplication
        recorders:
            - main_elasticsearch
            - the_other_elasticsearch

Then run the application:

expipe -c expipe.yml

You can mix and match the routes; the engine works out a set-up that achieves your goal without duplicating the results. For instance, assume you set the routes like this:

readers:
    app_0: {type: expvar}
    app_1: {type: expvar}
    app_2: {type: expvar}
    app_3: {type: expvar}
    app_4: {type: expvar}
    app_5: {type: expvar}
    not_used_app: {type: expvar} # note that this one is not specified in the routes, therefore it is ignored
recorders:
    elastic_0: {type: elasticsearch}
    elastic_1: {type: elasticsearch}
    elastic_2: {type: elasticsearch}
    elastic_3: {type: elasticsearch}
routes:
    route1:
        readers:
            - app_0
            - app_2
            - app_4
        recorders:
            - elastic_1
    route2:
        readers:
            - app_0
            - app_5
        recorders:
            - elastic_2
            - elastic_3
    route3:
        readers:
            - app_1
            - app_2
        recorders:
            - elastic_0
            - elastic_1

Expipe creates four engines like so:

elastic_0 records data from app_1, app_2
elastic_1 records data from app_0, app_1, app_2, app_4
elastic_2 records data from app_0, app_5
elastic_3 records data from app_0, app_5

You can change the numbers to your liking:

gc_types:                      # These inputs will be collected into one list and zero values will be removed
    memstats.PauseEnd
    memstats.PauseNs

memory_bytes:                   # These values will be transformed from bytes
    StackInuse: mb              # To MB
    memstats.Alloc: gb          # To GB

To run the tests, run the following from the root of the application:

go test $(glide nv)

Or for testing readers:

go test ./readers

To show the coverage, use this gist: https://gist.github.com/arsham/f45f7e7eea7e18796bc1ed5ced9f9f4a. Then run:

gocover

It will open a browser tab and show you the coverage.

To run all benchmarks:

go test $(glide nv) -run=^$ -bench=.

To show the memory and CPU profiles, run the following in each folder:

BASENAME=$(basename $(pwd))
go test -run=^$ -bench=. -cpuprofile=cpu.out -benchmem -memprofile=mem.out
go tool pprof -pdf $BASENAME.test cpu.out > cpu.pdf && open cpu.pdf
go tool pprof -pdf $BASENAME.test mem.out > mem.pdf && open mem.pdf

License

Use of this source code is governed by the Apache 2.0 license, which can be found in the LICENSE file.

Constants

This section is empty.

Variables

var (
	ErrNoReader   = fmt.Errorf("no reader provided")
	ErrNoRecorder = fmt.Errorf("no recorder provided")
	ErrNoLogger   = fmt.Errorf("no logger provided")
	ErrNoCtx      = fmt.Errorf("no ctx provided")
)

Errors returned from Engine operations.

Functions

func Start

func Start(e Engine) chan struct{}

Start begins pulling data from the DataReader and ships it to the DataRecorder. When the context is cancelled or times out, the engine abandons its operations and returns an error if one occurred.

Example

You need at least a pair of DataReader and DataRecorder to start an engine. In this example we are using the mocked versions.

log := tools.DiscardLogger()
ctx, cancel := context.WithCancel(context.Background())
recorded := make(chan string)

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	recorded <- "Job was recorded"
}))
defer ts.Close()

red := getReader(log)
rec := getRecorders(log, ts.URL)
e, err := engineWithReadRecs(ctx, log, red, rec)
if err != nil {
	log.Fatalln("This error should not happen:", err)
}
done := make(chan struct{})
go func() {
	engine.Start(e)
	done <- struct{}{}
}()
fmt.Println("Engine creation success:", err == nil)
fmt.Println(<-recorded)

cancel()
<-done
fmt.Println("Client closed gracefully")
Output:

Engine creation success: true
Job was recorded
Client closed gracefully

func WithCtx

func WithCtx(ctx context.Context) func(Engine) error

WithCtx uses ctx as the Engine's background context.

Example
package main

import (
	"context"
	"fmt"

	"github.com/arsham/expipe/engine"
)

func main() {
	ctx := context.Background()
	o := &engine.Operator{}
	err := engine.WithCtx(ctx)(o)
	fmt.Println("Error:", err)
	fmt.Println("o.Ctx() == ctx:", o.Ctx() == ctx)

}
Output:

Error: <nil>
o.Ctx() == ctx: true

func WithLogger

func WithLogger(log tools.FieldLogger) func(Engine) error

WithLogger sets the logger.

Example
package main

import (
	"fmt"

	"github.com/arsham/expipe/engine"
	"github.com/arsham/expipe/tools"
)

func main() {
	log := tools.DiscardLogger()
	o := &engine.Operator{}
	err := engine.WithLogger(log)(o)
	fmt.Println("Error:", err)
	fmt.Println("o.Log() == log:", o.Log() == log)

}
Output:

Error: <nil>
o.Log() == log: true

func WithReader added in v0.11.0

func WithReader(red reader.DataReader) func(Engine) error

WithReader builds up the reader.

Example
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"

	"github.com/arsham/expipe/engine"
	"github.com/arsham/expipe/reader"

	rdt "github.com/arsham/expipe/reader/testing"
	"github.com/arsham/expipe/tools"
)

func readerWithUrl(url string) reader.DataReader {
	log := tools.DiscardLogger()
	red, err := rdt.New(
		reader.WithLogger(log),
		reader.WithEndpoint(url),
		reader.WithName("reader_example"),
		reader.WithTypeName("typeName"),
		reader.WithInterval(time.Millisecond*100),
		reader.WithTimeout(time.Second),
		reader.WithBackoff(5),
	)
	if err != nil {
		log.Fatalln("This error should not happen:", err)
	}
	return red
}

func main() {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
	red := readerWithUrl(ts.URL)

	o := &engine.Operator{}
	err := engine.WithReader(red)(o)
	fmt.Println("Error:", err)

}
Output:

Error: <nil>
Example (PingError)

If the DataReader cannot ping its endpoint, WithReader returns an error.

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"reflect"
	"time"

	"github.com/arsham/expipe/engine"
	"github.com/arsham/expipe/reader"

	rdt "github.com/arsham/expipe/reader/testing"
	"github.com/arsham/expipe/tools"
)

func readerWithUrl(url string) reader.DataReader {
	log := tools.DiscardLogger()
	red, err := rdt.New(
		reader.WithLogger(log),
		reader.WithEndpoint(url),
		reader.WithName("reader_example"),
		reader.WithTypeName("typeName"),
		reader.WithInterval(time.Millisecond*100),
		reader.WithTimeout(time.Second),
		reader.WithBackoff(5),
	)
	if err != nil {
		log.Fatalln("This error should not happen:", err)
	}
	return red
}

func main() {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
	ts.Close()
	red := readerWithUrl(ts.URL)

	o := &engine.Operator{}
	err := engine.WithReader(red)(o)
	fmt.Println("Error type:", reflect.TypeOf(err))

}
Output:

Error type: engine.PingError

func WithRecorders added in v0.11.0

func WithRecorders(recs ...recorder.DataRecorder) func(Engine) error

WithRecorders builds up the recorders and checks them.

Example
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"

	"github.com/arsham/expipe/engine"
	"github.com/arsham/expipe/recorder"

	rct "github.com/arsham/expipe/recorder/testing"
	"github.com/arsham/expipe/tools"
)

func recorderWithUrl(url string) recorder.DataRecorder {
	log := tools.DiscardLogger()
	rec, err := rct.New(
		recorder.WithLogger(log),
		recorder.WithEndpoint(url),
		recorder.WithName("recorder_example"),
		recorder.WithIndexName("indexName"),
		recorder.WithTimeout(time.Second),
		recorder.WithBackoff(5),
	)
	if err != nil {
		log.Fatalln("This error should not happen:", err)
	}
	return rec
}

func main() {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
	rec := recorderWithUrl(ts.URL)
	o := &engine.Operator{}
	err := engine.WithRecorders(rec)(o)
	fmt.Println("Error:", err)

}
Output:

Error: <nil>
Example (PingError)

If the DataRecorder cannot ping its endpoint, WithRecorders returns an error.

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"reflect"
	"time"

	"github.com/arsham/expipe/engine"
	"github.com/arsham/expipe/recorder"

	rct "github.com/arsham/expipe/recorder/testing"
	"github.com/arsham/expipe/tools"
)

func recorderWithUrl(url string) recorder.DataRecorder {
	log := tools.DiscardLogger()
	rec, err := rct.New(
		recorder.WithLogger(log),
		recorder.WithEndpoint(url),
		recorder.WithName("recorder_example"),
		recorder.WithIndexName("indexName"),
		recorder.WithTimeout(time.Second),
		recorder.WithBackoff(5),
	)
	if err != nil {
		log.Fatalln("This error should not happen:", err)
	}
	return rec
}

func main() {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
	ts.Close()
	rec := recorderWithUrl(ts.URL)
	o := &engine.Operator{}
	err := engine.WithRecorders(rec)(o)
	fmt.Println("Error type:", reflect.TypeOf(err))

}
Output:

Error type: engine.PingError

Types

type Engine

type Engine interface {
	fmt.Stringer
	SetCtx(context.Context)
	SetLog(tools.FieldLogger)
	SetRecorders(map[string]recorder.DataRecorder)
	SetReader(reader.DataReader)
	Ctx() context.Context
	Log() tools.FieldLogger
	Recorders() map[string]recorder.DataRecorder
	Reader() reader.DataReader
}

Engine is an interface to Operator's behaviour. This abstraction is very tight on purpose.

func New

func New(options ...func(Engine) error) (Engine, error)

New generates the Engine based on the provided options.

Example

You can pass your configuration.

package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"

	"github.com/arsham/expipe/engine"
	"github.com/arsham/expipe/reader"

	rdt "github.com/arsham/expipe/reader/testing"
	"github.com/arsham/expipe/recorder"

	rct "github.com/arsham/expipe/recorder/testing"
	"github.com/arsham/expipe/tools"
)

func recorderWithUrl(url string) recorder.DataRecorder {
	log := tools.DiscardLogger()
	rec, err := rct.New(
		recorder.WithLogger(log),
		recorder.WithEndpoint(url),
		recorder.WithName("recorder_example"),
		recorder.WithIndexName("indexName"),
		recorder.WithTimeout(time.Second),
		recorder.WithBackoff(5),
	)
	if err != nil {
		log.Fatalln("This error should not happen:", err)
	}
	return rec
}

func readerWithUrl(url string) reader.DataReader {
	log := tools.DiscardLogger()
	red, err := rdt.New(
		reader.WithLogger(log),
		reader.WithEndpoint(url),
		reader.WithName("reader_example"),
		reader.WithTypeName("typeName"),
		reader.WithInterval(time.Millisecond*100),
		reader.WithTimeout(time.Second),
		reader.WithBackoff(5),
	)
	if err != nil {
		log.Fatalln("This error should not happen:", err)
	}
	return red
}

func main() {
	log := tools.DiscardLogger()
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
	ctx := context.Background()

	rec := recorderWithUrl(ts.URL)
	red := readerWithUrl(ts.URL)

	e, err := engine.New(
		engine.WithCtx(ctx),
		engine.WithLogger(log),
		engine.WithReader(red),
		engine.WithRecorders(rec),
	)
	fmt.Println("Error:", err)
	fmt.Println("Engine is nil:", e == nil)

}
Output:

Error: <nil>
Engine is nil: false
Example (Replaces)

Please note that if an option is passed more than once, the last one replaces the earlier ones.

package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"

	"github.com/arsham/expipe/engine"
	"github.com/arsham/expipe/reader"

	rdt "github.com/arsham/expipe/reader/testing"
	"github.com/arsham/expipe/recorder"

	rct "github.com/arsham/expipe/recorder/testing"
	"github.com/arsham/expipe/tools"
)

func recorderWithUrl(url string) recorder.DataRecorder {
	log := tools.DiscardLogger()
	rec, err := rct.New(
		recorder.WithLogger(log),
		recorder.WithEndpoint(url),
		recorder.WithName("recorder_example"),
		recorder.WithIndexName("indexName"),
		recorder.WithTimeout(time.Second),
		recorder.WithBackoff(5),
	)
	if err != nil {
		log.Fatalln("This error should not happen:", err)
	}
	return rec
}

func readerWithUrl(url string) reader.DataReader {
	log := tools.DiscardLogger()
	red, err := rdt.New(
		reader.WithLogger(log),
		reader.WithEndpoint(url),
		reader.WithName("reader_example"),
		reader.WithTypeName("typeName"),
		reader.WithInterval(time.Millisecond*100),
		reader.WithTimeout(time.Second),
		reader.WithBackoff(5),
	)
	if err != nil {
		log.Fatalln("This error should not happen:", err)
	}
	return red
}

func main() {
	log := tools.DiscardLogger()
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
	ctx1, cancel := context.WithCancel(context.Background())
	ctx2, cancel2 := context.WithCancel(context.Background())
	defer cancel()
	defer cancel2()

	rec := recorderWithUrl(ts.URL)
	red := readerWithUrl(ts.URL)

	e, err := engine.New(
		engine.WithCtx(ctx1),
		engine.WithCtx(ctx2),
		engine.WithLogger(log),
		engine.WithReader(red),
		engine.WithRecorders(rec),
	)
	fmt.Println("Error:", err)
	fmt.Println("e.Ctx() == ctx1:", e.Ctx() == ctx1)
	fmt.Println("e.Ctx() == ctx2:", e.Ctx() == ctx2)

}
Output:

Error: <nil>
e.Ctx() == ctx1: false
e.Ctx() == ctx2: true

type JobError

type JobError struct {
	Name string // Name of the operator; reader, recorder.
	ID   token.ID
	Err  error
}

JobError carries an error around in Engine operations.

func (JobError) Error

func (e JobError) Error() string

type Operator added in v0.11.0

type Operator struct {
	// contains filtered or unexported fields
}

Operator represents an Engine that receives information from a reader and ships it to multiple recorders. The Operator is allowed to change the index and type names at will. When the context times out or is cancelled, the Engine closes and returns. Use the shut down channel to signal the Engine to stop recording. The ctx context will create a new context based on the parent.

func (Operator) Ctx added in v0.11.0

func (o Operator) Ctx() context.Context

Ctx returns the context assigned to this Engine.

func (Operator) Log added in v0.11.0

func (o Operator) Log() tools.FieldLogger

Log returns the logger assigned to this Engine.

func (Operator) Reader added in v0.11.0

func (o Operator) Reader() reader.DataReader

Reader returns the reader.

func (Operator) Recorders added in v0.11.0

func (o Operator) Recorders() map[string]recorder.DataRecorder

Recorders returns the recorder map.

func (*Operator) SetCtx added in v0.11.0

func (o *Operator) SetCtx(ctx context.Context)

SetCtx sets the context of this Engine.

func (*Operator) SetLog added in v0.11.0

func (o *Operator) SetLog(log tools.FieldLogger)

SetLog sets the logger of this Engine.

func (*Operator) SetReader added in v0.11.0

func (o *Operator) SetReader(reader reader.DataReader)

SetReader sets the reader.

func (*Operator) SetRecorders added in v0.11.0

func (o *Operator) SetRecorders(recorders map[string]recorder.DataRecorder)

SetRecorders sets the recorder map.

func (*Operator) String added in v0.11.0

func (o *Operator) String() string

type PingError

type PingError map[string]error

PingError is the error returned when one of the readers or recorders has a ping error.

func (PingError) Error

func (e PingError) Error() string

type Service added in v0.11.0

type Service struct {
	Log       tools.FieldLogger
	Ctx       context.Context
	Conf      *config.ConfMap
	Configure func(...func(Engine) error) (Engine, error)
}

Service initialises Engines. Configure injects the input values into the Operator by calling each function on it.

func (*Service) Start added in v0.11.0

func (s *Service) Start() (chan struct{}, error)

Start creates some Engines and returns a channel that is closed when all of them have finished their work. For each route, we need one engine that has multiple readers and writes to one recorder. When all recorders of one reader go out of scope, the Engine stops that reader because there is no destination. Each Engine is run in its own goroutine.
