transmission

package
v0.0.0-...-1215978 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2024 License: Apache-2.0 Imports: 36 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// SendTraces and SendEvents are package-level boolean flags; both start
	// out false (the zero value).
	// NOTE(review): presumably they gate whether traces and events are
	// transmitted — confirm against the code that reads them.
	SendTraces, SendEvents bool
)
View Source
// DefaultAvailability is a package-level *Availability that is created with
// online set to false and a zero-value sync.RWMutex.
var DefaultAvailability = &Availability{
	online: false,
	mut:    sync.RWMutex{},
}

Functions

This section is empty.

Types

type Auth

// Auth holds the configuration for obtaining an auth token; GetToken returns
// the stored token and Valid reports whether it is populated and unexpired.
// NOTE(review): the exact token flow lives in Renew/Start, which are not
// visible here — confirm before relying on details.
type Auth struct {
	// Endpoint, Key and Secret share their names with the first three
	// arguments of CreateNewAuth.
	// NOTE(review): presumably the token endpoint URL and the client
	// credentials sent to it — confirm.
	Endpoint string
	Key      string
	Secret   string

	// Timeout, Proxy and RetrySettings share their names with the matching
	// CreateNewAuth arguments.
	// NOTE(review): presumably applied to the token requests — confirm.
	Timeout       time.Duration
	Proxy         *proxy.Proxy
	RetrySettings *retry.Config
	// contains filtered or unexported fields
}

func CreateNewAuth

func CreateNewAuth(endpoint, key, secret string, timeout time.Duration, retrySettings *retry.Config, livelinessInterval time.Duration, proxy *proxy.Proxy) (*Auth, error)

func (*Auth) GetToken

func (oauth *Auth) GetToken() string

GetToken returns the stored authToken

func (*Auth) Renew

func (oauth *Auth) Renew() (string, error)

func (*Auth) Start

func (oauth *Auth) Start() error

func (*Auth) Stop

func (oauth *Auth) Stop()

func (*Auth) UnaryClientInterceptor

func (oauth *Auth) UnaryClientInterceptor(c context.Context,
	method string,
	req interface{},
	reply interface{},
	cc *grpc.ClientConn,
	invoker grpc.UnaryInvoker,
	opts ...grpc.CallOption,
) error

func (*Auth) Valid

func (oauth *Auth) Valid() bool

Valid reports whether the auth token is populated and its expiry time is greater than 0.

type AuthTokenResponse

// AuthTokenResponse maps the JSON keys access_token, token_type, expires_in
// and scope onto Go fields.
// NOTE(review): these keys match a standard OAuth2 token response, so
// ExpiresIn is presumably a lifetime in seconds — confirm against the
// token endpoint.
type AuthTokenResponse struct {
	AccessToken string `json:"access_token"`
	TokenType   string `json:"token_type"`
	ExpiresIn   int64  `json:"expires_in"`
	Scope       string `json:"scope"`
}

type Availability

// Availability wraps a boolean status that is written with Set and read with
// Status. The DefaultAvailability literal shows two unexported fields:
// online (bool) and mut (sync.RWMutex).
// NOTE(review): mut presumably guards online — confirm in the method bodies.
type Availability struct {
	// contains filtered or unexported fields
}

func (*Availability) Set

func (a *Availability) Set(status bool)

func (*Availability) Status

func (a *Availability) Status() bool

type ConnConfig

// ConnConfig is the argument to NewConnection. It carries a proxy
// configuration plus an address and a set of gRPC dial options for each of
// two connections.
type ConnConfig struct {
	// NOTE(review): presumably applied to both connections — confirm.
	Proxy *proxy.Proxy

	// NOTE(review): TAddr/TOpts are presumably the target address and dial
	// options of the trace connection (cf. Connection.GetTraceConn) — confirm.
	TAddr string
	TOpts []grpc.DialOption

	// NOTE(review): LAddr/LOpts are presumably the target address and dial
	// options of the log connection (cf. Connection.GetLogConn) — confirm.
	LAddr string
	LOpts []grpc.DialOption
}

type Connection

// Connection is an opaque handle created by NewConnection. Its accessors
// return a trace *grpc.ClientConn with a proxypb.TraceProxyServiceClient and
// a log *grpc.ClientConn with a collogspb.LogsServiceClient; it also offers
// Close and RenewConnection.
type Connection struct {
	// contains filtered or unexported fields
}

func NewConnection

func NewConnection(c ConnConfig) (*Connection, error)

func (*Connection) Close

func (c *Connection) Close()

func (*Connection) GetLogClient

func (c *Connection) GetLogClient() collogspb.LogsServiceClient

func (*Connection) GetLogConn

func (c *Connection) GetLogConn() *grpc.ClientConn

func (*Connection) GetTraceClient

func (c *Connection) GetTraceClient() proxypb.TraceProxyServiceClient

func (*Connection) GetTraceConn

func (c *Connection) GetTraceConn() *grpc.ClientConn

func (*Connection) RenewConnection

func (c *Connection) RenewConnection() error

type DiscardSender

type DiscardSender struct {
	// WriterSender is embedded, so its methods are promoted to DiscardSender;
	// DiscardSender declares its own Add on top of them.
	WriterSender
}

DiscardSender implements the Sender interface and drops all events.

func (*DiscardSender) Add

func (d *DiscardSender) Add(ev *Event)

type Event

type Event struct {
	// APIKey, if set, overrides whatever is found in Config
	// (the field declaration below is commented out, so Event currently has
	// no APIKey field)
	// APIKey string
	// Dataset, if set, overrides whatever is found in Config
	Dataset string
	// SampleRate, if set, overrides whatever is found in Config
	SampleRate uint
	// APIHost, if set, overrides whatever is found in Config
	APIHost string
	// Timestamp, if set, specifies the time for this event. If unset, defaults
	// to Now()
	Timestamp time.Time
	// Metadata is a field for you to add in data that will be handed back to you
	// on the Response object read off the responses channel (see
	// Sender.TxResponses). It is not sent to Opsramp with the event.
	Metadata interface{}

	// Data contains the content of the event (all the fields and their values)
	Data map[string]interface{}

	// NOTE(review): APIToken and APITenantId are presumably the per-event auth
	// token and tenant identifier used when sending — confirm against the
	// sender implementation.
	APIToken    string
	APITenantId string

	// SpanEvents holds the SpanEvent values carried by this event.
	SpanEvents []SpanEvent
}

func (*Event) MarshalJSON

func (e *Event) MarshalJSON() ([]byte, error)

MarshalJSON marshals an Event for batching up to the Opsramp servers. It omits fields that aren't specific to this particular event, and allows for behavior like omitempty'ing a zeroed-out time.Time.

func (*Event) MarshalMsgpack

func (e *Event) MarshalMsgpack() (byts []byte, err error)

type Metrics

type Metrics interface {
	// Register is commented out and is not part of the interface:
	// Register(name string, metricType string)
	//
	// NOTE(review): for the three methods below the string argument is
	// presumably the metric name and the interface{} argument the value to
	// record — confirm against an implementation.
	Gauge(string, interface{})
	Increment(string)
	Count(string, interface{})
}

Metrics is an interface that can be fulfilled by something like statsd.

type MockSender

type MockSender struct {
	// Started, Stopped, Flushed and EventsCalled are integer counters.
	// NOTE(review): presumably each counts calls to the correspondingly named
	// method — confirm in the method bodies.
	Started      int
	Stopped      int
	Flushed      int
	EventsCalled int

	// NOTE(review): presumably makes SendResponse block instead of dropping
	// when the responses channel is full — confirm.
	BlockOnResponses bool
	// The mutex is embedded by pointer, so it is nil in a zero-value
	// MockSender. NOTE(review): presumably allocated in Start — confirm.
	*sync.Mutex
	// contains filtered or unexported fields
}

MockSender implements the Sender interface by retaining a slice of added events, for use in unit tests.

func (*MockSender) Add

func (m *MockSender) Add(ev *Event)

func (*MockSender) Flush

func (m *MockSender) Flush() error

func (*MockSender) SendResponse

func (m *MockSender) SendResponse(r Response) bool

func (*MockSender) Start

func (m *MockSender) Start() error

func (*MockSender) Stop

func (m *MockSender) Stop() error

func (*MockSender) TxResponses

func (m *MockSender) TxResponses() chan Response

type Response

// Response records the outcome of sending one event: any error, the HTTP
// status code and body, how long the request took, and the originating
// event's Metadata.
type Response struct {
	// Err contains any error returned by the httpClient on sending or an error
	// indicating queue overflow
	Err error

	// StatusCode contains the HTTP Status Code returned by the Opsramp API
	// server
	StatusCode int

	// Body is the body of the HTTP response from the Opsramp API server.
	Body []byte

	// Duration is a measurement of how long the HTTP request to send an event
	// took to process. The actual time it takes libhoney to send an event may
	// be longer due to any time the event spends waiting in the queue before
	// being sent.
	Duration time.Duration

	// Metadata is whatever content you put in the Metadata field of the event for
	// which this is the response. It is passed through unmodified.
	Metadata interface{}
}

Response is a record of an event sent. It includes information about sending the event - how long it took, whether it succeeded, and so on. It also has a metadata field that is just a pass through - populate an Event's Metadata field and what you put there will be on the Response that corresponds to that Event. This allows you to track specific events.

func (*Response) MarshalMsgpack

func (r *Response) MarshalMsgpack() ([]byte, error)

func (*Response) UnmarshalJSON

func (r *Response) UnmarshalJSON(b []byte) error

func (*Response) UnmarshalMsgpack

func (r *Response) UnmarshalMsgpack(b []byte) error

type Sender

type Sender interface {
	// Add queues up an event to be sent
	Add(ev *Event)

	// Start initializes any background processes necessary to send events
	Start() error

	// Stop flushes any pending queues and blocks until everything in flight has
	// been sent. Once called, you cannot call Add unless Start has subsequently
	// been called.
	Stop() error

	// Flush flushes any pending queues and blocks until everything in flight has
	// been sent.
	Flush() error

	// TxResponses returns a channel that will contain a single Response for each
	// Event added. Note that they may not be in the same order as they came in
	TxResponses() chan Response

	// SendResponse adds a Response to the Responses queue. It should be added
	// for events handed to libhoney that are dropped before they even make it
	// to the Transmission Sender (for example because of sampling) to maintain
	// libhoney's guarantee that each event given to it will generate one event
	// in the Responses channel.
	// NOTE(review): the meaning of the bool result is not documented here —
	// confirm against an implementation.
	SendResponse(Response) bool
}

Sender is responsible for handling events after Send() is called. Implementations of Add() must be safe for concurrent calls.

type SpanEvent

// SpanEvent is a named, timestamped set of attributes; Event.SpanEvents
// holds a slice of them.
type SpanEvent struct {
	// Attributes is a free-form key/value map attached to the span event.
	Attributes map[string]interface{}
	// NOTE(review): the unit and epoch of Timestamp are not visible here
	// (presumably Unix nanoseconds) — confirm.
	Timestamp  uint64
	Name       string
}

type TraceProxy

// TraceProxy is a Sender implementation (it declares Add, Start, Stop, Flush,
// TxResponses and SendResponse) that batches events before sending them.
// NOTE(review): several field comments below still say "Honeycomb API" while
// the rest of the package refers to Opsramp — confirm the intended wording.
type TraceProxy struct {
	// How many events to collect into a batch before sending. A
	// batch could be sent before achieving this item limit if the
	// BatchTimeout has elapsed since the last batch is sent. If set
	// to zero, batches will only be sent upon reaching the
	// BatchTimeout. It is an error for both this and
	// the BatchTimeout to be zero.
	// Default: 50 (from Config.MaxBatchSize)
	MaxBatchSize uint

	// How often to send batches. Events queue up into a batch until
	// this time has elapsed or the batch item limit is reached
	// (MaxBatchSize), then the batch is sent to Honeycomb API.
	// If set to zero, batches will only be sent upon reaching the
	// MaxBatchSize item limit. It is an error for both this and
	// the MaxBatchSize to be zero.
	// Default: 100 milliseconds (from Config.SendFrequency)
	BatchTimeout time.Duration

	// The start-to-finish timeout for HTTP requests sending event
	// batches to the Honeycomb API. Transmission will retry once
	// when receiving a timeout, so total time spent attempting to
	// send events could be twice this value.
	// Default: 60 seconds.
	BatchSendTimeout time.Duration

	// number of batches that can be inflight simultaneously
	MaxConcurrentBatches uint

	// how many events to allow to pile up
	// if not specified, then the work channel becomes blocking
	// and attempting to add an event to the queue can fail
	PendingWorkCapacity uint

	// whether to block or drop events when the queue fills
	BlockOnSend bool

	// whether to block or drop responses when the queue fills
	BlockOnResponse bool

	// NOTE(review): presumably appended to the User-Agent sent with each
	// request — confirm.
	UserAgentAddition string

	// toggles compression when sending batches of events
	DisableCompression bool

	// Deprecated: synonymous with DisableCompression; use DisableCompression
	// instead.
	DisableGzipCompression bool //nolint:all

	// set true to send events with msgpack encoding
	EnableMsgpackEncoding bool

	Logger  logger.Logger
	Metrics Metrics

	// NOTE(review): presumably UseTls enables TLS on the outgoing connections
	// and UseTlsInsecure skips certificate verification — confirm.
	UseTls         bool
	UseTlsInsecure bool

	// NOTE(review): presumably marks this sender as talking to a peer rather
	// than the upstream API — confirm.
	IsPeer bool

	// NOTE(review): TraceEndpoint and LogsEndpoint are presumably the target
	// addresses for traces and logs (cf. ConnConfig.TAddr/LAddr) — confirm.
	TraceEndpoint string
	LogsEndpoint  string
	SendEvents    bool
	Proxy         *proxy.Proxy

	// NOTE(review): AuthTokenEndpoint, AuthTokenKey, AuthTokenSecret and
	// RetrySettings line up with the endpoint, key, secret and retrySettings
	// arguments of CreateNewAuth; presumably they are forwarded to it — confirm.
	TenantId          string
	Dataset           string
	AuthTokenEndpoint string
	AuthTokenKey      string
	AuthTokenSecret   string
	RetrySettings     *retry.Config
	// contains filtered or unexported fields
}

func (*TraceProxy) Add

func (h *TraceProxy) Add(ev *Event)

Add enqueues ev to be sent. If a Flush is in-progress, this will block until it completes. Similarly, if BlockOnSend is set and the pending work is more than the PendingWorkCapacity, this will block a Flush until more pending work can be enqueued.

func (*TraceProxy) Flush

func (h *TraceProxy) Flush() (err error)

func (*TraceProxy) SendResponse

func (h *TraceProxy) SendResponse(r Response) bool

func (*TraceProxy) Start

func (h *TraceProxy) Start() error

func (*TraceProxy) Stop

func (h *TraceProxy) Stop() error

func (*TraceProxy) TxResponses

func (h *TraceProxy) TxResponses() chan Response

type WriterSender

type WriterSender struct {
	// W is the destination that events are written to as JSON; STDOUT is
	// used when W is not specified.
	W io.Writer

	// NOTE(review): presumably BlockOnResponses selects blocking over dropping
	// when the responses channel is full, and ResponseQueueSize sets that
	// channel's capacity — confirm in Start/SendResponse.
	BlockOnResponses  bool
	ResponseQueueSize uint

	// The mutex is embedded by pointer, so it is nil in a zero-value
	// WriterSender. NOTE(review): presumably allocated in Start — confirm.
	*sync.Mutex
	// contains filtered or unexported fields
}

WriterSender implements the Sender interface by marshalling events to JSON and writing to STDOUT, or to the writer W if one is specified.

func (*WriterSender) Add

func (w *WriterSender) Add(ev *Event)

func (*WriterSender) Flush

func (w *WriterSender) Flush() error

func (*WriterSender) SendResponse

func (w *WriterSender) SendResponse(r Response) bool

func (*WriterSender) Start

func (w *WriterSender) Start() error

func (*WriterSender) Stop

func (w *WriterSender) Stop() error

func (*WriterSender) TxResponses

func (w *WriterSender) TxResponses() chan Response

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL