ch

package module
v0.0.0-...-bd62367 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 13, 2022 License: Apache-2.0 Imports: 26 Imported by: 0

README

ch

Low level TCP ClickHouse client and protocol implementation in Go. Designed for very fast data block streaming with low network, cpu and memory overhead.

NB: No pooling, reconnects and not goroutine-safe by default, only single connection. Use clickhouse-go for high-level database/sql-compatible client, pooling for ch-go is available as chpool package.

ClickHouse is an open-source, high performance columnar OLAP database management system for real-time analytics using SQL.

go get github.com/ClickHouse/ch-go@latest

Example

package main

import (
  "context"
  "fmt"

  "github.com/ClickHouse/ch-go"
  "github.com/ClickHouse/ch-go/proto"
)

func main() {
  ctx := context.Background()
  c, err := ch.Dial(ctx, ch.Options{Address: "localhost:9000"})
  if err != nil {
    panic(err)
  }
  var (
    numbers int
    data    proto.ColUInt64
  )
  if err := c.Do(ctx, ch.Query{
    Body: "SELECT number FROM system.numbers LIMIT 500000000",
    Result: proto.Results{
      {Name: "number", Data: &data},
    },
    // OnResult will be called on next received data block.
    OnResult: func(ctx context.Context, b proto.Block) error {
      numbers += len(data)
      return nil
    },
  }); err != nil {
    panic(err)
  }
  fmt.Println("numbers:", numbers)
}
393ms 0.5B rows  4GB  10GB/s 1 job
874ms 2.0B rows 16GB  18GB/s 4 jobs
Results

To stream query results, set Result and OnResult fields of Query. The OnResult will be called after Result is filled with received data block.

The OnResult is optional, but query will fail if more than single block is received, so it is ok to solely set the Result if only one row is expected.

Automatic result inference
var result proto.Results
q := ch.Query{
  Body:   "SELECT * FROM table",
  Result: result.Auto(),
}
Single result with column name inference
var res proto.ColBool
q := ch.Query{
  Body:   "SELECT v FROM test_table",
  Result: proto.ResultColumn{Data: &res},
}
Writing data

See examples/insert.

For table

CREATE TABLE test_table_insert
(
    ts                DateTime64(9),
    severity_text     Enum8('INFO'=1, 'DEBUG'=2),
    severity_number   UInt8,
    body              String,
    name              String,
    arr               Array(String)
) ENGINE = Memory

We prepare data block for insertion as follows:

var (
	body      proto.ColStr
	name      proto.ColStr
	sevText   proto.ColEnum
	sevNumber proto.ColUInt8

	ts  = new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano) // DateTime64(9)
	arr = new(proto.ColStr).Array()                                   // Array(String)
	now = time.Date(2010, 1, 1, 10, 22, 33, 345678, time.UTC)
)

// Append 10 rows to initial data block.
for i := 0; i < 10; i++ {
	body.AppendBytes([]byte("Hello"))
	ts.Append(now)
	name.Append("name")
	sevText.Append("INFO")
	sevNumber.Append(10)
	arr.Append([]string{"foo", "bar", "baz"})
}

input := proto.Input{
	{Name: "ts", Data: ts},
	{Name: "severity_text", Data: &sevText},
	{Name: "severity_number", Data: sevNumber},
	{Name: "body", Data: body},
	{Name: "name", Data: name},
	{Name: "arr", Data: arr},
}
Single data block
if err := conn.Do(ctx, ch.Query{
	// Or "INSERT INTO test_table_insert (ts, severity_text, severity_number, body, name, arr) VALUES"
	// Or input.Into("test_table_insert")
	Body: "INSERT INTO test_table_insert VALUES",
	Input: input,
}); err != nil {
	panic(err)
}
Stream data
// Stream data to ClickHouse server in multiple data blocks.
var blocks int
if err := conn.Do(ctx, ch.Query{
	Body:  input.Into("test_table_insert"), // helper that generates INSERT INTO query with all columns
	Input: input,

	// OnInput is called to prepare Input data before encoding and sending
	// to ClickHouse server.
	OnInput: func(ctx context.Context) error {
		// On OnInput call, you should fill the input data.
		//
		// NB: You should reset the input columns, they are
		// not reset automatically.
		//
		// That is, we are re-using the same input columns and
		// if we will return nil without doing anything, data will be
		// just duplicated.

		input.Reset() // calls "Reset" on each column

		if blocks >= 10 {
			// Stop streaming.
			//
			// This will also write tailing input data if any,
			// but we just reset the input, so it is currently blank.
			return io.EOF
		}

		// Append new values:
		for i := 0; i < 10; i++ {
			body.AppendBytes([]byte("Hello"))
			ts.Append(now)
			name.Append("name")
			sevText.Append("DEBUG")
			sevNumber.Append(10)
			arr.Append([]string{"foo", "bar", "baz"})
		}

		// Data will be encoded and sent to ClickHouse server after returning nil.
		// The Do method will return error if any.
		blocks++
		return nil
	},
}); err != nil {
	panic(err)
}

Features

  • OpenTelemetry support
  • No reflection or interface{}
  • Generics (go1.18) for Array[T], LowCardinaliy[T], Map[K, V], Nullable[T]
  • Reading or writing ClickHouse dumps in Native format
  • Column-oriented design that operates directly with blocks of data
    • Dramatically more efficient
    • Up to 100x faster than row-first design around sql
    • Up to 700x faster than HTTP API
    • Low memory overhead (data blocks are slices, i.e. continuous memory)
    • Highly efficient input and output block streaming
    • As close to ClickHouse as possible
  • Structured query execution telemetry streaming
  • LZ4, ZSTD or None (just checksums for integrity check) compression
  • External data support
  • Rigorously tested
    • Windows, Mac, Linux (also x86)
    • Unit tests for encoding and decoding
      • ClickHouse Server in Go for faster tests
      • Golden files for all packets, columns
      • Both server and client structures
      • Ensuring that partial read leads to failure
    • End-to-end tests on multiple LTS and stable versions
    • Fuzzing

Supported types

  • UInt8, UInt16, UInt32, UInt64, UInt128, UInt256
  • Int8, Int16, Int32, Int64, Int128, Int256
  • Date, Date32, DateTime, DateTime64
  • Decimal32, Decimal64, Decimal128, Decimal256 (only low-level raw values)
  • IPv4, IPv6
  • String, FixedString(N)
  • UUID
  • Array(T)
  • Enum8, Enum16
  • LowCardinality(T)
  • Map(K, V)
  • Bool
  • Tuple(T1, T2, ..., Tn)
  • Nullable(T)
  • Point
  • Nothing, Interval

Generics

Most columns implement proto.ColumnOf[T] generic constraint:

type ColumnOf[T any] interface {
	Column
	Append(v T)
	Row(i int) T
}

For example, ColStr (and ColStr.LowCardinality) implements ColumnOf[string]. Same for arrays: new(proto.ColStr).Array() implements ColumnOf[[]string], column of []string values.

Array

Generic for Array(T)

// Array(String)
arr := proto.NewArray[string](new(proto.ColStr))
// Or
arr := new(proto.ColStr).Array()
q := ch.Query{
  Body:   "SELECT ['foo', 'bar', 'baz']::Array(String) as v",
  Result: arr.Results("v"),
}
// Do ...
arr.Row(0) // ["foo", "bar", "baz"]

Dumps

Reading

Use proto.Block.DecodeRawBlock on proto.NewReader:

func TestDump(t *testing.T) {
	// Testing decoding of Native format dump.
	//
	// CREATE TABLE test_dump (id Int8, v String)
	//   ENGINE = MergeTree()
	// ORDER BY id;
	//
	// SELECT * FROM test_dump
	//   ORDER BY id
	// INTO OUTFILE 'test_dump_native.raw' FORMAT Native;
	data, err := os.ReadFile(filepath.Join("_testdata", "test_dump_native.raw"))
	require.NoError(t, err)
	var (
		dec    proto.Block
		ids    proto.ColInt8
		values proto.ColStr
	)
	require.NoError(t, dec.DecodeRawBlock(
		proto.NewReader(bytes.NewReader(data)),
		proto.Results{
			{Name: "id", Data: &ids},
			{Name: "v", Data: &values},
		}),
	)
}
Writing

Use proto.Block.EncodeRawBlock on proto.Buffer with Rows and Columns set:

func TestLocalNativeDump(t *testing.T) {
	ctx := context.Background()
	// Testing clickhouse-local.
	var v proto.ColStr
	for _, s := range data {
		v.Append(s)
	}
	buf := new(proto.Buffer)
	b := proto.Block{Rows: 2, Columns: 2}
	require.NoError(t, b.EncodeRawBlock(buf, []proto.InputColumn{
		{Name: "title", Data: v},
		{Name: "data", Data: proto.ColInt64{1, 2}},
	}), "encode")

	dir := t.TempDir()
	inFile := filepath.Join(dir, "data.native")
	require.NoError(t, os.WriteFile(inFile, buf.Buf, 0600), "write file")

	cmd := exec.Command("clickhouse-local", "local",
		"--logger.console",
		"--log-level", "trace",
		"--file", inFile,
		"--input-format", "Native",
		"--output-format", "JSON",
		"--query", "SELECT * FROM table",
	)
	out := new(bytes.Buffer)
	errOut := new(bytes.Buffer)
	cmd.Stdout = out
	cmd.Stderr = errOut

	t.Log(cmd.Args)
	require.NoError(t, cmd.Run(), "run: %s", errOut)
	t.Log(errOut)

	v := struct {
		Rows int `json:"rows"`
		Data []struct {
			Title string `json:"title"`
			Data  int    `json:"data,string"`
		}
	}{}
	require.NoError(t, json.Unmarshal(out.Bytes(), &v), "json")
	assert.Equal(t, 2, v.Rows)
	if assert.Len(t, v.Data, 2) {
		for i, r := range []struct {
			Title string `json:"title"`
			Data  int    `json:"data,string"`
		}{
			{"Foo", 1},
			{"Bar", 2},
		} {
			assert.Equal(t, r, v.Data[i])
		}
	}
}

TODO

  • Types
    • Decimal(P, S) API
    • JSON
    • SimpleAggregateFunction
    • AggregateFunction
    • Nothing
    • Interval
    • Nested
    • Geo types
      • Point
      • Ring
      • Polygon
      • MultiPolygon
  • Improved i/o timeout handling for reading packets from server
    • Close connection on context cancellation in all cases
    • Ensure that reads can't block forever

Reference

License

Apache License 2.0

Documentation

Overview

Package ch implements ClickHouse client.

Index

Examples

Constants

View Source
const (
	DefaultDatabase    = "default"
	DefaultUser        = "default"
	DefaultHost        = "127.0.0.1"
	DefaultPort        = 9000
	DefaultDialTimeout = 1 * time.Second
)

Defaults for connection.

Variables

View Source
var ErrClosed = errors.New("client is closed")

ErrClosed means that client was already closed.

Functions

func CompressionStrings

func CompressionStrings() []string

CompressionStrings returns a slice of all String values of the enum

func IsErr

func IsErr(err error, codes ...proto.Error) bool

IsErr reports whether err is error with provided exception codes.

func IsException

func IsException(err error) bool

IsException reports whether err is Exception.

func Parameters

func Parameters(m map[string]any) []proto.Parameter

Parameters is helper for building Query.Parameters.

EXPERIMENTAL.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client implements ClickHouse binary protocol client on top of single TCP connection.

func Connect

func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error)

Connect performs handshake with ClickHouse server and initializes application level connection.

func Dial

func Dial(ctx context.Context, opt Options) (c *Client, err error)

Dial dials requested address and establishes TCP connection to ClickHouse server, performing handshake.

func (*Client) Close

func (c *Client) Close() error

Close closes underlying connection and frees all resources, rendering Client to unusable state.

func (*Client) Do

func (c *Client) Do(ctx context.Context, q Query) (err error)

Do performs Query on ClickHouse server.

func (*Client) IsClosed

func (c *Client) IsClosed() bool

IsClosed indicates that connection is closed.

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) (err error)

Ping server.

Do not call concurrently with Do.

func (*Client) ServerInfo

func (c *Client) ServerInfo() proto.ServerHello

ServerInfo returns server information.

type Compression

type Compression byte

Compression setting.

Trade bandwidth for CPU.

const (
	// CompressionDisabled disables compression. Lowest CPU overhead.
	CompressionDisabled Compression = iota
	// CompressionLZ4 enables LZ4 compression for data. Medium CPU overhead.
	CompressionLZ4
	// CompressionZSTD enables ZStandard compression. High CPU overhead.
	CompressionZSTD
	// CompressionNone uses no compression but data has checksums.
	CompressionNone
)

func CompressionString

func CompressionString(s string) (Compression, error)

CompressionString retrieves an enum value from the enum constants string name. Throws an error if the param is not part of the enum.

func CompressionValues

func CompressionValues() []Compression

CompressionValues returns all values of the enum

func (Compression) IsACompression

func (i Compression) IsACompression() bool

IsACompression returns "true" if the value is listed in the enum definition. "false" otherwise

func (Compression) String

func (i Compression) String() string

type CorruptedDataErr

type CorruptedDataErr struct {
	Actual    city.U128
	Reference city.U128
	RawSize   int
	DataSize  int
}

CorruptedDataErr means that provided hash mismatch with calculated.

func (*CorruptedDataErr) Error

func (c *CorruptedDataErr) Error() string

type Dialer

type Dialer interface {
	DialContext(ctx context.Context, network, address string) (net.Conn, error)
}

A Dialer dials using a context.

type Exception

type Exception struct {
	Code    proto.Error
	Name    string
	Message string
	Stack   string
	Next    []Exception // non-nil only for top exception
}

Exception is server-side error.

func AsException

func AsException(err error) (*Exception, bool)

AsException finds first *Exception in err chain.

func (*Exception) Error

func (e *Exception) Error() string

func (*Exception) IsCode

func (e *Exception) IsCode(codes ...proto.Error) bool

type Log

type Log = proto.Log

type Options

type Options struct {
	Logger      *zap.Logger // defaults to Nop.
	Address     string      // 127.0.0.1:9000
	Database    string      // "default"
	User        string      // "default"
	Password    string      // blank string by default
	QuotaKey    string      // blank string by default
	Compression Compression // disabled by default
	Settings    []Setting   // none by default

	Dialer      Dialer        // defaults to net.Dialer
	DialTimeout time.Duration // defaults to 1s
	TLS         *tls.Config   // no TLS is used by default

	ProtocolVersion int // force protocol version, optional

	// Additional OpenTelemetry instrumentation that will capture query body
	// and other parameters.
	//
	// Note: OpenTelemetry context propagation works without this option too.
	OpenTelemetryInstrumentation bool
	TracerProvider               trace.TracerProvider
	MeterProvider                metric.MeterProvider
	// contains filtered or unexported fields
}

Options for Client. Zero value is valid.

type ProfileEvent

type ProfileEvent = proto.ProfileEvent

type ProfileEventType

type ProfileEventType = proto.ProfileEventType

type Query

type Query struct {
	// Body of query, like "SELECT 1".
	Body string
	// QueryID is ID of query, defaults to new UUIDv4.
	QueryID string
	// QuotaKey of query, optional.
	QuotaKey string

	// Input columns for INSERT operations.
	Input proto.Input
	// OnInput is called to allow ingesting more data to Input.
	//
	// The io.EOF reports that no more input should be ingested.
	//
	// Optional, single block is ingested from Input if not provided,
	// but query will fail if Input is set but has zero rows.
	OnInput func(ctx context.Context) error

	// Result columns for SELECT operations.
	Result proto.Result
	// OnResult is called when Result is filled with result block.
	//
	// Optional, but query will fail of more than one block is received
	// and no OnResult is provided.
	OnResult func(ctx context.Context, block proto.Block) error

	// OnProgress is optional progress handler. The progress value contain
	// difference, so progress should be accumulated if needed.
	OnProgress func(ctx context.Context, p proto.Progress) error
	// OnProfile is optional handler for profiling data.
	OnProfile func(ctx context.Context, p proto.Profile) error
	// OnProfileEvent is optional handler for profiling event stream data.
	OnProfileEvent func(ctx context.Context, e ProfileEvent) error
	// OnLog is optional handler for server log entry.
	OnLog func(ctx context.Context, l Log) error

	// Settings are optional query-scoped settings. Can override client settings.
	Settings []Setting

	// EXPERIMENTAL: parameters for query.
	Parameters []proto.Parameter

	// Secret is optional inter-server per-cluster secret for Distributed queries.
	//
	// See https://clickhouse.com/docs/en/engines/table-engines/special/distributed/#distributed-clusters
	Secret string

	// InitialUser is optional initial user for Distributed queries.
	InitialUser string

	// ExternalData is optional data for server to load.
	//
	// https://clickhouse.com/docs/en/engines/table-engines/special/external-data/
	ExternalData []proto.InputColumn
	// ExternalTable name. Defaults to _data.
	ExternalTable string
}

Query to ClickHouse.

Example (MultipleInputColumns)
var (
	body      proto.ColStr
	name      proto.ColStr
	sevText   proto.ColEnum
	sevNumber proto.ColUInt8

	ts  = new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano)
	arr = new(proto.ColStr).Array() // Array(String)
	now = time.Date(2010, 1, 1, 10, 22, 33, 345678, time.UTC)
)
// Append 10 rows.
for i := 0; i < 10; i++ {
	body.AppendBytes([]byte("Hello"))
	ts.Append(now)
	name.Append("name")
	sevText.Values = append(sevText.Values, "INFO")
	sevNumber = append(sevNumber, 10)
	arr.Append([]string{"foo", "bar", "baz"})
}
input := proto.Input{
	{Name: "ts", Data: ts},
	{Name: "severity_text", Data: &sevText},
	{Name: "severity_number", Data: sevNumber},
	{Name: "body", Data: body},
	{Name: "name", Data: name},
	{Name: "arr", Data: arr},
}
fmt.Println(input.Into("logs"))
Output:

INSERT INTO "logs" ("ts","severity_text","severity_number","body","name","arr") VALUES

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is basic ClickHouse server.

func NewServer

func NewServer(opt ServerOptions) *Server

NewServer returns new ClickHouse Server.

func (*Server) Serve

func (s *Server) Serve(ln net.Listener) error

Serve connections on net.Listener.

type ServerConn

type ServerConn struct {
	// contains filtered or unexported fields
}

ServerConn wraps Server connection.

func (*ServerConn) Handle

func (c *ServerConn) Handle() error

Handle connection.

type ServerOptions

type ServerOptions struct {
	Logger   *zap.Logger
	Timezone *time.Location
	OnError  func(err error)
}

ServerOptions wraps possible Server configuration.

type Setting

type Setting struct {
	Key, Value string
	Important  bool
}

Setting to send to server.

func SettingInt

func SettingInt(k string, v int) Setting

SettingInt returns Setting with integer value v.

Directories

Path Synopsis
Package chpool is a connection pool for ch.
Package chpool is a connection pool for ch.
Package cht implements ClickHouse testing utilities, primarily end to end.
Package cht implements ClickHouse testing utilities, primarily end to end.
Package compress implements compression support.
Package compress implements compression support.
examples
internal
cmd/app
Package app is helper for simple cli apps.
Package app is helper for simple cli apps.
e2e
Package e2e implements end to end testing utilities.
Package e2e implements end to end testing utilities.
gold
Package gold implements golden files.
Package gold implements golden files.
version
Package version resolves current module version.
Package version resolves current module version.
ztest
Package ztest provides a variety of helpers for testing log output.
Package ztest provides a variety of helpers for testing log output.
Package otelch provide OpenTelemetry instrumentation for go-faster/ch.
Package otelch provide OpenTelemetry instrumentation for go-faster/ch.
Package proto implements ClickHouse wire protocol.
Package proto implements ClickHouse wire protocol.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL