core: go.gazette.dev/core/examples/stream-sum

package stream_sum

import "go.gazette.dev/core/examples/stream-sum"

Package stream_sum is an example application consisting of three stages:

1) A `chunker` job randomly generates a number of unique "streams", with stream content emitted across a number of interleaved data chunks.

2) A `summer` consumer accumulates stream chunks and computes a running CRC64 sum of each stream's content. When a stream completes, the `summer` consumer emits its final sum to an output journal.

3) Having written a complete stream, the `chunker` job confirms that the correct sum is written to the output journal.

The `chunker` and `summer` tasks may be independently scaled, and are invariant to process failures and restarts.

The stream-sum example application is also a sneaky integration test: it actively verifies processing guarantees provided by Gazette, such as exactly-once semantics and bounds on end-to-end latency, and fails if those properties are not met. stream-sum is used in Gazette's continuous integration and chaos-testing suites.

Package Files

stream_sum.go

Constants

const FinalSumsJournal pb.Journal = "examples/stream-sum/sums"

FinalSumsJournal to which final stream sums are written.

func GenerateAndVerifyStreams

func GenerateAndVerifyStreams(ctx context.Context, cfg *ChunkerConfig) error

GenerateAndVerifyStreams is the main routine of the `chunker` job. It generates and verifies streams based on the ChunkerConfig.
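The verification half of this routine can be pictured as bookkeeping over streams that are still awaiting their final sum. The following is a minimal sketch rather than the package's implementation: `pendingSums`, `expect`, and `verify` are hypothetical names, and it assumes the chunker computes each stream's expected sum locally while writing chunks.

```go
package main

import (
	"errors"
	"fmt"
)

// streamID mirrors the package's 16-byte StreamID; it is redeclared here so
// the sketch is self-contained.
type streamID [16]byte

// pendingSums is a hypothetical tracker of streams whose expected sum is known
// locally but has not yet been observed in the output journal.
type pendingSums map[streamID]uint64

// expect records the sum the chunker expects to eventually read back.
func (p pendingSums) expect(id streamID, sum uint64) { p[id] = sum }

// verify checks a final sum read from the output journal against the recorded
// expectation, and clears the stream from the pending set on success.
func (p pendingSums) verify(id streamID, got uint64) error {
	want, ok := p[id]
	if !ok {
		return errors.New("final sum for an unknown or already-verified stream")
	}
	if got != want {
		return fmt.Errorf("stream %x: got sum %d, want %d", id, got, want)
	}
	delete(p, id)
	return nil
}

func main() {
	var p = pendingSums{}
	var id = streamID{1}
	p.expect(id, 42)
	fmt.Println(p.verify(id, 42)) // First sighting verifies cleanly.
	fmt.Println(p.verify(id, 42)) // A duplicate is reported as an error.
}
```

In this sketch a duplicated or mismatched final sum surfaces as an error, which is one way a violated processing guarantee could become visible to the job.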

type Chunk

type Chunk struct {
    UUID  message.UUID
    ID    StreamID // Unique ID of the stream.
    SeqNo int      // Monotonic sequence number, starting from 1.
    Data  []byte   // Raw data included in the Value. If empty, this is the stream's final chunk.
}

Chunk is an ordered slice of stream content.
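To illustrate how chunks of several streams might interleave in a journal, here is a small sketch under stated assumptions: `chunk` and `interleave` are hypothetical names, the message UUID is omitted, and round-robin ordering with placeholder payloads stands in for whatever ordering and content the `chunker` job actually produces. It follows the documented conventions that SeqNo starts from 1 and that an empty Data marks the final chunk.

```go
package main

import "fmt"

// streamID mirrors the package's 16-byte StreamID.
type streamID [16]byte

// chunk mirrors the documented Chunk fields, minus the message UUID.
type chunk struct {
	ID    streamID
	SeqNo int
	Data  []byte
}

// interleave emits nChunks data chunks for each stream in round-robin order,
// followed by one empty-Data chunk per stream marking its end.
func interleave(ids []streamID, nChunks int) []chunk {
	var out []chunk
	for seq := 1; seq <= nChunks+1; seq++ {
		for _, id := range ids {
			var data []byte // Left empty for the final chunk.
			if seq <= nChunks {
				data = []byte(fmt.Sprintf("payload-%d", seq))
			}
			out = append(out, chunk{ID: id, SeqNo: seq, Data: data})
		}
	}
	return out
}

func main() {
	for _, c := range interleave([]streamID{{1}, {2}}, 2) {
		fmt.Printf("stream=%x seq=%d final=%t\n", c.ID[:1], c.SeqNo, len(c.Data) == 0)
	}
}
```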

func (*Chunk) GetUUID

func (c *Chunk) GetUUID() message.UUID

func (*Chunk) NewAcknowledgement

func (c *Chunk) NewAcknowledgement(pb.Journal) message.Message

func (*Chunk) SetUUID

func (c *Chunk) SetUUID(uuid message.UUID)

type ChunkerConfig

type ChunkerConfig struct {
    Chunker struct {
        mbp.ZoneConfig
        Streams int `long:"streams" default:"-1" description:"Number of streams each worker should create. <0 for infinite"`
        Chunks  int `long:"chunks" default:"100" description:"Number of chunks per stream"`
        Workers int `long:"workers" default:"4" description:"Number of parallel workers"`
    }   `group:"Chunker" namespace:"chunker" env-namespace:"CHUNKER"`

    Broker      mbp.ClientConfig      `group:"Broker" namespace:"broker" env-namespace:"BROKER"`
    Log         mbp.LogConfig         `group:"Logging" namespace:"log" env-namespace:"LOG"`
    Diagnostics mbp.DiagnosticsConfig `group:"Debug" namespace:"debug" env-namespace:"DEBUG"`
}

ChunkerConfig is the configuration used by the `chunker` job binary.

type StreamID

type StreamID [16]byte

StreamID uniquely identifies a stream.

type Sum

type Sum struct {
    UUID  message.UUID
    ID    StreamID // Unique ID of the stream.
    SeqNo int      // SeqNo of last Chunk summed over.
    Value uint64   // Computed sum through SeqNo.
}

Sum represents a partial or final CRC64 sum of a stream.

func (*Sum) GetUUID

func (s *Sum) GetUUID() message.UUID

func (*Sum) NewAcknowledgement

func (s *Sum) NewAcknowledgement(pb.Journal) message.Message

func (*Sum) SetUUID

func (s *Sum) SetUUID(uuid message.UUID)

func (*Sum) Update

func (s *Sum) Update(chunk Chunk) (done bool, err error)

Update folds a Chunk into this Sum, returning whether this is the last Chunk of the Stream.
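As a rough picture of what folding a chunk could involve, the sketch below keeps a running CRC64 across chunks using the standard library. It is not the package's code: the lower-case `sum`, `chunk`, and `update` names are hypothetical, the ECMA polynomial is an assumption, and so is the choice to reject any chunk whose SeqNo does not directly follow the sum's.

```go
package main

import (
	"fmt"
	"hash/crc64"
)

// streamID mirrors the package's 16-byte StreamID.
type streamID [16]byte

// chunk and sum mirror the documented fields, minus the message UUID.
type chunk struct {
	ID    streamID
	SeqNo int
	Data  []byte
}

type sum struct {
	ID    streamID
	SeqNo int
	Value uint64
}

// table uses the ECMA polynomial. Which polynomial the package uses is an
// assumption of this sketch.
var table = crc64.MakeTable(crc64.ECMA)

// update folds c into s and reports whether c was the stream's final chunk.
func (s *sum) update(c chunk) (done bool, err error) {
	if c.ID != s.ID {
		return false, fmt.Errorf("chunk belongs to stream %x, not %x", c.ID, s.ID)
	} else if c.SeqNo != s.SeqNo+1 {
		return false, fmt.Errorf("chunk SeqNo %d does not follow sum SeqNo %d", c.SeqNo, s.SeqNo)
	}
	s.SeqNo = c.SeqNo
	if len(c.Data) == 0 {
		return true, nil // Empty Data marks the stream's final chunk.
	}
	// crc64.Update continues a checksum from a previous value, so the sum
	// over all chunks equals the sum over their concatenation.
	s.Value = crc64.Update(s.Value, table, c.Data)
	return false, nil
}

func main() {
	var s sum
	s.update(chunk{SeqNo: 1, Data: []byte("hello, ")})
	s.update(chunk{SeqNo: 2, Data: []byte("world")})
	done, _ := s.update(chunk{SeqNo: 3})
	fmt.Println(done, s.Value == crc64.Checksum([]byte("hello, world"), table))
}
```

Because the running value depends only on the bytes seen so far, a partial sum can be stored and resumed later without replaying earlier chunks.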

type Summer

type Summer struct{}

Summer consumes stream chunks, aggregates chunk data, and emits final sums. It implements the runconsumer.Application interface.

func (Summer) ConsumeMessage

func (Summer) ConsumeMessage(shard consumer.Shard, store consumer.Store, env message.Envelope, pub *message.Publisher) error

ConsumeMessage folds a Chunk into its respective partial stream sum. If the Chunk represents a stream EOF, it emits a final sum. consumer.Application implementation.

func (Summer) FinalizeTxn

func (Summer) FinalizeTxn(shard consumer.Shard, store consumer.Store, _ *message.Publisher) error

FinalizeTxn marshals partial stream sums to the |store| to ensure persistence across consumer transactions. consumer.Application implementation.
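The pattern described here, updating partial sums in memory while messages are consumed and marshalling them to the store when the transaction is finalized, can be sketched as follows. Everything in the sketch is an assumption made for illustration: `kvStore` is a hypothetical in-memory stand-in for the shard's store (the real consumer.Store API differs), `state`, `finalize`, and `load` are hypothetical names, and JSON is an arbitrary choice of encoding.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// streamID mirrors the package's 16-byte StreamID.
type streamID [16]byte

// sum mirrors the documented Sum fields, minus the message UUID.
type sum struct {
	ID    streamID
	SeqNo int
	Value uint64
}

// kvStore is a hypothetical stand-in for a shard's durable store.
type kvStore map[string][]byte

// state caches partial sums touched during the current transaction.
type state struct {
	dirty map[streamID]sum
}

// finalize marshals every cached partial sum into the store, then resets the
// cache for the next transaction.
func (st *state) finalize(store kvStore) error {
	for id, s := range st.dirty {
		b, err := json.Marshal(s)
		if err != nil {
			return err
		}
		store[string(id[:])] = b
	}
	st.dirty = make(map[streamID]sum)
	return nil
}

// load recovers a stream's partial sum from the store, as a restarted
// consumer would before folding in further chunks.
func load(store kvStore, id streamID) (sum, bool, error) {
	b, ok := store[string(id[:])]
	if !ok {
		return sum{ID: id}, false, nil
	}
	var s sum
	err := json.Unmarshal(b, &s)
	return s, true, err
}

func main() {
	var id = streamID{1}
	var st = state{dirty: map[streamID]sum{id: {ID: id, SeqNo: 3, Value: 7}}}
	var store = kvStore{}
	if err := st.finalize(store); err != nil {
		panic(err)
	}
	s, ok, err := load(store, id)
	fmt.Println(s.SeqNo, s.Value, ok, err)
}
```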

func (Summer) InitApplication

func (Summer) InitApplication(args runconsumer.InitArgs) error

InitApplication is a no-op, as Summer provides no client-facing APIs.

func (Summer) NewConfig

func (Summer) NewConfig() runconsumer.Config

NewConfig returns a new BaseConfig.

func (Summer) NewMessage

func (Summer) NewMessage(*pb.JournalSpec) (message.Message, error)

NewMessage returns a Chunk message. consumer.Application implementation.

func (Summer) NewStore

func (Summer) NewStore(shard consumer.Shard, rec *recoverylog.Recorder) (consumer.Store, error)

NewStore builds a RocksDB or SQLite store for the Shard. consumer.Application implementation.

Directories

Path       Synopsis
chunker
summer     Package summer runs the stream_sum.Summer consumer.

Package stream_sum imports 25 packages and is imported by 6 packages. Updated 2020-06-13.