inprocess

package
v1.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 7, 2022 License: Apache-2.0 Imports: 39 Imported by: 0

Documentation

Overview

Package inprocess contains code for spinning up M3 resources in-process for the sake of integration testing.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConfigurePlacementsForAggregation added in v1.4.2

func ConfigurePlacementsForAggregation(
	nodes resources.Nodes,
	coord resources.Coordinator,
	aggs resources.Aggregators,
	specs ClusterSpecification,
	opts resources.ClusterOptions,
) error

ConfigurePlacementsForAggregation sets up the correct placement information for coordinators and aggregators when aggregation is enabled.

func GenerateAggregatorConfigsForCluster added in v1.4.0

func GenerateAggregatorConfigsForCluster(
	configs ClusterConfigs,
	opts *resources.AggregatorClusterOptions,
) ([]aggcfg.Configuration, error)

GenerateAggregatorConfigsForCluster generates the unique configs for each aggregator instance.

func NewAggregator

func NewAggregator(cfg config.Configuration, opts AggregatorOptions) (resources.Aggregator, error)

NewAggregator creates a new in-process aggregator based on the configuration and options provided.

func NewAggregatorFromYAML added in v1.4.0

func NewAggregatorFromYAML(yamlCfg string, opts AggregatorOptions) (resources.Aggregator, error)

NewAggregatorFromYAML creates a new in-process aggregator based on the yaml configuration and options provided.

func NewCluster

func NewCluster(
	configs ClusterConfigs,
	opts resources.ClusterOptions,
) (resources.M3Resources, error)

NewCluster creates a new M3 cluster based on the ClusterOptions provided. Expects at least a coordinator, a dbnode and an aggregator config.

func NewClusterFromSpecification added in v1.4.0

func NewClusterFromSpecification(
	specs ClusterSpecification,
	opts resources.ClusterOptions,
) (resources.M3Resources, error)

NewClusterFromSpecification creates a new M3 cluster with the given ClusterSpecification.

func NewCoordinator

NewCoordinator creates a new in-process coordinator based on the configuration and options provided. Use NewCoordinator or any of the convenience constructors (e.g. NewCoordinatorFromYAML, NewCoordinatorFromConfigFile) to get a running coordinator.

The most typical usage of this method will be in an integration test to validate some behavior. For example, assuming we have a running DB node already, we could do the following to create a new namespace and write to it (note: ignoring error checking):

coord, _ := NewCoordinatorFromYAML(defaultCoordConfig, CoordinatorOptions{})
coord.AddNamespace(admin.NamespaceAddRequest{...})
coord.WaitForNamespace(namespaceName)
coord.WriteProm("cpu", map[string]string{"host", host}, samples)

The coordinator will start up as you specify in your config. However, there is some helper logic to avoid port and filesystem collisions when spinning up multiple components within the process. If you specify a GeneratePorts: true in the CoordinatorOptions, address ports will be replaced with an open port.

Similarly, filepath fields will be updated with a temp directory that will be cleaned up when the coordinator is destroyed. This should ensure that many of the same component can be spun up in-process without any issues with collisions.

func NewCoordinatorFromConfigFile

func NewCoordinatorFromConfigFile(pathToCfg string, opts CoordinatorOptions) (resources.Coordinator, error)

NewCoordinatorFromConfigFile creates a new in-process coordinator based on the config file and options provided.

func NewCoordinatorFromYAML

func NewCoordinatorFromYAML(yamlCfg string, opts CoordinatorOptions) (resources.Coordinator, error)

NewCoordinatorFromYAML creates a new in-process coordinator based on the YAML configuration string and options provided.

func NewDBNode

func NewDBNode(cfg config.Configuration, opts DBNodeOptions) (resources.Node, error)

NewDBNode creates a new in-process DB node based on the configuration and options provided. Use NewDBNode or any of the convenience constructors (e.g. NewDBNodeFromYAML, NewDBNodeFromConfigFile) to get a running dbnode.

The most typical usage of this method will be in an integration test to validate some behavior. For example, assuming we have a valid placement available already we could do the following to read and write to a namespace (note: ignoring error checking):

dbnode, _ := NewDBNodeFromYAML(defaultDBNodeConfig, DBNodeOptions{})
dbnode.WaitForBootstrap()
dbnode.WriteTaggedPoint(&rpc.WriteTaggedRequest{...}))
res, _ = dbnode.FetchTagged(&rpc.FetchTaggedRequest{...})

The dbnode will start up as you specify in your config. However, there is some helper logic to avoid port and filesystem collisions when spinning up multiple components within the process. If you specify a GeneratePorts: true in the DBNodeOptions, address ports will be replaced with an open port.

Similarly, filepath fields will be updated with a temp directory that will be cleaned up when the dbnode is destroyed. This should ensure that many of the same component can be spun up in-process without any issues with collisions.

func NewDBNodeFromConfigFile

func NewDBNodeFromConfigFile(pathToCfg string, opts DBNodeOptions) (resources.Node, error)

NewDBNodeFromConfigFile creates a new in-process DB node based on the config file and options provided.

func NewDBNodeFromYAML

func NewDBNodeFromYAML(yamlCfg string, opts DBNodeOptions) (resources.Node, error)

NewDBNodeFromYAML creates a new in-process DB node based on the YAML configuration string and options provided.

func NewEmbeddedCoordinator

func NewEmbeddedCoordinator(d *DBNode) (resources.Coordinator, error)

NewEmbeddedCoordinator creates a coordinator from one embedded within an existing db node. This method expects that the DB node has already been started before being called.

func NewM3Resources

func NewM3Resources(options ResourceOptions) resources.M3Resources

NewM3Resources returns an implementation of resources.M3Resources backed by in-process implementations of the M3 components.

Types

type Aggregator added in v1.4.0

type Aggregator struct {
	// contains filtered or unexported fields
}

Aggregator is an in-process implementation of resources.Aggregator for use in integration tests.

func (*Aggregator) Close added in v1.4.0

func (a *Aggregator) Close() error

Close closes the wrapper and releases any held resources, including deleting docker containers.

func (*Aggregator) Configuration added in v1.4.2

func (a *Aggregator) Configuration() config.Configuration

Configuration returns a copy of the configuration used to start this aggregator.

func (*Aggregator) HostDetails added in v1.4.0

func (a *Aggregator) HostDetails() (*resources.InstanceInfo, error)

HostDetails returns the aggregator's host details.

func (*Aggregator) IsHealthy added in v1.4.0

func (a *Aggregator) IsHealthy() error

IsHealthy determines whether an instance is healthy.

func (*Aggregator) Resign added in v1.4.0

func (a *Aggregator) Resign() error

Resign asks an aggregator instance to give up its current leader role if applicable.

func (*Aggregator) Start added in v1.4.0

func (a *Aggregator) Start()

Start starts the aggregator instance.

func (*Aggregator) Status added in v1.4.0

func (a *Aggregator) Status() (m3agg.RuntimeStatus, error)

Status returns the instance status.

type AggregatorOptions

type AggregatorOptions struct {
	// Logger is the logger to use for the in-process aggregator.
	Logger *zap.Logger
	// StartFn is a custom function that can be used to start the Aggregator.
	StartFn AggregatorStartFn
	// Start indicates whether to start the aggregator instance
	Start bool
	// GeneratePorts will automatically update the config to use open ports
	// if set to true. If false, configuration is used as-is re: ports.
	GeneratePorts bool
	// GenerateHostID will automatically update the host ID specified in
	// the config if set to true. If false, configuration is used as-is re: host ID.
	GenerateHostID bool
}

AggregatorOptions are options of starting an in-process aggregator.

type AggregatorStartFn added in v1.4.2

type AggregatorStartFn func(cfg *aggcfg.Configuration) (chan<- error, <-chan struct{})

AggregatorStartFn is a custom function that can be used to start an aggregator. Function must return a channel for interrupting the server and a channel for receiving notifications that the server has shut down.

type ClusterConfigs added in v1.4.0

type ClusterConfigs struct {
	// DBNode is the configuration for db nodes.
	DBNode dbcfg.Configuration
	// Coordinator is the configuration for the coordinator.
	Coordinator coordinatorcfg.Configuration
	// Aggregator is the configuration for aggregators.
	// If Aggregator is nil, the cluster contains only m3coordinator and dbnodes.
	Aggregator *aggcfg.Configuration
}

ClusterConfigs contain the input config to use for components within the cluster. There is one default configuration for each type of component. Given a set of ClusterConfigs, the function NewCluster can spin up an m3 cluster. Or one can use GenerateClusterSpecification to get the per-instance configuration and options based on the given ClusterConfigs.

func NewClusterConfigsFromConfigFile added in v1.4.0

func NewClusterConfigsFromConfigFile(
	pathToDBNodeCfg string,
	pathToCoordCfg string,
	pathToAggCfg string,
) (ClusterConfigs, error)

NewClusterConfigsFromConfigFile creates a new ClusterConfigs object from the provided filepaths for dbnode and coordinator configuration.

func NewClusterConfigsFromYAML added in v1.4.0

func NewClusterConfigsFromYAML(dbnodeYaml string, coordYaml string, aggYaml string) (ClusterConfigs, error)

NewClusterConfigsFromYAML creates a new ClusterConfigs object from YAML strings representing component configs.

type ClusterSpecification added in v1.4.0

type ClusterSpecification struct {
	// Configs contains the per-instance configuration for all components in the cluster.
	Configs PerInstanceConfigs
	// Options contains the per-insatance options for setting up the cluster.
	Options PerInstanceOptions
}

ClusterSpecification contain the per-instance configuration and options to use for starting each components within the cluster. The function NewClusterFromSpecification will spin up an m3 cluster with the given ClusterSpecification.

func GenerateClusterSpecification added in v1.4.0

func GenerateClusterSpecification(
	configs ClusterConfigs,
	opts resources.ClusterOptions,
) (ClusterSpecification, error)

GenerateClusterSpecification generates the per-instance configuration and options for the cluster set up based on the given input configuation and options.

type Coordinator added in v1.4.0

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator is an in-process implementation of resources.Coordinator for use in integration tests.

func (*Coordinator) AddM3msgTopicConsumer added in v1.4.0

func (c *Coordinator) AddM3msgTopicConsumer(
	opts resources.M3msgTopicOptions,
	req admin.TopicAddRequest,
) (admin.TopicGetResponse, error)

AddM3msgTopicConsumer adds a consumer service to an m3msg topic.

func (*Coordinator) AddNamespace added in v1.4.0

AddNamespace adds a namespace.

func (*Coordinator) ApplyKVUpdate added in v1.4.0

func (c *Coordinator) ApplyKVUpdate(update string) error

ApplyKVUpdate applies a KV update.

func (*Coordinator) Close added in v1.4.0

func (c *Coordinator) Close() error

Close closes the wrapper and releases any held resources, including deleting docker containers.

func (*Coordinator) Configuration added in v1.4.2

func (c *Coordinator) Configuration() config.Configuration

Configuration returns a copy of the configuration used to start this coordinator.

func (*Coordinator) CreateDatabase added in v1.4.0

CreateDatabase creates a database.

func (*Coordinator) DeleteAllPlacements added in v1.4.0

func (c *Coordinator) DeleteAllPlacements(
	opts resources.PlacementRequestOptions,
) error

DeleteAllPlacements deletes all placements for the service specified in the PlacementRequestOptions.

func (*Coordinator) DeleteNamespace added in v1.4.0

func (c *Coordinator) DeleteNamespace(namespaceID string) error

DeleteNamespace removes the namespace.

func (*Coordinator) GetM3msgTopic added in v1.4.0

func (c *Coordinator) GetM3msgTopic(
	opts resources.M3msgTopicOptions,
) (admin.TopicGetResponse, error)

GetM3msgTopic gets an m3msg topic.

func (*Coordinator) GetNamespace added in v1.4.0

func (c *Coordinator) GetNamespace() (admin.NamespaceGetResponse, error)

GetNamespace gets namespaces.

func (*Coordinator) GetPlacement added in v1.4.0

GetPlacement gets placements.

func (*Coordinator) GraphiteQuery added in v1.4.0

GraphiteQuery retrieves graphite raw data.

func (*Coordinator) HostDetails added in v1.4.0

func (c *Coordinator) HostDetails() (*resources.InstanceInfo, error)

HostDetails returns the coordinator's host details.

func (*Coordinator) InitM3msgTopic added in v1.4.0

InitM3msgTopic initializes an m3msg topic.

func (*Coordinator) InitPlacement added in v1.4.0

InitPlacement initializes placements.

func (*Coordinator) InstantQuery added in v1.4.0

func (c *Coordinator) InstantQuery(
	req resources.QueryRequest,
	headers resources.Headers,
) (model.Vector, error)

InstantQuery runs an instant query with provided headers

func (*Coordinator) InstantQueryWithEngine added in v1.4.2

func (c *Coordinator) InstantQueryWithEngine(
	req resources.QueryRequest,
	engine options.QueryEngine,
	headers resources.Headers,
) (model.Vector, error)

InstantQueryWithEngine runs an instant query with provided headers and the specified query engine.

func (*Coordinator) LabelNames added in v1.4.0

func (c *Coordinator) LabelNames(
	req resources.LabelNamesRequest,
	headers resources.Headers,
) (model.LabelNames, error)

LabelNames return matching label names based on the request.

func (*Coordinator) LabelValues added in v1.4.0

func (c *Coordinator) LabelValues(
	req resources.LabelValuesRequest,
	headers resources.Headers,
) (model.LabelValues, error)

LabelValues returns matching label values based on the request.

func (*Coordinator) RangeQuery added in v1.4.0

func (c *Coordinator) RangeQuery(
	req resources.RangeQueryRequest,
	headers resources.Headers,
) (model.Matrix, error)

RangeQuery runs a range query with provided headers

func (*Coordinator) RangeQueryWithEngine added in v1.4.2

func (c *Coordinator) RangeQueryWithEngine(
	req resources.RangeQueryRequest,
	engine options.QueryEngine,
	headers resources.Headers,
) (model.Matrix, error)

RangeQueryWithEngine runs a range query with provided headers and the specified query engine.

func (*Coordinator) RunQuery added in v1.4.0

func (c *Coordinator) RunQuery(
	verifier resources.ResponseVerifier,
	query string,
	headers resources.Headers,
) error

RunQuery runs the given query with a given verification function.

func (*Coordinator) Series added in v1.4.0

func (c *Coordinator) Series(
	req resources.SeriesRequest,
	headers resources.Headers,
) ([]model.Metric, error)

Series returns matching series based on the request.

func (*Coordinator) Start added in v1.4.2

func (c *Coordinator) Start()

Start is the start method for the coordinator.

func (*Coordinator) UpdateNamespace added in v1.4.0

UpdateNamespace updates the namespace.

func (*Coordinator) WaitForClusterReady added in v1.4.0

func (c *Coordinator) WaitForClusterReady() error

WaitForClusterReady waits until the cluster is ready to receive reads and writes.

func (*Coordinator) WaitForInstances added in v1.4.0

func (c *Coordinator) WaitForInstances(ids []string) error

WaitForInstances blocks until the given instance is available.

func (*Coordinator) WaitForNamespace added in v1.4.0

func (c *Coordinator) WaitForNamespace(name string) error

WaitForNamespace blocks until the given namespace is enabled.

func (*Coordinator) WaitForShardsReady added in v1.4.0

func (c *Coordinator) WaitForShardsReady() error

WaitForShardsReady waits until all shards gets ready.

func (*Coordinator) WriteCarbon added in v1.4.0

func (c *Coordinator) WriteCarbon(port int, metric string, v float64, t time.Time) error

WriteCarbon writes a carbon metric datapoint at a given time.

func (*Coordinator) WriteProm added in v1.4.0

func (c *Coordinator) WriteProm(
	name string,
	tags map[string]string,
	samples []prompb.Sample,
	headers resources.Headers,
) error

WriteProm writes a prometheus metric. Takes tags/labels as a map for convenience.

func (*Coordinator) WritePromWithRequest added in v1.4.2

func (c *Coordinator) WritePromWithRequest(writeRequest prompb.WriteRequest, headers resources.Headers) error

WritePromWithRequest executes a prometheus write request. Allows you to provide the request directly which is useful for batch metric requests.

type CoordinatorOptions

type CoordinatorOptions struct {
	// GeneratePorts will automatically update the config to use open ports
	// if set to true. If false, configuration is used as-is re: ports.
	GeneratePorts bool
	// StartFn is a custom function that can be used to start the Coordinator.
	StartFn CoordinatorStartFn
	// Start indicates whether to start the coordinator instance.
	Start bool
	// Logger is the logger to use for the coordinator. If not provided,
	// a default one will be created.
	Logger *zap.Logger
}

CoordinatorOptions are options for starting a coordinator server.

type CoordinatorStartFn added in v1.4.2

type CoordinatorStartFn func(cfg *coordcfg.Configuration) (chan<- error, <-chan struct{})

CoordinatorStartFn is a custom function that can be used to start a coordinator. Function must return a channel for interrupting the server and a channel for receiving notifications that the server has shut down.

type DBNode added in v1.4.0

type DBNode struct {
	// contains filtered or unexported fields
}

DBNode is an in-process implementation of resources.Node.

func (*DBNode) AggregateTiles added in v1.4.0

func (d *DBNode) AggregateTiles(req *rpc.AggregateTilesRequest) (int64, error)

AggregateTiles starts tiles aggregation, waits until it will complete and returns the amount of aggregated tiles.

func (*DBNode) Close added in v1.4.0

func (d *DBNode) Close() error

Close closes the wrapper and releases any held resources, including deleting docker containers.

func (*DBNode) Configuration added in v1.4.2

func (d *DBNode) Configuration() config.Configuration

Configuration returns a copy of the configuration used to start this dbnode.

func (*DBNode) Exec added in v1.4.0

func (d *DBNode) Exec(commands ...string) (string, error)

Exec executes the given commands on the node container, returning stdout and stderr from the container.

func (*DBNode) Fetch added in v1.4.0

func (d *DBNode) Fetch(req *rpc.FetchRequest) (*rpc.FetchResult_, error)

Fetch fetches datapoints.

func (*DBNode) FetchTagged added in v1.4.0

func (d *DBNode) FetchTagged(req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error)

FetchTagged fetches datapoints by tag.

func (*DBNode) GoalStateExec added in v1.4.0

func (d *DBNode) GoalStateExec(verifier resources.GoalStateVerifier, commands ...string) error

GoalStateExec executes the given commands on the node container, retrying until applying the verifier returns no error or the default timeout.

func (*DBNode) Health added in v1.4.0

func (d *DBNode) Health() (*rpc.NodeHealthResult_, error)

Health gives this node's health.

func (*DBNode) HostDetails added in v1.4.0

func (d *DBNode) HostDetails(_ int) (*admin.Host, error)

HostDetails returns this node's host details on the given port.

func (*DBNode) Restart added in v1.4.0

func (d *DBNode) Restart() error

Restart restarts this container.

func (*DBNode) Start added in v1.4.2

func (d *DBNode) Start()

Start starts the DBNode instance

func (*DBNode) WaitForBootstrap added in v1.4.0

func (d *DBNode) WaitForBootstrap() error

WaitForBootstrap blocks until the node has bootstrapped.

func (*DBNode) WritePoint added in v1.4.0

func (d *DBNode) WritePoint(req *rpc.WriteRequest) error

WritePoint writes a datapoint to the node directly.

func (*DBNode) WriteTaggedBatchRaw added in v1.4.0

func (d *DBNode) WriteTaggedBatchRaw(req *rpc.WriteTaggedBatchRawRequest) error

WriteTaggedBatchRaw writes a batch of writes to the node directly.

func (*DBNode) WriteTaggedPoint added in v1.4.0

func (d *DBNode) WriteTaggedPoint(req *rpc.WriteTaggedRequest) error

WriteTaggedPoint writes a datapoint with tags to the node directly.

type DBNodeOptions

type DBNodeOptions struct {
	// GeneratePorts will automatically update the config to use open ports
	// if set to true. If false, configuration is used as-is re: ports.
	GeneratePorts bool
	// GenerateHostID will automatically update the host ID specified in
	// the config if set to true. If false, configuration is used as-is re: host ID.
	GenerateHostID bool
	// StartFn is a custom function that can be used to start the DBNode.
	StartFn DBNodeStartFn
	// Start indicates whether to start the dbnode instance.
	Start bool
	// Logger is the logger to use for the dbnode. If not provided,
	// a default one will be created.
	Logger *zap.Logger
}

DBNodeOptions are options for starting a DB node server.

func GenerateDBNodeConfigsForCluster added in v1.4.0

func GenerateDBNodeConfigsForCluster(
	configs ClusterConfigs,
	opts *resources.DBNodeClusterOptions,
) ([]dbcfg.Configuration, []DBNodeOptions, environment.Configuration, error)

GenerateDBNodeConfigsForCluster generates the unique configs and options for each DB node that will be instantiated. Additionally, provides default environment config that can be used to connect to embedded KV within the DB nodes.

type DBNodeStartFn added in v1.4.2

type DBNodeStartFn func(cfg *dbnodecfg.Configuration) (chan<- error, <-chan struct{})

DBNodeStartFn is a custom function that can be used to start a DB node. Function must return a channel for interrupting the server and a channel for receiving notifications that the server has shut down.

type PerInstanceConfigs added in v1.4.0

type PerInstanceConfigs struct {
	// DBNodes contains the per-instance configuration for db nodes.
	DBNodes []dbcfg.Configuration
	// Coordinator is the configuration for the coordinator.
	Coordinator coordinatorcfg.Configuration
	// Aggregators is the configuration for aggregators.
	// If Aggregators is nil, the cluster contains only m3coordinator and dbnodes.
	Aggregators []aggcfg.Configuration
}

PerInstanceConfigs contain the per-instance configuration for all components.

type PerInstanceOptions added in v1.4.0

type PerInstanceOptions struct {
	// DBNodes contains the per-instance options for db nodes in the cluster.
	DBNode []DBNodeOptions
}

PerInstanceOptions contain the per-instance options for setting up the cluster.

type ResourceOptions

type ResourceOptions struct {
	Coordinator resources.Coordinator
	DBNodes     resources.Nodes
	Aggregators resources.Aggregators
}

ResourceOptions are the options for creating new resources.M3Resources.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL