bigquery

package module
v0.3.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2024 License: Apache-2.0 Imports: 28 Imported by: 8

README

BigQuery SQL Driver

BigQuery database/sql driver in Go. GoDoc

This library is compatible with Go 1.17+

Please refer to CHANGELOG.md if you encounter breaking changes.

This library provides a fast implementation of the BigQuery client as a database/sql driver.

DSN Data Source Name

The BigQuery driver accepts the following DSN

  • 'bigquery://projectID/[location/]datasetID?queryString'

    Where queryString can optionally configure the following options:

    • credURL: (url encoded) local location or URL supported by Scy
    • credKey: optional (url encoded) Scy secret manager key or key location
    • credID: Scy resource secret ID
    • credJSON: rawURL base64 encoded cred JSON (not recommended)
    • endpoint
    • userAgent
    • apiKey
    • quotaProject
    • scopes

Since this library uses Google Cloud API you can pass your credentials via GOOGLE_APPLICATION_CREDENTIALS environment variable.

Usage:

package main

import (
	"database/sql"
	"fmt"
	"context"
	"log"
	_ "github.com/viant/bigquery"
)

// Participant is the Scan destination for the "participant" STRUCT column
// selected by the query in main.
type Participant struct {
	Name   string    // corresponds to the STRUCT's "name" member — presumably matched by field name; confirm
	Splits []float64 // corresponds to the STRUCT's "splits" array member
}

// main opens a BigQuery connection through database/sql, runs a query that
// unnests a repeated STRUCT column, and scans each resulting row into a
// string and a Participant value, printing every fetched row.
// Any error terminates the program via log.Fatal.
func main() {
	db, err := sql.Open("bigquery", "bigquery://myProjectID/mydatasetID")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	SQL := `WITH races AS (
		  SELECT "800M" AS race,
		    [STRUCT("Ben" as name, [23.4, 26.3] as splits), 
		 	 STRUCT("Frank" as name, [23.4, 26.3] as splits)
			]
		       AS participants)
		SELECT
		  race,
		  participant
		FROM races r
		CROSS JOIN UNNEST(r.participants) as participant`

	rows, err := db.QueryContext(context.Background(), SQL)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var race string
		var participant Participant
		// Check the Scan error before touching the destinations: on failure
		// they may hold zero or partial values.
		if err := rows.Scan(&race, &participant); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("fetched: %v %+v\n", race, participant)
	}
	// Next returns false both at end of data and on failure, so the iteration
	// error has to be inspected once the loop has finished.
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}

Data Ingestion (Load/Stream)

This driver implements LOAD/STREAM operations with the following SQL:

Loading data

To load data, register a reader for a supported source format, then execute the LOAD SQL.

LOAD 'Reader:<SOURCE_FORMAT>:<READER_ID>' DATA INTO TABLE myproject.mydataset.mytable

The following snippet registers a reader under READER_ID:

 err := reader.Register(readerID, myReader)

The following code loads CSV data

package mypkg

import (
	"context"
	"database/sql"
	"fmt"
	"github.com/viant/bigquery/reader"
	"log"
	"strings"
)

// ExampleOfCSVLoad registers an in-memory CSV reader under a reader ID and
// loads its content into mytable with a LOAD 'Reader:csv:<id>' statement,
// then prints the number of affected rows.
// Any error terminates the program via log.Fatal.
//
// NOTE: sql.Open("bigquery", ...) requires the driver to be registered; the
// usage example does that with a blank import of github.com/viant/bigquery,
// which this snippet's import list does not include.
func ExampleOfCSVLoad() {
	projectID := "my-gcp-project"
	db, err := sql.Open("bigquery", fmt.Sprintf("bigquery://%v/test", projectID))
	if err != nil {
		log.Fatal(err)
	}
	// Release the connection pool once the load is finished.
	defer db.Close()

	readerID := "123456"
	csvReader := strings.NewReader("ID,Name,Desc\n1,Name 1,Test\n2,Name 2,Test 2")
	if err = reader.Register(readerID, csvReader); err != nil {
		log.Fatal(err)
	}
	// Drop the registration so the reader ID does not outlive this load.
	defer reader.Unregister(readerID)

	SQL := fmt.Sprintf(`LOAD 'Reader:csv:%v' DATA INTO TABLE mytable`, readerID)
	result, err := db.ExecContext(context.TODO(), SQL)
	if err != nil {
		log.Fatal(err)
	}
	// Surface a RowsAffected failure instead of printing a misleading zero.
	affected, err := result.RowsAffected()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("loaded: %v rows", affected)
}
Loading application data

The following code loads application data (a slice of Go structs) through a Parquet reader

package mypkg

import (
	"context"
	"database/sql"
	"fmt"
	"github.com/viant/bigquery/reader"
	"github.com/viant/sqlx/io/load/reader/parquet"
	"github.com/google/uuid"
	"log"
)

// Record is the element type of the slice handed to parquet.NewReader in
// ExampleOfAppDataLoad. Each field carries a `parquet` struct tag whose first
// component is a lower-case name — presumably the column name used by the
// parquet reader; confirm the meaning of "plain" and "optional" against that
// package's documentation.
type Record struct {
	ID     int     `parquet:"id,plain,optional"`
	Name   string  `parquet:"name,plain,optional"`
	Active bool    `parquet:"active,plain,optional"`
	Amount float64 `parquet:"amount,plain,optional"`
}

// ExampleOfAppDataLoad wraps a slice of Record values in a parquet reader,
// registers it under a freshly generated UUID, and loads it into mytable with
// a LOAD 'Reader:parquet:<id>' statement, then prints the number of affected
// rows. Any error terminates the program via log.Fatal.
//
// NOTE: sql.Open("bigquery", ...) requires the driver to be registered; the
// usage example does that with a blank import of github.com/viant/bigquery,
// which this snippet's import list does not include.
func ExampleOfAppDataLoad() {
	projectID := "my-gcp-project"
	db, err := sql.Open("bigquery", fmt.Sprintf("bigquery://%v/test", projectID))
	if err != nil {
		log.Fatal(err)
	}
	// Release the connection pool once the load is finished.
	defer db.Close()

	var records = []*Record{
		{
			ID:     1,
			Name:   "record 1",
			Amount: 12.2,
		},
		{
			ID:     2,
			Name:   "record 2",
			Amount: 12.3,
		},
	}

	readerID := uuid.New().String()
	parquetReader, err := parquet.NewReader(records)
	if err != nil {
		log.Fatal(err)
	}
	if err = reader.Register(readerID, parquetReader); err != nil {
		log.Fatal(err)
	}
	// Drop the registration so the reader ID does not outlive this load.
	defer reader.Unregister(readerID)

	SQL := fmt.Sprintf(`LOAD 'Reader:parquet:%v' DATA INTO TABLE mytable`, readerID)
	result, err := db.ExecContext(context.TODO(), SQL)
	if err != nil {
		log.Fatal(err)
	}
	// Surface a RowsAffected failure instead of printing a misleading zero.
	affected, err := result.RowsAffected()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("loaded: %v rows", affected)
}
Load option control

To customize a load, you can inline a JobConfigurationLoad as JSON in a hint: LOAD 'Reader:<SOURCE_FORMAT>:<READER_ID>' /*+ <HINT> +*/ DATA INTO TABLE mytable. For example:

LOAD 'Reader:CSV:201F973D-9BAB-4E0A-880F-7830B876F210' /*+ {
    "AllowJaggedRows": true,
    "AllowQuotedNewlines":true
  } +*/  DATA INTO TABLE mytable

Benchmark

The benchmark runs each of the following queries 3 times:

  • primitive types:
   SELECT state,gender,year,name,number 
   FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10000
  • complex type (repeated string/repeated record)
   SELECT  t.publication_number, t.inventor, t.assignee, t.description_localized 
   FROM `patents-public-data.patents.publications_202105` t ORDER BY 1 LIMIT 1000

In both cases the database/sql driver is faster and allocates far less memory than the GCP BigQuery API client:

goos: darwin
goarch: amd64
pkg: github.com/viant/bigquery/bench
cpu: Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
Benchmark_Primitive_GCPClient
database/gcp: primitive types 3.918388531s
Benchmark_Primitive_GCPClient-16               1    3918442974 ns/op    42145144 B/op      830647 allocs/op
Benchmark_Primitive_SQLDriver
database/sql: primitive types 2.880091637s
Benchmark_Primitive_SQLDriver-16               1    2880149491 ns/op    22301848 B/op      334547 allocs/op
Benchmark_Complex_GCPClient
database/gcp: structured types 3.303497894s
Benchmark_Complex_GCPClient-16               1    3303548761 ns/op    11551216 B/op      154660 allocs/op
Benchmark_Complex_SQLDriver
database/sql: structured types 2.690012577s
Benchmark_Complex_SQLDriver-16               1    2690056675 ns/op     6643176 B/op       71562 allocs/op
PASS

License

The source code is made available under the terms of the Apache License, Version 2, as stated in the file LICENSE.

Individual files may be made available under their own specific license, all compatible with Apache License, Version 2. Please see individual files for details.

Credits and Acknowledgements

Library Author: Adrian Witas

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExampleRowsNext

func ExampleRowsNext()

ExampleRowsNext fetching rows example

func NewConnector

func NewConnector(cfg *Config) (driver.Connector, error)

NewConnector returns new driver.Connector.

func SetOptions

func SetOptions(opts ...option.ClientOption)

SetOptions sets global client options

Types

type Config

// Config is a configuration parsed from a DSN string.
//
// NOTE(review): the per-field notes marked "presumably" are inferred from the
// field names and the documented DSN options (credURL, credKey, credID,
// credJSON, endpoint, userAgent, apiKey, quotaProject, scopes); confirm the
// exact mapping against ParseDSN.
type Config struct {
	CredentialsFile string   // NOTE(review): previously commented "Username"; presumably a credentials file location — confirm
	Endpoint        string   // presumably the "endpoint" DSN option
	APIKey          string   // presumably the "apiKey" DSN option
	CredentialJSON  []byte   // presumably the decoded "credJSON" DSN option
	CredentialsURL  string   // presumably the "credURL" DSN option
	CredID          string   // scy secret resource ID
	CredentialsKey  string   // presumably the "credKey" DSN option
	UserAgent       string   // presumably the "userAgent" DSN option
	ProjectID       string   // project ID
	DatasetID       string   // presumably the datasetID path segment of the DSN
	QuotaProject    string   // presumably the "quotaProject" DSN option
	Scopes          []string // presumably the "scopes" DSN option
	Location        string   // presumably the optional location path segment of the DSN
	App             string
	url.Values
}

Config is a configuration parsed from a DSN string. If a new Config is created instead of being parsed from a DSN string, the NewConfig function should be used, which sets default values.

func NewConfig

func NewConfig() *Config

NewConfig creates a new Config and sets default values.

func ParseDSN

func ParseDSN(dsn string) (*Config, error)

ParseDSN parses the DSN string to a Config

type Driver

type Driver struct{}

Driver is exported to make the driver directly accessible. In general the driver is used via the database/sql package.

func (Driver) Open

func (d Driver) Open(dsn string) (driver.Conn, error)

Open new Connection. See https://github.com/viant/bigquery#dsn-data-source-name for how the DSN string is formatted

func (Driver) OpenConnector

func (d Driver) OpenConnector(dsn string) (driver.Connector, error)

OpenConnector implements driver.DriverContext.

func (*Driver) SetGCPClientOptions

func (d *Driver) SetGCPClientOptions(options ...option.ClientOption)

SetGCPClientOptions sets global gcp options

type NamedValues

type NamedValues []driver.NamedValue

NamedValues represents a slice of named values

func (NamedValues) QueryParameter

func (v NamedValues) QueryParameter() ([]*bigquery.QueryParameter, error)

QueryParameter converts value to query parameters

type Rows

type Rows struct {
	// contains filtered or unexported fields
}

Rows abstraction implements database/sql driver.Rows interface

func (*Rows) Close

func (r *Rows) Close() error

Close closes rows

func (*Rows) ColumnTypeDatabaseTypeName

func (r *Rows) ColumnTypeDatabaseTypeName(index int) string

ColumnTypeDatabaseTypeName returns column database type name

func (*Rows) ColumnTypeNullable

func (r *Rows) ColumnTypeNullable(index int) (nullable, ok bool)

ColumnTypeNullable returns if column is nullable

func (*Rows) ColumnTypeScanType

func (r *Rows) ColumnTypeScanType(index int) reflect.Type

ColumnTypeScanType returns column scan type

func (*Rows) Columns

func (r *Rows) Columns() []string

Columns returns query columns

func (*Rows) Next

func (r *Rows) Next(dest []driver.Value) error

Next moves to next row

type Statement

type Statement struct {
	// contains filtered or unexported fields
}

Statement abstraction implements database/sql driver.Statement interface

func (*Statement) CheckNamedValue

func (s *Statement) CheckNamedValue(n *driver.NamedValue) error

CheckNamedValue checks name values

func (*Statement) Close

func (s *Statement) Close() error

Close closes statement

func (*Statement) Exec

func (s *Statement) Exec(args []driver.Value) (driver.Result, error)

Exec executes statements

func (*Statement) ExecContext

func (s *Statement) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error)

ExecContext executes statements

func (*Statement) NumInput

func (s *Statement) NumInput() int

NumInput returns numinput

func (*Statement) Query

func (s *Statement) Query(args []driver.Value) (driver.Rows, error)

Query runs query

func (*Statement) QueryContext

func (s *Statement) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error)

QueryContext runs query

type Values

type Values []driver.Value

Values represents value slice

func (Values) QueryParameter

func (v Values) QueryParameter() ([]*bigquery.QueryParameter, error)

QueryParameter converts value to query parameters

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL