vivoupdater

v0.0.0-...-3b54913
Warning

This package is not in the latest version of its module.

Published: Jan 20, 2022 License: MIT Imports: 21 Imported by: 0

README

vivoupdater

Vivoupdater subscribes to a Kafka topic containing recently loaded Vivo triples and posts to both Vivo and Vivo Widgets to trigger selective re-indexing of the relevant people and organizations.

Dependencies

Dependencies are managed with Go modules.

To install:

go get github.com/OIT-ADS-Web/vivoupdater

cd $GOPATH/src/github.com/OIT-ADS-Web/vivoupdater
     
go install github.com/OIT-ADS-Web/vivoupdater...

This will create $GOPATH/bin/vivo_indexer and $GOPATH/bin/fake_produce.

Configuration

The following environment variables are used to control behavior:

APP_ENVIRONMENT

The environment the code is running in (e.g. development, acceptance, or production). It also determines where Vault looks for keys (see the VAULT_* variables below).

VAULT_ENDPOINT

The API endpoint of your Vault installation.

VAULT_KEY

When using the app_role auth method, this is the key value.

VAULT_ROLE_ID

The Vault role ID.

VAULT_SECRET_ID

The Vault secret ID value.

BATCH_SIZE

default = 200

BATCH_TIMEOUT

default = 10

VIVO_INDEXER_URL

The URL the indexer posts Vivo updates to.

For example:

http://localhost:9080/searchService/updateUrisInSearch

VIVO_EMAIL

The Vivo instance admin email, for example:

vivo_root@duke.edu

VIVO_PASSWORD

The Vivo instance admin password.

WIDGETS_INDEXER_BASE_URL

The base URL the indexer posts Widgets updates to. Either /person or /organization is appended to it.

For example:

http://localhost:8080/widgets/updates

WIDGETS_USER

The Widgets instance user.

WIDGETS_PASSWORD

The Widgets instance password.

BOOTSTRAP_SERVERS

The Kafka bootstrap servers, as a comma-separated list.

UPDATES_TOPIC

The Kafka topic that carries the updates (only one topic is supported at this point).

METRICS_TOPIC

The Kafka topic that metrics are sent to.

CLIENT_ID

The Kafka client name (client ID).

GROUP_NAME

The Kafka consumer group name.

Debugging

I've found pprof helpful. You can enable it with the -pprof flag:

vivo_indexer -pprof=true

This enables the pprof HTTP endpoints (see Go's net/http/pprof package).

For example (with Graphviz installed):

go tool pprof -png http://localhost:8484/debug/pprof/heap > out.png

Documentation


Constants

const OrgFilterRegex = `.*individual/org[0-9]{8}`
const PersonFilterRegex = `.*individual/per[0-9A-Za-z]{3,}`
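The sketch below compiles both patterns with Go's regexp package to show what they accept. The patterns are unanchored, so MatchString succeeds if the pattern matches anywhere in the URI. An org ID with more than eight digits therefore still matches on its first eight. The org1234567 URI is a made-up counterexample.

```go
package main

import (
	"fmt"
	"regexp"
)

// Patterns copied from the package constants.
const OrgFilterRegex = `.*individual/org[0-9]{8}`
const PersonFilterRegex = `.*individual/per[0-9A-Za-z]{3,}`

var (
	orgRe    = regexp.MustCompile(OrgFilterRegex)
	personRe = regexp.MustCompile(PersonFilterRegex)
)

func main() {
	uris := []string{
		"https://scholars.duke.edu/individual/per3336942",
		"https://scholars.duke.edu/individual/org50344699",
		"https://scholars.duke.edu/individual/org1234567", // made up: only 7 digits
	}
	for _, u := range uris {
		fmt.Printf("%s person=%v org=%v\n", u, personRe.MatchString(u), orgRe.MatchString(u))
	}
}
```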

Variables

var AppEnvironment string
var BatchSize int

misc

var BatchTimeout int
var ClientCert string
var ClientId string
var ClientKey string
var GroupName string
var LogFile string

logging

var LogMaxAge int
var LogMaxBackups int
var LogMaxSize int
var MetricsTopic string
var NotificationFrom string
var NotificationSmtp string

TODO: get rid of notification values entirely?

var ServerCert string
var UpdatesTopic string

var Topics CSV

var VaultEndpoint string

vault

var VaultKey string
var VaultRoleId string
var VaultSecretId string
var VivoEmail string
var VivoIndexerUrl string

vivo

var VivoPassword string
var WidgetsIndexerBaseUrl string

widgets

var WidgetsPassword string
var WidgetsUser string

Functions

func FetchSecrets

func FetchSecrets(config *VaultConfig, paths map[string]string,
	obj interface{}) error

TODO: this is rather convoluted because it matches the methodology used by the scholars-data-project Kotlin client, which maps to Java properties one at a time (rather than as a group of key/value pairs).

So SecretsMap can (in theory) map a local property to a completely different Vault secret, e.g. kafka.clientKey = apps/scholars/development/kafka/anotherNameEntirely.

func FetchToken

func FetchToken(config *VaultConfig) error
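FetchToken takes a VaultConfig with RoleId and SecretId, which suggests an AppRole login. Assuming that, the sketch below shows the exchange against Vault's AppRole login endpoint (POST /v1/auth/approle/login). approleLogin and parseClientToken are hypothetical helpers. Whether VAULT_ENDPOINT already includes the /v1 prefix is not stated here.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
)

// loginResponse holds the part of Vault's login response used here.
type loginResponse struct {
	Auth struct {
		ClientToken string `json:"client_token"`
	} `json:"auth"`
}

// parseClientToken extracts auth.client_token from a login response body.
func parseClientToken(body []byte) (string, error) {
	var lr loginResponse
	if err := json.Unmarshal(body, &lr); err != nil {
		return "", err
	}
	if lr.Auth.ClientToken == "" {
		return "", fmt.Errorf("no client_token in Vault response")
	}
	return lr.Auth.ClientToken, nil
}

// approleLogin exchanges a role ID and secret ID for a Vault token.
// Hypothetical helper: endpoint is assumed to be the server base URL
// without the /v1 prefix.
func approleLogin(endpoint, roleID, secretID string) (string, error) {
	payload, err := json.Marshal(map[string]string{
		"role_id":   roleID,
		"secret_id": secretID,
	})
	if err != nil {
		return "", err
	}
	loginURL := strings.TrimSuffix(endpoint, "/") + "/v1/auth/approle/login"
	resp, err := http.Post(loginURL, "application/json", bytes.NewReader(payload))
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("vault login failed: %s", resp.Status)
	}
	var buf bytes.Buffer
	if _, err := buf.ReadFrom(resp.Body); err != nil {
		return "", err
	}
	return parseClientToken(buf.Bytes())
}

func main() {
	// Against a live server this would be, e.g.:
	//   token, err := approleLogin("http://127.0.0.1:8200", roleID, secretID)
	// Here we only parse a canned response body.
	token, err := parseClientToken([]byte(`{"auth":{"client_token":"s.example"}}`))
	fmt.Println(token, err)
}
```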

func FluxLine

func FluxLine(measurement string, c interface{}, tags map[string]string) (bytes.Buffer, error)

func GetCertsFromVault

func GetCertsFromVault(env string, config *VaultConfig, kafka *KafkaSubscriber)

func GetProducer

func GetProducer() sarama.AsyncProducer

func IndexBatch

func IndexBatch(idx BatchIndexer, b map[string]bool, logger *log.Logger)

func NewTLSConfig

func NewTLSConfig(ks *KafkaSubscriber) (*tls.Config, error)
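This sketch assumes KafkaSubscriber's ClientCert, ClientKey and ServerCert hold PEM-encoded text (for example, as fetched from Vault by GetCertsFromVault). Under that assumption, a mutual-TLS configuration can be built with the standard library alone. newTLSConfig is a hypothetical stand-in, not the package's NewTLSConfig.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// newTLSConfig builds a mutual-TLS config from PEM-encoded strings.
// Assumption: the inputs are PEM text, not file paths.
func newTLSConfig(clientCert, clientKey, serverCert string) (*tls.Config, error) {
	cert, err := tls.X509KeyPair([]byte(clientCert), []byte(clientKey))
	if err != nil {
		return nil, fmt.Errorf("loading client key pair: %w", err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM([]byte(serverCert)) {
		return nil, errors.New("no valid server CA certificate in PEM data")
	}
	return &tls.Config{
		Certificates: []tls.Certificate{cert}, // presented to the brokers
		RootCAs:      pool,                    // used to verify the brokers
	}, nil
}

func main() {
	// Invalid PEM input is rejected with an error.
	_, err := newTLSConfig("not a cert", "not a key", "not a CA")
	fmt.Println(err != nil)
}
```

With sarama, a config like this is typically attached by setting config.Net.TLS.Enable = true and config.Net.TLS.Config to the returned *tls.Config.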

func PostToVivo

func PostToVivo(idx VivoIndexer, batch map[string]bool) error

func PostToWidgets

func PostToWidgets(idx WidgetsIndexer, postUrl string, uris ...string) error

NOTE: idx carries the username and password (and the base URL).

func Produce

func Produce(topic string, val string)

func SecretsMap

func SecretsMap(appEnv string) map[string]string

func SendMetrics

func SendMetrics(metrics IndexMetrics)

func SetProducer

func SetProducer(p sarama.AsyncProducer)

func SetSubscriber

func SetSubscriber(s *KafkaSubscriber)

func SetupConsumer

func SetupConsumer(ks *KafkaSubscriber) error

NOTE: not sure this is necessary

func SetupProducer

func SetupProducer(ks *KafkaSubscriber) error

func StartConsumer

func StartConsumer(ctx context.Context, ks KafkaSubscriber, handler ConsumerGroupHandler) error

Types

type BatchIndexer

type BatchIndexer interface {
	Name() string
	Index(b map[string]bool, logger *log.Logger) (map[string]bool, error)
}

type CSV

type CSV []string

stole code from here: https://godoc.org/github.com/namsral/flag

var BootstrapFlag CSV

kafka

var NotificationEmail CSV

func (*CSV) Set

func (c *CSV) Set(value string) error

Set is the method to set the flag value, part of the flag.Value interface. Set's argument is a string to be parsed to set the flag. It's a comma-separated list, so we split it.

func (*CSV) String

func (c *CSV) String() string

String is the method to format the flag's value, part of the flag.Value interface. The String method's output will be used in diagnostics.
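Here is a self-contained sketch of a CSV type that satisfies flag.Value and is registered on a FlagSet with Var. The flag name bootstrap is hypothetical. The real Set may also trim whitespace or append to earlier values, which is not shown here. This version simply replaces the value with the comma-split parts.

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// CSV is a flag.Value holding a comma-separated list.
type CSV []string

// Set splits the flag argument on commas and stores the parts.
func (c *CSV) Set(value string) error {
	*c = strings.Split(value, ",")
	return nil
}

// String joins the values back into a comma-separated string.
func (c *CSV) String() string {
	if c == nil {
		return ""
	}
	return strings.Join(*c, ",")
}

func main() {
	var brokers CSV
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	fs.Var(&brokers, "bootstrap", "comma-separated Kafka bootstrap servers")
	if err := fs.Parse([]string{"-bootstrap", "kafka1:9093,kafka2:9093"}); err != nil {
		panic(err)
	}
	fmt.Println(len(brokers), brokers[0], brokers[1])
}
```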

type ConsumerGroupHandler

type ConsumerGroupHandler struct {
	Context context.Context
	Logger  *log.Logger
	Updates chan UpdateMessage
	Quit    chan bool
}

func (ConsumerGroupHandler) Cleanup

func (ConsumerGroupHandler) Setup

type IndexMetrics

type IndexMetrics struct {
	Start   time.Time
	End     time.Time
	Uris    []string
	Name    string
	Success bool
}

type Indexer

type Indexer interface {
	Name() string
}

type KafkaConfig

type KafkaConfig struct {
	BootstrapFlag CSV
	//Topics        CSV
	MetricsTopic string
	UpdatesTopic string
	ClientCert   string
	ClientKey    string
	ServerCert   string
	ClientId     string
	GroupName    string
}

type KafkaSubscriber

type KafkaSubscriber struct {
	Brokers    []string
	Topic      string
	ClientCert string
	ClientKey  string
	ServerCert string
	ClientID   string
	GroupName  string
}
var Subscriber *KafkaSubscriber

func GetSubscriber

func GetSubscriber() *KafkaSubscriber

func (KafkaSubscriber) Subscribe

func (ks KafkaSubscriber) Subscribe(ctx context.Context,
	logger *log.Logger, updates chan UpdateMessage, quit chan bool) error

TODO: seems like too many parameters

type Secrets

type Secrets struct {
	KafkaClientKey  string `mapstructure:"kafka_clientKey"`
	KafkaClientCert string `mapstructure:"kafka_clientCert"`
	KafkaServerCert string `mapstructure:"kafka_serverCert"`
}

type Triple

type Triple struct {
	Subject   string
	Predicate string
	Object    string
}

type UpdateMessage

type UpdateMessage struct {
	Type   string
	Phase  string
	Name   string
	Triple Triple
}

type UpdateSubscriber

type UpdateSubscriber interface {
	MaxConnectAttempts() int
	RetryInterval() int
}

type UriBatcher

type UriBatcher struct {
	BatchSize    int
	BatchTimeout time.Duration
}

func (UriBatcher) Batch

func (ub UriBatcher) Batch(ctx context.Context, logger *log.Logger,
	updates chan UpdateMessage, quit chan bool) chan map[string]bool

type VaultConfig

type VaultConfig struct {
	Endpoint string
	RoleId   string
	SecretId string
	AppId    string
	Token    string
}

type VaultData

type VaultData struct {
	Version Version `json:"data"`
}

type Version

type Version struct {
	Data     map[string]interface{} `json:"data"`
	Metadata map[string]interface{} `json:"metadata"` // NOTE: not using
}

type VivoIndexer

type VivoIndexer struct {
	Url      string
	Username string
	Password string
	Metrics  bool
}

func (VivoIndexer) Index

func (vi VivoIndexer) Index(batch map[string]bool, logger *log.Logger) (map[string]bool, error)

func (VivoIndexer) Name

func (wi VivoIndexer) Name() string

type WidgetsBatchIndexer

type WidgetsBatchIndexer struct {
	Indexer WidgetsIndexer
	Suffix  string
	Regex   *regexp.Regexp
	Uris    []string
}

func NewWidgetsBatchIndexer

func NewWidgetsBatchIndexer(wi WidgetsIndexer, suffix string, regex *regexp.Regexp) *WidgetsBatchIndexer

func (*WidgetsBatchIndexer) Gather

func (wbi *WidgetsBatchIndexer) Gather(u string)

This pulls out the URIs this particular indexer is interested in. They typically look like one of the following:

https://scholars.duke.edu/individual/per3336942
https://scholars.duke.edu/individual/org50344699

func (WidgetsBatchIndexer) IndexUris

func (wbi WidgetsBatchIndexer) IndexUris(logger *log.Logger) error

type WidgetsIndexer

type WidgetsIndexer struct {
	Url      string
	Username string
	Password string
	Metrics  bool
}

func (WidgetsIndexer) Index

func (wi WidgetsIndexer) Index(batch map[string]bool, logger *log.Logger) (map[string]bool, error)

func (WidgetsIndexer) Name

func (wi WidgetsIndexer) Name() string

type WidgetsUpdateMessage

type WidgetsUpdateMessage struct {
	Uris []string `json:"uris"`
}

Directories

Path Synopsis
cmd
