scyllacdc

package module
v1.1.0
Warning

This package is not in the latest version of its module.

Published: Oct 19, 2022 License: Apache-2.0 Imports: 17 Imported by: 4

README

scylla-cdc-go

Package scyllacdc is a library that helps develop applications that react to changes from Scylla's CDC.

It is recommended to get familiar with the Scylla CDC documentation first in order to understand the concepts used in the documentation of scyllacdc: https://docs.scylladb.com/using-scylla/cdc/

Documentation

For an explanation of how to use the library, please see the godoc documentation.

This repository also includes two example programs.

Documentation

Overview

The library hides the complexity of reading from the CDC log, which stems from the need to poll for changes and to handle topology changes. It reads changes from the CDC logs of selected tables and propagates them to instances of ChangeConsumer, an interface that is meant to be implemented by the user.

Getting started

To start working with the library, you first need to implement your own logic for consuming changes. The simplest way to do it is to define a ChangeConsumerFunc which will be called for each change from the CDC log. For example:

func printerConsumer(ctx context.Context, tableName string, c scyllacdc.Change) error {
	fmt.Printf("[%s] %#v\n", tableName, c)
	return nil
}

For any use case more complicated than above, you will need to define a ChangeConsumer and a ChangeConsumerFactory:

type myConsumer struct {
	id        int
	tableName string
}

func (mc *myConsumer) Consume(ctx context.Context, change scyllacdc.Change) error {
	fmt.Printf("[%d] [%s] %#v\n", mc.id, mc.tableName, change)
	return nil
}

func (mc *myConsumer) End() error {
	return nil
}

type myFactory struct {
	nextID int
}

func (f *myFactory) CreateChangeConsumer(ctx context.Context, input scyllacdc.CreateChangeConsumerInput) (scyllacdc.ChangeConsumer, error) {
	f.nextID++
	return &myConsumer{
		id:        f.nextID-1,
		tableName: input.TableName,
	}, nil
}

Next, you need to create and run a scyllacdc.Reader object:

func main() {
	cluster := gocql.NewCluster("127.0.0.1")
	cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	cfg := &scyllacdc.ReaderConfig{
		Session:               session,
		TableNames:            []string{"my_keyspace.my_table"},
		ChangeConsumerFactory: scyllacdc.MakeChangeConsumerFactoryFromFunc(printerConsumer),
		// The above can be changed to:
		// ChangeConsumerFactory: &myFactory{},
	}

	reader, err := scyllacdc.NewReader(context.Background(), cfg)
	if err != nil {
		log.Fatal(err)
	}

	// React to Ctrl+C signal, and stop gracefully after the first signal
	// Second signal exits the process
	// signal.Notify does not block when sending, so the channel must be buffered
	signalC := make(chan os.Signal, 1)
	go func() {
		<-signalC
		reader.Stop()

		<-signalC
		os.Exit(1)
	}()
	signal.Notify(signalC, os.Interrupt)

	if err := reader.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Saving progress

The library supports saving progress and restoring from the last saved position. To enable it, you need to do two things:

First, you need to modify your consumer to regularly save progress. The consumer receives a *scyllacdc.ProgressReporter object which can be used to save progress at any point in the lifetime of the consumer.

The library itself doesn't regularly save progress - it only does it by itself when switching to the next CDC generation. Therefore, the consumer is responsible for saving the progress regularly.

Example:

type myConsumer struct {
	// PeriodicProgressReporter is a wrapper around ProgressReporter
	// which rate-limits saving the progress
	reporter *scyllacdc.PeriodicProgressReporter
}

func (mc *myConsumer) Consume(ctx context.Context, change scyllacdc.Change) error {
	// ... do work ...

	mc.reporter.Update(change.Time)
	return nil
}

func (mc *myConsumer) End() error {
	_ = mc.reporter.SaveAndStop(context.Background())
	return nil
}

type myFactory struct {
	// Any value with a Printf method satisfies scyllacdc.Logger, e.g. *log.Logger.
	logger scyllacdc.Logger
}

func (f *myFactory) CreateChangeConsumer(ctx context.Context, input scyllacdc.CreateChangeConsumerInput) (scyllacdc.ChangeConsumer, error) {
	reporter := scyllacdc.NewPeriodicProgressReporter(f.logger, time.Minute, input.ProgressReporter)
	reporter.Start(ctx)
	return &myConsumer{reporter: reporter}, nil
}

Then, you need to specify an appropriate ProgressManager in the configuration. A ProgressManager represents a mechanism for saving and restoring progress. You can use the provided implementation (TableBackedProgressManager), or implement one yourself.

In the main function:

progressManager, err := scyllacdc.NewTableBackedProgressManager(session, "my_keyspace.progress_table", "my_application_name")
if err != nil {
	log.Fatal(err)
}
cfg.ProgressManager = progressManager

Processing changes

Data from the CDC log is supplied to the ChangeConsumer through Change objects, which can contain multiple ChangeRow objects. A single ChangeRow corresponds to a single, full (all columns included) row from the CDC log.

func (mc *myConsumer) Consume(ctx context.Context, change scyllacdc.Change) error {
	for _, changeRow := range change.Delta {
		// You can access CDC columns directly via
		// GetValue, IsDeleted, GetDeletedElements
		rawValue, _ := changeRow.GetValue("col_int")
		// The comma-ok form avoids a panic if the value is missing or has another type.
		intValue, _ := rawValue.(*int)
		isDeleted, _ := changeRow.IsDeleted("col_int")
		if isDeleted {
			fmt.Println("Column col_int was set to null")
		} else if intValue != nil {
			fmt.Printf("Column col_int was set to %d\n", *intValue)
		}

		// You can also use convenience functions:
		// GetAtomicChange, GetListChange, GetUDTChange, etc.
		atomicChange := changeRow.GetAtomicChange("col_text")
		strValue, _ := atomicChange.Value.(*string)
		if atomicChange.IsDeleted {
			fmt.Println("Column col_text was deleted")
		} else if strValue != nil {
			fmt.Printf("Column col_text was set to %s\n", *strValue)
		}
	}

	return nil
}

Constants

const (
	PreImage                  OperationType = 0
	Update                                  = 1
	Insert                                  = 2
	RowDelete                               = 3
	PartitionDelete                         = 4
	RangeDeleteStartInclusive               = 5
	RangeDeleteStartExclusive               = 6
	RangeDeleteEndInclusive                 = 7
	RangeDeleteEndExclusive                 = 8
	PostImage                               = 9
)

Variables

var (
	ErrNoGenerationsPresent               = errors.New("there are no generations present")
	ErrNoSupportedGenerationTablesPresent = errors.New("no supported generation tables are present")
)

Functions

This section is empty.

Types

type AdvancedReaderConfig

type AdvancedReaderConfig struct {
	// ConfidenceWindowSize defines a minimal age a change must have in order
	// to be read.
	//
	// Due to the eventually consistent nature of Scylla, newer writes may
	// appear in CDC log earlier than some older writes. This can cause the
	// Reader to skip the older write, therefore the need for this parameter.
	//
	// If the parameter is left as 0, the library will automatically choose
	// a default confidence window size.
	ConfidenceWindowSize time.Duration

	// The library uses select statements to fetch changes from CDC Log tables.
	// Each select fetches changes from a single table and fetches only changes
	// from a limited set of CDC streams. If such select returns one or more
	// changes then next select to this table and set of CDC streams will be
	// issued after a delay. This parameter specifies the length of the delay.
	//
	// If the parameter is left as 0, the library will automatically adjust
	// the length of the delay.
	PostNonEmptyQueryDelay time.Duration

	// The library uses select statements to fetch changes from CDC Log tables.
	// Each select fetches changes from a single table and fetches only changes
	// from a limited set of CDC streams. If such select returns no changes then
	// next select to this table and set of CDC streams will be issued after
	// a delay. This parameter specifies the length of the delay.
	//
	// If the parameter is left as 0, the library will automatically adjust
	// the length of the delay.
	PostEmptyQueryDelay time.Duration

	// If the library tries to read from the CDC log and the read operation
	// fails, it will wait some time before attempting to read again. This
	// parameter specifies the length of the delay.
	//
	// If the parameter is left as 0, the library will automatically adjust
	// the length of the delay.
	PostFailedQueryDelay time.Duration

	// Changes are queried using select statements with restriction on the time
	// those changes appeared. The restriction is bounding the time from both
	// lower and upper bounds. This parameter defines the width of the time
	// window used for the restriction.
	//
	// If the parameter is left as 0, the library will automatically adjust
	// the size of the restriction window.
	QueryTimeWindowSize time.Duration

	// When the library starts for the first time it has to start consuming
	// changes from some point in time. This parameter defines how far in the
	// past it needs to look. If the value of the parameter is set to an hour,
	// then the library will only read historical changes that are no older than
	// an hour.
	//
	// Note of caution: data in CDC Log table is automatically deleted so
	// setting this parameter to something bigger than TTL used on CDC Log won’t
	// cause changes older than this TTL to appear.
	//
	// If the parameter is left as 0, the library will automatically adjust
	// the size of the restriction window.
	ChangeAgeLimit time.Duration
}

AdvancedReaderConfig contains advanced parameters that control the behavior of the CDC Reader. Their default values were carefully selected and should work for most cases, so change them only when really necessary, and do so carefully.
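
To make the interplay between ConfidenceWindowSize and QueryTimeWindowSize more concrete, the sketch below computes the time range that a single poll could cover. It is only one reading of the field comments above, not the library's scheduling code; queryWindow, exampleNow and the chosen durations are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// exampleNow is a fixed "current time" so that the output is reproducible.
var exampleNow = time.Date(2022, time.October, 19, 12, 0, 0, 0, time.UTC)

// queryWindow is a hypothetical helper (not part of scyllacdc). It returns the
// half-open interval [start, end) that one poll may cover:
//   - the window starts where the previous poll ended (lastRead),
//   - it is at most width long (cf. QueryTimeWindowSize),
//   - it never reaches into the most recent `confidence` of wall-clock time
//     (cf. ConfidenceWindowSize).
//
// ok is false when no change is old enough to be read yet.
func queryWindow(lastRead, now time.Time, confidence, width time.Duration) (start, end time.Time, ok bool) {
	start = lastRead
	end = start.Add(width)
	if limit := now.Add(-confidence); end.After(limit) {
		end = limit
	}
	return start, end, end.After(start)
}

func main() {
	confidence, width := 30*time.Second, time.Minute

	// Far behind: the window is capped by its width.
	s, e, ok := queryWindow(exampleNow.Add(-5*time.Minute), exampleNow, confidence, width)
	fmt.Println(e.Sub(s), ok) // 1m0s true

	// Almost caught up: the window is capped by the confidence window.
	s, e, ok = queryWindow(exampleNow.Add(-40*time.Second), exampleNow, confidence, width)
	fmt.Println(e.Sub(s), ok) // 10s true

	// Fully caught up: nothing is old enough to be read.
	_, _, ok = queryWindow(exampleNow.Add(-10*time.Second), exampleNow, confidence, width)
	fmt.Println(ok) // false
}
```

A larger confidence window lowers the risk of skipping late-arriving writes at the cost of higher latency, which is one reason to leave the defaults alone unless there is a measured need.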

type AtomicChange

type AtomicChange struct {
	// Value contains the scalar value of the column.
	// If the column was not changed or was deleted, it will be nil.
	//
	// Type: T.
	Value interface{}

	// IsDeleted tells if this column was set to NULL by this change.
	IsDeleted bool
}

AtomicChange represents a change to a column of an atomic or a frozen type.

type Change

type Change struct {
	// Corresponds to cdc$stream_id.
	StreamID StreamID

	// Corresponds to cdc$time.
	Time gocql.UUID

	// PreImage rows of the group.
	PreImage []*ChangeRow

	// Delta rows of the group.
	Delta []*ChangeRow

	// PostImage rows of the group.
	PostImage []*ChangeRow
}

Change represents a group of rows from CDC log with the same cdc$stream_id and cdc$time timestamp.

func (*Change) GetCassandraTimestamp

func (c *Change) GetCassandraTimestamp() int64

GetCassandraTimestamp returns a timestamp of the operation suitable to put as a TIMESTAMP parameter to a DML statement (INSERT, UPDATE, DELETE).
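
CQL write timestamps (USING TIMESTAMP) are expressed in microseconds since the Unix epoch, while cdc$time is a timeuuid. The sketch below shows the conventional conversion from a version 1 UUID to such a timestamp. That GetCassandraTimestamp performs exactly this computation is an assumption, as this page does not show its implementation; timeUUID is a local stand-in for gocql.UUID and unixMicros is a hypothetical helper.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// timeUUID is a stand-in for gocql.UUID, which is also a 16-byte array.
type timeUUID [16]byte

// gregorianToUnix100ns is the number of 100 ns intervals between the UUID
// epoch (1582-10-15) and the Unix epoch (1970-01-01).
const gregorianToUnix100ns = 0x01B21DD213814000

// unixMicros extracts the 60-bit timestamp of a version 1 UUID and converts
// it to microseconds since the Unix epoch.
func unixMicros(u timeUUID) int64 {
	low := uint64(binary.BigEndian.Uint32(u[0:4]))
	mid := uint64(binary.BigEndian.Uint16(u[4:6]))
	hi := uint64(binary.BigEndian.Uint16(u[6:8]) & 0x0fff) // drop the version nibble
	ts := int64(hi<<48 | mid<<32 | low)
	return (ts - gregorianToUnix100ns) / 10
}

func main() {
	// c232ab00-9414-11ec-b3c8-9f6bdeced846 is the version 1 example from
	// RFC 9562; it encodes 2022-02-22 19:22:22 UTC.
	u := timeUUID{0xc2, 0x32, 0xab, 0x00, 0x94, 0x14, 0x11, 0xec,
		0xb3, 0xc8, 0x9f, 0x6b, 0xde, 0xce, 0xd8, 0x46}
	fmt.Println(unixMicros(u)) // 1645557742000000
}
```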

type ChangeConsumer

type ChangeConsumer interface {
	// Processes a change from the CDC log associated with the stream of
	// the ChangeConsumer. This method is called in a sequential manner for each
	// row that appears in the stream.
	//
	// If this method returns an error, the library will stop with an error.
	Consume(ctx context.Context, change Change) error

	// Called after all rows from the stream were consumed, and the reader
	// is about to switch to a new generation, or stop execution altogether.
	//
	// If this method returns an error, the library will stop with an error.
	End() error
}

ChangeConsumer processes changes from a single stream of the CDC log.

type ChangeConsumerFactory

type ChangeConsumerFactory interface {
	// Creates a change consumer with given parameters.
	//
	// If this method returns an error, the library will stop with an error.
	CreateChangeConsumer(ctx context.Context, input CreateChangeConsumerInput) (ChangeConsumer, error)
}

ChangeConsumerFactory is used by the library to instantiate ChangeConsumer objects when the new generation starts.

func MakeChangeConsumerFactoryFromFunc

func MakeChangeConsumerFactoryFromFunc(f ChangeConsumerFunc) ChangeConsumerFactory

MakeChangeConsumerFactoryFromFunc can be used if your processing is very simple and you don't need to keep any per-stream state or save any progress. The function supplied as an argument will be shared by all consumers created by this factory, and will be called for each change in the CDC log.

Please note that the consumers created by this factory do not perform any synchronization on their own when calling the supplied function, so you need to guarantee that calling `f` is thread-safe.

type ChangeConsumerFunc

type ChangeConsumerFunc func(ctx context.Context, tableName string, change Change) error

ChangeConsumerFunc can be used in conjunction with MakeChangeConsumerFactoryFromFunc if your processing is very simple. For more information, see the description of the MakeChangeConsumerFactoryFromFunc function.

type ChangeOrEmptyNotificationConsumer added in v1.1.0

type ChangeOrEmptyNotificationConsumer interface {
	ChangeConsumer

	// Invoked upon empty results from the CDC log associated with the stream of
	// the ChangeConsumer. This method is called to acknowledge a query window
	// has been executed against the stream and the CDC log is to be considered
	// completed as of 'ackTime' param passed.
	//
	// If this method returns an error, the library will stop with an error.
	Empty(ctx context.Context, ackTime gocql.UUID) error
}

ChangeOrEmptyNotificationConsumer is an extension to the ChangeConsumer interface.

type ChangeRow

type ChangeRow struct {
	// contains filtered or unexported fields
}

ChangeRow corresponds to a single row from the CDC log.

The ChangeRow uses a slightly different representation of values than gocql's MapScan in order to faithfully represent nullability of all values:

Scalar types such as int, text etc. are represented by a pointer to their counterpart in gocql (in this case, *int and *string). The only exception is the blob, which is encoded as []byte slice - if the column was nil, then it will contain a nil slice, if the column was not nil but just empty, then the resulting slice will be empty, but not nil.

Tuple types are always represented as an []interface{} slice of values in this representation (e.g. tuple<int, text> will contain an *int and a *string). If the tuple itself was null, then it will be represented as a nil []interface{} slice.

Lists and sets are represented as slices of the corresponding type. Because lists and sets cannot contain nils, if a value was to be represented as a pointer, it will be represented as a value instead. For example, list<int> becomes []int, but list<frozen<tuple<int, text>>> becomes [][]interface{} because the tuple type cannot be flattened.

Maps are represented as map[K]V, where K and V are in the "flattened" form as lists and sets.

UDTs are represented as map[string]interface{}, with field values being represented as described here. For example, a UDT with fields (a int, b text) will be represented as a map with two values of types (*int) and (*string).

For a comprehensive guide on how to interpret data in the CDC log, see Scylla documentation about CDC.
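
As an illustration of the representation described above, the sketch below inspects values with a type switch and distinguishes null from non-null values. It handles only a handful of types and assumes the Go types named in the text (*int for int, *string for text, and so on); describe is a hypothetical helper, not part of the library.

```go
package main

import "fmt"

// describe renders a value that follows the representation described above.
// Only a few types are handled; a real consumer would cover the types used
// by its own schema.
func describe(v interface{}) string {
	switch x := v.(type) {
	case nil:
		return "missing"
	case *int:
		if x == nil {
			return "int: null"
		}
		return fmt.Sprintf("int: %d", *x)
	case *string:
		if x == nil {
			return "text: null"
		}
		return fmt.Sprintf("text: %q", *x)
	case []byte:
		// A nil slice is a null blob, an empty non-nil slice is an empty blob.
		if x == nil {
			return "blob: null"
		}
		return fmt.Sprintf("blob: %d bytes", len(x))
	case []interface{}:
		if x == nil {
			return "tuple: null"
		}
		return fmt.Sprintf("tuple: %d elements", len(x))
	case []int:
		return fmt.Sprintf("list<int>: %v", x)
	case map[string]interface{}:
		return fmt.Sprintf("udt: %d fields", len(x))
	default:
		return fmt.Sprintf("unhandled type %T", v)
	}
}

func main() {
	n, s := 42, "hello"
	fmt.Println(describe(&n))                    // int: 42
	fmt.Println(describe((*int)(nil)))           // int: null
	fmt.Println(describe(&s))                    // text: "hello"
	fmt.Println(describe([]byte{}))              // blob: 0 bytes
	fmt.Println(describe([]byte(nil)))           // blob: null
	fmt.Println(describe([]interface{}{&n, &s})) // tuple: 2 elements
	fmt.Println(describe([]int{1, 2, 3}))        // list<int>: [1 2 3]
}
```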

func (*ChangeRow) Columns

func (c *ChangeRow) Columns() []gocql.ColumnInfo

Columns returns information about data columns in the cdc log table. It contains information about all columns - both with and without cdc$ prefix.

func (*ChangeRow) GetAtomicChange

func (c *ChangeRow) GetAtomicChange(column string) AtomicChange

GetAtomicChange returns an AtomicChange struct for a given column. Results are undefined if the column in the base table was not an atomic type.

func (*ChangeRow) GetDeletedElements

func (c *ChangeRow) GetDeletedElements(columnName string) (interface{}, bool)

GetDeletedElements returns which elements were deleted from the non-atomic column. This function works only for non-atomic columns.

func (*ChangeRow) GetListChange

func (c *ChangeRow) GetListChange(column string) ListChange

GetListChange returns a ListChange struct for a given column. Results are undefined if the column in the base table was not a list.

func (*ChangeRow) GetMapChange

func (c *ChangeRow) GetMapChange(column string) MapChange

GetMapChange returns a MapChange struct for a given column. Results are undefined if the column in the base table was not a map.

func (*ChangeRow) GetOperation

func (c *ChangeRow) GetOperation() OperationType

GetOperation returns the type of operation this change represents.

func (*ChangeRow) GetSetChange

func (c *ChangeRow) GetSetChange(column string) SetChange

GetSetChange returns a SetChange struct for a given column. Results are undefined if the column in the base table was not a set.

func (*ChangeRow) GetTTL

func (c *ChangeRow) GetTTL() int64

GetTTL returns TTL for the operation, or 0 if no TTL was used.

func (*ChangeRow) GetType

func (c *ChangeRow) GetType(columnName string) (gocql.TypeInfo, bool)

GetType returns gocql's representation of given column type.

func (*ChangeRow) GetUDTChange

func (c *ChangeRow) GetUDTChange(column string) UDTChange

GetUDTChange returns a UDTChange struct for a given column. Results are undefined if the column in the base table was not a UDT.

func (*ChangeRow) GetValue

func (c *ChangeRow) GetValue(columnName string) (interface{}, bool)

GetValue returns value that was assigned to this specific column.

func (*ChangeRow) IsDeleted

func (c *ChangeRow) IsDeleted(columnName string) (bool, bool)

IsDeleted returns a boolean indicating if given column was set to null. This only works for clustering columns.

func (*ChangeRow) String

func (c *ChangeRow) String() string

String is needed to implement the fmt.Stringer interface.

type CreateChangeConsumerInput

type CreateChangeConsumerInput struct {
	// Name of the table from which the new ChangeConsumer will receive changes.
	TableName string

	// ID of the stream from which the new ChangeConsumer will receive changes.
	StreamID StreamID

	ProgressReporter *ProgressReporter
}

CreateChangeConsumerInput represents input to the CreateChangeConsumer function.

type ListChange

type ListChange struct {
	// AppendedElements contains values appended to the list in the form
	// of map from cell timestamps to values.
	//
	// For more information about how to interpret it, see "Advanced column
	// types" in the CDC documentation.
	//
	// Type: map[gocql.UUID]T
	AppendedElements interface{}

	// RemovedElements contains indices of the removed elements.
	//
	// For more information about how to interpret it, see "Advanced column
	// types" in the CDC documentation.
	//
	// Type: []gocql.UUID
	RemovedElements []gocql.UUID

	// IsReset tells if the list value was overwritten instead of being
	// appended to or removed from. If it's true, then AppendedElements will
	// contain the new state of the list (which can be NULL).
	IsReset bool
}

ListChange represents a change to a column of a type list<T>.

type Logger

type Logger interface {
	Printf(format string, v ...interface{})
}

type MapChange

type MapChange struct {
	// AddedElements contains a map of elements which were added to the map
	// by the operation.
	//
	// Type: map[K]V.
	AddedElements interface{}

	// RemovedElements contains a slice of keys which were removed from the map
	// by the operation.
	// Please note that if the operation overwrote the old value of the map
	// instead of adding/removing elements, this field _will be nil_.
	// Instead, IsReset field will be set, and AddedElements will contain
	// the new state of the map.
	//
	// Type: []K
	RemovedElements interface{}

	// IsReset tells if the map value was overwritten instead of being
	// appended to or removed from. If it's true, then AddedElements will
	// contain the new state of the map (which can be NULL).
	IsReset bool
}

MapChange represents a change to a column of type map<K, V>.

type OperationType

type OperationType int8

OperationType corresponds to the cdc$operation column in CDC log, and describes the type of the operation given row represents.

For a comprehensive explanation of what each operation type means, see Scylla documentation about CDC.

func (OperationType) String

func (ot OperationType) String() string

String is needed to implement the fmt.Stringer interface.

type PeriodicProgressReporter

type PeriodicProgressReporter struct {
	// contains filtered or unexported fields
}

PeriodicProgressReporter is a wrapper around ProgressReporter which can be used to save progress in regular periods of time.

func NewPeriodicProgressReporter

func NewPeriodicProgressReporter(logger Logger, interval time.Duration, reporter *ProgressReporter) *PeriodicProgressReporter

NewPeriodicProgressReporter creates a new PeriodicProgressReporter with given report interval.

func (*PeriodicProgressReporter) SaveAndStop

func (ppr *PeriodicProgressReporter) SaveAndStop(ctx context.Context) error

SaveAndStop stops inner goroutine, waits until it finishes, and then saves the most recent progress.

func (*PeriodicProgressReporter) Start

func (ppr *PeriodicProgressReporter) Start(ctx context.Context)

Start spawns an internal goroutine and starts the progress reporting loop.

func (*PeriodicProgressReporter) Stop

func (ppr *PeriodicProgressReporter) Stop()

Stop stops inner goroutine and waits until it finishes.

func (*PeriodicProgressReporter) Update

func (ppr *PeriodicProgressReporter) Update(newTime gocql.UUID)

Update tells the PeriodicProgressReporter that a row has been processed.

type Progress

type Progress struct {
	// LastProcessedRecordTime represents the value of the cdc$time column
	// of the last processed record in the stream.
	LastProcessedRecordTime gocql.UUID
}

Progress represents the point up to which the library has processed changes in a given stream.

type ProgressManager

type ProgressManager interface {
	// GetCurrentGeneration returns the time of the generation that was
	// last saved by StartGeneration. The library will call this function
	// at the beginning in order to determine from which generation it should
	// start reading first.
	//
	// If there is no information available about the time of the generation
	// from which reading should start, GetCurrentGeneration can return
	// a zero time value. In that case, reading will start from the point
	// determined by AdvancedReaderConfig.ChangeAgeLimit.
	//
	// If this function returns an error, the library will stop with an error.
	GetCurrentGeneration(ctx context.Context) (time.Time, error)

	// StartGeneration is called after all changes have been read from the
	// previous generation and the library is about to start processing
	// the next one. The ProgressManager should save this information so that
	// GetCurrentGeneration will return it after the library is restarted.
	//
	// If this function returns an error, the library will stop with an error.
	StartGeneration(ctx context.Context, gen time.Time) error

	// GetProgress retrieves information about the progress of given stream,
	// in a given table. If there was no progress saved for this stream
	// during this generation, GetProgress can return a zero time value
	// and the library will start processing changes from the stream
	// starting from the beginning of the generation.
	//
	// This method needs to be thread-safe, as the library is allowed to
	// call it concurrently for different combinations of `table` and `streamID`.
	// The library won't issue concurrent calls to this method with the same
	// `table` and `streamID` parameters.
	//
	// If this function returns an error, the library will stop with an error.
	GetProgress(ctx context.Context, gen time.Time, table string, streamID StreamID) (Progress, error)

	// SaveProgress stores information about the last cdc log record which was
	// processed successfully. If the reader is restarted, it should resume
	// work for this stream starting from the row _after_ the last saved
	// timestamp.
	//
	// This method is only called by ChangeConsumers, indirectly through
	// the ProgressReporter struct. Within a generation, ChangeConsumers
	// are run concurrently, therefore SaveProgress should be safe to call
	// concurrently.
	//
	// Contrary to other methods, an error returned does not immediately
	// result in the library stopping with an error. The error is propagated
	// to the ChangeConsumer, and it can decide what to do with the error next.
	SaveProgress(ctx context.Context, gen time.Time, table string, streamID StreamID, progress Progress) error
}

ProgressManager allows the library to load and save progress for each stream and table separately.
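
If you implement ProgressManager yourself, a simple in-memory version can be a useful starting point, for example in tests. The sketch below follows the method set listed above, but it uses local stand-ins for StreamID and Progress so that it compiles on its own (the real Progress holds a gocql.UUID, which is a 16-byte array). memoryProgressManager is a hypothetical name, and nothing here survives a process restart.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Local stand-ins for the scyllacdc types used by ProgressManager.
type StreamID []byte

type Progress struct {
	LastProcessedRecordTime [16]byte
}

type progressKey struct {
	gen    int64 // generation time in Unix milliseconds
	table  string
	stream string
}

// memoryProgressManager keeps all progress in process memory. A single mutex
// makes every method safe for concurrent use.
type memoryProgressManager struct {
	mu       sync.Mutex
	current  time.Time
	progress map[progressKey]Progress
}

func newMemoryProgressManager() *memoryProgressManager {
	return &memoryProgressManager{progress: make(map[progressKey]Progress)}
}

func (m *memoryProgressManager) GetCurrentGeneration(ctx context.Context) (time.Time, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.current, nil // zero time if StartGeneration was never called
}

func (m *memoryProgressManager) StartGeneration(ctx context.Context, gen time.Time) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.current = gen
	return nil
}

func (m *memoryProgressManager) GetProgress(ctx context.Context, gen time.Time, table string, streamID StreamID) (Progress, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	// A missing entry yields the zero Progress, i.e. "nothing saved yet".
	return m.progress[progressKey{gen.UnixMilli(), table, string(streamID)}], nil
}

func (m *memoryProgressManager) SaveProgress(ctx context.Context, gen time.Time, table string, streamID StreamID, progress Progress) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.progress[progressKey{gen.UnixMilli(), table, string(streamID)}] = progress
	return nil
}

func main() {
	ctx := context.Background()
	m := newMemoryProgressManager()
	gen := time.Date(2022, time.October, 19, 0, 0, 0, 0, time.UTC)
	stream := StreamID{0x01, 0x02}

	_ = m.StartGeneration(ctx, gen)
	_ = m.SaveProgress(ctx, gen, "ks.tbl", stream, Progress{LastProcessedRecordTime: [16]byte{1}})

	saved, _ := m.GetProgress(ctx, gen, "ks.tbl", stream)
	other, _ := m.GetProgress(ctx, gen, "ks.other", stream)
	fmt.Println(saved.LastProcessedRecordTime[0], other.LastProcessedRecordTime[0]) // 1 0
}
```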

type ProgressManagerWithStartTime added in v1.1.0

type ProgressManagerWithStartTime interface {
	ProgressManager

	// GetApplicationReadStartTime returns the timestamp from which
	// the application started reading data. The library uses this timestamp
	// as a lower bound to determine where it should start reading. For example,
	// if there is no generation saved or there is no progress information
	// saved for a stream, reading will be restarted from the given timestamp
	// (or higher if the generation timestamp is higher).
	//
	// If this function returns a zero time value, the library will start reading
	// from `time.Now() - AdvancedReaderConfig.ChangeAgeLimit`.
	// If this function returns an error, the library will stop with an error.
	GetApplicationReadStartTime(ctx context.Context) (time.Time, error)

	// SaveApplicationReadStartTime stores information about the timestamp
	// from which the application originally started reading data.
	// It is called by the library if there was no start timestamp saved.
	//
	// If this function returns an error, the library will stop with an error.
	SaveApplicationReadStartTime(ctx context.Context, startTime time.Time) error
}

ProgressManagerWithStartTime is an extension to the ProgressManager interface.

type ProgressReporter

type ProgressReporter struct {
	// contains filtered or unexported fields
}

ProgressReporter is a helper object for the ChangeConsumer. It allows the consumer to save its progress.

func (*ProgressReporter) MarkProgress

func (pr *ProgressReporter) MarkProgress(ctx context.Context, progress Progress) error

MarkProgress saves progress for the consumer associated with the ProgressReporter.

The associated ChangeConsumer is allowed to call it anytime between its creation by ChangeConsumerFactory and the moment it is stopped (the call to (ChangeConsumer).End() finishes).

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader reads changes from CDC logs of the specified tables.

func NewReader

func NewReader(ctx context.Context, config *ReaderConfig) (*Reader, error)

NewReader creates a new CDC reader using the specified configuration.

func (*Reader) Run

func (r *Reader) Run(ctx context.Context) error

Run runs the CDC reader. This call is blocking and returns after an error occurs, or the reader is stopped gracefully.

func (*Reader) Stop

func (r *Reader) Stop()

Stop tells the reader to stop as soon as possible. There is no guarantee related to how much data will be processed in each stream when the reader stops. If you want to e.g. make sure that all cdc log data with timestamps up to the current moment was processed, use (*Reader).StopAt(time.Now()). This function does not wait until the reader stops.

func (*Reader) StopAt

func (r *Reader) StopAt(at time.Time)

StopAt tells the reader to stop reading changes after reaching given timestamp. Does not guarantee that the reader won't read any changes after the timestamp, but the reader will stop after all tables and streams are advanced to or past the timestamp. This function does not wait until the reader stops.

type ReaderConfig

type ReaderConfig struct {
	// An active gocql session to the cluster.
	Session *gocql.Session

	// Names of the tables for which to read changes. This should be the name
	// of the base table, not the cdc log table.
	// Can be prefixed with keyspace name.
	TableNames []string

	// Consistency to use when querying CDC log.
	// If not specified, QUORUM consistency will be used.
	Consistency gocql.Consistency

	// A factory which creates ChangeConsumers, which process information
	// fetched from the CDC log.
	ChangeConsumerFactory ChangeConsumerFactory

	// An object which allows the reader to read and write information about
	// current progress.
	ProgressManager ProgressManager

	// A logger. If set, it will receive log messages useful for debugging of the library.
	Logger Logger

	// Advanced parameters.
	Advanced AdvancedReaderConfig
}

ReaderConfig defines parameters used for creation of the CDC Reader object.

func (*ReaderConfig) Copy

func (rc *ReaderConfig) Copy() *ReaderConfig

Copy makes a shallow copy of the ReaderConfig.

type SetChange

type SetChange struct {
	// AddedElements contains a slice of values which were added to the set
	// by the operation. If there were any values added, it will contain
	// a slice of form []T, where T is gocql's representation of the element
	// type.
	//
	// Type: []T
	AddedElements interface{}

	// RemovedElements contains a slice of values which were removed from the set
	// by the operation. Like AddedElements, it's either a slice or a nil
	// interface.
	//
	// Please note that if the operation overwrote the old value of the set
	// instead of adding/removing elements, this field _will be nil_.
	// Instead, IsReset field will be set, and AddedElements will contain
	// the new state of the set.
	//
	// Type: []T
	RemovedElements interface{}

	// IsReset tells if the set value was overwritten instead of being
	// appended to or removed from. If it's true, then AddedElements will
	// contain the new state of the set (which can be NULL).
	IsReset bool
}

SetChange represents a change to a column of type set<T>.
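
A consumer that mirrors the state of a set column can apply each SetChange along the following lines. This is a sketch under stated assumptions: setChange is a local copy of the fields above so that the example compiles on its own, the column is assumed to be a set<text> (so the slices are []string), and applySetChange and sortedElements are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sort"
)

// setChange mirrors the fields of scyllacdc.SetChange.
type setChange struct {
	AddedElements   interface{} // []string or nil
	RemovedElements interface{} // []string or nil
	IsReset         bool
}

// applySetChange updates a locally mirrored copy of the set and returns it.
func applySetChange(state map[string]struct{}, c setChange) map[string]struct{} {
	if c.IsReset || state == nil {
		// The column was overwritten (or there is no state yet):
		// start from an empty set.
		state = make(map[string]struct{})
	}
	// The comma-ok assertions leave the slices nil when the field is nil.
	added, _ := c.AddedElements.([]string)
	removed, _ := c.RemovedElements.([]string)
	for _, v := range added {
		state[v] = struct{}{}
	}
	for _, v := range removed {
		delete(state, v)
	}
	return state
}

// sortedElements returns the elements of the set in sorted order.
func sortedElements(state map[string]struct{}) []string {
	out := make([]string, 0, len(state))
	for v := range state {
		out = append(out, v)
	}
	sort.Strings(out)
	return out
}

func main() {
	var state map[string]struct{}
	state = applySetChange(state, setChange{AddedElements: []string{"a", "b"}})
	state = applySetChange(state, setChange{RemovedElements: []string{"a"}})
	fmt.Println(sortedElements(state)) // [b]

	// An overwrite replaces the old contents with AddedElements.
	state = applySetChange(state, setChange{AddedElements: []string{"x", "y"}, IsReset: true})
	fmt.Println(sortedElements(state)) // [x y]
}
```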

type StreamID

type StreamID []byte

StreamID represents an ID of a stream from a CDC log (cdc$stream_id column).

func (StreamID) String

func (sid StreamID) String() string

String is needed to implement the fmt.Stringer interface.

type TableBackedProgressManager

type TableBackedProgressManager struct {
	// contains filtered or unexported fields
}

TableBackedProgressManager is a ProgressManager which saves progress in a Scylla table.

The schema is as follows:

CREATE TABLE IF NOT EXISTS <table name> (
    generation timestamp,
    application_name text,
    table_name text,
    stream_id blob,
    last_timestamp timeuuid,
    current_generation timestamp,
    PRIMARY KEY ((generation, application_name, table_name, stream_id))
)

Progress for each stream is stored in a separate row, indexed by generation, application_name, table_name and stream_id.

To store information about the current generation, a special row with stream_id set to empty bytes is used.

func NewTableBackedProgressManager

func NewTableBackedProgressManager(session *gocql.Session, progressTableName string, applicationName string) (*TableBackedProgressManager, error)

NewTableBackedProgressManager creates a new TableBackedProgressManager.

func (*TableBackedProgressManager) GetApplicationReadStartTime added in v1.1.0

func (tbpm *TableBackedProgressManager) GetApplicationReadStartTime(ctx context.Context) (time.Time, error)

GetApplicationReadStartTime is needed to implement the ProgressManagerWithStartTime interface.

func (*TableBackedProgressManager) GetCurrentGeneration

func (tbpm *TableBackedProgressManager) GetCurrentGeneration(ctx context.Context) (time.Time, error)

GetCurrentGeneration is needed to implement the ProgressManager interface.

func (*TableBackedProgressManager) GetProgress

func (tbpm *TableBackedProgressManager) GetProgress(ctx context.Context, gen time.Time, tableName string, streamID StreamID) (Progress, error)

GetProgress is needed to implement the ProgressManager interface.

func (*TableBackedProgressManager) SaveApplicationReadStartTime added in v1.1.0

func (tbpm *TableBackedProgressManager) SaveApplicationReadStartTime(ctx context.Context, startTime time.Time) error

SaveApplicationReadStartTime is needed to implement the ProgressManagerWithStartTime interface.

func (*TableBackedProgressManager) SaveProgress

func (tbpm *TableBackedProgressManager) SaveProgress(ctx context.Context, gen time.Time, tableName string, streamID StreamID, progress Progress) error

SaveProgress is needed to implement the ProgressManager interface.

func (*TableBackedProgressManager) SetMaxConcurrency

func (tbpm *TableBackedProgressManager) SetMaxConcurrency(maxConcurrentOps int64)

SetMaxConcurrency sets the maximum allowed concurrency for write operations. By default, it's 100. This function must not be called after Reader for this manager is started.

func (*TableBackedProgressManager) SetTTL

func (tbpm *TableBackedProgressManager) SetTTL(ttl int32)

SetTTL sets the TTL used to expire progress. By default, it's 7 days.

func (*TableBackedProgressManager) StartGeneration

func (tbpm *TableBackedProgressManager) StartGeneration(ctx context.Context, gen time.Time) error

StartGeneration is needed to implement the ProgressManager interface.

type UDTChange

type UDTChange struct {
	// AddedFields contains a map of fields. Non-null value of a field
	// indicate that the field was written to, otherwise it was not written.
	AddedFields map[string]interface{}

	// RemovedFields contains names of fields which were set to null
	// by this operation.
	RemovedFields []string

	// RemovedFieldsIndices contains indices of fields which were set to null
	// by this operation.
	RemovedFieldsIndices []int16

	// IsReset tells if the UDT was overwritten instead of only some fields
	// being overwritten. If this flag is true, then nil fields in AddedFields
	// will mean that those fields should be set to null.
	IsReset bool
}

UDTChange represents a change to a column of a UDT type.
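
One pitfall when reading AddedFields is that a field which was not written may be stored as a typed nil pointer (for example (*int)(nil)) inside the interface{}, so comparing the value with nil is not enough. The sketch below shows one way to handle this with reflection. udtChange is a local copy of the relevant fields, isNull and applyUDTChange are hypothetical helpers, and representing a null field by its absence from the state map is a choice made for this example.

```go
package main

import (
	"fmt"
	"reflect"
)

// udtChange mirrors the fields of scyllacdc.UDTChange that the sketch needs.
type udtChange struct {
	AddedFields   map[string]interface{}
	RemovedFields []string
	IsReset       bool
}

// isNull reports whether v is a nil interface or holds a nil pointer,
// slice or map.
func isNull(v interface{}) bool {
	if v == nil {
		return true
	}
	rv := reflect.ValueOf(v)
	switch rv.Kind() {
	case reflect.Ptr, reflect.Slice, reflect.Map:
		return rv.IsNil()
	}
	return false
}

// applyUDTChange updates a locally mirrored copy of the UDT value.
// A field that is absent from the returned map is considered null.
func applyUDTChange(state map[string]interface{}, c udtChange) map[string]interface{} {
	if c.IsReset || state == nil {
		// The whole UDT was overwritten: fields not set below stay null.
		state = make(map[string]interface{})
	}
	for name, v := range c.AddedFields {
		if !isNull(v) {
			state[name] = v // written by this change
		}
	}
	for _, name := range c.RemovedFields {
		delete(state, name) // set to null by this change
	}
	return state
}

func main() {
	a, b := 1, "one"
	state := applyUDTChange(nil, udtChange{
		AddedFields: map[string]interface{}{"a": &a, "b": &b},
		IsReset:     true,
	})
	fmt.Println(len(state)) // 2

	// Field b is set to null; a holds a typed nil pointer, i.e. "not written".
	state = applyUDTChange(state, udtChange{
		AddedFields:   map[string]interface{}{"a": (*int)(nil), "b": (*string)(nil)},
		RemovedFields: []string{"b"},
	})
	_, hasA := state["a"]
	_, hasB := state["b"]
	fmt.Println(hasA, hasB) // true false
}
```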

Directories

Path Synopsis
examples
internal
