rubin

package
v0.8.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 15, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Must added in v0.1.9

func Must[T any](obj T, err error) T

Must helper, see https://stackoverflow.com/a/73584801/4292075 a generic version of https://go.dev/src/text/template/helper.go?s=576:619 panics if the error is non-nil and intended for use in variable initializations

opts := Must[*Options](NewOptionsFromEnv())

func NewCloudEvent added in v0.1.4

func NewCloudEvent(sourceURI string, eventType string, data interface{}) (cloudevents.Event, error)

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client to communicate with the Kafka REST Endpoint Topic/Records API

func NewClient added in v0.1.4

func NewClient(options *Options) *Client

NewClient returns a new Rubin Client for http interaction

func NewClientFromEnv added in v0.1.9

func NewClientFromEnv() (*Client, error)

NewClientFromEnv Convenience function using default envconfig prefix for

opts, err := NewOptionsFromEnv()
client := NewClient(opts)

func (*Client) Produce

func (c *Client) Produce(ctx context.Context, request RecordRequest) (RecordResponse, error)

Produce produces a Kafka Record into the given Topic

func (*Client) String added in v0.1.4

func (c *Client) String() string

String representation of the client instance

type Options

type Options struct {
	ProducerTopicURL url.URL `` /* 145-byte string literal not displayed */
	// RestEndpoint for kafka rest proxy api
	RestEndpoint string `yaml:"rest_endpoint" default:"" required:"false" desc:"Kafka REST Proxy Endpoint"  split_words:"true"`
	// ClusterID of kafka cluster (which becomes part of the URL)
	ClusterID         string        `yaml:"cluster_id" default:"" required:"false" desc:"Kafka Cluster ID"  split_words:"true"`
	ProducerAPIKey    string        `yaml:"api_key" default:"" required:"false" desc:"Kafka API Key with Producer Privileges"  split_words:"true"`
	ProducerAPISecret string        `yaml:"api_secret" default:"" required:"false" desc:"Kafka API Secret with Producer Privileges"  split_words:"true"`
	HTTPTimeout       time.Duration `yaml:"http_timeout" default:"10s" required:"false" desc:"Timeout for HTTP Client" split_words:"true"`
	DumpMessages      bool          `yaml:"dump_messages" default:"false" required:"false" desc:"Print http request/response to stdout" split_words:"true"`
	LogLevel          string        `yaml:"log_level" default:"info" required:"false" desc:"Min LogLevel debug,info,warn,error" split_words:"true"`
}

Options keeps the settings to set up client connection.

func NewOptionsFromEnv added in v0.1.9

func NewOptionsFromEnv() (*Options, error)

NewOptionsFromEnv uses environment configuration with default prefix "kafka" to init Options

func NewOptionsFromEnvWithPrefix added in v0.1.9

func NewOptionsFromEnvWithPrefix(prefix string) (*Options, error)

NewOptionsFromEnvWithPrefix same as NewOptionsFromEnv but allows custom prefix

func (Options) BasicAuth added in v0.3.0

func (o Options) BasicAuth() string

BasicAuth returns the base64 encoded authentication string to be used as Auth Header for REST Proxy Http request

func (Options) RecordEndpoint added in v0.3.0

func (o Options) RecordEndpoint(topic string) string

RecordEndpoint returns the REST API endpoint for producing messages, basic on endpoint, cluster and topic If ProducerTopicURL is specified, it takes precedence

func (Options) String added in v0.1.4

func (o Options) String() string

String returns a String representation of the object (but hides sensitive information)

type RecordRequest added in v0.1.9

type RecordRequest struct {
	Topic   string
	Data    interface{}
	Key     string
	Headers map[string]string
	// AsCloudEvent section for CloudEvents specific attributes
	AsCloudEvent bool
	Source       string
	Type         string
	Subject      string
}

RecordRequest holds the data to build the Kafka Message Payload plus optional Key and Headers

type RecordResponse added in v0.1.9

type RecordResponse struct {
	ErrorCode int `json:"error_code"`
	kafkarestv3.ProduceResponse
}

RecordResponse Wrapper around the external RecordResponse

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL