client

package
v0.0.0-...-f06801b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 25, 2021 License: MIT Imports: 17 Imported by: 3

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrColumnNumberMismatch is returned when the number of columns present doesn't match the destination slice length
	ErrColumnNumberMismatch = errors.New("unexpected number of columns")
	// ErrRowsClosed is returned when trying to interate over rows after the iterator has been closed
	ErrRowsClosed = errors.New("rows closed")
)
View Source
var DefaultQueryConfig = NewQueryConfig()

DefaultQueryConfig is the fallback config when querying through the database/sql interface

View Source
var ErrAckUnsucessful = errors.New("an ack was received but the status was not 'ok'")

ErrAckUnsucessful signifies that the document couldn't be written the the stream

Functions

This section is empty.

Types

type Client

type Client interface {
	// Close closes all open connections
	Close() error
	// Describe returns information about an object
	Describe(ctx context.Context, source string) (DescribeResult, error)
	// Exec runs KSQL statements which can be anything except SELECT
	Exec(ctx context.Context, params ExecPayload) ([]ExecResult, error)
	// Explain returns details of the execution plan for a query or expression
	Explain(ctx context.Context, queryNameOrExpression string) (ExplainResult, error)
	// Healthcheck gets basic health information from the ksqlDB cluster
	Healthcheck(ctx context.Context) (HealthcheckResult, error)
	// Info returns status information about the ksqlDB cluster
	Info(ctx context.Context) (InfoResult, error)
	// InsertsStream allows you to insert rows into an existing ksqlDB stream. The stream must have already been created in ksqlDB.
	InsertsStream(ctx context.Context, payload InsertsStreamTargetPayload) (*InsertsStreamWriter, error)
	// ListQueries is a convenience method which executes a `LIST QUERIES;` operation
	ListQueries(ctx context.Context) (ListQueriesResult, error)
	// ListTables is a convenience method which executes a `LIST TABLES;` operation
	ListTables(ctx context.Context) (ListTablesResult, error)
	// ListStreams is a convenience method which executes a `LIST STREAMS;` operation
	ListStreams(ctx context.Context) (ListStreamsResult, error)
	// ListProperties is a convenience method which executes a `LIST PROPERTIES;` operation
	ListProperties(ctx context.Context) (ListPropertiesResult, error)
	// Query runs a KSQL query and returns a cursor. For streaming results use the QueryStream method.
	Query(ctx context.Context, payload QueryPayload) (*QueryRows, error)
	// QueryStream runs a streaming push & pull query
	QueryStream(ctx context.Context, payload QueryStreamPayload) (*QueryStreamRows, error)
	// CloseQuery explicitly terminates a push query stream
	CloseQuery(ctx context.Context, payload CloseQueryPayload) error
	// TerminateCluster terminates a running ksqlDB cluster
	TerminateCluster(ctx context.Context, payload TerminateClusterPayload) error
}

Client is a ksqlDB REST API client

func New

func New(baseURL string, options ...Option) Client

New constructs a new ksqlDB REST API client.

By default this uses an insecure HTTP2 client. In production you should pass in your own client via the WithHTTPClient option.

type CloseQueryPayload

type CloseQueryPayload struct {
	QueryID string `json:"queryId"`
}

CloseQueryPayload represents the JSON body used to close a query stream

type CommandResult

type CommandResult struct {

	// CommandID is the identified for the requested operation. You can use this ID to poll the result of the operation using the status endpoint.
	CommandID string `json:"commandId,omitempty"`
	// CommandStatus is the status of the requested operation.
	CommandStatus CommandStatus `json:"commandStatus,omitempty"`
	// contains filtered or unexported fields
}

CommandResult contains information about a CREATE, DROP or TERMINATE command

type CommandStatus

type CommandStatus struct {
	// Status is one of QUEUED, PARSING, EXECUTING, TERMINATED, SUCCESS, or ERROR
	Status string `json:"status"`
	// Message regarding the status of the execution statement.
	Message string `json:"message"`
	// CommandSequenceNumber is the sequence number of the command, -1 if unsuccessful
	CommandSequenceNumber int64 `json:"commandSequenceNumber"`
}

CommandStatus contains details of status of a given command

type DescribeResult

type DescribeResult struct {

	// SourceDescription is a detailed description of the source (a STREAM or TABLE)
	SourceDescription SourceDescription `json:"sourceDescription,omitempty"`
	// contains filtered or unexported fields
}

DescribeResult represents the response from a DESCRIBE statement

type ExecPayload

type ExecPayload struct {
	// KSQL is a sequence of SQL statements. Anything is permitted except SELECT, for which you should use the Query method
	KSQL string `json:"ksql"`
	// StreamsProperties is a map of property overrides
	StreamsProperties StreamsProperties `json:"streamsProperties,omitempty"`
	// CommandSequenceNumber optionally waits until the specified sequence has been completed before running
	CommandSequenceNumber int64 `json:"commandSequenceNumber,omitempty"`
}

ExecPayload represents the JSON payload for the /ksql endpoint

type ExecResult

type ExecResult struct {

	// CREATE, DROP, TERMINATE
	*CommandResult

	// LIST STREAMS, SHOW STREAMS
	*ListStreamsResult

	// LIST TABLES, SHOW TABLES
	*ListTablesResult

	// LIST QUERIES, SHOW QUERIES
	*ListQueriesResult

	// LIST PROPERTIES, SHOW PROPERTIES
	*ListPropertiesResult

	// DESCRIBE
	*DescribeResult

	// EXPLAIN
	*ExplainResult
	// contains filtered or unexported fields
}

ExecResult is the response result from the /ksql endpoint

func (ExecResult) As

func (e ExecResult) As(target Result) bool

As checks if the ExecResult contains a subset result. If it does, then data is copied over for convenience.

Example
package main

import (
	"fmt"

	ksql "github.com/vancelongwill/ksql-go/client"
)

func main() {
	result := ksql.ExecResult{
		CommandResult: &ksql.CommandResult{
			CommandID: "69141AFF-1C6B-43F5-8905-7D6923588875",
			CommandStatus: ksql.CommandStatus{
				Status: "QUEUED",
			},
		},
	}
	var cmd ksql.CommandResult
	b := result.As(&cmd)
	fmt.Println(b)
	var describe ksql.DescribeResult
	b = result.As(&describe)
	fmt.Println(b)
}
Output:

true
false

type ExplainResult

type ExplainResult struct {

	// QueryDescription is a detailed description of a query statement.
	QueryDescription QueryDescription `json:"queryDescription,omitempty"`
	// OverriddenProperties is a map of property overrides that the query is running with.
	OverriddenProperties map[string]interface{} `json:"overriddenProperties,omitempty"`
	// contains filtered or unexported fields
}

ExplainResult represents the response for an `EXPLAIN` statement

type Field

type Field struct {
	// The name of the field.
	Name string `json:"name"`
	// A schema object that describes the schema of the field.
	Schema Schema `json:"schema"`
}

Field represents a single fields in ksqlDB

type HealthcheckResult

type HealthcheckResult struct {
	IsHealthy bool `json:"isHealthy"`
	Details   struct {
		Metastore struct {
			IsHealthy bool `json:"isHealthy"`
		} `json:"metastore"`
		Kafka struct {
			IsHealthy bool `json:"isHealthy"`
		} `json:"kafka"`
	} `json:"details"`
}

HealthcheckResult represents the health check information returned by the health check endpoint

type InfoResult

type InfoResult map[string]interface{}

InfoResult is a map of status information

type InsertsStreamAck

type InsertsStreamAck struct {
	Status string `json:"status"`
	Seq    int64  `json:"seq"`
}

InsertsStreamAck represents an insert acknowledgement message in an inserts stream

type InsertsStreamCloser

type InsertsStreamCloser struct {
	// contains filtered or unexported fields
}

InsertsStreamCloser gracefully terminates the stream

func (*InsertsStreamCloser) Close

func (i *InsertsStreamCloser) Close() error

Close closes the request body thus terminating the stream

type InsertsStreamTargetPayload

type InsertsStreamTargetPayload struct {
	Target string `json:"target"`
}

InsertsStreamTargetPayload represents the request body for initiating an inserts stream

type InsertsStreamWriter

type InsertsStreamWriter struct {
	// contains filtered or unexported fields
}

InsertsStreamWriter represents an inserts stream

func (*InsertsStreamWriter) Close

func (i *InsertsStreamWriter) Close() error

Close terminates the request and therefore inserts stream

func (*InsertsStreamWriter) WriteJSON

func (i *InsertsStreamWriter) WriteJSON(ctx context.Context, p interface{}) error

WriteJSON encodes and writes p to the inserts stream, and waits for the corresponding Ack to be received

type ListPropertiesResult

type ListPropertiesResult struct {

	// Properties is the map of server query properties
	Properties map[string]string `json:"properties,omitempty"`
	// contains filtered or unexported fields
}

ListPropertiesResult represents the response for a `LIST PROPERTIES;` statement

type ListQueriesResult

type ListQueriesResult struct {

	// Queries is the list of running queries
	Queries []Query `json:"queries,omitempty"`
	// contains filtered or unexported fields
}

ListQueriesResult represents the API response from the `LIST QUERIES;` operation

type ListStreamsResult

type ListStreamsResult struct {

	// Streams is the list of streams returned
	Streams []Stream `json:"streams,omitempty"`
	// contains filtered or unexported fields
}

ListStreamsResult represents the API response from the `LIST STREAMS;` operation

type ListTablesResult

type ListTablesResult struct {

	// Tables is the list of tables returned
	Tables []Table `json:"tables,omitempty"`
	// contains filtered or unexported fields
}

ListTablesResult represents the API response from the `LIST TABLES;` operation

type Option

type Option func(*ksqldb)

Option represents a function option for the ksqlDB client

func WithHTTPClient

func WithHTTPClient(client *http.Client) Option

WithHTTPClient is an option for the ksqlDB client which allows the user to override the default http client

type Query

type Query struct {
	// QueryString is the text of the statement that started the query
	QueryString string `json:"queryString"`
	// Sinks are the streams and tables being written to by the query
	Sinks []string `json:"sinks"`
	// ID is the query id
	ID string `json:"id"`
}

Query is info about a query

type QueryConfig

type QueryConfig struct {
	Strategy          QueryStrategy
	StreamsProperties StreamsProperties
}

QueryConfig is for use with database/sql based queries

func NewQueryConfig

func NewQueryConfig() *QueryConfig

NewQueryConfig constructs a new default QueryConfig builder

func (*QueryConfig) SetProperties

func (q *QueryConfig) SetProperties(props StreamsProperties) *QueryConfig

SetProperties sets the stream properties for the query, replacing any existing properties

func (*QueryConfig) Static

func (q *QueryConfig) Static() *QueryConfig

Static configures the query to use the /query resource. Queries made with this strategy do not require manual closing.

func (*QueryConfig) Stream

func (q *QueryConfig) Stream() *QueryConfig

Stream configures the query to use the /query-stream resource (i.e. pull queries). Results are streamed back until explicitly closed or the context is cancelled.

func (*QueryConfig) WithProperties

func (q *QueryConfig) WithProperties(options ...StreamsPropertiesOption) *QueryConfig

WithProperties is a shorthand for configuring stream options

type QueryDescription

type QueryDescription struct {
	// StatementText is a ksqlDB statement for which the query being explained is running.
	StatementText string `json:"statementText"`
	// Fields is a list of field objects that describes each field in the query output.
	Fields []Field `json:"fields"`
	// Sources is a list of the stream and table names being read by the query.
	Sources []string `json:"sources"`
	// Sinks is a list of the stream and table names being written to by the query.
	Sinks []string `json:"sinks"`
	// ExecutionPlan is the query execution plan.
	ExecutionPlan string `json:"executionPlan"`
	// Topology is the Kafka Streams topology for the query that is running.
	Topology string `json:"topology"`
}

QueryDescription is a detailed description of a query statement.

type QueryError

type QueryError struct {
	// contains filtered or unexported fields
}

QueryError represents an error querying

func (*QueryError) Error

func (q *QueryError) Error() string

type QueryPayload

type QueryPayload struct {
	// KSQL is SELECT statement
	KSQL string `json:"ksql"`
	// StreamsProperties is a map of property overrides
	StreamsProperties StreamsProperties `json:"streamsProperties,omitempty"`
}

QueryPayload represents the JSON payload for the POST /query endpoint

type QueryResult

type QueryResult struct {
	Row          Row    `json:"row"`
	ErrorMessage string `json:"errorMessage,omitempty"`
	FinalMessage string `json:"finalMessage,omitempty"`
}

QueryResult is the result of running a query

type QueryResultHeader

type QueryResultHeader struct {
	// QueryID is a unique ID, provided for push queries only
	QueryID string `json:"queryID"`
	// ColumnNames is a list of column names
	ColumnNames []string `json:"columnNames"`
	// ColumnTypes is a list of the column types (e.g. 'BIGINT', 'STRING', 'BOOLEAN')
	ColumnTypes []string `json:"columnTypes"`
}

QueryResultHeader is a header object which contains details of the push & pull query results

type QueryRows

type QueryRows struct {
	// contains filtered or unexported fields
}

QueryRows is a row iterator for static queries

func (*QueryRows) Close

func (q *QueryRows) Close() error

Close closes the rows interator

func (QueryRows) Columns

func (c QueryRows) Columns() []string

func (*QueryRows) Next

func (q *QueryRows) Next(dest []interface{}) error

Next implements the sql driver row interface used for interating over rows

func (QueryRows) Validate

func (c QueryRows) Validate(dest []interface{}) error

type QueryStrategy

type QueryStrategy string

QueryStrategy is used to

const (
	// StreamQuery uses the clients QueryStream method
	StreamQuery QueryStrategy = "StreamQuery"
	// StaticQuery uses the clients more limited Query method
	StaticQuery = "StaticQuery"
)

type QueryStreamPayload

type QueryStreamPayload struct {
	// KSQL is the SELECT query to execute
	KSQL string `json:"sql"`
	// Properties is a map of optional properties for the query
	Properties map[string]string `json:"properties,omitempty"`
}

QueryStreamPayload is the request body type for the /query-stream endpoint

type QueryStreamRows

type QueryStreamRows struct {
	// contains filtered or unexported fields
}

QueryStreamRows implements the standard libs Rows interface for reading DB rows

func (*QueryStreamRows) Close

func (r *QueryStreamRows) Close() error

Close safely closes the response, allowing connections to be kept alive

func (QueryStreamRows) Columns

func (c QueryStreamRows) Columns() []string

func (*QueryStreamRows) Next

func (r *QueryStreamRows) Next(dest []interface{}) error

Next reads another Row from the stream

func (QueryStreamRows) Validate

func (c QueryStreamRows) Validate(dest []interface{}) error

type Result

type Result interface {
	// contains filtered or unexported methods
}

Result is common interface implemented by all result types for the ksqlDB REST API.

Internaly, it is used to convert generic responses in to narrower structs

type Row

type Row struct {
	Columns []interface{} `json:"columns"`
}

Row is a row in the DB

type Rows

type Rows interface {
	Columns() []string
	Close() error
	Next(dest []interface{}) error
}

Rows is almost the same as the standard library's driver.Rows interface except the Value type alias

type Schema

type Schema struct {
	// The type the schema represents. One of INTEGER, BIGINT, BOOLEAN, DOUBLE, STRING, MAP, ARRAY, or STRUCT.
	Type string `json:"type"`
	// A schema object. For MAP and ARRAY types, contains the schema of the map values and array elements, respectively. For other types this field is not used and its value is undefined.
	MemberSchema map[string]interface{} `json:"memberSchema,omitempty"`
	// For STRUCT types, contains a list of field objects that describes each field within the struct. For other types this field is not used and its value is undefined.
	Fields []Field `json:"fields,omitempty"`
}

Schema represents a ksqlDB fields schema

type SourceDescription

type SourceDescription struct {
	// Name of the stream or table.
	Name string `json:"name"`
	// ReadQueries is the list of queries reading from the stream or table.
	ReadQueries []Query `json:"readQueries"`
	// WriteQueries is the list of queries writing into the stream or table
	WriteQueries []Query `json:"writeQueries"`
	// Fields is a list of field objects that describes each field in the stream/table.
	Fields []Field `json:"fields"`
	// Type is either STREAM or TABLE.
	Type string `json:"type"`
	// Key is the name of the key column.
	Key string `json:"key"`
	// Timestamp is the name of the timestamp column.
	Timestamp string `json:"timestamp"`
	// Format is the serialization format of the data in the stream or table. One of JSON, AVRO, PROTOBUF, or DELIMITED.
	Format string `json:"format"`
	// Topic backing the stream or table.
	Topic string `json:"topic"`
	// Extended indicates if this is an extended description.
	Extended bool `json:"extended"`
	// Statistics about production and consumption to and from the backing topic (extended only).
	Statistics string `json:"statistics,omitempty"`
	// ErrorStats is a string about errors producing and consuming to and from the backing topic (extended only).
	ErrorStats string `json:"errorStats,omitempty"`
	// Replication factor of the backing topic (extended only).
	Replication int `json:"replication,omitempty"`
	// Partitions is the number of partitions in the backing topic (extended only).
	Partitions int `json:"partitions,omitempty"`
}

SourceDescription is a detailed description of the source (a STREAM or TABLE)

type Stream

type Stream struct {
	// Name is the name of the stream
	Name string `json:"name"`
	// Topic is the associated Kafka topic
	Topic string `json:"topic"`
	// Format is the serialization format of the stream. One of JSON, AVRO, PROTOBUF, or DELIMITED.
	Format string `json:"format"`
	// Type is always 'STREAM'
	Type string `json:"type"`
}

Stream is info about a stream

type StreamsProperties

type StreamsProperties map[string]string

StreamsProperties is a map of property overrides https://docs.ksqldb.io/en/latest/operate-and-deploy/installation/server-config/config-reference/

func NewStreamsProperties

func NewStreamsProperties(options ...StreamsPropertiesOption) StreamsProperties

NewStreamsProperties creates a new StreamsProperties map with a optional set of options

func (StreamsProperties) With

With configures the StreamsProperties map with the provided options

type StreamsPropertiesOption

type StreamsPropertiesOption func(StreamsProperties)

StreamsPropertiesOption configures an option in the StreamsProperties map

var (
	// OffsetEarliest configures the query to stream from the beginning
	OffsetEarliest StreamsPropertiesOption = func(s StreamsProperties) {
		s["ksql.streams.auto.offset.reset"] = "earliest"
	}

	// OffsetLatest is the default offset strategy reading from the latest item in the stream
	OffsetLatest StreamsPropertiesOption = func(s StreamsProperties) {
		s["ksql.streams.auto.offset.reset"] = "latest"
	}

	// ExactlyOnce enables exactly-once semantics for the query. If a producer within a ksqlDB application sends a duplicate record, it's written to the broker exactly once.
	ExactlyOnce StreamsPropertiesOption = func(s StreamsProperties) {
		s["processing.guarantee"] = "exactly_once"
	}

	// AtLeastOnce enables at-least-once semantics for the query and is the default setting. Records are never lost but may be redelivered.
	AtLeastOnce StreamsPropertiesOption = func(s StreamsProperties) {
		s["processing.guarantee"] = "at_least_once"
	}
)

type Table

type Table struct {
	// Name of the table.
	Name string `json:"name"`
	// Topic backing the table.
	Topic string `json:"topic"`
	// The serialization format of the data in the table. One of JSON, AVRO, PROTOBUF, or DELIMITED.
	Format string `json:"format"`
	// The source type. Always returns 'TABLE'.
	Type string `json:"type"`
	// IsWindowed is true if the table provides windowed results; otherwise, false.
	IsWindowed bool `json:"isWindowed"`
}

Table is info about a table

type TerminateClusterPayload

type TerminateClusterPayload struct {
	// DeleteTopicList is an optional list of Kafka topics to delete
	DeleteTopicList []string `json:"deleteTopicList,omitempty"`
}

TerminateClusterPayload represents the request body payload to terminate a ksqlDB cluster

type Warning

type Warning struct {
	Message string `json:"message"`
}

Warning represents a non-fatal user warning

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL