kafka

package
v0.0.131 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2024 License: MIT Imports: 18 Imported by: 0

Documentation

Overview

*Copyright (c) 2023, kaydxh * *Permission is hereby granted, free of charge, to any person obtaining a copy *of this software and associated documentation files (the "Software"), to deal *in the Software without restriction, including without limitation the rights *to use, copy, modify, merge, publish, distribute, sublicense, and/or sell *copies of the Software, and to permit persons to whom the Software is *furnished to do so, subject to the following conditions: * *The above copyright notice and this permission notice shall be included in all *copies or substantial portions of the Software. * *THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE *AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, *OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE *SOFTWARE.

*Copyright (c) 2023, kaydxh * *Permission is hereby granted, free of charge, to any person obtaining a copy *of this software and associated documentation files (the "Software"), to deal *in the Software without restriction, including without limitation the rights *to use, copy, modify, merge, publish, distribute, sublicense, and/or sell *copies of the Software, and to permit persons to whom the Software is *furnished to do so, subject to the following conditions: * *The above copyright notice and this permission notice shall be included in all *copies or substantial portions of the Software. * *THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE *AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, *OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE *SOFTWARE.

*Copyright (c) 2023, kaydxh * *Permission is hereby granted, free of charge, to any person obtaining a copy *of this software and associated documentation files (the "Software"), to deal *in the Software without restriction, including without limitation the rights *to use, copy, modify, merge, publish, distribute, sublicense, and/or sell *copies of the Software, and to permit persons to whom the Software is *furnished to do so, subject to the following conditions: * *The above copyright notice and this permission notice shall be included in all *copies or substantial portions of the Software. * *THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE *AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, *OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE *SOFTWARE.

*Copyright (c) 2023, kaydxh * *Permission is hereby granted, free of charge, to any person obtaining a copy *of this software and associated documentation files (the "Software"), to deal *in the Software without restriction, including without limitation the rights *to use, copy, modify, merge, publish, distribute, sublicense, and/or sell *copies of the Software, and to permit persons to whom the Software is *furnished to do so, subject to the following conditions: * *The above copyright notice and this permission notice shall be included in all *copies or substantial portions of the Software. * *THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE *AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, *OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE *SOFTWARE.

*Copyright (c) 2023, kaydxh * *Permission is hereby granted, free of charge, to any person obtaining a copy *of this software and associated documentation files (the "Software"), to deal *in the Software without restriction, including without limitation the rights *to use, copy, modify, merge, publish, distribute, sublicense, and/or sell *copies of the Software, and to permit persons to whom the Software is *furnished to do so, subject to the following conditions: * *The above copyright notice and this permission notice shall be included in all *copies or substantial portions of the Software. * *THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE *AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, *OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE *SOFTWARE.

*Copyright (c) 2023, kaydxh * *Permission is hereby granted, free of charge, to any person obtaining a copy *of this software and associated documentation files (the "Software"), to deal *in the Software without restriction, including without limitation the rights *to use, copy, modify, merge, publish, distribute, sublicense, and/or sell *copies of the Software, and to permit persons to whom the Software is *furnished to do so, subject to the following conditions: * *The above copyright notice and this permission notice shall be included in all *copies or substantial portions of the Software. * *THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE *AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, *OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE *SOFTWARE.

*Copyright (c) 2023, kaydxh * *Permission is hereby granted, free of charge, to any person obtaining a copy *of this software and associated documentation files (the "Software"), to deal *in the Software without restriction, including without limitation the rights *to use, copy, modify, merge, publish, distribute, sublicense, and/or sell *copies of the Software, and to permit persons to whom the Software is *furnished to do so, subject to the following conditions: * *The above copyright notice and this permission notice shall be included in all *copies or substantial portions of the Software. * *THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE *AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, *OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE *SOFTWARE.

Index

Constants

This section is empty.

Variables

View Source
var File_pkg_mq_kafka_kafka_proto protoreflect.FileDescriptor

Functions

This section is empty.

Types

type CompletedConfig

type CompletedConfig struct {
	// contains filtered or unexported fields
}

func (CompletedConfig) New

func (c CompletedConfig) New(ctx context.Context) (*MQ, error)

type Config

type Config struct {
	Proto Kafka
	// contains filtered or unexported fields
}

func NewConfig

func NewConfig(options ...ConfigOption) *Config

func (*Config) ApplyOptions

func (o *Config) ApplyOptions(options ...ConfigOption) *Config

func (*Config) Complete

func (c *Config) Complete() CompletedConfig

Complete set default ServerRunOptions.

type ConfigOption

type ConfigOption interface {
	// contains filtered or unexported methods
}

A ConfigOption sets options.

func WithViper

func WithViper(v *viper.Viper) ConfigOption

type ConfigOptionFunc

type ConfigOptionFunc func(*Config)

ConfigOptionFunc wraps a function that modifies Client into an implementation of the ConfigOption interface.

type Consumer

type Consumer struct {
	*kafka.Reader
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(config kafka.ReaderConfig) (*Consumer, error)

func (*Consumer) Close

func (c *Consumer) Close()

func (*Consumer) ReadStream

func (c *Consumer) ReadStream(ctx context.Context) <-chan mq_.Message

func (*Consumer) Topic added in v0.0.121

func (c *Consumer) Topic() string

type ConsumerOptions

type ConsumerOptions struct {
	// contains filtered or unexported fields
}

type EmptyConfigOption

type EmptyConfigOption struct{}

EmptyConfigOption does not alter the configuration. It can be embedded in another structure to build custom options.

This API is EXPERIMENTAL.

type EmptyMQOption

type EmptyMQOption struct{}

EmptyMQUrlOption does not alter the MQuration. It can be embedded in another structure to build custom options.

This API is EXPERIMENTAL.

type Kafka

type Kafka struct {
	Enabled             bool               `protobuf:"varint,1,opt,name=enabled,proto3" json:"enabled,omitempty"`
	Brokers             []string           `protobuf:"bytes,2,rep,name=brokers,proto3" json:"brokers,omitempty"`
	ApiVersionRequest   bool               `protobuf:"varint,3,opt,name=api_version_request,json=apiVersionRequest,proto3" json:"api_version_request,omitempty"`
	ReconnectBackoff    *duration.Duration `protobuf:"bytes,4,opt,name=reconnect_backoff,json=reconnectBackoff,proto3" json:"reconnect_backoff,omitempty"`
	ReconnectBackoffMax *duration.Duration `protobuf:"bytes,5,opt,name=reconnect_backoff_max,json=reconnectBackoffMax,proto3" json:"reconnect_backoff_max,omitempty"`
	MaxWaitDuration     *duration.Duration `protobuf:"bytes,6,opt,name=max_wait_duration,json=maxWaitDuration,proto3" json:"max_wait_duration,omitempty"`
	FailAfterDuration   *duration.Duration `protobuf:"bytes,7,opt,name=fail_after_duration,json=failAfterDuration,proto3" json:"fail_after_duration,omitempty"`
	// contains filtered or unexported fields
}

https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md

func (*Kafka) Descriptor deprecated

func (*Kafka) Descriptor() ([]byte, []int)

Deprecated: Use Kafka.ProtoReflect.Descriptor instead.

func (*Kafka) GetApiVersionRequest

func (x *Kafka) GetApiVersionRequest() bool

func (*Kafka) GetBrokers

func (x *Kafka) GetBrokers() []string

func (*Kafka) GetEnabled

func (x *Kafka) GetEnabled() bool

func (*Kafka) GetFailAfterDuration

func (x *Kafka) GetFailAfterDuration() *duration.Duration

func (*Kafka) GetMaxWaitDuration

func (x *Kafka) GetMaxWaitDuration() *duration.Duration

func (*Kafka) GetReconnectBackoff

func (x *Kafka) GetReconnectBackoff() *duration.Duration

func (*Kafka) GetReconnectBackoffMax

func (x *Kafka) GetReconnectBackoffMax() *duration.Duration

func (*Kafka) ProtoMessage

func (*Kafka) ProtoMessage()

func (*Kafka) ProtoReflect

func (x *Kafka) ProtoReflect() protoreflect.Message

func (*Kafka) Reset

func (x *Kafka) Reset()

func (*Kafka) String

func (x *Kafka) String() string

type KafkaMessage

type KafkaMessage struct {
	Err error
	Msg *kafka.Message
}

func (KafkaMessage) Error added in v0.0.121

func (m KafkaMessage) Error() error

func (KafkaMessage) Key added in v0.0.121

func (m KafkaMessage) Key() []byte

func (KafkaMessage) Value added in v0.0.121

func (m KafkaMessage) Value() []byte

type Kafka_Consumer

type Kafka_Consumer struct {
	// contains filtered or unexported fields
}

func (*Kafka_Consumer) Descriptor deprecated

func (*Kafka_Consumer) Descriptor() ([]byte, []int)

Deprecated: Use Kafka_Consumer.ProtoReflect.Descriptor instead.

func (*Kafka_Consumer) ProtoMessage

func (*Kafka_Consumer) ProtoMessage()

func (*Kafka_Consumer) ProtoReflect

func (x *Kafka_Consumer) ProtoReflect() protoreflect.Message

func (*Kafka_Consumer) Reset

func (x *Kafka_Consumer) Reset()

func (*Kafka_Consumer) String

func (x *Kafka_Consumer) String() string

type Kafka_Producer

type Kafka_Producer struct {
	// contains filtered or unexported fields
}

func (*Kafka_Producer) Descriptor deprecated

func (*Kafka_Producer) Descriptor() ([]byte, []int)

Deprecated: Use Kafka_Producer.ProtoReflect.Descriptor instead.

func (*Kafka_Producer) ProtoMessage

func (*Kafka_Producer) ProtoMessage()

func (*Kafka_Producer) ProtoReflect

func (x *Kafka_Producer) ProtoReflect() protoreflect.Message

func (*Kafka_Producer) Reset

func (x *Kafka_Producer) Reset()

func (*Kafka_Producer) String

func (x *Kafka_Producer) String() string

type MQ

type MQ struct {
	*kafka.Conn

	Conf MQConfig
	// contains filtered or unexported fields
}

func NewMQ

func NewMQ(conf MQConfig, opts ...MQOption) *MQ

func (*MQ) ApplyOptions

func (o *MQ) ApplyOptions(options ...MQOption) *MQ

func (*MQ) AsConsumers

func (q *MQ) AsConsumers(ctx context.Context, topics ...string) (consumers []*Consumer, err error)

func (*MQ) AsProducers

func (q *MQ) AsProducers(ctx context.Context, topics ...string) (producers []*Producer, err error)

func (*MQ) Close

func (q *MQ) Close()

func (*MQ) GetConsumer

func (q *MQ) GetConsumer(topic string) (*Consumer, error)

func (*MQ) GetProducer

func (q *MQ) GetProducer(topic string) (*Producer, error)

func (*MQ) InstallMQ

func (q *MQ) InstallMQ(
	ctx context.Context,
	maxWaitInterval time.Duration,
	failAfter time.Duration,
) (*MQ, error)

func (*MQ) ReadStream

func (q *MQ) ReadStream(ctx context.Context, topic string) <-chan mq_.Message

func (*MQ) Send

func (q *MQ) Send(ctx context.Context, topic string, msgs ...kafka.Message) error

type MQConfig

type MQConfig struct {
	Brokers []string
}

type MQOption

type MQOption interface {
	// contains filtered or unexported methods
}

A MQOption sets options.

func WithConsumerGroupID

func WithConsumerGroupID(groupID string) MQOption

consumer options

func WithConsumerMaxBytes

func WithConsumerMaxBytes(maxBytes int) MQOption

func WithConsumerMaxWait

func WithConsumerMaxWait(maxWait time.Duration) MQOption

func WithConsumerMinBytes

func WithConsumerMinBytes(minBytes int) MQOption

func WithConsumerPartition

func WithConsumerPartition(partition int) MQOption

func WithDialTimeout

func WithDialTimeout(dialTimeout time.Duration) MQOption

base options

func WithProducerBatchBytes

func WithProducerBatchBytes(batchBytes int) MQOption

func WithProducerBatchSize

func WithProducerBatchSize(batchSize int) MQOption

producer options

func WithProducerBatchTimeout

func WithProducerBatchTimeout(batchTimeout time.Duration) MQOption

type MQOptionFunc

type MQOptionFunc func(*MQ)

MQOptionFunc wraps a function that modifies MQ into an implementation of the MQOption interface.

type MQOptions

type MQOptions struct {
	SaslUsername string
	SaslPassword string
	// contains filtered or unexported fields
}

type Producer

type Producer struct {
	*kafka.Writer
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(config kafka.WriterConfig) (*Producer, error)

func (*Producer) Close

func (p *Producer) Close()

func (*Producer) Send

func (p *Producer) Send(ctx context.Context, msgs ...kafka.Message) error

type ProducerOptions

type ProducerOptions struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL