arithmospora

package module
v0.0.0-...-bc64e80
Published: Aug 20, 2018 License: MIT Imports: 12 Imported by: 0

README

Arithmospora

Arithmospora spreads numbers.

Arithmospora reads data from sources and broadcasts it to connected clients via websockets. It listens for changes to that data so that updates can be broadcast to clients in near real time.

Its primary use case is for broadcasting live statistics for dynamic events such as online elections.

How it works

Sources consist of collections of data called stats, which are usually grouped together by the type of data held by each stat. Stat data can potentially be sourced from any kind of database; however, data loaders have only been implemented for Redis so far. Stats can also listen for changes to their data, which is currently implemented by way of Redis PUB/SUB.

Client handling and messages

Each source is exposed as a websocket endpoint, e.g. a source named election2017 can be accessed via wss://server.hostname:port/election2017. Clients connecting to an endpoint receive JSON messages in the following format:

{
  "event": "event:name",
  "payload": {...}
}

On connection, a client is sent an available event message, whose payload lists the groups and names of all the stats available from this source. This allows clients to set up listeners for the stats they are interested in following. After a short delay (100ms by default) the client is then sent data messages for all the source's stats. Further data events are then sent as and when each stat updates.

All messages from the client are currently ignored. In future, support may be added for clients to send messages in order to subscribe only to the stats they are interested in, rather than simply receiving all stats.
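
The message envelope described above can be decoded on the client side in two stages: first the event name, then the payload once the event is known. The following is a minimal sketch in Go using only the standard library. The Envelope type, the DecodeEnvelope helper, and the sample payload are assumptions made for illustration and are not part of Arithmospora.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Envelope mirrors the {"event": ..., "payload": ...} wire format.
// The payload is kept as raw JSON so it can be decoded later, once the
// event name has been inspected. (Hypothetical client-side type.)
type Envelope struct {
	Event   string          `json:"event"`
	Payload json.RawMessage `json:"payload"`
}

// DecodeEnvelope parses the text of a single websocket message.
func DecodeEnvelope(frame []byte) (Envelope, error) {
	var env Envelope
	err := json.Unmarshal(frame, &env)
	return env, err
}

func main() {
	// Made-up message; real payloads depend on the event.
	frame := []byte(`{"event":"event:name","payload":{"answer":42}}`)

	env, err := DecodeEnvelope(frame)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("event=%s payload=%s\n", env.Event, env.Payload)
}
```

Keeping the payload as json.RawMessage defers the choice of concrete payload type until the client has dispatched on the event name.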

Stat data messages have event names of the form stats:<statGroup>:<statName>, e.g. stats:other:totalvotes. The payload takes the general form:

{
  "name": "<statName>",
  "data": {...},
  "dataPoints":{
    "<dp1>": {...},
    "<dp2>": {...},
    ...
    "<dpN>": {...}
  }
}
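
Because data points share the shape of their parent stat, a client can model the payload with a single self-referential type. The sketch below, in Go, splits a stat event name into its group and name and walks a payload recursively. The type and function names, and the dp1/dp2 sample, are hypothetical; the data field is left as raw JSON because its shape depends on the stat type.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
	"strings"
)

// StatPayload mirrors the general payload form. Data points have the same
// shape as their parent, so the type refers to itself.
type StatPayload struct {
	Name       string                 `json:"name"`
	Data       json.RawMessage        `json:"data"`
	DataPoints map[string]StatPayload `json:"dataPoints"`
}

// SplitStatEvent splits "stats:<statGroup>:<statName>" into its parts.
func SplitStatEvent(event string) (group, name string, ok bool) {
	parts := strings.SplitN(event, ":", 3)
	if len(parts) != 3 || parts[0] != "stats" {
		return "", "", false
	}
	return parts[1], parts[2], true
}

// ParseStat decodes a stat payload.
func ParseStat(raw []byte) (StatPayload, error) {
	var p StatPayload
	err := json.Unmarshal(raw, &p)
	return p, err
}

// Paths lists the stat and every nested data point as slash-separated
// paths. Keys are sorted so the result is deterministic.
func Paths(p StatPayload, prefix string) []string {
	path := prefix + p.Name
	out := []string{path}
	keys := make([]string, 0, len(p.DataPoints))
	for k := range p.DataPoints {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		out = append(out, Paths(p.DataPoints[k], path+"/")...)
	}
	return out
}

// sample is a made-up payload with empty data objects.
const sample = `{"name":"totalvotes","data":{},"dataPoints":{
  "dp2":{"name":"dp2","data":{},"dataPoints":{}},
  "dp1":{"name":"dp1","data":{},"dataPoints":{
    "dp1a":{"name":"dp1a","data":{},"dataPoints":{}}}}}}`

func main() {
	group, name, ok := SplitStatEvent("stats:other:totalvotes")
	fmt.Println(group, name, ok)

	p, err := ParseStat([]byte(sample))
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	for _, path := range Paths(p, "") {
		fmt.Println(path)
	}
}
```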

Data points are structured in the same way as the parent stat, allowing a stat to include a number of related sets of data. For example, a student election turnout can be broken down by the study type of voters (Undergraduate, Postgraduate Taught, and Postgraduate Research), which would look something like this:

{
  "name": "studytypes",
  "data": {...},
  "dataPoints": {
    "PG": {
      "name": "PG",
      "data": {...},
      "dataPoints": {
        "T": {
          "name": "T",
          "data": {...},
          "dataPoints": {}
        },
        "R": {
          "name": "R",
          "data": {...},
          "dataPoints": {}
        }
      }
    },
    "UG": {
      ...
    }
  }
}

Stat types

There are currently five different types of stats supported by Arithmospora: single value, generic, proportion, rolling, and timed. The system can be readily extended to add support for further stat types.

Single value stats

Single value stats are the simplest: they consist of a single datum, such as the total number of votes cast in an election. Their data is encoded as {"<statName>": <value>}. For example:

{
  "name": "totalvotes",
  "data": {
    "totalvotes": 2345
  }
}

Generic stats

Generic stats consist of free-form key/value pairs, encoded directly as a JSON object.

Proportion stats

Proportion stats have the following fields:

  • current
  • total
  • proportion
  • percentage

For example:

{
  "current": 71,
  "total": 200,
  "proportion": 0.355,
  "percentage": 35.5
}

Rolling stats

Rolling stats are similar to proportion stats, but provide their data for a rolling time frame, such as the last five minutes. As such, their current value is expected to go down as well as up. Rolling stats add three extra fields to provide information about the busiest period: peak, peakProportion, and peakPercentage.
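
The derived fields of proportion and rolling stats can be illustrated with a little arithmetic. The sketch below, in Go, reproduces the proportion example (71 of 200) and applies the same calculation to a peak value. It assumes that peakProportion and peakPercentage are the peak expressed relative to the same total, which the text above does not state explicitly. The helper names and the peak value of 90 are made up for illustration.

```go
package main

import "fmt"

// ratio returns part/whole, or 0 when whole is 0 to avoid dividing by zero.
func ratio(part, whole int) float64 {
	if whole == 0 {
		return 0
	}
	return float64(part) / float64(whole)
}

// percent returns part as a percentage of whole, with the same zero guard.
func percent(part, whole int) float64 {
	if whole == 0 {
		return 0
	}
	return float64(part) * 100 / float64(whole)
}

// rolling holds the raw counts; derived fields are computed on demand.
// (Hypothetical type, not the package's RollingData.)
type rolling struct {
	Current, Total, Peak int
}

func main() {
	r := rolling{Current: 71, Total: 200, Peak: 90}
	fmt.Println("proportion:", ratio(r.Current, r.Total))
	fmt.Println("percentage:", percent(r.Current, r.Total))
	// Assumption: the peak fields are relative to the same total.
	fmt.Println("peakProportion:", ratio(r.Peak, r.Total))
	fmt.Println("peakPercentage:", percent(r.Peak, r.Total))
}
```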

Timed stats

This is the most complicated type currently supported. Data consists of key/value pairs with each key corresponding to a time bucket of fixed size. The collection of buckets provides a time series of data, such as the number of votes cast in successive five-minute periods.
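
To make the idea of fixed-size time buckets concrete, here is a minimal sketch in Go that assigns Unix timestamps to five-minute buckets and counts the events in each. It assumes buckets are keyed by the Unix time at which they start and that the bucket size is expressed in seconds. Arithmospora's actual keying scheme may differ, and the function names and sample timestamps are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// bucketKey returns the start of the bucket containing ts, where both ts
// and granularity are in seconds. Assumes ts is non-negative.
func bucketKey(ts, granularity int64) int64 {
	return ts - ts%granularity
}

// tally counts how many timestamps fall into each bucket.
func tally(timestamps []int64, granularity int64) map[int64]int {
	buckets := make(map[int64]int)
	for _, ts := range timestamps {
		buckets[bucketKey(ts, granularity)]++
	}
	return buckets
}

func main() {
	const granularity = 300 // five minutes, in seconds

	// Made-up vote times, as Unix timestamps.
	votes := []int64{900, 1000, 1199, 1200, 1500}
	buckets := tally(votes, granularity)

	// Print the buckets in time order to form a time series.
	keys := make([]int64, 0, len(buckets))
	for k := range buckets {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
	for _, k := range keys {
		fmt.Printf("%d: %d\n", k, buckets[k])
	}
}
```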

Milestones

Milestones are events which occur when particular conditions are met. For example, a milestone can be configured to generate a message when the number of voters in an election reaches 1000, or the percentage turnout of Postgraduate Taught students reaches 20%.

Milestones are grouped into collections relating to a particular base stat. The milestones themselves can either refer to that stat or a particular data point within that stat. Milestones compare a piece of data from the given stat or data point to a target value. The comparison is checked each time the stat updates; when the comparison becomes true for the first time the milestone becomes achieved, and a milestone event is broadcast to clients.

The payload of milestone events is as per the Milestone struct, with the primary field of interest to clients being the message, which can then be shown to users on screen, embedded in a tweet, etc.

Once a milestone has been achieved it is flagged as such so that it does not get resent on a subsequent stat update. All milestones are initially checked on startup to ensure previously met targets are not resent if the program is stopped and later restarted.
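
The "achieved once" behaviour described above can be sketched as follows in Go. This is an illustration rather than the package's implementation: the milestone type is a cut-down stand-in for the Milestone struct, and the comparator spellings (">=", ">", "<=", "<", "==") are assumptions, since the accepted values are defined by the configuration file rather than stated here.

```go
package main

import "fmt"

// milestone is a simplified, hypothetical stand-in for the Milestone struct.
type milestone struct {
	Target     float64
	Comparator string
	Message    string
	Achieved   bool
}

// compare applies the named comparison. The spellings are assumptions.
func compare(value float64, comparator string, target float64) bool {
	switch comparator {
	case ">=":
		return value >= target
	case ">":
		return value > target
	case "<=":
		return value <= target
	case "<":
		return value < target
	case "==":
		return value == target
	}
	return false
}

// newlyMet reports true only the first time the comparison holds, and
// flags the milestone so that later updates do not report it again.
func (m *milestone) newlyMet(value float64) bool {
	if m.Achieved || !compare(value, m.Comparator, m.Target) {
		return false
	}
	m.Achieved = true
	return true
}

func main() {
	m := &milestone{Target: 1000, Comparator: ">=", Message: "1000 votes cast!"}
	for _, votes := range []float64{998, 1000, 1001} {
		if m.newlyMet(votes) {
			fmt.Println("broadcast:", m.Message)
		} else {
			fmt.Println("no event at", votes)
		}
	}
}
```

Running the same check once at startup and discarding the result would mark already-met targets as achieved without broadcasting them, which matches the restart behaviour described above.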

Installation and usage

Installation

Arithmospora is written in Go. Install to your Go workspace as follows:

go get github.com/icunion/arithmospora/...

Configuration

Arithmospora is configured using a TOML configuration file. A fully annotated sample configuration file is provided in sample.conf.

Usage

There are three commands provided. All three take a -c flag giving the path to the configuration file, and list any further options when invoked with -help. The commands are:

  • arithmospora - this is the main program. When executed it loads the sources specified by the configuration, starts a web server to serve websockets, and continues to run until aborted.
  • aslist - loads all stats from a given source and prints them to stdout. Supports printing in a human-readable text representation or JSON output, which can optionally be pretty-printed.
  • aswatch - prints stats to stdout when they update; continues to run until aborted.

Deployment

The arithmospora command can be run as a daemon under systemd: an example systemd unit is provided which assumes the binary is installed under /opt/arithmospora/bin with a configuration file located at /etc/arithmospora/arithmospora.conf.

About

Arithmospora was created by Imperial College Union for its live election statistics to replace a previous Socket.IO system, which worked by polling a data output script and distributing the results to clients. The first use of Arithmospora was the Leadership Elections 2017 Live Statistics.

Copyright (c) 2017-2018 Imperial College Union

License: MIT

Documentation

Index

Constants

This section is empty.

Variables

var Config tomlConfig

Functions

func ParseConfig

func ParseConfig(configFile string) error

func RedisMakeKey

func RedisMakeKey(elements ...string) string

func RedisPool

func RedisPool() *redis.Pool

func ServeWs

func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request, errors chan<- error)

ServeWs handles websocket requests from the peer.

func SetRedisConfig

func SetRedisConfig(config RedisConfig)

func SetWebSocketConfig

func SetWebSocketConfig(config WebsocketConfig)

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

type DebounceConfig

type DebounceConfig struct {
	MinTimeMs int
	MaxTimeMs int
}

type GenericData

type GenericData struct {
	Data map[string]int
	// contains filtered or unexported fields
}

func (*GenericData) MarshalJSON

func (gd *GenericData) MarshalJSON() ([]byte, error)

func (*GenericData) MilestoneValue

func (gd *GenericData) MilestoneValue(field string) float64

func (*GenericData) Refresh

func (gd *GenericData) Refresh() error

func (*GenericData) String

func (gd *GenericData) String() string

type GenericDataLoader

type GenericDataLoader interface {
	StatDataLoader
	FetchData() (map[string]int, error)
}

type GenericDataLoaderRedis

type GenericDataLoaderRedis struct {
	RedisKeyMaker
}

func (*GenericDataLoaderRedis) FetchData

func (gdl *GenericDataLoaderRedis) FetchData() (map[string]int, error)

func (*GenericDataLoaderRedis) Load

func (gdl *GenericDataLoaderRedis) Load(*Stat) (StatData, error)

type HttpConfig

type HttpConfig struct {
	Address string
}

type HttpsConfig

type HttpsConfig struct {
	Address string
	Cert    string
	Key     string
}

type Hub

type Hub struct {
	Broadcast chan []byte
	// contains filtered or unexported fields
}

Hub: as per Gorilla chat example

func NewHub

func NewHub(source *Source) *Hub

func (*Hub) ClientCount

func (h *Hub) ClientCount() int

func (*Hub) Run

func (h *Hub) Run()

type Message

type Message struct {
	Event   string      `json:"event"`
	Payload interface{} `json:"payload"`
}

type Milestone

type Milestone struct {
	sync.Mutex   `json:"-"`
	Name         string    `json:"name"`
	DataPoints   []string  `json:"dataPointNames"`
	Field        string    `json:"field"`
	Target       float64   `json:"target"`
	Comparator   string    `json:"comparator"`
	Message      string    `json:"message"`
	Achieved     bool      `json:"-"`
	AchievedWhen time.Time `json:"achievedWhen"`
}

func (*Milestone) NewlyMet

func (m *Milestone) NewlyMet(stat *Stat) bool

func (*Milestone) String

func (m *Milestone) String() string

type MilestoneCollection

type MilestoneCollection struct {
	Name       string
	Stat       *Stat
	Milestones []*Milestone
}

func (*MilestoneCollection) Publish

func (mc *MilestoneCollection) Publish(achieved chan<- *Milestone)

type MilestoneConfig

type MilestoneConfig struct {
	Name       string
	Group      string
	Stat       string
	Milestones []*Milestone
}

type MilestoneValuer

type MilestoneValuer interface {
	MilestoneValue(string) float64
}

type Period

type Period struct {
	Granularity int64
	Cycles      int64
	BucketKeys  []int64
	Buckets     map[int64]int
}

type ProportionData

type ProportionData struct {
	Current int
	Total   int
	// contains filtered or unexported fields
}

func (*ProportionData) MarshalJSON

func (pd *ProportionData) MarshalJSON() ([]byte, error)

func (*ProportionData) MilestoneValue

func (pd *ProportionData) MilestoneValue(field string) float64

func (*ProportionData) Percentage

func (pd *ProportionData) Percentage() float64

func (*ProportionData) Proportion

func (pd *ProportionData) Proportion() float64

func (*ProportionData) Refresh

func (pd *ProportionData) Refresh() error

func (*ProportionData) String

func (pd *ProportionData) String() string

type ProportionDataLoader

type ProportionDataLoader interface {
	StatDataLoader
	FetchData() ([]int, error)
}

type ProportionDataLoaderRedis

type ProportionDataLoaderRedis struct {
	RedisKeyMaker
}

func (*ProportionDataLoaderRedis) FetchData

func (pdl *ProportionDataLoaderRedis) FetchData() ([]int, error)

func (*ProportionDataLoaderRedis) Load

func (pdl *ProportionDataLoaderRedis) Load(*Stat) (StatData, error)

func (*ProportionDataLoaderRedis) String

func (pdl *ProportionDataLoaderRedis) String() string

type RedisConfig

type RedisConfig struct {
	Server      string
	Password    string
	DB          int
	MaxIdle     int
	IdleTimeout int
}

type RedisDataLoader

type RedisDataLoader interface {
	StatDataLoader
	SetRedisPrefix(string)
}

type RedisDataPointLoader

type RedisDataPointLoader struct {
	RedisKeyMaker
}

func (*RedisDataPointLoader) DataPointNames

func (rdpl *RedisDataPointLoader) DataPointNames() ([]string, error)

func (*RedisDataPointLoader) NewDataLoader

func (rdpl *RedisDataPointLoader) NewDataLoader(sdl StatDataLoader, dpName string) StatDataLoader

func (*RedisDataPointLoader) NewDataPointLoader

func (rdpl *RedisDataPointLoader) NewDataPointLoader(dpName string) StatDataPointLoader

type RedisKeyMaker

type RedisKeyMaker struct {
	RedisPrefix string
}

func (*RedisKeyMaker) MakeKey

func (rkb *RedisKeyMaker) MakeKey(suffices ...string) string

func (*RedisKeyMaker) SetRedisPrefix

func (rkb *RedisKeyMaker) SetRedisPrefix(newPrefix string)

func (*RedisKeyMaker) String

func (rkb *RedisKeyMaker) String() string

type RedisUpdateListener

type RedisUpdateListener struct {
	RedisKeyMaker
}

func (*RedisUpdateListener) Subscribe

func (rul *RedisUpdateListener) Subscribe(updated chan<- bool)

type RollingData

type RollingData struct {
	ProportionData
	Peak int
	// contains filtered or unexported fields
}

func (*RollingData) MarshalJSON

func (rd *RollingData) MarshalJSON() ([]byte, error)

func (*RollingData) MilestoneValue

func (rd *RollingData) MilestoneValue(field string) float64

func (*RollingData) PeakPercentage

func (rd *RollingData) PeakPercentage() float64

func (*RollingData) PeakProportion

func (rd *RollingData) PeakProportion() float64

func (*RollingData) Refresh

func (rd *RollingData) Refresh() error

func (*RollingData) String

func (rd *RollingData) String() string

type RollingDataLoader

type RollingDataLoader interface {
	StatDataLoader
	FetchData() ([]int, error)
}

type RollingDataLoaderRedis

type RollingDataLoaderRedis struct {
	RedisKeyMaker
}

func (*RollingDataLoaderRedis) FetchData

func (rdl *RollingDataLoaderRedis) FetchData() ([]int, error)

func (*RollingDataLoaderRedis) Load

func (rdl *RollingDataLoaderRedis) Load(*Stat) (StatData, error)

func (*RollingDataLoaderRedis) String

func (rdl *RollingDataLoaderRedis) String() string

type SingleValueData

type SingleValueData struct {
	Name string
	Data int
	// contains filtered or unexported fields
}

func (*SingleValueData) MarshalJSON

func (svd *SingleValueData) MarshalJSON() ([]byte, error)

func (*SingleValueData) MilestoneValue

func (svd *SingleValueData) MilestoneValue(string) float64

func (*SingleValueData) Refresh

func (svd *SingleValueData) Refresh() error

func (*SingleValueData) String

func (svd *SingleValueData) String() string

type SingleValueDataLoader

type SingleValueDataLoader interface {
	StatDataLoader
	FetchData() (int, error)
}

type SingleValueDataLoaderRedis

type SingleValueDataLoaderRedis struct {
	RedisKeyMaker
}

func (*SingleValueDataLoaderRedis) FetchData

func (svdl *SingleValueDataLoaderRedis) FetchData() (int, error)

func (*SingleValueDataLoaderRedis) Load

func (svdl *SingleValueDataLoaderRedis) Load(stat *Stat) (StatData, error)

type Source

type Source struct {
	Name       string
	IsLive     bool
	Available  map[string][]string
	Stats      map[string]map[string]*Stat
	Milestones []*MilestoneCollection
	// contains filtered or unexported fields
}

func MakeSourcesFromConfig

func MakeSourcesFromConfig(config tomlConfig) (sources []*Source)

func (*Source) IncrementMilestonesCounter

func (s *Source) IncrementMilestonesCounter()

func (*Source) IncrementUpdatesCounter

func (s *Source) IncrementUpdatesCounter()

func (*Source) PopMilestonesCounter

func (s *Source) PopMilestonesCounter() (poppedCount int)

func (*Source) PopUpdatesCounter

func (s *Source) PopUpdatesCounter() (poppedCount int)

func (*Source) Publish

func (s *Source) Publish(hub *Hub, errors chan<- error) error

func (*Source) RefreshAll

func (s *Source) RefreshAll(errors chan<- error)

func (*Source) SendInitialDataTo

func (s *Source) SendInitialDataTo(client *Client) error

type SourceConfig

type SourceConfig struct {
	Name             string
	RedisPrefix      string
	StartTime        time.Time
	EndTime          time.Time
	IsLive           bool
	TimedStatPeriods []Period
	Stats            StatGroupConfig
	Milestones       []MilestoneConfig
}

type Stat

type Stat struct {
	sync.Mutex
	Name            string
	Depth           int
	DataLoader      StatDataLoader
	DataPointLoader StatDataPointLoader
	UpdateListener  StatUpdateListener
	// contains filtered or unexported fields
}

func MakeStatFromConfig

func MakeStatFromConfig(sourceConfig SourceConfig, statConfig StatConfig) *Stat

func (*Stat) ListenForUpdates

func (s *Stat) ListenForUpdates(min time.Duration, max time.Duration, errors chan<- error) error

func (*Stat) Load

func (s *Stat) Load() (err error)

func (*Stat) MarshalJSON

func (s *Stat) MarshalJSON() ([]byte, error)

func (*Stat) NotifyListeners

func (s *Stat) NotifyListeners()

func (*Stat) Refresh

func (s *Stat) Refresh() error

func (*Stat) RefreshData

func (s *Stat) RefreshData() error

func (*Stat) RefreshDataPoints

func (s *Stat) RefreshDataPoints() error

func (*Stat) RegisterListener

func (s *Stat) RegisterListener(listener chan<- bool)

func (*Stat) Reload

func (s *Stat) Reload() error

func (*Stat) Reset

func (s *Stat) Reset()

func (*Stat) String

func (s *Stat) String() string

type StatConfig

type StatConfig struct {
	Name       string
	Period     string
	DataType   string
	LoaderType string
}

type StatData

type StatData interface {
	Refresh() error
	json.Marshaler
	fmt.Stringer
}
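
As an illustration of how a further stat type might satisfy this interface, here is a minimal sketch in Go. It is not part of the package: the interface is copied locally so the example compiles on its own, and rangeData, newRangeData, and the fetch callback are hypothetical names standing in for a real data type and its loader.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StatData is copied from the index above so this sketch is self-contained.
type StatData interface {
	Refresh() error
	json.Marshaler
	fmt.Stringer
}

// rangeData is a hypothetical stat type holding a minimum and a maximum.
type rangeData struct {
	Min, Max int
	fetch    func() (lo, hi int, err error) // stands in for a data loader
}

func newRangeData(fetch func() (lo, hi int, err error)) *rangeData {
	return &rangeData{fetch: fetch}
}

// Refresh reloads the values from the underlying source.
func (d *rangeData) Refresh() error {
	lo, hi, err := d.fetch()
	if err != nil {
		return err
	}
	d.Min, d.Max = lo, hi
	return nil
}

// MarshalJSON encodes the data as a flat JSON object.
func (d *rangeData) MarshalJSON() ([]byte, error) {
	return json.Marshal(map[string]int{"min": d.Min, "max": d.Max})
}

// String gives the human-readable form.
func (d *rangeData) String() string {
	return fmt.Sprintf("min=%d max=%d", d.Min, d.Max)
}

// Compile-time check that rangeData satisfies StatData.
var _ StatData = (*rangeData)(nil)

func main() {
	d := newRangeData(func() (int, int, error) { return 2, 9, nil })
	if err := d.Refresh(); err != nil {
		fmt.Println("refresh error:", err)
		return
	}
	out, _ := json.Marshal(d)
	fmt.Println(d, string(out))
}
```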

type StatDataLoader

type StatDataLoader interface {
	Load(*Stat) (StatData, error)
	fmt.Stringer
}

type StatDataPointLoader

type StatDataPointLoader interface {
	DataPointNames() ([]string, error)
	NewDataLoader(StatDataLoader, string) StatDataLoader
	NewDataPointLoader(string) StatDataPointLoader
}

type StatGroupConfig

type StatGroupConfig struct {
	Proportion []StatConfig
	Rolling    []StatConfig
	Timed      []StatConfig
	Other      []StatConfig
}

type StatUpdateListener

type StatUpdateListener interface {
	Subscribe(chan<- bool)
}

type TimedData

type TimedData struct {
	StartTime time.Time
	EndTime   time.Time
	Period    *Period
	// contains filtered or unexported fields
}

func (*TimedData) MarshalJSON

func (td *TimedData) MarshalJSON() ([]byte, error)

func (*TimedData) Refresh

func (td *TimedData) Refresh() error

func (*TimedData) String

func (td *TimedData) String() string

type TimedDataLoader

type TimedDataLoader interface {
	StatDataLoader
	FetchBucket(int64) (int, error)
}

type TimedDataLoaderRedis

type TimedDataLoaderRedis struct {
	RedisKeyMaker
	StartTime time.Time
	EndTime   time.Time
	Periods   []Period
}

func (*TimedDataLoaderRedis) FetchBucket

func (tdl *TimedDataLoaderRedis) FetchBucket(bucket int64) (int, error)

func (*TimedDataLoaderRedis) Load

func (tdl *TimedDataLoaderRedis) Load(stat *Stat) (StatData, error)

type TimedDataPointLoaderRedis

type TimedDataPointLoaderRedis struct {
	RedisKeyMaker
	Periods []Period
}

func (*TimedDataPointLoaderRedis) DataPointNames

func (tdplr *TimedDataPointLoaderRedis) DataPointNames() (dpNames []string, err error)

func (*TimedDataPointLoaderRedis) NewDataLoader

func (tdplr *TimedDataPointLoaderRedis) NewDataLoader(sdl StatDataLoader, dpName string) StatDataLoader

func (*TimedDataPointLoaderRedis) NewDataPointLoader

func (tdplr *TimedDataPointLoaderRedis) NewDataPointLoader(dpName string) StatDataPointLoader

type WebsocketConfig

type WebsocketConfig struct {
	WriteWait  int
	PongWait   int
	PingPeriod int
}

Directories

Path Synopsis
cmd
