dagger

package
v0.0.0-...-9bb8190 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 17, 2021 License: MIT Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MovingLimitRead

func MovingLimitRead(p StreamBuffer, streamID StreamID, bufName string, from <-chan Timestamp, to <-chan Timestamp, recs chan<- *Record, errc chan error)

MovingLimitRead performs asynchronous reads of the stream buffer. While a read is in progress, the from and to limits can be updated multiple times; the latest values are used for the next read, which starts once the in-progress read completes.

func ParseComputationID

func ParseComputationID(s StreamID) (string, string, error)

ParseComputationID parses a computation definition

func ProcessMultipleProcessors

func ProcessMultipleProcessors(procs []RecordProcessor, t *Record) error

ProcessMultipleProcessors processes a single record with multiple record processors

func ProcessMultipleRecords

func ProcessMultipleRecords(tp RecordProcessor, records []*Record) error

ProcessMultipleRecords processes multiple records with a single record processor

Types

type BufferedDispatcher

type BufferedDispatcher struct {
	// contains filtered or unexported fields
}

BufferedDispatcher buffers produced records and keeps retrying to send them until they are ACKed

func StartBufferedDispatcher

func StartBufferedDispatcher(compID StreamID, dispatcher RecordProcessor, sentTracker SentTracker, lwmTracker LWMTracker,
	stopCh chan struct{}) *BufferedDispatcher

StartBufferedDispatcher creates a new buffered dispatcher and starts workers that consume records off the queue and send them

func (*BufferedDispatcher) ProcessRecord

func (bd *BufferedDispatcher) ProcessRecord(t *Record) error

ProcessRecord sends the record to the buffered channel

func (*BufferedDispatcher) Stop

func (bd *BufferedDispatcher) Stop()

Stop blocks until all buffered records are sent

type ComputationPlugin

type ComputationPlugin interface {
	GetInfo(definition string) (*ComputationPluginInfo, error)
	SubmitRecord(t *Record) (*ComputationPluginResponse, error)
	GetSnapshot() ([]byte, error)
	ApplySnapshot([]byte) error
	Stop() error
}

ComputationPlugin handles running and interacting with a computation plugin process

func StartComputationPlugin

func StartComputationPlugin(name string, compID StreamID) (ComputationPlugin, error)

StartComputationPlugin starts the plugin process

type ComputationPluginInfo

type ComputationPluginInfo struct {
	Inputs   []StreamID
	Stateful bool
}

ComputationPluginInfo contains information about this computation plugin

type ComputationPluginResponse

type ComputationPluginResponse struct {
	Records []*Record
}

ComputationPluginResponse is returned from a computation plugin to the main app

type ComputationPluginState

type ComputationPluginState struct {
	State []byte
}

ComputationPluginState is state returned from a computation plugin to the main app

type Config

type Config struct {
	SubscribersTTL time.Duration
	LevelDBFile    string
}

Config is used to configure the node

func DefaultConfig

func DefaultConfig(c *cli.Context) *Config

DefaultConfig provides default config values

type ConsulConfig

type ConsulConfig struct {
	Address   string
	TTL       string
	LockDelay time.Duration
}

ConsulConfig is used to configure the Consul coordinator

type Coordinator

type Coordinator interface {
	// GetSubscribers(string) ([]string, error)
	// GetConfig(string) (string, error)
	SubscribeCoordinator
	PublishCoordinator
	TaskCoordinator
	ReplicationCoordinator
	RegisterSession() (string, error)
	RenewSession(string) error
	Start(net.Addr, chan error) error
	Stop()
}

Coordinator coordinates topics, their publishers and subscribers

func NewConsulCoordinator

func NewConsulCoordinator(customizeConfig func(*ConsulConfig)) Coordinator

NewConsulCoordinator creates a new instance of Consul coordinator

type Deduplicator

type Deduplicator interface {
	Seen(t *Record) (bool, error)
}

Deduplicator throws away duplicate records (and ACKs their senders)

func NewDeduplicator

func NewDeduplicator(computation StreamID, recordTracker ReceivedTracker) (Deduplicator, error)

NewDeduplicator initializes a new deduplicator

type Dispatcher

type Dispatcher struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Dispatcher dispatches records to registered subscribers

func NewDispatcher

func NewDispatcher(coordinator Coordinator) *Dispatcher

NewDispatcher creates a new dispatcher

func (*Dispatcher) ProcessRecord

func (d *Dispatcher) ProcessRecord(t *Record) error

ProcessRecord sends record to registered subscribers via RPC

type DispatcherConfig

type DispatcherConfig struct {
	PipeliningLimit int
}

DispatcherConfig configures the dispatcher

type GroupHandler

type GroupHandler interface {
	GetStatus() (bool, string, error)
}

GroupHandler handles leadership status of a group

type HttpAPI

type HttpAPI struct {
	// contains filtered or unexported fields
}

func NewHttpAPI

func NewHttpAPI(coordinator Coordinator, receiver Receiver, dispatcher *Dispatcher) HttpAPI

func (HttpAPI) Serve

func (api HttpAPI) Serve()

type HttpAPIConfig

type HttpAPIConfig struct {
	Port string
}

HttpAPIConfig configures the HTTP API

type InputManager

type InputManager interface {
	SubscribeTo(StreamID, Timestamp, RecordProcessor)
	UnsubscribeFrom(StreamID, RecordProcessor)
	SetTaskManager(TaskManager)
}

InputManager manages tasks' subscriptions to streams

type LWMTracker

type LWMTracker interface {
	RecordProcessor
	SentTracker
	BeforeDispatching([]*Record)
	GetCombinedLWM() Timestamp
	GetLocalLWM() Timestamp
	GetUpstreamLWM() Timestamp
}

func NewLWMTracker

func NewLWMTracker() LWMTracker

type LevelDBPersister

type LevelDBPersister struct {
	// contains filtered or unexported fields
}

LevelDBPersister is built on top of LevelDB

func (*LevelDBPersister) ApplySnapshot

func (p *LevelDBPersister) ApplySnapshot(compID StreamID, snapshot []byte) error

ApplySnapshot applies the snapshot of the computation's persisted state

func (*LevelDBPersister) Close

func (p *LevelDBPersister) Close()

Close the persister

func (*LevelDBPersister) CommitComputation

func (p *LevelDBPersister) CommitComputation(compID StreamID, in *Record, out []*Record) error

CommitComputation persists information about received and produced records atomically

func (*LevelDBPersister) GetLastTimestamp

func (p *LevelDBPersister) GetLastTimestamp(compID StreamID) (Timestamp, error)

GetLastTimestamp returns the timestamp of the latest record processed by this task

func (*LevelDBPersister) GetRecentReceived

func (p *LevelDBPersister) GetRecentReceived(comp StreamID) ([]string, error)

GetRecentReceived returns IDs of records we have recently received

func (*LevelDBPersister) GetSnapshot

func (p *LevelDBPersister) GetSnapshot(compID StreamID) ([]byte, error)

GetSnapshot returns the snapshot of the computation's persisted state

func (*LevelDBPersister) Insert

func (p *LevelDBPersister) Insert(compID StreamID, bufID string, t *Record) error

Insert inserts a received record into the ordered queue for a computation

func (*LevelDBPersister) PersistReceivedRecords

func (p *LevelDBPersister) PersistReceivedRecords(comp StreamID, records []*Record) error

PersistReceivedRecords saves the info about which records we've already seen

func (*LevelDBPersister) ReadBuffer

func (p *LevelDBPersister) ReadBuffer(compID StreamID, bufID string,
	from Timestamp, to Timestamp, recCh chan<- *Record, errc chan<- error,
	readCompletedCh chan struct{})

ReadBuffer returns a piece of the input buffer between specified timestamps

func (*LevelDBPersister) ReceivedAlready

func (p *LevelDBPersister) ReceivedAlready(comp StreamID, t *Record) (bool, error)

ReceivedAlready returns whether we've seen this record before

func (*LevelDBPersister) SentSuccessfuly

func (p *LevelDBPersister) SentSuccessfuly(compID StreamID, t *Record) error

SentSuccessfuly deletes the production from the DB after it's been ACKed

type LinearizedRecordProcessor

type LinearizedRecordProcessor interface {
	ProcessRecordLinearized(*Record) error
}

LinearizedRecordProcessor is an object capable of processing a list of ordered records

type Linearizer

type Linearizer struct {
	LWM Timestamp
	// contains filtered or unexported fields
}

Linearizer buffers records and forwards them to the next RecordProcessor sorted by timestamp, while making sure all the records in a certain time frame have already arrived

func NewLinearizer

func NewLinearizer(compID StreamID, store StreamBuffer, lwmTracker LWMTracker) *Linearizer

NewLinearizer creates a new linearizer for a certain computation

func (*Linearizer) ProcessRecord

func (l *Linearizer) ProcessRecord(t *Record) error

ProcessRecord inserts the record into the buffer sorted by timestamps

func (*Linearizer) Run

func (l *Linearizer) Run(errc chan error)

Run forwards the records from the buffer to the next RecordProcessor when LWM tells us no more records will arrive in the forwarded time frame

func (*Linearizer) SetProcessor

func (l *Linearizer) SetProcessor(ltp LinearizedRecordProcessor)

SetProcessor sets the processor that receives linearized tuples

func (*Linearizer) SetStartLWM

func (l *Linearizer) SetStartLWM(time Timestamp)

SetStartLWM sets the LWM at which we start processing

func (*Linearizer) Stop

func (l *Linearizer) Stop()

Stop stops the linearizer

type MockCoordinator

type MockCoordinator struct {
	// contains filtered or unexported fields
}

Mock of Coordinator interface

func NewMockCoordinator

func NewMockCoordinator(ctrl *gomock.Controller) *MockCoordinator

func (*MockCoordinator) AcquireTask

func (_m *MockCoordinator) AcquireTask(_param0 StreamID) (bool, error)

func (*MockCoordinator) CheckpointPosition

func (_m *MockCoordinator) CheckpointPosition(_param0 StreamID, _param1 Timestamp) error

func (*MockCoordinator) DeregisterAsPublisher

func (_m *MockCoordinator) DeregisterAsPublisher(_param0 StreamID) error

func (*MockCoordinator) EXPECT

func (_m *MockCoordinator) EXPECT() *_MockCoordinatorRecorder

func (*MockCoordinator) EnsurePublisherNum

func (_m *MockCoordinator) EnsurePublisherNum(_param0 StreamID, _param1 int, _param2 chan struct{}) chan error

func (*MockCoordinator) GetSubscriberPosition

func (_m *MockCoordinator) GetSubscriberPosition(_param0 StreamID, _param1 string) (Timestamp, error)

func (*MockCoordinator) GetSubscribers

func (_m *MockCoordinator) GetSubscribers(_param0 StreamID) ([]string, error)

func (*MockCoordinator) JoinGroup

func (_m *MockCoordinator) JoinGroup(_param0 StreamID) (GroupHandler, error)

func (*MockCoordinator) RegisterAsPublisher

func (_m *MockCoordinator) RegisterAsPublisher(_param0 StreamID) error

func (*MockCoordinator) RegisterAsPublisherWithSession

func (_m *MockCoordinator) RegisterAsPublisherWithSession(_param0 string, _param1 StreamID) error

func (*MockCoordinator) RegisterSession

func (_m *MockCoordinator) RegisterSession() (string, error)

func (*MockCoordinator) ReleaseTask

func (_m *MockCoordinator) ReleaseTask(_param0 StreamID) (bool, error)

func (*MockCoordinator) RenewSession

func (_m *MockCoordinator) RenewSession(_param0 string) error

func (*MockCoordinator) Start

func (_m *MockCoordinator) Start(_param0 net.Addr, _param1 chan error) error

func (*MockCoordinator) Stop

func (_m *MockCoordinator) Stop()

func (*MockCoordinator) SubscribeTo

func (_m *MockCoordinator) SubscribeTo(_param0 StreamID, _param1 Timestamp) error

func (*MockCoordinator) TaskAcquired

func (_m *MockCoordinator) TaskAcquired(_param0 StreamID) error

func (*MockCoordinator) UnsubscribeFrom

func (_m *MockCoordinator) UnsubscribeFrom(_param0 StreamID) error

func (*MockCoordinator) WatchSubscriberPosition

func (_m *MockCoordinator) WatchSubscriberPosition(_param0 StreamID, _param1 string, _param2 chan struct{}) (chan Timestamp, chan error)

func (*MockCoordinator) WatchSubscribers

func (_m *MockCoordinator) WatchSubscribers(_param0 StreamID, _param1 chan struct{}) (chan string, chan string, chan error)

func (*MockCoordinator) WatchTagMatch

func (_m *MockCoordinator) WatchTagMatch(_param0 StreamID, _param1 chan string, _param2 chan string, _param3 chan error)

func (*MockCoordinator) WatchTasks

func (_m *MockCoordinator) WatchTasks(_param0 chan struct{}) (chan []string, chan error)

type MockInputManager

type MockInputManager struct {
	// contains filtered or unexported fields
}

Mock of InputManager interface

func NewMockInputManager

func NewMockInputManager(ctrl *gomock.Controller) *MockInputManager

func (*MockInputManager) EXPECT

func (_m *MockInputManager) EXPECT() *_MockInputManagerRecorder

func (*MockInputManager) SetTaskManager

func (_m *MockInputManager) SetTaskManager(_param0 TaskManager)

func (*MockInputManager) SubscribeTo

func (_m *MockInputManager) SubscribeTo(_param0 StreamID, _param1 Timestamp, _param2 RecordProcessor)

func (*MockInputManager) UnsubscribeFrom

func (_m *MockInputManager) UnsubscribeFrom(_param0 StreamID, _param1 RecordProcessor)

type MockTask

type MockTask struct {
	// contains filtered or unexported fields
}

Mock of Task interface

func NewMockTask

func NewMockTask(ctrl *gomock.Controller) *MockTask

func (*MockTask) EXPECT

func (_m *MockTask) EXPECT() *_MockTaskRecorder

func (*MockTask) GetSnapshot

func (_m *MockTask) GetSnapshot() ([]byte, error)

func (*MockTask) ProcessRecord

func (_m *MockTask) ProcessRecord(_param0 *Record) error

func (*MockTask) Run

func (_m *MockTask) Run(_param0 chan error)

func (*MockTask) Stop

func (_m *MockTask) Stop()

func (*MockTask) Sync

func (_m *MockTask) Sync() (Timestamp, error)

type MockTaskStarter

type MockTaskStarter struct {
	// contains filtered or unexported fields
}

Mock of TaskStarter interface

func NewMockTaskStarter

func NewMockTaskStarter(ctrl *gomock.Controller) *MockTaskStarter

func (*MockTaskStarter) EXPECT

func (_m *MockTaskStarter) EXPECT() *_MockTaskStarterRecorder

func (*MockTaskStarter) StartTask

func (_m *MockTaskStarter) StartTask(_param0 StreamID, _param1 string, _param2 string) (*TaskInfo, error)

type MultiSentTracker

type MultiSentTracker struct {
	// contains filtered or unexported fields
}

MultiSentTracker enables notifying multiple SentTrackers of a successfully sent record

func (MultiSentTracker) SentSuccessfuly

func (st MultiSentTracker) SentSuccessfuly(compID StreamID, t *Record) error

SentSuccessfuly notifies multiple SentTrackers of a successfully sent record

type NewSubscriber

type NewSubscriber struct {
	Addr string
	From Timestamp
}

NewSubscriber encapsulates information about a new subscription

type Persister

type Persister interface {
	Close()
	TaskPersister
	StreamBuffer
	SentTracker
	ReceivedTracker
}

Persister takes care of persisting in-flight records and computation state

func NewPersister

func NewPersister(customizeConfig func(*PersisterConfig)) (Persister, error)

NewPersister initializes and returns a new Persister instance

type PersisterConfig

type PersisterConfig struct {
	Dir string
}

PersisterConfig configures the persistent storage

type PublishCoordinator

type PublishCoordinator interface {
	GetSubscribers(streamID StreamID) ([]string, error) // DEPRECATE
	WatchSubscribers(StreamID, chan struct{}) (chan string, chan string, chan error)
	GetSubscriberPosition(StreamID, string) (Timestamp, error)
	WatchSubscriberPosition(StreamID, string, chan struct{}) (chan Timestamp, chan error)
	RegisterAsPublisher(streamID StreamID) error
	RegisterAsPublisherWithSession(session string, streamID StreamID) error
	DeregisterAsPublisher(streamID StreamID) error
}

PublishCoordinator handles the coordination of publishing a stream

type RPCHandler

type RPCHandler struct {
	// contains filtered or unexported fields
}

RPCHandler handles incoming RPC calls

func (*RPCHandler) SubmitRecord

func (rpci *RPCHandler) SubmitRecord(t *Record, reply *string) error

SubmitRecord submits a new record into the worker process

func (*RPCHandler) Sync

func (rpci *RPCHandler) Sync(compID StreamID, reply *[]byte) error

Sync is the RPC method called by slave workers wanting to sync a computation

type ReceivedTracker

type ReceivedTracker interface {
	PersistReceivedRecords(StreamID, []*Record) error
	GetRecentReceived(StreamID) ([]string, error)
	ReceivedAlready(StreamID, *Record) (bool, error)
}

ReceivedTracker persists info about which records we've already seen

type Receiver

type Receiver interface {
	InputManager
	ListenAddr() net.Addr
	Listen(chan error)
}

Receiver receives new records via incoming RPC calls

func NewReceiver

func NewReceiver(coordinator Coordinator, customizeConfig func(*ReceiverConfig)) Receiver

NewReceiver initializes a new receiver

type ReceiverConfig

type ReceiverConfig struct {
	Addr string
	Port string
}

ReceiverConfig configures the RPC receiver

type Record

type Record struct {
	ID string `json:"id"`

	StreamID  StreamID    `json:"stream_id"`
	LWM       Timestamp   `json:"lwm"`
	Timestamp Timestamp   `json:"timestamp"`
	Data      interface{} `json:"data"`
}

Record is the atomic unit of data flowing through Dagger

func CreateRecord

func CreateRecord(streamID StreamID, data string) (*Record, error)

CreateRecord creates a new record with given stream ID and data

func CreateRecordFromJSON

func CreateRecordFromJSON(b []byte) (*Record, error)

CreateRecordFromJSON parses a complete record from JSON and adds LWM and ID

func (*Record) String

func (t *Record) String() string

type RecordProcessor

type RecordProcessor interface {
	ProcessRecord(*Record) error
}

RecordProcessor is an object capable of processing a record

type ReplicationCoordinator

type ReplicationCoordinator interface {
	JoinGroup(streamID StreamID) (GroupHandler, error)
}

ReplicationCoordinator coordinates replication of records onto multiple computations on multiple hosts for high availability

type SentTracker

type SentTracker interface {
	SentSuccessfuly(StreamID, *Record) error
}

SentTracker deletes production entries that have been ACKed from the DB

type Session

type Session interface {
	ID() string
	Renew() error
}

Session represents a coordination session. As long as the session doesn't expire, other agents expect data from us.

type StreamBuffer

type StreamBuffer interface {
	Insert(compID StreamID, bufID string, t *Record) error
	ReadBuffer(compID StreamID, bufID string, from Timestamp, to Timestamp, recCh chan<- *Record, errc chan<- error, readCompleted chan struct{})
}

StreamBuffer persistently "buffers" records sorted by timestamp

type StreamDispatcher

type StreamDispatcher struct {
	// contains filtered or unexported fields
}

StreamDispatcher dispatches records from a single stream to registered subscribers

func NewStreamDispatcher

func NewStreamDispatcher(streamID StreamID, coordinator Coordinator,
	persister Persister, lwmTracker LWMTracker, groupHandler GroupHandler,
	customizeConfig func(*DispatcherConfig)) *StreamDispatcher

func (*StreamDispatcher) ProcessRecord

func (sd *StreamDispatcher) ProcessRecord(t *Record) error

func (*StreamDispatcher) Run

func (sd *StreamDispatcher) Run(errc chan error)

func (*StreamDispatcher) Stop

func (sd *StreamDispatcher) Stop()

type StreamID

type StreamID string

StreamID identifies a stream of records

func StripTags

func StripTags(sid StreamID) StreamID

StripTags removes the kv pairs encoded in StreamID

func UnparseTags

func UnparseTags(topic StreamID, tags Tags) StreamID

UnparseTags creates a string representation of a StreamID from a topic name and a list of tags

type StreamIterator

type StreamIterator struct {
	// contains filtered or unexported fields
}

func (*StreamIterator) Dispatch

func (si *StreamIterator) Dispatch(startAt Timestamp)

func (*StreamIterator) ProcessRecord

func (si *StreamIterator) ProcessRecord(t *Record) error

func (*StreamIterator) Stop

func (si *StreamIterator) Stop()

type SubscribeCoordinator

type SubscribeCoordinator interface {
	SubscribeTo(streamID StreamID, from Timestamp) error
	EnsurePublisherNum(StreamID, int, chan struct{}) chan error
	CheckpointPosition(streamID StreamID, from Timestamp) error
	UnsubscribeFrom(streamID StreamID) error
	WatchTagMatch(topic StreamID, addedRet chan string, droppedRet chan string, errc chan error)
}

SubscribeCoordinator handles the act of subscribing to a stream

type Tags

type Tags map[string]string

Tags are key-value metadata attached to a stream

func ParseTags

func ParseTags(s StreamID) Tags

ParseTags returns kv pairs encoded in stream ID

type Task

type Task interface {
	RecordProcessor
	GetSnapshot() ([]byte, error)
	Sync() (Timestamp, error)
	Run(chan error)
	Stop()
}

Task is a unit of computation that consumes and/or produces a stream and can be replicated and synced across workers

type TaskCoordinator

type TaskCoordinator interface {
	// WatchTasks creates a watcher that watches when new tasks show up or
	// are dropped
	WatchTasks(chan struct{}) (chan []string, chan error)
	// AcquireTask tries to take a task from the task list. If another worker
	// manages to take the task, the call returns false.
	AcquireTask(StreamID) (bool, error)
	// TaskAcquired marks the task as successfully acquired
	TaskAcquired(StreamID) error
	// ReleaseTask releases the lock on the task so others can try to acquire it
	ReleaseTask(StreamID) (bool, error)
}

TaskCoordinator allows watching and taking new available tasks

type TaskInfo

type TaskInfo struct {
	StreamID StreamID
	Task     Task
	Inputs   []StreamID
	From     Timestamp
}

TaskInfo summarizes the newly created task with info about its input and position up to which it has already processed data

func NewMatchTask

func NewMatchTask(c Coordinator, r InputManager, sid StreamID, definition string) (*TaskInfo, error)

NewMatchTask creates a new internal matching task that automatically subscribes to new matching streams

type TaskManager

type TaskManager interface {
	ManageTasks() error
	GetTaskSnapshot(StreamID) ([]byte, error)
	Stop()
}

TaskManager manages tasks

func NewTaskManager

func NewTaskManager(coordinator Coordinator, receiver InputManager, taskStarter TaskStarter) TaskManager

NewTaskManager creates a new task manager

type TaskPersister

type TaskPersister interface {
	CommitComputation(compID StreamID, in *Record, out []*Record) error
	GetLastTimestamp(compID StreamID) (Timestamp, error)
	GetSnapshot(compID StreamID) ([]byte, error)
	ApplySnapshot(compID StreamID, snapshot []byte) error
}

TaskPersister enables tasks to commit and restore their state

type TaskStarter

type TaskStarter interface {
	StartTask(StreamID, string, string) (*TaskInfo, error)
}

TaskStarter sets up a task from a given stream ID

func NewTaskStarter

func NewTaskStarter(c Coordinator, p Persister, dispatcherConfig func(*DispatcherConfig)) TaskStarter

type Timestamp

type Timestamp int64

Timestamp represents the timestamp of a dagger record

func TSFromString

func TSFromString(ts string) Timestamp

TSFromString parses a stringified int64 into a dagger timestamp

func TSFromTime

func TSFromTime(ts time.Time) Timestamp

TSFromTime converts a Go time.Time value into a dagger timestamp

func (Timestamp) String

func (t Timestamp) String() string

func (Timestamp) ToTime

func (t Timestamp) ToTime() time.Time

ToTime converts a timestamp into a Go time.Time value

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL