postgres

package
v0.2.0
Published: May 25, 2022 License: MIT Imports: 18 Imported by: 0

README

Postgres tracer

Implementation of a klogga Tracer that writes spans to a Postgres database with the TimescaleDB extension. Schema is created automatically, at least most of the time. Re

Implementation features

  • batches spans
  • creates missing tables (be careful with tracer names)
  • modifies table columns, although not all cases are supported
  • reports errors when the data type of an already created column does not match the data in the span

Documentation

Constants

const (
	ErrorPostgresTableName = "error_postgres"
	DefaultSchema          = "audit"
	ErrorPostgresTable     = DefaultSchema + "." + ErrorPostgresTableName
)
const PgJsonbTypeName = "jsonb"
const PgTextTypeName = "text"

Variables

This section is empty.

Functions

func GetPgTypeVal

func GetPgTypeVal(a interface{}) (string, interface{})

GetPgTypeVal converts a Go value to a compatible PG type. Structs are automatically converted to jsonb.
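As a rough illustration of this kind of mapping, the sketch below uses a type switch with a struct fallback. `pgTypeVal` and `point` are hypothetical names, and every type name other than `text` and `jsonb` is an assumption; the mapping in the real GetPgTypeVal may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
	"time"
)

// pgTypeVal is a hypothetical stand-in for GetPgTypeVal.
// It returns a PG type name and the value to bind as a query argument.
func pgTypeVal(a interface{}) (string, interface{}) {
	switch v := a.(type) {
	case string:
		return "text", v
	case int, int32, int64:
		return "bigint", v // assumed type name
	case float32, float64:
		return "double precision", v // assumed type name
	case bool:
		return "boolean", v // assumed type name
	case time.Time:
		// time.Time is a struct, so it must be matched before the fallback below.
		return "timestamp with time zone", v // assumed type name
	}
	// Remaining struct values are serialized to JSON and stored as jsonb.
	if reflect.ValueOf(a).Kind() == reflect.Struct {
		b, err := json.Marshal(a)
		if err != nil {
			return "text", fmt.Sprintf("%v", a)
		}
		return "jsonb", string(b)
	}
	// Anything else falls back to its text representation.
	return "text", fmt.Sprintf("%v", a)
}

// point is a hypothetical struct value carried by a span.
type point struct{ X, Y int }

func main() {
	for _, v := range []interface{}{"hi", 42, true, point{1, 2}} {
		typ, val := pgTypeVal(v)
		fmt.Printf("%T -> %s (%v)\n", v, typ, val)
	}
}
```

Returning the converted value alongside the type name lets the caller bind it directly as a query argument, which is presumably why the real function returns both.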

Types

type ColumnSchema

type ColumnSchema struct {
	Name        string
	DataType    string
	TypePostfix string
	IsNullable  bool
}

func (ColumnSchema) SQL

func (s ColumnSchema) SQL() string

type Conf

type Conf struct {
	SchemaName string
	// applied per-table, and only for writing itself, without updating the schema
	WriteTimeout time.Duration

	// don't create any DB structure automatically
	// requires all the tables to be created beforehand, with exactly required fields
	SkipSchemaCreation bool

	LoadSchemaTimeout time.Duration
	// automatically created tables are declared as timescale hypertables
	UseTimescale bool
}

Conf holds the parameters that control how klogga works with the DB. It does not include the connection string etc.

type Connection

type Connection interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
	// Close is called when the exporter no longer needs the connection
	Close() error
}

type Connector

type Connector interface {
	GetConnection(ctx context.Context) (Connection, error)
	// Stop is for cleanup, it will be called when the tracer closes
	Stop(ctx context.Context) error
}

Connector provides PG connections in an abstract way

type ErrDescriptor

type ErrDescriptor struct {
	Table          string
	Span           *klogga.Span
	Column         ColumnSchema
	ExistingColumn ColumnSchema
}

ErrDescriptor describes a problematic span column. Its description will be written to error_metrics.

func (*ErrDescriptor) Err

func (e *ErrDescriptor) Err() error

func (*ErrDescriptor) Warn

func (e *ErrDescriptor) Warn() error

type Exporter

type Exporter struct {
	// contains filtered or unexported fields
}

Exporter writes Spans to Postgres. It can create tables and add columns to existing tables.

func New

func New(cfg *Conf, connFactory Connector, trs klogga.Tracer) *Exporter

New creates an Exporter, to be used with a batcher.

  • cfg - config
  • connFactory - connection to PG
  • trs - a tracer to write pg_exporter logs, like DB changes; pass nil to set up the default golog tracer

func (*Exporter) ConnFactory

func (e *Exporter) ConnFactory() Connector

func (*Exporter) Shutdown

func (e *Exporter) Shutdown(ctx context.Context) error

func (*Exporter) Start

func (e *Exporter) Start(context.Context) error

Start exists for compatibility with the Start / Stop interface.

func (*Exporter) Stop

func (e *Exporter) Stop(ctx context.Context) error

Stop exists for compatibility with the Start / Stop interface.

func (*Exporter) Write

func (e *Exporter) Write(ctx context.Context, spans []*klogga.Span) error

type RecordSet

type RecordSet struct {
	Schema *TableSchema
	Spans  []*klogga.Span
}

type TableSchema

type TableSchema struct {
	// contains filtered or unexported fields
}

TableSchema key is the table name

func NewTableSchema

func NewTableSchema(columns []*ColumnSchema) *TableSchema

func (*TableSchema) AddColumn

func (t *TableSchema) AddColumn(col ColumnSchema)

func (TableSchema) AlterTableStatement

func (t TableSchema) AlterTableStatement(schema, tableName string) string

AlterTableStatement is NOT injection safe.

func (TableSchema) Column

func (t TableSchema) Column(name string) (*ColumnSchema, bool)

func (TableSchema) ColumnNames

func (t TableSchema) ColumnNames() []string

func (TableSchema) Columns

func (t TableSchema) Columns() []*ColumnSchema

func (TableSchema) ColumnsCount

func (t TableSchema) ColumnsCount() int

func (TableSchema) CreateTableStatement

func (t TableSchema) CreateTableStatement(schema, tableName string, timeCol *ColumnSchema, useTimescale bool) string

CreateTableStatement is NOT injection safe.

func (TableSchema) GetAlterSchema

func (t TableSchema) GetAlterSchema(dataset RecordSet) (*TableSchema, []ErrDescriptor)

GetAlterSchema returns the schema of the missing columns. It also returns descriptors for spans that cannot be written just by adding columns.
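The general idea of such a schema diff can be sketched as follows. This is not the package's implementation: `column`, `mismatch` and `diffColumns` are hypothetical names, and the sketch compares plain type-name strings, which may be stricter or looser than the real check.

```go
package main

import "fmt"

// column is a simplified stand-in for ColumnSchema.
type column struct {
	Name     string
	DataType string
}

// mismatch pairs an incoming column with the conflicting existing one,
// loosely modelled on the Column / ExistingColumn fields of ErrDescriptor.
type mismatch struct {
	Column   column
	Existing column
}

// diffColumns splits incoming columns into those missing from the table
// (fixable with ALTER TABLE ... ADD COLUMN) and those whose type conflicts
// with an existing column (not fixable by adding columns).
func diffColumns(existing map[string]column, incoming []column) ([]column, []mismatch) {
	var missing []column
	var conflicts []mismatch
	seen := make(map[string]bool) // avoid reporting the same missing column twice
	for _, col := range incoming {
		cur, ok := existing[col.Name]
		switch {
		case !ok:
			if !seen[col.Name] {
				seen[col.Name] = true
				missing = append(missing, col)
			}
		case cur.DataType != col.DataType:
			conflicts = append(conflicts, mismatch{Column: col, Existing: cur})
		}
	}
	return missing, conflicts
}

func main() {
	existing := map[string]column{
		"id":   {Name: "id", DataType: "bigint"},
		"name": {Name: "name", DataType: "text"},
	}
	incoming := []column{
		{Name: "id", DataType: "bigint"},
		{Name: "name", DataType: "bigint"}, // conflicts with existing text column
		{Name: "age", DataType: "bigint"},  // not in the table yet
	}
	missing, conflicts := diffColumns(existing, incoming)
	fmt.Println("missing:", missing)
	fmt.Println("conflicts:", conflicts)
}
```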

func (TableSchema) InsertStatement

func (t TableSchema) InsertStatement(schema string, tableName string) string

InsertStatement is NOT injection safe.
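Since the statement builders (AlterTableStatement, CreateTableStatement, InsertStatement) are documented as not injection safe, a caller may want to check schema and table names before passing them in. The sketch below shows one conservative option; `validIdent` is a hypothetical helper that is not part of this package, and its lowercase-only allow-list is an assumption that is stricter than what Postgres itself accepts.

```go
package main

import (
	"fmt"
	"regexp"
)

// identRe accepts only lowercase letters, digits and underscores,
// and requires the first character to be a letter or underscore.
var identRe = regexp.MustCompile(`^[a-z_][a-z0-9_]*$`)

// validIdent reports whether name is safe to splice into SQL as an
// unquoted identifier. Postgres truncates identifiers longer than
// 63 bytes by default, so longer names are rejected as well.
func validIdent(name string) bool {
	return len(name) <= 63 && identRe.MatchString(name)
}

func main() {
	for _, name := range []string{"audit", "error_postgres", `spans"; DROP TABLE x; --`, ""} {
		fmt.Printf("%q valid: %v\n", name, validIdent(name))
	}
}
```

This matters most for table names, which the feature list above ties to tracer names.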

func (*TableSchema) IsZero

func (t *TableSchema) IsZero() bool

func (*TableSchema) Merge

func (t *TableSchema) Merge(newCols []*ColumnSchema) *TableSchema
