service

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2023 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// GetElasticSearchClient builds a dp-elasticsearch client that targets
// cfg.ElasticSearchAPIURL. When cfg.SignElasticsearchRequests is set, an AWS
// signer round tripper is installed as the client's transport.
//
// It returns a nil client and a non-nil error if either the signer or the
// client cannot be created.
var GetElasticSearchClient = func(ctx context.Context, cfg *config.Config) (dpESClient.Client, error) {
	clientCfg := dpESClient.Config{
		ClientLib: dpESClient.GoElasticV710,
		Address:   cfg.ElasticSearchAPIURL,
	}

	// Signing is opt-in: the transport is only replaced when the flag is on.
	if cfg.SignElasticsearchRequests {
		// The two empty strings are passed through unchanged to the signer
		// constructor; NOTE(review): confirm what they select in dpawsauth.
		signer, signerErr := dpawsauth.NewAWSSignerRoundTripper("", "", cfg.AwsRegion, cfg.AwsService)
		if signerErr != nil {
			return nil, fmt.Errorf("failed to create aws v4 signer: %w", signerErr)
		}
		clientCfg.Transport = signer
	}

	client, err := dpES.NewClient(clientCfg)
	if err != nil {
		// The failure is logged here and then returned unwrapped to the caller.
		log.Error(ctx, "Failed to create dp-elasticsearch client", err)
		return nil, err
	}
	return client, nil
}

GetElasticSearchClient returns an Elasticsearch client. If the 'SignElasticsearchRequests' flag is enabled, the client is configured with the AWS signer as its transport.

View Source
// GetHTTPServer builds an HTTP server bound to bindAddr that serves the given
// router, and returns it as an HTTPServer.
var GetHTTPServer = func(bindAddr string, router http.Handler) HTTPServer {
	srv := dphttp.NewServer(bindAddr, router)
	// Turn off the server's own OS-signal handling; presumably the service
	// drives shutdown itself (see Service.Close) — confirm against dphttp.
	srv.HandleOSSignals = false
	return srv
}

GetHTTPServer creates an HTTP Server with the provided bind address and router

View Source
// GetHealthCheck builds a health checker from version information derived
// from buildTime, gitCommit and version, using the critical timeout and
// interval held in cfg.
//
// It returns a nil checker and a wrapped error when the version information
// cannot be created.
var GetHealthCheck = func(cfg *config.Config, buildTime, gitCommit, version string) (HealthChecker, error) {
	info, infoErr := healthcheck.NewVersionInfo(buildTime, gitCommit, version)
	if infoErr != nil {
		return nil, fmt.Errorf("failed to get version info: %w", infoErr)
	}

	// healthcheck.New yields a value; its address is what is returned as the
	// HealthChecker.
	checker := healthcheck.New(info, cfg.HealthCheckCriticalTimeout, cfg.HealthCheckInterval)
	return &checker, nil
}

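GetHealthCheck creates a healthcheck from the version info built from the supplied build time, git commit and version, using the critical timeout and interval from the config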

View Source
// GetKafkaConsumer creates a Kafka consumer group from the given Kafka
// configuration.
//
// A nil cfg is rejected with an error. The consumer starts from the newest
// offset unless cfg.OffsetOldest is set, and a security configuration is
// attached only when cfg.SecProtocol equals config.KafkaTLSProtocol.
var GetKafkaConsumer = func(ctx context.Context, cfg *config.Kafka) (kafka.IConsumerGroup, error) {
	if cfg == nil {
		return nil, errors.New("cannot create a kafka consumer without kafka config")
	}

	// Default to the newest offset; switch to the oldest only on request.
	startOffset := kafka.OffsetNewest
	if cfg.OffsetOldest {
		startOffset = kafka.OffsetOldest
	}

	groupCfg := &kafka.ConsumerGroupConfig{
		BrokerAddrs:       cfg.Addr,
		Topic:             cfg.PublishedContentTopic,
		GroupName:         cfg.PublishedContentGroup,
		MinBrokersHealthy: &cfg.ConsumerMinBrokersHealthy,
		KafkaVersion:      &cfg.Version,
		Offset:            &startOffset,
	}

	// Security settings are only populated for the TLS protocol.
	if cfg.SecProtocol == config.KafkaTLSProtocol {
		groupCfg.SecurityConfig = kafka.GetSecurityConfig(cfg.SecCACerts, cfg.SecClientCert, cfg.SecClientKey, cfg.SecSkipVerify)
	}

	return kafka.NewConsumerGroup(ctx, groupCfg)
}

GetKafkaConsumer returns a Kafka Consumer group

Functions

This section is empty.

Types

type ElasticSearch added in v0.13.0

// ElasticSearch is an alias for dpelasticsearch.Client, so the two names
// denote the same type and are interchangeable.
type ElasticSearch = dpelasticsearch.Client

ElasticSearch is an alias for the dp-elasticsearch client interface

type HTTPServer

// HTTPServer is the subset of HTTP server behaviour this package depends on.
// GetHTTPServer returns the value produced by dphttp.NewServer as this type.
type HTTPServer interface {
	// ListenAndServe runs the server and reports any error it ends with.
	// NOTE(review): presumably blocks like net/http.Server.ListenAndServe — confirm.
	ListenAndServe() error
	// Shutdown stops the server, taking a context from the caller.
	// NOTE(review): presumably ctx bounds the graceful shutdown — confirm.
	Shutdown(ctx context.Context) error
}

HTTPServer defines the required methods from the HTTP server

type HealthChecker

// HealthChecker is the subset of health check behaviour this package depends
// on. GetHealthCheck returns a pointer to the value built by healthcheck.New
// as this type.
type HealthChecker interface {
	// Handler has the shape of an HTTP handler function (response writer and
	// request), so it can be mounted on a router.
	Handler(w http.ResponseWriter, req *http.Request)
	// Start begins health checking under the given context.
	// NOTE(review): whether it blocks or runs in the background is not visible here.
	Start(ctx context.Context)
	// Stop ends health checking.
	Stop()
	// AddAndGetCheck takes a check name and a checker and returns a check or
	// an error; presumably it registers the checker — confirm against healthcheck.
	AddAndGetCheck(name string, checker healthcheck.Checker) (check *healthcheck.Check, err error)
	// Subscribe takes a subscriber and zero or more checks; presumably the
	// subscriber is notified about those checks — confirm against healthcheck.
	Subscribe(s healthcheck.Subscriber, checks ...*healthcheck.Check)
}

HealthChecker defines the required methods from Healthcheck

type Service

// Service groups the configuration, HTTP server, health checker, Kafka
// consumer group and Elasticsearch client used to run the event handler
// service.
// NOTE(review): the fields are presumably populated by Init — confirm.
type Service struct {
	// Cfg is the service configuration.
	Cfg         *config.Config
	// Server is the HTTP server, held behind the HTTPServer interface.
	Server      HTTPServer
	// HealthCheck is the health checker, held behind the HealthChecker interface.
	HealthCheck HealthChecker
	// Consumer is the Kafka consumer group.
	Consumer    dpkafka.IConsumerGroup
	// EsCli is the dp-elasticsearch client.
	EsCli       dpelasticsearch.Client
}

Service contains all the configs, server and clients to run the event handler service

func New added in v0.13.0

func New() *Service

func (*Service) Close

func (svc *Service) Close(ctx context.Context) error

Close gracefully shuts the service down in the required order, with timeout

func (*Service) Init added in v0.13.0

func (svc *Service) Init(ctx context.Context, cfg *config.Config, buildTime, gitCommit, version string) error

func (*Service) Start added in v0.13.0

func (svc *Service) Start(ctx context.Context, svcErrors chan error) error

Start starts the service.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL