impalathing

package module
v0.0.0-...-dab448b Latest Latest
Published: Oct 9, 2020 License: MIT Imports: 14 Imported by: 3

README

Impalathing is a small Go wrapper library for the Thrift interface to Impala.

It's based on hivething.

Working on this, you quickly realize that having strings delimited by tabs is an ugly API... (That's the Thrift side of things.)
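
To make the complaint about tab-delimited strings concrete, here is a minimal sketch of what handling such a row looks like. The helper name splitRow and the sample rows are made up for illustration; they are not part of impalathing.

```go
package main

import (
	"fmt"
	"strings"
)

// splitRow splits one tab-delimited row into its column values.
// Hypothetical helper for illustration; not part of impalathing.
func splitRow(row string) []string {
	return strings.Split(row, "\t")
}

func main() {
	// A made-up row with three columns joined by tabs.
	fields := splitRow("42\tclick\t202010")
	fmt.Println(len(fields), fields[1])

	// A value that itself contains a tab is indistinguishable from
	// two separate columns, which is what makes this format awkward.
	broken := splitRow("42\thello\tworld\t202010")
	fmt.Println(len(broken))
}
```

Every value comes back as a string, so types have to be recovered separately, and an embedded tab silently shifts the columns.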

Usage

Kerberos support requires the header files of the GSSAPI C library at build time. They can be installed with:

Ubuntu: sudo apt-get install libkrb5-dev
macOS: brew install homebrew/dupes/heimdal --without-x11
CentOS/RHEL: yum install -y krb5-devel

To use Kerberos, you need an extra dependency:

go get -tags kerberos github.com/beltran/gosasl

Then build with the kerberos tag:

go build --tags=kerberos

Before starting your application, run kinit first, for example:

kinit -k -t impala.keytab impala/host@DOMAIN.COM

package main

import (
    "log"
    "fmt"
    "time"
    "github.com/koblas/impalathing"
)

func main() {
    host := "impala-host"
    port := 21000

    con, err := impalathing.Connect(host, port)
    // If you use Kerberos, connect with the GSSAPI SASL transport instead:
    // con, err := impalathing.Connect(host, port, impalathing.WithGSSAPISaslTransport())
    if err != nil {
        log.Fatal("Error connecting: ", err)
    }
    defer con.Close()

    query, err := con.Query("SELECT user_id, action, yyyymm FROM engagements LIMIT 10000")
    if err != nil {
        log.Fatal("Error running query: ", err)
    }

    startTime := time.Now()
    total := 0
    for query.Next() {
        var (
            userID string
            action string
            yyyymm int
        )

        // Scan copies the current row's columns into the variables, in order.
        if err := query.Scan(&userID, &action, &yyyymm); err != nil {
            log.Fatal("Error scanning row: ", err)
        }
        total++

        fmt.Println(userID, action)
    }

    log.Printf("Fetched %d row(s) in %.2fs", total, time.Since(startTime).Seconds())
}

Documentation

Constants

const (
	START    = 1
	OK       = 2
	BAD      = 3
	ERROR    = 4
	COMPLETE = 5
)
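
These constants are undocumented. Their names match the status codes used during Thrift SASL negotiation, which is presumably what they are for given the TSaslTransport type, but that is an assumption. The sketch below redeclares them locally so it compiles on its own and adds a hypothetical statusName helper for logging; neither the helper nor the interpretation comes from the package.

```go
package main

import "fmt"

// Local copies of the documented constants so the sketch is self-contained.
const (
	START    = 1
	OK       = 2
	BAD      = 3
	ERROR    = 4
	COMPLETE = 5
)

// statusName returns a readable label for a status code.
// Hypothetical helper; not part of impalathing.
func statusName(code int) string {
	switch code {
	case START:
		return "START"
	case OK:
		return "OK"
	case BAD:
		return "BAD"
	case ERROR:
		return "ERROR"
	case COMPLETE:
		return "COMPLETE"
	default:
		return fmt.Sprintf("UNKNOWN(%d)", code)
	}
}

func main() {
	for _, code := range []int{START, COMPLETE, 9} {
		fmt.Println(code, statusName(code))
	}
}
```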
const DEFAULT_MAX_LENGTH = 16384000

Variables

var (
	DefaultOptions = Options{PollIntervalSeconds: 0.1, BatchSize: 10000, ConnectionTimeout: 10000 * time.Millisecond}
)

Functions

This section is empty.

Types

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

func Connect

func Connect(host string, port int, opts ...Option) (*Connection, error)

func (*Connection) Close

func (c *Connection) Close() error

func (*Connection) Query

func (c *Connection) Query(query string) (RowSet, error)

func (*Connection) QueryWithContext

func (c *Connection) QueryWithContext(ctx context.Context, query string) (RowSet, error)

type Option

type Option func(*Options)

func WithBatchSize

func WithBatchSize(bs int64) Option

func WithConnectionTimeout

func WithConnectionTimeout(timeout time.Duration) Option

func WithGSSAPISaslTransport

func WithGSSAPISaslTransport() Option

func WithPlainSaslTransport

func WithPlainSaslTransport(username, password string) Option

func WithPollInterval

func WithPollInterval(pollIntervalSeconds float64) Option

type Options

type Options struct {
	PollIntervalSeconds float64
	BatchSize           int64
	ConnectionTimeout   time.Duration
	SaslTransportConfig map[string]string
}
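
The Option functions follow the functional options pattern. The sketch below shows how such options typically compose on top of DefaultOptions. It mirrors the documented types locally (SaslTransportConfig omitted) so it runs without the package; the resolve function is hypothetical, and whether Connect applies options exactly this way is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// Options mirrors the documented struct, minus SaslTransportConfig.
type Options struct {
	PollIntervalSeconds float64
	BatchSize           int64
	ConnectionTimeout   time.Duration
}

// Option mutates an Options value, as in the documented type.
type Option func(*Options)

// DefaultOptions uses the values shown in the Variables section.
var DefaultOptions = Options{PollIntervalSeconds: 0.1, BatchSize: 10000, ConnectionTimeout: 10000 * time.Millisecond}

func WithBatchSize(bs int64) Option {
	return func(o *Options) { o.BatchSize = bs }
}

func WithConnectionTimeout(timeout time.Duration) Option {
	return func(o *Options) { o.ConnectionTimeout = timeout }
}

// resolve starts from a copy of the defaults and applies each option in order.
// Hypothetical: it stands in for whatever Connect does internally.
func resolve(opts ...Option) Options {
	o := DefaultOptions // struct copy, so the defaults stay untouched
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := resolve(WithBatchSize(500), WithConnectionTimeout(3*time.Second))
	fmt.Println(o.BatchSize, o.ConnectionTimeout, o.PollIntervalSeconds)
}
```

With the real package, the equivalent call would be impalathing.Connect(host, port, impalathing.WithBatchSize(500), impalathing.WithConnectionTimeout(3*time.Second)), using the signatures listed above.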

type RowSet

type RowSet interface {
	Columns() []string
	ColumnsWithContext(ctx context.Context) []string
	Next() bool
	NextWithContext(ctx context.Context) bool
	Scan(dest ...interface{}) error
	Poll() (*Status, error)
	PollWithContext(ctx context.Context) (*Status, error)
	Wait() (*Status, error)
	WaitWithContext(ctx context.Context) (*Status, error)
	FetchAll() []map[string]interface{}
	FetchAllWithContext(ctx context.Context) []map[string]interface{}
	MapScan(dest map[string]interface{}) error
}

A RowSet represents an asynchronous Hive operation. You can Reattach to a previously submitted Hive operation if you have a valid Thrift client and the serialized Handle() from the prior operation.
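
As a rough illustration of the Next/MapScan part of this interface, the sketch below drains a row set into a slice of maps. It uses a local rowIterator interface with just those two methods and an in-memory fakeRows type so it runs without a server. Both are hypothetical, and the assumption that MapScan fills the map with the current row keyed by column name is inferred from the signature, not confirmed by the documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// rowIterator is the subset of the documented RowSet interface used here.
type rowIterator interface {
	Next() bool
	MapScan(dest map[string]interface{}) error
}

// collect reads every remaining row into a column-name-to-value map.
func collect(rs rowIterator) ([]map[string]interface{}, error) {
	var rows []map[string]interface{}
	for rs.Next() {
		row := make(map[string]interface{})
		if err := rs.MapScan(row); err != nil {
			return rows, err
		}
		rows = append(rows, row)
	}
	return rows, nil
}

// fakeRows is an in-memory stand-in for a real RowSet.
type fakeRows struct {
	data []map[string]interface{}
	pos  int // number of rows already handed out
}

func (f *fakeRows) Next() bool {
	if f.pos >= len(f.data) {
		return false
	}
	f.pos++
	return true
}

func (f *fakeRows) MapScan(dest map[string]interface{}) error {
	if f.pos == 0 || f.pos > len(f.data) {
		return errors.New("MapScan called without a current row")
	}
	for k, v := range f.data[f.pos-1] {
		dest[k] = v
	}
	return nil
}

func main() {
	rs := &fakeRows{data: []map[string]interface{}{
		{"user_id": "u1", "action": "click"},
		{"user_id": "u2", "action": "view"},
	}}
	rows, err := collect(rs)
	fmt.Println(len(rows), err)
}
```

The documented FetchAll method has the same return shape, so a helper like this is mainly useful when rows need to be filtered or processed as they arrive.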

type Status

type Status struct {
	Error error
	// contains filtered or unexported fields
}

Represents job status, including success state and time the status was updated.

func (*Status) IsComplete

func (s *Status) IsComplete() bool

func (*Status) IsSuccess

func (s *Status) IsSuccess() bool
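
A sketch of how Poll and the Status methods could be combined into a wait loop with a context deadline. RowSet.Wait presumably does something similar, but that is an assumption. The jobStatus type is a stand-in for *Status with invented fields, and waitForCompletion, secondsToDuration, and runDemo are hypothetical names used only for this example.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// jobStatus stands in for the documented *Status; its fields are invented.
type jobStatus struct {
	complete bool
	success  bool
}

func (s *jobStatus) IsComplete() bool { return s.complete }
func (s *jobStatus) IsSuccess() bool  { return s.success }

// secondsToDuration converts a float64 seconds value, such as
// Options.PollIntervalSeconds, into a time.Duration.
func secondsToDuration(secs float64) time.Duration {
	return time.Duration(secs * float64(time.Second))
}

// waitForCompletion calls poll until the status is complete, poll fails,
// or ctx is done, sleeping for interval between attempts.
func waitForCompletion(ctx context.Context, poll func() (*jobStatus, error), interval time.Duration) (*jobStatus, error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		st, err := poll()
		if err != nil {
			return nil, err
		}
		if st.IsComplete() {
			return st, nil
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-ticker.C:
		}
	}
}

// runDemo polls a fake job that completes successfully on the third call.
// It returns the number of polls made and whether the job succeeded.
func runDemo() (int, bool) {
	calls := 0
	poll := func() (*jobStatus, error) {
		calls++
		done := calls >= 3
		return &jobStatus{complete: done, success: done}, nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	st, err := waitForCompletion(ctx, poll, secondsToDuration(0.001))
	if err != nil {
		return calls, false
	}
	return calls, st.IsSuccess()
}

func main() {
	calls, ok := runDemo()
	fmt.Println(calls, ok)
}
```

Note that IsComplete and IsSuccess are separate checks: a finished job is not necessarily a successful one, so callers should inspect both (and the Error field) before reading rows.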

type TSaslTransport

type TSaslTransport struct {
	OpeningContext context.Context
	// contains filtered or unexported fields
}

TSaslTransport is a Thrift transport struct that uses SASL.

func NewTSaslTransport

func NewTSaslTransport(trans thrift.TTransport, host string, configuration map[string]string) (*TSaslTransport, error)

NewTSaslTransport returns a TSaslTransport.

func (*TSaslTransport) Close

func (p *TSaslTransport) Close() (err error)

Close closes a SASL transport connection.

func (*TSaslTransport) Flush

func (p *TSaslTransport) Flush(ctx context.Context) (err error)

Flush flushes the bytes in the buffer.

func (*TSaslTransport) IsOpen

func (p *TSaslTransport) IsOpen() bool

IsOpen reports whether the SASL transport connection is open.

func (*TSaslTransport) Open

func (p *TSaslTransport) Open() (err error)

Open opens a SASL transport connection.

func (*TSaslTransport) Read

func (p *TSaslTransport) Read(buf []byte) (l int, err error)

func (*TSaslTransport) RemainingBytes

func (p *TSaslTransport) RemainingBytes() uint64

RemainingBytes returns the size of the unwrapped bytes.

func (*TSaslTransport) Write

func (p *TSaslTransport) Write(buf []byte) (int, error)
