drill

v0.0.0-...-47a1d48
Warning: This package is not in the latest version of its module.
Published: Sep 16, 2020 License: Apache-2.0 Imports: 22 Imported by: 0

README

go-drill


go-drill is a highly efficient Pure Go Client and SQL driver for Apache Drill. It differs from other clients / drivers by using the native Protobuf API to communicate with Drill instead of the REST API. As a result, it is significantly more performant when dealing with large amounts of data. The raw bytes returned from Drill aren't copied. Instead, they are interpreted and used in place via slices for efficiency.

Currently it only supports either no authentication or authentication via SASL gssapi-krb5.

In addition, the SQL driver expects to connect to a ZooKeeper quorum to find the drillbits. You can still connect directly to a drillbit via the Client.

Install

Client
go get -u github.com/zeroshade/go-drill
Driver
go get -u github.com/zeroshade/go-drill/driver

Usage

The driver can be used like any normal Golang SQL driver:

import (
  "database/sql"
  "log"
  "strings"

  _ "github.com/zeroshade/go-drill/driver"
)

func main() {
  props := []string{
    "zk=zookeeper1,zookeeper2,zookeeper3",
    "auth=kerberos",
    "service=<krb_service_name>",
    "cluster=<clustername>",
  }

  // sql.Open does not necessarily establish a connection yet
  db, err := sql.Open("drill", strings.Join(props, ";"))
  if err != nil {
    log.Fatal(err)
  }
  defer db.Close()
}
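The data source name passed to sql.Open is a semicolon-separated list of key=value properties. The zk value is itself a comma-separated list of ZooKeeper nodes. The sketch below splits such a string so that this structure is explicit. parseProps is a hypothetical helper and is not part of go-drill. The driver's own parser may handle whitespace, duplicate keys, or unknown keys differently, and "mycluster" is a placeholder.

```go
package main

import (
	"fmt"
	"strings"
)

// parseProps is a hypothetical helper (not part of go-drill) that splits a
// "key=value;key=value" connection string into a map.
func parseProps(dsn string) (map[string]string, error) {
	props := make(map[string]string)
	for _, entry := range strings.Split(dsn, ";") {
		if entry == "" {
			continue // tolerate an empty entry, e.g. a trailing ';'
		}
		kv := strings.SplitN(entry, "=", 2)
		if len(kv) != 2 {
			return nil, fmt.Errorf("malformed property %q", entry)
		}
		props[kv[0]] = kv[1]
	}
	return props, nil
}

func main() {
	dsn := strings.Join([]string{
		"zk=zookeeper1,zookeeper2,zookeeper3",
		"auth=kerberos",
		"cluster=mycluster", // placeholder cluster name
	}, ";")

	props, err := parseProps(dsn)
	if err != nil {
		panic(err)
	}
	// the zk value is itself a comma-separated list of ZooKeeper nodes
	zkNodes := strings.Split(props["zk"], ",")
	fmt.Println(len(zkNodes), zkNodes[0], props["auth"], props["cluster"])
	// prints: 3 zookeeper1 kerberos mycluster
}
```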

Alternatively, you can use the Client directly:

import (
  "context"
  "log"

  "github.com/zeroshade/go-drill"
)

func main() {
  // create client, doesn't connect yet
  cl := drill.NewClient(drill.Options{/* fill out options */}, "zookeeper1", "zookeeper2", "zookeeper3")

  // connect the client; err is nil on success, otherwise it describes
  // the issue encountered while connecting
  if err := cl.Connect(context.Background()); err != nil {
    log.Fatal(err)
  }
  defer cl.Close()
}

Developing

Refreshing the Protobuf Definitions

A command is provided to easily refresh the protobuf definitions, as long as protoc is already on your PATH. For development, the source should be in a directory structure like .../github.com/zeroshade/go-drill/. This lets go generate run the command.

Alternatively, the provided command drillProto can be used manually via go run ./internal/cmd/drillProto from the root of the source directory.

$ go run ./internal/cmd/drillProto -h
Drill Proto.

Usage:
        drillProto -h | --help
        drillProto download [-o PATH]
        drillProto fixup [-o PATH]
        drillProto gen [-o PATH] ROOTPATH
        drillProto runall [-o PATH] ROOTPATH

Arguments:
        ROOTPATH  location of the root output for the generated .go files

Options:
        -h --help           Show this screen.
        -o PATH --out PATH  .proto destination path [default: protobuf]

drillProto download simply downloads the .proto files from the Apache Drill GitHub repository to the specified path.

drillProto fixup adds the option go_package = "github.com/zeroshade/go-drill/internal/rpc/proto/..." to each file.

drillProto gen will generate the .pb.go files from the protobuf files, using the provided ROOTPATH as the root output where it will write the files in the structure of <ROOTPATH>/github.com/zeroshade/go-drill/internal/rpc/proto/....

drillProto runall does all of the steps in order as one command.

Regenerate the data vector handling

Running go generate ./internal/data will regenerate the .gen.go files from their templates.

Documentation

Overview

Package drill is a highly efficient Pure Go client and driver for Apache Drill.

A driver for the database/sql package is also provided via the driver subpackage. This can be used like so:

 import (
   "database/sql"
   "log"
   "strings"

   _ "github.com/zeroshade/go-drill/driver"
 )

 func main() {
   props := []string{
     "zk=zookeeper1,zookeeper2,zookeeper3",
     "auth=kerberos",
     "service=<krb_service_name>",
     "cluster=<clustername>",
   }

   db, err := sql.Open("drill", strings.Join(props, ";"))
   if err != nil {
     log.Fatal(err)
   }
   defer db.Close()
 }

Logging of the internals can currently be turned on via the environment variable GO_DRILL_LOG_LEVEL. The package uses github.com/rs/zerolog for logging, so any value accepted by the zerolog.ParseLevel function (for example debug or info) is a valid value for the environment variable.

Constants

Variables

var (
	// This is returned when we get a query result status of Cancelled
	ErrQueryCancelled = errors.New("drill: query cancelled")
	// This is wrapped by the returned error if the query failed
	ErrQueryFailed = errors.New("drill: query failed")
	// This is returned if the query failed for some unknown reason
	ErrQueryUnknownState = errors.New("drill: query unknown state")
)

These errors may be returned by calls to Next. If the query fails, the returned error wraps ErrQueryFailed, so you can detect it like so:

_, err := handle.Next()
if errors.Is(err, ErrQueryFailed) {
  // err.Error() contains the actual error message
  // handle query failure
}

Functions

This section is empty.

Types

type Client

type Client struct {
	// Modifying the options after connecting does not affect the current connection
	// it will only affect future connections of this client.
	Opts Options

	ZkNodes []string
	// contains filtered or unexported fields
}

A Client is used for communicating to a drill cluster.

After you create a client via NewClient or NewDirectClient, call one of the Connect functions to actually connect to the cluster.

func NewClient

func NewClient(opts Options, zk ...string) *Client

NewClient initializes a Drill Client with the given options but does not actually connect yet. It also allows specifying the zookeeper cluster nodes here.

func NewDirectClient

func NewDirectClient(opts Options, host string, port int32) *Client

NewDirectClient initializes a Drill Client which connects to an endpoint directly rather than relying on ZooKeeper for finding drill bits.

func (*Client) Close

func (d *Client) Close() error

Close closes the connection and cleans up the background goroutines.

func (*Client) Connect

func (d *Client) Connect(ctx context.Context) error

Connect attempts to use the current ZooKeeper cluster in order to find a drill bit to connect to. This will also populate the internal listing of drill bits from zookeeper.

As with ConnectEndpoint, the context provided will be passed to DialContext and will not be stored in the client.

func (*Client) ConnectEndpoint

func (d *Client) ConnectEndpoint(ctx context.Context, e Drillbit) error

ConnectEndpoint connects to the provided endpoint directly rather than looking for drillbits via zookeeper. This is also used by the normal connect setup to connect to the desired drillbit once it has been chosen from the zookeeper information.

The provided context object will be passed to DialContext to control the deadlines for the socket connection, it will not be saved into the client.

func (*Client) ConnectWithZK

func (d *Client) ConnectWithZK(ctx context.Context, zkNode ...string) error

ConnectWithZK overrides the current stored zookeeper cluster in the client, and uses the passed list of nodes to find a drillbit. This will replace the stored zookeeper nodes in the client with the new set provided.

As with ConnectEndpoint, the context provided will be passed to DialContext and will not be stored in the client.

func (*Client) ExecuteStmt

func (d *Client) ExecuteStmt(hndl PreparedHandle) (DataHandler, error)

ExecuteStmt runs the passed prepared statement against the cluster and returns a handle to the results in the same way that SubmitQuery does.

func (*Client) GetCatalogs

func (d *Client) GetCatalogs(pattern string, escape *string) ([]*user.CatalogMetadata, error)

GetCatalogs uses the given pattern to search for and return the catalogs available on the server. For Drill, this is always just "DRILL". The pattern syntax is equivalent to that of a SQL LIKE expression. If you don't need to escape characters in the search filter, pass nil for the second argument. Otherwise, pass a pointer to a string consisting of the characters used for escaping in the pattern.
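In LIKE semantics, % matches any run of characters, _ matches exactly one, and a character preceded by an escape character is matched literally. For example, my\_tab% with \ as the escape matches my_table but not myXtable. The sketch below translates such a pattern into an anchored regular expression to make these rules concrete. likeToRegexp is hypothetical, and treating every character in *escape as an escape character is an assumption. Drill evaluates the pattern server-side rather than like this.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// likeToRegexp is a hypothetical helper that translates a SQL LIKE pattern
// into an anchored regular expression. Every character in *escape is treated
// as an escape character (an assumption); pass nil for no escaping.
func likeToRegexp(pattern string, escape *string) (*regexp.Regexp, error) {
	var b strings.Builder
	b.WriteString(`(?s)^`) // (?s) lets '.' match newlines too
	runes := []rune(pattern)
	for i := 0; i < len(runes); i++ {
		r := runes[i]
		// an escape character makes the following character literal
		if escape != nil && strings.ContainsRune(*escape, r) && i+1 < len(runes) {
			i++
			b.WriteString(regexp.QuoteMeta(string(runes[i])))
			continue
		}
		switch r {
		case '%':
			b.WriteString(".*") // any run of characters, including none
		case '_':
			b.WriteString(".") // exactly one character
		default:
			b.WriteString(regexp.QuoteMeta(string(r)))
		}
	}
	b.WriteString("$")
	return regexp.Compile(b.String())
}

func main() {
	esc := `\`
	cases := []struct {
		pattern string
		escape  *string
		input   string
	}{
		{"dfs%", nil, "dfs.tmp"},       // true
		{"dfs%", nil, "cp"},            // false
		{`my\_tab%`, &esc, "my_table"}, // true
		{`my\_tab%`, &esc, "myXtable"}, // false
	}
	for _, tc := range cases {
		re, err := likeToRegexp(tc.pattern, tc.escape)
		if err != nil {
			panic(err)
		}
		fmt.Println(tc.pattern, tc.input, re.MatchString(tc.input))
	}
}
```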

func (*Client) GetColumns

func (d *Client) GetColumns(catalogPattern, schemaPattern, tablePattern, columnPattern string, escape *string) ([]*user.ColumnMetadata, error)

GetColumns returns the metadata for all the columns from all the tables which fit the provided filter patterns.

The syntax for the filter pattern is the same as for GetCatalogs.

func (*Client) GetEndpoint

func (d *Client) GetEndpoint() Drillbit

GetEndpoint returns the currently configured endpoint that the client either is connected to or will connect to if Connect is called (in the case of a Direct connection client). Returns nil for a client using Zookeeper that hasn't connected yet, as the endpoint is only determined when connecting in that case.

func (*Client) GetMetadata

func (d *Client) GetMetadata() (*user.ServerMeta, error)

GetMetadata returns a structure consisting of all of the Drill Server metadata including what sql keywords are supported, escape characters, max lengths etc.

func (*Client) GetSchemas

func (d *Client) GetSchemas(catalogPattern, schemaPattern string, escape *string) ([]*user.SchemaMetadata, error)

GetSchemas returns all the schemas which fit the filter patterns provided.

The syntax for the filter pattern is the same as for GetCatalogs.

func (*Client) GetTables

func (d *Client) GetTables(catalogPattern, schemaPattern, tablePattern string, escape *string, tableTypes ...string) ([]*user.TableMetadata, error)

GetTables returns the metadata for all the tables which fit the filter patterns provided and are of the table types passed in.

The syntax for the filter pattern is the same as for GetCatalogs.

func (*Client) NewConnection

func (d *Client) NewConnection(ctx context.Context) (Conn, error)

NewConnection will use the stored zookeeper quorum nodes and drill bit information to find the next drill bit to connect to in order to spread out the load.

The client returned from this will already be connected using the same options and zookeeper cluster as the current Client, just picking a different endpoint to connect to.
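The docs do not specify how the "next" drillbit is chosen. Round-robin over the known endpoints is one common way to spread connections, and the sketch below shows it. roundRobin and endpoint are hypothetical types, and go-drill may use a different selection strategy. The port 31010 used in the example is Drill's default user port.

```go
package main

import (
	"fmt"
	"sync"
)

// endpoint is a hypothetical stand-in for a drillbit learned from ZooKeeper.
type endpoint struct {
	Addr string
	Port int32
}

// roundRobin hands out endpoints in turn so successive connections land on
// different drillbits. It is safe for concurrent use.
type roundRobin struct {
	mu   sync.Mutex
	eps  []endpoint
	next int
}

// Next returns the next endpoint, or false if none are known.
func (r *roundRobin) Next() (endpoint, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if len(r.eps) == 0 {
		return endpoint{}, false
	}
	ep := r.eps[r.next%len(r.eps)]
	r.next = (r.next + 1) % len(r.eps)
	return ep, true
}

func main() {
	rr := &roundRobin{eps: []endpoint{
		{"bit1", 31010}, {"bit2", 31010}, {"bit3", 31010},
	}}
	// prints bit1, bit2, bit3, bit1 on separate lines
	for i := 0; i < 4; i++ {
		ep, _ := rr.Next()
		fmt.Println(ep.Addr)
	}
}
```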

func (*Client) Ping

func (d *Client) Ping(ctx context.Context) error

Ping sends a ping to the server via this connection and waits for a Pong response. Returns database/sql/driver.ErrBadConn if it fails or nil if it succeeds.

func (*Client) PrepareQuery

func (d *Client) PrepareQuery(plan string) (PreparedHandle, error)

PrepareQuery creates a prepared sql statement and returns a handle to it. This handle can be used with any client connected to the same cluster in order to actually execute it with ExecuteStmt.

func (*Client) SubmitQuery

func (d *Client) SubmitQuery(t QueryType, plan string) (DataHandler, error)

SubmitQuery submits the specified query and query type, returning a handle to the results. This only blocks long enough to receive a Query ID from the cluster. It does not wait for the results to start coming in. The result handle can be used to do that.

If the query fails, SubmitQuery does not return an error. You retrieve the failure from the result handle itself.

type Conn

type Conn interface {
	ConnectEndpoint(context.Context, Drillbit) error
	Connect(context.Context) error
	ConnectWithZK(context.Context, ...string) error
	GetEndpoint() Drillbit
	Ping(context.Context) error
	SubmitQuery(QueryType, string) (DataHandler, error)
	PrepareQuery(string) (PreparedHandle, error)
	ExecuteStmt(PreparedHandle) (DataHandler, error)
	NewConnection(context.Context) (Conn, error)
	Close() error
}

A Conn represents a single connection to a drill bit. This interface is useful for things consuming the Client to maintain a separation so that it is easy to mock out for testing.

type DataHandler

type DataHandler interface {
	Next() (*RecordBatch, error)
	Cancel()
	GetCols() []string
	GetRecordBatch() *RecordBatch
	Close() error
}

A DataHandler is an object that allows iterating through record batches as the data comes in, or cancelling a running query.

type DataVector

type DataVector data.DataVector

type Drillbit

type Drillbit interface {
	GetAddress() string
	GetUserPort() int32
}

A Drillbit represents a single endpoint in the cluster.
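Drillbit is a small interface, so any type with these two methods satisfies it. That is handy for tests, or for pointing ConnectEndpoint at a known host. The sketch below defines a hypothetical staticBit type and turns an endpoint into a dialable host:port string with net.JoinHostPort, which also brackets IPv6 literals. Whether go-drill builds its address the same way is an assumption.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// Drillbit mirrors the two-method interface documented above.
type Drillbit interface {
	GetAddress() string
	GetUserPort() int32
}

// staticBit is a hypothetical fixed endpoint whose address is known
// without consulting ZooKeeper.
type staticBit struct {
	host string
	port int32
}

func (s staticBit) GetAddress() string { return s.host }
func (s staticBit) GetUserPort() int32 { return s.port }

// dialAddr formats an endpoint as "host:port", bracketing IPv6 literals.
func dialAddr(d Drillbit) string {
	return net.JoinHostPort(d.GetAddress(), strconv.Itoa(int(d.GetUserPort())))
}

func main() {
	var bit Drillbit = staticBit{host: "localhost", port: 31010}
	fmt.Println(dialAddr(bit))                                 // localhost:31010
	fmt.Println(dialAddr(staticBit{host: "::1", port: 31010})) // [::1]:31010
}
```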

type NullableDataVector

type NullableDataVector data.NullableDataVector

type Options

type Options struct {
	// the default Schema to use
	Schema string
	// true if expected to use encryption for communication
	SaslEncrypt bool
	// the HOST portion to use for the spn to authenticate with, if _HOST or
	// empty, will use the address of the drillbit that is connected to
	ServiceHost string
	// the krb service name to use for authentication
	ServiceName string
	// what authentication mechanism to use, currently only supports kerberos
	// or no auth
	Auth string
	// the Drill clusters name which is used by ZooKeeper to store the endpoint
	// information
	ClusterName string
	// use this instead of ClusterName to fully specify the Zookeeper path instead
	// of using the /drill prefix
	ZKPath string
	// whether or not the server should support complex types such as List
	SupportComplexTypes bool
	// what Application Name to use for connecting to the server
	ApplicationName string
	// the username to authenticate as
	User string
	// Password to use for PLAIN auth
	Passwd string
	// the heartbeat frequency to use; if nil, the default (15 seconds) is used.
	// Set to 0 to disable it.
	HeartbeatFreq *time.Duration
}

Options for a Drill Connection

type PreparedHandle

type PreparedHandle interface{}

A PreparedHandle is an opaque handle for a Prepared Statement.

This does not contain a reference to the client it originally came from as it can be passed to any valid drill client in order to execute it as long as it is connected to the same server cluster. This is because the prepared statement information is stored on the server, this object contains the server handle needed to execute the statement.

Keep in mind that Apache Drill *does not* support parameters in prepared statements.

type QueryType

type QueryType shared.QueryType

type RecordBatch

type RecordBatch struct {
	Def  *shared.RecordBatchDef
	Vecs []DataVector
}

A RecordBatch represents the data and meta information for one group of rows.

How to Interpret

The Vecs are the columns in a result set, since Drill returns data in a column-oriented fashion. Each DataVector has been processed from the results Drill returned. A vector may or may not be able to contain nulls, depending on whether the column was considered OPTIONAL. Every vector can return its reflect.Type and can retrieve the data at an index as an interface{}. To get the raw value without going through an interface{}, type-assert the DataVector to its underlying concrete type.

The Def object describes the structure of this batch, giving the record count or the AffectedRowsCount. It also provides access to the metadata for the serialized fields, as per the Protobuf definitions. Through the fields we can get the types, the column names, and information about how each was serialized.
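Because the layout is column-oriented, you rebuild a row by reading the same index from every vector, and an OPTIONAL column yields nil for a null entry. The sketch below uses hypothetical stand-in types with only a Value(uint) interface{} method, which matches the call used in the Next example. The real DataVector types carry more methods (such as their reflect.Type) and different internals.

```go
package main

import (
	"fmt"
	"strings"
)

// valueVector is a hypothetical stand-in for a DataVector: one column, read by index.
type valueVector interface {
	Value(index uint) interface{}
}

// int64Col is a required (non-nullable) column.
type int64Col []int64

func (c int64Col) Value(i uint) interface{} { return c[i] }

// nullableStrCol is an OPTIONAL column: valid[i] == false means row i is NULL.
type nullableStrCol struct {
	vals  []string
	valid []bool
}

func (c nullableStrCol) Value(i uint) interface{} {
	if !c.valid[i] {
		return nil
	}
	return c.vals[i]
}

// rows transposes column-oriented data into row strings like "|0|x|".
func rows(count int, cols []valueVector) []string {
	out := make([]string, 0, count)
	for i := 0; i < count; i++ {
		var sb strings.Builder
		for _, c := range cols {
			fmt.Fprint(&sb, "|", c.Value(uint(i)))
		}
		sb.WriteString("|")
		out = append(out, sb.String())
	}
	return out
}

func main() {
	cols := []valueVector{
		int64Col{0, 1, 2},
		nullableStrCol{vals: []string{"x", "", "z"}, valid: []bool{true, false, true}},
	}
	// prints |0|x|, |1|<nil>|, |2|z| on separate lines
	for _, r := range rows(3, cols) {
		fmt.Println(r)
	}
}
```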

Future Looking

Eventually this will likely hide the protobuf implementation behind an interface, but for now it was easier to just expose the protobuf definitions.

type ResultHandle

type ResultHandle struct {
	// contains filtered or unexported fields
}

A ResultHandle is an opaque handle for a given result set, implementing the DataHandler interface.

It contains a channel over which the client will send the data as it comes in allowing results to be streamed as they are retrieved. It also contains a handle to the original client that the result came from.

func (*ResultHandle) Cancel

func (r *ResultHandle) Cancel()

Cancel will cancel the currently calculating query. Calls to Next can still be used to drain any remaining record batches.

func (*ResultHandle) Close

func (r *ResultHandle) Close() error

Close closes the channel and removes the query handler from the client.

func (*ResultHandle) GetCols

func (r *ResultHandle) GetCols() []string

GetCols grabs the column names from the record batch definition.

This may block on the data channel if the first record batch has not yet been received. If the query failed or otherwise produced no record batch, this returns an empty slice.

Example
// using sample nation data set from Drill repo
rh := getTestResultHandle()

cols := rh.GetCols()
for _, c := range cols {
	fmt.Println(c)
}
Output:

N_NATIONKEY
N_NAME
N_REGIONKEY
N_COMMENT

func (*ResultHandle) GetRecordBatch

func (r *ResultHandle) GetRecordBatch() *RecordBatch

GetRecordBatch returns the current record batch. If none has been received yet, it checks the channel and blocks until the first batch arrives. A nil return means there is no data, and calling Next will return the status.

Example
// using sample nation data set from Drill repo
rh := getTestResultHandle()

// at the start the record batch is nil, so the first call will grab from the channel
rb := rh.GetRecordBatch()
fmt.Printf("Num Cols: %d\n", len(rb.Def.Field))
fmt.Printf("Rows in this Batch: %d\n", rb.Def.GetRecordCount())

for idx, f := range rb.Def.Field {
	fmt.Printf("Col %d: %s\n", idx, f.NamePart.GetName())
}

// we didn't call Next, so GetRecordBatch still returns the same batch we're on
batch := rh.GetRecordBatch()
fmt.Println(rb == batch)
Output:

Num Cols: 4
Rows in this Batch: 9
Col 0: N_NATIONKEY
Col 1: N_NAME
Col 2: N_REGIONKEY
Col 3: N_COMMENT
true

func (*ResultHandle) Next

func (r *ResultHandle) Next() (*RecordBatch, error)

Next checks the data channel for the next record batch, dropping the reference to the current record batch.

Returns

Next returns a nil error if it successfully retrieves another record batch. Otherwise, it returns io.EOF if the query completed successfully and there are no more record batches.

If there are no more record batches and the query did not complete successfully, it will return either an error wrapping ErrQueryFailed, or one of the other error types.

Example
// using sample nation data set from Drill repo
rh := getTestResultHandle()

// iterate the batches
batch, err := rh.Next()
for ; err == nil; batch, err = rh.Next() {
	for i := int32(0); i < batch.Def.GetRecordCount(); i++ {
		for _, v := range batch.Vecs {
			val := v.Value(uint(i))
			switch t := val.(type) {
			case []byte:
				fmt.Print("|", string(t))
			default:
				fmt.Print("|", t)
			}
		}
		fmt.Println("|")
	}
}

fmt.Println(err == io.EOF)
Output:

|0|ALGERIA|0| haggle. carefully final deposits detect slyly agai|
|1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon|
|2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special |
|3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold|
|4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d|
|5|ETHIOPIA|0|ven packages wake quickly. regu|
|6|FRANCE|3|refully final requests. regular, ironi|
|7|GERMANY|3|l platelets. regular accounts x-ray: unusual, regular acco|
|8|INDIA|2|ss excuses cajole slyly across the packages. deposits print aroun|
|9|INDONESIA|2| slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull|
|10|IRAN|4|efully alongside of the slyly final dependencies. |
|11|IRAQ|4|nic deposits boost atop the quickly final requests? quickly regula|
|12|JAPAN|2|ously. final, express gifts cajole a|
|13|JORDAN|4|ic deposits are blithely about the carefully regular pa|
|14|KENYA|0| pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t|
|15|MOROCCO|0|rns. blithely bold courts among the closely regular packages use furiously bold platelets?|
|16|MOZAMBIQUE|0|s. ironic, unusual asymptotes wake blithely r|
|17|PERU|1|platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun|
|18|CHINA|2|c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos|
|19|ROMANIA|3|ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account|
|20|SAUDI ARABIA|4|ts. silent requests haggle. closely express packages sleep across the blithely|
|21|VIETNAM|2|hely enticingly express accounts. even, final |
|22|RUSSIA|3| requests against the platelets use never according to the quickly regular pint|
|23|UNITED KINGDOM|3|eans boost carefully special requests. accounts are. carefull|
|24|UNITED STATES|1|y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be|
true
Example (Cancelled)
dc := make(chan *queryData)
rh := &ResultHandle{dataChannel: dc}

go func() {
	defer close(dc)
	dc <- &queryData{typ: int32(user.RpcType_QUERY_RESULT), msg: cancelledResult}
}()

_, err := rh.Next()
if err == ErrQueryCancelled {
	fmt.Println(err.Error())
}
Output:

drill: query cancelled
Example (Queryfailed)
dc := make(chan *queryData)
rh := &ResultHandle{dataChannel: dc}

go func() {
	defer close(dc)
	dc <- &queryData{typ: int32(user.RpcType_QUERY_RESULT), msg: failureResult}
}()

_, err := rh.Next()
if errors.Is(err, ErrQueryFailed) {
	fmt.Println(err.Error())
}

// calling Next again with the closed channel just returns the same error
_, err2 := rh.Next()
fmt.Println(err.Error() == err2.Error())
Output:

drill: query failed: Failure Error Test
true

Directories

Path Synopsis
internal
data
Code generated by numeric_vec_typemap.gen.go.tmpl.
Code generated by numeric_vec_typemap.gen.go.tmpl.
log
