Documentation ¶
Index ¶
- Variables
- type Client
- func (cl *Client) Debug() *Client
- func (cl *Client) Execute(q string) (err error)
- func (cl *Client) Pull(ctx context.Context, q string, s bool) (h Header, r Payload, err error)
- func (cl *Client) Push(ctx context.Context, q string, rc chan<- Row, hc chan<- Header) (err error)
- func (cl *Client) SetLogFunc(fn func(format string, v ...interface{})) *Client
- type Column
- type Header
- type Payload
- type Row
Constants ¶
This section is empty.
Variables ¶
var (
ErrNotFound = errors.New("No result found")
)
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
The ksqlDB client
func (*Client) Execute ¶
Execute will execute a ksqlDB statement, such as creating a new stream or table. To run queries, use the Push or Pull functions.
To use this function, pass in the SQL statement to execute.
Ref: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/ksql-endpoint/
TODO Add support for commandSequenceNumber and streamsProperties
TODO Add better support for responses to CREATE/DROP/TERMINATE (e.g. commandID, commandStatus.status, etc.)
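For orientation, the /ksql endpoint referenced above accepts a JSON body with a ksql field holding the statement and an optional streamsProperties map. The sketch below only illustrates that request shape; the ksqlRequest type and buildStatementBody helper are hypothetical names, and how this client actually builds its request is not documented here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ksqlRequest mirrors the JSON body accepted by the ksqlDB /ksql endpoint.
// The type name is hypothetical; it is not part of this package.
type ksqlRequest struct {
	KSQL              string            `json:"ksql"`
	StreamsProperties map[string]string `json:"streamsProperties,omitempty"`
}

// buildStatementBody encodes a single statement as a /ksql request body.
func buildStatementBody(stmt string) (string, error) {
	b, err := json.Marshal(ksqlRequest{KSQL: stmt})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	body, err := buildStatementBody("SHOW STREAMS;")
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(body)
}
```

Because streamsProperties is tagged omitempty, it is left out of the body when no properties are set.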
func (*Client) Pull ¶
Pull queries are like "traditional" RDBMS queries in which the query terminates once the state has been queried.
To use this function, pass in a context, the SQL query statement, and a boolean indicating whether full table scans should be enabled.
The function returns a ksqldb.Header and ksqldb.Payload which will hold one or more rows of data. You will need to define variables to hold each column's value. You can adopt this pattern to do this:
	var COL1 string
	var COL2 float64
	for _, row := range r {
		COL1 = row[0].(string)
		COL2 = row[1].(float64)
		// Do other stuff with the data here
	}
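A bare type assertion such as row[0].(string) panics if the column holds a different type. As a more defensive variant of the pattern above, the following sketch uses the two-value form of the assertion. Row and Payload here are local stand-ins that are assumed to have the same shape as ksqldb.Row and ksqldb.Payload (a slice of column values and a slice of rows); the reading type and decode function are hypothetical.

```go
package main

import "fmt"

// Row and Payload are local stand-ins, assumed to match the shape of
// ksqldb.Row and ksqldb.Payload.
type Row []interface{}
type Payload []Row

// reading is a hypothetical record type for the two example columns.
type reading struct {
	Name  string
	Value float64
}

// decode converts each row into a reading and returns an error, rather than
// panicking, when a column is missing or has an unexpected type.
func decode(p Payload) ([]reading, error) {
	out := make([]reading, 0, len(p))
	for i, row := range p {
		if len(row) < 2 {
			return nil, fmt.Errorf("row %d: expected 2 columns, got %d", i, len(row))
		}
		name, ok := row[0].(string)
		if !ok {
			return nil, fmt.Errorf("row %d: column 0 is %T, not string", i, row[0])
		}
		value, ok := row[1].(float64)
		if !ok {
			return nil, fmt.Errorf("row %d: column 1 is %T, not float64", i, row[1])
		}
		out = append(out, reading{Name: name, Value: value})
	}
	return out, nil
}

func main() {
	rs, err := decode(Payload{{"a", 1.5}, {"b", 2.0}})
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	for _, r := range rs {
		fmt.Println(r.Name, r.Value)
	}
}
```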
func (*Client) Push ¶
Push queries are continuous queries in which new events or changes to a table's state are pushed to the client. You can think of them as subscribing to a stream of changes.
Since push queries never end, this function expects a channel to which it can write new rows of data as and when they are received.
To use this function pass in a context, the SQL query statement, and two channels:
- ksqldb.Row - rows of data
- ksqldb.Header - header (including column definitions). If you don't want to block before receiving row data then make this channel buffered.
The row channel is populated with ksqldb.Row values, each of which represents one row of data. You will need to define variables to hold each column's value. You can adopt this pattern to do this:
	var DATA_TS float64
	var ID string
	for row := range rc {
		if row != nil {
			DATA_TS = row[0].(float64)
			ID = row[1].(string)
			// Do other stuff with the data here
		}
	}
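The sketch below shows one way the channel wiring described above might look: an unbuffered row channel, a buffered header channel, and a context used to stop the query. It does not call the real client. fakePush is a hypothetical producer standing in for Client.Push, Row and Header are local stand-ins, and the QueryID field is invented for illustration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Row and Header are local stand-ins for ksqldb.Row and ksqldb.Header.
// The QueryID field is hypothetical.
type Row []interface{}
type Header struct{ QueryID string }

// fakePush is a hypothetical producer standing in for Client.Push. It sends
// one header, then rows until the context is cancelled.
func fakePush(ctx context.Context, rc chan<- Row, hc chan<- Header) {
	defer close(rc)
	hc <- Header{QueryID: "demo"} // does not block because hc is buffered
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			return
		case rc <- Row{float64(i), fmt.Sprintf("id-%d", i)}:
		}
	}
}

// consume collects the ID column of the first limit rows, then stops.
func consume(limit int) (Header, []string) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel() // stops the producer when consume returns

	rc := make(chan Row)
	hc := make(chan Header, 1) // buffered so rows can flow before the header is read
	go fakePush(ctx, rc, hc)

	var ids []string
	for row := range rc {
		if len(row) < 2 {
			continue
		}
		id, ok := row[1].(string)
		if !ok {
			continue
		}
		ids = append(ids, id)
		if len(ids) == limit {
			break
		}
	}
	return <-hc, ids
}

func main() {
	h, ids := consume(3)
	fmt.Println(h.QueryID, ids)
}
```

Cancelling the context is what ends the otherwise endless query in this sketch; the header is read only after the rows, which works because its channel is buffered.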
func (*Client) SetLogFunc ¶
SetLogFunc sets a custom logging function with the signature func(format string, v ...interface{}).
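As a minimal sketch of a function that fits this signature, the example below wraps a standard library *log.Logger so that every message carries a fixed prefix. The LogFunc type name and the prefixed and render helpers are hypothetical and not part of this package; the result of prefixed is the kind of value that could be handed to SetLogFunc.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// LogFunc names the function signature that SetLogFunc accepts.
// The type name itself is hypothetical.
type LogFunc func(format string, v ...interface{})

// prefixed returns a LogFunc that writes each message to l behind a fixed prefix.
// The prefix is assumed to contain no formatting verbs.
func prefixed(l *log.Logger, prefix string) LogFunc {
	return func(format string, v ...interface{}) {
		l.Printf(prefix+format, v...)
	}
}

// render formats one message through a prefixed logger and returns the output.
func render(prefix, format string, v ...interface{}) string {
	var buf bytes.Buffer
	prefixed(log.New(&buf, "", 0), prefix)(format, v...)
	return buf.String()
}

func main() {
	fmt.Print(render("[ksqldb] ", "query took %dms", 12))
}
```

The standard library's log.Printf already has this signature, so it could also be passed directly when no prefix or custom destination is needed.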