connect

v0.9.0
Published: Aug 11, 2016 License: MIT Imports: 8 Imported by: 7

README

Kafka Connect CLI

A fast, portable, self-documenting CLI tool to inspect and manage Kafka Connect connectors via the REST API. Because you don't want to be fumbling through runbooks of curl commands when something's going wrong, or ever really.

This project also contains a Go library for the Kafka Connect API usable by other Go tools or applications. See Using the Go Library for details.

Usage

The tool is self-documenting: run kafka-connect help or kafka-connect help <subcommand> when you need a reference. A summary of functionality:

$ kafka-connect
usage: kafka-connect [<flags>] <command> [<args> ...]

Command line utility for managing Kafka Connect.

Flags:
  -h, --help     Show context-sensitive help (also try --help-long and --help-man).
      --version  Show application version.
  -H, --host=http://localhost:8083/
                Host address for the Kafka Connect REST API instance.

Commands:
  help [<command>...]
    Show help.

  list
    Lists active connectors. Aliased as 'ls'.

  create [<flags>] [<name>]
    Creates a new connector instance.

  update <name>
    Updates a connector.

  delete <name>
    Deletes a connector. Aliased as 'rm'.

  show <name>
    Shows information about a connector and its tasks.

  config <name>
    Displays configuration of a connector.

  tasks <name>
    Displays tasks currently running for a connector.

  status <name>
    Gets current status of a connector.

  pause <name>
    Pause a connector and its tasks.

  resume <name>
    Resume a paused connector.

  restart <name>
    Restart a connector and its tasks.

  version
    Shows kafka-connect version information.

For examples, see the Godoc page for the command.

The process exits with a zero status when operations are successful and non-zero in the case of errors.

Manual Page

If you'd like a man page, you can generate one and place it on your MANPATH:

$ kafka-connect --help-man > /usr/local/share/man/man1/kafka-connect.1

Options

Expanded details for select parameters:

  • --host / -H: API host address, default http://localhost:8083/. Can be set with environment variable KAFKA_CONNECT_CLI_HOST. Note that you can target any host in a Kafka Connect cluster.

Installation

Binary releases are available on GitHub, signed and with checksums.

Fetch the appropriate version for your platform and place it somewhere on your PATH. The YOLO way:

$ curl -LO https://github.com/go-kafka/connect/releases/download/v0.9.0/kafka-connect-v0.9.0-linux-amd64.zip
$ unzip kafka-connect-v0.9.0-linux-amd64.zip
$ mv linux-amd64/kafka-connect /usr/local/bin/

The prudent way:

$ curl -LO https://github.com/go-kafka/connect/releases/download/v0.9.0/kafka-connect-v0.9.0-linux-amd64.zip
$ curl -LO https://github.com/go-kafka/connect/releases/download/v0.9.0/kafka-connect-v0.9.0-linux-amd64.zip.sha256sum
# Verify integrity of the archive file, on OS X try shasum --check
$ sha256sum --check kafka-connect-v0.9.0-linux-amd64.zip.sha256sum
$ unzip kafka-connect-v0.9.0-linux-amd64.zip
$ mv linux-amd64/kafka-connect /usr/local/bin/

Or best of all, the careful way:

$ curl -LO https://github.com/go-kafka/connect/releases/download/v0.9.0/kafka-connect-v0.9.0-linux-amd64.zip
$ unzip kafka-connect-v0.9.0-linux-amd64.zip
# Verify signature of the binary:
$ gpg --verify linux-amd64/kafka-connect{.asc,}
$ mv linux-amd64/kafka-connect /usr/local/bin/

You can find my GPG key distributed on keyservers with ID 8638EE95. The fingerprint is:

23D6 18B5 3AB8 209F F172  C070 6E5C D3ED 8638 EE95

For a more detailed primer on GPG signatures and key authenticity, check out the Apache Software Foundation's doc.

Cross-compiled binaries are possibly untested—please report any issues. If you would like a binary build for a platform that is not currently published, I'm happy to make one available as long as Go can cross-compile it without problem—please open an issue.

To build your own version from source, see the below Building and Development section.

Command Completion

Shell completion is built in for bash and zsh. Add the following to your shell profile initialization (~/.bash_profile or the like):

which kafka-connect >/dev/null && eval "$(kafka-connect --completion-script-bash)"

Predictably, use --completion-script-zsh for zsh.

Building and Development

This project is implemented in Go and uses Glide to achieve reproducible builds. You don't need Glide unless you want to make changes to dependencies, though, since the dependency sources are checked into source control.

Once you have a working Go toolchain, it is simple to build like any standard Go project:

$ go get -d github.com/go-kafka/connect/...
$ cd $GOPATH/src/github.com/go-kafka/connect
$ go build    # or
$ go install  # or
$ go test     # etc.

If you are building with Go 1.5, be sure to export GO15VENDOREXPERIMENT=1 to resolve the dependencies in vendor/. This is the default as of Go 1.6.

Cross-compiling is again standard Go procedure: set GOOS and GOARCH. For example, to build a CLI tool binary for Linux on ARM:

$ env GOOS=linux GOARCH=arm go build ./cmd/...
$ file ./kafka-connect
kafka-connect: ELF 32-bit LSB executable, ARM, version 1 (SYSV), statically linked, not stripped

Testing with Ginkgo

This project uses the Ginkgo BDD testing library. You can run the tests normally with go test or make test. To use additional features of the Ginkgo CLI tool, such as watch mode or generating stub test files, install it on your regular GOPATH using:

$ go install github.com/onsi/ginkgo/ginkgo

This tool is not included in vendor/, in part because Glide doesn't support that yet, and also because it's optional.

Using the Go Library

To use the Go library, simply use go get and import it in your code as usual:

$ go get -u github.com/go-kafka/connect

The library has no dependencies beyond the standard library. Dependencies in this repository's vendor/ are for the CLI tool (the cmd sub-package, not installed unless you append /... to the go get command above).

See the API documentation (GoDoc) for examples.

Versions

For information about versioning policy and compatibility status please see the release notes.

Alternatives

https://github.com/datamountaineer/kafka-connect-tools

When I wanted a tool like this, I found this one. It's written in Scala—I <3 Scala, but JVM start-up time is sluggish for CLI tools, and it's much easier to distribute self-contained native binaries to management hosts that don't require a JVM installed.

Similar things can be said of Kafka's packaged management scripts, which are less ergonomic. Hence, I wrote this Go variant.

Kudos to the kafka-connect-tools authors for inspiration.

Contributing

Please see the Contributing Guide!

License

The library and CLI tool are made available under the terms of the MIT license. See the LICENSE file for full details.

Documentation

Overview

Package connect provides a client for the Kafka Connect REST API.

Index

Constants

const (
	// DefaultHostURL is the default HTTP host used for connecting to a Kafka
	// Connect REST API.
	DefaultHostURL = "http://localhost:8083/"
)
const (
	// StatusUnprocessableEntity is the status code returned when sending a
	// request with invalid fields.
	StatusUnprocessableEntity = 422
)
const Version = "0.9.0"

Version is the go-kafka/connect library version.

Variables

This section is empty.

Functions

This section is empty.

Types

type APIError

type APIError struct {
	Code     int            `json:"error_code"`
	Message  string         `json:"message"`
	Response *http.Response // HTTP response that caused this error
}

APIError holds information returned from a Kafka Connect API instance about why an API call failed.

func (APIError) Error

func (e APIError) Error() string

type Client

type Client struct {

	// HTTP client used to communicate with the API. By default
	// http.DefaultClient will be used.
	HTTPClient *http.Client

	// User agent used when communicating with the Kafka Connect API.
	UserAgent string
	// contains filtered or unexported fields
}

A Client manages communication with the Kafka Connect REST API.

func NewClient

func NewClient(host ...string) *Client

NewClient returns a new Kafka Connect API client that communicates with the optional host. If no host is given, DefaultHostURL (localhost) is used.
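The variadic signature is what makes the host argument optional. The following is a minimal sketch of that pattern using a stand-in type. The names client and newClient are hypothetical, and treating an empty string as "no host" is an assumption of this sketch rather than documented library behavior.

```go
package main

import "fmt"

// defaultHostURL matches the documented DefaultHostURL constant.
const defaultHostURL = "http://localhost:8083/"

// client is a stand-in type for illustration, not the library's Client.
type client struct {
	host string
}

// newClient mimics the documented NewClient(host ...string) shape: the
// variadic parameter lets callers omit the host entirely. Only the first
// value is considered here, which is an assumption of this sketch.
func newClient(host ...string) *client {
	h := defaultHostURL
	if len(host) > 0 && host[0] != "" {
		h = host[0]
	}
	return &client{host: h}
}

func main() {
	fmt.Println(newClient().host)
	fmt.Println(newClient("http://connect-1.example:8083/").host)
}
```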

func (*Client) CreateConnector

func (c *Client) CreateConnector(conn *Connector) (*http.Response, error)

CreateConnector creates a new connector instance. If successful, conn is updated with the connector's state returned by the API, including Tasks.

Passing an object that already contains Tasks produces an error.

See: http://docs.confluent.io/current/connect/userguide.html#post--connectors

func (*Client) DeleteConnector

func (c *Client) DeleteConnector(name string) (*http.Response, error)

DeleteConnector deletes a connector with the given name, halting all tasks and deleting its configuration.

See: http://docs.confluent.io/current/connect/userguide.html#delete--connectors-(string-name)-

func (*Client) Do

func (c *Client) Do(req *http.Request, v interface{}) (*http.Response, error)

Do sends an API request and returns the API response. The API response is JSON-decoded and stored in the value pointed to by v, or returned as an error if an API or HTTP error has occurred.

func (*Client) GetConnector

func (c *Client) GetConnector(name string) (*Connector, *http.Response, error)

GetConnector retrieves information about a connector with the given name.

See: http://docs.confluent.io/current/connect/userguide.html#get--connectors-(string-name)

func (*Client) GetConnectorConfig

func (c *Client) GetConnectorConfig(name string) (ConnectorConfig, *http.Response, error)

GetConnectorConfig retrieves configuration for a connector with the given name.

See: http://docs.confluent.io/current/connect/userguide.html#get--connectors-(string-name)-config

func (*Client) GetConnectorStatus

func (c *Client) GetConnectorStatus(name string) (*ConnectorStatus, *http.Response, error)

GetConnectorStatus gets current status of the connector, including whether it is running, failed or paused, which worker it is assigned to, error information if it has failed, and the state of all its tasks.

See: http://docs.confluent.io/current/connect/userguide.html#get--connectors-(string-name)-status

func (*Client) GetConnectorTasks

func (c *Client) GetConnectorTasks(name string) ([]Task, *http.Response, error)

GetConnectorTasks retrieves a list of tasks currently running for a connector with the given name.

See: http://docs.confluent.io/current/connect/userguide.html#get--connectors-(string-name)-tasks

func (*Client) Host

func (c *Client) Host() string

Host returns the API root URL the Client is configured to talk to.

func (*Client) ListConnectors

func (c *Client) ListConnectors() ([]string, *http.Response, error)

ListConnectors retrieves a list of active connector names.

See: http://docs.confluent.io/current/connect/userguide.html#get--connectors

func (*Client) NewRequest

func (c *Client) NewRequest(method, path string, body interface{}) (*http.Request, error)

NewRequest creates an API request. A relative URL can be provided in path, in which case it is resolved relative to the Client's host URL (see Host). Relative URLs should always be specified without a preceding slash. If specified, the value pointed to by body is JSON-encoded and included as the request body.
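The advice about the preceding slash follows from standard URL reference resolution. The sketch below shows the difference using net/url. It assumes the library resolves paths this way, which the documentation does not confirm, and the proxy base URL with a path prefix is a made-up example.

```go
package main

import (
	"fmt"
	"net/url"
)

// resolve joins a request path onto a base URL using standard reference
// resolution as implemented by net/url.
func resolve(base, path string) (string, error) {
	b, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	ref, err := url.Parse(path)
	if err != nil {
		return "", err
	}
	return b.ResolveReference(ref).String(), nil
}

func main() {
	// A hypothetical deployment that serves the API under a path prefix.
	base := "http://proxy.example.com/kafka-connect/"
	for _, p := range []string{"connectors", "/connectors"} {
		u, err := resolve(base, p)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Printf("%-12s -> %s\n", p, u)
	}
}
```

Without the leading slash the path is appended under the prefix. With it, the prefix is discarded and the path is resolved from the server root.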

func (*Client) PauseConnector

func (c *Client) PauseConnector(name string) (*http.Response, error)

PauseConnector pauses a connector and its tasks, which stops message processing until the connector is resumed. Tasks will transition to PAUSED state asynchronously.

See: http://docs.confluent.io/current/connect/userguide.html#put--connectors-(string-name)-pause

func (*Client) RestartConnector

func (c *Client) RestartConnector(name string) (*http.Response, error)

RestartConnector restarts a connector and its tasks.

See: http://docs.confluent.io/current/connect/userguide.html#post--connectors-(string-name)-restart

func (*Client) ResumeConnector

func (c *Client) ResumeConnector(name string) (*http.Response, error)

ResumeConnector resumes a paused connector. Tasks will transition to RUNNING state asynchronously.

See: http://docs.confluent.io/current/connect/userguide.html#put--connectors-(string-name)-resume

func (*Client) UpdateConnectorConfig

func (c *Client) UpdateConnectorConfig(name string, config ConnectorConfig) (*Connector, *http.Response, error)

UpdateConnectorConfig updates configuration for an existing connector with the given name, returning the new state of the Connector.

If the connector does not exist, it will be created, and the returned HTTP response will indicate a 201 Created status.

See: http://docs.confluent.io/current/connect/userguide.html#put--connectors-(string-name)-config

type Connector

type Connector struct {
	Name   string          `json:"name"`
	Config ConnectorConfig `json:"config,omitempty"`
	Tasks  []TaskID        `json:"tasks,omitempty"`
}

A Connector represents a Kafka Connect connector instance.

See: http://docs.confluent.io/current/connect/userguide.html#connectors-tasks-and-workers

type ConnectorConfig

type ConnectorConfig map[string]string

ConnectorConfig is a key-value mapping of configuration for connectors, where keys are in the form of Java properties.

See: http://docs.confluent.io/current/connect/userguide.html#configuring-connectors

type ConnectorState

type ConnectorState struct {
	State    string `json:"state"`
	WorkerID string `json:"worker_id"`
}

ConnectorState reflects the running state of a Connector and the worker where it is running.

type ConnectorStatus

type ConnectorStatus struct {
	Name      string         `json:"name"`
	Connector ConnectorState `json:"connector"`
	Tasks     []TaskState    `json:"tasks"`
}

ConnectorStatus reflects the status of a Connector and state of its Tasks.

Having connector name and a "connector" object at top level is a little awkward and produces stuttering, but it's their design, not ours.
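The following sketch decodes a status payload into these types and picks out failed tasks. The types are copied from this page. The sample JSON, worker addresses, and trace text are invented for illustration, and parseStatus and failedTasks are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Types copied from the documentation on this page.
type ConnectorState struct {
	State    string `json:"state"`
	WorkerID string `json:"worker_id"`
}

type TaskState struct {
	ID       int    `json:"id"`
	State    string `json:"state"`
	WorkerID string `json:"worker_id"`
	Trace    string `json:"trace,omitempty"`
}

type ConnectorStatus struct {
	Name      string         `json:"name"`
	Connector ConnectorState `json:"connector"`
	Tasks     []TaskState    `json:"tasks"`
}

// sampleStatus is an invented payload in the shape these types expect.
const sampleStatus = `{
  "name": "local-file-source",
  "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
  "tasks": [
    {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"},
    {"id": 1, "state": "FAILED", "worker_id": "10.0.0.2:8083", "trace": "java.lang.RuntimeException: boom"}
  ]
}`

// parseStatus decodes a JSON status document.
func parseStatus(data []byte) (ConnectorStatus, error) {
	var s ConnectorStatus
	err := json.Unmarshal(data, &s)
	return s, err
}

// failedTasks returns the IDs of tasks whose state is FAILED.
func failedTasks(s ConnectorStatus) []int {
	var ids []int
	for _, t := range s.Tasks {
		if t.State == "FAILED" {
			ids = append(ids, t.ID)
		}
	}
	return ids
}

func main() {
	status, err := parseStatus([]byte(sampleStatus))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(status.Name, status.Connector.State, failedTasks(status))
}
```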

type Task

type Task struct {
	ID     TaskID            `json:"id"`
	Config map[string]string `json:"config"`
}

A Task is a unit of work dispatched by a Connector to parallelize the work of a data copy job.

See: http://docs.confluent.io/current/connect/userguide.html#connectors-tasks-and-workers

type TaskID

type TaskID struct {
	ConnectorName string `json:"connector"`
	ID            int    `json:"task"`
}

A TaskID has two components, a numerical ID and a connector name by which the ID is scoped.

type TaskState

type TaskState struct {
	ID       int    `json:"id"`
	State    string `json:"state"`
	WorkerID string `json:"worker_id"`
	Trace    string `json:"trace,omitempty"`
}

TaskState reflects the running state of a Task and the worker where it is running.

Directories

Path               Synopsis
cmd/kafka-connect  kafka-connect is a command line utility for managing Kafka Connect.
