Documentation ¶
Index ¶
- func AddMessageToBatch(ctx context.Context, batch *Batch, msg kafka.Message, handler Handler, ...)
- func ProcessBatch(ctx context.Context, handler Handler, batch *Batch, errChan chan error)
- type Batch
- type BatchHandler
- type Consumer
- type Handler
- type Message
- type MessageConsumer
- type ObservationExtracted
- type ObservationMapper
- type ObservationStore
- type ResultWriter
Functions ¶
Types ¶
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
Batch handles adding raw messages to a batch of ObservationExtracted events.
func (*Batch) Commit ¶
func (batch *Batch) Commit()
Commit is called when the batch has been processed. The last message has already been released, so at this point we just need to commit.
func (*Batch) Events ¶
func (batch *Batch) Events() []*ObservationExtracted
Events returns the events currently in the batch.
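The fields of Batch are unexported, so its internals are not visible here. As a rough illustration of the accumulate-then-commit pattern described above, the following sketch uses a hypothetical `sketchBatch` type. The `Add` and `IsFull` methods, the `maxSize` field, and the `recordingMessage` helper are assumptions made for this example and are not part of the documented API.

```go
package main

import "fmt"

// ObservationExtracted is a local copy of the documented event type (Avro tags omitted).
type ObservationExtracted struct {
	RowIndex   int64
	Row        string
	InstanceID string
}

// Message mirrors the documented Message interface.
type Message interface {
	GetData() []byte
	Mark()
	Commit()
}

// sketchBatch is a hypothetical stand-in for Batch, not the real implementation.
type sketchBatch struct {
	maxSize int
	events  []*ObservationExtracted
	last    Message
}

func newSketchBatch(maxSize int) *sketchBatch {
	return &sketchBatch{maxSize: maxSize}
}

// Add stores an already-decoded event and remembers the message it came from.
func (b *sketchBatch) Add(event *ObservationExtracted, msg Message) {
	b.events = append(b.events, event)
	b.last = msg
}

// IsFull reports whether the batch has reached its configured size.
func (b *sketchBatch) IsFull() bool { return len(b.events) >= b.maxSize }

// Events returns the events currently in the batch.
func (b *sketchBatch) Events() []*ObservationExtracted { return b.events }

// Commit commits the most recent message and empties the batch.
func (b *sketchBatch) Commit() {
	if b.last != nil {
		b.last.Commit()
	}
	b.events = nil
	b.last = nil
}

// recordingMessage is a hypothetical Message that counts commits.
type recordingMessage struct {
	data    []byte
	commits int
}

func (m *recordingMessage) GetData() []byte { return m.data }
func (m *recordingMessage) Mark()           {}
func (m *recordingMessage) Commit()         { m.commits++ }

func main() {
	batch := newSketchBatch(2)
	msg := &recordingMessage{data: []byte("row")}
	batch.Add(&ObservationExtracted{RowIndex: 0, Row: "a,b", InstanceID: "inst-1"}, msg)
	batch.Add(&ObservationExtracted{RowIndex: 1, Row: "c,d", InstanceID: "inst-1"}, msg)
	if batch.IsFull() {
		fmt.Println("processing", len(batch.Events()), "events")
		batch.Commit()
	}
	fmt.Println("commits:", msg.commits, "remaining:", len(batch.Events()))
}
```

In this sketch only the most recent message is committed, which matches the description of Commit above only loosely; the real Batch may track messages differently.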
type BatchHandler ¶
type BatchHandler struct {
// contains filtered or unexported fields
}
BatchHandler handles batches of ObservationExtracted events that contain CSV row data.
func NewBatchHandler ¶
func NewBatchHandler(observationMapper ObservationMapper, observationStore ObservationStore, resultWriter ResultWriter, errorReporter reporter.ErrorReporter) *BatchHandler
NewBatchHandler returns a new BatchHandler that uses the given observation mapper, observation store, result writer, and error reporter.
func (BatchHandler) Handle ¶
func (handler BatchHandler) Handle(ctx context.Context, events []*ObservationExtracted) error
Handle processes the given slice of ObservationExtracted events.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer consumes event messages.
type Handler ¶
type Handler interface {
Handle(ctx context.Context, events []*ObservationExtracted) error
}
Handler represents a handler for processing a batch of events.
type Message ¶
type Message interface {
GetData() []byte
Mark()
Commit()
}
Message represents a single message to be added to the batch.
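Because Message is a small interface, an in-memory implementation is enough to exercise batching code without a Kafka connection. The following is a minimal sketch; `fakeMessage` and its `marked` and `committed` fields are hypothetical names introduced for this example.

```go
package main

import "fmt"

// Message mirrors the documented Message interface.
type Message interface {
	GetData() []byte
	Mark()
	Commit()
}

// fakeMessage is a hypothetical in-memory Message that records
// whether Mark and Commit have been called.
type fakeMessage struct {
	data      []byte
	marked    bool
	committed bool
}

func (m *fakeMessage) GetData() []byte { return m.data }
func (m *fakeMessage) Mark()           { m.marked = true }
func (m *fakeMessage) Commit()         { m.committed = true }

func main() {
	msg := &fakeMessage{data: []byte("payload")}
	var m Message = msg // compile-time check that the interface is satisfied
	m.Mark()
	m.Commit()
	fmt.Println(string(m.GetData()), msg.marked, msg.committed)
}
```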
type MessageConsumer ¶
type MessageConsumer interface {
Channels() *kafka.ConsumerGroupChannels
}
MessageConsumer provides a generic interface for consuming []byte messages (from Kafka).
type ObservationExtracted ¶
type ObservationExtracted struct {
RowIndex int64 `avro:"row_index"`
Row string `avro:"row"`
InstanceID string `avro:"instance_id"`
}
ObservationExtracted is the data that is output for each observation extracted.
func Unmarshal ¶
func Unmarshal(message Message) (*ObservationExtracted, error)
Unmarshal converts the []byte data of a message into an ObservationExtracted event.
type ObservationMapper ¶
type ObservationMapper interface {
Map(ctx context.Context, row string, rowIndex int64, instanceID string) (*models.Observation, error)
}
ObservationMapper handles the conversion from row data to observation instances.
type ObservationStore ¶
type ObservationStore interface {
SaveAll(ctx context.Context, observations []*models.Observation) ([]*observation.Result, error)
}
ObservationStore handles the persistence of observations.
type ResultWriter ¶
type ResultWriter interface {
Write(ctx context.Context, results []*observation.Result)
}
ResultWriter is a dependency that outputs results.