Documentation ¶
Index ¶
- Variables
- type Client
- type CloseQueryPayload
- type CommandResult
- type CommandStatus
- type DescribeResult
- type ExecPayload
- type ExecResult
- type ExplainResult
- type Field
- type HealthcheckResult
- type InfoResult
- type InsertsStreamAck
- type InsertsStreamCloser
- type InsertsStreamTargetPayload
- type InsertsStreamWriter
- type ListPropertiesResult
- type ListQueriesResult
- type ListStreamsResult
- type ListTablesResult
- type Option
- type Query
- type QueryConfig
- type QueryDescription
- type QueryError
- type QueryPayload
- type QueryResult
- type QueryResultHeader
- type QueryRows
- type QueryStrategy
- type QueryStreamPayload
- type QueryStreamRows
- type Result
- type Row
- type Rows
- type Schema
- type SourceDescription
- type Stream
- type StreamsProperties
- type StreamsPropertiesOption
- type Table
- type TerminateClusterPayload
- type Warning
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrColumnNumberMismatch is returned when the number of columns present doesn't match the destination slice length ErrColumnNumberMismatch = errors.New("unexpected number of columns") // ErrRowsClosed is returned when trying to interate over rows after the iterator has been closed ErrRowsClosed = errors.New("rows closed") )
var DefaultQueryConfig = NewQueryConfig()
DefaultQueryConfig is the fallback config when querying through the database/sql interface
var ErrAckUnsucessful = errors.New("an ack was received but the status was not 'ok'")
ErrAckUnsucessful signifies that the document couldn't be written the the stream
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client interface { // Close closes all open connections Close() error // Describe returns information about an object Describe(ctx context.Context, source string) (DescribeResult, error) // Exec runs KSQL statements which can be anything except SELECT Exec(ctx context.Context, params ExecPayload) ([]ExecResult, error) // Explain returns details of the execution plan for a query or expression Explain(ctx context.Context, queryNameOrExpression string) (ExplainResult, error) // Healthcheck gets basic health information from the ksqlDB cluster Healthcheck(ctx context.Context) (HealthcheckResult, error) // Info returns status information about the ksqlDB cluster Info(ctx context.Context) (InfoResult, error) // InsertsStream allows you to insert rows into an existing ksqlDB stream. The stream must have already been created in ksqlDB. InsertsStream(ctx context.Context, payload InsertsStreamTargetPayload) (*InsertsStreamWriter, error) // ListQueries is a convenience method which executes a `LIST QUERIES;` operation ListQueries(ctx context.Context) (ListQueriesResult, error) // ListTables is a convenience method which executes a `LIST TABLES;` operation ListTables(ctx context.Context) (ListTablesResult, error) // ListStreams is a convenience method which executes a `LIST STREAMS;` operation ListStreams(ctx context.Context) (ListStreamsResult, error) // ListProperties is a convenience method which executes a `LIST PROPERTIES;` operation ListProperties(ctx context.Context) (ListPropertiesResult, error) // Query runs a KSQL query and returns a cursor. For streaming results use the QueryStream method. Query(ctx context.Context, payload QueryPayload) (*QueryRows, error) // QueryStream runs a streaming push & pull query QueryStream(ctx context.Context, payload QueryStreamPayload) (*QueryStreamRows, error) // CloseQuery explicitly terminates a push query stream CloseQuery(ctx context.Context, payload CloseQueryPayload) error // TerminateCluster terminates a running ksqlDB cluster TerminateCluster(ctx context.Context, payload TerminateClusterPayload) error }
Client is a ksqlDB REST API client
type CloseQueryPayload ¶
type CloseQueryPayload struct {
QueryID string `json:"queryId"`
}
CloseQueryPayload represents the JSON body used to close a query stream
type CommandResult ¶
type CommandResult struct { // CommandID is the identified for the requested operation. You can use this ID to poll the result of the operation using the status endpoint. CommandID string `json:"commandId,omitempty"` // CommandStatus is the status of the requested operation. CommandStatus CommandStatus `json:"commandStatus,omitempty"` // contains filtered or unexported fields }
CommandResult contains information about a CREATE, DROP or TERMINATE command
type CommandStatus ¶
type CommandStatus struct { // Status is one of QUEUED, PARSING, EXECUTING, TERMINATED, SUCCESS, or ERROR Status string `json:"status"` // Message regarding the status of the execution statement. Message string `json:"message"` // CommandSequenceNumber is the sequence number of the command, -1 if unsuccessful CommandSequenceNumber int64 `json:"commandSequenceNumber"` }
CommandStatus contains details of status of a given command
type DescribeResult ¶
type DescribeResult struct { // SourceDescription is a detailed description of the source (a STREAM or TABLE) SourceDescription SourceDescription `json:"sourceDescription,omitempty"` // contains filtered or unexported fields }
DescribeResult represents the response from a DESCRIBE statement
type ExecPayload ¶
type ExecPayload struct { // KSQL is a sequence of SQL statements. Anything is permitted except SELECT, for which you should use the Query method KSQL string `json:"ksql"` // StreamsProperties is a map of property overrides StreamsProperties StreamsProperties `json:"streamsProperties,omitempty"` // CommandSequenceNumber optionally waits until the specified sequence has been completed before running CommandSequenceNumber int64 `json:"commandSequenceNumber,omitempty"` }
ExecPayload represents the JSON payload for the /ksql endpoint
type ExecResult ¶
type ExecResult struct { // CREATE, DROP, TERMINATE *CommandResult // LIST STREAMS, SHOW STREAMS *ListStreamsResult // LIST TABLES, SHOW TABLES *ListTablesResult // LIST QUERIES, SHOW QUERIES *ListQueriesResult // LIST PROPERTIES, SHOW PROPERTIES *ListPropertiesResult // DESCRIBE *DescribeResult // EXPLAIN *ExplainResult // contains filtered or unexported fields }
ExecResult is the response result from the /ksql endpoint
func (ExecResult) As ¶
func (e ExecResult) As(target Result) bool
As checks if the ExecResult contains a subset result. If it does, then data is copied over for convenience.
Example ¶
package main import ( "fmt" ksql "github.com/vancelongwill/ksql-go/client" ) func main() { result := ksql.ExecResult{ CommandResult: &ksql.CommandResult{ CommandID: "69141AFF-1C6B-43F5-8905-7D6923588875", CommandStatus: ksql.CommandStatus{ Status: "QUEUED", }, }, } var cmd ksql.CommandResult b := result.As(&cmd) fmt.Println(b) var describe ksql.DescribeResult b = result.As(&describe) fmt.Println(b) }
Output: true false
type ExplainResult ¶
type ExplainResult struct { // QueryDescription is a detailed description of a query statement. QueryDescription QueryDescription `json:"queryDescription,omitempty"` // OverriddenProperties is a map of property overrides that the query is running with. OverriddenProperties map[string]interface{} `json:"overriddenProperties,omitempty"` // contains filtered or unexported fields }
ExplainResult represents the response for an `EXPLAIN` statement
type Field ¶
type Field struct { // The name of the field. Name string `json:"name"` // A schema object that describes the schema of the field. Schema Schema `json:"schema"` }
Field represents a single fields in ksqlDB
type HealthcheckResult ¶
type HealthcheckResult struct { IsHealthy bool `json:"isHealthy"` Details struct { Metastore struct { IsHealthy bool `json:"isHealthy"` } `json:"metastore"` Kafka struct { IsHealthy bool `json:"isHealthy"` } `json:"kafka"` } `json:"details"` }
HealthcheckResult represents the health check information returned by the health check endpoint
type InsertsStreamAck ¶
InsertsStreamAck represents an insert acknowledgement message in an inserts stream
type InsertsStreamCloser ¶
type InsertsStreamCloser struct {
// contains filtered or unexported fields
}
InsertsStreamCloser gracefully terminates the stream
func (*InsertsStreamCloser) Close ¶
func (i *InsertsStreamCloser) Close() error
Close closes the request body thus terminating the stream
type InsertsStreamTargetPayload ¶
type InsertsStreamTargetPayload struct {
Target string `json:"target"`
}
InsertsStreamTargetPayload represents the request body for initiating an inserts stream
type InsertsStreamWriter ¶
type InsertsStreamWriter struct {
// contains filtered or unexported fields
}
InsertsStreamWriter represents an inserts stream
func (*InsertsStreamWriter) Close ¶
func (i *InsertsStreamWriter) Close() error
Close terminates the request and therefore inserts stream
type ListPropertiesResult ¶
type ListPropertiesResult struct { // Properties is the map of server query properties Properties map[string]string `json:"properties,omitempty"` // contains filtered or unexported fields }
ListPropertiesResult represents the response for a `LIST PROPERTIES;` statement
type ListQueriesResult ¶
type ListQueriesResult struct { // Queries is the list of running queries Queries []Query `json:"queries,omitempty"` // contains filtered or unexported fields }
ListQueriesResult represents the API response from the `LIST QUERIES;` operation
type ListStreamsResult ¶
type ListStreamsResult struct { // Streams is the list of streams returned Streams []Stream `json:"streams,omitempty"` // contains filtered or unexported fields }
ListStreamsResult represents the API response from the `LIST STREAMS;` operation
type ListTablesResult ¶
type ListTablesResult struct { // Tables is the list of tables returned Tables []Table `json:"tables,omitempty"` // contains filtered or unexported fields }
ListTablesResult represents the API response from the `LIST TABLES;` operation
type Option ¶
type Option func(*ksqldb)
Option represents a function option for the ksqlDB client
func WithHTTPClient ¶
WithHTTPClient is an option for the ksqlDB client which allows the user to override the default http client
type Query ¶
type Query struct { // QueryString is the text of the statement that started the query QueryString string `json:"queryString"` // Sinks are the streams and tables being written to by the query Sinks []string `json:"sinks"` // ID is the query id ID string `json:"id"` }
Query is info about a query
type QueryConfig ¶
type QueryConfig struct { Strategy QueryStrategy StreamsProperties StreamsProperties }
QueryConfig is for use with database/sql based queries
func NewQueryConfig ¶
func NewQueryConfig() *QueryConfig
NewQueryConfig constructs a new default QueryConfig builder
func (*QueryConfig) SetProperties ¶
func (q *QueryConfig) SetProperties(props StreamsProperties) *QueryConfig
SetProperties sets the stream properties for the query, replacing any existing properties
func (*QueryConfig) Static ¶
func (q *QueryConfig) Static() *QueryConfig
Static configures the query to use the /query resource. Queries made with this strategy do not require manual closing.
func (*QueryConfig) Stream ¶
func (q *QueryConfig) Stream() *QueryConfig
Stream configures the query to use the /query-stream resource (i.e. pull queries). Results are streamed back until explicitly closed or the context is cancelled.
func (*QueryConfig) WithProperties ¶
func (q *QueryConfig) WithProperties(options ...StreamsPropertiesOption) *QueryConfig
WithProperties is a shorthand for configuring stream options
type QueryDescription ¶
type QueryDescription struct { // StatementText is a ksqlDB statement for which the query being explained is running. StatementText string `json:"statementText"` // Fields is a list of field objects that describes each field in the query output. Fields []Field `json:"fields"` // Sources is a list of the stream and table names being read by the query. Sources []string `json:"sources"` // Sinks is a list of the stream and table names being written to by the query. Sinks []string `json:"sinks"` // ExecutionPlan is the query execution plan. ExecutionPlan string `json:"executionPlan"` // Topology is the Kafka Streams topology for the query that is running. Topology string `json:"topology"` }
QueryDescription is a detailed description of a query statement.
type QueryError ¶
type QueryError struct {
// contains filtered or unexported fields
}
QueryError represents an error querying
func (*QueryError) Error ¶
func (q *QueryError) Error() string
type QueryPayload ¶
type QueryPayload struct { // KSQL is SELECT statement KSQL string `json:"ksql"` // StreamsProperties is a map of property overrides StreamsProperties StreamsProperties `json:"streamsProperties,omitempty"` }
QueryPayload represents the JSON payload for the POST /query endpoint
type QueryResult ¶
type QueryResult struct { Row Row `json:"row"` ErrorMessage string `json:"errorMessage,omitempty"` FinalMessage string `json:"finalMessage,omitempty"` }
QueryResult is the result of running a query
type QueryResultHeader ¶
type QueryResultHeader struct { // QueryID is a unique ID, provided for push queries only QueryID string `json:"queryID"` // ColumnNames is a list of column names ColumnNames []string `json:"columnNames"` // ColumnTypes is a list of the column types (e.g. 'BIGINT', 'STRING', 'BOOLEAN') ColumnTypes []string `json:"columnTypes"` }
QueryResultHeader is a header object which contains details of the push & pull query results
type QueryRows ¶
type QueryRows struct {
// contains filtered or unexported fields
}
QueryRows is a row iterator for static queries
type QueryStrategy ¶
type QueryStrategy string
QueryStrategy is used to
const ( // StreamQuery uses the clients QueryStream method StreamQuery QueryStrategy = "StreamQuery" // StaticQuery uses the clients more limited Query method StaticQuery = "StaticQuery" )
type QueryStreamPayload ¶
type QueryStreamPayload struct { // KSQL is the SELECT query to execute KSQL string `json:"sql"` // Properties is a map of optional properties for the query Properties map[string]string `json:"properties,omitempty"` }
QueryStreamPayload is the request body type for the /query-stream endpoint
type QueryStreamRows ¶
type QueryStreamRows struct {
// contains filtered or unexported fields
}
QueryStreamRows implements the standard libs Rows interface for reading DB rows
func (*QueryStreamRows) Close ¶
func (r *QueryStreamRows) Close() error
Close safely closes the response, allowing connections to be kept alive
func (*QueryStreamRows) Next ¶
func (r *QueryStreamRows) Next(dest []interface{}) error
Next reads another Row from the stream
type Result ¶
type Result interface {
// contains filtered or unexported methods
}
Result is common interface implemented by all result types for the ksqlDB REST API.
Internaly, it is used to convert generic responses in to narrower structs
type Rows ¶
Rows is almost the same as the standard library's driver.Rows interface except the Value type alias
type Schema ¶
type Schema struct { // The type the schema represents. One of INTEGER, BIGINT, BOOLEAN, DOUBLE, STRING, MAP, ARRAY, or STRUCT. Type string `json:"type"` // A schema object. For MAP and ARRAY types, contains the schema of the map values and array elements, respectively. For other types this field is not used and its value is undefined. MemberSchema map[string]interface{} `json:"memberSchema,omitempty"` // For STRUCT types, contains a list of field objects that describes each field within the struct. For other types this field is not used and its value is undefined. Fields []Field `json:"fields,omitempty"` }
Schema represents a ksqlDB fields schema
type SourceDescription ¶
type SourceDescription struct { // Name of the stream or table. Name string `json:"name"` // ReadQueries is the list of queries reading from the stream or table. ReadQueries []Query `json:"readQueries"` // WriteQueries is the list of queries writing into the stream or table WriteQueries []Query `json:"writeQueries"` // Fields is a list of field objects that describes each field in the stream/table. Fields []Field `json:"fields"` // Type is either STREAM or TABLE. Type string `json:"type"` // Key is the name of the key column. Key string `json:"key"` // Timestamp is the name of the timestamp column. Timestamp string `json:"timestamp"` // Format is the serialization format of the data in the stream or table. One of JSON, AVRO, PROTOBUF, or DELIMITED. Format string `json:"format"` // Topic backing the stream or table. Topic string `json:"topic"` // Extended indicates if this is an extended description. Extended bool `json:"extended"` // Statistics about production and consumption to and from the backing topic (extended only). Statistics string `json:"statistics,omitempty"` // ErrorStats is a string about errors producing and consuming to and from the backing topic (extended only). ErrorStats string `json:"errorStats,omitempty"` // Replication factor of the backing topic (extended only). Replication int `json:"replication,omitempty"` // Partitions is the number of partitions in the backing topic (extended only). Partitions int `json:"partitions,omitempty"` }
SourceDescription is a detailed description of the source (a STREAM or TABLE)
type Stream ¶
type Stream struct { // Name is the name of the stream Name string `json:"name"` // Topic is the associated Kafka topic Topic string `json:"topic"` // Format is the serialization format of the stream. One of JSON, AVRO, PROTOBUF, or DELIMITED. Format string `json:"format"` // Type is always 'STREAM' Type string `json:"type"` }
Stream is info about a stream
type StreamsProperties ¶
StreamsProperties is a map of property overrides https://docs.ksqldb.io/en/latest/operate-and-deploy/installation/server-config/config-reference/
func NewStreamsProperties ¶
func NewStreamsProperties(options ...StreamsPropertiesOption) StreamsProperties
NewStreamsProperties creates a new StreamsProperties map with a optional set of options
func (StreamsProperties) With ¶
func (s StreamsProperties) With(options ...StreamsPropertiesOption) StreamsProperties
With configures the StreamsProperties map with the provided options
type StreamsPropertiesOption ¶
type StreamsPropertiesOption func(StreamsProperties)
StreamsPropertiesOption configures an option in the StreamsProperties map
var ( // OffsetEarliest configures the query to stream from the beginning OffsetEarliest StreamsPropertiesOption = func(s StreamsProperties) { s["ksql.streams.auto.offset.reset"] = "earliest" } // OffsetLatest is the default offset strategy reading from the latest item in the stream OffsetLatest StreamsPropertiesOption = func(s StreamsProperties) { s["ksql.streams.auto.offset.reset"] = "latest" } // ExactlyOnce enables exactly-once semantics for the query. If a producer within a ksqlDB application sends a duplicate record, it's written to the broker exactly once. ExactlyOnce StreamsPropertiesOption = func(s StreamsProperties) { s["processing.guarantee"] = "exactly_once" } // AtLeastOnce enables at-least-once semantics for the query and is the default setting. Records are never lost but may be redelivered. AtLeastOnce StreamsPropertiesOption = func(s StreamsProperties) { s["processing.guarantee"] = "at_least_once" } )
type Table ¶
type Table struct { // Name of the table. Name string `json:"name"` // Topic backing the table. Topic string `json:"topic"` // The serialization format of the data in the table. One of JSON, AVRO, PROTOBUF, or DELIMITED. Format string `json:"format"` // The source type. Always returns 'TABLE'. Type string `json:"type"` // IsWindowed is true if the table provides windowed results; otherwise, false. IsWindowed bool `json:"isWindowed"` }
Table is info about a table
type TerminateClusterPayload ¶
type TerminateClusterPayload struct { // DeleteTopicList is an optional list of Kafka topics to delete DeleteTopicList []string `json:"deleteTopicList,omitempty"` }
TerminateClusterPayload represents the request body payload to terminate a ksqlDB cluster