Documentation ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Must ¶ added in v0.1.9
Must is a generic helper that panics if the error is non-nil. It is intended for use in variable initializations. It is a generic version of the Must helper in https://go.dev/src/text/template/helper.go?s=576:619; see also https://stackoverflow.com/a/73584801/4292075.
opts := Must[*Options](NewOptionsFromEnv())
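The pattern behind such a helper can be sketched as follows. This is a minimal illustration, not the package's actual source: the Must body shown here and the loadPort constructor are assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// Must returns v when err is nil and panics otherwise.
// Illustrative re-implementation; the package's own Must may differ.
func Must[T any](v T, err error) T {
	if err != nil {
		panic(err)
	}
	return v
}

// loadPort is a hypothetical constructor that returns a value and an error.
func loadPort(raw string) (int, error) {
	if raw == "" {
		return 0, errors.New("port is empty")
	}
	return strconv.Atoi(raw)
}

// Package-level initialization has no place to handle an error,
// which is where a Must helper is typically used.
var defaultPort = Must[int](loadPort("8082"))

func main() {
	fmt.Println(defaultPort)
}
```

Because the helper panics, it is best kept to program start-up and tests, where a failed initialization should stop the process anyway.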
func NewCloudEvent ¶ added in v0.1.4
func NewCloudEvent(sourceURI string, eventType string, data interface{}) (cloudevents.Event, error)
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client communicates with the Topic/Records API of the Kafka REST endpoint.
func NewClientFromEnv ¶ added in v0.1.9
NewClientFromEnv is a convenience function that uses the default envconfig prefix. It is shorthand for:
opts, err := NewOptionsFromEnv()
client := NewClient(opts)
func (*Client) Produce ¶
func (c *Client) Produce(ctx context.Context, request RecordRequest) (RecordResponse, error)
Produce produces a Kafka Record into the given Topic
type Options ¶
type Options struct {
	ProducerTopicURL url.URL `` /* 145-byte string literal not displayed */
	// RestEndpoint for kafka rest proxy api
	RestEndpoint string `yaml:"rest_endpoint" default:"" required:"false" desc:"Kafka REST Proxy Endpoint" split_words:"true"`
	// ClusterID of kafka cluster (which becomes part of the URL)
	ClusterID         string        `yaml:"cluster_id" default:"" required:"false" desc:"Kafka Cluster ID" split_words:"true"`
	ProducerAPIKey    string        `yaml:"api_key" default:"" required:"false" desc:"Kafka API Key with Producer Privileges" split_words:"true"`
	ProducerAPISecret string        `yaml:"api_secret" default:"" required:"false" desc:"Kafka API Secret with Producer Privileges" split_words:"true"`
	HTTPTimeout       time.Duration `yaml:"http_timeout" default:"10s" required:"false" desc:"Timeout for HTTP Client" split_words:"true"`
	DumpMessages      bool          `yaml:"dump_messages" default:"false" required:"false" desc:"Print http request/response to stdout" split_words:"true"`
	LogLevel          string        `yaml:"log_level" default:"info" required:"false" desc:"Min LogLevel debug,info,warn,error" split_words:"true"`
}
Options keeps the settings needed to set up the client connection.
func NewOptionsFromEnv ¶ added in v0.1.9
NewOptionsFromEnv initializes Options from environment configuration, using the default prefix "kafka".
func NewOptionsFromEnvWithPrefix ¶ added in v0.1.9
NewOptionsFromEnvWithPrefix is the same as NewOptionsFromEnv but allows a custom prefix.
func (Options) BasicAuth ¶ added in v0.3.0
BasicAuth returns the base64-encoded authentication string to be used as the Auth header for REST Proxy HTTP requests.
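For orientation, HTTP Basic authentication encodes "user:password" with standard base64. The sketch below assumes the API key and secret play those two roles; the basicAuth function is a hypothetical stand-in and may not match what the method does internally.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// basicAuth joins key and secret with a colon and base64-encodes the result,
// as HTTP Basic authentication expects. Hypothetical helper for illustration.
func basicAuth(apiKey, apiSecret string) string {
	return base64.StdEncoding.EncodeToString([]byte(apiKey + ":" + apiSecret))
}

func main() {
	token := basicAuth("my-key", "my-secret")
	// The encoded value is sent in the Authorization header.
	fmt.Println("Authorization: Basic " + token)
}
```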
func (Options) RecordEndpoint ¶ added in v0.3.0
RecordEndpoint returns the REST API endpoint for producing messages, based on endpoint, cluster, and topic. If ProducerTopicURL is specified, it takes precedence.
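The precedence rule can be sketched as below. The path layout follows the Confluent REST Proxy v3 records endpoint (/kafka/v3/clusters/{cluster_id}/topics/{topic}/records), which is an assumption here; the recordEndpoint function and its parameters are hypothetical and the package may build the URL differently.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// recordEndpoint returns override when it is set. Otherwise it assembles the
// URL from endpoint, cluster ID, and topic. The path layout is an assumption.
func recordEndpoint(override, restEndpoint, clusterID, topic string) string {
	if override != "" {
		return override // an explicit topic URL takes precedence
	}
	return fmt.Sprintf("%s/kafka/v3/clusters/%s/topics/%s/records",
		strings.TrimRight(restEndpoint, "/"),
		url.PathEscape(clusterID),
		url.PathEscape(topic))
}

func main() {
	fmt.Println(recordEndpoint("", "https://rest.example.com/", "lkc-123", "orders"))
	fmt.Println(recordEndpoint("https://override.example.com/records", "https://rest.example.com", "lkc-123", "orders"))
}
```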
type RecordRequest ¶ added in v0.1.9
type RecordRequest struct {
	Topic   string
	Data    interface{}
	Key     string
	Headers map[string]string
	// AsCloudEvent section for CloudEvents specific attributes
	AsCloudEvent bool
	Source       string
	Type         string
	Subject      string
}
RecordRequest holds the data to build the Kafka Message Payload plus optional Key and Headers
type RecordResponse ¶ added in v0.1.9
type RecordResponse struct {
	ErrorCode int `json:"error_code"`
	kafkarestv3.ProduceResponse
}
RecordResponse is a wrapper around the external kafkarestv3.ProduceResponse.