confluent

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 11, 2021 License: Apache-2.0 Imports: 15 Imported by: 1

README

Confluent Platform Client

codecov Build Status FOSSA Status

Context

We run Confluent Platform (not Confluent Cloud) hosted on our own infrastructure, and we could not find any client that lets us integrate with the Confluent Platform API. This project targets a Confluent Platform deployment with the following configuration:

  • SASL Authentication with username + password (Support both basic auth and LDAP user)
  • Kafka brokers with an embedded MDS server
  • RBAC enabled

Usages:

Installation
go get github.com/wayarmy/gonfluent
Implementation example
  • Gonfluent uses two clients to talk to a single Confluent cluster: an HTTP client and a Sarama client. When you initialize Gonfluent you therefore need to set up both clients, each with its own authentication settings.
package main

import (
	"fmt"
	"os"

	confluent "github.com/wayarmy/gonfluent"
)

// ClientVersion is the version string of this example client; UserAgent is
// built from it and is assigned to the HTTP client's UserAgent field in main.
const (
	ClientVersion = "0.1"
	UserAgent     = "confluent-client-go-sdk-" + ClientVersion
)

// main is an end-to-end usage example: it builds the HTTP and Sarama clients,
// logs in, lists the Kafka clusters and updates the configs of one topic.
// Credentials are read from the CONFLUENT_USER and CONFLUENT_PASSWORD
// environment variables. Errors abort the example with panic, which is
// acceptable here only because this is a demo program.
func main() {
	baseUrl := "https://localhost:8090"

	username := os.Getenv("CONFLUENT_USER")
	password := os.Getenv("CONFLUENT_PASSWORD")

	// HTTP client for the Confluent REST API.
	httpClient := confluent.NewDefaultHttpClient(baseUrl, username, password)
	httpClient.UserAgent = UserAgent

	// Sarama client that talks to the brokers directly.
	bootstrapServer := []string{
		"localhost:9093",
	}
	kConfig := &confluent.Config{
		BootstrapServers: &bootstrapServer,
		CACert:           "certs/ca.pem",
		ClientCert:       "certs/cert.pem",
		ClientCertKey:    "certs/key.pem",
		// NOTE(review): presumably disables TLS certificate verification;
		// keep this to local testing only.
		SkipTLSVerify: true,
		SASLMechanism: "plain",
		TLSEnabled:    true,
		Timeout:       120,
	}
	// NewDefaultSaramaClient returns the wrapper that satisfies SaramaClient
	// plus the raw sarama.Client needed to build the cluster admin.
	kClient, saramaClient, err := confluent.NewDefaultSaramaClient(kConfig)
	if err != nil {
		panic(err)
	}
	kAdmin, err := confluent.NewDefaultSaramaClusterAdmin(saramaClient)
	if err != nil {
		panic(err)
	}

	client := confluent.NewClient(httpClient, kClient, kAdmin)
	bearerToken, err := client.Login()
	if err != nil {
		panic(err)
	}
	// Store the token returned by Login on the HTTP client.
	httpClient.Token = bearerToken

	// Get the list of clusters in Confluent Platform.
	listCluster, err := client.ListKafkaCluster()
	if err != nil {
		panic(err)
	}
	fmt.Println(listCluster)

	// Guard the index below: an empty list would otherwise panic with an
	// unhelpful "index out of range".
	if len(listCluster) == 0 {
		panic("no Kafka cluster returned by ListKafkaCluster")
	}

	// Get the cluster information.
	clusterId := listCluster[0].ClusterID
	cluster, err := client.GetKafkaCluster(clusterId)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%#v\n", cluster)
	fmt.Println(cluster)

	// --- Read the configs of a topic ---
	// topicConfig, err := client.GetTopicConfigs(clusterId, "example_topic_name")
	// if err != nil {
	// 	panic(err)
	// }
	// fmt.Printf("%#v", topicConfig)

	// --- Read a topic ---
	// topic, err := client.GetTopic(clusterId, "test7-terraform-confluent-provider")
	// if err != nil {
	// 	panic(err)
	// }
	// fmt.Println(topic.Partitions)

	// --- Create a new topic ---
	// partitionConfig := []confluent.TopicConfig{
	// 	{
	// 		Name:  "compression.type",
	// 		Value: "gzip",
	// 	},
	// 	{
	// 		Name:  "cleanup.policy",
	// 		Value: "compact",
	// 	},
	// 	// {
	// 	// 	Name:  "retention.ms",
	// 	// 	Value: "20000",
	// 	// },
	// }
	// err = client.CreateTopic(clusterId, "test_topic_name", 3, 3, partitionConfig, nil)
	// if err != nil {
	// 	panic(err)
	// }
	// fmt.Println("Topic Created!")

	// Update the configs of an existing topic.
	topicConfigs := []confluent.TopicConfig{
		{
			Name:  "compression.type",
			Value: "gzip",
		},
		{
			Name:  "cleanup.policy",
			Value: "compact",
		},
		{
			Name:  "retention.ms",
			Value: "300000",
		},
	}

	err = client.UpdateTopicConfigs(clusterId, "test_topic_name", topicConfigs)
	if err != nil {
		panic(err)
	}
	fmt.Println("Topic Updated!")

	// --- Create a principal (use case not yet known) ---
	// testPrincipals := []confluent.UserPrincipalAction{
	// 	{
	// 		Scope: confluent.Scope{
	// 			Clusters: confluent.AuthorClusters{
	// 				KafkaCluster: clusterId,
	// 			},
	// 		},
	// 		ResourceName: "Testing-Principal",
	// 		ResourceType: "Cluster",
	// 		Operation:    "Read",
	// 	},
	// }
	// newPrincipal, err := client.CreatePrincipal("User:system-platform", testPrincipals)
	// if err != nil {
	// 	panic(err)
	// }
	// fmt.Printf("%+v", newPrincipal)

	// --- Get the topic information ---
	// getTopic, err := client.GetTopic(clusterId, "test2_topic_name")
	// if err != nil {
	// 	panic(err)
	// }
	// fmt.Printf("%v", getTopic)

	// --- Delete an existing topic ---
	// err = client.DeleteTopic(clusterId, "test_topic_name")
	// if err != nil {
	// 	panic(err)
	// }
	// fmt.Println("Topic deleted")

	// --- Add a role assignment ---
	// c := confluent.ClusterDetails{}
	// c.Clusters.KafkaCluster = clusterId
	// err = client.BindPrincipalToRole("User:manh.do", "Operator", c)
	// if err != nil {
	// 	panic(err)
	// }
	// fmt.Println("Role Binded!")

	// --- Add a role assignment for a principal on a topic prefix ---
	// (uses the ClusterDetails value c from the sample above)
	// r := confluent.RoleBinding{
	// 	Scope: c,
	// 	ResourcePatterns: []confluent.ResourcePattern{
	// 		{
	// 			ResourceType: "Topic",
	// 			Name:         "system-platform",
	// 			PatternType:  "PREFIXED",
	// 		},
	// 	},
	// }
	// principalCN := "User:CN=common-name.example.com"
	// err = client.IncreaseRoleBinding(principalCN, "ResourceOwner", r)
	// if err != nil {
	// 	panic(err)
	// }
	// fmt.Println("Role Binded!")

	// --- Change the partition count and replication factor of a topic ---
	// t := confluent.Topic{
	// 	Name:              "test_topic_name",
	// 	Partitions:        10,
	// 	ReplicationFactor: 4,
	// }
	// err = client.UpdatePartitions(t)
	// if err != nil {
	// 	panic(err)
	// }
	// fmt.Println("Update partition successfully")
	// err = client.UpdateReplicationsFactor(t)
	// if err != nil {
	// 	panic(err)
	// }
	// fmt.Println("Update RF successfully")
}

Contributing

  • Clone this project
git clone ...
  • Install any requirements
go mod download
  • Add your code and logic
  • Push to a new branch and submit a merge request once all tests pass
Testing

go test

License

FOSSA Status

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Acl

// Acl is the JSON representation of a single ACL entry. It is the payload
// type taken by Client.CreateAcl and the element type returned by
// Client.ListAcls. Every field is omitted from the encoded JSON when empty.
type Acl struct {
	ClusterId    string `json:"cluster_id,omitempty"`
	ResourceType string `json:"resource_type,omitempty"`
	ResourceName string `json:"resource_name,omitempty"`
	PatternType  string `json:"pattern_type,omitempty"`
	Principal    string `json:"principal,omitempty"`
	Host         string `json:"host,omitempty"`
	Operation    string `json:"operation,omitempty"`
	Permission   string `json:"permission,omitempty"`
}

type Authenticate

// Authenticate is the JSON shape of an authentication result: the issued
// token, its type, and its lifetime.
// NOTE(review): ExpiresIn is presumably expressed in seconds — confirm
// against the server documentation.
type Authenticate struct {
	AuthToken string `json:"auth_token"`
	TokenType string `json:"token_type"`
	ExpiresIn int    `json:"expires_in"`
}

type AuthenticateError

// AuthenticateError is the JSON shape of an error body returned for a failed
// authentication: a status code, an error code, a type, a message and an
// optional list of detailed errors.
//
// The exported field name ErrrorCode is misspelled but is kept as-is so that
// existing callers keep compiling. Its JSON tag is "error_code", the same key
// ErrorResponse uses, so the value is actually filled when decoding.
type AuthenticateError struct {
	StatusCode int    `json:"status_code"`
	ErrrorCode int    `json:"error_code"`
	Type       string `json:"type"`
	Message    string `json:"message"`
	Errors     []struct {
		ErrorType string `json:"error_type"`
		Message   string `json:"message"`
	} `json:"errors"`
}

type AuthorClusters

// AuthorClusters carries a Kafka cluster ID under the "kafka-cluster" JSON
// key. It is the type of Scope.Clusters.
type AuthorClusters struct {
	KafkaCluster string `json:"kafka-cluster"`
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client will provide all Confluent clients

func NewClient

func NewClient(httpClient HttpClient, saramaClient SaramaClient, saramaClusterAdmin SaramaClusterAdmin) *Client

func (*Client) BindPrincipalToRole

func (c *Client) BindPrincipalToRole(principal, roleName string, cDetails ClusterDetails) error

BindPrincipalToRole will bind the principal to a cluster-scoped role for a specific cluster or in a given scope

func (*Client) CreateAcl

func (c *Client) CreateAcl(clusterId string, aclConfig *Acl) error

Creates an ACL. @ref https://docs.confluent.io/platform/current/kafka-rest/api.html#post--clusters-cluster_id-acls

func (*Client) CreatePrincipal

func (c *Client) CreatePrincipal(userPrincipal string, principals []UserPrincipalAction) (*UserPrincipal, error)

func (*Client) CreateTopic

func (c *Client) CreateTopic(clusterId, topicName string, partitionsCount, replicationFactor int, configs []TopicConfig, replicasAssignments []ReplicasAssignment) error

func (*Client) DecreaseRoleBinding

func (c *Client) DecreaseRoleBinding(principal, roleName string, uRoleBinding RoleBinding) error

DecreaseRoleBinding : Incrementally remove the resources from the principal at the given scope/cluster using the given role

func (*Client) DeleteAcl

func (c *Client) DeleteAcl(clusterId, resourceName string) error

Deletes the list of ACLs that matches the search criteria. Parameters:

cluster_id (string) – The Kafka cluster ID.

Query Parameters:

resource_type (string) – The ACL resource type.
resource_name (string) – The ACL resource name.
pattern_type (string) – The ACL pattern type.
principal (string) – The ACL principal.
host (string) – The ACL host.
operation (string) – The ACL operation.
permission (string) – The ACL permission.

@ref https://docs.confluent.io/platform/current/kafka-rest/api.html#delete--clusters-cluster_id-acls

func (*Client) DeleteRoleBinding

func (c *Client) DeleteRoleBinding(principal, roleName string, cDetails ClusterDetails) error

DeleteRoleBinding removes the role (cluster or resource scoped) from the principal at the given scope/cluster

func (*Client) DeleteTopic

func (c *Client) DeleteTopic(clusterId, topicName string) error

func (*Client) DoRequest

func (c *Client) DoRequest(method string, uri string, reqBody io.Reader) ([]byte, error)

func (*Client) GetKafkaCluster

func (c *Client) GetKafkaCluster(clusterId string) (*KafkaCluster, error)

Returns the Kafka cluster with the specified cluster_id. @ref https://docs.confluent.io/platform/current/kafka-rest/api.html#get--clusters-cluster_id

func (*Client) GetTopic

func (c *Client) GetTopic(clusterId, topicName string) (*Topic, error)

func (*Client) GetTopicConfigs

func (c *Client) GetTopicConfigs(clusterId string, topicName string) ([]TopicConfig, error)

@ref https://docs.confluent.io/platform/current/kafka-rest/api.html#get--clusters-cluster_id-topics-topic_name-configs Return the list of configs that belong to the specified topic.

func (*Client) GetTopicPartitions

func (c *Client) GetTopicPartitions(clusterId, topicName string) ([]Partition, error)

func (*Client) IncreaseRoleBinding

func (c *Client) IncreaseRoleBinding(principal, roleName string, uRoleBinding RoleBinding) error

IncreaseRoleBinding : incrementally grant the resources to the principal at the given scope/cluster using the given role

func (*Client) IsReplicationFactorUpdating

func (c *Client) IsReplicationFactorUpdating(topic string) (bool, error)

func (*Client) ListAcls

func (c *Client) ListAcls(clusterId string) ([]Acl, error)

Returns a list of ACLs that match the search criteria. Parameters:

cluster_id (string) – The Kafka cluster ID.

Query Parameters:

resource_type (string) – The ACL resource type.
resource_name (string) – The ACL resource name.
pattern_type (string) – The ACL pattern type.
principal (string) – The ACL principal.
host (string) – The ACL host.
operation (string) – The ACL operation.
permission (string) – The ACL permission.

@ref https://docs.confluent.io/platform/current/kafka-rest/api.html#get--clusters-cluster_id-acls

func (*Client) ListKafkaCluster

func (c *Client) ListKafkaCluster() ([]KafkaCluster, error)

Returns a list of known Kafka clusters. Currently both Kafka and Kafka REST Proxy are only aware of the Kafka cluster pointed at by the bootstrap.servers configuration. Therefore only one Kafka cluster will be returned in the response. @ref https://docs.confluent.io/platform/current/kafka-rest/api.html#get--clusters

func (*Client) ListTopics

func (c *Client) ListTopics(clusterId string) ([]Topic, error)

func (*Client) Login

func (c *Client) Login() (string, error)

func (*Client) LookupRoleBinding

func (c *Client) LookupRoleBinding(principal, roleName string, cDetails ClusterDetails) ([]ResourcePattern, error)

LookupRoleBinding will lookup the role-bindings for the principal at the given scope/cluster using the given role

func (*Client) OverwriteRoleBinding

func (c *Client) OverwriteRoleBinding(principal, roleName string, uRoleBinding RoleBinding) error

OverwriteRoleBinding will overwrite existing resource grants

func (*Client) UpdatePartitions

func (c *Client) UpdatePartitions(t Topic) error

func (*Client) UpdateReplicationsFactor

func (c *Client) UpdateReplicationsFactor(t Topic) error

func (*Client) UpdateTopicConfigs

func (c *Client) UpdateTopicConfigs(clusterId string, topicName string, data []TopicConfig) error

UpdateTopicConfigs updates or deletes a set of configs that belong to the specified topic.

type ClusterDetails

// ClusterDetails describes a scope: an optional cluster name plus the set of
// cluster IDs in Clusters. It is the scope argument of
// Client.BindPrincipalToRole, Client.DeleteRoleBinding and
// Client.LookupRoleBinding, and the type of RoleBinding.Scope.
type ClusterDetails struct {
	ClusterName string   `json:"clusterName,omitempty"`
	Clusters    Clusters `json:"clusters"`
}

type Clusters

// Clusters holds the IDs of the clusters that make up a scope. Each ID is
// omitted from the encoded JSON when empty.
type Clusters struct {
	// Kafka cluster ID
	KafkaCluster string `json:"kafka-cluster,omitempty"`

	// Kafka Connect cluster ID
	ConnectCluster string `json:"connect-cluster,omitempty"`

	// KSQL cluster ID
	KSqlCluster string `json:"ksql-cluster,omitempty"`

	// Schema Registry cluster ID
	SchemaRegistryCluster string `json:"schema-registry-cluster,omitempty"`
}

Clusters lists the clusters active in the Confluent system. Supported cluster types: Kafka cluster, Kafka Connect cluster, KSQL cluster, and Schema Registry cluster. @ref https://docs.confluent.io/platform/current/kafka-rest/api.html#cluster

type Config

// Config collects the connection settings passed to NewDefaultSaramaClient:
// the bootstrap servers, a timeout, TLS material and switches, and SASL
// credentials and mechanism.
// NOTE(review): the unit of Timeout and the accepted SASLMechanism values
// are not visible here — confirm against the implementation.
type Config struct {
	BootstrapServers        *[]string
	Timeout                 int
	CACert                  string
	ClientCert              string
	ClientCertKey           string
	ClientCertKeyPassphrase string
	TLSEnabled              bool
	SkipTLSVerify           bool
	SASLUsername            string
	SASLPassword            string
	SASLMechanism           string
}

type DefaultHttpClient

// DefaultHttpClient holds the settings of the package's default HTTP client:
// base URL, credentials, token and user agent. It is created with
// NewDefaultHttpClient and has a DoRequest method matching the HttpClient
// interface.
type DefaultHttpClient struct {
	// BaseUrl is the API endpoint of Confluent Platform,
	// for example https://localhost:8090.
	BaseUrl string

	// Username and Password are the login credentials.
	// NOTE(review): presumably the values given to NewDefaultHttpClient — confirm.
	Username string

	Password  string
	Token     string
	UserAgent string // user-agent sent to the Confluent API; default: confluent-client-go-sdk
}

func NewDefaultHttpClient

func NewDefaultHttpClient(baseUrl string, username string, password string) *DefaultHttpClient

func (*DefaultHttpClient) DoRequest

func (c *DefaultHttpClient) DoRequest(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error)

type DefaultSaramaClient

type DefaultSaramaClient struct {
	// contains filtered or unexported fields
}

func NewDefaultSaramaClient

func NewDefaultSaramaClient(config *Config) (*DefaultSaramaClient, sarama.Client, error)

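NewDefaultSaramaClient initializes the Kafka client from the given Config. It returns the DefaultSaramaClient wrapper together with the underlying sarama.Client.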

func (*DefaultSaramaClient) Brokers

func (k *DefaultSaramaClient) Brokers() []*sarama.Broker

func (*DefaultSaramaClient) Config

func (k *DefaultSaramaClient) Config() *sarama.Config

func (*DefaultSaramaClient) Controller

func (k *DefaultSaramaClient) Controller() (*sarama.Broker, error)

func (*DefaultSaramaClient) ID

func (k *DefaultSaramaClient) ID(broker *sarama.Broker) int32

func (*DefaultSaramaClient) Partitions

func (k *DefaultSaramaClient) Partitions(topic string) ([]int32, error)

func (*DefaultSaramaClient) RefreshMetadata

func (k *DefaultSaramaClient) RefreshMetadata() error

func (*DefaultSaramaClient) Replicas

func (k *DefaultSaramaClient) Replicas(topic string, partitionId int32) ([]int32, error)

type DefaultSaramaClusterAdmin

type DefaultSaramaClusterAdmin struct {
	// contains filtered or unexported fields
}

func (*DefaultSaramaClusterAdmin) AlterPartitionReassignments

func (ca *DefaultSaramaClusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error

func (*DefaultSaramaClusterAdmin) ListPartitionReassignments

func (ca *DefaultSaramaClusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (map[string]map[int32]*sarama.PartitionReplicaReassignmentsStatus, error)

type ErrorResponse

// ErrorResponse is the JSON shape of an API error body: an optional status
// code, an error code, an optional type, a message, and an optional list of
// detailed errors.
type ErrorResponse struct {
	StatusCode int    `json:"status_code,omitempty"`
	ErrorCode  int    `json:"error_code"`
	Type       string `json:"type,omitempty"`
	Message    string `json:"message"`
	Errors     []struct {
		ErrorType string `json:"error_type,omitempty"`
		Message   string `json:"message,omitempty"`
	} `json:"errors,omitempty"`
}

type HttpClient

// HttpClient is the HTTP transport abstraction accepted by NewClient.
// *DefaultHttpClient declares a DoRequest method with this exact signature.
type HttpClient interface {
	// DoRequest performs a request with the given method, URI and body, and
	// returns the response body, the status code, the status text and an error.
	DoRequest(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error)
}

type KafkaCluster

// KafkaCluster identifies a Kafka cluster by its "cluster_id". It is the
// element type returned by Client.ListKafkaCluster and the result type of
// Client.GetKafkaCluster.
type KafkaCluster struct {
	ClusterID string `json:"cluster_id"`
}

type Metadata

// Metadata holds the metadata attached to an API resource: the "self"
// reference plus an optional resource name and an optional "next" reference.
// NOTE(review): "next" is presumably a pagination link — confirm.
type Metadata struct {
	Self         string `json:"self"`
	ResourceName string `json:"resource_name,omitempty"`
	Next         string `json:"next,omitempty"`
}

type Partition

// Partition identifies one partition of a topic by cluster ID, topic name and
// partition ID. It is the element type returned by Client.GetTopicPartitions
// and of Topic.PartitionsDetails.
type Partition struct {
	ClusterID   string `json:"cluster_id"`
	TopicName   string `json:"topic_name"`
	PartitionId int    `json:"partition_id"`
}
// Related wraps a single string stored under the "related" JSON key.
// NOTE(review): presumably a link to a related resource — confirm.
type Related struct {
	Related string `json:"related"`
}

type ReplicasAssignment

// ReplicasAssignment pairs a partition ID with a list of broker IDs. It is
// accepted by Client.CreateTopic and is the element type of
// Topic.ReplicasAssignments.
//
// NOTE(review): the field name ParitionId is a misspelling of PartitionId.
// The JSON key "partition_id" is correct; renaming the exported field would
// break callers, so it is left unchanged.
type ReplicasAssignment struct {
	ParitionId int   `json:"partition_id,omitempty"`
	BrokerIds  []int `json:"broker_ids,omitempty"`
}

type ResourcePattern

// ResourcePattern describes a resource by type, name and pattern type. It is
// the element type of RoleBinding.ResourcePatterns and of the slice returned
// by Client.LookupRoleBinding.
type ResourcePattern struct {
	ResourceType string `json:"resourceType,omitempty"`
	Name         string `json:"name,omitempty"`
	PatternType  string `json:"patternType,omitempty"`
}

type RoleBinding

// RoleBinding combines a scope with a list of resource patterns. It is the
// argument type of Client.IncreaseRoleBinding, Client.DecreaseRoleBinding and
// Client.OverwriteRoleBinding.
type RoleBinding struct {
	Scope            ClusterDetails    `json:"scope"`
	ResourcePatterns []ResourcePattern `json:"resourcePatterns"`
}

type SaramaClient

// SaramaClient is the set of Kafka client operations that NewClient depends
// on. *DefaultSaramaClient declares all seven of these methods.
type SaramaClient interface {
	Controller() (*sarama.Broker, error)
	Config() *sarama.Config
	RefreshMetadata() error
	Partitions(topic string) ([]int32, error)
	Brokers() []*sarama.Broker
	Replicas(topic string, partitionId int32) ([]int32, error)
	ID(broker *sarama.Broker) int32
}

type SaramaClusterAdmin

// SaramaClusterAdmin is the set of partition-reassignment operations that
// NewClient depends on. NewDefaultSaramaClusterAdmin returns a value of this
// interface type, and *DefaultSaramaClusterAdmin declares both methods.
type SaramaClusterAdmin interface {
	ListPartitionReassignments(topic string, partitions []int32) (map[string]map[int32]*sarama.PartitionReplicaReassignmentsStatus, error)
	AlterPartitionReassignments(topic string, assignment [][]int32) error
}

func NewDefaultSaramaClusterAdmin

func NewDefaultSaramaClusterAdmin(saramaClient sarama.Client) (SaramaClusterAdmin, error)

type Scope

// Scope wraps an AuthorClusters value under the "clusters" JSON key. It is
// the type of UserPrincipalAction.Scope.
type Scope struct {
	Clusters AuthorClusters `json:"clusters"`
}

type Synonyms

// Synonyms is one entry of TopicConfig.Synonyms: a name with an optional
// value, source and operation.
type Synonyms struct {
	Name      string `json:"name"`
	Value     string `json:"value,omitempty"`
	Source    string `json:"source,omitempty"`
	Operation string `json:"operation,omitempty"`
}

type Topic

// Topic is the JSON representation of a Kafka topic: its name, partition
// count, replication factor, configs, replica assignments and partition
// details. It is returned by Client.GetTopic and Client.ListTopics, and is
// the argument of Client.UpdatePartitions and Client.UpdateReplicationsFactor.
// Every field except Name is omitted from the encoded JSON when empty.
type Topic struct {
	ClusterID           string               `json:"cluster_id,omitempty"`
	IsInternal          bool                 `json:"is_internal,omitempty"`
	Name                string               `json:"topic_name"`
	Partitions          int32                `json:"partitions_count,omitempty"`
	ReplicationFactor   int16                `json:"replication_factor,omitempty"`
	Config              []TopicConfig        `json:"configs,omitempty"`
	ReplicasAssignments []ReplicasAssignment `json:"replicas_assignments,omitempty"`
	PartitionsDetails   []Partition          `json:"partitions_details,omitempty"`
}

type TopicConfig

// TopicConfig is the JSON representation of one topic configuration entry.
// Name and Value are always encoded; the remaining fields are omitted when
// empty. It is returned by Client.GetTopicConfigs and accepted by
// Client.CreateTopic and Client.UpdateTopicConfigs.
type TopicConfig struct {
	ClusterId   string     `json:"cluster_id,omitempty"`
	TopicName   string     `json:"topic_name,omitempty"`
	Name        string     `json:"name"`
	Value       string     `json:"value"`
	IsDefault   bool       `json:"is_default,omitempty"`
	IsReadOnly  bool       `json:"is_read_only,omitempty"`
	IsSensitive bool       `json:"is_sensitive,omitempty"`
	Source      string     `json:"source,omitempty"`
	Synonyms    []Synonyms `json:"synonyms,omitempty"`
}

type UserPrincipal

// UserPrincipal pairs a principal name with its list of actions. It is the
// result type of Client.CreatePrincipal.
type UserPrincipal struct {
	// UserPrincipal is the principal name, for example User:<Username>.
	UserPrincipal string `json:"userPrincipal"`

	// Actions lists the actions allowed or denied for this principal.
	Actions []UserPrincipalAction `json:"actions"`
}

type UserPrincipalAction

// UserPrincipalAction describes one action for a principal: an operation on a
// named resource of a given type within a scope. It is the element type of
// UserPrincipal.Actions and of the slice accepted by Client.CreatePrincipal.
type UserPrincipalAction struct {
	Scope        Scope  `json:"scope"`
	ResourceName string `json:"resourceName"`
	ResourceType string `json:"resourceType"`
	Operation    string `json:"operation"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL