topology

package
v2.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 9, 2023 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewRecordContext

func NewRecordContext(record kafka.Record) context.Context

func RecordFromContext

func RecordFromContext(ctx context.Context) kafka.Record

Types

type Builder

type Builder interface {
	NewKSubTopologyBuilder(kind SubTopologyKind) SubTopologyBuilder
	RemoveSubTopology(builder SubTopologyBuilder)
	SubTopologies() SubTopologyBuilders
	Build(ctx BuilderContext) (Topology, error)
	Describe() string
	Reset(ctx BuilderContext) error
}

type BuilderContext

type BuilderContext interface {
	StoreRegistry() stores.Registry
	ProducerBuilder() kafka.ProducerBuilder
	Admin() kafka.Admin
	ApplicationId() string
	Logger() log.Logger
	MetricsReporter() metrics.Reporter
}

func NewBuilderContext

func NewBuilderContext(
	appId string,
	registry stores.Registry,
	producer kafka.ProducerBuilder,
	kafkaAdmin kafka.Admin,
	logger log.Logger,
	metricsReporter metrics.Reporter,
) BuilderContext

type ChangeLogger

type ChangeLogger interface {
	Log(ctx context.Context, key, value []byte) error
}

type ChangelogSyncer

type ChangelogSyncer interface {
	Sync(ctx context.Context, synced chan struct{}) error
	Stop() error
}

type ChangelogSyncerBuilder

type ChangelogSyncerBuilder interface {
	// Setup setups the changelog by creating changelog topics and offset stores
	Setup(ctx SubTopologyBuilderContext) error
	Build(ctx SubTopologyContext, store stores.Store) (ChangelogSyncer, error)
	BuildLogger(ctx SubTopologyContext, store string) (ChangeLogger, error)
	Internal() bool
	Topic() string
}

type CloseableNode

type CloseableNode interface {
	Node
	// Close is called once the message processing stops
	// Please refer SubTopology.Close()
	Close() error
}

type DefaultNode

type DefaultNode struct {
	// contains filtered or unexported fields
}

func (*DefaultNode) AddEdge

func (n *DefaultNode) AddEdge(node Node)

func (*DefaultNode) Edges

func (n *DefaultNode) Edges() []Node

func (*DefaultNode) Err

func (n *DefaultNode) Err(message string) error

func (*DefaultNode) Forward

func (n *DefaultNode) Forward(ctx context.Context, kIn, vIn interface{}, cont bool) (interface{}, interface{}, bool, error)

func (*DefaultNode) ForwardAll

func (n *DefaultNode) ForwardAll(ctx context.Context, kvs []KeyValPair, cont bool) (kOut interface{}, vOut interface{}, next bool, err error)

func (*DefaultNode) Id

func (n *DefaultNode) Id() NodeId

func (*DefaultNode) Ignore

func (n *DefaultNode) Ignore() (interface{}, interface{}, bool, error)

func (*DefaultNode) IgnoreAndWrapErrWith

func (n *DefaultNode) IgnoreAndWrapErrWith(err error, message string) (interface{}, interface{}, bool, error)

func (*DefaultNode) IgnoreWithError

func (n *DefaultNode) IgnoreWithError(err error) (interface{}, interface{}, bool, error)

func (*DefaultNode) NameAs

func (n *DefaultNode) NameAs(name string)

func (*DefaultNode) NodeName

func (n *DefaultNode) NodeName() string

func (*DefaultNode) ReadsFrom

func (n *DefaultNode) ReadsFrom() []string

func (*DefaultNode) SetId

func (n *DefaultNode) SetId(id NodeId)

func (*DefaultNode) Setup

func (*DefaultNode) String

func (n *DefaultNode) String() string

func (*DefaultNode) WrapErr

func (n *DefaultNode) WrapErr(err error) error

func (*DefaultNode) WrapErrWith

func (n *DefaultNode) WrapErrWith(err error, message string) error

func (*DefaultNode) WritesAt

func (n *DefaultNode) WritesAt() []string

type Edge

type Edge struct {
	// contains filtered or unexported fields
}

func NewEdge

func NewEdge(parent, node NodeBuilder) Edge

func (Edge) Node

func (e Edge) Node() NodeBuilder

func (Edge) Parent

func (e Edge) Parent() NodeBuilder

type InitableNode

type InitableNode interface {
	Node
	// Init is called once the Node build is completed and before message processing starts.
	// Please refer SubTopology.Init()
	Init(ctx NodeContext) error
}

type KeyValPair

type KeyValPair struct {
	Key   interface{}
	Value interface{}
}

type LoggableStateStore

type LoggableStateStore interface {
	StateStore
	ChangeLogger
}

type LoggableStoreBuilder

type LoggableStoreBuilder interface {
	Name() string
	NameFormatter(ctx SubTopologyContext) StateStoreNameFunc
	KeyEncoder() encoding.Encoder
	ValEncoder() encoding.Encoder
	Build(ctx SubTopologyContext) (StateStore, error)
	Changelog() ChangelogSyncerBuilder
}

type Node

type Node interface {
	NodeInfo
	Run(ctx base.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
	AddEdge(node Node)
	Edges() []Node
}

type NodeBuilder

type NodeBuilder interface {
	NodeInfo
	SetId(id NodeId)
	// Setup is called once when stream app starting (SubTopologyBuilder.Setup())
	// Eg usage: create changelog topics for node
	Setup(ctx SubTopologySetupContext) error
	// Build calls with every Consumer PartitionAssignEvent and this method is responsible to create a new instance of
	// the node
	Build(ctx SubTopologyContext) (Node, error)
}

type NodeContext

type NodeContext interface {
	SubTopologyContext
	Store(name string) StateStore
}

func NewNodeContext

func NewNodeContext(parent SubTopologyContext, subTopology SubTopology) NodeContext

type NodeId

type NodeId struct {
	// contains filtered or unexported fields
}

func NewNodeId

func NewNodeId(id int, path, subTopologyID string) NodeId

func (NodeId) Id

func (id NodeId) Id() int

func (NodeId) String

func (id NodeId) String() string

func (NodeId) SubTopologyId

func (id NodeId) SubTopologyId() string

type NodeInfo

type NodeInfo interface {
	// Id returns a unique node id within the topology
	Id() NodeId
	// Type returns type of the node eg: Joiner, Flatmap
	Type() Type
	ReadsFrom() []string
	WritesAt() []string
	NameAs(name string)
	NodeName() string
}

type RecodeContext

type RecodeContext interface {
	context.Context
}

type Sink

type Sink interface {
	Node
	Encoder() SinkEncoder
	Topic() string

	// Close closes the source buffers
	Close() error
}

type SinkBuilder

type SinkBuilder interface {
	NodeBuilder
	Topic() string
	AutoCreate() bool
}

type SinkEncoder

type SinkEncoder struct {
	Key, Value encoding.Encoder
}

type Source

type Source interface {
	Node
	NodeBuilder
	Encoder() SourceEncoder
	Topic() string
	ShouldCoPartitionedWith(source Source)
	TopicConfigs() kafka.TopicConfig
	CoPartitionedWith() Source
	RePartitionedAs() Source
	AutoCreate() bool
	Internal() bool
	InitialOffset() kafka.Offset
}

type SourceEncoder

type SourceEncoder struct {
	Key, Value encoding.Encoder
}

type State

type State struct {
	Type  StateType
	Store LoggableStateStore
}

type StateBuilder

type StateBuilder struct {
	Type  StateType
	Store LoggableStateStore
}

type StateStore

type StateStore interface {
	stores.Store
	// Flush flashes the records in buffer to stores.Store
	Flush() error
	ResetCache()
	ChangelogSyncer
}

type StateStoreNameFunc

type StateStoreNameFunc func(store string) string

type StateStoreProvider

type StateStoreProvider interface {
	Store(ctx SubTopologyContext) LoggableStateStore
}

type StateType

type StateType string
const (
	StateReadOnly  StateType = `read_only`
	StateReadWrite StateType = `read_write`
)

type SubTopology

type SubTopology interface {
	Id() SubTopologyId
	Sinks() []Sink
	Source(topic string) Source
	Nodes() []Node
	Init(ctx SubTopologyContext) error
	Close() error
	Store(name string) StateStore
	StateStores() map[string]StateStore
}

type SubTopologyBuilder

type SubTopologyBuilder interface {
	Id() SubTopologyId
	SetId(SubTopologyId)
	AddNode(builder NodeBuilder)
	AddNodeWithEdge(node, edge NodeBuilder)
	Parent(node NodeBuilder) NodeBuilder
	NodeSource(node NodeBuilder) Source
	Setup(ctx SubTopologySetupContext) error
	Build(ctx SubTopologyContext) (SubTopology, error)
	AddEdge(parent, node NodeBuilder)
	RemoveNode(node NodeBuilder)
	RemoveAll()
	Edges() []Edge
	Sources() []Source
	MergeSubTopology(subTp SubTopologyBuilder)
	Nodes() []NodeBuilder
	AddStore(builder LoggableStoreBuilder)
	StateStores() map[string]LoggableStoreBuilder
	AddSource(source Source)
	Kind() SubTopologyKind
}

type SubTopologyBuilderContext

type SubTopologyBuilderContext interface {
	BuilderContext
	MaxPartitionCount() int32
}

type SubTopologyBuilders

type SubTopologyBuilders []SubTopologyBuilder

func (SubTopologyBuilders) SourceTopics

func (b SubTopologyBuilders) SourceTopics() []string

func (SubTopologyBuilders) SourceTopicsFor

func (b SubTopologyBuilders) SourceTopicsFor(kind SubTopologyKind) []string

func (SubTopologyBuilders) Topics

func (b SubTopologyBuilders) Topics() []string

type SubTopologyContext

type SubTopologyContext interface {
	BuilderContext
	context.Context
	PartitionConsumer() kafka.PartitionConsumer
	Partition() int32
	Producer() kafka.Producer
	Logger() log.Logger
	TopicMeta() map[string]*kafka.Topic
}

func NewSubTopologyContext

func NewSubTopologyContext(
	parent context.Context,
	partition int32,
	builderCtx BuilderContext,
	producer kafka.Producer,
	partitionConsumer kafka.PartitionConsumer,
	logger log.Logger,
	topicMeta map[string]*kafka.Topic,
) SubTopologyContext

type SubTopologyId

type SubTopologyId struct {
	// contains filtered or unexported fields
}

func NewSubTopologyId

func NewSubTopologyId(id int, name string) SubTopologyId

func (SubTopologyId) Id

func (id SubTopologyId) Id() int

func (SubTopologyId) Name

func (id SubTopologyId) Name() string

func (SubTopologyId) String

func (id SubTopologyId) String() string

type SubTopologyKind

type SubTopologyKind string
const (
	KindStream      SubTopologyKind = `stream`
	KindTable       SubTopologyKind = `table`
	KindGlobalTable SubTopologyKind = `global-table`
)

type SubTopologySetupContext

type SubTopologySetupContext interface {
	BuilderContext
	MaxPartitionCount() int32
	TopicMeta() map[string]*kafka.TopicConfig
}

func NewSubTopologySetupContext

func NewSubTopologySetupContext(
	builderCtx BuilderContext,
	topicMeta map[string]*kafka.TopicConfig,
	maxPartitions int32,
) SubTopologySetupContext

type Topology

type Topology interface {
	SubTopologies() []SubTopologyBuilder
	SubTopology(id SubTopologyId) SubTopologyBuilder
	SubTopologyByTopic(topic string) SubTopologyBuilder
	StreamTopologies() SubTopologyBuilders
	SourceByTopic(topic string) Source
	GlobalTableTopologies() SubTopologyBuilders
	Describe() string
}

type Type

type Type struct {
	Name  string
	Attrs map[string]string
}

type Visualizer

type Visualizer interface {
	AddTopology(node Builder)
	Visualize() (string, error)
}

func NewTopologyVisualizer

func NewTopologyVisualizer() Visualizer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL