kafkadriver

package
v0.0.0-...-367b6cc Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 24, 2020 License: MPL-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const Key = "kafkaDriver"

Variables

View Source
var (
	IdentityFieldToMessageKey_name = map[int32]string{
		0: "TENANT_ID",
		1: "RANDOM",
	}
	IdentityFieldToMessageKey_value = map[string]int32{
		"TENANT_ID": 0,
		"RANDOM":    1,
	}
)

Enum value maps for IdentityFieldToMessageKey.

View Source
var (
	DefaultMessageKey = StandardMessageKey
)
View Source
var File_adapters_kafka_driver_kafka_driver_proto protoreflect.FileDescriptor

Functions

func SetContextEventMessageKey

func SetContextEventMessageKey(ctx context.Context, value interface{}, key ...EventMessageKey) context.Context

Types

type Adapter

type Adapter struct {
	// contains filtered or unexported fields
}

func New

func New() *Adapter

func (*Adapter) Attach

func (a *Adapter) Attach()

func (*Adapter) Detach

func (a *Adapter) Detach()

func (*Adapter) EventContext

func (a *Adapter) EventContext(ctx context.Context) (context.Context, error)

func (*Adapter) GetConsumer

func (a *Adapter) GetConsumer(key string) *Consumer

func (*Adapter) GetProducer

func (a *Adapter) GetProducer(key string) *Producer

func (*Adapter) Initialize

func (a *Adapter) Initialize()

func (*Adapter) Register

func (a *Adapter) Register() (string, interface{})

type Configuration

type Configuration struct {
	BootstrapServers                 []string                           `protobuf:"bytes,1,rep,name=bootstrap_servers,json=bootstrapServers,proto3" json:"bootstrap_servers,omitempty"`
	MaxMessageBytes                  int64                              `protobuf:"varint,2,opt,name=max_message_bytes,json=maxMessageBytes,proto3" json:"max_message_bytes,omitempty"`
	MaxMessageCopyBytes              int64                              `protobuf:"varint,3,opt,name=max_message_copy_bytes,json=maxMessageCopyBytes,proto3" json:"max_message_copy_bytes,omitempty"`
	ReceiveMessageMaxBytes           int64                              `` /* 132-byte string literal not displayed */
	MaxInFlight                      int64                              `protobuf:"varint,5,opt,name=max_in_flight,json=maxInFlight,proto3" json:"max_in_flight,omitempty"`
	Debug                            string                             `protobuf:"bytes,6,opt,name=debug,proto3" json:"debug,omitempty"`
	ReconnectBackoffMs               int64                              `protobuf:"varint,7,opt,name=reconnect_backoff_ms,json=reconnectBackoffMs,proto3" json:"reconnect_backoff_ms,omitempty"`
	ReconnectBackoffMaxMs            int64                              `` /* 129-byte string literal not displayed */
	StatisticsIntervalMs             int64                              `protobuf:"varint,9,opt,name=statistics_interval_ms,json=statisticsIntervalMs,proto3" json:"statistics_interval_ms,omitempty"`
	ApiVersionRequest                bool                               `protobuf:"varint,10,opt,name=api_version_request,json=apiVersionRequest,proto3" json:"api_version_request,omitempty"`
	ApiVersionFallbackMs             int64                              `` /* 127-byte string literal not displayed */
	BrokerVersionFallback            string                             `` /* 127-byte string literal not displayed */
	SecurityProtocol                 string                             `protobuf:"bytes,13,opt,name=security_protocol,json=securityProtocol,proto3" json:"security_protocol,omitempty"`
	SslKeyLocation                   string                             `protobuf:"bytes,14,opt,name=ssl_key_location,json=sslKeyLocation,proto3" json:"ssl_key_location,omitempty"`
	SslCertificateLocation           string                             `` /* 130-byte string literal not displayed */
	SslCaLocation                    string                             `protobuf:"bytes,16,opt,name=ssl_ca_location,json=sslCaLocation,proto3" json:"ssl_ca_location,omitempty"`
	SslCrlLocation                   string                             `protobuf:"bytes,17,opt,name=ssl_crl_location,json=sslCrlLocation,proto3" json:"ssl_crl_location,omitempty"`
	EnableSslCertificateVerification bool                               `` /* 163-byte string literal not displayed */
	Consumers                        map[string]*Configuration_Consumer `` /* 160-byte string literal not displayed */
	Producers                        map[string]*Configuration_Producer `` /* 160-byte string literal not displayed */
	MessageKeyFrom                   IdentityFieldToMessageKey          `` /* 165-byte string literal not displayed */
	// contains filtered or unexported fields
}

func (*Configuration) Descriptor (deprecated)

func (*Configuration) Descriptor() ([]byte, []int)

Deprecated: Use Configuration.ProtoReflect.Descriptor instead.

func (*Configuration) GetApiVersionFallbackMs

func (x *Configuration) GetApiVersionFallbackMs() int64

func (*Configuration) GetApiVersionRequest

func (x *Configuration) GetApiVersionRequest() bool

func (*Configuration) GetBootstrapServers

func (x *Configuration) GetBootstrapServers() []string

func (*Configuration) GetBrokerVersionFallback

func (x *Configuration) GetBrokerVersionFallback() string

func (*Configuration) GetConsumers

func (x *Configuration) GetConsumers() map[string]*Configuration_Consumer

func (*Configuration) GetDebug

func (x *Configuration) GetDebug() string

func (*Configuration) GetEnableSslCertificateVerification

func (x *Configuration) GetEnableSslCertificateVerification() bool

func (*Configuration) GetMaxInFlight

func (x *Configuration) GetMaxInFlight() int64

func (*Configuration) GetMaxMessageBytes

func (x *Configuration) GetMaxMessageBytes() int64

func (*Configuration) GetMaxMessageCopyBytes

func (x *Configuration) GetMaxMessageCopyBytes() int64

func (*Configuration) GetMessageKeyFrom

func (x *Configuration) GetMessageKeyFrom() IdentityFieldToMessageKey

func (*Configuration) GetProducers

func (x *Configuration) GetProducers() map[string]*Configuration_Producer

func (*Configuration) GetReceiveMessageMaxBytes

func (x *Configuration) GetReceiveMessageMaxBytes() int64

func (*Configuration) GetReconnectBackoffMaxMs

func (x *Configuration) GetReconnectBackoffMaxMs() int64

func (*Configuration) GetReconnectBackoffMs

func (x *Configuration) GetReconnectBackoffMs() int64

func (*Configuration) GetSecurityProtocol

func (x *Configuration) GetSecurityProtocol() string

func (*Configuration) GetSslCaLocation

func (x *Configuration) GetSslCaLocation() string

func (*Configuration) GetSslCertificateLocation

func (x *Configuration) GetSslCertificateLocation() string

func (*Configuration) GetSslCrlLocation

func (x *Configuration) GetSslCrlLocation() string

func (*Configuration) GetSslKeyLocation

func (x *Configuration) GetSslKeyLocation() string

func (*Configuration) GetStatisticsIntervalMs

func (x *Configuration) GetStatisticsIntervalMs() int64

func (*Configuration) ProtoMessage

func (*Configuration) ProtoMessage()

func (*Configuration) ProtoReflect

func (x *Configuration) ProtoReflect() protoreflect.Message

func (*Configuration) Reset

func (x *Configuration) Reset()

func (*Configuration) String

func (x *Configuration) String() string

type Configuration_Consumer

type Configuration_Consumer struct {
	SessionTimeoutMs        int64    `protobuf:"varint,1,opt,name=session_timeout_ms,json=sessionTimeoutMs,proto3" json:"session_timeout_ms,omitempty"`
	MaxPollIntervalMs       int64    `protobuf:"varint,2,opt,name=max_poll_interval_ms,json=maxPollIntervalMs,proto3" json:"max_poll_interval_ms,omitempty"`
	QueuedMinMessages       int64    `protobuf:"varint,3,opt,name=queued_min_messages,json=queuedMinMessages,proto3" json:"queued_min_messages,omitempty"`
	QueuedMaxMessagesKbytes int64    `` /* 135-byte string literal not displayed */
	FetchMessageMaxBytes    int64    `` /* 126-byte string literal not displayed */
	FetchErrorBackoffMs     int64    `protobuf:"varint,6,opt,name=fetch_error_backoff_ms,json=fetchErrorBackoffMs,proto3" json:"fetch_error_backoff_ms,omitempty"`
	ReadTimeoutMs           int64    `protobuf:"varint,7,opt,name=read_timeout_ms,json=readTimeoutMs,proto3" json:"read_timeout_ms,omitempty"`
	Topics                  []string `protobuf:"bytes,8,rep,name=topics,proto3" json:"topics,omitempty"`
	// contains filtered or unexported fields
}

func (*Configuration_Consumer) Descriptor (deprecated)

func (*Configuration_Consumer) Descriptor() ([]byte, []int)

Deprecated: Use Configuration_Consumer.ProtoReflect.Descriptor instead.

func (*Configuration_Consumer) GetFetchErrorBackoffMs

func (x *Configuration_Consumer) GetFetchErrorBackoffMs() int64

func (*Configuration_Consumer) GetFetchMessageMaxBytes

func (x *Configuration_Consumer) GetFetchMessageMaxBytes() int64

func (*Configuration_Consumer) GetMaxPollIntervalMs

func (x *Configuration_Consumer) GetMaxPollIntervalMs() int64

func (*Configuration_Consumer) GetQueuedMaxMessagesKbytes

func (x *Configuration_Consumer) GetQueuedMaxMessagesKbytes() int64

func (*Configuration_Consumer) GetQueuedMinMessages

func (x *Configuration_Consumer) GetQueuedMinMessages() int64

func (*Configuration_Consumer) GetReadTimeoutMs

func (x *Configuration_Consumer) GetReadTimeoutMs() int64

func (*Configuration_Consumer) GetSessionTimeoutMs

func (x *Configuration_Consumer) GetSessionTimeoutMs() int64

func (*Configuration_Consumer) GetTopics

func (x *Configuration_Consumer) GetTopics() []string

func (*Configuration_Consumer) ProtoMessage

func (*Configuration_Consumer) ProtoMessage()

func (*Configuration_Consumer) ProtoReflect

func (x *Configuration_Consumer) ProtoReflect() protoreflect.Message

func (*Configuration_Consumer) Reset

func (x *Configuration_Consumer) Reset()

func (*Configuration_Consumer) String

func (x *Configuration_Consumer) String() string

type Configuration_Producer

type Configuration_Producer struct {
	QueueBufferingMaxMs       int64  `protobuf:"varint,1,opt,name=queue_buffering_max_ms,json=queueBufferingMaxMs,proto3" json:"queue_buffering_max_ms,omitempty"`
	RetryBackoffMs            int64  `protobuf:"varint,2,opt,name=retry_backoff_ms,json=retryBackoffMs,proto3" json:"retry_backoff_ms,omitempty"`
	BatchNumMessages          int64  `protobuf:"varint,3,opt,name=batch_num_messages,json=batchNumMessages,proto3" json:"batch_num_messages,omitempty"`
	MessageTimeoutMs          int64  `protobuf:"varint,4,opt,name=message_timeout_ms,json=messageTimeoutMs,proto3" json:"message_timeout_ms,omitempty"`
	TransactionTimeoutMs      int64  `protobuf:"varint,5,opt,name=transaction_timeout_ms,json=transactionTimeoutMs,proto3" json:"transaction_timeout_ms,omitempty"`
	QueueBufferingMaxMessages int64  `` /* 141-byte string literal not displayed */
	QueueBufferingMaxKbytes   int64  `` /* 135-byte string literal not displayed */
	MessageSendMaxRetries     int64  `` /* 129-byte string literal not displayed */
	IsTransactional           bool   `protobuf:"varint,9,opt,name=is_transactional,json=isTransactional,proto3" json:"is_transactional,omitempty"`
	IsIdempotent              bool   `protobuf:"varint,10,opt,name=is_idempotent,json=isIdempotent,proto3" json:"is_idempotent,omitempty"`
	TransactionalId           string `protobuf:"bytes,11,opt,name=transactional_id,json=transactionalId,proto3" json:"transactional_id,omitempty"`
	// contains filtered or unexported fields
}

func (*Configuration_Producer) Descriptor (deprecated)

func (*Configuration_Producer) Descriptor() ([]byte, []int)

Deprecated: Use Configuration_Producer.ProtoReflect.Descriptor instead.

func (*Configuration_Producer) GetBatchNumMessages

func (x *Configuration_Producer) GetBatchNumMessages() int64

func (*Configuration_Producer) GetIsIdempotent

func (x *Configuration_Producer) GetIsIdempotent() bool

func (*Configuration_Producer) GetIsTransactional

func (x *Configuration_Producer) GetIsTransactional() bool

func (*Configuration_Producer) GetMessageSendMaxRetries

func (x *Configuration_Producer) GetMessageSendMaxRetries() int64

func (*Configuration_Producer) GetMessageTimeoutMs

func (x *Configuration_Producer) GetMessageTimeoutMs() int64

func (*Configuration_Producer) GetQueueBufferingMaxKbytes

func (x *Configuration_Producer) GetQueueBufferingMaxKbytes() int64

func (*Configuration_Producer) GetQueueBufferingMaxMessages

func (x *Configuration_Producer) GetQueueBufferingMaxMessages() int64

func (*Configuration_Producer) GetQueueBufferingMaxMs

func (x *Configuration_Producer) GetQueueBufferingMaxMs() int64

func (*Configuration_Producer) GetRetryBackoffMs

func (x *Configuration_Producer) GetRetryBackoffMs() int64

func (*Configuration_Producer) GetTransactionTimeoutMs

func (x *Configuration_Producer) GetTransactionTimeoutMs() int64

func (*Configuration_Producer) GetTransactionalId

func (x *Configuration_Producer) GetTransactionalId() string

func (*Configuration_Producer) ProtoMessage

func (*Configuration_Producer) ProtoMessage()

func (*Configuration_Producer) ProtoReflect

func (x *Configuration_Producer) ProtoReflect() protoreflect.Message

func (*Configuration_Producer) Reset

func (x *Configuration_Producer) Reset()

func (*Configuration_Producer) String

func (x *Configuration_Producer) String() string

type Consumer

type Consumer struct {
	*kafka.Consumer

	MessagesCh chan *WrappedEvent
	// contains filtered or unexported fields
}

func (*Consumer) MustCommitEvent

func (c *Consumer) MustCommitEvent(event *WrappedEvent)

func (*Consumer) Start

func (c *Consumer) Start()

func (*Consumer) Stop

func (c *Consumer) Stop()

type Event

type Event struct {
	Name       string              `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	Data       []byte              `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
	MessageKey string              `protobuf:"bytes,3,opt,name=message_key,json=messageKey,proto3" json:"message_key,omitempty"`
	Topic      string              `protobuf:"bytes,4,opt,name=topic,proto3" json:"topic,omitempty"`
	Claim      *auth.ClaimEnvelope `protobuf:"bytes,5,opt,name=claim,proto3" json:"claim,omitempty"`
	// contains filtered or unexported fields
}

func (*Event) Descriptor (deprecated)

func (*Event) Descriptor() ([]byte, []int)

Deprecated: Use Event.ProtoReflect.Descriptor instead.

func (*Event) GetClaim

func (x *Event) GetClaim() *auth.ClaimEnvelope

func (*Event) GetData

func (x *Event) GetData() []byte

func (*Event) GetMessageKey

func (x *Event) GetMessageKey() string

func (*Event) GetName

func (x *Event) GetName() string

func (*Event) GetTopic

func (x *Event) GetTopic() string

func (*Event) Log

func (x *Event) Log(ev *zerolog.Event) *zerolog.Event

func (*Event) MarshalProto

func (x *Event) MarshalProto(msg protoreflect.ProtoMessage)

func (*Event) MessageKeyFromContext

func (x *Event) MessageKeyFromContext(ctx context.Context, key ...EventMessageKey) *Event

func (*Event) ProtoMessage

func (*Event) ProtoMessage()

func (*Event) ProtoReflect

func (x *Event) ProtoReflect() protoreflect.Message

func (*Event) Reset

func (x *Event) Reset()

func (*Event) String

func (x *Event) String() string

func (*Event) UnmarshalProto

func (x *Event) UnmarshalProto(msg protoreflect.ProtoMessage)

type EventMessageKey

type EventMessageKey int
const (
	StandardMessageKey EventMessageKey = -1
)

type IdentityFieldToMessageKey

type IdentityFieldToMessageKey int32
const (
	IdentityFieldToMessageKey_TENANT_ID IdentityFieldToMessageKey = 0
	IdentityFieldToMessageKey_RANDOM    IdentityFieldToMessageKey = 1
)

func (IdentityFieldToMessageKey) Descriptor

func (IdentityFieldToMessageKey) Enum

func (IdentityFieldToMessageKey) EnumDescriptor (deprecated)

func (IdentityFieldToMessageKey) EnumDescriptor() ([]byte, []int)

Deprecated: Use IdentityFieldToMessageKey.Descriptor instead.

func (IdentityFieldToMessageKey) Number

func (IdentityFieldToMessageKey) String

func (x IdentityFieldToMessageKey) String() string

func (IdentityFieldToMessageKey) Type

type Producer

type Producer struct {
	*kafka.Producer
	// contains filtered or unexported fields
}

func (*Producer) Emit

func (p *Producer) Emit(event *Event)

func (*Producer) Start

func (p *Producer) Start()

type WrappedEvent

type WrappedEvent struct {
	Event
	Raw *kafka.Message
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL