integ

package module
v0.0.0-...-7031e16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2022 License: MIT Imports: 17 Imported by: 0

README

go-integ (final name TBD)

Makes it simple to pull data from HTTP-based data sources. Supports the Singer and Airbyte protocols, and can easily be extended to a custom protocol format.

CICD CICD Go Report Card GoDoc License Latest Version codecov

WIP! Identifiers will likely change, though the overall structure will likely stay similar.

Example integration

PokeAPI source with (mocked) incremental sync; see more examples in the /integrations folder.

package pokeapi

import (
	"github.com/ajzo90/go-integ"
	"github.com/ajzo90/go-requests"
)

// Poke is the PokeAPI source: it is configured by config and exposes a single
// incremental "pokemon" stream whose records are produced by runner.
var Poke = integ.NewSource(config{}).
	HttpStream(integ.Incremental("pokemon", pokemon{}), runner)

// config is the source configuration. A zero value is handed to
// integ.NewSource, and runner fills one in through ctx.Load.
type config struct {
	// Url is the base URL that runner builds its requests from.
	Url string
}

// pokemon is the record type registered for the "pokemon" stream.
type pokemon struct {
	Name string `json:"name"`
	Url  string `json:"url"`
}

// runner syncs the "pokemon" stream. It loads the config, requests the first
// page of the pokemon listing, and then follows the "next" link of every
// response until there is none left, emitting the "results" array of each page
// as one batch. When the last page has been emitted it emits the (dummy) state.
//
// The function is wrapped in integ.HttpRunnerFunc, the adapter the package
// exports for turning a plain function into an HttpRunner.
var runner = integ.HttpRunnerFunc(func(ctx integ.HttpContext) error {
	var cnf config
	var dummyState struct{}
	if err := ctx.Load(&cnf, &dummyState); err != nil {
		return err
	}

	req := requests.New(cnf.Url).
		Path("pokemon").
		Query("limit", "100")

	// resp is allocated once and reused for every page.
	resp := new(requests.JSONResponse)
	for {
		if err := ctx.EmitBatch(req, resp, "results"); err != nil {
			return err
		}

		// An empty "next" link means the last page has been emitted.
		next := resp.String("next")
		if next == "" {
			return ctx.EmitState(dummyState)
		}

		nextReq, err := requests.FromRawURL(next)
		if err != nil {
			return err
		}
		req = nextReq
	}
})

The contract to implement

To implement a source connector you have to implement the HttpRunner interface:

// HttpRunner is the contract a source connector implements.
type HttpRunner interface {
	// Run runs the sync job.
	Run(ctx HttpContext) error
}

where the HttpContext argument is defined by the following interface:


// HttpContext is the argument passed to HttpRunner.Run.
type HttpContext interface {
	// Load loads the shared config and state of a stream.
	Load(config, state interface{}) error

	// Schema returns a Schema.
	// NOTE(review): presumably the schema of the stream being synced — confirm.
	Schema() Schema

	// EmitState emits the state.
	EmitState(v interface{}) error

	// EmitBatch executes the provided request, locates the data array and emits the records.
	// It is (likely) called multiple times in the same run.
	// resp: (pre-allocated and reusable)
	// path: (path to the data array)
	EmitBatch(req *requests.Request, resp *requests.JSONResponse, path ...string) error
}

Throttling, retries and request control

github.com/ajzo90/go-requests uses a request builder into which a Doer (see below; think of it as a client abstraction) can be injected to give full control over the HTTP details. A common use case is a single Doer shared by all requests, which can then be responsible for not sending too many requests.

requests.NewRetryer is a nice default: it handles 429-related headers and applies exponential backoff/retry for common transient HTTP statuses. The Doer interface is easy to adapt to specific needs.

// doer is a retrying Doer built on http.DefaultClient. The logger callback
// receives the intermediate errors.
var doer = requests.NewRetryer(http.DefaultClient, requests.Logger(func(id int, err error, msg string) {
	// log intermediate errors here if you like
}))
// Doer is the client abstraction used to send requests.
type Doer interface {
	// Do attempts to do one HTTP request (including retries/redirects).
	Do(r *http.Request) (*http.Response, error)
}

Secrets and logging

...

Airbyte source example (shopify)

docker build -t airbyte-source-shopify:dev -f dockerfile-airbyte-source-shopify .
docker run --rm airbyte-source-shopify:dev spec
...

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultRetryer

func DefaultRetryer() requests.Doer

func EmitBatch

func EmitBatch(ctx context.Context, emitter ValueEmitter, req *requests.Request, resp *requests.JSONResponse, keys ...string) error

func Handler

func Handler(loaders Loaders, protos Protos) http.HandlerFunc

func Keys

func Keys(schema *jsonschema.Document) []string

func NewSource

func NewSource(config interface{}) *sourceDef

Types

type BaseProtocol

type BaseProtocol struct {
	// contains filtered or unexported fields
}

func (*BaseProtocol) Flush

func (m *BaseProtocol) Flush() error

type Command

// Command is the operation handed to Loader.Handle and Open.
type Command string
const (
	CmdSpec     Command = "spec"
	CmdCheck    Command = "check"
	CmdDiscover Command = "discover"
	CmdRead     Command = "read"
)

type ConnectorSpecification

// ConnectorSpecification is the value passed to Proto.EmitSpec.
// DocumentationURL is left out of the JSON encoding when empty; the other
// two fields are always encoded.
type ConnectorSpecification struct {
	DocumentationURL        string               `json:"documentationUrl,omitempty"`
	SupportsIncremental     bool                 `json:"supportsIncremental"`
	ConnectionSpecification *jsonschema.Document `json:"connectionSpecification"`
}

type DbContext

// DbContext is the context passed to DbRunner.Run. It embeds GeneralContext
// and adds no methods of its own.
type DbContext interface {
	GeneralContext
}

type DbRunner

// DbRunner is the DbContext counterpart of HttpRunner.
type DbRunner interface {
	// Run is called with the DbContext to operate on.
	Run(ctx DbContext) error
}

type FieldDef

// FieldDef identifies a field by Path and carries an encoding and a sort order.
// Values are created with Field and refined with the Asc, Desc and Encoding
// methods, each of which returns a FieldDef. Schema uses FieldDef for its
// PrimaryKey, OrderByKey and IterateByKey lists.
type FieldDef struct {
	Path          []string
	FieldEncoding string
	SortOrder     string
}

func Field

func Field(path ...string) FieldDef

func (FieldDef) Asc

func (f FieldDef) Asc() FieldDef

func (FieldDef) Desc

func (f FieldDef) Desc() FieldDef

func (FieldDef) Encoding

func (f FieldDef) Encoding(v string) FieldDef

type FsContext

// FsContext is the context passed to FsRunner.Run. It embeds GeneralContext
// and adds no methods of its own.
type FsContext interface {
	GeneralContext
}

type FsRunner

// FsRunner is the FsContext counterpart of HttpRunner.
type FsRunner interface {
	// Run is called with the FsContext to operate on.
	Run(ctx FsContext) error
}

type GeneralContext

// GeneralContext is the method set shared by the runner contexts: DbContext,
// FsContext and HttpContext all embed it.
type GeneralContext interface {
	// Load loads a stream with shared config and state.
	Load(config, state interface{}) error

	// Schema returns a Schema.
	// NOTE(review): presumably the schema of the stream being synced — confirm.
	Schema() Schema

	// EmitState emits the state.
	EmitState(v interface{}) error

	// EmitLog emits v as a log entry.
	// NOTE(review): inferred from the method name — confirm.
	EmitLog(v any) error

	// EmitValues emits a slice of parsed JSON values.
	EmitValues(v []*fastjson.Value) error

	// EmitValue emits a single value of any type.
	EmitValue(v any) error
}

type GeneralRunner

// GeneralRunner is the runner form that only needs a GeneralContext.
type GeneralRunner interface {
	// Run is called with the GeneralContext to operate on.
	Run(ctx GeneralContext) error
}

type HttpContext

// HttpContext is the context passed to HttpRunner.Run. It extends
// GeneralContext with EmitBatch.
type HttpContext interface {
	GeneralContext

	// EmitBatch executes the provided request, locates the data array and emits the records.
	// It is (likely) called multiple times in the same run.
	// resp: (pre-allocated and reusable)
	// keys: (path to the data array)
	EmitBatch(req *requests.Request, resp *requests.JSONResponse, keys ...string) error
}

type HttpRunner

// HttpRunner is the runner form that operates on an HttpContext.
// HttpRunnerFunc adapts a plain function to this interface.
type HttpRunner interface {
	// Run runs the sync job.
	Run(ctx HttpContext) error
}

type HttpRunnerFunc

// HttpRunnerFunc is a function type with a Run method of the same signature,
// so a plain function can be used where an HttpRunner is expected.
type HttpRunnerFunc func(ctx HttpContext) error

func (HttpRunnerFunc) Run

func (r HttpRunnerFunc) Run(ctx HttpContext) error

type Loader

// Loader handles a Command. Loaders are registered in a Loaders map.
type Loader interface {
	// Handle receives the command together with an output writer, an input
	// reader and the available Protos.
	Handle(ctx context.Context, cmd Command, w io.Writer, r io.Reader, protos Protos) error
}

type Loaders

// Loaders maps a string key to a Loader; it is the first argument of Handler.
// NOTE(review): the key is presumably the source name — confirm.
type Loaders map[string]Loader

type ManualContext

// ManualContext is the context passed to ManualRunner.Run.
type ManualContext interface {
	// Stream returns a ManualStreamContext for the given schema.
	Stream(schema Schema) (ManualStreamContext, error)
}

type ManualRunner

// ManualRunner is the runner form that operates on a ManualContext.
type ManualRunner interface {
	// Run is called with the ManualContext to operate on.
	Run(ctx ManualContext) error
}

type ManualRunnerFunc

// ManualRunnerFunc is the function form of a ManualRunner; the package index
// lists a Run method on it.
type ManualRunnerFunc func(ctx ManualContext) error

func (ManualRunnerFunc) Run

type ManualStreamContext

// ManualStreamContext is the per-stream handle returned by ManualContext.Stream.
// Its methods have the same names as those of StreamProto, minus Flush.
type ManualStreamContext interface {
	// EmitValues emits a slice of parsed JSON values.
	EmitValues(arr []*fastjson.Value) error
	// Load takes two values.
	// NOTE(review): presumably config and state, as in StreamProto.Load — confirm.
	Load(a, b any) error
	EmitState(any) error
	EmitLog(any) error
}

type MaskedString

// MaskedString is a string type with its own MarshalJSON, Masked and String methods.
// NOTE(review): presumably meant for secrets that must not appear in output — confirm.
type MaskedString string

func (MaskedString) MarshalJSON

func (s MaskedString) MarshalJSON() ([]byte, error)

func (MaskedString) Masked

func (s MaskedString) Masked() string

func (MaskedString) String

func (s MaskedString) String() string

type MsgType

// MsgType is a string-typed message kind.
// NOTE(review): the names look like the message types of the Airbyte and
// Singer protocols — confirm which protocol uses which value.
type MsgType string
const (
	RECORD            MsgType = "RECORD"
	STATE             MsgType = "STATE"
	LOG               MsgType = "LOG"
	CONNECTION_STATUS MsgType = "CONNECTION_STATUS"
	CATALOG           MsgType = "CATALOG"
	SPEC              MsgType = "SPEC"
	SCHEMA            MsgType = "SCHEMA"

	CONFIG   MsgType = "CONFIG"
	SETTINGS MsgType = "SETTINGS"
)

type Proto

// Proto is a protocol session. It is what a ProtoFn builds and what Open returns.
type Proto interface {
	// Open a new stream loader. Should emit or record the schema information
	// Proto can return nil in case this stream should not be emitted
	Open(typ Schema) (StreamProto, error)

	// Close closes the current session. Flushes pending data
	Close() error

	// EmitSpec defines the available streams
	EmitSpec(ConnectorSpecification) error

	EmitStatus(v error) error // can we move this to Proto
}

func Open

func Open(r io.Reader, w io.Writer, cmd Command, protos Protos) (Proto, error)

type ProtoFn

// ProtoFn builds a Proto from a *Protocol. Protos maps keys to ProtoFn values.
type ProtoFn func(protocol *Protocol) Proto

type Protocol

type Protocol struct {
	Cmd Command
	// contains filtered or unexported fields
}

func (*Protocol) Encode

func (i *Protocol) Encode(v interface{}) error

func (*Protocol) Load

func (i *Protocol) Load(stream string, config, state interface{}) error

func (*Protocol) Write

func (i *Protocol) Write(b []byte) error

type Protos

// Protos maps a string key to a ProtoFn. It is accepted by Handler, Open and Loader.Handle.
// NOTE(review): the key is presumably the protocol name — confirm.
type Protos map[string]ProtoFn

type Schema

// Schema is the stream description passed to Proto.Open and
// ManualContext.Stream. SchemaBuilder embeds it and Streams is a list of it.
// The three key lists use FieldDef.
// NOTE(review): each Custom* flag presumably marks the matching key list as
// user-supplied — confirm.
type Schema struct {
	Incremental        bool
	PrimaryKey         []FieldDef
	OrderByKey         []FieldDef
	IterateByKey       []FieldDef
	CustomPrimaryKey   bool
	CustomOrderByKey   bool
	CustomIterateByKey bool
	Name               string
	GoType             interface{}
	JsonSchema         *jsonschema.Document
	Namespace          string
}

func (Schema) FieldKeys

func (s Schema) FieldKeys() []string

func (Schema) Validate

func (s Schema) Validate() error

type SchemaBuilder

// SchemaBuilder wraps a Schema. Incremental and NonIncremental create one, and
// every SchemaBuilder method returns a SchemaBuilder, so calls can be chained.
type SchemaBuilder struct {
	Schema
}

func Incremental

func Incremental(name string, typ interface{}) SchemaBuilder

func NonIncremental

func NonIncremental(name string, typ interface{}) SchemaBuilder

func (SchemaBuilder) CustomIterateBy

func (s SchemaBuilder) CustomIterateBy() SchemaBuilder

func (SchemaBuilder) CustomOrderBy

func (s SchemaBuilder) CustomOrderBy() SchemaBuilder

func (SchemaBuilder) CustomPrimary

func (s SchemaBuilder) CustomPrimary() SchemaBuilder

func (SchemaBuilder) IterateBy

func (s SchemaBuilder) IterateBy(keys ...FieldDef) SchemaBuilder

func (SchemaBuilder) Namespace

func (s SchemaBuilder) Namespace(namespace string) SchemaBuilder

func (SchemaBuilder) OrderBy

func (s SchemaBuilder) OrderBy(keys ...FieldDef) SchemaBuilder

func (SchemaBuilder) Primary

func (s SchemaBuilder) Primary(keys ...FieldDef) SchemaBuilder

type Settings

// Settings holds a format string and a list of stream schemas.
type Settings struct {
	Format  string
	Streams Streams
}

type StreamProto

// StreamProto is the per-stream handle returned by Proto.Open.
type StreamProto interface {
	// Load takes the config and state values for the stream.
	Load(config, state interface{}) error

	// EmitValues emits a slice of parsed JSON values.
	EmitValues(arr []*fastjson.Value) error

	// EmitState emits v as the state.
	EmitState(v interface{}) error

	// EmitLog emits v as a log entry.
	// NOTE(review): inferred from the method name — confirm.
	EmitLog(v interface{}) error

	// Flush returns an error.
	// NOTE(review): presumably it writes out buffered data — confirm.
	Flush() error
}

type Streams

// Streams is a list of Schema values; Settings.Streams has this type.
type Streams []Schema

type ValueEmitter

// ValueEmitter is the emitter accepted by the package-level EmitBatch function.
// Any type with a matching EmitValues method satisfies it.
type ValueEmitter interface {
	// EmitValues emits a slice of parsed JSON values.
	EmitValues([]*fastjson.Value) error
}

Directories

Path Synopsis
cmd
integrations
pkg
xml

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL