gopeat

v0.0.0-...-66232a0
Published: Oct 8, 2019 License: GPL-3.0 Imports: 8 Imported by: 0

README

go-peat : A Golang library for replaying time-stamped data

Motivation and Example Uses

I've needed a common replay capability in several projects. Gopeat (re-peat, idk...) was created as a general replay library (DRY). It is designed to be used with a sorted, time-stamped data stream whose records can be represented as a Go struct with a time.Time getter.

Some Examples:

Equities Market: historic tick data chart replay for trader training

Live data stream proxy for development and testing of soft real-time trading systems, live dashboards, and live GIS GPS tracking data feeds.

Distributed Interactive Simulation (DIS): After Action Review (AAR) GIS overlay replay of DIS packets recorded during training and experimentation with Soldiers using new equipment and tactics.

Tech details

Internally gopeat runs three goroutines:

  • Controller: responsible for starting the data loader, starting the timed data writer, and responding to API control messages like Quit(), Pause(), and Resume().

  • Data Loader: a buffered read-ahead data stream fetcher that loads the source playback data

  • Data Timer: provides the timestamped data at the proper replay time accounting for the sim rate

The callback model seemed to offer the most flexibility without exposing channels and WaitGroups (see "Avoid concurrency in your API" in Go Best Practices).

A back-pressure adjustment internally monitors how long the client callback takes to return and dynamically adjusts the timing. For example, if callbacks are taking 15 milliseconds, callbacks will be executed 15 milliseconds earlier than the exact replay time derived from the time-stamped data, in an attempt to "lead" the client.

The client should return from the callback as fast as possible to maximize replay time accuracy. In some situations the client may not be able to keep up and might need to implement a strategy like queuing up data or dropping data.

Usage Notes

To use gopeat, the client should:

  • Create a time stamp source struct that implements the gopeat.TimeStamper interface (name inspired by Go's fmt.Stringer).

  • Create a time stamper data source that implements the gopeat.TimeBracket interface (sets the replay start and end time) and the iterator-like gopeat.TimeStampSource interface, whose Next() method provides the TimeStamper structs from the step above. gopeat.CsvTsSource is a provided data source for data stored in CSV format. Use it directly, or view its source to get an idea of how to implement your own.

  • Create a callback func that matches the gopeat.OnTsDataReady func type.

That is 3 steps (2.5 if gopeat.CsvTsSource is used) to get going.

Install

go get github.com/michelpmcdonald/go-peat

Example

// A self contained simple demo for go-peat
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"

	"github.com/michelpmcdonald/go-peat"
)

// tsRec - any old struct will work as long
// as it has a time to use in playback
type tsRec struct {
	tsWeirdName time.Time
	amt         float64
}

// GetTimeStamp - your struct on gopeat
func (ts tsRec) GetTimeStamp() time.Time {
	return ts.tsWeirdName
}

// Now some data is needed, typically this would
// be from a csv file, but whatever suits ya
var tsCsv = `weird_name, amt
	     09/01/2013 17:00:00.083 UTC, 12
	     09/01/2013 17:00:00.088 UTC, 55
	     09/01/2013 17:00:00.503 UTC, 54
	     09/01/2013 17:00:03.201 UTC, 54`

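// Define a func to convert a csv line slice to our struct.
// Parse errors are returned instead of being silently ignored.
func csvToTsRec(csv []string) (gopeat.TimeStamper, error) {
	if len(csv) < 2 {
		return nil, fmt.Errorf("expected 2 csv fields, got %d", len(csv))
	}
	tim, err := time.Parse("01/02/2006 15:04:05.999 MST",
		strings.TrimSpace(csv[0]))
	if err != nil {
		return nil, err
	}
	amt, err := strconv.ParseFloat(strings.TrimSpace(csv[1]), 64)
	if err != nil {
		return nil, err
	}
	return tsRec{tsWeirdName: tim, amt: amt}, nil
}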

func main() {

	// A data source is needed. Since our demo data is csv, use
	// gopeat's csv time stamper source and provide the data and
	// the csv line converter. It implements the needed interfaces.
	// Create your own source as needed, just implement the
	// TimeBracket and TimeStampSource interfaces
	tsSource := &gopeat.CsvTsSource{
		Symbol:    "MES",
		CsvStream: strings.NewReader(tsCsv),
		CsvTsConv: csvToTsRec,
	}

	// PlayBack controls the sim run
	var sim *gopeat.PlayBack

	// Create a callback to handle data, called when the simulation time
	// reaches the data's timestamp, so it's in sim soft real time
	recCnt := 0
	dataOut := func(ts gopeat.TimeStamper) error {
		recCnt++
		if recCnt > 2 {
			// Stop the sim early, skip last record
			sim.Quit()
		}
		tsd := ts.(tsRec)
		fmt.Printf("Data Time: %v. Data Amt: %f\n",
			tsd.GetTimeStamp(),
			tsd.amt)
		return nil
	}

	// Create a playback
	var err error
	sim, err = gopeat.New(
		"MES",
		time.Date(2013, 9, 1, 17, 0, 0, 0, time.UTC), // Sim start time
		time.Date(2013, 9, 1, 17, 4, 0, 0, time.UTC), // Sim end time
		tsSource,
		2,       // Sim rate
		dataOut) // Callback
	if err != nil {
		fmt.Println("could not create playback:", err)
		return
	}

	// Start the replay
	sim.Play()

	// Block until done
	sim.Wait()
}


Author

Michel McDonald

Documentation

Overview

Package gopeat provides functionality to replay time-stamped data with BESRTA (best effort soft real time accuracy). Playback preserves the time between consecutive time-stamped data values, so any timing drift accumulates over the total run time.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CsvToTs

type CsvToTs func([]string) (TimeStamper, error)

CsvToTs converts a csv line slice to a TimeStamper value

type CsvTsSource

type CsvTsSource struct {
	Symbol    string
	CsvStream io.Reader
	CsvTsConv CsvToTs

	MaxRecs int64
	// contains filtered or unexported fields
}

CsvTsSource implements a time-stamped data source for csv data (with header). The client must provide a CsvToTs func to convert csv data to TimeStamper values.

func (*CsvTsSource) Next

func (st *CsvTsSource) Next() (TimeStamper, bool)

Next implements an iterator for the contents of the csv data

func (*CsvTsSource) SetEndTime

func (st *CsvTsSource) SetEndTime(endTime time.Time)

SetEndTime sets the max timestamp for data provided

func (*CsvTsSource) SetStartTime

func (st *CsvTsSource) SetStartTime(startTime time.Time)

SetStartTime sets the min timestamp for data provided

type OnTsDataReady

type OnTsDataReady func(TimeStamper) error

OnTsDataReady is the function the Playback client should provide to the playback to receive the time-stamped data at simulation time. The client implementation should return as soon as the time-sensitive part of its processing is complete in order to keep Playback's internal backpressure calculation accurate. OnTsDataReady runs on Playback's send goroutine, not the client's.

type PlayBack

type PlayBack struct {
	Symbol       string
	StartTime    time.Time
	EndTime      time.Time
	SendTs       OnTsDataReady
	TsDataSource TimeStampSource
	WallRunDur   time.Duration

	WallStartTime time.Time
	// contains filtered or unexported fields
}

PlayBack implements a simulation run. Playback clients need to provide a data source that implements both the TimeBracket and TimeStampSource interfaces. Clients can stop the playback by closing StopChan.

func New

func New(symbol string,
	startTime time.Time, endTime time.Time,
	tsSource TimeStampSource,
	pbRate uint16,
	cb OnTsDataReady) (*PlayBack, error)

New allocates a new Playback struct

func (*PlayBack) Pause

func (pb *PlayBack) Pause()

Pause suspends the running replay

func (*PlayBack) Play

func (pb *PlayBack) Play()

Play starts replay process

func (*PlayBack) Quit

func (pb *PlayBack) Quit()

Quit stops the running PlayBack and eventually unblocks callers blocked on Wait()

func (*PlayBack) Resume

func (pb *PlayBack) Resume()

Resume continues a paused playback

func (*PlayBack) SetRate

func (pb *PlayBack) SetRate(rate uint16) error

SetRate controls the realtime rate of the playback.

func (*PlayBack) TimeDrift

func (pb *PlayBack) TimeDrift()

TimeDrift calculates some run time timing info

func (*PlayBack) Wait

func (pb *PlayBack) Wait()

Wait blocks until the controller shuts down or client calls Quit

type TimeBracket

type TimeBracket interface {
	SetStartTime(startTime time.Time)
	SetEndTime(startTime time.Time)
}

TimeBracket is implemented by any value that has SetStartTime and SetEndTime methods, which define the time bracket the value falls in. Playback uses the interface to notify its time series data source to limit data to the given time bracket.

type TimeStampSource

type TimeStampSource interface {
	Next() (tsData TimeStamper, ok bool)
}

TimeStampSource is implemented by any value that has a Next iterator method which returns TimeStamper values. When ok is false, the iterator is past the last value and the previous Next call returned the last value. The way playback is currently designed, an implementor of TimeStampSource should have a complete stream of data available so Next can either return the next value or return EOF. If Next() blocks, the loader goroutine will block and it will never terminate on its own. This design will be revisited. (TODO)

type TimeStamper

type TimeStamper interface {
	GetTimeStamp() time.Time
}

TimeStamper is implemented by any value that has a GetTimeStamp method, which defines a time stamp for that value. The GetTimeStamp method is used by Playback as the point in time to provide the value to Playback clients

Directories

Path                        Synopsis
examples/demo               A self contained simple demo for go-peat
examples/market_websocket   Package main creates a simple playback webserver.
examples/simple             Very simple example of how to create a gopeat Playback simulation run.
