client

package module
v1.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 5, 2023 License: MIT Imports: 18 Imported by: 3

README

Go Reference GitHub go.mod Go version GitHub release (latest by date) Go Report Card Actions Status

HomeDashboard Message Client

Message client receives messages from given Kafka topics. Consumers, e.g. a renderer, can use this client to get latest or all messages.

Config

By config you can define Kafka servers and a queue to topic forwarding. If no topic is specified given queue name is taken.

kafka:
  servers: localhost
  poll_sleep: 1s
  fetch_timeout: 1s
  subsriptions:
    - topic: data.
      limit: "5"
      datasource: exchangerate
fetch_timeout

Timeout to fetch new messages. Client will pause message consumption if there're no new messages. Default is 3 seconds.

poll_sleep

If there're no new message client will pause message consumption for given time. Default is 10 minutes.

subsriptions

Defines topics you want to listen to. For HomeDashboard topics retrieved messages are converted to events. Target event will be defined by datasource. For available datasources see Datasources. Number of which will be hold in a stack can be defined by limit param, default is 1 message.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrNoNewEvents = errors.New("No new events available")

Functions

This section is empty.

Types

type Client

type Client interface {

	// Runable core interface. Used to run message fetch n background.
	core.Runable

	// Latest willreturn latest element for given datasource.
	Latest(core.DataSource) (proto.Message, error)

	// All returns all available message for given datasource.
	All(core.DataSource) ([]proto.Message, error)

	// Observe returns a channel clients can subsribe to get new messages.
	Observe(*[]core.DataSource) <-chan proto.Message

	// IsReady will ensure at least one event in local storage. In case datasource
	// filters are set it will ensure that there's one event for each datasource.
	IsReady() bool

	// Metrics returns stats of consumed topics and local message count.
	Metrics() []metrics.Measurement
}

func New

func New(conf config.Config, logger log.Logger) Client

New returns a new client which subsribes to Kafka topics and provides received messages for clients.

type MessageClient

type MessageClient struct {
	// contains filtered or unexported fields
}

func (*MessageClient) All

func (client *MessageClient) All(datasource core.DataSource) ([]proto.Message, error)

All returns all available events for passed datasource.

func (*MessageClient) IsReady added in v1.1.14

func (client *MessageClient) IsReady() bool

IsReady will ensure at least one event in local storage. In case datasource filters are set it will ensure that there's one event for each datasource.

func (*MessageClient) Latest

func (client *MessageClient) Latest(datasource core.DataSource) (proto.Message, error)

Latest returns latest event in local queue.

func (*MessageClient) Metrics added in v1.2.0

func (client *MessageClient) Metrics() []metrics.Measurement

Metrics returns count of messages for each available datasource.

func (*MessageClient) Observe added in v1.1.0

func (client *MessageClient) Observe(filter *[]core.DataSource) <-chan proto.Message

Observe returns a channel you can use to listen for new messages. Filter can be passed if you're interested in a specific type of events. If channel sappacitiy is reached new events will be discarded. Channel capacity can be defined by config key "kafka.channel_size".

func (*MessageClient) Run

func (client *MessageClient) Run(ctx context.Context, waitGroup *sync.WaitGroup) error

Run starts message observing. There's no contimuous subsription for new messages. Message client will listen for new message until no new messages come in and will pause messagae receiving for configured duration. Pause duration can be set by config key kafka.poll_sleep".

func (*MessageClient) String added in v1.1.12

func (client *MessageClient) String() string

String returns log information for current datasource.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL