internal

package
v0.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 26, 2024 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const OpAMPPlainHTTPMethod = "POST"

Variables

View Source
var (
	ErrAgentDescriptionMissing      = errors.New("AgentDescription is nil")
	ErrAgentDescriptionNoAttributes = errors.New("AgentDescription has no attributes defined")
	ErrHealthMissing                = errors.New("health is nil")
	ErrReportsEffectiveConfigNotSet = errors.New("ReportsEffectiveConfig capability is not set")
	ErrReportsRemoteConfigNotSet    = errors.New("ReportsRemoteConfig capability is not set")
	ErrPackagesStateProviderNotSet  = errors.New("PackagesStateProvider must be set")
	ErrAcceptsPackagesNotSet        = errors.New("AcceptsPackages and ReportsPackageStatuses must be set")
)

Functions

func NewPackagesSyncer

func NewPackagesSyncer(
	logger types.Logger,
	available *protobufs.PackagesAvailable,
	sender Sender,
	clientSyncedState *ClientSyncedState,
	packagesStateProvider types.PackagesStateProvider,
) *packagesSyncer

NewPackagesSyncer creates a new packages syncer.

func NewWSReceiver

func NewWSReceiver(
	logger types.Logger,
	callbacks types.Callbacks,
	conn *websocket.Conn,
	sender *WSSender,
	clientSyncedState *ClientSyncedState,
	packagesStateProvider types.PackagesStateProvider,
	capabilities protobufs.AgentCapabilities,
) *wsReceiver

NewWSReceiver creates a new Receiver that uses WebSocket to receive messages from the server.

Types

type ClientCommon

type ClientCommon struct {
	Logger    types.Logger
	Callbacks types.Callbacks

	// Agent's capabilities defined at Start() time.
	Capabilities protobufs.AgentCapabilities

	// Client state storage. This is needed if the Server asks to report the state.
	ClientSyncedState ClientSyncedState

	// PackagesStateProvider provides access to the local state of packages.
	PackagesStateProvider types.PackagesStateProvider
	// contains filtered or unexported fields
}

ClientCommon contains the OpAMP logic that is common between WebSocket and plain HTTP transports.

func NewClientCommon

func NewClientCommon(logger types.Logger, sender Sender) ClientCommon

NewClientCommon creates a new ClientCommon.

func (*ClientCommon) AgentDescription

func (c *ClientCommon) AgentDescription() *protobufs.AgentDescription

AgentDescription returns the current state of the AgentDescription.

func (*ClientCommon) IsStopping

func (c *ClientCommon) IsStopping() bool

IsStopping returns true if Stop() was called.

func (*ClientCommon) PrepareFirstMessage

func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error

PrepareFirstMessage prepares the initial state of NextMessage struct that client sends when it first establishes a connection with the Server.

func (*ClientCommon) PrepareStart

func (c *ClientCommon) PrepareStart(
	_ context.Context, settings types.StartSettings,
) error

PrepareStart prepares the client state for the next Start() call. It returns an error if the client is already started, or if the settings are invalid.

func (*ClientCommon) RequestConnectionSettings added in v0.11.0

func (c *ClientCommon) RequestConnectionSettings(request *protobufs.ConnectionSettingsRequest) error

func (*ClientCommon) SendCustomMessage added in v0.13.0

func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error)

SendCustomMessage sends the specified custom message to the server.

func (*ClientCommon) SetAgentDescription

func (c *ClientCommon) SetAgentDescription(descr *protobufs.AgentDescription) error

SetAgentDescription sends a status update to the Server with the new AgentDescription and remembers the AgentDescription in the client state so that it can be sent to the Server when the Server asks for it.

func (*ClientCommon) SetCustomCapabilities added in v0.13.0

func (c *ClientCommon) SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error

SetCustomCapabilities sends a message to the Server with the new custom capabilities.

func (*ClientCommon) SetHealth added in v0.3.0

func (c *ClientCommon) SetHealth(health *protobufs.ComponentHealth) error

SetHealth sends a status update to the Server with the new agent health and remembers the health in the client state so that it can be sent to the Server when the Server asks for it.

func (*ClientCommon) SetPackageStatuses

func (c *ClientCommon) SetPackageStatuses(statuses *protobufs.PackageStatuses) error

SetPackageStatuses sends a status update to the Server if the new PackageStatuses are different from the ones we already have in the state. It also remembers the new PackageStatuses in the client state so that it can be sent to the Server when the Server asks for it.

func (*ClientCommon) SetRemoteConfigStatus

func (c *ClientCommon) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error

SetRemoteConfigStatus sends a status update to the Server if the new RemoteConfigStatus is different from the status we already have in the state. It also remembers the new RemoteConfigStatus in the client state so that it can be sent to the Server when the Server asks for it.

func (*ClientCommon) StartConnectAndRun

func (c *ClientCommon) StartConnectAndRun(runner func(ctx context.Context))

StartConnectAndRun initiates the connection with the Server and starts the background goroutine that handles the communication unitl client is stopped.

func (*ClientCommon) Stop

func (c *ClientCommon) Stop(ctx context.Context) error

Stop stops the client. It returns an error if the client is not started.

func (*ClientCommon) UpdateEffectiveConfig

func (c *ClientCommon) UpdateEffectiveConfig(ctx context.Context) error

UpdateEffectiveConfig fetches the current local effective config using GetEffectiveConfig callback and sends it to the Server using provided Sender.

type ClientSyncedState

type ClientSyncedState struct {
	// contains filtered or unexported fields
}

ClientSyncedState stores the state of the Agent messages that the OpAMP Client needs to have access to synchronize to the Server. 4 messages can be stored in this store: AgentDescription, ComponentHealth, RemoteConfigStatus and PackageStatuses.

See OpAMP spec for more details on how state synchronization works: https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#Agent-to-Server-state-synchronization

Note that the EffectiveConfig is subject to the same synchronization logic, however it is not stored in this struct since it can be large, and we do not want to always keep it in memory. To avoid storing it in memory the EffectiveConfig is supposed to be stored by the Agent implementation (e.g. it can be stored on disk) and is fetched via GetEffectiveConfig callback when it is needed by OpAMP client and then it is discarded from memory. See implementation of UpdateEffectiveConfig().

It is safe to call methods of this struct concurrently.

func (*ClientSyncedState) AgentDescription

func (s *ClientSyncedState) AgentDescription() *protobufs.AgentDescription

func (*ClientSyncedState) CustomCapabilities added in v0.13.0

func (s *ClientSyncedState) CustomCapabilities() *protobufs.CustomCapabilities

func (*ClientSyncedState) HasCustomCapability added in v0.13.0

func (s *ClientSyncedState) HasCustomCapability(capability string) bool

HasCustomCapability returns true if the provided capability is in the CustomCapabilities.

func (*ClientSyncedState) Health added in v0.3.0

func (*ClientSyncedState) PackageStatuses

func (s *ClientSyncedState) PackageStatuses() *protobufs.PackageStatuses

func (*ClientSyncedState) RemoteConfigStatus

func (s *ClientSyncedState) RemoteConfigStatus() *protobufs.RemoteConfigStatus

func (*ClientSyncedState) SetAgentDescription

func (s *ClientSyncedState) SetAgentDescription(descr *protobufs.AgentDescription) error

SetAgentDescription sets the AgentDescription in the state.

func (*ClientSyncedState) SetCustomCapabilities added in v0.13.0

func (s *ClientSyncedState) SetCustomCapabilities(capabilities *protobufs.CustomCapabilities) error

SetCustomCapabilities sets the CustomCapabilities in the state.

func (*ClientSyncedState) SetHealth added in v0.3.0

func (s *ClientSyncedState) SetHealth(health *protobufs.ComponentHealth) error

SetHealth sets the agent health in the state.

func (*ClientSyncedState) SetPackageStatuses

func (s *ClientSyncedState) SetPackageStatuses(status *protobufs.PackageStatuses) error

SetPackageStatuses sets the PackageStatuses in the state.

func (*ClientSyncedState) SetRemoteConfigStatus

func (s *ClientSyncedState) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error

SetRemoteConfigStatus sets the RemoteConfigStatus in the state.

type HTTPSender

type HTTPSender struct {
	SenderCommon
	// contains filtered or unexported fields
}

HTTPSender allows scheduling messages to send. Once run, it will loop through a request/response cycle for each message to send and will process all received responses using a receivedProcessor. If there are no pending messages to send the HTTPSender will wait for the configured polling interval.

func NewHTTPSender

func NewHTTPSender(logger types.Logger) *HTTPSender

NewHTTPSender creates a new Sender that uses HTTP to send messages with default settings.

func (*HTTPSender) AddTLSConfig added in v0.6.0

func (h *HTTPSender) AddTLSConfig(config *tls.Config)

func (*HTTPSender) EnableCompression added in v0.4.0

func (h *HTTPSender) EnableCompression()

func (*HTTPSender) Run

func (h *HTTPSender) Run(
	ctx context.Context,
	url string,
	callbacks types.Callbacks,
	clientSyncedState *ClientSyncedState,
	packagesStateProvider types.PackagesStateProvider,
	capabilities protobufs.AgentCapabilities,
)

Run starts the processing loop that will perform the HTTP request/response. When there are no more messages to send Run will suspend until either there is a new message to send or the polling interval elapses. Should not be called concurrently with itself. Can be called concurrently with modifying NextMessage(). Run continues until ctx is cancelled.

func (*HTTPSender) SetPollingInterval

func (h *HTTPSender) SetPollingInterval(duration time.Duration)

SetPollingInterval sets the interval between polling. Has effect starting from the next polling cycle.

func (*HTTPSender) SetRequestHeader

func (h *HTTPSender) SetRequestHeader(header http.Header)

SetRequestHeader sets additional HTTP headers to send with all future requests. Should not be called concurrently with any other method.

type InMemPackagesStore

type InMemPackagesStore struct {
	// contains filtered or unexported fields
}

InMemPackagesStore is a package store used for testing. Keeps the packages in memory.

func NewInMemPackagesStore

func NewInMemPackagesStore() *InMemPackagesStore

func (*InMemPackagesStore) AllPackagesHash

func (l *InMemPackagesStore) AllPackagesHash() ([]byte, error)

func (*InMemPackagesStore) CreatePackage

func (l *InMemPackagesStore) CreatePackage(packageName string, typ protobufs.PackageType) error

func (*InMemPackagesStore) DeletePackage

func (l *InMemPackagesStore) DeletePackage(packageName string) error

func (*InMemPackagesStore) FileContentHash

func (l *InMemPackagesStore) FileContentHash(packageName string) ([]byte, error)

func (*InMemPackagesStore) GetContent

func (l *InMemPackagesStore) GetContent() map[string][]byte

func (*InMemPackagesStore) LastReportedStatuses

func (l *InMemPackagesStore) LastReportedStatuses() (*protobufs.PackageStatuses, error)

func (*InMemPackagesStore) PackageState

func (l *InMemPackagesStore) PackageState(packageName string) (state types.PackageState, err error)

func (*InMemPackagesStore) Packages

func (l *InMemPackagesStore) Packages() ([]string, error)

func (*InMemPackagesStore) SetAllPackagesHash

func (l *InMemPackagesStore) SetAllPackagesHash(hash []byte) error

func (*InMemPackagesStore) SetLastReportedStatuses

func (l *InMemPackagesStore) SetLastReportedStatuses(statuses *protobufs.PackageStatuses) error

func (*InMemPackagesStore) SetPackageState

func (l *InMemPackagesStore) SetPackageState(packageName string, state types.PackageState) error

func (*InMemPackagesStore) UpdateContent

func (l *InMemPackagesStore) UpdateContent(_ context.Context, packageName string, data io.Reader, contentHash []byte) error

type MockServer

type MockServer struct {
	Endpoint    string
	OnRequest   func(w http.ResponseWriter, r *http.Request)
	OnConnect   func(r *http.Request)
	OnWSConnect func(conn *websocket.Conn)
	OnMessage   func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent
	// contains filtered or unexported fields
}

func StartMockServer

func StartMockServer(t *testing.T) *MockServer

func StartTLSMockServer added in v0.6.0

func StartTLSMockServer(t *testing.T) *MockServer

func (*MockServer) Close

func (m *MockServer) Close()

func (*MockServer) EnableCompression added in v0.4.0

func (m *MockServer) EnableCompression()

func (*MockServer) EnableExpectMode

func (m *MockServer) EnableExpectMode()

EnableExpectMode enables the expect mode that allows using Expect() method to describe what message is expected to be received.

func (*MockServer) EventuallyExpect

func (m *MockServer) EventuallyExpect(
	msg string,
	handler func(msg *protobufs.AgentToServer) (*protobufs.ServerToAgent, bool),
)

EventuallyExpect expects to receive a message and calls the handler for every received message until eventually the handler returns true for the second element of the return tuple. Typically used when we know we expect to receive a particular message but 0 or more other messages may be received before that.

func (*MockServer) Expect

func (m *MockServer) Expect(handler receivedMessageHandler)

Expect defines a handler that will be called when a message is received. Expect must be called when we are certain that the message will be received (if it is not received a "time out" error will be recorded.

func (*MockServer) GetHTTPTestServer added in v0.6.0

func (m *MockServer) GetHTTPTestServer() *httptest.Server

type NextMessage

type NextMessage struct {
	// contains filtered or unexported fields
}

NextMessage encapsulates the next message to be sent and provides a concurrency-safe interface to work with the message.

func NewNextMessage

func NewNextMessage() NextMessage

NewNextMessage returns a new empty NextMessage.

func (*NextMessage) PopPending

func (s *NextMessage) PopPending() *protobufs.AgentToServer

PopPending returns the next message to be sent, if it is pending or nil otherwise. Clears the "pending" flag.

func (*NextMessage) Update

func (s *NextMessage) Update(modifier func(msg *protobufs.AgentToServer)) (messageSendingChannel chan struct{})

Update applies the specified modifier function to the next message that will be sent and marks the message as pending to be sent.

The messageSendingChannel returned by this function is closed when the modified message is popped in PopPending before being sent to the server. After this channel is closed, additional calls to Update will be applied to the next message and will return a channel corresponding to that message.

type Sender

type Sender interface {
	// NextMessage gives access to the next message that will be sent by this Sender.
	// Can be called concurrently with any other method.
	NextMessage() *NextMessage

	// ScheduleSend signals to Sender that the message in NextMessage struct
	// is now ready to be sent.  The Sender should send the NextMessage as soon as possible.
	// If there is no pending message (e.g. the NextMessage was already sent and
	// "pending" flag is reset) then no message will be sent.
	ScheduleSend()

	// SetInstanceUid sets a new instanceUid to be used for all subsequent messages to be sent.
	SetInstanceUid(instanceUid string) error
}

Sender is an interface of the sending portion of OpAMP protocol that stores the NextMessage to be sent and can be ordered to send the message.

type SenderCommon

type SenderCommon struct {
	// contains filtered or unexported fields
}

SenderCommon is partial Sender implementation that is common between WebSocket and plain HTTP transports. This struct is intended to be embedded in the WebSocket and HTTP Sender implementations.

func NewSenderCommon

func NewSenderCommon() SenderCommon

NewSenderCommon creates a new SenderCommon. This is intended to be used by the WebSocket and HTTP Sender implementations.

func (*SenderCommon) NextMessage

func (h *SenderCommon) NextMessage() *NextMessage

NextMessage gives access to the next message that will be sent by this looper. Can be called concurrently with any other method.

func (*SenderCommon) ScheduleSend

func (h *SenderCommon) ScheduleSend()

ScheduleSend signals to HTTPSender that the message in NextMessage struct is now ready to be sent. If there is no pending message (e.g. the NextMessage was already sent and "pending" flag is reset) then no message will be sent.

func (*SenderCommon) SetInstanceUid

func (h *SenderCommon) SetInstanceUid(instanceUid string) error

SetInstanceUid sets a new instanceUid to be used for all subsequent messages to be sent. Can be called concurrently, normally is called when a message is received from the Server that instructs us to change our instance UID.

type WSSender

type WSSender struct {
	SenderCommon
	// contains filtered or unexported fields
}

WSSender implements the WebSocket client's sending portion of OpAMP protocol.

func NewSender

func NewSender(logger types.Logger) *WSSender

NewSender creates a new Sender that uses WebSocket to send messages to the server.

func (*WSSender) IsStopped added in v0.14.0

func (s *WSSender) IsStopped() <-chan struct{}

IsStopped returns a channel that's closed when the sender is stopped.

func (*WSSender) Start

func (s *WSSender) Start(ctx context.Context, conn *websocket.Conn) error

Start the sender and send the first message that was set via NextMessage().Update() earlier. To stop the WSSender cancel the ctx.

func (*WSSender) StoppingErr added in v0.14.0

func (s *WSSender) StoppingErr() error

StoppingErr returns an error if there was a problem with stopping the sender. If stopping was successful will return nil. StoppingErr() can be called only after IsStopped() is signalled.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL