streamtester

package module
v0.0.0-...-994c7f3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 22, 2023 License: BSD-3-Clause Imports: 13 Imported by: 0

README

StreamTester Trigger

This trigger gives you the ability to test your stream application using mock data provided in a csv file.

Installation

flogo install github.com/iosif02/stream/trigger/streamtester
Configuration
Settings:
Name Type Description
port int The port for the control api to listen on - REQUIRED
Handler Settings:
Setting Type Description
filePath string Path to a CSV file (local file path or url) - REQUIRED
emitDelay int The delay between data emission in milliseconds, the default is 100ms (min is 5ms)
replayData bool Continuously replay the data set (default is true)
dataAsMap bool Convert the data to a Map, with column names as keys
getColumnNames bool Get all the column names
allDataAtOnce bool Indicates that the data be sent all at once, otherwise one row at a time
Output:
Name Type Description
columnNames array The array of column names if getColumnNames was enabled
data any The data that is being emitted from the CSV (either a row or the entire set)
Trigger Control API

The tester can be controlled using a REST API.

Method Resource Description
POST /tester/start Starts all data emission
POST /tester/stop Stops all data emission
POST /tester/pause Pauses all data emission
POST /tester/resume Resumes all data emission
POST /tester/reload Reloads the data from all csv files
Fine Grained Control

If a handler name has been specified, the name can be control the data mission for that specified handler.

Method Resource Description
POST /tester/start/:handlerName Starts data emission for the specified handler
POST /tester/stop/:handlerName Stops data emission for the specified handler
POST /tester/pause/:handlerName Pauses data emission for the specified handler
POST /tester/resume/:handlerName Resumes data emission for the specified handler
POST /tester/reload/:handlerName Reloads the data from the csv for the specified handler

Examples

Simple

Configure the trigger to emit data from the csv file every 50 milliseconds.

{
  "triggers": [
    {
      "id": "stream-tester",
      "ref": "github.com/iosif02/stream/trigger/streamtester",
      "handlers": [
        {
          "settings": {
            "filePath": "out.csv",
            "dataAsMap": true,
            "emitDelay": 50
          },
          "action": {
            "ref": "github.com/iosif02/stream",
            "settings": {
              "streamURI": "res://stream:mystream"
            }
          }
        }
      ]
    }
  ]
}

Documentation

Index

Constants

View Source
const (
	Resume = iota + 1
	Pause
	Start
	Stop
	Reload
	Kill
)

Variables

This section is empty.

Functions

func ReadCSV

func ReadCSV(path string) ([][]string, error)

Types

type DataSet

type DataSet interface {
	ColumnNames() []interface{}
	DataPoint() (interface{}, bool)
	Reset()
	Reload() error
}

func NewDataSet

func NewDataSet(csvPath string, replay bool, dataAsMap, allAtOnce bool) (DataSet, error)

type Emitter

type Emitter struct {
	// contains filtered or unexported fields
}

func NewEmitter

func NewEmitter(logger log.Logger, handler trigger.Handler) (*Emitter, error)

func (*Emitter) Kill

func (e *Emitter) Kill()

func (*Emitter) Name

func (e *Emitter) Name() string

func (*Emitter) Pause

func (e *Emitter) Pause()

func (*Emitter) Reload

func (e *Emitter) Reload()

func (*Emitter) Resume

func (e *Emitter) Resume()

func (*Emitter) Run

func (e *Emitter) Run()

func (*Emitter) Start

func (e *Emitter) Start()

func (*Emitter) Stop

func (e *Emitter) Stop()

type Factory

type Factory struct {
}

func (*Factory) Metadata

func (*Factory) Metadata() *trigger.Metadata

Metadata implements trigger.Factory.Metadata

func (*Factory) New

func (*Factory) New(config *trigger.Config) (trigger.Trigger, error)

New implements trigger.Factory.New

type HandlerSettings

type HandlerSettings struct {
	FilePath       string `md:"filePath,required"`
	EmitDelay      int    `md:"emitDelay"`
	DataAsMap      bool   `md:"dataAsMap"`
	ReplayData     bool   `md:"replayData"`
	GetColumnNames bool   `md:"getColumnNames"`
	AllDataAtOnce  bool   `md:"allDataAtOnce"`
}

type Output

type Output struct {
	Data        interface{}   `md:"data"`
	ColumnNames []interface{} `md:"columnNames"`
}

func (*Output) FromMap

func (o *Output) FromMap(values map[string]interface{}) error

func (*Output) ToMap

func (o *Output) ToMap() map[string]interface{}

type Settings

type Settings struct {
	Port string `md:"port,required"`
}

type Trigger

type Trigger struct {
	// contains filtered or unexported fields
}

func (*Trigger) Initialize

func (t *Trigger) Initialize(ctx trigger.InitContext) error

Init implements trigger.Init

func (*Trigger) Start

func (t *Trigger) Start() error

Start implements ext.Trigger.Start

func (*Trigger) Stop

func (t *Trigger) Stop() error

Stop implements ext.Trigger.Stop

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL