kafka

package
v7.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2024 License: Apache-2.0 Imports: 14 Imported by: 1

Documentation

Overview

* Copyright 2018-2024 Open Networking Foundation (ONF) and the ONF Contributors

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at

* http://www.apache.org/licenses/LICENSE-2.0

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * * NOTE: The kafka client is used to publish events on Kafka in voltha * release 2.9. It is no longer used for inter voltha container * communication.

  • Copyright 2020-2024 Open Networking Foundation (ONF) and the ONF Contributors *
  • Licensed under the Apache License, Version 2.0 (the "License");
  • you may not use this file except in compliance with the License.
  • You may obtain a copy of the License at *
  • http://www.apache.org/licenses/LICENSE-2.0 *
  • Unless required by applicable law or agreed to in writing, software
  • distributed under the License is distributed on an "AS IS" BASIS,
  • WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  • See the License for the specific language governing permissions and
  • limitations under the License.

* Copyright 2018-2024 Open Networking Foundation (ONF) and the ONF Contributors

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at

* http://www.apache.org/licenses/LICENSE-2.0

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Copyright 2018-2024 Open Networking Foundation (ONF) and the ONF Contributors

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at

* http://www.apache.org/licenses/LICENSE-2.0

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

Index

Constants

View Source
const (
	PartitionConsumer = iota
	GroupCustomer     = iota
)
View Source
const (
	OffsetNewest = -1
	OffsetOldest = -2
)
View Source
const (
	GroupIdKey = "groupId"
	Offset     = "offset"
)
View Source
const (
	DefaultKafkaAddress             = "127.0.0.1:9092"
	DefaultGroupName                = "voltha"
	DefaultSleepOnError             = 1
	DefaultProducerFlushFrequency   = 10
	DefaultProducerFlushMessages    = 10
	DefaultProducerFlushMaxmessages = 100
	DefaultProducerReturnSuccess    = true
	DefaultProducerReturnErrors     = true
	DefaultProducerRetryMax         = 3
	DefaultProducerRetryBackoff     = time.Millisecond * 100
	DefaultConsumerMaxwait          = 100
	DefaultMaxProcessingTime        = 100
	DefaultConsumerType             = PartitionConsumer
	DefaultNumberPartitions         = 3
	DefaultNumberReplicas           = 1
	DefaultAutoCreateTopic          = false
	DefaultMetadataMaxRetry         = 3
	DefaultMaxRetries               = 3
	DefaultLivenessChannelInterval  = time.Second * 30
)
View Source
const (
	TopicSeparator = "_"
	DeviceIdLength = 24
)

Variables

This section is empty.

Functions

func GetDeviceIdFromTopic

func GetDeviceIdFromTopic(topic Topic) string

TODO: Remove and provide better may to get the device id GetDeviceIdFromTopic extract the deviceId from the topic name. The topic name is formatted either as:

<any string> or <any string>_<deviceId>.  The device Id is 24 characters long.

func MonitorKafkaReadiness

func MonitorKafkaReadiness(ctx context.Context,
	kClient Client,
	liveProbeInterval, notLiveProbeInterval time.Duration,
	serviceName string)

* MonitorKafkaReadiness checks the liveliness and readiness of the kafka service and update the status in the probe.

func StartAndWaitUntilKafkaConnectionIsUp

func StartAndWaitUntilKafkaConnectionIsUp(ctx context.Context, kClient Client, connectionRetryInterval time.Duration, serviceName string) error

WaitUntilKafkaConnectionIsUp waits until the kafka client can establish a connection to the kafka broker or until the context times out.

Types

type Client

type Client interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context)
	CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error
	DeleteTopic(ctx context.Context, topic *Topic) error
	Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan proto.Message, error)
	UnSubscribe(ctx context.Context, topic *Topic, ch <-chan proto.Message) error
	SubscribeForMetadata(context.Context, func(fromTopic string, timestamp time.Time))
	Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error
	SendLiveness(ctx context.Context) error
	EnableLivenessChannel(ctx context.Context, enable bool) chan bool
	EnableHealthinessChannel(ctx context.Context, enable bool) chan bool
	ListTopics(ctx context.Context) ([]string, error)
}

MsgClient represents the set of APIs a Kafka MsgClient must implement

type KVArg

type KVArg struct {
	Key   string
	Value interface{}
}

type RpcMType

type RpcMType int
const (
	RpcFormattingError RpcMType = iota
	RpcSent
	RpcReply
	RpcTimeout
	RpcTransportError
	RpcSystemClosing
)

type RpcResponse

type RpcResponse struct {
	MType RpcMType
	Err   error
	Reply *any.Any
}

func NewResponse

func NewResponse(messageType RpcMType, err error, body *any.Any) *RpcResponse

type SaramaClient

type SaramaClient struct {
	KafkaAddress string
	// contains filtered or unexported fields
}

SaramaClient represents the messaging proxy

func NewSaramaClient

func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient

func (*SaramaClient) CreateTopic

func (sc *SaramaClient) CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error

CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to ensure no two go routines are performing operations on the same topic

func (*SaramaClient) DeleteTopic

func (sc *SaramaClient) DeleteTopic(ctx context.Context, topic *Topic) error

DeleteTopic removes a topic from the kafka Broker

func (*SaramaClient) EnableHealthinessChannel

func (sc *SaramaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool

Enable the Healthiness monitor channel. This channel will report "false" if the kafka consumers die, or some other problem occurs which is catastrophic that would require re-creating the client.

func (*SaramaClient) EnableLivenessChannel

func (sc *SaramaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool

Enable the liveness monitor channel. This channel will report a "true" or "false" on every publish, which indicates whether or not the channel is still live. This channel is then picked up by the service (i.e. rw_core / ro_core) to update readiness status and/or take other actions.

func (*SaramaClient) ListTopics

func (sc *SaramaClient) ListTopics(ctx context.Context) ([]string, error)

func (*SaramaClient) Send

func (sc *SaramaClient) Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error

send formats and sends the request onto the kafka messaging bus.

func (*SaramaClient) SendLiveness

func (sc *SaramaClient) SendLiveness(ctx context.Context) error

send an empty message on the liveness channel to check whether connectivity has been restored.

func (*SaramaClient) Start

func (sc *SaramaClient) Start(ctx context.Context) error

func (*SaramaClient) Stop

func (sc *SaramaClient) Stop(ctx context.Context)

func (*SaramaClient) Subscribe

func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan proto.Message, error)

Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive messages from that topic

func (*SaramaClient) SubscribeForMetadata

func (sc *SaramaClient) SubscribeForMetadata(ctx context.Context, callback func(fromTopic string, timestamp time.Time))

func (*SaramaClient) UnSubscribe

func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan proto.Message) error

UnSubscribe unsubscribe a consumer from a given topic

type SaramaClientOption

type SaramaClientOption func(*SaramaClient)

func Address

func Address(address string) SaramaClientOption

func AutoCreateTopic

func AutoCreateTopic(opt bool) SaramaClientOption

func ConsumerGroupName

func ConsumerGroupName(name string) SaramaClientOption

func ConsumerGroupPrefix

func ConsumerGroupPrefix(prefix string) SaramaClientOption

func ConsumerMaxWait

func ConsumerMaxWait(wait int) SaramaClientOption

func ConsumerType

func ConsumerType(consumer int) SaramaClientOption

func LivenessChannelInterval

func LivenessChannelInterval(opt time.Duration) SaramaClientOption

func MaxProcessingTime

func MaxProcessingTime(pTime int) SaramaClientOption

func MetadatMaxRetries

func MetadatMaxRetries(retry int) SaramaClientOption

func NumPartitions

func NumPartitions(number int) SaramaClientOption

func NumReplicas

func NumReplicas(number int) SaramaClientOption

func ProducerFlushFrequency

func ProducerFlushFrequency(frequency int) SaramaClientOption

func ProducerFlushMaxMessages

func ProducerFlushMaxMessages(num int) SaramaClientOption

func ProducerFlushMessages

func ProducerFlushMessages(num int) SaramaClientOption

func ProducerMaxRetries

func ProducerMaxRetries(num int) SaramaClientOption

func ProducerRetryBackoff

func ProducerRetryBackoff(duration time.Duration) SaramaClientOption

func ProducerReturnOnErrors

func ProducerReturnOnErrors(opt bool) SaramaClientOption

func ProducerReturnOnSuccess

func ProducerReturnOnSuccess(opt bool) SaramaClientOption

type Topic

type Topic struct {
	// The name of the topic. It must start with a letter,
	// and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`),
	// underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent
	// signs (`%`).
	Name string
}

A Topic definition - may be augmented with additional attributes eventually

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL