Documentation ¶
Index ¶
- func MovingLimitRead(p StreamBuffer, streamID StreamID, bufName string, from <-chan Timestamp, ...)
- func ParseComputationID(s StreamID) (string, string, error)
- func ProcessMultipleProcessors(procs []RecordProcessor, t *Record) error
- func ProcessMultipleRecords(tp RecordProcessor, records []*Record) error
- type BufferedDispatcher
- type ComputationPlugin
- type ComputationPluginInfo
- type ComputationPluginResponse
- type ComputationPluginState
- type Config
- type ConsulConfig
- type Coordinator
- type Deduplicator
- type Dispatcher
- type DispatcherConfig
- type GroupHandler
- type HttpAPI
- type HttpAPIConfig
- type InputManager
- type LWMTracker
- type LevelDBPersister
- func (p *LevelDBPersister) ApplySnapshot(compID StreamID, snapshot []byte) error
- func (p *LevelDBPersister) Close()
- func (p *LevelDBPersister) CommitComputation(compID StreamID, in *Record, out []*Record) error
- func (p *LevelDBPersister) GetLastTimestamp(compID StreamID) (Timestamp, error)
- func (p *LevelDBPersister) GetRecentReceived(comp StreamID) ([]string, error)
- func (p *LevelDBPersister) GetSnapshot(compID StreamID) ([]byte, error)
- func (p *LevelDBPersister) Insert(compID StreamID, bufID string, t *Record) error
- func (p *LevelDBPersister) PersistReceivedRecords(comp StreamID, records []*Record) error
- func (p *LevelDBPersister) ReadBuffer(compID StreamID, bufID string, from Timestamp, to Timestamp, ...)
- func (p *LevelDBPersister) ReceivedAlready(comp StreamID, t *Record) (bool, error)
- func (p *LevelDBPersister) SentSuccessfuly(compID StreamID, t *Record) error
- type LinearizedRecordProcessor
- type Linearizer
- type MockCoordinator
- func (_m *MockCoordinator) AcquireTask(_param0 StreamID) (bool, error)
- func (_m *MockCoordinator) CheckpointPosition(_param0 StreamID, _param1 Timestamp) error
- func (_m *MockCoordinator) DeregisterAsPublisher(_param0 StreamID) error
- func (_m *MockCoordinator) EXPECT() *_MockCoordinatorRecorder
- func (_m *MockCoordinator) EnsurePublisherNum(_param0 StreamID, _param1 int, _param2 chan struct{}) chan error
- func (_m *MockCoordinator) GetSubscriberPosition(_param0 StreamID, _param1 string) (Timestamp, error)
- func (_m *MockCoordinator) GetSubscribers(_param0 StreamID) ([]string, error)
- func (_m *MockCoordinator) JoinGroup(_param0 StreamID) (GroupHandler, error)
- func (_m *MockCoordinator) RegisterAsPublisher(_param0 StreamID) error
- func (_m *MockCoordinator) RegisterAsPublisherWithSession(_param0 string, _param1 StreamID) error
- func (_m *MockCoordinator) RegisterSession() (string, error)
- func (_m *MockCoordinator) ReleaseTask(_param0 StreamID) (bool, error)
- func (_m *MockCoordinator) RenewSession(_param0 string) error
- func (_m *MockCoordinator) Start(_param0 net.Addr, _param1 chan error) error
- func (_m *MockCoordinator) Stop()
- func (_m *MockCoordinator) SubscribeTo(_param0 StreamID, _param1 Timestamp) error
- func (_m *MockCoordinator) TaskAcquired(_param0 StreamID) error
- func (_m *MockCoordinator) UnsubscribeFrom(_param0 StreamID) error
- func (_m *MockCoordinator) WatchSubscriberPosition(_param0 StreamID, _param1 string, _param2 chan struct{}) (chan Timestamp, chan error)
- func (_m *MockCoordinator) WatchSubscribers(_param0 StreamID, _param1 chan struct{}) (chan string, chan string, chan error)
- func (_m *MockCoordinator) WatchTagMatch(_param0 StreamID, _param1 chan string, _param2 chan string, _param3 chan error)
- func (_m *MockCoordinator) WatchTasks(_param0 chan struct{}) (chan []string, chan error)
- type MockInputManager
- func (_m *MockInputManager) EXPECT() *_MockInputManagerRecorder
- func (_m *MockInputManager) SetTaskManager(_param0 TaskManager)
- func (_m *MockInputManager) SubscribeTo(_param0 StreamID, _param1 Timestamp, _param2 RecordProcessor)
- func (_m *MockInputManager) UnsubscribeFrom(_param0 StreamID, _param1 RecordProcessor)
- type MockTask
- type MockTaskStarter
- type MultiSentTracker
- type NewSubscriber
- type Persister
- type PersisterConfig
- type PublishCoordinator
- type RPCHandler
- type ReceivedTracker
- type Receiver
- type ReceiverConfig
- type Record
- type RecordProcessor
- type ReplicationCoordinator
- type SentTracker
- type Session
- type StreamBuffer
- type StreamDispatcher
- type StreamID
- type StreamIterator
- type SubscribeCoordinator
- type Tags
- type Task
- type TaskCoordinator
- type TaskInfo
- type TaskManager
- type TaskPersister
- type TaskStarter
- type Timestamp
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MovingLimitRead ¶
func MovingLimitRead(p StreamBuffer, streamID StreamID, bufName string, from <-chan Timestamp, to <-chan Timestamp, recs chan<- *Record, errc chan error)
MovingLimitRead performs asynchronous reads of the stream buffer. While the read is in progress, the from and to limits can be updated multiple times and the latest values will be used in the next read when the first completes.
func ParseComputationID ¶
ParseComputationID parses a computation definition
func ProcessMultipleProcessors ¶
func ProcessMultipleProcessors(procs []RecordProcessor, t *Record) error
ProcessMultipleProcessors processes a single record with multiple record processors
func ProcessMultipleRecords ¶
func ProcessMultipleRecords(tp RecordProcessor, records []*Record) error
ProcessMultipleRecords processes multiple records with a single record processor
Types ¶
type BufferedDispatcher ¶
type BufferedDispatcher struct {
// contains filtered or unexported fields
}
BufferedDispatcher bufferes produced records and sends them with retrying until they are ACKed
func StartBufferedDispatcher ¶
func StartBufferedDispatcher(compID StreamID, dispatcher RecordProcessor, sentTracker SentTracker, lwmTracker LWMTracker, stopCh chan struct{}) *BufferedDispatcher
StartBufferedDispatcher creates a new buffered dispatcher and starts workers that will be consuming off the queue an sending records
func (*BufferedDispatcher) ProcessRecord ¶
func (bd *BufferedDispatcher) ProcessRecord(t *Record) error
ProcessRecord sends the record to the buffered channel
func (*BufferedDispatcher) Stop ¶
func (bd *BufferedDispatcher) Stop()
Stop blocks until all buffered records are sent
type ComputationPlugin ¶
type ComputationPlugin interface { GetInfo(definition string) (*ComputationPluginInfo, error) SubmitRecord(t *Record) (*ComputationPluginResponse, error) GetSnapshot() ([]byte, error) ApplySnapshot([]byte) error Stop() error }
ComputationPlugin handles the running and interacting with a computation plugin process
func StartComputationPlugin ¶
func StartComputationPlugin(name string, compID StreamID) (ComputationPlugin, error)
StartComputationPlugin starts the plugin process
type ComputationPluginInfo ¶
ComputationPluginInfo contains information about this computation plugin
type ComputationPluginResponse ¶
type ComputationPluginResponse struct {
Records []*Record
}
ComputationPluginResponse is returned from a computation plugin to the main app
type ComputationPluginState ¶
type ComputationPluginState struct {
State []byte
}
ComputationPluginState is state returned from a computation plugin to the main app
type Config ¶
Config is used to configure the node
func DefaultConfig ¶
DefaultConfig provides default config values
type ConsulConfig ¶
ConsulConfig is used to configure the Consul coordinator
type Coordinator ¶
type Coordinator interface { // GetSubscribers(string) ([]string, error) // GetConfig(string) (string, error) SubscribeCoordinator PublishCoordinator TaskCoordinator ReplicationCoordinator RegisterSession() (string, error) RenewSession(string) error Start(net.Addr, chan error) error Stop() }
Coordinator coordinates topics, their publishers and subscribers
func NewConsulCoordinator ¶
func NewConsulCoordinator(customizeConfig func(*ConsulConfig)) Coordinator
NewConsulCoordinator creates a new instance of Consul coordinator
type Deduplicator ¶
Deduplicator throws away duplicate records (and ACKs their senders)
func NewDeduplicator ¶
func NewDeduplicator(computation StreamID, recordTracker ReceivedTracker) (Deduplicator, error)
NewDeduplicator initializes a new deduplicator
type Dispatcher ¶
Dispatcher dispatches records to registered subscribers
func NewDispatcher ¶
func NewDispatcher(coordinator Coordinator) *Dispatcher
NewDispatcher creates a new dispatcher
func (*Dispatcher) ProcessRecord ¶
func (d *Dispatcher) ProcessRecord(t *Record) error
ProcessRecord sends record to registered subscribers via RPC
type DispatcherConfig ¶
type DispatcherConfig struct {
PipeliningLimit int
}
DispatcherConfig configures the dispatcher
type GroupHandler ¶
GroupHandler handles leadership status of a group
type HttpAPI ¶
type HttpAPI struct {
// contains filtered or unexported fields
}
func NewHttpAPI ¶
func NewHttpAPI(coordinator Coordinator, receiver Receiver, dispatcher *Dispatcher) HttpAPI
type HttpAPIConfig ¶
type HttpAPIConfig struct {
Port string
}
HttpAPIConfig configures the HTTP API
type InputManager ¶
type InputManager interface { SubscribeTo(StreamID, Timestamp, RecordProcessor) UnsubscribeFrom(StreamID, RecordProcessor) SetTaskManager(TaskManager) }
InputManager manages tasks' subscriptions to streams
type LWMTracker ¶
type LWMTracker interface { RecordProcessor SentTracker BeforeDispatching([]*Record) GetCombinedLWM() Timestamp GetLocalLWM() Timestamp GetUpstreamLWM() Timestamp }
func NewLWMTracker ¶
func NewLWMTracker() LWMTracker
type LevelDBPersister ¶
type LevelDBPersister struct {
// contains filtered or unexported fields
}
LevelDBPersister is built on top of LevelDB
func (*LevelDBPersister) ApplySnapshot ¶
func (p *LevelDBPersister) ApplySnapshot(compID StreamID, snapshot []byte) error
ApplySnapshot applies the snapshot of the computation's persisted state
func (*LevelDBPersister) CommitComputation ¶
func (p *LevelDBPersister) CommitComputation(compID StreamID, in *Record, out []*Record) error
CommitComputation persists information about received and produced records atomically
func (*LevelDBPersister) GetLastTimestamp ¶
func (p *LevelDBPersister) GetLastTimestamp(compID StreamID) (Timestamp, error)
GetLastTimestamp returns the timestamp of the latest record processed by this task
func (*LevelDBPersister) GetRecentReceived ¶
func (p *LevelDBPersister) GetRecentReceived(comp StreamID) ([]string, error)
GetRecentReceived returns IDs of records we have recently received
func (*LevelDBPersister) GetSnapshot ¶
func (p *LevelDBPersister) GetSnapshot(compID StreamID) ([]byte, error)
GetSnapshot returns the snapshot of the computation's persisted state
func (*LevelDBPersister) Insert ¶
func (p *LevelDBPersister) Insert(compID StreamID, bufID string, t *Record) error
Insert inserts a received record into the ordered queue for a computation
func (*LevelDBPersister) PersistReceivedRecords ¶
func (p *LevelDBPersister) PersistReceivedRecords(comp StreamID, records []*Record) error
PersistReceivedRecords save the info about which records we've already seen
func (*LevelDBPersister) ReadBuffer ¶
func (p *LevelDBPersister) ReadBuffer(compID StreamID, bufID string, from Timestamp, to Timestamp, recCh chan<- *Record, errc chan<- error, readCompletedCh chan struct{})
ReadBuffer returns a piece of the input buffer between specified timestamps
func (*LevelDBPersister) ReceivedAlready ¶
func (p *LevelDBPersister) ReceivedAlready(comp StreamID, t *Record) (bool, error)
ReceivedAlready returns whether we've seen this record before
func (*LevelDBPersister) SentSuccessfuly ¶
func (p *LevelDBPersister) SentSuccessfuly(compID StreamID, t *Record) error
SentSuccessfuly deletes the production from the DB after it's been ACKed
type LinearizedRecordProcessor ¶
LinearizedRecordProcessor is an object capable of processing a list of ordered records
type Linearizer ¶
type Linearizer struct { LWM Timestamp // contains filtered or unexported fields }
Linearizer buffers records and forwards them to the next RecordProcessor sorted by timestamp, while making sure all the records in a certain time frame have already arrived
func NewLinearizer ¶
func NewLinearizer(compID StreamID, store StreamBuffer, lwmTracker LWMTracker) *Linearizer
NewLinearizer creates a new linearizer for a certain computation
func (*Linearizer) ProcessRecord ¶
func (l *Linearizer) ProcessRecord(t *Record) error
ProcessRecord inserts the record into the buffer sorted by timestamps
func (*Linearizer) Run ¶
func (l *Linearizer) Run(errc chan error)
Run forwards the records from the buffer to the next RecordProcessor when LWM tells us no more records will arrive in the forwarded time frame
func (*Linearizer) SetProcessor ¶
func (l *Linearizer) SetProcessor(ltp LinearizedRecordProcessor)
SetProcessor sets the processor that received linearized tuiples
func (*Linearizer) SetStartLWM ¶
func (l *Linearizer) SetStartLWM(time Timestamp)
SetStartLWM sets the LWM at which we start processing
type MockCoordinator ¶
type MockCoordinator struct {
// contains filtered or unexported fields
}
Mock of Coordinator interface
func NewMockCoordinator ¶
func NewMockCoordinator(ctrl *gomock.Controller) *MockCoordinator
func (*MockCoordinator) AcquireTask ¶
func (_m *MockCoordinator) AcquireTask(_param0 StreamID) (bool, error)
func (*MockCoordinator) CheckpointPosition ¶
func (_m *MockCoordinator) CheckpointPosition(_param0 StreamID, _param1 Timestamp) error
func (*MockCoordinator) DeregisterAsPublisher ¶
func (_m *MockCoordinator) DeregisterAsPublisher(_param0 StreamID) error
func (*MockCoordinator) EXPECT ¶
func (_m *MockCoordinator) EXPECT() *_MockCoordinatorRecorder
func (*MockCoordinator) EnsurePublisherNum ¶
func (_m *MockCoordinator) EnsurePublisherNum(_param0 StreamID, _param1 int, _param2 chan struct{}) chan error
func (*MockCoordinator) GetSubscriberPosition ¶
func (_m *MockCoordinator) GetSubscriberPosition(_param0 StreamID, _param1 string) (Timestamp, error)
func (*MockCoordinator) GetSubscribers ¶
func (_m *MockCoordinator) GetSubscribers(_param0 StreamID) ([]string, error)
func (*MockCoordinator) JoinGroup ¶
func (_m *MockCoordinator) JoinGroup(_param0 StreamID) (GroupHandler, error)
func (*MockCoordinator) RegisterAsPublisher ¶
func (_m *MockCoordinator) RegisterAsPublisher(_param0 StreamID) error
func (*MockCoordinator) RegisterAsPublisherWithSession ¶
func (_m *MockCoordinator) RegisterAsPublisherWithSession(_param0 string, _param1 StreamID) error
func (*MockCoordinator) RegisterSession ¶
func (_m *MockCoordinator) RegisterSession() (string, error)
func (*MockCoordinator) ReleaseTask ¶
func (_m *MockCoordinator) ReleaseTask(_param0 StreamID) (bool, error)
func (*MockCoordinator) RenewSession ¶
func (_m *MockCoordinator) RenewSession(_param0 string) error
func (*MockCoordinator) Start ¶
func (_m *MockCoordinator) Start(_param0 net.Addr, _param1 chan error) error
func (*MockCoordinator) Stop ¶
func (_m *MockCoordinator) Stop()
func (*MockCoordinator) SubscribeTo ¶
func (_m *MockCoordinator) SubscribeTo(_param0 StreamID, _param1 Timestamp) error
func (*MockCoordinator) TaskAcquired ¶
func (_m *MockCoordinator) TaskAcquired(_param0 StreamID) error
func (*MockCoordinator) UnsubscribeFrom ¶
func (_m *MockCoordinator) UnsubscribeFrom(_param0 StreamID) error
func (*MockCoordinator) WatchSubscriberPosition ¶
func (_m *MockCoordinator) WatchSubscriberPosition(_param0 StreamID, _param1 string, _param2 chan struct{}) (chan Timestamp, chan error)
func (*MockCoordinator) WatchSubscribers ¶
func (_m *MockCoordinator) WatchSubscribers(_param0 StreamID, _param1 chan struct{}) (chan string, chan string, chan error)
func (*MockCoordinator) WatchTagMatch ¶
func (_m *MockCoordinator) WatchTagMatch(_param0 StreamID, _param1 chan string, _param2 chan string, _param3 chan error)
func (*MockCoordinator) WatchTasks ¶
func (_m *MockCoordinator) WatchTasks(_param0 chan struct{}) (chan []string, chan error)
type MockInputManager ¶
type MockInputManager struct {
// contains filtered or unexported fields
}
Mock of InputManager interface
func NewMockInputManager ¶
func NewMockInputManager(ctrl *gomock.Controller) *MockInputManager
func (*MockInputManager) EXPECT ¶
func (_m *MockInputManager) EXPECT() *_MockInputManagerRecorder
func (*MockInputManager) SetTaskManager ¶
func (_m *MockInputManager) SetTaskManager(_param0 TaskManager)
func (*MockInputManager) SubscribeTo ¶
func (_m *MockInputManager) SubscribeTo(_param0 StreamID, _param1 Timestamp, _param2 RecordProcessor)
func (*MockInputManager) UnsubscribeFrom ¶
func (_m *MockInputManager) UnsubscribeFrom(_param0 StreamID, _param1 RecordProcessor)
type MockTask ¶
type MockTask struct {
// contains filtered or unexported fields
}
Mock of Task interface
func NewMockTask ¶
func NewMockTask(ctrl *gomock.Controller) *MockTask
func (*MockTask) GetSnapshot ¶
func (*MockTask) ProcessRecord ¶
type MockTaskStarter ¶
type MockTaskStarter struct {
// contains filtered or unexported fields
}
Mock of TaskStarter interface
func NewMockTaskStarter ¶
func NewMockTaskStarter(ctrl *gomock.Controller) *MockTaskStarter
func (*MockTaskStarter) EXPECT ¶
func (_m *MockTaskStarter) EXPECT() *_MockTaskStarterRecorder
type MultiSentTracker ¶
type MultiSentTracker struct {
// contains filtered or unexported fields
}
MultiSentTracker enables notifying multiple SentTrackers of a successfuly sent record
func (MultiSentTracker) SentSuccessfuly ¶
func (st MultiSentTracker) SentSuccessfuly(compID StreamID, t *Record) error
SentSuccessfuly notifies multiple senttrackers of successfuly sent record
type NewSubscriber ¶
NewSubscriber encapsulates information about a new subscription
type Persister ¶
type Persister interface { Close() TaskPersister StreamBuffer SentTracker ReceivedTracker }
Persister takes care of persisting in-flight records and computation state
func NewPersister ¶
func NewPersister(customizeConfig func(*PersisterConfig)) (Persister, error)
NewPersister initializes and returns a new Persister instance
type PersisterConfig ¶
type PersisterConfig struct {
Dir string
}
PersisterConfig configures the persistent storage
type PublishCoordinator ¶
type PublishCoordinator interface { GetSubscribers(streamID StreamID) ([]string, error) // DEPRECATE WatchSubscribers(StreamID, chan struct{}) (chan string, chan string, chan error) GetSubscriberPosition(StreamID, string) (Timestamp, error) WatchSubscriberPosition(StreamID, string, chan struct{}) (chan Timestamp, chan error) RegisterAsPublisher(streamID StreamID) error RegisterAsPublisherWithSession(session string, streamID StreamID) error DeregisterAsPublisher(streamID StreamID) error }
PublishCoordinator handles the coordination of publishing a stream
type RPCHandler ¶
type RPCHandler struct {
// contains filtered or unexported fields
}
RPCHandler handles incoming RPC calls
func (*RPCHandler) SubmitRecord ¶
func (rpci *RPCHandler) SubmitRecord(t *Record, reply *string) error
SubmitRecord submits a new record into the worker process
type ReceivedTracker ¶
type ReceivedTracker interface { PersistReceivedRecords(StreamID, []*Record) error GetRecentReceived(StreamID) ([]string, error) ReceivedAlready(StreamID, *Record) (bool, error) }
ReceivedTracker persists info about which records we've already seen
type Receiver ¶
type Receiver interface { InputManager ListenAddr() net.Addr Listen(chan error) }
Receiver receives new records via incoming RPC calls
func NewReceiver ¶
func NewReceiver(coordinator Coordinator, customizeConfig func(*ReceiverConfig)) Receiver
NewReceiver initializes a new receiver
type ReceiverConfig ¶
ReceiverConfig configures the RPC receiver
type Record ¶
type Record struct { ID string `json:"id"` StreamID StreamID `json:"stream_id"` LWM Timestamp `json:"lwm"` Timestamp Timestamp `json:"timestamp"` Data interface{} `json:"data"` }
Record is the atomic unit of data flowing through Dagger
func CreateRecord ¶
CreateRecord creates a new record with given stream ID and data
func CreateRecordFromJSON ¶
CreateRecordFromJSON parses a complete record from JSON and adds LWM and ID
type RecordProcessor ¶
RecordProcessor is an object capable of processing a record
type ReplicationCoordinator ¶
type ReplicationCoordinator interface {
JoinGroup(streamID StreamID) (GroupHandler, error)
}
ReplicationCoordinator coordinates replication of records onto multiple computations on multiple hosts for high availability
type SentTracker ¶
SentTracker deletes production entries that have been ACKed from the DB
type Session ¶
Session represents a coordination session. As long as the session doesn't expire, other agents expect data from us.
type StreamBuffer ¶
type StreamBuffer interface { Insert(compID StreamID, bufID string, t *Record) error ReadBuffer(compID StreamID, bufID string, from Timestamp, to Timestamp, recCh chan<- *Record, errc chan<- error, readCompleted chan struct{}) }
StreamBuffer persistently "buffers" records sorted by timestamp
type StreamDispatcher ¶
type StreamDispatcher struct {
// contains filtered or unexported fields
}
StreamDispatcher dispatches records from a single stream to registered subscribers
func NewStreamDispatcher ¶
func NewStreamDispatcher(streamID StreamID, coordinator Coordinator, persister Persister, lwmTracker LWMTracker, groupHandler GroupHandler, customizeConfig func(*DispatcherConfig)) *StreamDispatcher
func (*StreamDispatcher) ProcessRecord ¶
func (sd *StreamDispatcher) ProcessRecord(t *Record) error
func (*StreamDispatcher) Run ¶
func (sd *StreamDispatcher) Run(errc chan error)
func (*StreamDispatcher) Stop ¶
func (sd *StreamDispatcher) Stop()
type StreamID ¶
type StreamID string
StreamID identifies a stream of records
func UnparseTags ¶
UnparseTags creates a string representation of a StreamID from a topic name and a list of tags
type StreamIterator ¶
type StreamIterator struct {
// contains filtered or unexported fields
}
func (*StreamIterator) Dispatch ¶
func (si *StreamIterator) Dispatch(startAt Timestamp)
func (*StreamIterator) ProcessRecord ¶
func (si *StreamIterator) ProcessRecord(t *Record) error
func (*StreamIterator) Stop ¶
func (si *StreamIterator) Stop()
type SubscribeCoordinator ¶
type SubscribeCoordinator interface { SubscribeTo(streamID StreamID, from Timestamp) error EnsurePublisherNum(StreamID, int, chan struct{}) chan error CheckpointPosition(streamID StreamID, from Timestamp) error UnsubscribeFrom(streamID StreamID) error WatchTagMatch(topic StreamID, addedRet chan string, droppedRet chan string, errc chan error) }
SubscribeCoordinator handles the act of subscribing to a stream
type Task ¶
type Task interface { RecordProcessor GetSnapshot() ([]byte, error) Sync() (Timestamp, error) Run(chan error) Stop() }
Task is a unit of computation that consumes and/or produces a stream and can be replicated and synced across workers
type TaskCoordinator ¶
type TaskCoordinator interface { // WatchTasks creates a watcher that watches when new tasks show up or // are dropped WatchTasks(chan struct{}) (chan []string, chan error) // AcquireTask tries to take a task from the task list. If another worker // manages to take the task, the call returns false. AcquireTask(StreamID) (bool, error) // TaskAcquired marks the task as successfuly acquired TaskAcquired(StreamID) error // ReleaseTask releases the lock on the task so others can try to acquire it ReleaseTask(StreamID) (bool, error) }
TaskCoordinator allows watching and taking new available tasks
type TaskInfo ¶
TaskInfo summarizes the newly created task with info about its input and position up to which it has already processed data
func NewMatchTask ¶
func NewMatchTask(c Coordinator, r InputManager, sid StreamID, definition string) (*TaskInfo, error)
NewMatchTask creates a new internal matching task that automatically subscribes to new matching streams
type TaskManager ¶
TaskManager manages tasks
func NewTaskManager ¶
func NewTaskManager(coordinator Coordinator, receiver InputManager, taskStarter TaskStarter) TaskManager
NewTaskManager creates a new task manager
type TaskPersister ¶
type TaskPersister interface { CommitComputation(compID StreamID, in *Record, out []*Record) error GetLastTimestamp(compID StreamID) (Timestamp, error) GetSnapshot(compID StreamID) ([]byte, error) ApplySnapshot(compID StreamID, snapshot []byte) error }
TaskPersister enables tasks to commit and restore their state
type TaskStarter ¶
TaskStarter sets up a task from a given stream ID
func NewTaskStarter ¶
func NewTaskStarter(c Coordinator, p Persister, dispatcherConfig func(*DispatcherConfig)) TaskStarter
type Timestamp ¶
type Timestamp int64
Timestamp represents the timestamp of a dagger record
func TSFromString ¶
TSFromString parses a stringified int64 into a dagger timestamp
func TSFromTime ¶
TSFromTime converts a Go's time object into a dagger timestamp