spream

v1.1.0

Warning: This package is not in the latest version of its module.
Published: Jan 8, 2023 License: MIT Imports: 8 Imported by: 0

README

Cloud Spanner Change Streams Subscriber for Go

Synopsis

This library subscribes to the records of a Google Cloud Spanner change stream in Go. It is heavily inspired by the SpannerIO connector of the Apache Beam SDK and is compatible with the PartitionMetadata data model.

Motivation

To read change streams, Google Cloud offers the Dataflow connector as a scalable and reliable solution, but in some cases the abstraction and capabilities of Dataflow pipelines are more than you need (or simply too expensive). Using the change stream API directly gives more flexibility, but it is fairly complex. This library aims to make reading change streams flexible and lightweight, while keeping it easy to move to the Dataflow connector when needed.

Example Usage

package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"os/signal"
	"sync"

	"cloud.google.com/go/spanner"
	"github.com/toga4/spream"
	"github.com/toga4/spream/partitionstorage"
)

func main() {
	// os.Kill (SIGKILL) cannot be trapped, so only os.Interrupt is handled.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	database := fmt.Sprintf("projects/%s/instances/%s/databases/%s", "foo-project", "foo-instance", "foo-database")
	spannerClient, err := spanner.NewClient(ctx, database)
	if err != nil {
		panic(err)
	}
	defer spannerClient.Close()

	partitionMetadataTableName := "PartitionMetadata_FooStream"
	partitionStorage := partitionstorage.NewSpanner(spannerClient, partitionMetadataTableName)
	if err := partitionStorage.CreateTableIfNotExists(ctx); err != nil {
		panic(err)
	}

	changeStreamName := "FooStream"
	subscriber := spream.NewSubscriber(spannerClient, changeStreamName, partitionStorage)

	fmt.Fprintf(os.Stderr, "Reading the stream...\n")
	logger := &Logger{out: os.Stdout}
	if err := subscriber.Subscribe(ctx, logger); err != nil && !errors.Is(ctx.Err(), context.Canceled) {
		panic(err)
	}
}

type Logger struct {
	out io.Writer
	mu  sync.Mutex
}

func (l *Logger) Consume(change *spream.DataChangeRecord) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	return json.NewEncoder(l.out).Encode(change)
}

CLI

Use the CLI as a tool for tracking change streams or as a more detailed implementation example.

Installation
$ go install github.com/toga4/spream/cmd/spream@latest
Usage
Usage: spream [OPTIONS...]

Options:
  -d, --database (required)     Database name of change stream with the form 'projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID'.
  -s, --stream   (required)     Change stream name
  -t, --metadata-table          Table name for partition metadata             (default: store partition metadata on memory, not Cloud Spanner)
  --start                       Start timestamp with RFC3339 format           (default: current timestamp)
  --end                         End timestamp with RFC3339 format             (default: indefinite)
  --heartbeat-interval          Heartbeat interval with time.Duration format  (default: 10s)
  --priority [high|medium|low]  Request priority for Cloud Spanner            (default: high)
  --metadata-database           Database name of partition metadata table     (default: same as database option)
  -h, --help                    Print this message
Example
$ spream -d projects/my-project/instances/my-instance/databases/my-database -s SingerStream
Waiting changes...
{"commit_timestamp":"2023-01-08T05:47:57.998479Z","record_sequence":"00000000","server_transaction_id":"ODIzNDU0OTc2NzUxOTc0NTU1OQ==","is_last_record_in_transaction_in_partition":true,"table_name":"Singers","column_types":[{"name":"SingerId","type":{"code":"INT64"},"is_primary_key":true,"ordinal_position":1},{"name":"Name","type":{"code":"STRING"},"ordinal_position":2}],"mods":[{"keys":{"SingerId":"1"},"new_values":{"Name":"foo"}}],"mod_type":"INSERT","value_capture_type":"OLD_AND_NEW_VALUES","number_of_records_in_transaction":1,"number_of_partitions_in_transaction":1,"transaction_tag":"","is_system_transaction":false}
{"commit_timestamp":"2023-01-08T05:47:58.766575Z","record_sequence":"00000000","server_transaction_id":"MjQ3ODQzMDcxOTMwNjcyODg4Nw==","is_last_record_in_transaction_in_partition":true,"table_name":"Singers","column_types":[{"name":"SingerId","type":{"code":"INT64"},"is_primary_key":true,"ordinal_position":1},{"name":"Name","type":{"code":"STRING"},"ordinal_position":2}],"mods":[{"keys":{"SingerId":"1"},"new_values":{"Name":"bar"},"old_values":{"Name":"foo"}}],"mod_type":"UPDATE","value_capture_type":"OLD_AND_NEW_VALUES","number_of_records_in_transaction":1,"number_of_partitions_in_transaction":1,"transaction_tag":"","is_system_transaction":false}
{"commit_timestamp":"2023-01-08T05:47:59.117807Z","record_sequence":"00000000","server_transaction_id":"ODkwNDMzNDgxMDU2NzAwMDM2MA==","is_last_record_in_transaction_in_partition":true,"table_name":"Singers","column_types":[{"name":"SingerId","type":{"code":"INT64"},"is_primary_key":true,"ordinal_position":1},{"name":"Name","type":{"code":"STRING"},"ordinal_position":2}],"mods":[{"keys":{"SingerId":"1"},"old_values":{"Name":"bar"}}],"mod_type":"DELETE","value_capture_type":"OLD_AND_NEW_VALUES","number_of_records_in_transaction":1,"number_of_partitions_in_transaction":1,"transaction_tag":"","is_system_transaction":false}

Credits

Heavily inspired by the following projects.

Documentation

Constants

const (
	ModType_INSERT = "INSERT"
	ModType_UPDATE = "UPDATE"
	ModType_DELETE = "DELETE"
)
const (
	RootPartitionToken = "Parent0"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ChildPartition

type ChildPartition struct {
	Token                 string   `spanner:"token" json:"token"`
	ParentPartitionTokens []string `spanner:"parent_partition_tokens" json:"parent_partition_tokens"`
}

type ChildPartitionsRecord

type ChildPartitionsRecord struct {
	StartTimestamp  time.Time         `spanner:"start_timestamp" json:"start_timestamp"`
	RecordSequence  string            `spanner:"record_sequence" json:"record_sequence"`
	ChildPartitions []*ChildPartition `spanner:"child_partitions" json:"child_partitions"`
}

type ColumnType

type ColumnType struct {
	Name            string `json:"name"`
	Type            Type   `json:"type"`
	IsPrimaryKey    bool   `json:"is_primary_key,omitempty"`
	OrdinalPosition int64  `json:"ordinal_position"`
}

ColumnType describes the metadata of a column.
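A consumer often needs the key columns of a record. A minimal sketch using local copies of `Type` and `ColumnType`; the helper `primaryKeyColumns` is hypothetical. It filters on `IsPrimaryKey` and sorts by `OrdinalPosition`. Note that `ordinal_position` describes the column's position in the table schema, so the result follows column order, which is not necessarily the order of the columns within the primary key.

```go
package main

import (
	"fmt"
	"sort"
)

// TypeCode, Type and ColumnType are local copies of the documented spream
// types, redeclared so that this sketch compiles on its own.
type TypeCode string

type Type struct {
	Code             TypeCode `json:"code"`
	ArrayElementType TypeCode `json:"array_element_type,omitempty"`
}

type ColumnType struct {
	Name            string `json:"name"`
	Type            Type   `json:"type"`
	IsPrimaryKey    bool   `json:"is_primary_key,omitempty"`
	OrdinalPosition int64  `json:"ordinal_position"`
}

// primaryKeyColumns (hypothetical helper) returns the names of the
// primary-key columns, ordered by their ordinal position in the table schema.
func primaryKeyColumns(cols []*ColumnType) []string {
	var pk []*ColumnType
	for _, c := range cols {
		if c.IsPrimaryKey {
			pk = append(pk, c)
		}
	}
	sort.Slice(pk, func(i, j int) bool { return pk[i].OrdinalPosition < pk[j].OrdinalPosition })
	names := make([]string, len(pk))
	for i, c := range pk {
		names[i] = c.Name
	}
	return names
}

func main() {
	cols := []*ColumnType{
		{Name: "Name", Type: Type{Code: "STRING"}, OrdinalPosition: 3},
		{Name: "AlbumId", Type: Type{Code: "INT64"}, IsPrimaryKey: true, OrdinalPosition: 2},
		{Name: "SingerId", Type: Type{Code: "INT64"}, IsPrimaryKey: true, OrdinalPosition: 1},
	}
	fmt.Println(primaryKeyColumns(cols)) // [SingerId AlbumId]
}
```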

type Consumer added in v1.0.0

type Consumer interface {
	Consume(change *DataChangeRecord) error
}

Consumer is the interface for consuming DataChangeRecords.

Consume might be called from multiple goroutines concurrently, so implementations must be safe for concurrent use.

type ConsumerFunc added in v1.0.0

type ConsumerFunc func(*DataChangeRecord) error

The ConsumerFunc type is an adapter that allows ordinary functions to be used as a Consumer.

func (ConsumerFunc) Consume added in v1.0.0

func (f ConsumerFunc) Consume(change *DataChangeRecord) error

Consume calls f(change).

type DataChangeRecord

type DataChangeRecord struct {
	CommitTimestamp                      time.Time     `json:"commit_timestamp"`
	RecordSequence                       string        `json:"record_sequence"`
	ServerTransactionID                  string        `json:"server_transaction_id"`
	IsLastRecordInTransactionInPartition bool          `json:"is_last_record_in_transaction_in_partition"`
	TableName                            string        `json:"table_name"`
	ColumnTypes                          []*ColumnType `json:"column_types"`
	Mods                                 []*Mod        `json:"mods"`
	ModType                              ModType       `json:"mod_type"`
	ValueCaptureType                     string        `json:"value_capture_type"`
	NumberOfRecordsInTransaction         int64         `json:"number_of_records_in_transaction"`
	NumberOfPartitionsInTransaction      int64         `json:"number_of_partitions_in_transaction"`
	TransactionTag                       string        `json:"transaction_tag"`
	IsSystemTransaction                  bool          `json:"is_system_transaction"`
}

DataChangeRecord represents a set of changes to a table.

type HeartbeatRecord

type HeartbeatRecord struct {
	Timestamp time.Time `spanner:"timestamp" json:"timestamp"`
}

type Mod

type Mod struct {
	Keys      map[string]interface{} `json:"keys,omitempty"`
	NewValues map[string]interface{} `json:"new_values,omitempty"`
	OldValues map[string]interface{} `json:"old_values,omitempty"`
}

Mod contains the keys and the values of the changed records.

type ModType

type ModType string
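The CLI output above shows which maps a `Mod` fills for each `ModType` under the `OLD_AND_NEW_VALUES` value capture type: `INSERT` carries `new_values`, `UPDATE` carries both, and `DELETE` carries `old_values`. A sketch of a consumer-side switch on the mod type, using local copies of `ModType`, the `ModType_*` constants, and `Mod`; `describe` and `formatValues` are hypothetical helpers. Since the `ModType_*` constants are untyped strings, they compare directly against a `ModType` value.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// ModType, the ModType_* constants and Mod mirror the documented declarations.
type ModType string

const (
	ModType_INSERT = "INSERT"
	ModType_UPDATE = "UPDATE"
	ModType_DELETE = "DELETE"
)

type Mod struct {
	Keys      map[string]interface{}
	NewValues map[string]interface{}
	OldValues map[string]interface{}
}

// formatValues renders a column map with sorted column names so output is stable.
func formatValues(m map[string]interface{}) string {
	cols := make([]string, 0, len(m))
	for k := range m {
		cols = append(cols, k)
	}
	sort.Strings(cols)
	parts := make([]string, len(cols))
	for i, k := range cols {
		parts[i] = fmt.Sprintf("%s=%v", k, m[k])
	}
	return strings.Join(parts, ",")
}

// describe summarizes one Mod according to its mod type.
func describe(t ModType, m *Mod) string {
	keys := formatValues(m.Keys)
	switch t {
	case ModType_INSERT:
		return fmt.Sprintf("insert %s -> %s", keys, formatValues(m.NewValues))
	case ModType_UPDATE:
		return fmt.Sprintf("update %s: %s -> %s", keys, formatValues(m.OldValues), formatValues(m.NewValues))
	case ModType_DELETE:
		return fmt.Sprintf("delete %s (was %s)", keys, formatValues(m.OldValues))
	default:
		return fmt.Sprintf("unknown mod type %q for %s", string(t), keys)
	}
}

func main() {
	m := &Mod{
		Keys:      map[string]interface{}{"SingerId": "1"},
		NewValues: map[string]interface{}{"Name": "bar"},
		OldValues: map[string]interface{}{"Name": "foo"},
	}
	fmt.Println(describe(ModType_UPDATE, m)) // update SingerId=1: Name=foo -> Name=bar
}
```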

type Option

type Option interface {
	Apply(*config)
}

func WithEndTimestamp

func WithEndTimestamp(endTimestamp time.Time) Option

WithEndTimestamp sets the end timestamp option for reading change streams.

The value must be within the retention period of the change stream and must be after the start timestamp. If not set, the subscriber reads the latest changes until it is canceled.

func WithHeartbeatInterval added in v1.0.0

func WithHeartbeatInterval(heartbeatInterval time.Duration) Option

WithHeartbeatInterval sets the heartbeat interval for reading change streams.

The default value is 10 seconds.

func WithSpannerRequestPriotiry added in v1.0.0

func WithSpannerRequestPriotiry(priority spannerpb.RequestOptions_Priority) Option

WithSpannerRequestPriotiry sets the request priority option for reading change streams.

The default value is unspecified, which is equivalent to high.

func WithStartTimestamp added in v1.0.0

func WithStartTimestamp(startTimestamp time.Time) Option

WithStartTimestamp sets the start timestamp option for reading change streams.

The value must be within the retention period of the change stream and before the current time. The default value is the current timestamp.

type PartitionMetadata added in v1.0.0

type PartitionMetadata struct {
	PartitionToken  string     `spanner:"PartitionToken" json:"partition_token"`
	ParentTokens    []string   `spanner:"ParentTokens" json:"parent_tokens"`
	StartTimestamp  time.Time  `spanner:"StartTimestamp" json:"start_timestamp"`
	EndTimestamp    time.Time  `spanner:"EndTimestamp" json:"end_timestamp"`
	HeartbeatMillis int64      `spanner:"HeartbeatMillis" json:"heartbeat_millis"`
	State           State      `spanner:"State" json:"state"`
	Watermark       time.Time  `spanner:"Watermark" json:"watermark"`
	CreatedAt       time.Time  `spanner:"CreatedAt" json:"created_at"`
	ScheduledAt     *time.Time `spanner:"ScheduledAt" json:"scheduled_at,omitempty"`
	RunningAt       *time.Time `spanner:"RunningAt" json:"running_at,omitempty"`
	FinishedAt      *time.Time `spanner:"FinishedAt" json:"finished_at,omitempty"`
}

PartitionMetadata contains partition tokens and timestamps that have already been read from the stream partition.

func (*PartitionMetadata) IsRootPartition added in v1.0.0

func (p *PartitionMetadata) IsRootPartition() bool

IsRootPartition returns true if this is the root partition.
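The `RootPartitionToken` constant (`"Parent0"`) suggests how the root partition is recognized: the metadata row created by `InitializeRootPartition` presumably carries that token, and partitions discovered from it presumably list it as a parent. A plausible sketch, assuming the check is a plain token comparison (the real implementation is not shown here); `"child-token"` is a placeholder value.

```go
package main

import "fmt"

// RootPartitionToken is the documented spream constant.
const RootPartitionToken = "Parent0"

// PartitionMetadata is trimmed to the fields this sketch needs.
type PartitionMetadata struct {
	PartitionToken string
	ParentTokens   []string
}

// IsRootPartition reports whether p is the root partition, assuming the
// root is identified by its token alone.
func (p *PartitionMetadata) IsRootPartition() bool {
	return p.PartitionToken == RootPartitionToken
}

func main() {
	root := &PartitionMetadata{PartitionToken: RootPartitionToken}
	child := &PartitionMetadata{PartitionToken: "child-token", ParentTokens: []string{RootPartitionToken}}
	fmt.Println(root.IsRootPartition(), child.IsRootPartition()) // true false
}
```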

type PartitionStorage added in v1.0.0

type PartitionStorage interface {
	GetUnfinishedMinWatermarkPartition(ctx context.Context) (*PartitionMetadata, error)
	GetInterruptedPartitions(ctx context.Context) ([]*PartitionMetadata, error)
	InitializeRootPartition(ctx context.Context, startTimestamp time.Time, endTimestamp time.Time, heartbeatInterval time.Duration) error
	GetSchedulablePartitions(ctx context.Context, minWatermark time.Time) ([]*PartitionMetadata, error)
	AddChildPartitions(ctx context.Context, parentPartition *PartitionMetadata, childPartitionsRecord *ChildPartitionsRecord) error
	UpdateToScheduled(ctx context.Context, partitions []*PartitionMetadata) error
	UpdateToRunning(ctx context.Context, partition *PartitionMetadata) error
	UpdateToFinished(ctx context.Context, partition *PartitionMetadata) error
	UpdateWatermark(ctx context.Context, partition *PartitionMetadata, watermark time.Time) error
}

PartitionStorage is an interface for storing and reading PartitionMetadata.

type State added in v1.0.0

type State string
const (
	StateCreated   State = "CREATED"
	StateScheduled State = "SCHEDULED"
	StateRunning   State = "RUNNING"
	StateFinished  State = "FINISHED"
)

type Subscriber added in v1.0.0

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber subscribes to a change stream.

func NewSubscriber added in v1.0.0

func NewSubscriber(
	client *spanner.Client,
	streamName string,
	partitionStorage PartitionStorage,
	options ...Option,
) *Subscriber

NewSubscriber creates a new subscriber of change streams.

Example
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"os/signal"
	"sync"

	"cloud.google.com/go/spanner"
	"github.com/toga4/spream"
	"github.com/toga4/spream/partitionstorage"
)

func main() {
	// os.Kill (SIGKILL) cannot be trapped, so only os.Interrupt is handled.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	database := fmt.Sprintf("projects/%s/instances/%s/databases/%s", "foo-project", "foo-instance", "foo-database")
	spannerClient, err := spanner.NewClient(ctx, database)
	if err != nil {
		panic(err)
	}
	defer spannerClient.Close()

	changeStreamName := "FooStream"
	subscriber := spream.NewSubscriber(spannerClient, changeStreamName, partitionstorage.NewInmemory())

	fmt.Fprintf(os.Stderr, "Reading the stream...\n")

	var mu sync.Mutex
	if err := subscriber.SubscribeFunc(ctx, func(change *spream.DataChangeRecord) error {
		mu.Lock()
		defer mu.Unlock()
		return json.NewEncoder(os.Stdout).Encode(change)
	}); err != nil && !errors.Is(ctx.Err(), context.Canceled) {
		panic(err)
	}
}

Example (WithOptions)
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"os/signal"
	"sync"
	"time"

	"cloud.google.com/go/spanner"
	"cloud.google.com/go/spanner/apiv1/spannerpb"
	"github.com/toga4/spream"
	"github.com/toga4/spream/partitionstorage"
)

func main() {
	// os.Kill (SIGKILL) cannot be trapped, so only os.Interrupt is handled.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	database := fmt.Sprintf("projects/%s/instances/%s/databases/%s", "foo-project", "foo-instance", "foo-database")
	spannerClient, err := spanner.NewClient(ctx, database)
	if err != nil {
		panic(err)
	}
	defer spannerClient.Close()

	partitionMetadataTableName := "PartitionMetadata_FooStream"
	partitionStorage := partitionstorage.NewSpanner(spannerClient, partitionMetadataTableName)
	if err := partitionStorage.CreateTableIfNotExists(ctx); err != nil {
		panic(err)
	}

	changeStreamName := "FooStream"
	subscriber := spream.NewSubscriber(
		spannerClient,
		changeStreamName,
		partitionStorage,
		spream.WithStartTimestamp(time.Now().Add(-time.Hour)),  // Start subscribing from 1 hour ago.
		spream.WithEndTimestamp(time.Now().Add(5*time.Minute)), // Stop subscribing after 5 minutes.
		spream.WithHeartbeatInterval(3*time.Second),
		spream.WithSpannerRequestPriotiry(spannerpb.RequestOptions_PRIORITY_MEDIUM),
	)

	logger := &Logger{out: os.Stdout}
	if err := subscriber.Subscribe(ctx, logger); err != nil && !errors.Is(ctx.Err(), context.Canceled) {
		panic(err)
	}
}

type Logger struct {
	out io.Writer
	mu  sync.Mutex
}

func (l *Logger) Consume(change *spream.DataChangeRecord) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	return json.NewEncoder(l.out).Encode(change)
}

func (*Subscriber) Subscribe added in v1.0.0

func (s *Subscriber) Subscribe(ctx context.Context, consumer Consumer) error

Subscribe starts subscribing to the change stream.

func (*Subscriber) SubscribeFunc added in v1.0.0

func (s *Subscriber) SubscribeFunc(ctx context.Context, f ConsumerFunc) error

SubscribeFunc is an adapter that allows an ordinary function to be used as a Consumer.

The function might be called from multiple goroutines concurrently, so it must be safe for concurrent use.

type Type

type Type struct {
	Code             TypeCode `json:"code"`
	ArrayElementType TypeCode `json:"array_element_type,omitempty"`
}

Type is the type of the column.

type TypeCode

type TypeCode string
const (
	TypeCode_NONE      TypeCode = ""
	TypeCode_BOOL      TypeCode = "BOOL"
	TypeCode_INT64     TypeCode = "INT64"
	TypeCode_FLOAT64   TypeCode = "FLOAT64"
	TypeCode_TIMESTAMP TypeCode = "TIMESTAMP"
	TypeCode_DATE      TypeCode = "DATE"
	TypeCode_STRING    TypeCode = "STRING"
	TypeCode_BYTES     TypeCode = "BYTES"
	TypeCode_NUMERIC   TypeCode = "NUMERIC"
	TypeCode_JSON      TypeCode = "JSON"
	TypeCode_ARRAY     TypeCode = "ARRAY"
)
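For arrays, `Code` is `ARRAY` and `ArrayElementType` presumably names the element type; for scalar columns, as in the CLI output above (`{"code":"INT64"}`), `ArrayElementType` is empty and omitted from the JSON by its `omitempty` tag. A sketch of a `String` method that renders a `Type` in a compact notation similar to GoogleSQL type names. It is defined on a local copy of `Type`, since methods cannot be added to spream's type from another package.

```go
package main

import "fmt"

// TypeCode and a few of its constants, copied from the documented declarations.
type TypeCode string

const (
	TypeCode_INT64  TypeCode = "INT64"
	TypeCode_STRING TypeCode = "STRING"
	TypeCode_ARRAY  TypeCode = "ARRAY"
)

// Type mirrors the documented struct: for arrays, Code is ARRAY and
// ArrayElementType names the element type; otherwise it is empty.
type Type struct {
	Code             TypeCode
	ArrayElementType TypeCode
}

// String renders the type in a compact notation, e.g. ARRAY<INT64>.
func (t Type) String() string {
	if t.Code == TypeCode_ARRAY {
		return fmt.Sprintf("ARRAY<%s>", t.ArrayElementType)
	}
	return string(t.Code)
}

func main() {
	fmt.Println(Type{Code: TypeCode_ARRAY, ArrayElementType: TypeCode_STRING}) // ARRAY<STRING>
	fmt.Println(Type{Code: TypeCode_INT64})                                    // INT64
}
```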

Directories

Path Synopsis
cmd
