godatabend

package module
v0.5.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2024 License: Apache-2.0 Imports: 35 Imported by: 4

README

databend-go

Golang driver for databend cloud

Installation

go get github.com/datafuselabs/databend-go

Key features

  • Supports native Databend HTTP client-server protocol
  • Compatibility with database/sql

Examples

Connecting

A connection is established by passing a DSN string of the form https://user:password@host/database?<query_option>=<value> to the sql.Open method, for example https://username:password@tenant--warehousename.ch.datafusecloud.com/test.

import (
"database/sql"
_ "github.com/datafuselabs/databend-go"
)

// ConnectDSN opens a Databend connection using the DSN produced by getDSN and
// pings the server to verify that it is reachable.
//
// It returns the first error reported by getDSN, sql.Open, or Ping.
func ConnectDSN() error {
	// The Config returned by getDSN is deliberately discarded rather than
	// logged: Config carries the Password field.
	dsn, _, err := getDSN()
	if err != nil {
		// Report the failure to the caller instead of terminating the process.
		return err
	}
	conn, err := sql.Open("databend", dsn)
	if err != nil {
		return err
	}
	// The handle is not returned, so release it once connectivity is verified.
	defer conn.Close()
	return conn.Ping()
}

Connection Settings

If you are using Databend Cloud, you can obtain the connection settings as follows.

  • host - the connect host such as tenant--warehousename.ch.datafusecloud.com that you can get from databend cloud as follows: image

  • username/password - auth credentials that you can get from databend cloud connect page as above

  • database - select the current default database

Execution

Once a connection has been obtained, users can issue sql statements for execution via the Exec method.

    // Build the DSN. The Config itself is not logged because it contains the password.
    dsn, _, err := getDSN()
    if err != nil {
        log.Fatalf("failed to create DSN from Config: %v", err)
    }
    conn, err := sql.Open("databend", dsn)
    if err != nil {
        // conn is unusable after a failed Open, so stop here.
        log.Fatal(err)
    }
    // Start from a clean slate, then create and populate the table.
    if _, err = conn.Exec(`DROP TABLE IF EXISTS data`); err != nil {
        log.Fatal(err)
    }
    _, err = conn.Exec(`
    CREATE TABLE IF NOT EXISTS  data(
        Col1 TINYINT,
        Col2 VARCHAR
    )`)
    if err != nil {
        log.Fatal(err)
    }
    if _, err = conn.Exec("INSERT INTO data VALUES (1, 'test-1')"); err != nil {
        log.Fatal(err)
    }

Batch Insert

If the table was created with CREATE TABLE test ( i64 Int64, u64 UInt64, f64 Float64, s String, s2 String, a16 Array(Int16), a8 Array(UInt8), d Date, t DateTime), you can use the following code to batch insert data:

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/datafuselabs/databend-go"
)

// main batch-inserts ten identical rows into the "test" table inside a single
// transaction. It prints the first error it encounters and stops, so a failed
// step never leads to a call on a nil handle or a silently lost insert.
func main() {
	conn, err := sql.Open("databend", "http://databend:databend@localhost:8000/default?sslmode=disable")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer conn.Close()

	tx, err := conn.Begin()
	if err != nil {
		fmt.Println(err)
		return
	}
	// Rolls the transaction back on any early return; after a successful
	// Commit the call only reports that the transaction is already done.
	defer tx.Rollback()

	batch, err := tx.Prepare(fmt.Sprintf("INSERT INTO %s VALUES", "test"))
	if err != nil {
		fmt.Println(err)
		return
	}
	for i := 0; i < 10; i++ {
		_, err = batch.Exec(
			"1234",
			"2345",
			"3.1415",
			"test",
			"test2",
			"[4, 5, 6]",
			"[1, 2, 3]",
			"2021-01-01",
			"2021-01-01 00:00:00",
		)
		if err != nil {
			fmt.Println(err)
			return
		}
	}
	if err = tx.Commit(); err != nil {
		fmt.Println(err)
	}
}

Querying Row/s

Querying a single row can be achieved using the QueryRow method. This returns a *sql.Row, on which Scan can be invoked with pointers to variables into which the columns should be marshaled.

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/datafuselabs/databend-go"
)

// main reads a single row from the "data" table with QueryRow and prints its
// second column. It stops at the first error instead of continuing with an
// unusable connection or an unset value.
func main() {
	// create table data (col1 uint8, col2 string);
	// insert into data values(1,'col2');
	conn, err := sql.Open("databend", "http://databend:databend@localhost:8000/default?sslmode=disable")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer conn.Close()

	var (
		col1 uint8
		col2 string
	)
	// Any query error is deferred by QueryRow until Scan is called.
	if err := conn.QueryRow("SELECT * FROM data").Scan(&col1, &col2); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(col2)
}

Iterating over multiple rows requires the Query method. It returns a *sql.Rows struct on which Next can be invoked to iterate through the rows. The QueryContext equivalent additionally accepts a context.

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/datafuselabs/databend-go"
)

// main iterates over every row of the "data" table with Query and prints the
// second column of each. The result set is closed when main returns, and any
// error from opening, querying, scanning or iterating is printed.
func main() {
	// create table data (col1 uint8, col2 string);
	// insert into data values(1,'col2');
	conn, err := sql.Open("databend", "http://databend:databend@localhost:8000/default?sslmode=disable")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer conn.Close()

	rows, err := conn.Query("SELECT * FROM data")
	if err != nil {
		// rows is nil when Query fails, so it must not be iterated.
		fmt.Println(err)
		return
	}
	defer rows.Close()

	var (
		col1 uint8
		col2 string
	)
	for rows.Next() {
		if err := rows.Scan(&col1, &col2); err != nil {
			fmt.Println(err)
			return
		}
		fmt.Println(col2)
	}
	// Next returns false both at the end of the data and on failure; Err
	// distinguishes the two.
	if err := rows.Err(); err != nil {
		fmt.Println(err)
	}
}

Type Mapping

The following table outlines the mapping between Databend types and Go types:

Databend Type Go Type
TINYINT int8
SMALLINT int16
INT int32
BIGINT int64
TINYINT UNSIGNED uint8
SMALLINT UNSIGNED uint16
INT UNSIGNED uint32
BIGINT UNSIGNED uint64
Float32 float32
Float64 float64
Bitmap string
Decimal decimal.Decimal
String string
Date time.Time
DateTime time.Time
Array(T) string
Tuple(T1, T2, ...) string
Variant string

Compatibility

  • If your Databend version is v0.9.0 or later, you need to use databend-go v0.3.0 or later.

Documentation

Index

Constants

View Source
const (
	DatabendTenantHeader    = "X-DATABEND-TENANT"
	DatabendWarehouseHeader = "X-DATABEND-WAREHOUSE"
	DatabendQueryIDHeader   = "X-DATABEND-QUERY-ID"
	Authorization           = "Authorization"
	WarehouseRoute          = "X-DATABEND-ROUTE"
	UserAgent               = "User-Agent"
)
View Source
const (
	EMPTY_FIELD_AS string = "empty_field_as"
	PURGE          string = "purge"
)
View Source
const DBSessionIDKey contextKey = "LOG_SESSION_ID"

DBSessionIDKey is context key of session id

View Source
const SFSessionUserKey contextKey = "LOG_USER"

SFSessionUserKey is context key of user id of a session

View Source
const (
	SSL_MODE_DISABLE = "disable"
)

Variables

View Source
var (
	ProvisionWarehouseTimeout = "ProvisionWarehouseTimeout"

	ErrDoRequest    = errors.New("DoReqeustFailed")
	ErrReadResponse = errors.New("ReadResponseFailed")
)
View Source
var (
	ErrPlaceholderCount = errors.New("databend: wrong placeholder count")
	ErrNoLastInsertID   = errors.New("no LastInsertId available")
	ErrNoRowsAffected   = errors.New("no RowsAffected available")
)
View Source
var LogKeys = [...]contextKey{DBSessionIDKey, SFSessionUserKey}

LogKeys lists the context keys whose values should be included in logging messages when using logger.WithContext.

Functions

func Array

func Array(v interface{}) driver.Valuer

Array wraps slice or array into driver.Valuer interface to allow pass through it from database/sql

func DBCallerPrettyfier

func DBCallerPrettyfier(frame *runtime.Frame) (string, string)

DBCallerPrettyfier provides the base file name and function name of the calling frame; it is used by the logger.

func Date

func Date(t time.Time) driver.Valuer

Date returns date for t

func Decimal128

func Decimal128(v interface{}, s int32) driver.Valuer

Decimal128 converts value to a Decimal128 with scale S. The value can be a number or a string. The S (scale) parameter specifies the number of decimal places.

func Decimal32

func Decimal32(v interface{}, s int32) driver.Valuer

Decimal32 converts value to a Decimal32 with scale S. The value can be a number or a string. The S (scale) parameter specifies the number of decimal places.

func Decimal64

func Decimal64(v interface{}, s int32) driver.Valuer

Decimal64 converts value to a Decimal64 with scale S. The value can be a number or a string. The S (scale) parameter specifies the number of decimal places.

func DeregisterTLSConfig

func DeregisterTLSConfig(key string)

DeregisterTLSConfig removes the tls.Config associated with key.

func IP

func IP(i net.IP) driver.Valuer

IP returns compatible database format for net.IP

func IsAuthFailed

func IsAuthFailed(err error) bool

func IsNotFound

func IsNotFound(err error) bool

func IsProxyErr

func IsProxyErr(err error) bool

func Map

func Map(v interface{}) driver.Valuer

func NewAPIError

func NewAPIError(hint string, status int, respBuf []byte) error

func NewAPIHttpClientFromConfig added in v0.5.9

func NewAPIHttpClientFromConfig(cfg *Config) *http.Client

func RegisterTLSConfig

func RegisterTLSConfig(key string, config *tls.Config) error

RegisterTLSConfig registers a custom tls.Config to be used with sql.Open.

func SetLogger

func SetLogger(inLogger *DBLogger)

SetLogger sets a new logger implementing the DBLogger interface for godatabend.

func Tuple

func Tuple(v interface{}) driver.Valuer

Tuple converts a struct into a tuple, e.g. struct{A string, B int}{"a", 1} -> ("a", 1).

func UInt64

func UInt64(u uint64) driver.Valuer

UInt64 returns uint64

Types

type APIClient

type APIClient struct {
	SessionID string
	QuerySeq  int64

	WaitTimeSeconds      int64
	MaxRowsInBuffer      int64
	MaxRowsPerPage       int64
	PresignedURLDisabled bool
	EmptyFieldAs         string
	// contains filtered or unexported fields
}

func NewAPIClientFromConfig

func NewAPIClientFromConfig(cfg *Config) *APIClient

func (*APIClient) CloseQuery added in v0.5.9

func (c *APIClient) CloseQuery(ctx context.Context, response *QueryResponse) error

func (*APIClient) DoRetry added in v0.5.9

func (c *APIClient) DoRetry(f retry.RetryableFunc, t RequestType) error

func (*APIClient) GetPresignedURL

func (c *APIClient) GetPresignedURL(ctx context.Context, stage *StageLocation) (*PresignedResponse, error)

func (*APIClient) GetQueryID added in v0.5.8

func (c *APIClient) GetQueryID() string

func (*APIClient) InsertWithStage

func (c *APIClient) InsertWithStage(ctx context.Context, sql string, stage *StageLocation, fileFormatOptions, copyOptions map[string]string) (*QueryResponse, error)

func (*APIClient) KillQuery

func (c *APIClient) KillQuery(ctx context.Context, response *QueryResponse) error

func (*APIClient) NewDefaultCSVFormatOptions added in v0.5.6

func (c *APIClient) NewDefaultCSVFormatOptions() map[string]string

func (*APIClient) NewDefaultCopyOptions added in v0.5.4

func (c *APIClient) NewDefaultCopyOptions() map[string]string

func (*APIClient) NextQuery added in v0.5.8

func (c *APIClient) NextQuery()

func (*APIClient) PollQuery added in v0.5.9

func (c *APIClient) PollQuery(ctx context.Context, nextURI string) (*QueryResponse, error)

func (*APIClient) PollUntilQueryEnd added in v0.5.9

func (c *APIClient) PollUntilQueryEnd(ctx context.Context, resp *QueryResponse) (*QueryResponse, error)

func (*APIClient) QuerySync

func (c *APIClient) QuerySync(ctx context.Context, query string, args []driver.Value) (*QueryResponse, error)

func (*APIClient) StartQuery added in v0.5.9

func (c *APIClient) StartQuery(ctx context.Context, query string, args []driver.Value) (*QueryResponse, error)

func (*APIClient) UploadToStage

func (c *APIClient) UploadToStage(ctx context.Context, stage *StageLocation, input *bufio.Reader, size int64) error

func (*APIClient) UploadToStageByAPI

func (c *APIClient) UploadToStageByAPI(stage *StageLocation, input *bufio.Reader) error

func (*APIClient) UploadToStageByPresignURL

func (c *APIClient) UploadToStageByPresignURL(ctx context.Context, stage *StageLocation, input *bufio.Reader, size int64) error

type APIError

// APIError is the error value for a failed API call. It groups the structured
// response body (RespBody), the response text (RespText), the status code and
// a hint string, and satisfies the error interface through its Error method.
// NOTE(review): the fields presumably mirror the hint/status/respBuf arguments
// of NewAPIError — confirm against its implementation.
type APIError struct {
	RespBody   APIErrorResponseBody
	RespText   string
	StatusCode int
	Hint       string
}

func (APIError) Error

func (e APIError) Error() string

type APIErrorResponseBody

type APIErrorResponseBody struct {
	Error   string `json:"error"`
	Message string `json:"message"`
}

func RespBody

func RespBody(err error) APIErrorResponseBody

type AccessTokenLoader

type AccessTokenLoader interface {
	// LoadAccessToken is called whenever a new request is made to the server.
	LoadAccessToken(ctx context.Context, forceRotate bool) (string, error)
}

AccessTokenLoader is used on Bearer authentication. The token may have a limited lifetime, you can rotate your token by this interface.

type AuthMethod

// AuthMethod names an authentication method. The defined values are
// AuthMethodUserPassword ("userPassword") and AuthMethodAccessToken
// ("accessToken").
type AuthMethod string
const (
	AuthMethodUserPassword AuthMethod = "userPassword"
	AuthMethodAccessToken  AuthMethod = "accessToken"
)

type Batch

type Batch interface {
	AppendToFile(v []driver.Value) error
	BatchInsert() error
}

type Config

type Config struct {
	Tenant    string // Tenant
	Warehouse string // Warehouse
	User      string // Username
	Password  string // Password (requires User)
	Database  string // Database name

	Role string // Role is the databend role you want to use for the current connection

	AccessToken       string
	AccessTokenFile   string // path to file containing access token, it can be used to rotate access token
	AccessTokenLoader AccessTokenLoader

	Host    string
	Timeout time.Duration
	/* Pagination params: WaitTimeSecs,  MaxRowsInBuffer, MaxRowsPerPage
	Pagination: critical conditions for each HTTP request to return (before all remaining result is ready to return)
	Related docs:https://databend.rs/doc/integrations/api/rest#query-request
	*/
	WaitTimeSecs    int64
	MaxRowsInBuffer int64
	MaxRowsPerPage  int64
	Location        *time.Location
	Debug           bool
	GzipCompression bool
	Params          map[string]string
	TLSConfig       string
	SSLMode         string

	// track the progress of query execution
	StatsTracker QueryStatsTracker

	// used on the storage which does not support presigned url like HDFS, local fs
	PresignedURLDisabled bool

	// Specifies the value that should be used when encountering empty fields, including both ,, and ,"",, in the CSV data being loaded into the table.
	// https://docs.databend.com/sql/sql-reference/file-format-options#empty_field_as
	// default is `string`
	// databend version should >= v1.2.345-nightly
	EmptyFieldAs        string
	EnableOpenTelemetry bool
}

Config is a set of configuration parameters

func NewConfig

func NewConfig() *Config

NewConfig creates a new config with default values

func ParseDSN

func ParseDSN(dsn string) (*Config, error)

ParseDSN parses the DSN string to a Config

func (*Config) AddParams

func (cfg *Config) AddParams(params map[string]string) (err error)

func (*Config) FormatDSN

func (cfg *Config) FormatDSN() string

FormatDSN formats the given Config into a DSN string which can be passed to the driver.

type ContextKey added in v0.5.2

type ContextKey string

type DBLogger

type DBLogger interface {
	rlog.Ext1FieldLogger
	SetLogLevel(level string) error
	WithContext(ctx context.Context) *rlog.Entry
	SetOutput(output io.Writer)
}

DBLogger is the Databend logger interface; it exposes the FieldLogger defined in logrus.

func CreateDefaultLogger

func CreateDefaultLogger() DBLogger

CreateDefaultLogger returns a new DBLogger instance with the default configuration.

func GetLogger

func GetLogger() DBLogger

GetLogger returns the package's logger, which is not otherwise publicly accessible.

type DataField

// DataField is one schema entry: a name and a type, both strings, encoded in
// JSON as "name" and "type". QueryResponse.Schema is a list of DataField.
type DataField struct {
	Name string `json:"name"`
	Type string `json:"type"`
}

type DataParser

type DataParser interface {
	Parse(io.RuneScanner) (driver.Value, error)
	Type() reflect.Type
}

DataParser implements parsing of a driver value and reporting its type.

func NewDataParser

func NewDataParser(t *TypeDesc, opt *DataParserOptions) (DataParser, error)

NewDataParser creates a new DataParser based on the given TypeDesc.

type DataParserOptions

type DataParserOptions struct {
	// Location describes default location for DateTime and Date field without Timezone argument.
	Location *time.Location
	// UseDBLocation if false: always use Location, ignore DateTime argument.
	UseDBLocation bool
}

DataParserOptions describes DataParser options. Ex.: Fields Location and UseDBLocation specify timezone options.

type DatabendConn

type DatabendConn struct {
	// contains filtered or unexported fields
}

func (*DatabendConn) Begin

func (dc *DatabendConn) Begin() (driver.Tx, error)

func (*DatabendConn) BeginTx added in v0.5.7

func (dc *DatabendConn) BeginTx(
	ctx context.Context,
	_ driver.TxOptions) (
	driver.Tx, error)

func (*DatabendConn) Close

func (dc *DatabendConn) Close() error

Close invalidates and potentially stops any current prepared statements and transactions, marking this connection as no longer in use.

func (*DatabendConn) ExecContext

func (dc *DatabendConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error)

func (*DatabendConn) ExecuteBatch added in v0.5.7

func (dc *DatabendConn) ExecuteBatch() (err error)

ExecuteBatch applies batch prepared statement if it exists

func (*DatabendConn) Ping added in v0.5.2

func (dc *DatabendConn) Ping(ctx context.Context) error

func (*DatabendConn) Prepare

func (dc *DatabendConn) Prepare(query string) (driver.Stmt, error)

func (*DatabendConn) PrepareContext

func (dc *DatabendConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error)

func (*DatabendConn) QueryContext

func (dc *DatabendConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error)

type DatabendDriver

type DatabendDriver struct {
	// contains filtered or unexported fields
}

DatabendDriver is a context of Go Driver

func (DatabendDriver) Open

func (d DatabendDriver) Open(dsn string) (driver.Conn, error)

Open creates a new connection.

func (DatabendDriver) OpenWithConfig

func (d DatabendDriver) OpenWithConfig(
	ctx context.Context,
	config Config) (
	driver.Conn, error)

OpenWithConfig creates a new connection with the given Config.

type Error

type Error struct {
	Code    int
	Message string
}

Error contains parsed information about server error

func (*Error) Error

func (e *Error) Error() string

Error implements the interface error

type FileAccessTokenData

type FileAccessTokenData struct {
	AccessToken string `toml:"access_token"`
}

type FileAccessTokenLoader

type FileAccessTokenLoader struct {
	// contains filtered or unexported fields
}

func NewFileAccessTokenLoader

func NewFileAccessTokenLoader(path string) *FileAccessTokenLoader

func (*FileAccessTokenLoader) LoadAccessToken

func (l *FileAccessTokenLoader) LoadAccessToken(ctx context.Context, forceRotate bool) (string, error)

LoadAccessToken tries to decode the file content as TOML; if the content is not TOML, it returns the plain key content.

type PaginationConfig

// PaginationConfig holds the pagination settings of a query request, encoded
// in JSON as wait_time_secs, max_rows_in_buffer and max_rows_per_page.
// Zero-valued fields are omitted from the encoded JSON (omitempty).
type PaginationConfig struct {
	WaitTime        int64 `json:"wait_time_secs,omitempty"`
	MaxRowsInBuffer int64 `json:"max_rows_in_buffer,omitempty"`
	MaxRowsPerPage  int64 `json:"max_rows_per_page,omitempty"`
}

type PresignedResponse

// PresignedResponse is the result type of APIClient.GetPresignedURL: a method,
// a set of headers and a URL.
// NOTE(review): presumably these describe the HTTP request to issue against
// the presigned URL — confirm against UploadToStageByPresignURL.
type PresignedResponse struct {
	Method  string
	Headers map[string]string
	URL     string
}

type QueryError

// QueryError is the error object carried in QueryResponse.Error. It has a
// numeric code, a message and a kind (JSON keys "code", "message", "kind"),
// and *QueryError satisfies the error interface through its Error method.
type QueryError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
	Kind    string `json:"kind"`
}

func (*QueryError) Error

func (e *QueryError) Error() string

type QueryIDGenerator

type QueryIDGenerator func() string

type QueryProgress

// QueryProgress is a pair of byte and row counters (JSON keys "bytes" and
// "rows"). QueryStats uses it for its scan, write and result progress fields.
type QueryProgress struct {
	Bytes uint64 `json:"bytes"`
	Rows  uint64 `json:"rows"`
}

type QueryRequest

// QueryRequest is the JSON-encodable description of a query: the SQL text plus
// optional session data, pagination settings and a stage attachment. The three
// optional parts are pointers and are omitted from the JSON when nil
// (omitempty); "sql" is always emitted.
type QueryRequest struct {
	Session    *json.RawMessage  `json:"session,omitempty"`
	SQL        string            `json:"sql"`
	Pagination *PaginationConfig `json:"pagination,omitempty"`

	StageAttachment *StageAttachmentConfig `json:"stage_attachment,omitempty"`
}

type QueryResponse

// QueryResponse is the JSON-decodable reply to a query. It carries the query
// ID, raw session data, the result schema, the data rows (each row a slice of
// strings), the query state, an optional error and optional stats, followed by
// four URIs: stats, final, next and kill.
// NOTE(review): the URIs are presumably what PollQuery, CloseQuery and
// KillQuery follow — confirm against APIClient.
type QueryResponse struct {
	ID      string           `json:"id"`
	Session *json.RawMessage `json:"session"`
	Schema  *[]DataField     `json:"schema"`
	Data    [][]string       `json:"data"`
	State   string           `json:"state"`
	Error   *QueryError      `json:"error"`
	Stats   *QueryStats      `json:"stats"`
	// TODO: Affect rows
	StatsURI string `json:"stats_uri"`
	FinalURI string `json:"final_uri"`
	NextURI  string `json:"next_uri"`
	KillURI  string `json:"kill_uri"`
}

func (*QueryResponse) ReadFinished added in v0.5.9

func (r *QueryResponse) ReadFinished() bool

type QueryStats

// QueryStats holds a query's running time in milliseconds together with its
// scan, write and result progress counters. It is the value handed to a
// QueryStatsTracker and the type of QueryResponse.Stats.
type QueryStats struct {
	RunningTimeMS  float64       `json:"running_time_ms"`
	ScanProgress   QueryProgress `json:"scan_progress"`
	WriteProgress  QueryProgress `json:"write_progress"`
	ResultProgress QueryProgress `json:"result_progress"`
}

type QueryStatsTracker

type QueryStatsTracker func(queryID string, stats *QueryStats)

QueryStatsTracker is a function that will be called when query stats are updated, it can be specified in the Config struct.

type RequestType added in v0.5.9

// RequestType identifies the kind of request; APIClient.DoRetry takes one as
// its second argument. The values are assigned with iota, so Query is 0, Page
// is 1, Final is 2 and Kill is 3.
type RequestType int
const (
	Query RequestType = iota
	Page
	Final
	Kill
)

request type

type ServerInfo added in v0.5.7

type ServerInfo struct {
	Id        string `json:"id"`
	StartTime string `json:"start_time"`
}

type SessionState

// SessionState is the JSON form of a session's state: database, role,
// secondary roles, settings and transaction state. Every field is tagged
// omitempty, so empty values are left out of the encoded JSON.
type SessionState struct {
	Database       string    `json:"database,omitempty"`
	Role           string    `json:"role,omitempty"`
	SecondaryRoles *[]string `json:"secondary_roles,omitempty"`

	Settings map[string]string `json:"settings,omitempty"`

	// txn
	TxnState string `json:"txn_state,omitempty"`
}

type StageAttachmentConfig

// StageAttachmentConfig is the optional "stage_attachment" part of a
// QueryRequest: a location string plus file-format options and copy options.
// The two option maps are omitted from the JSON when empty (omitempty).
type StageAttachmentConfig struct {
	Location          string            `json:"location"`
	FileFormatOptions map[string]string `json:"file_format_options,omitempty"`
	CopyOptions       map[string]string `json:"copy_options,omitempty"`
}

type StageLocation

// StageLocation identifies a location by a name and a path. It is the stage
// argument of the APIClient upload, presign and InsertWithStage methods, and
// has a String method.
type StageLocation struct {
	Name string
	Path string
}

func (*StageLocation) String

func (sl *StageLocation) String() string

type StaticAccessTokenLoader

type StaticAccessTokenLoader struct {
	AccessToken string
}

func NewStaticAccessTokenLoader

func NewStaticAccessTokenLoader(accessToken string) *StaticAccessTokenLoader

func (*StaticAccessTokenLoader) LoadAccessToken

func (l *StaticAccessTokenLoader) LoadAccessToken(ctx context.Context, forceRotate bool) (string, error)

type TypeDesc

type TypeDesc struct {
	Name string
	Args []*TypeDesc
}

TypeDesc describes a (possibly nested) data type returned by Databend.

func ParseTypeDesc

func ParseTypeDesc(s string) (*TypeDesc, error)

ParseTypeDesc parses the type description that Databend provides.

The grammar is quite simple:

desc
    name
    name()
    name(args)
args
    desc
    desc, args

Examples:

String
Nullable(Nothing)
Array(Tuple(Tuple(String, String), Tuple(String, UInt64)))

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL