imports

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2021 License: MIT Imports: 26 Imported by: 0

Documentation

Overview

Create, manage and work import jobs. Import jobs are used to backfill existing Postgres data into sinks, and should be paired with a logical subscription to ensure all data is captured.

Index

Constants

This section is empty.

Variables

View Source
var NoPrimaryKeyError = fmt.Errorf("no primary key found")

Functions

This section is empty.

Types

type Import

type Import struct {
	Schema            string
	TableName         string
	PrimaryKey        string
	PrimaryKeyScanner textTranscodingScanner
	Relation          *logical.Relation
	Scanners          []decode.Scanner
	Destinations      []interface{}
	Cursor            interface{}
}

Import is built for each job in the database. It holds the contextual information needed to run the job, resolved from the database regardless of when the job was enqueued.

func Build

func Build(ctx context.Context, logger kitlog.Logger, decoder decode.Decoder, tx querier, job model.ImportJobs) (*Import, error)

Build queries the database for information required to perform an import, given an import job to process.

type Importer

type Importer interface {
	Do(ctx context.Context, logger kitlog.Logger, tx pgx.Tx, job model.ImportJobs) error
}

func NewImporter

func NewImporter(sink generic.Sink, decoder decode.Decoder, opts ImporterOptions) Importer

type ImporterOptions

type ImporterOptions struct {
	SnapshotTimeout time.Duration
	BatchLimit      int
	BufferSize      int
}

func (*ImporterOptions) Bind

func (opt *ImporterOptions) Bind(cmd *kingpin.CmdClause, prefix string) *ImporterOptions

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager(logger kitlog.Logger, db *sql.DB, opts ManagerOptions) *Manager

func (*Manager) Manage

func (m *Manager) Manage(ctx context.Context, sub subscription.Subscription) error

func (*Manager) Reconcile

func (m *Manager) Reconcile(ctx context.Context, sub subscription.Subscription) ([]model.ImportJobs, error)

Reconcile creates import jobs for tables registered in the subscription that have not yet been imported.

func (*Manager) Shutdown

func (m *Manager) Shutdown(ctx context.Context) error

type ManagerOptions

type ManagerOptions struct {
	PollInterval time.Duration
}

func (*ManagerOptions) Bind

func (opt *ManagerOptions) Bind(cmd *kingpin.CmdClause, prefix string) *ManagerOptions

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(logger kitlog.Logger, db *sql.DB, opts WorkerOptions) *Worker

func (Worker) AcquireAndWork

func (w Worker) AcquireAndWork(ctx context.Context, importer Importer) (*model.ImportJobs, error)

AcquireAndWork finds a job and works it. The method is public to make testing easy, and it should normally be called indirectly via a worker's Start method.

func (Worker) Shutdown

func (w Worker) Shutdown(ctx context.Context) error

func (Worker) Start

func (w Worker) Start(ctx context.Context, importer Importer) error

Start begins working the queue, using the given Importer to process jobs.

type WorkerOptions

type WorkerOptions struct {
	SubscriptionID   string        // fixes this worker to only work jobs associated with the current subscription
	PollInterval     time.Duration // interval between polling for new jobs
	RetryInterval    time.Duration // retry interval for the exponential backoff
	RetryExponent    int           // retry exponent to calculate backoff
	MaxRetryInterval time.Duration // maximum interval between retries
}

func (*WorkerOptions) Bind

func (opt *WorkerOptions) Bind(cmd *kingpin.CmdClause, prefix string) *WorkerOptions

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL