Documentation ¶
Index ¶
Constants ¶
// VaultRetries is the retry count passed to vault.CreateClient when
// constructing the VaultClient (see GetVault).
const VaultRetries = 3
Variables ¶
// GetCantabularClient gets and initialises the Cantabular Client.
// The underlying HTTP client shares the configured default request timeout.
var GetCantabularClient = func(cfg *config.Config) CantabularClient {
	clientConfig := cantabular.Config{
		Host:           cfg.CantabularURL,
		ExtApiHost:     cfg.CantabularExtURL,
		GraphQLTimeout: cfg.DefaultRequestTimeout,
	}
	httpClient := dphttp.ClientWithTimeout(nil, cfg.DefaultRequestTimeout)
	return cantabular.NewClient(clientConfig, httpClient, nil)
}
GetCantabularClient gets and initialises the Cantabular Client
// GetDatasetAPIClient gets and initialises the DatasetAPI Client
// pointed at the configured Dataset API URL.
var GetDatasetAPIClient = func(cfg *config.Config) DatasetAPIClient {
	client := dataset.NewAPIClient(cfg.DatasetAPIURL)
	return client
}
GetDatasetAPIClient gets and initialises the DatasetAPI Client
// GetFilterAPIClient gets and initialises the FilterAPI Client
// pointed at the configured Filter API URL.
var GetFilterAPIClient = func(cfg *config.Config) FilterAPIClient {
	client := filter.New(cfg.FilterAPIURL)
	return client
}
GetFilterAPIClient gets and initialises the FilterAPI Client
// GetGenerator returns a new Generator for dynamically required
// strings and tokens (e.g. UUIDs, PSKs).
var GetGenerator = func() Generator {
	g := generator.New()
	return g
}
var GetHTTPServer = func(bindAddr string, router http.Handler) HTTPServer { s := dphttp.NewServer(bindAddr, router) s.HandleOSSignals = false return s }
GetHTTPServer creates an HTTP server for the given bind address and router, with OS signal handling disabled (the service handles signals itself)
var GetHealthCheck = func(cfg *config.Config, buildTime, gitCommit, version string) (HealthChecker, error) { versionInfo, err := healthcheck.NewVersionInfo(buildTime, gitCommit, version) if err != nil { return nil, fmt.Errorf("failed to get version info: %w", err) } hc := healthcheck.New( versionInfo, cfg.HealthCheckCriticalTimeout, cfg.HealthCheckInterval, ) return &hc, nil }
GetHealthCheck creates a healthcheck with versionInfo
var GetKafkaConsumer = func(ctx context.Context, cfg *config.Config) (kafka.IConsumerGroup, error) { kafkaOffset := kafka.OffsetNewest if cfg.KafkaConfig.OffsetOldest { kafkaOffset = kafka.OffsetOldest } cgConfig := &kafka.ConsumerGroupConfig{ BrokerAddrs: cfg.KafkaConfig.Addr, Topic: cfg.KafkaConfig.ExportStartTopic, GroupName: cfg.KafkaConfig.ExportStartGroup, MinBrokersHealthy: &cfg.KafkaConfig.ConsumerMinBrokersHealthy, KafkaVersion: &cfg.KafkaConfig.Version, Offset: &kafkaOffset, } if cfg.KafkaConfig.SecProtocol == config.KafkaTLSProtocolFlag { cgConfig.SecurityConfig = kafka.GetSecurityConfig( cfg.KafkaConfig.SecCACerts, cfg.KafkaConfig.SecClientCert, cfg.KafkaConfig.SecClientKey, cfg.KafkaConfig.SecSkipVerify, ) } return kafka.NewConsumerGroup(ctx, cgConfig) }
GetKafkaConsumer creates a Kafka consumer
var GetKafkaProducer = func(ctx context.Context, cfg *config.Config) (kafka.IProducer, error) { pConfig := &kafka.ProducerConfig{ BrokerAddrs: cfg.KafkaConfig.Addr, Topic: cfg.KafkaConfig.CsvCreatedTopic, MinBrokersHealthy: &cfg.KafkaConfig.ProducerMinBrokersHealthy, KafkaVersion: &cfg.KafkaConfig.Version, MaxMessageBytes: &cfg.KafkaConfig.MaxBytes, } if cfg.KafkaConfig.SecProtocol == config.KafkaTLSProtocolFlag { pConfig.SecurityConfig = kafka.GetSecurityConfig( cfg.KafkaConfig.SecCACerts, cfg.KafkaConfig.SecClientCert, cfg.KafkaConfig.SecClientKey, cfg.KafkaConfig.SecSkipVerify, ) } return kafka.NewProducer(ctx, pConfig) }
GetKafkaProducer creates a Kafka producer
var GetS3Clients = func(cfg *config.Config) (private, public S3Client, err error) { if cfg.LocalObjectStore != "" { s3Config := &aws.Config{ Credentials: credentials.NewStaticCredentials(cfg.MinioAccessKey, cfg.MinioSecretKey, ""), Endpoint: aws.String(cfg.LocalObjectStore), Region: aws.String(cfg.AWSRegion), DisableSSL: aws.Bool(true), S3ForcePathStyle: aws.Bool(true), } s, err := session.NewSession(s3Config) if err != nil { return nil, nil, fmt.Errorf("failed to create aws session: %w", err) } return dps3.NewClientWithSession(cfg.PrivateUploadBucketName, s), dps3.NewClientWithSession(cfg.PublicUploadBucketName, s), nil } private, err = dps3.NewClient(cfg.AWSRegion, cfg.PrivateUploadBucketName) if err != nil { return nil, nil, fmt.Errorf("failed to create S3 Client: %w", err) } public = dps3.NewClientWithSession(cfg.PublicUploadBucketName, private.Session()) return private, public, nil }
GetS3Clients creates the private and public S3 Clients using the same AWS session
// GetVault creates a VaultClient, retrying up to VaultRetries times.
var GetVault = func(cfg *config.Config) (VaultClient, error) {
	client, err := vault.CreateClient(cfg.VaultToken, cfg.VaultAddress, VaultRetries)
	return client, err
}
GetVault creates a VaultClient
Functions ¶
This section is empty.
Types ¶
type CantabularClient ¶ added in v0.2.0
// CantabularClient defines the required methods from the Cantabular client.
type CantabularClient interface {
	// StaticDatasetQueryStreamCSV runs a static dataset query, passing the
	// CSV stream to consume, and returns the number of rows processed.
	StaticDatasetQueryStreamCSV(ctx context.Context, req cantabular.StaticDatasetQueryRequest, consume cantabular.Consumer) (rowCount int32, err error)
	// Checker is the healthcheck checker for the Cantabular service.
	Checker(context.Context, *healthcheck.CheckState) error
	// CheckerAPIExt is the healthcheck checker for the Cantabular extended API.
	CheckerAPIExt(ctx context.Context, state *healthcheck.CheckState) error
}
type DatasetAPIClient ¶ added in v0.2.0
// DatasetAPIClient defines the required methods from the Dataset API client.
type DatasetAPIClient interface {
	// GetInstance retrieves an instance by id, returning its ETag alongside it.
	GetInstance(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, instanceID, ifMatch string) (m dataset.Instance, eTag string, err error)
	// PutVersion updates the identified dataset version.
	PutVersion(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, datasetID, edition, version string, v dataset.Version) error
	// Checker is the healthcheck checker for the Dataset API.
	Checker(context.Context, *healthcheck.CheckState) error
}
type FilterAPIClient ¶ added in v1.0.0
// FilterAPIClient defines the required methods from the Filter API client.
type FilterAPIClient interface {
	// GetDimensions retrieves the dimensions of a filter, with its ETag.
	GetDimensions(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, filterID string, q *filter.QueryParams) (dims filter.Dimensions, eTag string, err error)
	// GetOutput retrieves a filter output model by id.
	GetOutput(ctx context.Context, userAuthToken, serviceAuthToken, downloadServiceToken, collectionID, filterOutputID string) (m filter.Model, err error)
	// UpdateFilterOutput updates the identified filter output.
	UpdateFilterOutput(ctx context.Context, userAuthToken, serviceAuthToken, downloadServiceToken, filterOutputID string, m *filter.Model) error
	// Checker is the healthcheck checker for the Filter API.
	Checker(context.Context, *healthcheck.CheckState) error
}
type Generator ¶ added in v0.4.0
Generator contains methods for generating dynamically required strings and tokens, e.g. UUIDs and PSKs.
type HTTPServer ¶
HTTPServer defines the required methods from the HTTP server
type HealthChecker ¶
// HealthChecker defines the required methods from Healthcheck.
type HealthChecker interface {
	// Handler serves the healthcheck HTTP endpoint.
	Handler(w http.ResponseWriter, req *http.Request)
	// Start begins running the healthcheck until the context is done.
	Start(ctx context.Context)
	// Stop stops the healthcheck.
	Stop()
	// AddAndGetCheck registers a named checker and returns the created check.
	AddAndGetCheck(name string, checker healthcheck.Checker) (check *healthcheck.Check, err error)
	// Subscribe registers a subscriber for updates on the given checks.
	Subscribe(s healthcheck.Subscriber, checks ...*healthcheck.Check)
}
HealthChecker defines the required methods from Healthcheck
type Initialiser ¶
// Initialiser defines the methods to initialise external services.
type Initialiser interface {
	// DoGetHTTPServer returns an HTTP server bound to bindAddr serving router.
	DoGetHTTPServer(bindAddr string, router http.Handler) HTTPServer
	// DoGetHealthCheck returns a healthcheck built from the version information.
	DoGetHealthCheck(cfg *config.Config, buildTime, gitCommit, version string) (HealthChecker, error)
	// DoGetKafkaConsumer returns the Kafka consumer group.
	DoGetKafkaConsumer(ctx context.Context, cfg *config.Config) (kafka.IConsumerGroup, error)
}
Initialiser defines the methods to initialise external services
type S3Client ¶ added in v0.2.0
// S3Client defines the required methods from the S3 client.
type S3Client interface {
	// Head retrieves object metadata for the given key.
	Head(key string) (*s3.HeadObjectOutput, error)
	// UploadWithContext uploads input to S3, honouring ctx cancellation.
	UploadWithContext(ctx context.Context, input *s3manager.UploadInput, options ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error)
	// UploadWithPSK uploads input encrypted with the provided pre-shared key.
	UploadWithPSK(input *s3manager.UploadInput, psk []byte) (*s3manager.UploadOutput, error)
	// BucketName returns the bucket this client targets.
	BucketName() string
	// Session returns the underlying AWS session.
	Session() *session.Session
	// Checker is the healthcheck checker for S3.
	Checker(context.Context, *healthcheck.CheckState) error
}
type Service ¶
// Service contains all the configs, server and clients to run the event
// handler service.
type Service struct {
	Cfg              *config.Config       // service configuration
	Server           HTTPServer           // HTTP server
	HealthCheck      HealthChecker        // healthcheck with registered checkers
	Consumer         kafka.IConsumerGroup // Kafka consumer (export-start events)
	Producer         kafka.IProducer      // Kafka producer (csv-created events)
	DatasetAPIClient DatasetAPIClient
	FilterAPIClient  FilterAPIClient
	CantabularClient CantabularClient
	S3PrivateClient  S3Client // client for the private upload bucket
	S3PublicClient   S3Client // client for the public upload bucket
	VaultClient      VaultClient
	// contains filtered or unexported fields
}
Service contains all the configs, server and clients to run the event handler service
type VaultClient ¶ added in v0.3.0
// VaultClient defines the required methods from the Vault client.
type VaultClient interface {
	// WriteKey writes a key/value pair under the given vault path.
	WriteKey(path, key, value string) error
	// Checker is the healthcheck checker for Vault.
	Checker(context.Context, *healthcheck.CheckState) error
}