service

package
v1.13.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 15, 2023 License: MIT Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const VaultRetries = 3

Variables

View Source
var GetCantabularClient = func(cfg *config.Config) CantabularClient {
	return cantabular.NewClient(
		cantabular.Config{
			Host:           cfg.CantabularURL,
			ExtApiHost:     cfg.CantabularExtURL,
			GraphQLTimeout: cfg.DefaultRequestTimeout,
		},
		dphttp.ClientWithTimeout(nil, cfg.DefaultRequestTimeout),
		nil,
	)
}

GetCantabularClient gets and initialises the Cantabular Client

View Source
var GetDatasetAPIClient = func(cfg *config.Config) DatasetAPIClient {
	return dataset.NewAPIClient(cfg.DatasetAPIURL)
}

GetDatasetAPIClient gets and initialises the DatasetAPI Client

View Source
var GetFilterAPIClient = func(cfg *config.Config) FilterAPIClient {
	return filter.New(cfg.FilterAPIURL)
}

GetFilterAPIClient gets and initialises the FilterAPI Client

View Source
var GetGenerator = func() Generator {
	return generator.New()
}
View Source
var GetHTTPServer = func(bindAddr string, router http.Handler) HTTPServer {
	s := dphttp.NewServer(bindAddr, router)
	s.HandleOSSignals = false
	return s
}

GetHTTPServer creates an http server and sets the Server

View Source
var GetHealthCheck = func(cfg *config.Config, buildTime, gitCommit, version string) (HealthChecker, error) {
	versionInfo, err := healthcheck.NewVersionInfo(buildTime, gitCommit, version)
	if err != nil {
		return nil, fmt.Errorf("failed to get version info: %w", err)
	}

	hc := healthcheck.New(
		versionInfo,
		cfg.HealthCheckCriticalTimeout,
		cfg.HealthCheckInterval,
	)
	return &hc, nil
}

GetHealthCheck creates a healthcheck with versionInfo

View Source
var GetKafkaConsumer = func(ctx context.Context, cfg *config.Config) (kafka.IConsumerGroup, error) {
	kafkaOffset := kafka.OffsetNewest
	if cfg.KafkaConfig.OffsetOldest {
		kafkaOffset = kafka.OffsetOldest
	}
	cgConfig := &kafka.ConsumerGroupConfig{
		BrokerAddrs:       cfg.KafkaConfig.Addr,
		Topic:             cfg.KafkaConfig.ExportStartTopic,
		GroupName:         cfg.KafkaConfig.ExportStartGroup,
		MinBrokersHealthy: &cfg.KafkaConfig.ConsumerMinBrokersHealthy,
		KafkaVersion:      &cfg.KafkaConfig.Version,
		Offset:            &kafkaOffset,
	}
	if cfg.KafkaConfig.SecProtocol == config.KafkaTLSProtocolFlag {
		cgConfig.SecurityConfig = kafka.GetSecurityConfig(
			cfg.KafkaConfig.SecCACerts,
			cfg.KafkaConfig.SecClientCert,
			cfg.KafkaConfig.SecClientKey,
			cfg.KafkaConfig.SecSkipVerify,
		)
	}
	return kafka.NewConsumerGroup(ctx, cgConfig)
}

GetKafkaConsumer creates a Kafka consumer

View Source
var GetKafkaProducer = func(ctx context.Context, cfg *config.Config) (kafka.IProducer, error) {
	pConfig := &kafka.ProducerConfig{
		BrokerAddrs:       cfg.KafkaConfig.Addr,
		Topic:             cfg.KafkaConfig.CsvCreatedTopic,
		MinBrokersHealthy: &cfg.KafkaConfig.ProducerMinBrokersHealthy,
		KafkaVersion:      &cfg.KafkaConfig.Version,
		MaxMessageBytes:   &cfg.KafkaConfig.MaxBytes,
	}
	if cfg.KafkaConfig.SecProtocol == config.KafkaTLSProtocolFlag {
		pConfig.SecurityConfig = kafka.GetSecurityConfig(
			cfg.KafkaConfig.SecCACerts,
			cfg.KafkaConfig.SecClientCert,
			cfg.KafkaConfig.SecClientKey,
			cfg.KafkaConfig.SecSkipVerify,
		)
	}
	return kafka.NewProducer(ctx, pConfig)
}

GetKafkaProducer creates a Kafka producer

View Source
var GetS3Clients = func(cfg *config.Config) (private, public S3Client, err error) {
	if cfg.LocalObjectStore != "" {
		s3Config := &aws.Config{
			Credentials:      credentials.NewStaticCredentials(cfg.MinioAccessKey, cfg.MinioSecretKey, ""),
			Endpoint:         aws.String(cfg.LocalObjectStore),
			Region:           aws.String(cfg.AWSRegion),
			DisableSSL:       aws.Bool(true),
			S3ForcePathStyle: aws.Bool(true),
		}

		s, err := session.NewSession(s3Config)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to create aws session: %w", err)
		}
		return dps3.NewClientWithSession(cfg.PrivateUploadBucketName, s),
			dps3.NewClientWithSession(cfg.PublicUploadBucketName, s),
			nil
	}

	private, err = dps3.NewClient(cfg.AWSRegion, cfg.PrivateUploadBucketName)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create S3 Client: %w", err)
	}
	public = dps3.NewClientWithSession(cfg.PublicUploadBucketName, private.Session())
	return private, public, nil
}

GetS3Clients creates the private and public S3 Clients using the same AWS session

View Source
var GetVault = func(cfg *config.Config) (VaultClient, error) {
	return vault.CreateClient(cfg.VaultToken, cfg.VaultAddress, VaultRetries)
}

GetVault creates a VaultClient

Functions

This section is empty.

Types

type CantabularClient added in v0.2.0

type CantabularClient interface {
	StaticDatasetQueryStreamCSV(ctx context.Context, req cantabular.StaticDatasetQueryRequest, consume cantabular.Consumer) (rowCount int32, err error)
	Checker(context.Context, *healthcheck.CheckState) error
	CheckerAPIExt(ctx context.Context, state *healthcheck.CheckState) error
}

type DatasetAPIClient added in v0.2.0

type DatasetAPIClient interface {
	GetInstance(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, instanceID, ifMatch string) (m dataset.Instance, eTag string, err error)
	PutVersion(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, datasetID, edition, version string, v dataset.Version) error
	Checker(context.Context, *healthcheck.CheckState) error
}

type FilterAPIClient added in v1.0.0

type FilterAPIClient interface {
	GetDimensions(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, filterID string, q *filter.QueryParams) (dims filter.Dimensions, eTag string, err error)
	GetOutput(ctx context.Context, userAuthToken, serviceAuthToken, downloadServiceToken, collectionID, filterOutputID string) (m filter.Model, err error)
	UpdateFilterOutput(ctx context.Context, userAuthToken, serviceAuthToken, downloadServiceToken, filterOutputID string, m *filter.Model) error
	Checker(context.Context, *healthcheck.CheckState) error
}

type Generator added in v0.4.0

type Generator interface {
	NewPSK() ([]byte, error)
	Timestamp() time.Time
}

Generator contains methods for dynamically required strings and tokens e.g. UUIDs, PSKs.

type HTTPServer

type HTTPServer interface {
	ListenAndServe() error
	Shutdown(ctx context.Context) error
}

HTTPServer defines the required methods from the HTTP server

type HealthChecker

type HealthChecker interface {
	Handler(w http.ResponseWriter, req *http.Request)
	Start(ctx context.Context)
	Stop()
	AddAndGetCheck(name string, checker healthcheck.Checker) (check *healthcheck.Check, err error)
	Subscribe(s healthcheck.Subscriber, checks ...*healthcheck.Check)
}

HealthChecker defines the required methods from Healthcheck

type Initialiser

type Initialiser interface {
	DoGetHTTPServer(bindAddr string, router http.Handler) HTTPServer
	DoGetHealthCheck(cfg *config.Config, buildTime, gitCommit, version string) (HealthChecker, error)
	DoGetKafkaConsumer(ctx context.Context, cfg *config.Config) (kafka.IConsumerGroup, error)
}

Initialiser defines the methods to initialise external services

type S3Client added in v0.2.0

type S3Client interface {
	Head(key string) (*s3.HeadObjectOutput, error)
	UploadWithContext(ctx context.Context, input *s3manager.UploadInput, options ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error)
	UploadWithPSK(input *s3manager.UploadInput, psk []byte) (*s3manager.UploadOutput, error)
	BucketName() string
	Session() *session.Session
	Checker(context.Context, *healthcheck.CheckState) error
}

type Service

type Service struct {
	Cfg              *config.Config
	Server           HTTPServer
	HealthCheck      HealthChecker
	Consumer         kafka.IConsumerGroup
	Producer         kafka.IProducer
	DatasetAPIClient DatasetAPIClient
	FilterAPIClient  FilterAPIClient
	CantabularClient CantabularClient
	S3PrivateClient  S3Client
	S3PublicClient   S3Client
	VaultClient      VaultClient
	// contains filtered or unexported fields
}

Service contains all the configs, server and clients to run the event handler service

func New added in v0.2.0

func New() *Service

func (*Service) Close

func (svc *Service) Close(ctx context.Context) error

Close gracefully shuts the service down in the required order, with timeout

func (*Service) Init added in v0.2.0

func (svc *Service) Init(ctx context.Context, cfg *config.Config, buildTime, gitCommit, version string) error

Init initialises the service and it's dependencies

func (*Service) Start added in v0.2.0

func (svc *Service) Start(ctx context.Context, svcErrors chan error) error

Start the service

type VaultClient added in v0.3.0

type VaultClient interface {
	WriteKey(path, key, value string) error
	Checker(context.Context, *healthcheck.CheckState) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL