ksqldb

package module
v0.0.0-...-35d4fd2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 3, 2021 License: Apache-2.0 Imports: 14 Imported by: 1

README

= ksqlDB Go library
Robin Moffatt <robin@moffatt.me>
v0.04, 8 October 2020

'''
'''
'''

🌟🌟🌟 *@thmeitz has recently forked this—with my gratitude and blessing—in order to develop it further - please see his repo at https://github.com/thmeitz/ksqldb-go[thmeitz/ksqldb-go]*

'''
'''
'''

:toc:

This is a Go client for https://ksqldb.io/[ksqlDB]. It supports both pull and push queries, as well as command execution. 

⚠️ Disclaimer #1: https://rmoff.net/2020/06/25/learning-golang-some-rough-notes-s01e00/[I am brand new to Go!] Tips (or PRs) to improve the code very welcome :)

⚠️ Disclaimer #2: This is a personal project and not supported or endorsed by Confluent.

image::ksqldb-go.gif[Animation of the ksqlDB Golang client in action]

== Installation

Module install:

This client is a Go module, therefore you can have it simply by adding the following import to your code:

[source,golang]
----
import "github.com/rmoff/ksqldb-go"
----

Then run a build to have this client automatically added to your go.mod file as a dependency.

Manual install:

[source,bash]
----
go get -u github.com/rmoff/ksqldb-go
----

== Examples

See the link:test/environment.adoc[test environment here], and link:test/main.go[this sample code] which you can run with

[source,bash]
----
go run ./test/
----

Create a ksqlDB Client 

[source,go]
----
client := ksqldb.NewClient("http://ksqldb:8088","username","password").Debug()
----

For no authentication just use blank username and password values. 

=== Pull query

[source,go]
----
ctx, ctxCancel := context.WithTimeout(context.Background(), 10 * time.Second)
defer ctxCancel()

k := "SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START, TIMESTAMPTOSTRING(WINDOWEND,'HH:mm:ss','Europe/London') AS WINDOW_END, DOG_SIZE, DOGS_CT FROM DOGS_BY_SIZE WHERE DOG_SIZE='" + s + "';"
_, r, e := client.Pull(ctx, k, false)

if e != nil {
    // handle the error better here, e.g. check for no rows returned
    return fmt.Errorf("Error running Pull request against ksqlDB:\n%v", e)
}

var DOG_SIZE string
var DOGS_CT float64
for _, row := range r {
    if row != nil {
        // Should do some type assertions here
        DOG_SIZE = row[2].(string)
        DOGS_CT = row[3].(float64)
        fmt.Printf("🐶 There are %v dogs size %v\n", DOGS_CT, DOG_SIZE)
    }
}
----

=== Push query

[source,go]
----
rc := make(chan ksqldb.Row)
hc := make(chan ksqldb.Header, 1)

k := "SELECT ROWTIME, ID, NAME, DOGSIZE, AGE FROM DOGS EMIT CHANGES;"

// This Go routine will handle rows as and when they
// are sent to the channel
go func() {
    var NAME string
    var DOG_SIZE string
    for row := range rc {
        if row != nil {
            // Should do some type assertions here
            NAME = row[2].(string)
            DOG_SIZE = row[3].(string)

            fmt.Printf("🐾%v: %v\n",  NAME, DOG_SIZE)
        }
    }
}()

ctx, ctxCancel := context.WithTimeout(context.Background(), 10 * time.Second)
defer ctxCancel()

e := client.Push(ctx, k, rc, hc)

if e != nil {
    // handle the error better here, e.g. check for no rows returned
    return fmt.Errorf("Error running Push request against ksqlDB:\n%v", e)
}
----

=== Execute a command

[source,go]
----
if err := client.Execute(ctx, ksqlDBServer, `
	CREATE STREAM DOGS (ID STRING KEY, 
						NAME STRING, 
						DOGSIZE STRING, 
						AGE STRING) 
				  WITH (KAFKA_TOPIC='dogs', 
				  VALUE_FORMAT='JSON');
`); err != nil {
    return fmt.Errorf("Error creating the DOGS stream.\n%v", err)
}
----

== TODO

See https://github.com/rmoff/ksqldb-go/issues?q=is%3Aissue+is%3Aopen+label%3A%22help+wanted%22

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNotFound = errors.New("No result found")
)

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

The ksqlDB client

func NewClient

func NewClient(url string, u string, p string) *Client

NewClient creates new ksqldb client with log.Println default logging

func (*Client) Debug

func (cl *Client) Debug() *Client

Debug sets debug mode for logging

func (*Client) Execute

func (cl *Client) Execute(q string) (err error)

Execute will execute a ksqlDB statement, such as creating a new stream or table. To run queries use Push or Pull functions.

To use this function pass in the base URL of your ksqlDB server, and the SQL query statement

Ref: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/ksql-endpoint/

TODO Add support for commandSequenceNumber and streamsProperties TODO Add better support for responses to CREATE/DROP/TERMINATE (e.g. commandID, commandStatus.status, etc)

func (*Client) Pull

func (cl *Client) Pull(ctx context.Context, q string, s bool) (h Header, r Payload, err error)

Pull queries are like "traditional" RDBMS queries in which the query terminates once the state has been queried.

To use this function pass in the the SQL query statement, and a boolean for whether full table scans should be enabled.

The function returns a ksqldb.Header and ksqldb.Payload which will hold one or more rows of data. You will need to define variables to hold each column's value. You can adopt this pattern to do this:

var COL1 string
var COL2 float64
for _, row := range r {
	COL1 = row[0].(string)
	COL2 = row[1].(float64)
	// Do other stuff with the data here
	}
}

func (*Client) Push

func (cl *Client) Push(ctx context.Context, q string, rc chan<- Row, hc chan<- Header) (err error)

Push queries are continuous queries in which new events or changes to a table's state are pushed to the client. You can think of them as subscribing to a stream of changes.

Since push queries never end, this function expects a channel to which it can write new rows of data as and when they are received.

To use this function pass in a context, the SQL query statement, and two channels:

  • ksqldb.Row - rows of data
  • ksqldb.Header - header (including column definitions). If you don't want to block before receiving row data then make this channel buffered.

The channel is populated with ksqldb.Row which represents one row of data. You will need to define variables to hold each column's value. You can adopt this pattern to do this:

var DATA_TS float64
var ID string
for row := range rc {
	if row != nil {
		DATA_TS = row[0].(float64)
		ID = row[1].(string)

func (*Client) SetLogFunc

func (cl *Client) SetLogFunc(fn func(format string, v ...interface{})) *Client

SetLogFunc sets custom logging function with func(format string, v ...interface{}) profile

type Column

type Column struct {
	Name string
	Type string
}

Column represents the metadata for a column in a Row

type Header struct {
	// contains filtered or unexported fields
}

Header represents a header returned from a query

type Payload

type Payload []Row

Payload represents multiple rows

type Row

type Row []interface{}

Row represents a row returned from a query

Directories

Path Synopsis
@rmoff
@rmoff

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL