internal

package
v0.27.0 (latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 21, 2024 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MaxObjectsInBatch caps how many records one batch may hold (judging by
	// the name; the consumer is not visible here — TODO confirm).
	MaxObjectsInBatch int = 1000

	// MaxBatchRequestSize caps the size of one batch request: 2 MiB,
	// presumably measured in bytes (compare ImportBatch.SizeOf).
	MaxBatchRequestSize int = 2 << 20
)
View Source
// MaxBatchSize is an untyped constant (ten thousand); it is usable as any numeric type.
const MaxBatchSize = 10_000

Variables

This section is empty.

Functions

func Convert added in v0.17.0

func Convert(s StreamProperty, value sqltypes.Value) (interface{}, error)

Convert turns the MySQL representation of a value into its equivalent JSON Schema-compatible representation.

func Parse added in v0.11.0

func Parse[T any](path string, obj T) (T, error)

func ParseContents added in v0.11.0

func ParseContents[T any](content []byte, obj T) (T, error)

func QueryResultToRecords

func QueryResultToRecords(qr *sqltypes.Result) []map[string]interface{}

func Sync

func Sync(ctx context.Context, mysqlDatabase PlanetScaleEdgeMysqlAccess, edgeDatabase PlanetScaleDatabase, logger Logger, source PlanetScaleSource, catalog Catalog, state *State, recordWriter RecordWriter, tabletType psdbconnect.TabletType) error

func TabletTypeToString

func TabletTypeToString(t psdbconnect.TabletType) string

Types

type BatchResponse

// BatchResponse is a JSON object with "status" and "message" string fields.
// NOTE(review): presumably the body returned after posting an ImportBatch —
// confirm at the call site.
type BatchResponse struct {
	Status  string `json:"status"`
	Message string `json:"message"`
}

type Bookmark

// Bookmark holds a single cursor string. Note that the JSON key is
// "last_record", not "cursor".
type Bookmark struct {
	Cursor string `json:"last_record"`
}

type Catalog

// Catalog is a list of streams. It serializes as {"streams": [...]}; the
// "streams" key is omitted entirely when Streams is empty.
type Catalog struct {
	Streams []Stream `json:"streams,omitempty"`
}

type DiscoverSettings added in v0.7.0

// DiscoverSettings holds options for schema discovery. It has no JSON tags, so
// it is presumably built in code rather than decoded from config — TODO confirm.
// The per-field descriptions below are inferred from names.
type DiscoverSettings struct {
	// Presumably passed as autoSelect to NewMetadata/Stream.GenerateMetadata.
	AutoSelectTables      bool
	// Tables to leave out of discovery (presumably matched by name).
	ExcludedTables        []string
	// Presumably passed as useIncrementalSync to Stream.GenerateMetadata.
	UseIncrementalSync    bool
	// Presumably maps MySQL tinyint columns to a boolean schema type.
	TreatTinyIntAsBoolean bool
}

type ImportBatch

// ImportBatch is an object containing a table name, a table schema, and
// message objects representing records to be pushed to Stitch.
type ImportBatch struct {
	// The name of the destination table the data is being pushed to.
	// Table names must be unique in each destination schema, or loading issues will occur.
	Table string `json:"table_name"`

	// A Schema object containing the JSON schema describing the record(s) in the Message object's data property.
	// Records must conform to this schema or an error will be returned when the request is sent.
	Schema StreamSchema `json:"schema"`

	// An array of Message objects, each representing a record to be upserted into the table.
	Messages []ImportMessage `json:"messages"`

	// An array of strings representing the Primary Key fields in the source table.
	// Each field in the list must be the name of a top-level property defined in the Schema object.
	// Primary Key fields cannot be contained in an object or an array.
	// Serialized under the JSON key "key_names".
	PrimaryKeys []string `json:"key_names"`
}

ImportBatch is an object containing a table name, a table schema, and message objects representing records to be pushed to Stitch.

func (*ImportBatch) SizeOf added in v0.12.0

func (imb *ImportBatch) SizeOf() int

type ImportMessage

// ImportMessage contains information about a record to be upserted into a table.
type ImportMessage struct {
	// This will always be upsert.
	Action string `json:"action"`

	// An integer that tells the Import API the order in which
	// data points in the request body should be considered for loading.
	// This data will be stored in the destination table in the _sdc_sequence column.
	// Note: serialized under the JSON key "sequence", not "emitted_at".
	EmittedAt int64 `json:"sequence"`

	// The record to be upserted into a table.
	// The record data must conform to the JSON schema contained in the request's Schema object.
	Data map[string]interface{} `json:"data"`
}

ImportMessage contains information about a record to be upserted into a table.

type Logger

// Logger is the tap's output surface: plain status messages plus the
// schema/record/state output of a sync. Its first three methods match
// StatusLogger, so any Logger also satisfies StatusLogger.
type Logger interface {
	// Plain status messages.
	Log(message string)
	Info(message string)
	Error(message string)

	// Structured output.
	Schema(catalog Catalog) error
	StreamSchema(stream Stream) error
	Record(record Record, stream Stream) error
	State(state State) error

	// Flush presumably emits anything buffered for stream — TODO confirm.
	Flush(stream Stream) error
}

func NewLogger

func NewLogger(component string, stdout io.Writer, stderr io.Writer) Logger

func NewTestLogger added in v0.14.0

func NewTestLogger() Logger

type Metadata

// Metadata wraps a NodeMetadata under the JSON key "metadata".
//
// NOTE(review): NodeMetadata carries the breadcrumb, so this type serializes as
// {"metadata": {..., "breadcrumb": [...]}}. The Singer catalog layout puts
// "breadcrumb" beside "metadata" instead; a catalog in that shape would decode
// with an empty BreadCrumb. Confirm which shape callers actually produce.
type Metadata struct {
	Metadata NodeMetadata `json:"metadata"`
}

func NewMetadata

func NewMetadata(autoSelect bool) Metadata

type MetadataCollection

// MetadataCollection is a flat slice of metadata entries. A stream-level entry
// has an empty breadcrumb; property entries use ["properties", "<name>"].
type MetadataCollection []Metadata

func (MetadataCollection) GetPropertyMap

func (m MetadataCollection) GetPropertyMap() map[string]Metadata

GetPropertyMap takes a MetadataCollection, which is a flat slice of metadata values, and turns it into a map from property name to metadata item.

Input:

[
   {
       "metadata":
       {
           "inclusion": "available",
           "breadcrumb": [ "properties", "dept_no" ]
       }
   },
   {
       "metadata":
       {
           "inclusion": "available",
           "breadcrumb": [ "properties", "dept_name" ]
       }
   }
]

Output:

{
       "dept_no":
       {
           "metadata":
           {
               "inclusion": "available",
               "breadcrumb":
               [
                   "properties",
                   "dept_no"
               ]
           }
       },
       "dept_name":
       {
           "metadata":
           {
               "inclusion": "available",
               "breadcrumb":
               [
                   "properties",
                   "dept_name"
               ]
           }
       }
   }

func (MetadataCollection) GetSelectedProperties added in v0.14.0

func (m MetadataCollection) GetSelectedProperties() []string

type NodeMetadata

// NodeMetadata is the metadata for one node of a stream's schema: either the
// stream itself (empty BreadCrumb) or one of its properties. It is the object
// stored under "metadata" in a Metadata entry.
type NodeMetadata struct {
	// Either true or false. Indicates that this node in the schema has been selected by the user for replication.
	Selected bool `json:"selected"`

	// Either FULL_TABLE, INCREMENTAL, or LOG_BASED. The replication method to use for a stream.
	ReplicationMethod string `json:"replication-method,omitempty"`

	// The name of a property in the source to use as a "bookmark".
	// For example, this will often be an "updated-at" field or an auto-incrementing primary key (requires replication-method).
	ReplicationKey string `json:"replication-key,omitempty"`

	// Either available, automatic, or unsupported.
	// 1. "available" means the field is available for selection,
	// and the tap will only emit values for that field if it is marked with "selected": true.
	// 2. "automatic" means that the tap will emit values for the field.
	// 3. "unsupported" means that the field exists in the source data but the tap is unable to provide it.
	Inclusion string `json:"inclusion,omitempty"`

	// Either true or false.
	// Indicates if a node in the schema should be replicated if
	// a user has not expressed any opinion on whether or not to replicate it.
	SelectedByDefault bool `json:"selected-by-default,omitempty"`

	// List of the fields that could be used as replication keys.
	ValidReplicationKeys []string `json:"valid-replication-keys,omitempty"`

	// Used to force the replication method to either FULL_TABLE or INCREMENTAL.
	ForcedReplicationMethod string `json:"forced-replication-method,omitempty"`

	// List of key properties for a database table.
	TableKeyProperties []string `json:"table-key-properties,omitempty"`

	// The name of the stream.
	SchemaName string `json:"schema-name,omitempty"`

	// Either true or false. Indicates whether a stream corresponds to a database view.
	IsView bool `json:"is-view,omitempty"`

	// Name of database.
	DatabaseName string `json:"database-name,omitempty"`

	// Represents the datatype of a database column.
	SqlDataType string `json:"sql-datatype,omitempty"`

	// The breadcrumb object defines the path into the schema to the node to which the metadata belongs.
	// Metadata for a stream will have an empty breadcrumb.
	// example for a stream: "breadcrumb": []
	// example for a property: "breadcrumb": ["properties", "id"]
	// Not omitempty, so "breadcrumb" is always emitted (as null when the slice is nil).
	BreadCrumb []string `json:"breadcrumb"`
}

NodeMetadata represents the metadata for a given database object. An example:

"metadata": [
  {
    "metadata": {
      "inclusion": "available",
      "table-key-properties": ["id"],
      "selected": true,
      "valid-replication-keys": ["date_modified"],
      "schema-name": "users"
    },
    "breadcrumb": []
  },
  {
    "metadata": {
      "inclusion": "automatic"
    },
    "breadcrumb": ["properties", "id"]
  },
  {
    "metadata": {
      "inclusion": "available",
      "selected": true
    },
    "breadcrumb": ["properties", "name"]
  },
  {
    "metadata": {
      "inclusion": "automatic"
    },
    "breadcrumb": ["properties", "date_modified"]
  }
]

Note: this example follows the Singer layout, with "breadcrumb" beside "metadata". In this package's Go types, BreadCrumb is a field of NodeMetadata, so it is serialized inside the "metadata" object instead.

type OnCursor added in v0.14.0

// OnCursor is the callback type of ReadParams.OnCursor; it receives a table
// cursor during a read. Presumably a non-nil error aborts the read — TODO confirm.
type OnCursor func(*psdbconnect.TableCursor) error

type OnResult added in v0.14.0

// OnResult is the callback type of ReadParams.OnResult; it receives a batch of
// rows as a *sqltypes.Result (see QueryResultToRecords). Presumably a non-nil
// error aborts the read — TODO confirm.
type OnResult func(*sqltypes.Result) error

type PlanetScaleDatabase

// PlanetScaleDatabase is a general-purpose interface that defines all the data
// access methods the PlanetScale Singer Tap needs in order to function.
type PlanetScaleDatabase interface {
	// CanConnect reports (via a non-nil error) whether source can be reached.
	CanConnect(ctx context.Context, source PlanetScaleSource) error

	// Read streams rows described by params, starting from
	// params.LastKnownPosition, and returns a serialized cursor (presumably the
	// position to resume from next time — TODO confirm).
	Read(ctx context.Context, params ReadParams) (*SerializedCursor, error)

	// Close releases the implementation's resources.
	Close() error
}

PlanetScaleDatabase is a general purpose interface that defines all the data access methods needed for the PlanetScale Singer Tap to function.

type PlanetScaleEdgeDatabase

// PlanetScaleEdgeDatabase is an implementation of the PlanetScaleDatabase
// interface. It uses the MySQL interface provided by PlanetScale (the Mysql
// field) for all schema/shard/tablet discovery and the gRPC API for
// incrementally syncing rows from PlanetScale.
type PlanetScaleEdgeDatabase struct {
	Logger Logger
	Mysql  PlanetScaleEdgeMysqlAccess
	// contains filtered or unexported fields
}

PlanetScaleEdgeDatabase is an implementation of the PlanetScaleDatabase interface. It uses the MySQL interface provided by PlanetScale for all schema/shard/tablet discovery and the gRPC API for incrementally syncing rows from PlanetScale.

func (PlanetScaleEdgeDatabase) CanConnect

func (PlanetScaleEdgeDatabase) Close

func (p PlanetScaleEdgeDatabase) Close() error

func (PlanetScaleEdgeDatabase) Read

Read streams rows from a table given a starting cursor:

 1. When a sync session starts, get the latest VGTID for the given table in a shard.
 2. This latest VGTID becomes the stopping point for this sync session.
 3. Ask VStream to stream from the last known VGTID.
 4. When the stopping point is reached, read all rows available at this VGTID.
 5. End the stream when (a) a VGTID newer than the latest VGTID is encountered, or (b) the timeout kicks in.

type PlanetScaleEdgeMysqlAccess

// PlanetScaleEdgeMysqlAccess is the MySQL-protocol access layer used for
// connectivity checks and for schema, shard and tablet discovery.
type PlanetScaleEdgeMysqlAccess interface {
	// Connectivity.
	PingContext(ctx context.Context, source PlanetScaleSource) error

	// Schema discovery.
	GetTableNames(ctx context.Context, source PlanetScaleSource) ([]string, error)
	// NOTE(review): the string argument is presumably the table name and the
	// bool presumably mirrors DiscoverSettings.TreatTinyIntAsBoolean — TODO confirm.
	GetTableSchema(context.Context, PlanetScaleSource, string, bool) (map[string]StreamProperty, error)
	GetTablePrimaryKeys(ctx context.Context, source PlanetScaleSource, table string) ([]string, error)

	// Vitess topology discovery.
	GetVitessShards(ctx context.Context, source PlanetScaleSource) ([]string, error)
	GetVitessTablets(ctx context.Context, source PlanetScaleSource) ([]VitessTablet, error)

	// Close releases whatever the implementation holds open.
	Close() error
}

type PlanetScaleSource

// PlanetScaleSource defines a configured Singer Tap Source for a PlanetScale
// database: connection details plus an optional shard selection.
//
// Password is a plain field with a "password" JSON tag, so marshaling this
// struct (e.g. for logging) will include the secret.
type PlanetScaleSource struct {
	Host     string `json:"host"`
	Database string `json:"database"`
	Username string `json:"username"`
	Password string `json:"password"`
	// Shards is a single string; its format (presumably a comma-separated
	// shard list) is not visible here — TODO confirm.
	Shards   string `json:"shards"`
}

PlanetScaleSource defines a configured Singer Tap Source for a PlanetScale database

func (PlanetScaleSource) DSN

DSN returns a DataSource that mysql libraries can use to connect to a PlanetScale database.

func (PlanetScaleSource) GetInitialState

func (psc PlanetScaleSource) GetInitialState(keyspaceOrDatabase string, shards []string) (ShardStates, error)

GetInitialState will return the initial/blank state for a given keyspace in all of its shards. This state can be round-tripped safely with Singer.

type ReadParams added in v0.14.0

// ReadParams bundles the arguments of PlanetScaleDatabase.Read.
type ReadParams struct {
	// The database to read from.
	Source            PlanetScaleSource
	// The stream (table) being read.
	Table             Stream
	// The starting cursor; Read resumes streaming from here.
	LastKnownPosition *psdbconnect.TableCursor
	// Presumably the column names to read — TODO confirm.
	Columns           []string
	// Invoked with row results during the read.
	OnResult          OnResult
	// Invoked with table cursors during the read.
	OnCursor          OnCursor
	// Which kind of tablet to read from (see TabletTypeToString).
	TabletType        psdbconnect.TabletType
	// Presumably Vitess cell names to restrict the read to — TODO confirm.
	Cells             []string
}

type Record

// Record messages contain the data from the data stream, for example:
//
//	{"type": "RECORD", "stream": "users",
//	 "time_extracted": "2017-11-20T16:45:33.000Z",
//	 "record": {"id": 0, "name": "Chris"}}
type Record struct {
	// The message type; "RECORD" in Singer output.
	// NOTE(review): an earlier comment here said "Record" — confirm the
	// value NewRecord actually sets.
	Type string `json:"type"`

	// The string name of the stream
	Stream string `json:"stream"`

	// The time this record was observed in the source.
	// This should be an RFC3339 formatted date-time, like "2017-11-20T16:45:33.000Z".
	TimeExtracted string `json:"time_extracted"`

	// A JSON map containing a streamed data point; serialized under "record".
	Data map[string]interface{} `json:"record"`
}

Record messages contain the data from the data stream. Example:

{
 "type": "RECORD",
 "stream": "users",
 "time_extracted": "2017-11-20T16:45:33.000Z",
 "record": {
   "id": 0,
   "name": "Chris"
 }
}

func NewRecord

func NewRecord() Record

type RecordWriter added in v0.14.0

type RecordWriter interface {
	Flush(stream Stream) error
	Record(record Record, stream Stream) error
	State(state State) error
}

func NewHttpRecordWriter added in v0.14.0

func NewHttpRecordWriter(batchSize int, apiURL, apiToken, stateFileDir string, logger StatusLogger) RecordWriter

type SerializedCursor

// SerializedCursor is a table cursor in string form, serialized as
// {"cursor": "..."}. The State examples show the string as a Base64-encoded
// table cursor; convert with TableCursorToSerializedCursor and
// SerializedCursor.SerializedCursorToTableCursor.
type SerializedCursor struct {
	Cursor string `json:"cursor"`
}

func TableCursorToSerializedCursor

func TableCursorToSerializedCursor(cursor *psdbconnect.TableCursor) (*SerializedCursor, error)

func (SerializedCursor) SerializedCursorToTableCursor

func (s SerializedCursor) SerializedCursorToTableCursor() (*psdbconnect.TableCursor, error)

type ShardStates

// ShardStates maps a shard name (e.g. "-40", "80-c0") to the serialized
// cursor recorded for that shard.
type ShardStates struct {
	Shards map[string]*SerializedCursor `json:"shards"`
}

type State

// State represents any previously known state about the last sync operation.
// Per-stream shard states are serialized under the JSON key "bookmarks",
// keyed by stream (the documented example uses names like "branch_query").
type State struct {
	Streams map[string]ShardStates `json:"bookmarks"`
}

State represents any previously known state about the last sync operation. Example:

{
 "bookmarks":
 {
   "branch_query":
   {
     "shards":
     {
       "80-c0":
       {
         "cursor": "Base64-encoded-tablecursor"
       }
     }
   },
   "branch_query_tag":
   {
     "shards":
     {
       "-40":
       {
         "cursor": "Base64-encoded-tablecursor"
       },
       "c0-":
       {
         "cursor": "Base64-encoded-tablecursor"
       },
       "40-80":
       {
        "cursor": "Base64-encoded-tablecursor"
       },
       "80-c0":
       {
         "cursor": "Base64-encoded-tablecursor"
       }
     }
   }
 }
}

func ParseSavedState added in v0.11.0

func ParseSavedState(stateFilePath string) (*State, error)

type StateMessage

// StateMessage is a typed envelope around a State: {"type": ..., "value": {...}}.
// NOTE(review): Type is presumably "STATE" per the Singer spec — confirm where
// this message is built.
type StateMessage struct {
	Type  string `json:"type"`
	Value State  `json:"value"`
}

type StatusLogger added in v0.14.0

// StatusLogger is the minimal logging surface: free-form messages at three
// levels. Logger declares the same three methods, so every Logger is also a
// StatusLogger.
type StatusLogger interface {
	Log(msg string)
	Info(msg string)
	Error(msg string)
}

type Stream

// Stream represents the JSON Schema definition for a given database object,
// together with its Singer metadata, key properties and bookmark properties.
type Stream struct {
	// Type is a constant of value "SCHEMA"
	Type string `json:"type"`

	// The name of the stream.
	Name string `json:"stream"`

	// The unique identifier for the stream.
	// This is allowed to be different from the name of the stream
	// in order to allow for sources that have duplicate stream names.
	ID string `json:"tap_stream_id"`

	// The JSON schema for the stream.
	Schema StreamSchema `json:"schema"`

	// For a database source, the name of the table.
	TableName string `json:"table-name"`

	// Each piece of metadata has the following canonical shape:
	//{
	//  "metadata" : {
	//    "selected" : true,
	//    "some-other-metadata" : "whatever"
	//  },
	//  "breadcrumb" : ["properties", "some-field-name"]
	//}
	Metadata MetadataCollection `json:"metadata"`

	// A list of strings indicating which properties make up the primary key for this stream.
	// Each item in the list must be the name of a top-level property defined in the schema.
	KeyProperties []string `json:"key_properties"`

	// A list of strings indicating which properties the tap is using as bookmarks.
	// Each item in the list must be the name of a top-level property defined in the schema.
	// Serialized under the JSON key "bookmark_properties".
	CursorProperties []string `json:"bookmark_properties"`
}

Stream represents the JSON Schema definition for a given database object. Example (a catalog containing one stream):

{
 "streams": [
   {
     "tap_stream_id": "users",
     "stream": "users",
     "schema": {
       "type": ["null", "object"],
       "additionalProperties": false,
       "properties": {
         "id": {
           "type": [
             "null",
             "string"
           ]
         },
         "name": {
           "type": [
             "null",
             "string"
           ]
         },
         "date_modified": {
           "type": [
             "null",
             "string"
           ],
           "format": "date-time"
         }
       }
     }
   }
 ]
}

func (*Stream) GenerateMetadata

func (s *Stream) GenerateMetadata(keyProperties []string, autoSelect, useIncrementalSync bool) error

func (*Stream) GetTableMetadata

func (s *Stream) GetTableMetadata() (*Metadata, error)

GetTableMetadata iterates the Metadata collection for a stream and returns the metadata item that is associated with the stream.

func (*Stream) IncrementalSyncRequested

func (s *Stream) IncrementalSyncRequested() bool

type StreamProperty

// StreamProperty is the JSON Schema of a single column: its allowed types
// (e.g. ["null", "string"]) and an optional "format" (e.g. "date-time"),
// which is omitted when empty.
type StreamProperty struct {
	Types        []string `json:"type"`
	CustomFormat string   `json:"format,omitempty"`
}

func (StreamProperty) IsBoolean added in v0.17.0

func (s StreamProperty) IsBoolean() bool

func (StreamProperty) IsDateTime added in v0.17.0

func (s StreamProperty) IsDateTime() bool

func (StreamProperty) IsInteger added in v0.17.0

func (s StreamProperty) IsInteger() bool

func (StreamProperty) IsNumber added in v0.17.0

func (s StreamProperty) IsNumber() bool

type StreamSchema

// StreamSchema is the JSON Schema of a stream: its type (e.g. ["null", "object"]),
// whether additional properties are allowed, and the per-column properties.
// "additionalProperties" has no omitempty, so false is always emitted explicitly.
type StreamSchema struct {
	Type                    []string                  `json:"type"`
	HasAdditionalProperties bool                      `json:"additionalProperties"`
	Properties              map[string]StreamProperty `json:"properties"`
}

type VitessTablet

// VitessTablet describes one tablet as returned by
// PlanetScaleEdgeMysqlAccess.GetVitessTablets. Every field is a raw string;
// in particular TabletType here is a string, not a psdbconnect.TabletType.
type VitessTablet struct {
	Cell                 string
	Keyspace             string
	Shard                string
	TabletType           string
	State                string
	Alias                string
	Hostname             string
	PrimaryTermStartTime string
}

type WrappedState added in v0.11.0

// WrappedState is a State nested under the JSON key "value". It has the same
// shape as StateMessage minus the "type" field, so it presumably decodes saved
// state written in that envelope (see ParseSavedState) — TODO confirm.
type WrappedState struct {
	Value State `json:"value"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL