otfranz

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2022 License: MIT Imports: 15 Imported by: 0

README

core-otfranz

Using franz-go as kafka backend of core.

The core/otkafka is based on kafka-go.

Documentation

Overview

Example
if os.Getenv("KAFKA_ADDR") == "" {
	fmt.Println("set KAFKA_ADDR to run this example")
	return
}
brokers := strings.Split(os.Getenv("KAFKA_ADDR"), ",")
conf := map[string]interface{}{
	"log": map[string]interface{}{
		"level": "none",
	},
	"kafka": map[string]interface{}{
		"default": map[string]interface{}{
			"seed_brokers":          brokers,
			"default_produce_topic": "example",
			"topics":                []string{"example"},
			"group":                 "test",
		},
	},
}
c := core.Default(core.WithConfigStack(confmap.Provider(conf, "."), nil))
c.Provide(otfranz.Providers())

c.Invoke(func(cli *kgo.Client) {
	record := &kgo.Record{Value: []byte("bar")}
	cli.Produce(context.Background(), record, nil)
})

c.Invoke(func(cli *kgo.Client) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	fetches := cli.PollFetches(ctx)
	if errs := fetches.Errors(); len(errs) > 0 {
		panic(errs)
	}
	iter := fetches.RecordIter()
	if iter.Done() {
		return
	}
	rec := iter.Next()
	fmt.Println(string(rec.Value))
})
Output:

bar

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func FranzLogAdapter

func FranzLogAdapter(lvl string, logger log.Logger) kgo.Logger

FranzLogAdapter return an log adapter bridging kitlog and kgo.Logger.

func Providers

func Providers(optionFunc ...ProvidersOptionFunc) di.Deps

Providers is a set of dependencies including Factory,Maker and exported configs.

Depends On:
	opentracing.Tracer
	contract.ConfigUnmarshaler
	log.Logger
	contract.Dispatcher
Provide:
	Factory
	Maker

Types

type Client

type Client struct {
	*kgo.Client
	// contains filtered or unexported fields
}

Client is a decorator around *kgo.Client that provides tracing capabilities.

func NewClient

func NewClient(client *kgo.Client, tracer opentracing.Tracer) *Client

NewClient takes a *kgo.Client and returns a decorated Client.

func (*Client) ProduceSyncWithTracing

func (c *Client) ProduceSyncWithTracing(ctx context.Context, rs ...*kgo.Record) kgo.ProduceResults

ProduceSyncWithTracing wrap ProduceSync method with tracing.

func (*Client) ProduceWithTracing

func (c *Client) ProduceWithTracing(ctx context.Context, r *kgo.Record, promise func(*kgo.Record, error))

ProduceWithTracing wrap Produce method with tracing.

type Config

type Config struct {
	ID                     string                                                  `json:"id" yaml:"id"` // client ID
	DialFn                 func(context.Context, string, string) (net.Conn, error) `json:"-" yaml:"-"`
	RequestTimeoutOverhead time.Duration                                           `json:"request_timeout_overhead" yaml:"request_timeout_overhead"`
	ConnIdleTimeout        time.Duration                                           `json:"conn_idle_timeout" yaml:"conn_idle_timeout"`

	SoftwareName    string `json:"software_name" yaml:"software_name"`       // KIP-511
	SoftwareVersion string `json:"software_version" yaml:"software_version"` // KIP-511

	Logger kgo.Logger `json:"-" yaml:"-"`

	SeedBrokers []string           `json:"seed_brokers" yaml:"seed_brokers" mapstructure:"seed_brokers"`
	MaxVersions *kversion.Versions `json:"-" yaml:"-"`
	MinVersions *kversion.Versions `json:"-" yaml:"-"`

	RetryBackoff func(int) time.Duration `json:"-" yaml:"-"`
	Retries      int                     `json:"retries" yaml:"retries"`
	RetryTimeout time.Duration           `json:"retry_timeout" yaml:"retry_timeout"`

	MaxBrokerWriteBytes int32 `json:"max_broker_write_bytes" yaml:"max_broker_write_bytes"`
	MaxBrokerReadBytes  int32 `json:"max_broker_read_bytes" yaml:"max_broker_read_bytes"`

	AllowAutoTopicCreation bool `json:"allow_auto_topic_creation" yaml:"allow_auto_topic_creation"`

	MetadataMaxAge time.Duration `json:"metadata_max_age" yaml:"metadata_max_age"`
	MetadataMinAge time.Duration `json:"metadata_min_age" yaml:"metadata_min_age"`

	Sasls []sasl.Mechanism `json:"-" yaml:"-"`

	Hooks []kgo.Hook `json:"-" yaml:"-"`

	TxnID               string                 `json:"txn_id" yaml:"txn_id"`
	TxnTimeout          time.Duration          `json:"txn_timeout" yaml:"txn_timeout"`
	Acks                int16                  `json:"acks" yaml:"acks"`
	DisableIdempotency  bool                   `json:"disable_idempotency" yaml:"disable_idempotency"`
	Compression         []kgo.CompressionCodec `json:"-" yaml:"-"` // order of preference
	DefaultProduceTopic string                 `json:"default_produce_topic" yaml:"default_produce_topic"`
	MaxRecordBatchBytes int32                  `json:"max_record_batch_bytes" yaml:"max_record_batch_bytes"`
	MaxBufferedRecords  int                    `json:"max_buffered_records" yaml:"max_buffered_records"`
	ProduceTimeout      time.Duration          `json:"produce_timeout" yaml:"produce_timeout"`
	RecordRetries       int                    `json:"record_retries" yaml:"record_retries"`
	Linger              time.Duration          `json:"linger" yaml:"linger"`
	RecordTimeout       time.Duration          `json:"record_timeout" yaml:"record_timeout"`
	ManualFlushing      bool                   `json:"manual_flushing" yaml:"manual_flushing"`
	Partitioner         kgo.Partitioner        `json:"-" yaml:"-"`
	StopOnDataLoss      bool                   `json:"stop_on_data_loss" yaml:"stop_on_data_loss"`
	OnDataLoss          func(string, int32)    `json:"-" yaml:"-"`

	MaxWait      time.Duration `json:"max_wait" yaml:"max_wait"`
	MinBytes     int32         `json:"min_bytes" yaml:"min_bytes"`
	MaxBytes     int32         `json:"max_bytes" yaml:"max_bytes"`
	MaxPartBytes int32         `json:"max_part_bytes" yaml:"max_part_bytes"`

	ResetOffset struct {
		At       int64 `json:"at" yaml:"at"`
		Relative int64 `json:"relative" yaml:"relative"`
		Epoch    int32 `json:"epoch" yaml:"epoch"`
	} `json:"reset_offset" yaml:"reset_offset"`
	IsolationLevel       int8                            `json:"isolation_level" yaml:"isolation_level"`
	KeepControl          bool                            `json:"keep_control" yaml:"keep_control"`
	Rack                 string                          `json:"rack" yaml:"rack"`
	MaxConcurrentFetches int                             `json:"max_concurrent_fetches" yaml:"max_concurrent_fetches"`
	DisableFetchSessions bool                            `json:"disable_fetch_sessions" yaml:"disable_fetch_sessions"`
	Topics               []string                        `json:"topics" yaml:"topics"` // topics to consume; if regex is true, values are compiled regular expressions
	Partitions           map[string]map[int32]kgo.Offset `json:"-" yaml:"-"`           // partitions to directly consume from
	Regex                bool                            `json:"regex" yaml:"regex"`

	Group              string                                                                          `json:"group" yaml:"group"`             // group we are in
	InstanceID         string                                                                          `json:"instance_id" yaml:"instance_id"` // optional group instance ID
	Balancers          []kgo.GroupBalancer                                                             `json:"-" yaml:"-"`                     // balancers we can use
	Protocol           string                                                                          `json:"protocol" yaml:"protocol"`       // "consumer" by default, expected to never be overridden
	SessionTimeout     time.Duration                                                                   `json:"session_timeout" yaml:"session_timeout"`
	RebalanceTimeout   time.Duration                                                                   `json:"rebalance_timeout" yaml:"rebalance_timeout"`
	HeartbeatInterval  time.Duration                                                                   `json:"heartbeat_interval" yaml:"heartbeat_interval"`
	RequireStable      bool                                                                            `json:"require_stable" yaml:"require_stable"`
	OnAssigned         func(context.Context, *kgo.Client, map[string][]int32)                          `json:"-" yaml:"-"`
	OnRevoked          func(context.Context, *kgo.Client, map[string][]int32)                          `json:"-" yaml:"-"`
	OnLost             func(context.Context, *kgo.Client, map[string][]int32)                          `json:"-" yaml:"-"`
	AutocommitDisable  bool                                                                            `json:"autocommit_disable" yaml:"autocommit_disable"` // true if autocommit was disabled or we are transactional
	AutocommitGreedy   bool                                                                            `json:"autocommit_greedy" yaml:"autocommit_greedy"`
	AutocommitMarks    bool                                                                            `json:"autocommit_marks" yaml:"autocommit_marks"`
	AutocommitInterval time.Duration                                                                   `json:"autocommit_interval" yaml:"autocommit_interval"`
	CommitCallback     func(*kgo.Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) `json:"-" yaml:"-"`

	// Options allows users to directly use the latest options without waiting for otfranz adaptation.
	Options []kgo.Opt `json:"-" yaml:"-"`
}

Config is a configuration object used to create new instances of *kgo.Client. The detailed configuration check should be referred to kgo.cfg.validate method.

type Factory

type Factory struct {
	*di.Factory
}

Factory is a *di.Factory that creates *kafka.Client.

Unlike other database providers, the kafka factories don't bundle a default kafka client. It is suggested to use Topic name as the identifier of kafka config rather than an opaque name such as default.

func (Factory) Make

func (k Factory) Make(name string) (*kgo.Client, error)

Make returns a *kgo.Client under the provided configuration entry.

type Interceptor

type Interceptor func(name string, config *Config)

Interceptor is an interceptor that makes last minute change to a *Config during kgo.Client's creation

type Maker

type Maker interface {
	Make(name string) (*kgo.Client, error)
}

Maker models a Factory

type ProvidersOptionFunc

type ProvidersOptionFunc func(options *providersOption)

ProvidersOptionFunc is the type of functional providersOption for Providers. Use this type to change how Providers work.

func WithInterceptor

func WithInterceptor(interceptor Interceptor) ProvidersOptionFunc

WithInterceptor instructs the Providers to accept the Interceptor so that users can change config during runtime. This can be useful when some dynamic computations on configs are required.

func WithReload

func WithReload(shouldReload bool) ProvidersOptionFunc

WithReload toggles whether the factory should reload cached instances upon OnReload event.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL