signalflow

package
Version: v1.30.0
Published: Mar 20, 2023 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Overview

Package signalflow contains a SignalFx SignalFlow client, which can be used to execute analytics jobs against the SignalFx backend.

The client currently only supports the execute request. Not all SignalFlow messages are handled at this time, and some will be silently dropped.

The client will automatically attempt to reconnect to the backend if the connection is broken. By default there is a 5-second delay between retries (see ReconnectDelay).
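The reconnect behavior described above can be pictured as a retry loop with a fixed delay. The following is only a minimal sketch using the standard library, not the package's actual implementation: connectWithRetry, flakyDial, and the maxAttempts cap are hypothetical names introduced for illustration, and the delay is shortened so the sketch finishes quickly.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// reconnectDelay stands in for the package's ReconnectDelay variable. It is
// shortened here (assumption) so the sketch runs quickly.
var reconnectDelay = 10 * time.Millisecond

// connectWithRetry calls dial until it succeeds or maxAttempts is reached,
// sleeping reconnectDelay between failed attempts. It returns the number of
// attempts made. The attempt cap is an assumption of this sketch.
func connectWithRetry(dial func() error, maxAttempts int) (int, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = dial(); err == nil {
			return attempt, nil
		}
		if attempt < maxAttempts {
			time.Sleep(reconnectDelay)
		}
	}
	return maxAttempts, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

// flakyDial returns a hypothetical dial function that fails the first
// `failures` times it is called and succeeds afterwards.
func flakyDial(failures int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= failures {
			return errors.New("connection refused")
		}
		return nil
	}
}

func main() {
	attempts, err := connectWithRetry(flakyDial(2), 5)
	fmt.Println(attempts, err) // prints: 3 <nil>
}
```

Note that, as the Example below points out, a reconnect does not restart jobs that were running on the broken connection.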

Example
c, err := NewClient(
	StreamURLForRealm("us1"),
	AccessToken("MY_ORG_ACCESS_TOKEN"))
if err != nil {
	log.Printf("Error creating client: %v", err)
	return
}

comp, err := c.Execute(&ExecuteRequest{
	Program: "data('cpu.utilization').publish()",
})
if err != nil {
	log.Printf("Could not send execute request: %v", err)
	return
}

fmt.Printf("Resolution: %v\n", comp.Resolution())
fmt.Printf("Max Delay: %v\n", comp.MaxDelay())
fmt.Printf("Detected Lag: %v\n", comp.Lag())

for msg := range comp.Data() {
	// This will run as long as there is data, or until the websocket gets
	// disconnected.  If a websocket error occurs, the job will NOT be
	// automatically restarted.
	if len(msg.Payloads) == 0 {
		fmt.Printf("\rNo data available")
		continue
	}
	for _, pl := range msg.Payloads {
		meta := comp.TSIDMetadata(pl.TSID)
		fmt.Printf("%s %v: %v\n", meta.OriginatingMetric, meta.CustomProperties, pl.Value())
	}
	fmt.Println("")
}

err = comp.Err()
if err != nil {
	log.Printf("Error: %v", err)
} else {
	log.Printf("Job completed")
}

Constants

This section is empty.

Variables

var ReconnectDelay = 5 * time.Second

ReconnectDelay is how long to wait between connection attempts when the connection is bad.

Functions

This section is empty.

Types

type AuthRequest

type AuthRequest struct {
	Type AuthType `json:"type"`
	// The Auth token for the org
	Token     string `json:"token"`
	UserAgent string `json:"userAgent,omitempty"`
}

type AuthType

type AuthType string

func (AuthType) MarshalJSON

func (at AuthType) MarshalJSON() ([]byte, error)

type Channel

type Channel struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Channel is a queue of messages that all pertain to the same computation.

func (*Channel) AcceptMessage

func (c *Channel) AcceptMessage(msg messages.Message)

AcceptMessage accepts a message from the websocket. This might block if nothing is reading from the channel, but a computation should generally always be doing so.

func (*Channel) Close

func (c *Channel) Close()

Close the channel. This does not actually stop a job in SignalFlow; for that, use Computation.Stop().

func (*Channel) Messages

func (c *Channel) Messages() <-chan messages.Message

Messages returns a Go channel that receives all of the deserialized SignalFlow messages from the websocket.

type Client

type Client struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Client for SignalFlow via websockets (SSE is not currently supported).

func NewClient

func NewClient(options ...ClientParam) (*Client, error)

NewClient makes a new SignalFlow client that will immediately try to connect to the SignalFlow backend.

func (*Client) Close

func (c *Client) Close()

Close the client and shut down any ongoing connections and goroutines. The client cannot be reused after Close.

func (*Client) Detach added in v1.29.0

func (c *Client) Detach(req *DetachRequest) error

func (*Client) Execute

func (c *Client) Execute(req *ExecuteRequest) (*Computation, error)

Execute a SignalFlow job and return a Computation through which informational messages and data will flow.

func (*Client) Stop

func (c *Client) Stop(req *StopRequest) error

Stop sends a job stop request message to the backend. It does not wait for jobs to actually be stopped.

type ClientParam

type ClientParam func(*Client) error

ClientParam is the common type of configuration functions for the SignalFlow client.
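ClientParam follows the functional-options pattern: each option is a function that mutates the client and may return an error. The sketch below illustrates the pattern with standard-library code only. The lowercase client, clientParam, accessToken, readTimeout, and newClient names are hypothetical stand-ins, and the validation rules and default value are assumptions, not the package's behavior.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// client is a hypothetical stand-in for the package's Client type.
type client struct {
	token       string
	readTimeout time.Duration
}

// clientParam mirrors the shape of ClientParam: a function that configures
// the client and may reject invalid input.
type clientParam func(*client) error

// accessToken returns an option that stores the token (validation is an
// assumption of this sketch).
func accessToken(token string) clientParam {
	return func(c *client) error {
		if token == "" {
			return errors.New("access token must not be empty")
		}
		c.token = token
		return nil
	}
}

// readTimeout returns an option that sets the read timeout.
func readTimeout(d time.Duration) clientParam {
	return func(c *client) error {
		if d <= 0 {
			return errors.New("read timeout must be positive")
		}
		c.readTimeout = d
		return nil
	}
}

// newClient applies each option in order and stops at the first error.
func newClient(options ...clientParam) (*client, error) {
	c := &client{readTimeout: time.Minute} // assumed default, for illustration only
	for _, opt := range options {
		if err := opt(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}

func main() {
	c, err := newClient(accessToken("MY_ORG_ACCESS_TOKEN"), readTimeout(30*time.Second))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(c.token, c.readTimeout) // prints: MY_ORG_ACCESS_TOKEN 30s
}
```

One advantage of this design is that new options can be added later without changing the constructor's signature.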

func AccessToken

func AccessToken(token string) ClientParam

AccessToken can be used to provide a SignalFx organization access token or user access token to the SignalFlow client.

func MetadataTimeout

func MetadataTimeout(timeout time.Duration) ClientParam

MetadataTimeout is the default amount of time that calls to metadata accessors on a SignalFlow Computation instance will wait to receive the metadata from the backend before failing and returning a zero value. Usually metadata comes in very quickly from the stream after the job starts.
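The "wait, then return a zero value" behavior can be sketched with a select on a channel and a timer. This is only an illustration of the idea under stated assumptions: waitForResolution and the demo helpers are hypothetical, and the package's internal mechanism may differ.

```go
package main

import (
	"fmt"
	"time"
)

// waitForResolution returns the value received on ch, or the zero value if
// nothing arrives within timeout. It is a hypothetical helper for this sketch.
func waitForResolution(ch <-chan time.Duration, timeout time.Duration) time.Duration {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case v := <-ch:
		return v
	case <-timer.C:
		return 0
	}
}

// demoArrived simulates metadata that is already available when requested.
func demoArrived() int64 {
	ch := make(chan time.Duration, 1)
	ch <- time.Second
	return waitForResolution(ch, time.Second).Milliseconds()
}

// demoTimedOut simulates metadata that never arrives.
func demoTimedOut() int64 {
	ch := make(chan time.Duration)
	return waitForResolution(ch, 10*time.Millisecond).Milliseconds()
}

func main() {
	fmt.Println(demoArrived(), demoTimedOut()) // prints: 1000 0
}
```

Because a zero value is also a possible timeout result, callers that need to distinguish "not yet known" from "zero" may want to use a generous timeout.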

func ReadTimeout

func ReadTimeout(timeout time.Duration) ClientParam

ReadTimeout sets the duration to wait between messages that come on the websocket. If the resolution of the job is very low, this should be increased.

func StreamURL

func StreamURL(streamEndpoint string) ClientParam

StreamURL lets you set the full URL to the stream endpoint, including the path.

func StreamURLForRealm

func StreamURLForRealm(realm string) ClientParam

StreamURLForRealm can be used to configure the websocket url for a specific SignalFx realm.

func UserAgent

func UserAgent(userAgent string) ClientParam

UserAgent allows setting the `userAgent` field when authenticating to SignalFlow. This can be useful for accounting for how many jobs are started from each client.

func WriteTimeout

func WriteTimeout(timeout time.Duration) ClientParam

WriteTimeout sets the maximum duration to wait to send a single message when writing messages to the SignalFlow server over the WebSocket connection.

type Computation

type Computation struct {

	// The timeout to wait for metadata when a metadata access function is
	// called.  This will default to what is set on the client, but can be
	// overridden by changing this field directly.
	MetadataTimeout time.Duration
	// contains filtered or unexported fields
}

Computation is a single running SignalFlow job.

func (*Computation) Channel

func (c *Computation) Channel() *Channel

Channel returns the underlying Channel instance used by this computation.

func (*Computation) Data

func (c *Computation) Data() <-chan *messages.DataMessage

Data returns the channel on which data messages come.

func (*Computation) Detach added in v1.29.0

func (c *Computation) Detach() error

Detach the computation on the backend by using the channel name.

func (*Computation) DetachWithReason added in v1.29.0

func (c *Computation) DetachWithReason(reason string) error

DetachWithReason detaches the computation with a given reason. This reason will be reflected in the control message that signals the end of the job/channel.

func (*Computation) Done

func (c *Computation) Done() <-chan struct{}

Done passes through the computation context's Done channel for use in select statements to know when the computation is finished or an error occurred.
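A Done channel of this kind is typically used in a select alongside a data channel. The sketch below shows that pattern with a plain context and an int channel standing in for the computation and its data messages; consume and runDemo are hypothetical names, and nothing here calls the SignalFlow package itself.

```go
package main

import (
	"context"
	"fmt"
)

// consume counts values from data until done is closed or data is closed.
func consume(done <-chan struct{}, data <-chan int) int {
	count := 0
	for {
		select {
		case <-done:
			return count
		case _, ok := <-data:
			if !ok {
				return count
			}
			count++
		}
	}
}

// runDemo sends three values and then cancels the context, which closes the
// done channel, much as a finished computation would.
func runDemo() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	data := make(chan int) // unbuffered: each send completes only once consumed
	go func() {
		for i := 0; i < 3; i++ {
			data <- i
		}
		cancel()
	}()
	return consume(ctx.Done(), data)
}

func main() {
	fmt.Println(runDemo()) // prints: 3
}
```

With a real computation, the equivalent loop would presumably select on comp.Done() and comp.Data(), then check comp.Err() once Done is closed.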

func (*Computation) Err

func (c *Computation) Err() error

Err returns the last fatal error that caused the computation to stop, if any. Will be nil if the computation stopped in an expected manner.

func (*Computation) Events added in v1.7.14

func (c *Computation) Events() []*messages.EventMessage

Events returns the results from events or alerts queries.

func (*Computation) Expirations added in v1.6.2

func (c *Computation) Expirations() <-chan *messages.ExpiredTSIDMessage

Expirations returns a channel that will be sent messages about expired TSIDs, i.e. time series that are no longer valid for this computation.

func (*Computation) GroupByMissingProperties added in v1.7.4

func (c *Computation) GroupByMissingProperties() []string

GroupByMissingProperties are timeseries that don't contain the required dimensions. This will wait for a short while for the corresponding message to come on the websocket, but will return nil after a timeout if it does not come.

func (*Computation) Handle

func (c *Computation) Handle() string

Handle of the computation.

func (*Computation) IsFinished

func (c *Computation) IsFinished() bool

IsFinished returns true if the computation is done and no more data should be expected from it.

func (*Computation) Lag

func (c *Computation) Lag() time.Duration

Lag detected for the job. This will wait for a short while for the lag message to come on the websocket, but will return 0 after a timeout if it does not come.

func (*Computation) LimitSize added in v1.6.28

func (c *Computation) LimitSize() int

LimitSize detected for the job. This will wait for a short while for the limit size message to come on the websocket, but will return 0 after a timeout if it does not come.

func (*Computation) MatchedNoTimeseriesQuery added in v1.7.4

func (c *Computation) MatchedNoTimeseriesQuery() string

MatchedNoTimeseriesQuery returns the query that matched no active timeseries, if any. This will wait for a short while for the corresponding message to come on the websocket, but will return "" after a timeout if it does not come.

func (*Computation) MatchedSize added in v1.6.28

func (c *Computation) MatchedSize() int

MatchedSize detected for the job. This will wait for a short while for the matched size message to come on the websocket, but will return 0 after a timeout if it does not come.

func (*Computation) MaxDelay

func (c *Computation) MaxDelay() time.Duration

MaxDelay detected for the job. This will wait for a short while for the max delay message to come on the websocket, but will return 0 after a timeout if it does not come.

func (*Computation) Resolution

func (c *Computation) Resolution() time.Duration

Resolution of the job. This will wait for a short while for the resolution message to come on the websocket, but will return 0 after a timeout if it does not come.

func (*Computation) Stop

func (c *Computation) Stop() error

Stop the computation on the backend.

func (*Computation) StopWithReason

func (c *Computation) StopWithReason(reason string) error

StopWithReason stops the computation with a given reason. This reason will be reflected in the control message that signals the end of the job/channel.

func (*Computation) TSIDMetadata

func (c *Computation) TSIDMetadata(tsid idtool.ID) *messages.MetadataProperties

TSIDMetadata for a particular tsid. This will wait for a short while for the tsid metadata message to come on the websocket, but will return nil after a timeout if it does not come.

type ComputationError added in v1.6.27

type ComputationError struct {
	Code      int
	Message   string
	ErrorType string
}

ComputationError exposes the underlying metadata of a computation error.

func (*ComputationError) Error added in v1.6.27

func (e *ComputationError) Error() string
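Since ComputationError is a pointer-receiver error type with exported fields, callers can presumably recover it from a returned error with errors.As. The sketch below uses a local computationError type that mirrors the fields shown above so that it runs without the package; the Error string format, the describe helper, and the sample values are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// computationError mirrors the fields of ComputationError. The Error format
// below is an assumption, not the package's actual output.
type computationError struct {
	Code      int
	Message   string
	ErrorType string
}

func (e *computationError) Error() string {
	return fmt.Sprintf("%s (%d): %s", e.ErrorType, e.Code, e.Message)
}

// describe extracts the structured fields when err wraps a
// *computationError, and falls back to the plain message otherwise.
func describe(err error) string {
	var ce *computationError
	if errors.As(err, &ce) {
		return fmt.Sprintf("computation failed with code %d: %s", ce.Code, ce.Message)
	}
	if err != nil {
		return "other error: " + err.Error()
	}
	return "no error"
}

// wrappedExample builds a wrapped error with made-up sample values.
func wrappedExample() error {
	return fmt.Errorf("job aborted: %w", &computationError{
		Code:      400,
		Message:   "bad program",
		ErrorType: "EXAMPLE_ERROR", // hypothetical value
	})
}

func main() {
	fmt.Println(describe(wrappedExample())) // prints: computation failed with code 400: bad program
	fmt.Println(describe(nil))              // prints: no error
}
```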

type DetachRequest

type DetachRequest struct {
	Type    DetachType `json:"type"`
	Channel string     `json:"channel"`
	Reason  string     `json:"reason"`
}

type DetachType

type DetachType string

func (DetachType) MarshalJSON

func (DetachType) MarshalJSON() ([]byte, error)

type ExecuteRequest

type ExecuteRequest struct {
	Type         ExecuteType   `json:"type"`
	Program      string        `json:"program"`
	Channel      string        `json:"channel"`
	Start        time.Time     `json:"-"`
	Stop         time.Time     `json:"-"`
	Resolution   time.Duration `json:"-"`
	MaxDelay     time.Duration `json:"-"`
	StartMs      int64         `json:"start"`
	StopMs       int64         `json:"stop"`
	ResolutionMs int64         `json:"resolution"`
	MaxDelayMs   int64         `json:"maxDelay"`
	Immediate    bool          `json:"immediate"`
	Timezone     string        `json:"timezone"`
}

func (ExecuteRequest) MarshalJSON

func (er ExecuteRequest) MarshalJSON() ([]byte, error)

MarshalJSON does some assignments to allow using more native Go types for time/duration.
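The struct tags above suggest the idea: the native Go fields are tagged `json:"-"` and their millisecond counterparts are what gets serialized. A minimal sketch of that technique follows, using a trimmed-down hypothetical request type rather than ExecuteRequest itself; the zero-value checks are assumptions, and the real method may handle defaults differently.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// request is a hypothetical, trimmed-down analogue of ExecuteRequest.
type request struct {
	Program      string        `json:"program"`
	Start        time.Time     `json:"-"`
	Resolution   time.Duration `json:"-"`
	StartMs      int64         `json:"start"`
	ResolutionMs int64         `json:"resolution"`
}

// MarshalJSON copies the native Go values into their millisecond fields
// before encoding. The local plain type has no methods, which prevents
// infinite recursion back into MarshalJSON.
func (r request) MarshalJSON() ([]byte, error) {
	if !r.Start.IsZero() {
		r.StartMs = r.Start.UnixMilli()
	}
	if r.Resolution != 0 {
		r.ResolutionMs = r.Resolution.Milliseconds()
	}
	type plain request
	return json.Marshal(plain(r))
}

// encodedMillis marshals a sample request and decodes the millisecond
// fields back out of the resulting JSON.
func encodedMillis() (int64, int64) {
	b, err := json.Marshal(request{
		Program:    "data('cpu.utilization').publish()",
		Start:      time.UnixMilli(1700000000000),
		Resolution: 10 * time.Second,
	})
	if err != nil {
		return -1, -1
	}
	var out struct {
		Start      int64 `json:"start"`
		Resolution int64 `json:"resolution"`
	}
	if err := json.Unmarshal(b, &out); err != nil {
		return -1, -1
	}
	return out.Start, out.Resolution
}

func main() {
	start, resolution := encodedMillis()
	fmt.Println(start, resolution) // prints: 1700000000000 10000
}
```

Because the receiver is a value, the assignments affect only the copy being encoded and leave the caller's struct unchanged.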

type ExecuteType

type ExecuteType string

func (ExecuteType) MarshalJSON

func (ExecuteType) MarshalJSON() ([]byte, error)

type FakeBackend added in v1.6.2

type FakeBackend struct {
	sync.Mutex

	AccessToken string
	// contains filtered or unexported fields
}

FakeBackend is useful for testing, both internal to this package and externally. It supports basic messages and allows for the specification of metadata and data messages that map to a particular program.

func NewRunningFakeBackend added in v1.6.2

func NewRunningFakeBackend() *FakeBackend

func (*FakeBackend) AddProgramError added in v1.6.2

func (f *FakeBackend) AddProgramError(program string, errorMsg string)

func (*FakeBackend) AddProgramTSIDs added in v1.6.2

func (f *FakeBackend) AddProgramTSIDs(program string, tsids []idtool.ID)

func (*FakeBackend) AddTSIDMetadata added in v1.6.2

func (f *FakeBackend) AddTSIDMetadata(tsid idtool.ID, props *messages.MetadataProperties)

func (*FakeBackend) Client added in v1.6.2

func (f *FakeBackend) Client() (*Client, error)

func (*FakeBackend) KillExistingConnections added in v1.6.2

func (f *FakeBackend) KillExistingConnections()

func (*FakeBackend) RemoveTSIDData added in v1.6.2

func (f *FakeBackend) RemoveTSIDData(tsid idtool.ID)

func (*FakeBackend) Restart added in v1.6.2

func (f *FakeBackend) Restart()

func (*FakeBackend) RunningJobsForProgram added in v1.6.2

func (f *FakeBackend) RunningJobsForProgram(program string) int

RunningJobsForProgram returns how many currently executing jobs there are for a particular program text.

func (*FakeBackend) ServeHTTP added in v1.6.2

func (f *FakeBackend) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*FakeBackend) SetTSIDFloatData added in v1.6.2

func (f *FakeBackend) SetTSIDFloatData(tsid idtool.ID, val float64)

func (*FakeBackend) Start added in v1.6.2

func (f *FakeBackend) Start()

func (*FakeBackend) Stop added in v1.6.2

func (f *FakeBackend) Stop()

func (*FakeBackend) URL added in v1.6.2

func (f *FakeBackend) URL() string

type StopRequest

type StopRequest struct {
	Type   StopType `json:"type"`
	Handle string   `json:"handle"`
	Reason string   `json:"reason"`
}

type StopType

type StopType string

func (StopType) MarshalJSON

func (StopType) MarshalJSON() ([]byte, error)

Directories

Path Synopsis
module
package main shows a basic usage pattern of the SignalFlow client.