Documentation ¶
Index ¶
- func IsNotRunning(task connect.TaskState) bool
- func IsRunning(task connect.TaskState) bool
- type Config
- type ConnectorManager
- func (c *ConnectorManager) Add(connectors []connect.Connector) error
- func (c *ConnectorManager) GetAllConnectors() ([]*ConnectorWithState, error)
- func (c *ConnectorManager) GetAllPlugins() ([]*connect.Plugin, error)
- func (c *ConnectorManager) GetClusterInfo() (*connect.ClusterInfo, error)
- func (c *ConnectorManager) GetConnector(connectorName string) (*ConnectorWithState, error)
- func (c *ConnectorManager) ListConnectors() ([]string, error)
- func (c *ConnectorManager) LivenessCheck() (string, func() error)
- func (c *ConnectorManager) Manage(source ConnectorSource, stopCH <-chan struct{}) error
- func (c *ConnectorManager) Pause(connectors []string) error
- func (c *ConnectorManager) ReadinessCheck() (string, func() error)
- func (c *ConnectorManager) Remove(connectorNames []string) error
- func (c *ConnectorManager) Restart(connectorNames []string, restartTasks bool, forceRestartTasks bool, ...) error
- func (c *ConnectorManager) Resume(connectors []string) error
- func (c *ConnectorManager) Status(connectors []string) ([]*connect.ConnectorStatus, error)
- func (c *ConnectorManager) Sync(source ConnectorSource) error
- func (c *ConnectorManager) ValidatePlugins(config connect.ConnectorConfig) (*connect.ConfigValidation, error)
- type ConnectorSource
- type ConnectorWithState
- type Logger
- type Option
- type Policy
- type RestartPolicy
- type TaskPredicate
- type Tasks
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsNotRunning ¶ added in v0.1.1
IsNotRunning returns true if the connector task is not in a RUNNING state
Types ¶
type Config ¶
type Config struct { ClusterURL string `json:"cluster_url"` SyncPeriod time.Duration `json:"sync_period"` InitialWaitPeriod time.Duration `json:"initial_wait_period"` AllowPurge bool `json:"allow_purge"` AutoRestart bool `json:"auto_restart"` Version string `json:"version"` GlobalConnectorRestartsMax int `json:"global_connector_restarts_max"` GlobalConnectorRestartPeriod time.Duration `json:"global_connector_restart_period"` GlobalTaskRestartsMax int `json:"global_task_restarts_max"` GlobalTaskRestartPeriod time.Duration `json:"global_task_restart_period"` RestartOverrides *RestartPolicy `json:"restart_policy"` }
Config represents the connect manager configuration
type ConnectorManager ¶
type ConnectorManager struct {
// contains filtered or unexported fields
}
ConnectorManager manages connectors in a Kafka Connect cluster
func NewConnectorsManager ¶
func NewConnectorsManager(client client, config *Config, opts ...Option) (*ConnectorManager, error)
NewConnectorsManager creates a new ConnectorManager
func (*ConnectorManager) Add ¶
func (c *ConnectorManager) Add(connectors []connect.Connector) error
Add will add connectors to a cluster
func (*ConnectorManager) GetAllConnectors ¶
func (c *ConnectorManager) GetAllConnectors() ([]*ConnectorWithState, error)
GetAllConnectors returns all the connectors in a cluster
func (*ConnectorManager) GetAllPlugins ¶
func (c *ConnectorManager) GetAllPlugins() ([]*connect.Plugin, error)
GetAllPlugins returns all the connector plugins installed
func (*ConnectorManager) GetClusterInfo ¶ added in v0.1.31
func (c *ConnectorManager) GetClusterInfo() (*connect.ClusterInfo, error)
GetClusterInfo returns kafka cluster info
func (*ConnectorManager) GetConnector ¶ added in v0.1.1
func (c *ConnectorManager) GetConnector(connectorName string) (*ConnectorWithState, error)
GetConnector returns information about a named connector in the cluster
func (*ConnectorManager) ListConnectors ¶ added in v0.1.1
func (c *ConnectorManager) ListConnectors() ([]string, error)
ListConnectors returns the names of all connectors in the cluster
func (*ConnectorManager) LivenessCheck ¶ added in v0.1.3
func (c *ConnectorManager) LivenessCheck() (string, func() error)
LivenessCheck checks if the the kafka-connect instance is running. The timeout of 2 seconds is arbitrary.
func (*ConnectorManager) Manage ¶
func (c *ConnectorManager) Manage(source ConnectorSource, stopCH <-chan struct{}) error
Manage will start the connector manager running and managing connectors
func (*ConnectorManager) Pause ¶
func (c *ConnectorManager) Pause(connectors []string) error
Pause will pause a number of connectors in a cluster
func (*ConnectorManager) ReadinessCheck ¶ added in v0.1.3
func (c *ConnectorManager) ReadinessCheck() (string, func() error)
ReadinessCheck checks if we have been able to start syncing with kafka-connect
func (*ConnectorManager) Remove ¶
func (c *ConnectorManager) Remove(connectorNames []string) error
Remove will remove connectors from a cluster
func (*ConnectorManager) Restart ¶
func (c *ConnectorManager) Restart(connectorNames []string, restartTasks bool, forceRestartTasks bool, taskIDs []int) error
Restart will restart a number of connectors in a cluster
func (*ConnectorManager) Resume ¶
func (c *ConnectorManager) Resume(connectors []string) error
Resume will resume a number of connectors in a cluster
func (*ConnectorManager) Status ¶ added in v0.1.32
func (c *ConnectorManager) Status(connectors []string) ([]*connect.ConnectorStatus, error)
Status - gets status of specified (or all) connectors
func (*ConnectorManager) Sync ¶
func (c *ConnectorManager) Sync(source ConnectorSource) error
Sync will synchronise the desired and actual state of connectors in a cluster
func (*ConnectorManager) ValidatePlugins ¶ added in v0.1.33
func (c *ConnectorManager) ValidatePlugins(config connect.ConnectorConfig) (*connect.ConfigValidation, error)
ValidatePlugins returns validation results of a connector config
type ConnectorSource ¶
ConnectorSource will return a slice of the desired connector configuration
type ConnectorWithState ¶
type ConnectorWithState struct { Name string `json:"name"` Config connect.ConnectorConfig `json:"config,omitempty"` ConnectorState connect.ConnectorState `json:"connectorState,omitempty"` Tasks Tasks `json:"tasks,omitempty"` }
ConnectorWithState is the connect config and state
type Logger ¶ added in v0.1.12
type Logger interface { Infof(message string, args ...interface{}) Warnf(message string, args ...interface{}) Debugf(message string, args ...interface{}) Errorf(message string, args ...interface{}) }
Logger is the interface to implement to get all of the great news/updates
type Option ¶ added in v0.1.12
type Option func(c *ConnectorManager)
Option can be supplied that override the default ConnectorManager properties
func WithLogger ¶ added in v0.1.12
WithLogger allows for a logger of choice to be injected
type Policy ¶ added in v0.1.11
type Policy struct { ConnectorRestartsMax int `json:"connector_restarts_max"` ConnectorRestartPeriod time.Duration `json:"connector_restart_period"` TaskRestartsMax int `json:"task_restarts_max"` TaskRestartPeriod time.Duration `json:"task_restart_period"` }
Policy contains a collection of values to be managed
type RestartPolicy ¶ added in v0.1.11
RestartPolicy lists each connectors maximum restart policy If AutoRestart == true If a policy does not exist for a connector the connector or task will be restarted once. If a connector or task is restarted the count of failed attempts is reset. If the number of unsuccessful restarts is reached the manager will return and connectctl will stop.
type TaskPredicate ¶ added in v0.1.1
TaskPredicate is a function that performs some test on a connect.TaskState
func ByID ¶ added in v0.1.1
func ByID(taskIDs ...int) TaskPredicate
ByID returns a predicate that returns true if the connector task has one of the given task IDs