import "go.gazette.dev/core/examples/stream-sum"
Package stream_sum is an example application consisting of three stages:
1) A `chunker` job randomly generates a number of unique "streams", with stream content emitted across a number of interleaved data chunks.
2) A `summer` consumer accumulates stream chunks and computes a running CRC64 sum of each stream's content. When a stream completes, the `summer` consumer emits its final sum to an output journal.
3) Having written a complete stream, the `chunker` job confirms that the correct sum is written to the output journal.
The `chunker` and `summer` tasks may be independently scaled, and are invariant to process failures and restarts.
The stream-sum example application is also a sneaky integration test: it actively verifies processing guarantees provided by Gazette, such as exactly-once semantics and bounds on end-to-end latency, and fails if those properties are not met. stream-sum is used in Gazette's continuous integration and chaos-testing suites.
FinalSumsJournal is the journal to which final stream sums are written.
func GenerateAndVerifyStreams(ctx context.Context, cfg *ChunkerConfig) error
GenerateAndVerifyStreams is the main routine of the `chunker` job. It generates and verifies streams based on the ChunkerConfig.
type Chunk struct {
	UUID  message.UUID
	ID    StreamID // Unique ID of the stream.
	SeqNo int      // Monotonic sequence number, starting from 1.
	Data  []byte   // Raw data included in the Value. If empty, this is the stream's final chunk.
}
Chunk is an ordered slice of stream content.
type ChunkerConfig struct {
	Chunker struct {
		mbp.ZoneConfig
		Streams int `long:"streams" default:"-1" description:"Number of streams each worker should create. <0 for infinite"`
		Chunks  int `long:"chunks" default:"100" description:"Number of chunks per stream"`
		Workers int `long:"workers" default:"4" description:"Number of parallel workers"`
	} `group:"Chunker" namespace:"chunker" env-namespace:"CHUNKER"`

	Broker      mbp.ClientConfig      `group:"Broker" namespace:"broker" env-namespace:"BROKER"`
	Log         mbp.LogConfig         `group:"Logging" namespace:"log" env-namespace:"LOG"`
	Diagnostics mbp.DiagnosticsConfig `group:"Debug" namespace:"debug" env-namespace:"DEBUG"`
}
ChunkerConfig is the configuration used by the `chunker` job binary.
StreamID uniquely identifies a stream.
type Sum struct {
	UUID  message.UUID
	ID    StreamID // Unique ID of the stream.
	SeqNo int      // SeqNo of last Chunk summed over.
	Value uint64   // Computed sum through SeqNo.
}
Sum represents a partial or final CRC64 sum of a stream.
Update folds a Chunk into this Sum, returning whether this is the last Chunk of the Stream.
type Summer struct{}
Summer consumes stream chunks, aggregates chunk data, and emits final sums. It implements the runconsumer.Application interface.
func (Summer) ConsumeMessage(shard consumer.Shard, store consumer.Store, env message.Envelope, pub *message.Publisher) error
ConsumeMessage folds a Chunk into its respective partial stream sum. If the Chunk represents a stream EOF, it emits a final sum. consumer.Application implementation.
FinalizeTxn marshals partial stream sums to the |store| to ensure persistence across consumer transactions. consumer.Application implementation.
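The idea of persisting partial sums at the end of each transaction can be sketched with an in-memory stand-in for the store. Everything here is hypothetical: `memStore`, its `cache` and `durable` maps, the `load` and `finalize` helpers, and the use of JSON as the encoding are assumptions for illustration and do not reflect the actual consumer.Store API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Sum is a simplified stand-in for the package's Sum type.
type Sum struct {
	ID    string
	SeqNo int
	Value uint64
}

// memStore is a hypothetical stand-in for a durable consumer store.
type memStore struct {
	cache   map[string]Sum    // Partial sums touched in the current transaction.
	durable map[string][]byte // Marshalled sums that survive across transactions.
}

func newMemStore() *memStore {
	return &memStore{cache: make(map[string]Sum), durable: make(map[string][]byte)}
}

// load returns the partial sum for id, preferring the transaction cache and
// falling back to durable state. An unknown id yields a fresh, zero-valued Sum.
func (s *memStore) load(id string) (Sum, error) {
	if sum, ok := s.cache[id]; ok {
		return sum, nil
	}
	var sum = Sum{ID: id}
	if b, ok := s.durable[id]; ok {
		if err := json.Unmarshal(b, &sum); err != nil {
			return Sum{}, err
		}
	}
	return sum, nil
}

// finalize marshals every cached partial sum into durable state and clears
// the cache, mirroring the end-of-transaction step described above.
func (s *memStore) finalize() error {
	for id, sum := range s.cache {
		b, err := json.Marshal(sum)
		if err != nil {
			return err
		}
		s.durable[id] = b
		delete(s.cache, id)
	}
	return nil
}

func main() {
	var store = newMemStore()
	store.cache["stream-a"] = Sum{ID: "stream-a", SeqNo: 2, Value: 42}

	if err := store.finalize(); err != nil {
		fmt.Println("finalize failed:", err)
		return
	}
	// A later transaction (or a restarted process with the same durable
	// state) recovers the partial sum and can keep folding chunks into it.
	sum, err := store.load("stream-a")
	fmt.Println(sum.SeqNo, sum.Value, err)
}
```

Keeping in-flight sums in a cache and writing them out once per transaction avoids one store write per chunk, at the cost of needing the finalize step before the transaction commits.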
func (Summer) InitApplication(args runconsumer.InitArgs) error
InitApplication is a no-op, as Summer provides no client-facing APIs.
func (Summer) NewConfig() runconsumer.Config
NewConfig returns a new BaseConfig.
NewMessage returns a Chunk message. consumer.Application implementation.
NewStore builds a RocksDB or SQLite store for the Shard. consumer.Application implementation.
| Path | Synopsis |
|---|---|
| chunker | |
| summer | Package summer runs the stream_sum.Summer consumer. |
Package stream_sum imports 25 packages and is imported by 6 packages. Updated 2020-06-13.