clickhouse_buffer

package module
v0.0.0-...-4512317 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 11, 2022 | License: MIT | Imports: 6 | Imported by: 0

README

  • Query batching
  • Management of CollapsingMergeTree queries

Usage:

MergeTree example:

buffer := NewMergeTreeBuffer(
    dbConnection, // *sql.DB
    "table_name",
    time.Second*10, // clickhouse insert timeout
    time.Second, // buffer flush period
    200_000, // buffer size
    metrics.NewMetrics("example_subsystem"), // metrics interface
    zap.NewExample(),
)
defer buffer.Close()
defer buffer.Flush()

go func() {
    for err := range buffer.Errors() {
        fmt.Printf("write error: %s\n", err.Error())
    }
}()

err := buffer.Insert(table_buffer.Query{
    "first_column":  "column_value",
    "second column": 125,
    "third_column":  true,
})
if err != nil {
    return err
}

return nil

CollapsingMergeTree example:

buffer, err := NewCollapsingMergeTreeBuffer(
    context.Background(),
    dbConnection, // *sql.DB
    "table_name",
    []string{"primary_column"}, // primary key
    "sys_sign", // sign field name
    time.Second*10, // clickhouse insert timeout
    time.Second, // buffer flush period
    200_000, // buffer size
    metrics.NewMetrics("example_subsystem"), // metrics interface
    zap.NewExample(),
)
if err != nil {
    return err
}

defer buffer.Close()
defer buffer.Flush()

go func() {
    for err := range buffer.Errors() {
        fmt.Printf("write error: %s\n", err.Error())
    }
}()

err = buffer.Insert(table_buffer.Query{
    "primary_column": "column_value",
    "second column":  "value",
})
if err != nil {
    return err
}

err = buffer.Update(table_buffer.Query{
    "primary_column": "column_value",
    "second column":  "new_value",
})
if err != nil {
    return err
}

err = buffer.Delete(table_buffer.Query{
    "primary_column":  "column_value",
})
if err != nil {
    return err
}

return nil

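This code will generate the following inserts (the table and column names shown below are illustrative).
Insert writes a row with sign 1; Update writes a cancelling row with sign -1 for the previous state
followed by a row with sign 1 for the new state; Delete writes a cancelling row with sign -1: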

INSERT INTO test_table(primary_column,column,sys_sign) VALUES("column_value", "value", 1)
INSERT INTO test_table(primary_column,column,sys_sign) VALUES("column_value", "value", -1)
INSERT INTO test_table(primary_column,column,sys_sign) VALUES("column_value", "new_value", 1)
INSERT INTO test_table(primary_column,column,sys_sign) VALUES("column_value", "new_value", -1)

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CollapsingMergeTreeBuffer

type CollapsingMergeTreeBuffer struct {
	// contains filtered or unexported fields
}

func NewCollapsingMergeTreeBuffer

func NewCollapsingMergeTreeBuffer(
	ctx context.Context,
	db *sql.DB,
	tableName string,
	indexFields []string,
	signField string,
	insertTimeout time.Duration,
	flushPeriod time.Duration,
	bufferSize uint64,
	metrics metrics.Metrics,
	logger *zap.Logger,
) (*CollapsingMergeTreeBuffer, error)

func (*CollapsingMergeTreeBuffer) Close

func (c *CollapsingMergeTreeBuffer) Close()

func (*CollapsingMergeTreeBuffer) Delete

func (c *CollapsingMergeTreeBuffer) Delete(queries ...table_buffer.Query) error

func (*CollapsingMergeTreeBuffer) Errors

func (c *CollapsingMergeTreeBuffer) Errors() <-chan error

func (*CollapsingMergeTreeBuffer) Flush

func (c *CollapsingMergeTreeBuffer) Flush()

func (*CollapsingMergeTreeBuffer) Insert

func (c *CollapsingMergeTreeBuffer) Insert(queries ...table_buffer.Query) error

func (*CollapsingMergeTreeBuffer) InsertOrUpdate

func (c *CollapsingMergeTreeBuffer) InsertOrUpdate(queries ...table_buffer.Query) error

func (*CollapsingMergeTreeBuffer) Update

func (c *CollapsingMergeTreeBuffer) Update(queries ...table_buffer.Query) error

type MergeTreeBuffer

type MergeTreeBuffer struct {
	// contains filtered or unexported fields
}

func NewMergeTreeBuffer

func NewMergeTreeBuffer(
	db *sql.DB,
	tableName string,
	insertTimeout time.Duration,
	flushPeriod time.Duration,
	bufferSize uint64,
	metrics metrics.Metrics,
	logger *zap.Logger,
) *MergeTreeBuffer

func (*MergeTreeBuffer) Close

func (m *MergeTreeBuffer) Close()

func (*MergeTreeBuffer) Errors

func (m *MergeTreeBuffer) Errors() <-chan error

func (*MergeTreeBuffer) Flush

func (m *MergeTreeBuffer) Flush()

func (*MergeTreeBuffer) Insert

func (m *MergeTreeBuffer) Insert(queries ...table_buffer.Query) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL