clickhouse

package module
v1.0.2
Published: Sep 28, 2021 · License: MIT · Imports: 13 · Imported by: 1

README

go-clickhouse

A Golang HTTP connector for Yandex ClickHouse

ClickHouse manages extremely large volumes of data in a stable and sustainable manner. It currently powers Yandex.Metrica, the world's second largest web analytics platform, with over 13 trillion database records and over 20 billion events a day, generating customized reports on the fly directly from non-aggregated data. The system was also successfully used at CERN's LHCb experiment to store and process metadata on 10 billion events, with over 1000 attributes per event, registered in 2011.

Examples

Query rows
conn := clickhouse.NewConn("localhost:8123", clickhouse.NewHttpTransport())
query := clickhouse.NewQuery("SELECT name, date FROM clicks")
iter := query.Iter(conn)
var (
    name string
    date string
)
for iter.Scan(&name, &date) {
    //
}
if iter.Error() != nil {
    log.Panicln(iter.Error())
}
Single insert
conn := clickhouse.NewConn("localhost:8123", clickhouse.NewHttpTransport())
query, err := clickhouse.BuildInsert("clicks",
    clickhouse.Columns{"name", "date", "sourceip"},
    clickhouse.Row{"Test name", "2016-01-01 21:01:01", clickhouse.Func{"IPv4StringToNum", "192.0.2.192"}},
)
if err == nil {
    err = query.Exec(conn)
    if err == nil {
        //
    }
}
Auth and multiple insert
conn := clickhouse.NewConn("localhost:8123", clickhouse.NewHttpTransport())
conn.AddParam("user", "{username}")
conn.AddParam("password", "{password}")

queryStr := `INSERT INTO clicks FORMAT TabSeparated
1	2017-09-27	10
2	2017-09-27	11
3	2017-09-27	12`

query := clickhouse.NewQuery(queryStr)
iter := query.Iter(conn)

if iter.Error() != nil {
    //
}
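The same rows can also be inserted with BuildMultiInsert (see the index below) instead of a raw FORMAT TabSeparated statement. A minimal sketch; the column names are illustrative and should match the real schema of the clicks table:

conn := clickhouse.NewConn("localhost:8123", clickhouse.NewHttpTransport())
conn.AddParam("user", "{username}")
conn.AddParam("password", "{password}")

// Column names below are illustrative, not taken from the README.
query, err := clickhouse.BuildMultiInsert("clicks",
    clickhouse.Columns{"visit_id", "date", "visit_number"},
    clickhouse.Rows{
        {1, "2017-09-27", 10},
        {2, "2017-09-27", 11},
        {3, "2017-09-27", 12},
    },
)
if err == nil {
    err = query.Exec(conn)
}
if err != nil {
    //
}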
Fetch rows
conn := clickhouse.NewConn("localhost:8123", clickhouse.NewHttpTransport())

queryStr := `SELECT visit_id, visit_number FROM clicks ORDER BY created_at DESC LIMIT 5`
query := clickhouse.NewQuery(queryStr)

type fetch struct {
    VisitId     string  `json:"visit_id"`
    VisitNumber int     `json:"visit_number"`
}
fetchObj := []fetch{}

err := query.ExecScan(conn, &fetchObj)

if err != nil {
    //
}

/**
fetchObj:
([]fetch)
  0(fetch)
    VisitId(string) "ufifgdwp0y0wfiqp-7887"
    VisitNumber(int) 1
  1(fetch)
    VisitId(string) "ufifgdwp0y0wfiww-5356"
    VisitNumber(int) 1
  2(fetch)
    VisitId(string) "ufifgdwp0y0wfiwt-408"
    VisitNumber(int) 2
...
*/
Fetch rows and stat
conn := clickhouse.NewConn("localhost:8123", clickhouse.NewHttpTransport())

queryStr := `SELECT visit_id, visit_number FROM clicks ORDER BY created_at DESC LIMIT 5`
query := clickhouse.NewQuery(queryStr)

type fetch struct {
    VisitId     string  `json:"visit_id"`
    VisitNumber int     `json:"visit_number"`
}
fetchObj := []fetch{}

stat, err := query.ExecScanStat(conn, &fetchObj)

if err != nil {
    //
}

/**
fetchObj:
([]fetch)
  0(fetch)
    VisitId(string) "ufifgdwp0y0wfiqp-7887"
    VisitNumber(int) 1
  1(fetch)
    VisitId(string) "ufifgdwp0y0wfiww-5356"
    VisitNumber(int) 1
  2(fetch)
    VisitId(string) "ufifgdwp0y0wfiwt-408"
    VisitNumber(int) 2
...

stat:
(clickhouse.Stat)
   Rows(int) 25
   RowsBeforeLimitAtLeast(int) 73
   Stat(struct)
     Elapsed(float64) 0.003121279
     RowsRead(int) 74553
     BytesRead(int) 1043742
*/
External data for query processing

See documentation for details

conn := clickhouse.NewConn("localhost:8123", clickhouse.NewHttpTransport())
query := clickhouse.NewQuery("SELECT Num, Name FROM extdata")
query.AddExternal("extdata", "Num UInt32, Name String", []byte("1	first\n2	second")) // tab separated


iter := query.Iter(conn)
var (
    num  int
    name string
)
for iter.Scan(&num, &name) {
    //
}
if iter.Error() != nil {
    log.Panicln(iter.Error())
}

Cluster

Cluster is useful if you have several servers with the same Distributed table (master). In this case you can send requests to a random master to balance the load.

  • cluster.Check() pings all connections and keeps only the active ones
  • cluster.ActiveConn() returns a random active connection
  • cluster.OnCheckError() is called when any connection fails a check

Important: call Check() at least once after initialization; we recommend calling it periodically so that ActiveConn() always returns a connection that has recently passed a check. A sketch of sending a query through the cluster follows the example below.

http := clickhouse.NewHttpTransport()
conn1 := clickhouse.NewConn("host1", http)
conn2 := clickhouse.NewConn("host2", http)

cluster := clickhouse.NewCluster(conn1, conn2)
cluster.OnCheckError(func(c *clickhouse.Conn) {
    // Log and keep going; the cluster will route around the failed host.
    log.Printf("Clickhouse connection failed %s", c.Host)
})
// Ping connections every second
go func() {
    for {
        cluster.Check()
        time.Sleep(time.Second)
    }
}()
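
Once Check() is running, queries go through whichever connection the cluster reports as active. A minimal sketch, reusing the clicks table from the examples above:

if !cluster.IsDown() {
    query := clickhouse.NewQuery("SELECT name, date FROM clicks")
    iter := query.Iter(cluster.ActiveConn())
    var (
        name string
        date string
    )
    for iter.Scan(&name, &date) {
        //
    }
    if iter.Error() != nil {
        log.Println(iter.Error())
    }
}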

Transport options

Timeout
t := clickhouse.NewHttpTransport()
t.Timeout = time.Second * 5

conn := clickhouse.NewConn("host", t)

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsLeader added in v1.0.2

func IsLeader(table string, conn *Conn) bool

Types

type Array

type Array []interface{}

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

func NewCluster

func NewCluster(conn ...*Conn) *Cluster

func (*Cluster) ActiveConn

func (c *Cluster) ActiveConn() *Conn

func (*Cluster) Check

func (c *Cluster) Check()

func (*Cluster) IsDown

func (c *Cluster) IsDown() bool

func (*Cluster) OnCheckError

func (c *Cluster) OnCheckError(f PingErrorFunc)

type Column

type Column string

type Columns

type Columns []string

type Conn

type Conn struct {
	Host string
	// contains filtered or unexported fields
}

func NewConn

func NewConn(host string, t Transport) *Conn

func (*Conn) AddParam added in v1.0.2

func (c *Conn) AddParam(key string, value string)

func (*Conn) GetParams added in v1.0.2

func (c *Conn) GetParams() url.Values

func (*Conn) Ping

func (c *Conn) Ping() (err error)

func (*Conn) SetParams added in v1.0.2

func (c *Conn) SetParams(params url.Values)

type DbError

type DbError struct {
	// contains filtered or unexported fields
}

func (*DbError) Code

func (e *DbError) Code() int

func (*DbError) Error

func (e *DbError) Error() string

func (*DbError) Message

func (e *DbError) Message() string

func (*DbError) Response

func (e *DbError) Response() string

func (*DbError) String

func (e *DbError) String() string

type External added in v1.0.2

type External struct {
	Name      string
	Structure string
	Data      []byte
}

type Func added in v1.0.2

type Func struct {
	Name string
	Args interface{}
}

type HttpTransport

type HttpTransport struct {
	Timeout time.Duration
}

func NewHttpTransport

func NewHttpTransport() HttpTransport

func (HttpTransport) Exec

func (t HttpTransport) Exec(conn *Conn, q Query, readOnly bool) (res string, err error)

type Iter

type Iter struct {
	// contains filtered or unexported fields
}

func (*Iter) Error

func (r *Iter) Error() error

func (*Iter) Len added in v1.0.2

func (r *Iter) Len() int

func (*Iter) Scan

func (r *Iter) Scan(vars ...interface{}) bool

type PingErrorFunc

type PingErrorFunc func(*Conn)

type Query

type Query struct {
	Stmt string
	// contains filtered or unexported fields
}

func BuildInsert

func BuildInsert(tbl string, cols Columns, row Row) (Query, error)

func BuildMultiInsert

func BuildMultiInsert(tbl string, cols Columns, rows Rows) (Query, error)

func NewQuery

func NewQuery(stmt string, args ...interface{}) Query

func OptimizePartition added in v1.0.2

func OptimizePartition(table string, partition string) Query

func OptimizeTable added in v1.0.2

func OptimizeTable(table string) Query

func (*Query) AddExternal added in v1.0.2

func (q *Query) AddExternal(name string, structure string, data []byte)

Adding external dictionary

func (Query) AddParam added in v1.0.2

func (q Query) AddParam(name string, value string)

Additional query parameters such as max_memory_usage.
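
A minimal sketch based on the signature above, assuming conn is an existing *Conn and using an illustrative value for max_memory_usage:

query := clickhouse.NewQuery("SELECT count() FROM clicks")
query.AddParam("max_memory_usage", "10000000000") // illustrative limit, passed as a string
if err := query.Exec(conn); err != nil {
    //
}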

func (Query) Exec

func (q Query) Exec(conn *Conn) (err error)

func (Query) ExecGetJsonData added in v1.0.2

func (q Query) ExecGetJsonData(conn *Conn) ([]byte, error)

func (Query) ExecScan added in v1.0.2

func (q Query) ExecScan(conn *Conn, dataObj interface{}) error

func (Query) ExecScanStat added in v1.0.2

func (q Query) ExecScanStat(conn *Conn, dataObj interface{}) (Stat, error)

func (Query) Iter

func (q Query) Iter(conn *Conn) *Iter

func (Query) MergeParams added in v1.0.2

func (q Query) MergeParams(params url.Values)

type Row

type Row []interface{}

type Rows

type Rows []Row

type Stat added in v1.0.2

type Stat struct {
	Rows                   int `json:"rows"`
	RowsBeforeLimitAtLeast int `json:"rows_before_limit_at_least"`
	Stat                   struct {
		Elapsed   float64 `json:"elapsed"`
		RowsRead  int     `json:"rows_read"`
		BytesRead int     `json:"bytes_read"`
	} `json:"statistics"`
}

type StringArray added in v1.0.2

type StringArray []string

type Transport

type Transport interface {
	Exec(conn *Conn, q Query, readOnly bool) (res string, err error)
}

type VisitParamsString added in v1.0.2

type VisitParamsString map[string]interface{}
