elasticsearch

Version: v0.0.0-...-2951773 (this package is not in the latest version of its module)
Published: May 20, 2020 License: Apache-2.0 Imports: 17 Imported by: 0

README

Sif ElasticSearch DataSource

An ElasticSearch DataSource for Sif, supporting ElasticSearch 6 and 7.

$ go get github.com/go-sif/sif-datasource-elasticsearch@master
$ go get github.com/elastic/go-elasticsearch/v7@7.x
# or, for ElasticSearch 6:
$ go get github.com/elastic/go-elasticsearch/v6@6.x

Usage

  1. Create a Schema which represents the fields you intend to extract from each document in the target index:
import (
	"github.com/go-sif/sif"
	"github.com/go-sif/sif/schema"
)

schema := schema.CreateSchema()
schema.CreateColumn("coords.x", &sif.Float64ColumnType{})
schema.CreateColumn("coords.z", &sif.Float64ColumnType{})
schema.CreateColumn("date", &sif.TimeColumnType{Format: "2006-01-02 15:04:05"})
// This datasource will automatically add the following columns to your schema:
//  - es._id (the document id)
//  - es._score (the document score)
  2. Then, define an ES query to filter documents in the target index:
import (
	"strings"

	es7api "github.com/elastic/go-elasticsearch/v7/esapi"
)

// ...
// Any ES query DSL body, e.g. match_all. There is no need to include index,
// size or scrolling params, as they will be overridden by sif.
queryJSON := `{"query": {"match_all": {}}}`
// Full access to the SearchRequest object is provided for further query customization
req := &es7api.SearchRequest{Body: strings.NewReader(queryJSON)}
  3. Finally, define your configuration and create a DataFrame which can be manipulated with sif:
import (
	"time"

	elasticsearch7 "github.com/elastic/go-elasticsearch/v7"
	esSource "github.com/go-sif/sif-datasource-elasticsearch"
)
// ...
conf := &esSource.DataSourceConf{
	PartitionSize: 128,
	Index:         "my_index_name",
	ScrollTimeout: 10 * time.Minute,
	ES7Query:      req,
	ES7Conf: &elasticsearch7.Config{
		Addresses: []string{"http://1.2.3.4:9200"},
	},
}

dataframe := esSource.CreateDataFrame(conf, schema)

Documentation

Overview

Package elasticsearch provides a DataSource which reads data from an ElasticSearch server

Functions

func CreateDataFrame

func CreateDataFrame(conf *DataSourceConf, schema sif.Schema) sif.DataFrame

CreateDataFrame creates an ElasticSearch DataSource from conf and returns a sif.DataFrame backed by it, using the given schema

Types

type DataSource

type DataSource struct {
	// contains filtered or unexported fields
}

DataSource is an ElasticSearch index containing documents which will be manipulated according to a DataFrame

func (*DataSource) Analyze

func (es *DataSource) Analyze() (sif.PartitionMap, error)

Analyze returns a PartitionMap describing how the documents in the source index will be divided into Partitions
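As a rough mental model, and only under the assumption (not confirmed by this documentation) that each Partition holds at most `PartitionSize` documents, a query matching `n` documents would be split into ceil(n / PartitionSize) Partitions. The hypothetical `expectedPartitions` helper below computes that count with integer arithmetic.

```go
package main

import "fmt"

// expectedPartitions is a hypothetical helper, not part of this package.
// It assumes every partition holds at most partitionSize documents, so the
// count is ceil(hits / partitionSize), computed without floating point.
func expectedPartitions(hits, partitionSize int) int {
	if hits <= 0 || partitionSize <= 0 {
		return 0
	}
	return (hits + partitionSize - 1) / partitionSize
}

func main() {
	// PartitionSize: 128, as in the README configuration.
	fmt.Println(expectedPartitions(1000, 128)) // 8
	fmt.Println(expectedPartitions(1024, 128)) // 8
	fmt.Println(expectedPartitions(1025, 128)) // 9
}
```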

func (*DataSource) DeserializeLoader

func (es *DataSource) DeserializeLoader(bytes []byte) (sif.PartitionLoader, error)

DeserializeLoader creates a PartitionLoader for this DataSource from a serialized representation

func (*DataSource) IsStreaming

func (es *DataSource) IsStreaming() bool

IsStreaming returns false for ElasticSearch DataSources

type DataSourceConf

type DataSourceConf struct {
	PartitionSize int
	Index         string
	ScrollTimeout time.Duration
	ES6Query      *es6api.SearchRequest
	ES7Query      *es7api.SearchRequest
	ES6Conf       *elasticsearch6.Config
	ES7Conf       *elasticsearch7.Config
}

DataSourceConf configures an ElasticSearch DataSource

type PartitionLoader

type PartitionLoader struct {
	// contains filtered or unexported fields
}

PartitionLoader is capable of loading partitions of data from an ElasticSearch index

func (*PartitionLoader) GobDecode

func (pl *PartitionLoader) GobDecode(in []byte) error

GobDecode deserializes a PartitionLoader

func (*PartitionLoader) GobEncode

func (pl *PartitionLoader) GobEncode() ([]byte, error)

GobEncode serializes a PartitionLoader

func (*PartitionLoader) Load

func (pl *PartitionLoader) Load(parser sif.DataSourceParser, widestInitialSchema sif.Schema) (sif.PartitionIterator, error)

Load loads partitions of data from the ElasticSearch index, returning a sif.PartitionIterator

func (*PartitionLoader) ToString

func (pl *PartitionLoader) ToString() string

ToString returns a string representation of this PartitionLoader

type PartitionMap

type PartitionMap struct {
	// contains filtered or unexported fields
}

PartitionMap is an iterator producing a sequence of PartitionLoaders

func (*PartitionMap) HasNext

func (pm *PartitionMap) HasNext() bool

HasNext returns true iff there is another PartitionLoader remaining

func (*PartitionMap) Next

func (pm *PartitionMap) Next() sif.PartitionLoader

Next returns the next PartitionLoader for the index
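PartitionMap follows a HasNext/Next iterator protocol: call HasNext before every Next. The consumption loop is sketched below against a hypothetical in-memory stand-in (`sliceMap`, with plain strings in place of PartitionLoaders); a PartitionMap returned by Analyze would be drained the same way.

```go
package main

import "fmt"

// partitionMap mirrors the HasNext/Next method pair documented for
// PartitionMap; elements are plain strings here for illustration.
type partitionMap interface {
	HasNext() bool
	Next() string
}

// sliceMap is a hypothetical in-memory implementation used only for this sketch.
type sliceMap struct {
	items []string
	pos   int
}

func (m *sliceMap) HasNext() bool { return m.pos < len(m.items) }

func (m *sliceMap) Next() string {
	item := m.items[m.pos]
	m.pos++
	return item
}

// drain consumes every element, checking HasNext before each Next call.
func drain(pm partitionMap) []string {
	var out []string
	for pm.HasNext() {
		out = append(out, pm.Next())
	}
	return out
}

func main() {
	pm := &sliceMap{items: []string{"partition-0", "partition-1", "partition-2"}}
	fmt.Println(drain(pm)) // [partition-0 partition-1 partition-2]
}
```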
