proton

package module
v2.0.17 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2023 License: Apache-2.0 Imports: 27 Imported by: 1

README

Proton Go Driver

Introduction

Proton is a unified streaming and historical data processing engine in a single binary. The historical store is built on ClickHouse.

This project provides a Go driver for interacting with Proton. The code is based on https://github.com/ClickHouse/clickhouse-go.

Installation

To get started, you need to have Go installed. Then, import the Proton Database Go Driver using Go Modules:

go get github.com/timeplus-io/proton-go-driver/v2

Quick Start

  1. Run Proton with Docker: docker run -d -p 8463:8463 --pull always --name proton ghcr.io/timeplus-io/proton:develop
  2. Run the following Go code:
package main

import (
	"fmt"
	"github.com/timeplus-io/proton-go-driver/v2"
)

func main() {
	conn := proton.OpenDB(&proton.Options{
		Addr: []string{"127.0.0.1:8463"},
		Auth: proton.Auth{
			Username: "default",
			Password: "",
		},
	})
	var value int
	conn.QueryRow("SELECT 300").Scan(&value)
	fmt.Println(value)
}

The above code should print 300, which shows that everything is working correctly.

Connecting to Proton Database

To connect to the Proton database, create a connection using the following code:

conn := proton.OpenDB(&proton.Options{
    Addr: []string{"127.0.0.1:8463"},
    Auth: proton.Auth{
        Database: "default",
        Username: "default",
        Password: "",
    },
    DialTimeout: 5 * time.Second,
    Compression: &proton.Compression{
        proton.CompressionLZ4,
    },
})
conn.SetMaxIdleConns(5)
conn.SetMaxOpenConns(10)
conn.SetConnMaxLifetime(time.Hour)
ctx = proton.Context(ctx, proton.WithProgress(func(p *proton.Progress) {
    if rand.Float32() < 0.3 {
        log.Println("progress:", p)
    }
}))

Create Stream

Before working with streaming data, you need to create a stream to hold it. Here's an example of creating a stream:

if _, err := conn.ExecContext(ctx, "DROP STREAM IF EXISTS car"); err != nil {
    return err
}
if _, err := conn.ExecContext(ctx, "CREATE STREAM IF NOT EXISTS car(id int64, speed float64)"); err != nil {
    return err
}

Batch Insertion

scope, err := conn.Begin()
if err != nil {
    log.Fatal(err)
}
batch, err := scope.PrepareContext(ctx, "INSERT INTO car (id, speed, _tp_time) values")
for i := 0; i < 20; i++ {
    speed := rand.Float64()*20 + 50
    _, err := batch.Exec(id, speed, time.Now())
    if err != nil {
        log.Fatal(err)
    }
    time.Sleep(time.Duration(100) * time.Millisecond)
}
err = scope.Commit()
if err != nil {
    log.Fatal(err)
}

Streaming Query

const QueryDDL = `SELECT id, avg(speed), window_start, window_end
    FROM session(car, 1h, [speed >= 60, speed < 60))
    GROUP BY id, window_start, window_end`
conn, ctx := getConnection(context.Background())
ctx, cancel := context.WithCancel(ctx)
rows, err := conn.QueryContext(ctx, QueryDDL)
if err != nil {
    log.Fatal(err)
}
defer rows.Close()
go func() {
    time.Sleep(time.Duration(20) * time.Second)
    cancel()
}()
for rows.Next() {
    var car SpeedingCarRcd
    if err := rows.Scan(&car.Id, &car.Speed, &car.Start, &car.End); err != nil {
        log.Fatal(err)
    }
    log.Printf("%+v", car)
}
err = rows.Err()
if err != nil {
    log.Fatal(err)
}

Note: To cancel a streaming query, call the cancel function returned by context.WithCancel.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrBatchAlreadySent               = errors.New("proton: batch has already been sent")
	ErrAcquireConnTimeout             = errors.New("proton: acquire conn timeout. you can increase the number of max open conn or the dial timeout")
	ErrUnsupportedServerRevision      = errors.New("proton: unsupported server revision")
	ErrBindMixedNamedAndNumericParams = errors.New("proton [bind]: mixed named and numeric parameters")
)
View Source
var CompressionLZ4 compress.Method = compress.LZ4

Functions

func Context

func Context(parent context.Context, options ...QueryOption) context.Context

func Named

func Named(name string, value interface{}) driver.NamedValue

func Open

func Open(opt *Options) (driver.Conn, error)

func OpenDB

func OpenDB(opt *Options) *sql.DB

Types

type Auth

type Auth struct {
	Database string
	Username string
	Password string
}

type Compression

type Compression struct {
	Method compress.Method
}

type Conn

type Conn = driver.Conn

type ConnOpenStrategy

type ConnOpenStrategy uint8
const (
	ConnOpenInOrder ConnOpenStrategy = iota
	ConnOpenRoundRobin
)

type Exception

type Exception = proto.Exception

type Log

type Log struct {
	Time      time.Time
	TimeMicro uint32
	Hostname  string
	QueryID   string
	ThreadID  uint64
	Priority  int8
	Source    string
	Text      string
}

type OpError

type OpError struct {
	Op         string
	ColumnName string
	Err        error
}

func (*OpError) Error

func (e *OpError) Error() string

type Options

type Options struct {
	TLS              *tls.Config
	Addr             []string
	Auth             Auth
	DialContext      func(ctx context.Context, addr string) (net.Conn, error)
	Debug            bool
	Settings         Settings
	Compression      *Compression
	DialTimeout      time.Duration // default 1 second
	MaxOpenConns     int           // default MaxIdleConns + 5
	MaxIdleConns     int           // default 5
	ConnMaxLifetime  time.Duration // default 1 hour
	ConnOpenStrategy ConnOpenStrategy
}

func ParseDSN

func ParseDSN(dsn string) (*Options, error)

type ProfileEvent

type ProfileEvent struct {
	Hostname    string
	CurrentTime time.Time
	ThreadID    uint64
	Type        string
	Name        string
	Value       int64
}

type ProfileInfo

type ProfileInfo = proto.ProfileInfo

type Progress

type Progress = proto.Progress

type QueryOption

type QueryOption func(*QueryOptions) error

func WithExternalTable

func WithExternalTable(t ...*external.Table) QueryOption

func WithLogs

func WithLogs(fn func(*Log)) QueryOption

func WithProfileEvents

func WithProfileEvents(fn func([]ProfileEvent)) QueryOption

func WithProfileInfo

func WithProfileInfo(fn func(*ProfileInfo)) QueryOption

func WithProgress

func WithProgress(fn func(*Progress)) QueryOption

func WithQueryID

func WithQueryID(queryID string) QueryOption

func WithQuotaKey

func WithQuotaKey(quotaKey string) QueryOption

func WithSettings

func WithSettings(settings Settings) QueryOption

func WithSpan

func WithSpan(span trace.SpanContext) QueryOption

func WithStdAsync

func WithStdAsync(wait bool) QueryOption

type QueryOptions

type QueryOptions struct {
	// contains filtered or unexported fields
}

type ServerVersion

type ServerVersion = proto.ServerHandshake

type Settings

type Settings map[string]interface{}

Directories

Path Synopsis
examples
lib
cityhash102
* COPY from https://github.com/zentures/cityhash/
* COPY from https://github.com/zentures/cityhash/
io
issues/485
Licensed to ClickHouse, Inc.
Licensed to ClickHouse, Inc.
std

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL