package datasource

Documentation

Index

Constants

This section is empty.

Variables

var (

	// Default schema refresh interval
	SchemaRefreshInterval = -time.Minute * 5

	// Static list of common field names for describe header
	DescribeCols    = []string{"Field", "Type", "Null", "Key", "Default", "Extra"}
	DescribeHeaders = NewDescribeHeaders()
)

var (

	// Some common errors
	ErrNotFound = fmt.Errorf("Not Found")
)

Functions

func NewNestedContextReader

func NewNestedContextReader(readers []expr.ContextReader, ts time.Time) expr.ContextReader

NewNestedContextReader provides a context reader that is a composite of ordered child readers; the first reader containing a given key is used.
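
A minimal sketch of composing readers. The qlbridge import paths and the value.NewStringValue constructor are assumptions not shown in this index:

package main

import (
	"fmt"
	"time"

	"github.com/araddon/qlbridge/datasource"
	"github.com/araddon/qlbridge/expr"
	"github.com/araddon/qlbridge/value"
)

func main() {
	// Two simple readers; "user" exists in both, so the first reader wins.
	a := datasource.NewContextSimpleData(map[string]value.Value{
		"user": value.NewStringValue("aaron"),
	})
	b := datasource.NewContextSimpleData(map[string]value.Value{
		"user":  value.NewStringValue("bob"),
		"email": value.NewStringValue("bob@example.com"),
	})

	nested := datasource.NewNestedContextReader([]expr.ContextReader{a, b}, time.Now())

	if v, ok := nested.Get("user"); ok {
		fmt.Println(v.Value()) // "aaron": found in the first child reader
	}
	if v, ok := nested.Get("email"); ok {
		fmt.Println(v.Value()) // "bob@example.com": falls through to the second reader
	}
}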

func Register

func Register(name string, source DataSource)

Register makes a datasource available by the provided name. If Register is called twice with the same name or if source is nil, it panics.
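
A sketch of registering a custom source from an init() function, assuming the import path below; the stub implements the three-method DataSource interface:

package mysource

import "github.com/araddon/qlbridge/datasource" // assumed import path

// stubSource is the smallest possible DataSource: one table, no-op Open/Close.
type stubSource struct{}

func (s *stubSource) Tables() []string                                    { return []string{"events"} }
func (s *stubSource) Open(connInfo string) (datasource.SourceConn, error) { return nil, nil }
func (s *stubSource) Close() error                                        { return nil }

func init() {
	// Register panics if called twice with the same name, or if the source is nil.
	datasource.Register("mysource", &stubSource{})
}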

func SourceIterChannel

func SourceIterChannel(iter Iterator, filter expr.Node, sigCh <-chan bool) <-chan Message

SourceIterChannel opens a goroutine that runs this source iteration until it is signaled to stop or the iteration completes.
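
A sketch of draining the channel from a CSV source. The import path is assumed, a nil filter is assumed to mean "no filtering", and the channel is assumed to close when the source is exhausted:

package main

import (
	"fmt"
	"strings"

	"github.com/araddon/qlbridge/datasource"
)

func main() {
	in := strings.NewReader("user_id,email\n1,a@example.com\n2,b@example.com\n")
	src, err := datasource.NewCsvSource("users", 0, in, make(chan bool))
	if err != nil {
		panic(err)
	}
	defer src.Close()

	// CsvDataSource satisfies Iterator via its Next() Message method.
	sigCh := make(chan bool)
	for msg := range datasource.SourceIterChannel(src, nil, sigCh) {
		fmt.Println(msg.Id(), msg.Body())
	}
}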

Types

type Aggregations

type Aggregations interface {
	DataSource
	Aggregate(expr.SqlStatement) error
}

type ContextSimple

type ContextSimple struct {
	Data map[string]value.Value
	// contains filtered or unexported fields
}

func NewContextSimple

func NewContextSimple() *ContextSimple

func NewContextSimpleData

func NewContextSimpleData(data map[string]value.Value) *ContextSimple

func NewContextSimpleTs

func NewContextSimpleTs(data map[string]value.Value, ts time.Time) *ContextSimple

func (*ContextSimple) All

func (m *ContextSimple) All() map[string]value.Value

func (*ContextSimple) Body

func (m *ContextSimple) Body() interface{}

func (*ContextSimple) Commit

func (m *ContextSimple) Commit(rowInfo []expr.SchemaInfo, row expr.RowWriter) error

func (*ContextSimple) Delete

func (m *ContextSimple) Delete(row map[string]value.Value) error

func (ContextSimple) Get

func (m ContextSimple) Get(key string) (value.Value, bool)

func (*ContextSimple) Id

func (m *ContextSimple) Id() uint64

func (*ContextSimple) Put

func (*ContextSimple) Row

func (m *ContextSimple) Row() map[string]value.Value

func (*ContextSimple) Ts

func (m *ContextSimple) Ts() time.Time

type ContextUrlValues

type ContextUrlValues struct {
	Data url.Values
	// contains filtered or unexported fields
}

func NewContextUrlValues

func NewContextUrlValues(uv url.Values) *ContextUrlValues

func NewContextUrlValuesTs

func NewContextUrlValuesTs(uv url.Values, ts time.Time) *ContextUrlValues

func (*ContextUrlValues) Body

func (m *ContextUrlValues) Body() interface{}

func (*ContextUrlValues) Delete

func (m *ContextUrlValues) Delete(delRow map[string]value.Value) error

func (ContextUrlValues) Get

func (m ContextUrlValues) Get(key string) (value.Value, bool)

func (*ContextUrlValues) Id

func (m *ContextUrlValues) Id() uint64

func (ContextUrlValues) Put

func (ContextUrlValues) Row

func (m ContextUrlValues) Row() map[string]value.Value

func (*ContextUrlValues) String

func (m *ContextUrlValues) String() string

func (ContextUrlValues) Ts

func (m ContextUrlValues) Ts() time.Time

type ContextWriterEmpty

type ContextWriterEmpty struct{}

func (*ContextWriterEmpty) Delete

func (m *ContextWriterEmpty) Delete(delRow map[string]value.Value) error

func (*ContextWriterEmpty) Put

type CsvDataSource

type CsvDataSource struct {
	// contains filtered or unexported fields
}

Csv DataSource, implements qlbridge DataSource to scan through data

  • very naive scanner: forward-only, single pass
  • can open a file with .Open()
  • if the FROM name in the SQL is "stdin" or "stdio", it reads from stdin
  • assumes comma-delimited input

func NewCsvSource

func NewCsvSource(table string, indexCol int, ior io.Reader, exit <-chan bool) (*CsvDataSource, error)

The Csv reader assumes the first row contains the column headers.
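
A sketch of scanning an in-memory CSV table. The import path is assumed, a nil filter is assumed to mean "scan everything", and Next() is assumed to return nil when the source is exhausted:

package main

import (
	"fmt"
	"strings"

	"github.com/araddon/qlbridge/datasource"
)

func main() {
	raw := "user_id,name\n1,aaron\n2,bob\n"

	// Column 0 (user_id) is the index column; the first row becomes the headers.
	src, err := datasource.NewCsvSource("users", 0, strings.NewReader(raw), make(chan bool))
	if err != nil {
		panic(err)
	}
	defer src.Close()

	fmt.Println(src.Columns()) // [user_id name]

	iter := src.CreateIterator(nil)
	for msg := iter.Next(); msg != nil; msg = iter.Next() {
		fmt.Println(msg.Id(), msg.Body())
	}
}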

func (*CsvDataSource) Close

func (m *CsvDataSource) Close() error

func (*CsvDataSource) Columns

func (m *CsvDataSource) Columns() []string

func (*CsvDataSource) CreateIterator

func (m *CsvDataSource) CreateIterator(filter expr.Node) Iterator

func (*CsvDataSource) MesgChan

func (m *CsvDataSource) MesgChan(filter expr.Node) <-chan Message

func (*CsvDataSource) Next

func (m *CsvDataSource) Next() Message

func (*CsvDataSource) Open

func (m *CsvDataSource) Open(connInfo string) (SourceConn, error)

func (*CsvDataSource) Tables

func (m *CsvDataSource) Tables() []string

type DataSource

type DataSource interface {
	Tables() []string
	Open(connInfo string) (SourceConn, error)
	Close() error
}

A DataSource is most likely a database, file, API, or in-memory data set; anything that provides rows of data. If the source is a SQL database, it can do its own planning/implementation.

However, sources do not have to implement all features of a database (scan/seek/sort/filter/group/aggregate); for any they lack, we will use our own execution engine to "polyfill" the features.

Minimum Features (see the in-memory sketch after these lists):

  • Scanning: iterate through messages/rows and use expr to evaluate them; this is the minimum needed to implement SQL SELECT
  • Schema Tables: at a minimum the tables available; column-level data can be introspected, so it is optional

Planning:

  • ?? Accept() or VisitSelect() not yet implemented

Optional Select Features:

  • Seek: i.e., key-value lookup, or indexed rows
  • Projection: i.e., selecting specific fields
  • Where: filtering the response
  • GroupBy
  • Aggregations: i.e., count(*), avg(), etc.
  • Sort: sorting the response, very important for fast joins

Non-Select based SQL DML Operations:

  • Deletion: (sql delete) Delete() DeleteExpression()
  • Upsert Interface: (sql Update, Upsert, Insert) Put() PutMulti()

DDL/Schema Operations

  • schema discovery
  • create
  • index
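
A hedged sketch of a minimal in-memory source covering only the minimum feature set (Schema Tables + Scanning); import paths are assumed, and everything else is left for the execution engine to polyfill:

package memsource

import (
	"database/sql/driver"

	// Assumed import paths.
	"github.com/araddon/qlbridge/datasource"
	"github.com/araddon/qlbridge/expr"
)

// memSource is a toy in-memory source: it implements DataSource (Tables,
// Open, Close) and Scanner (Columns, CreateIterator, MesgChan) plus
// Iterator (Next).
type memSource struct {
	cols []string
	rows [][]driver.Value
	pos  int
}

func (m *memSource) Tables() []string  { return []string{"mem"} }
func (m *memSource) Columns() []string { return m.cols }

func (m *memSource) Open(connInfo string) (datasource.SourceConn, error) { return m, nil }
func (m *memSource) Close() error                                        { return nil }

// CreateIterator ignores the filter; where/group/sort are polyfilled by the engine.
func (m *memSource) CreateIterator(filter expr.Node) datasource.Iterator { return m }

func (m *memSource) MesgChan(filter expr.Node) <-chan datasource.Message {
	return datasource.SourceIterChannel(m, filter, make(chan bool))
}

func (m *memSource) Next() datasource.Message {
	if m.pos >= len(m.rows) {
		return nil
	}
	row := m.rows[m.pos]
	m.pos++
	return datasource.NewSqlDriverMessageMapVals(uint64(m.pos), row, m.cols)
}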

type DataSourceFeatures

type DataSourceFeatures struct {
	Features *Features
	DataSource
}

func NewFeaturedSource

func NewFeaturedSource(src DataSource) *DataSourceFeatures

type DataSources

type DataSources struct {
	// contains filtered or unexported fields
}

Our internal map of different types of datasources that are registered for our runtime system to use

func DataSourcesRegistry

func DataSourcesRegistry() *DataSources

DataSourcesRegistry returns the registry of all datasource types.
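
A sketch of registering a stub source and then looking it up along with its introspected features (import path assumed):

package main

import (
	"fmt"

	"github.com/araddon/qlbridge/datasource"
)

type stubSource struct{}

func (s *stubSource) Tables() []string                                    { return []string{"t1"} }
func (s *stubSource) Open(connInfo string) (datasource.SourceConn, error) { return nil, nil }
func (s *stubSource) Close() error                                        { return nil }

func main() {
	datasource.Register("stub", &stubSource{})

	reg := datasource.DataSourcesRegistry()
	if src := reg.Get("stub"); src != nil {
		// Features are introspected up front; our stub is neither Scanner nor Seeker.
		fmt.Println(src.Features.Scanner, src.Features.Seeker) // false false
		fmt.Println(src.Tables())                              // [t1]
	}
	fmt.Println(reg.String())
}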

func (*DataSources) Get

func (m *DataSources) Get(sourceType string) *DataSourceFeatures

func (*DataSources) String

func (m *DataSources) String() string

type Deletion

type Deletion interface {
	// Delete using this key
	Delete(driver.Value) (int, error)
	// Delete with given expression
	DeleteExpression(expr.Node) (int, error)
}

type Features

type Features struct {
	SourcePlanner  bool
	Scanner        bool
	Seeker         bool
	WhereFilter    bool
	GroupBy        bool
	Sort           bool
	Aggregations   bool
	Projection     bool
	SourceMutation bool
	Upsert         bool
	PatchWhere     bool
	Deletion       bool
}

We do type introspection in advance to speed up runtime feature detection for datasources

func NewFeatures

func NewFeatures(src DataSource) *Features

type Field

type Field struct {
	Name               string
	Description        string
	Data               FieldData
	Length             uint32
	Type               value.ValueType
	DefaultValueLength uint64
	DefaultValue       driver.Value
	Indexed            bool
}

Field describes the column info: name, data type, defaults, index.

func NewDescribeHeaders

func NewDescribeHeaders() []*Field

func NewField

func NewField(name string, valType value.ValueType, size int, description string) *Field

type FieldData

type FieldData []byte

type GroupBy

type GroupBy interface {
	DataSource
	GroupBy(expr.SqlStatement) error
}

type Iterator

type Iterator interface {
	Next() Message
}

Iterator is a simple interface for paging through a datastore's Messages/rows

  • used for scanning
  • for datasources that implement exec.Visitor() (i.e., select), this represents the already filtered, calculated rows

type JsonHelperScannable

type JsonHelperScannable u.JsonHelper

func (*JsonHelperScannable) MarshalJSON

func (m *JsonHelperScannable) MarshalJSON() ([]byte, error)

func (*JsonHelperScannable) Scan

func (m *JsonHelperScannable) Scan(src interface{}) error

func (*JsonHelperScannable) UnmarshalJSON

func (m *JsonHelperScannable) UnmarshalJSON(data []byte) error

Unmarshal bytes into this typed struct

func (JsonHelperScannable) Value

func (m JsonHelperScannable) Value() (driver.Value, error)

This is the Go sql/driver interface we need to implement to allow conversion back and forth.

type JsonWrapper

type JsonWrapper json.RawMessage

func (*JsonWrapper) MarshalJSON

func (m *JsonWrapper) MarshalJSON() ([]byte, error)

func (*JsonWrapper) Scan

func (m *JsonWrapper) Scan(src interface{}) error

func (*JsonWrapper) Unmarshal

func (m *JsonWrapper) Unmarshal(v interface{}) error

func (*JsonWrapper) UnmarshalJSON

func (m *JsonWrapper) UnmarshalJSON(data []byte) error

Unmarshal bytes into this typed struct

func (JsonWrapper) Value

func (m JsonWrapper) Value() (driver.Value, error)

This is the Go sql/driver interface we need to implement to allow conversion back and forth.

type Key

type Key interface {
	Key() driver.Value
}

Key interface is the Unique Key identifying a row
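
A sketch using the concrete key types documented below (KeyCol, KeyInt) as driver.Value-backed row identifiers (import path assumed):

package main

import (
	"fmt"

	"github.com/araddon/qlbridge/datasource"
)

func main() {
	// A single-column key, e.g. the row where user_id = "abc123".
	kc := datasource.NewKeyCol("user_id", "abc123")
	fmt.Println(kc.Name, kc.Key()) // user_id abc123

	// Integer keys for sources with numeric primary keys.
	ki := datasource.NewKeyInt(42)
	fmt.Println(ki.Key()) // 42
}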

func KeyFromWhere

func KeyFromWhere(wh expr.Node) Key

Given a WHERE expression, try to create a Key; it requires the form `identity = "value"`.

type KeyCol

type KeyCol struct {
	Name string
	Val  driver.Value
}

Variety of Key Types

func NewKeyCol

func NewKeyCol(name string, val driver.Value) KeyCol

func (KeyCol) Key

func (m KeyCol) Key() driver.Value

type KeyInt

type KeyInt struct {
	Id int
}

Variety of Key Types

func NewKeyInt

func NewKeyInt(key int) KeyInt

func (*KeyInt) Key

func (m *KeyInt) Key() driver.Value

type KeyInt64

type KeyInt64 struct {
	Id int64
}

Variety of Key Types

func NewKeyInt64

func NewKeyInt64(key int64) KeyInt64

func (*KeyInt64) Key

func (m *KeyInt64) Key() driver.Value

type Message

type Message interface {
	Id() uint64
	Body() interface{}
}

Message represents a message; the Id() method provides a consistent uint64 which can be used by consistent-hash algorithms for topologies that split messages up among multiple machines.

Body() returns an interface{}, allowing this to be a generic structure for routing.

See https://github.com/mdmarek/topo and http://github.com/lytics/grid

type Mutator

type Mutator interface {
	Upsert
	Deletion
}

type NestedContextReader

type NestedContextReader struct {
	// contains filtered or unexported fields
}

func (*NestedContextReader) Get

func (n *NestedContextReader) Get(key string) (value.Value, bool)

func (*NestedContextReader) Row

func (n *NestedContextReader) Row() map[string]value.Value

func (*NestedContextReader) Ts

func (n *NestedContextReader) Ts() time.Time

type NodeConfig

type NodeConfig struct {
	Name     string       `json:"name"`     // Name of this Node optional
	Source   string       `json:"source"`   // Name of source this node belongs to
	Address  string       `json:"address"`  // host/ip
	Settings u.JsonHelper `json:"settings"` // Arbitrary settings
}

Nodes are Servers

  • this represents a single source type
  • may have config info in Settings
  • user = username
  • password = password
  • idleconns = # of idle connections

type PatchWhere

type PatchWhere interface {
	PatchWhere(ctx context.Context, where expr.Node, patch interface{}) (int64, error)
}

PatchWhere passes a WHERE expression through to the underlying datasource.

Used for update statements of the form WHERE x = y

type Projection

type Projection interface {
	// Describe the Columns etc
	Projection() (*expr.Projection, error)
}

Some data sources that implement more features can provide their own projection.

type RuntimeSchema

type RuntimeSchema struct {
	Sources *DataSources // All registered DataSources

	DisableRecover bool // If disableRecover=true, we will not capture/suppress panics
	// contains filtered or unexported fields
}

RuntimeSchema provides info on available datasources and, given connection info, returns a datasource.

func NewRuntimeSchema

func NewRuntimeSchema() *RuntimeSchema

func (*RuntimeSchema) Conn

func (m *RuntimeSchema) Conn(db string) SourceConn

Get connection for given Database

@db      database name

func (*RuntimeSchema) DataSource

func (m *RuntimeSchema) DataSource(connInfo string) DataSource

given connection info, get datasource

@connInfo =    csv:///dev/stdin
               mockcsv
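
A sketch of resolving a datasource from connection info; it assumes a "csv" source type has already been registered and that the import path below is correct:

package main

import (
	"fmt"

	"github.com/araddon/qlbridge/datasource"
)

func main() {
	rt := datasource.NewRuntimeSchema()

	// Returns nil if no registered source matches the connection info.
	ds := rt.DataSource("csv:///dev/stdin")
	if ds == nil {
		fmt.Println("no datasource registered for csv:///dev/stdin")
		return
	}
	fmt.Println(ds.Tables())
}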

func (*RuntimeSchema) SetConnInfo

func (m *RuntimeSchema) SetConnInfo(connInfo string)

Our runtime configuration may support only a single schema/connection info (for example, the sql/driver interface), so it will be set here.

@connInfo =    csv:///dev/stdin

type Scanner

type Scanner interface {
	SchemaColumns
	SourceConn
	// create a new iterator for underlying datasource
	CreateIterator(filter expr.Node) Iterator
	MesgChan(filter expr.Node) <-chan Message
}

A Scanner is the most basic of data sources: it just iterates through rows without any optimizations.

type Schema

type Schema struct {
	Name          string                   `json:"name"`
	SourceSchemas map[string]*SourceSchema // map[source_name]:Source Schemas
	// contains filtered or unexported fields
}

Schema is a "Virtual" Schema Database. Made up of

  • Multiple DataSource(s) (each may be discrete source type)
  • each datasource supplies tables to the virtual table pool
  • each table from each source must be unique (or aliased)
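
A sketch of assembling a virtual schema by hand. The import paths and the value.IntType/value.StringType constants are assumptions, and schema refresh semantics may differ in practice:

package main

import (
	"fmt"

	"github.com/araddon/qlbridge/datasource"
	"github.com/araddon/qlbridge/value"
)

func main() {
	// One virtual schema ("sales") backed by a single csv source schema.
	sch := datasource.NewSchema("sales")
	ss := datasource.NewSourceSchema("salescsv", "csv")
	sch.AddSourceSchema(ss)

	// One table with two typed fields.
	tbl := datasource.NewTable("orders", ss)
	tbl.AddFieldType("order_id", value.IntType)
	tbl.AddFieldType("customer", value.StringType)
	tbl.SetColumns([]string{"order_id", "customer"})
	ss.AddTable(tbl)

	fmt.Println(tbl.Columns())            // [order_id customer]
	fmt.Println(ss.Tables())              // expected to include "orders"
	fmt.Println(tbl.HasField("customer")) // expected true
}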

func NewSchema

func NewSchema(schemaName string) *Schema

func (*Schema) AddSourceSchema

func (m *Schema) AddSourceSchema(ss *SourceSchema)

func (*Schema) AddTableName

func (m *Schema) AddTableName(tableName string, ss *SourceSchema)

func (*Schema) Current

func (m *Schema) Current() bool

Is this schema up to date?

func (*Schema) RefreshSchema

func (m *Schema) RefreshSchema()

func (*Schema) Since

func (m *Schema) Since(dur time.Duration) bool

Is this schema object within the time window described by @dur ago?

func (*Schema) Table

func (m *Schema) Table(tableName string) (*Table, error)

func (*Schema) Tables

func (m *Schema) Tables() []string

type SchemaColumns

type SchemaColumns interface {
	Columns() []string
}

Interface for a data source exposing column positions for []driver.Value iteration

type SchemaConfig

type SchemaConfig struct {
	Name       string   `json:"name"`    // Virtual Schema Name, must be unique
	Sources    []string `json:"sources"` // List of sources , the names of the "Db" in source
	NodeConfig []string `json:"-"`       // List of backend Servers
}

A Schema is a virtual schema and may have multiple backends.

type SchemaProvider

type SchemaProvider interface {
	DataSource
	Table(table string) (*Table, error)
}

A backend data source provider that also provides schema

type Seeker

type Seeker interface {
	DataSource
	// Just because we have Get, Multi-Get, doesn't mean we can seek all
	// expressions, find out with CanSeek for given expression
	CanSeek(*expr.SqlSelect) bool
	Get(key driver.Value) (Message, error)
	MultiGet(keys []driver.Value) ([]Message, error)
}

Interface for Seeking row values instead of scanning (ie, Indexed)

type Sort

type Sort interface {
	DataSource
	Sort(expr.SqlStatement) error
}

type SourceConfig

type SourceConfig struct {
	Name         string        `json:"name"`           // Name
	SourceType   string        `json:"type"`           // [mysql,elasticsearch,csv,etc] Name in DataSource Registry
	TablesToLoad []string      `json:"tables_to_load"` // if non empty, only load these tables
	Nodes        []*NodeConfig `json:"nodes"`          // List of nodes
	Settings     u.JsonHelper  `json:"settings"`       // Arbitrary settings specific to each source type
}

Config for a Source; sources are storage/database/csv files (see the sketch below)

  • this represents a single source type
  • may have more than one node
  • belongs to a Schema (or schemas)
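
A sketch of building a source config with one node; the field meanings follow the struct tags above, and the values are hypothetical (import path assumed):

package main

import (
	"fmt"

	"github.com/araddon/qlbridge/datasource"
)

func main() {
	// One "csv" source with a single node; Settings is arbitrary per-source config.
	conf := datasource.NewSourceConfig("salescsv", "csv")
	conf.TablesToLoad = []string{"orders"}
	conf.Nodes = []*datasource.NodeConfig{
		{Name: "local", Source: "salescsv", Address: "/dev/stdin"},
	}
	conf.Init()

	fmt.Println(conf.String())
}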

func NewSourceConfig

func NewSourceConfig(name, sourceType string) *SourceConfig

func (*SourceConfig) Init

func (m *SourceConfig) Init()

func (*SourceConfig) String

func (m *SourceConfig) String() string

type SourceConn

type SourceConn interface {
	Close() error
}

A DataSource connection; Close is the only guaranteed feature, although implementations should provide many more (scan, seek, etc.).

func OpenConn

func OpenConn(sourceName, sourceConfig string) (SourceConn, error)

Open a datasource

sourcename = "csv", "elasticsearch"

type SourceMutation

type SourceMutation interface {
	Create(tbl *Table, stmt expr.SqlStatement) (Mutator, error)
}

SourceMutation is a stateful connection, similar to the Open() connection for select

  • accepts the table used in this upsert/insert/update

type SourcePlanner

type SourcePlanner interface {
	// Accept a sql statement, to plan the execution ideally, this would be done
	// by planner but, we need source specific planners, as each backend has different features
	Accept(expr.SubVisitor) (Scanner, error)
}

Some sources can do their own planning for sub-select statements

type SourceSchema

type SourceSchema struct {
	Name       string              // Source specific Schema name, generally underlying db name
	Conf       *SourceConfig       // source configuration
	Schema     *Schema             // Schema this is participating in
	Nodes      []*NodeConfig       // List of nodes config
	DSFeatures *DataSourceFeatures // The datasource Interface
	DS         DataSource          // This datasource Interface
	// contains filtered or unexported fields
}

SourceSchema is a schema for a single DataSource (elasticsearch, mysql, filesystem, etc.); each DataSource would have multiple tables.

func NewSourceSchema

func NewSourceSchema(name, sourceType string) *SourceSchema

func (*SourceSchema) AddTable

func (m *SourceSchema) AddTable(tbl *Table)

func (*SourceSchema) AddTableName

func (m *SourceSchema) AddTableName(tableName string)

func (*SourceSchema) Table

func (m *SourceSchema) Table(tableName string) (*Table, error)

func (*SourceSchema) Tables

func (m *SourceSchema) Tables() []string

type SourceSelectPlanner

type SourceSelectPlanner interface {
	// Accept a sql statement, to plan the execution ideally, this would be done
	// by planner but, we need source specific planners, as each backend has different features
	//Accept(expr.Visitor) (Scanner, error)
	VisitSelect(stmt *expr.SqlSelect) (interface{}, error)
}

Some sources can do their own planning

type SqlDriverMessage

type SqlDriverMessage struct {
	Vals  []driver.Value
	IdVal uint64
}

func (*SqlDriverMessage) Body

func (m *SqlDriverMessage) Body() interface{}

func (*SqlDriverMessage) Id

func (m *SqlDriverMessage) Id() uint64

type SqlDriverMessageMap

type SqlDriverMessageMap struct {
	IdVal uint64 // id()
	// contains filtered or unexported fields
}

func NewSqlDriverMessageMap

func NewSqlDriverMessageMap(id uint64, row []driver.Value, colindex map[string]int) *SqlDriverMessageMap

func NewSqlDriverMessageMapEmpty

func NewSqlDriverMessageMapEmpty() *SqlDriverMessageMap

func NewSqlDriverMessageMapVals

func NewSqlDriverMessageMapVals(id uint64, row []driver.Value, cols []string) *SqlDriverMessageMap

func (*SqlDriverMessageMap) Body

func (m *SqlDriverMessageMap) Body() interface{}

func (*SqlDriverMessageMap) Copy

func (*SqlDriverMessageMap) Get

func (m *SqlDriverMessageMap) Get(key string) (value.Value, bool)

func (*SqlDriverMessageMap) Id

func (m *SqlDriverMessageMap) Id() uint64

func (*SqlDriverMessageMap) Key

func (m *SqlDriverMessageMap) Key() string

func (*SqlDriverMessageMap) Row

func (m *SqlDriverMessageMap) Row() map[string]value.Value

func (*SqlDriverMessageMap) SetKey

func (m *SqlDriverMessageMap) SetKey(key string)

func (*SqlDriverMessageMap) SetKeyHashed

func (m *SqlDriverMessageMap) SetKeyHashed(key string)

func (*SqlDriverMessageMap) SetRow

func (m *SqlDriverMessageMap) SetRow(row []driver.Value)

func (*SqlDriverMessageMap) Ts

func (m *SqlDriverMessageMap) Ts() time.Time

func (*SqlDriverMessageMap) Values

func (m *SqlDriverMessageMap) Values() []driver.Value

type StringArray

type StringArray []string

func (*StringArray) MarshalJSON

func (m *StringArray) MarshalJSON() ([]byte, error)

func (*StringArray) Scan

func (m *StringArray) Scan(src interface{}) error

func (*StringArray) UnmarshalJSON

func (m *StringArray) UnmarshalJSON(data []byte) error

func (StringArray) Value

func (m StringArray) Value() (driver.Value, error)

type Table

type Table struct {
	Name           string            // Name of table lowercased
	NameOriginal   string            // Name of table
	FieldPositions map[string]int    // Maps name of column to ordinal position in array of []driver.Value's
	Fields         []*Field          // List of Fields, in order
	FieldMap       map[string]*Field // List of Fields, in order
	DescribeValues [][]driver.Value  // The Values that will be output for Describe
	Schema         *Schema           // The schema this is member of
	SourceSchema   *SourceSchema     // The source schema this is member of
	Charset        uint16            // Character set, default = utf8
	// contains filtered or unexported fields
}

Table represents the traditional definition of a database table. It belongs to a Schema and can be used to create a DataSource used to read this table.

func NewTable

func NewTable(table string, s *SourceSchema) *Table

func (*Table) AddField

func (m *Table) AddField(fld *Field)

func (*Table) AddFieldType

func (m *Table) AddFieldType(name string, valType value.ValueType)

func (*Table) AddValues

func (m *Table) AddValues(values []driver.Value)

func (*Table) Columns

func (m *Table) Columns() []string

func (*Table) Current

func (m *Table) Current() bool

Is this schema object current?

func (*Table) FieldNamesPositions

func (m *Table) FieldNamesPositions() map[string]int

List of Field Names and ordinal position in Column list

func (*Table) HasField

func (m *Table) HasField(name string) bool

func (*Table) SetColumns

func (m *Table) SetColumns(cols []string)

func (*Table) SetRefreshed

func (m *Table) SetRefreshed()

update the refreshed date to now

func (*Table) Since

func (m *Table) Since(dur time.Duration) bool

Is this schema object within the time window described by @dur ago?

type TimeValue

type TimeValue time.Time

func (*TimeValue) MarshalJSON

func (m *TimeValue) MarshalJSON() ([]byte, error)

func (*TimeValue) Scan

func (m *TimeValue) Scan(src interface{}) error

func (TimeValue) Time

func (m TimeValue) Time() time.Time

func (*TimeValue) Unmarshal

func (m *TimeValue) Unmarshal(v interface{}) error

func (*TimeValue) UnmarshalJSON

func (m *TimeValue) UnmarshalJSON(data []byte) error

func (TimeValue) Value

func (m TimeValue) Value() (driver.Value, error)

type Upsert

type Upsert interface {
	Put(ctx context.Context, key Key, value interface{}) (Key, error)
	PutMulti(ctx context.Context, keys []Key, src interface{}) ([]Key, error)
}

Mutation interface for Put

  • assumes datasource understands key(s?)

type UrlValuesMsg

type UrlValuesMsg struct {
	// contains filtered or unexported fields
}

func NewUrlValuesMsg

func NewUrlValuesMsg(id uint64, body *ContextUrlValues) *UrlValuesMsg

func (*UrlValuesMsg) Body

func (m *UrlValuesMsg) Body() interface{}

func (*UrlValuesMsg) Id

func (m *UrlValuesMsg) Id() uint64

func (*UrlValuesMsg) String

func (m *UrlValuesMsg) String() string

type ValueContextWrapper

type ValueContextWrapper struct {
	*SqlDriverMessage
	// contains filtered or unexported fields
}

func NewValueContextWrapper

func NewValueContextWrapper(msg *SqlDriverMessage, cols map[string]*expr.Column) *ValueContextWrapper

func (*ValueContextWrapper) Get

func (m *ValueContextWrapper) Get(key string) (value.Value, bool)

func (*ValueContextWrapper) Row

func (m *ValueContextWrapper) Row() map[string]value.Value

func (*ValueContextWrapper) Ts

func (m *ValueContextWrapper) Ts() time.Time

type WhereFilter

type WhereFilter interface {
	DataSource
	Filter(expr.SqlStatement) error
}
