devosender

package
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 13, 2023 License: MIT Imports: 18 Imported by: 0

Documentation

Overview

Package devosender implements the tools to send data to Devo in a different ways and scenarios

Interfaces to grant abstraction between implementations are defined and complex objects has associated their own builder

Index

Examples

Constants

View Source
const (
	// DevoCentralRelayUS is the public entrypoint of Devo central-relay on USA site
	DevoCentralRelayUS = "tcp://us.elb.relay.logtrust.net:443"
	// DevoCentralRelayEU is the public entrypoint of Devo central-relay on Europe site
	DevoCentralRelayEU = "tcp://eu.elb.relay.logtrust.net:443"
	// DefaultSyslogLevel is the code for facility and level used at raw syslog protocol. <14> = facility:user and level:info
	DefaultSyslogLevel = "<14>"

	// ClientBuilderRelayUS select DevoCentralRelayUS in builder
	ClientBuilderRelayUS ClienBuilderDevoCentralRelay = iota
	// ClientBuilderRelayEU select DevoCentralRelayEU in builder
	ClientBuilderRelayEU

	// ClientBuilderDefaultCompressorMinSize is the default min size of payload to apply compression
	// Following discussion in https://webmasters.stackexchange.com/questions/31750/what-is-recommended-minimum-object-size-for-gzip-performance-benefits
	// this value is set by CDN providers as Akamai, other services like goolge are more
	// aggrsive and set 150 bytes as fringe value.
	ClientBuilderDefaultCompressorMinSize = 860
)
View Source
const (
	// LCBDefaultBufferEventsSize is the default BufferSize value set in LazyClientBuilder
	// when it is created with NewLazyClientBuilder
	LCBDefaultBufferEventsSize uint32 = 256000
	// LCBDefaultFlushTimeout is the default FlushTimeout value set in LazyClientBuilder
	// when it is created with NewLazyClientBuilder
	LCBDefaultFlushTimeout = time.Second * 2
)
View Source
const (
	// DefaultClientReconnDaemonWaitBtwChecks is the default time that client reconnect daemon
	// must wait between run checks or works
	DefaultClientReconnDaemonWaitBtwChecks = 10 * time.Second
	// DefaultClientReconnDaemonInitDelay is the default delay time that client reconnect daemon
	// must wait before start to work
	DefaultClientReconnDaemonInitDelay = time.Second * 30
	// DefaultDaemonMicroWait is the default micro delay to check in the midle of daemon
	// sleep time if daemon was marked to be stopped, then interrup sleep operation
	DefaultDaemonMicroWait = time.Millisecond * 200
	// DefaultEnableStandByModeTimeout is the Default timeout to wait for all pending
	// async messages managed byclient when StandBy func is called . If timeout is
	// reached then error will be send
	DefaultEnableStandByModeTimeout = time.Second
	// DefaultDaemonStopTimeout is the default timeout to wait when stopping (Close) each daemon
	DefaultDaemonStopTimeout = time.Second * 2
	// DefaultFlushAsyncTimeout is the Default timeout to wait for all pending async
	// messages managed by client when Flush func is called. If timeout is reached
	// then error will be send
	DefaultFlushAsyncTimeout = time.Millisecond * 500
	// DefaultHouseKeepingDaemonWaitBtwChecks is the default time that status housekeeping daemon must wait
	// between run checks or do any action
	DefaultHouseKeepingDaemonWaitBtwChecks = time.Minute * 5
	// DefaultHouseKeepingDaemonInitDelay is the default delay time that status housekeeping daemon
	// must wait before start to work
	DefaultHouseKeepingDaemonInitDelay = time.Second * 5
	// DefaultResendEventsDaemonWaitBtwChecks is the default time that resend pending events daemon must wait
	// between run checks or do any action
	DefaultResendEventsDaemonWaitBtwChecks = time.Second * 30
	// DefaultResendEventsDaemonInitDelay is the default delay time that resend pending events daemon
	// must wait before start to work
	DefaultResendEventsDaemonInitDelay = time.Second * 20
	// DefaultMaxRecordsResendByFlush is the default value for the max numbers of pending events to resend
	// when Flush operation is called
	DefaultMaxRecordsResendByFlush = 1000
)

Variables

View Source
var (
	// ErrBufferOverflow is the error returned when buffer is full and element was lost
	ErrBufferOverflow = errors.New("overwriting item(s) because buffer is full")
	//ErrBufferFull is the error returned when buffer is full and any other action taken
	ErrBufferFull = errors.New("buffer is full")
)
View Source
var ErrNilPointerReceiver = errors.New("receiver func call with nil pointer")

ErrNilPointerReceiver is the error returned when received funcs are call over nil pointer

View Source
var ErrPayloadNoDefined = errors.New("payload to check connection is not defined")

ErrPayloadNoDefined is the error returned when payload is required by was not defined

View Source
var ErrWaitAsyncTimeout = errors.New("timeout while wait for pending items")

ErrWaitAsyncTimeout is the error returned while timeout is reached in "WaitFor" functions

View Source
var ErrorTagEmpty error = errors.New("tag can not be empty")

ErrorTagEmpty is returneed when Devo tag is empty string

Functions

This section is empty.

Types

type ClienBuilderDevoCentralRelay

type ClienBuilderDevoCentralRelay int

ClienBuilderDevoCentralRelay is the type used to set Devo central relay as entrypoint

func ParseDevoCentralEntrySite

func ParseDevoCentralEntrySite(s string) (ClienBuilderDevoCentralRelay, error)

ParseDevoCentralEntrySite returns ClientBuilderDevoCentralRelay based on site code. valid codes are 'US' and 'EU'

type Client

type Client struct {
	ReplaceSequences map[string]string
	// contains filtered or unexported fields
}

Client is the engine that can send data to Devo throug central (tls) or in-house (clean) realy

func (*Client) AddReplaceSequences

func (dsc *Client) AddReplaceSequences(old, new string) error

AddReplaceSequences is helper function to add elements to Client.ReplaceSequences old is the string to search in message and new is the replacement string. Replacement will be done using strings.ReplaceAll

func (*Client) AreAsyncOps

func (dsc *Client) AreAsyncOps() bool

AreAsyncOps returns true is there is any Async operation running

func (*Client) AsyncError added in v1.0.0

func (dsc *Client) AsyncError(id string) (bool, error)

AsyncError return true and last error detected fo ID if error was captured or false, nil in other case One special case is when dsc Pointer is nil, that this method returns (false, ErrNilPointerReceiver) AsyncError is thread-safe mode

func (*Client) AsyncErrors

func (dsc *Client) AsyncErrors() map[string]error

AsyncErrors return errors from async calls collected until now. WARNING that map returned IS NOT thread safe.

func (*Client) AsyncErrorsIds added in v1.0.0

func (dsc *Client) AsyncErrorsIds() []string

AsyncErrorsIds returns the request IDs with error registered. This method is different from AsyncErrors because is thread safe at cost of performance.

func (*Client) AsyncErrorsNumber

func (dsc *Client) AsyncErrorsNumber() int

AsyncErrorsNumber return then number of errors from async calls collected until now

func (*Client) AsyncIds

func (dsc *Client) AsyncIds() []string

AsyncIds return asyncIds that are currently runnig

func (*Client) AsyncsNumber

func (dsc *Client) AsyncsNumber() int

AsyncsNumber return the number of async operations pending. This is more optimal that call len(dsc.AsyncIds())

func (*Client) Close

func (dsc *Client) Close() error

Close is the method to close all interanl elements like connection that should be closed at end

func (*Client) GetEntryPoint

func (dsc *Client) GetEntryPoint() string

GetEntryPoint return entrypoint used by client

func (*Client) IsAsyncActive

func (dsc *Client) IsAsyncActive(id string) bool

IsAsyncActive returns true if id is present in AsyncIds(). This function is more optimal that look into result of AsyncIds

func (*Client) IsConnWorking added in v1.0.0

func (dsc *Client) IsConnWorking() (bool, error)

IsConnWorking check if connection is opened and make a test writing data to ensure that is working If payload to check is not defined (ClientBuilder.IsConnWorkingCheckPayload) then ErrPayloadNoDefined will be returned

func (*Client) LastSendCallTimestamp

func (dsc *Client) LastSendCallTimestamp() time.Time

LastSendCallTimestamp returns the timestamp of last time that any of SendXXXX func was called with valid parameters If Client is nil default time.Time value will be returned

func (*Client) PurgeAsyncErrors

func (dsc *Client) PurgeAsyncErrors()

PurgeAsyncErrors cleans internal AsyncErrors captured until now

func (*Client) Send

func (dsc *Client) Send(m string) error

Send func send message using default tag (SetDefaultTag). Meessage will be transformed before send, using ReplaceAll with values from Client.ReplaceSequences

func (*Client) SendAsync

func (dsc *Client) SendAsync(m string) string

SendAsync is similar to Send but send events in async way (goroutine). Empty string is returned in Client is nil

func (*Client) SendWTag

func (dsc *Client) SendWTag(t, m string) error

SendWTag is similar to Send but using a specific tag

func (*Client) SendWTagAndCompressor

func (dsc *Client) SendWTagAndCompressor(t, m string, c *compressor.Compressor) error

SendWTagAndCompressor is similar to SendWTag but using a specific Compressor. This can be usefull, for example, to force disable compression for one message using Client.SendWTagAndCompressor(t, m, nil)

func (*Client) SendWTagAndCompressorAsync

func (dsc *Client) SendWTagAndCompressorAsync(t, m string, c *compressor.Compressor) string

SendWTagAndCompressorAsync is similar to SendWTagAsync but send events with specific compressor. This can be useful, for example, to force disable compression for one message using Client.SendWTagAndCompressorAsync(t, m, nil) Empty string is returned in Client is nil

func (*Client) SendWTagAsync

func (dsc *Client) SendWTagAsync(t, m string) string

SendWTagAsync is similar to SendWTag but send events in async way (goroutine). Empty string is returned in Client is nil

func (*Client) SetDefaultTag

func (dsc *Client) SetDefaultTag(t string) error

SetDefaultTag set tag used when call funcs to send messages without splicit tag

func (*Client) SetSyslogHostName

func (dsc *Client) SetSyslogHostName(host string)

SetSyslogHostName overwrite hostname send in raw Syslog payload

func (*Client) String

func (dsc *Client) String() string

func (*Client) WaitForPendingAsyncMessages

func (dsc *Client) WaitForPendingAsyncMessages() error

WaitForPendingAsyncMessages wait for all Async messages that are pending to send

func (*Client) WaitForPendingAsyncMsgsOrTimeout

func (dsc *Client) WaitForPendingAsyncMsgsOrTimeout(timeout time.Duration) error

WaitForPendingAsyncMsgsOrTimeout is similar to WaitForPendingAsyncMessages but return ErrWaitAsyncTimeout error if timeout is reached

func (*Client) Write

func (dsc *Client) Write(p []byte) (n int, err error)

Write allow Client struct to follow io.Writer interface

type ClientBuilder

type ClientBuilder struct {
	// contains filtered or unexported fields
}

ClientBuilder defines builder for easy DevoSender instantiation

func NewClientBuilder

func NewClientBuilder() *ClientBuilder

NewClientBuilder returns new DevoSenderBuilder

func (*ClientBuilder) Build

func (dsb *ClientBuilder) Build() (*Client, error)

Build implements build method of builder returning Client instance.

func (*ClientBuilder) CompressorMinSize

func (dsb *ClientBuilder) CompressorMinSize(s int) *ClientBuilder

CompressorMinSize set the minium size to be applied when compress data.

func (*ClientBuilder) ConnectionExpiration

func (dsb *ClientBuilder) ConnectionExpiration(t time.Duration) *ClientBuilder

ConnectionExpiration set expiration time used to recreate connection from last time was used

func (*ClientBuilder) DefaultCompressor

func (dsb *ClientBuilder) DefaultCompressor(c compressor.CompressorAlgorithm) *ClientBuilder

DefaultCompressor set and enable ompression when send messages

func (*ClientBuilder) DefaultDevoTag

func (dsb *ClientBuilder) DefaultDevoTag(t string) *ClientBuilder

DefaultDevoTag set the default tag to be used when send data to Devo in target client

func (*ClientBuilder) DevoCentralEntryPoint

func (dsb *ClientBuilder) DevoCentralEntryPoint(relay ClienBuilderDevoCentralRelay) *ClientBuilder

DevoCentralEntryPoint Set One of the available Devo cental relays. This value overwrite (and is overwritten) by EntryPoint

func (*ClientBuilder) EntryPoint

func (dsb *ClientBuilder) EntryPoint(entrypoint string) *ClientBuilder

EntryPoint sets entrypoint in builder used to create Client This value overwrite (and is overwritten) by DevoCentralEntryPoint

func (*ClientBuilder) IsConnWorkingCheckPayload added in v1.0.0

func (dsb *ClientBuilder) IsConnWorkingCheckPayload(s string) *ClientBuilder

IsConnWorkingCheckPayload sets the payload of the raw message that will be sent (Write) to check conection during IsConnWorking call Empty string implies that IsConnWorking will return an error. The payload size must be less that 4 characters Recommended value for this payload when you like to enable this check is zero-character: "\x00"

func (*ClientBuilder) TCPKeepAlive

func (dsb *ClientBuilder) TCPKeepAlive(t time.Duration) *ClientBuilder

TCPKeepAlive allow to set KeepAlive value configured in net.Dialer

func (*ClientBuilder) TCPTimeout

func (dsb *ClientBuilder) TCPTimeout(t time.Duration) *ClientBuilder

TCPTimeout allow to set Timeout value configured in net.Dialer

func (*ClientBuilder) TLSCerts

func (dsb *ClientBuilder) TLSCerts(key []byte, cert []byte, chain []byte) *ClientBuilder

TLSCerts sets keys and certs used to make Client using TLS connection key and cert are the key and cert that identifies destination Devo domain and chain is the content of the private CA chain used to trust in Devo collector entry-point Call to this method overwrite TLSFiles

func (*ClientBuilder) TLSFiles

func (dsb *ClientBuilder) TLSFiles(keyFileName string, certFileName string, chainFileName *string) *ClientBuilder

TLSFiles sets keys and certs from files used to make Client using TLS connection keyFileName and certFileName are the key and cert that identifies destination Devo domain and chainFileName is the file with the private CA chain used to trust in Devo collector entry-point TLSCerts overwrites calls to this method

func (*ClientBuilder) TLSInsecureSkipVerify

func (dsb *ClientBuilder) TLSInsecureSkipVerify(insecureSkipVerify bool) *ClientBuilder

TLSInsecureSkipVerify sets InsecureSkipFlag, this value is used only with TLS connetions

func (*ClientBuilder) TLSNoLoadPublicCAs added in v1.0.3

func (dsb *ClientBuilder) TLSNoLoadPublicCAs(noLoadPublicCAs bool) *ClientBuilder

TLSNoLoadPublicCAs prevent to load public CA certificates from OS when key and cert are provided and append to chain CA provided in TLSFiles or TLSCerts

func (*ClientBuilder) TLSRenegotiation

func (dsb *ClientBuilder) TLSRenegotiation(renegotiation tls.RenegotiationSupport) *ClientBuilder

TLSRenegotiation sets tlsRenegotiation support, this value is used only with TLS connections

type DevoSender

type DevoSender interface {
	io.WriteCloser
	Send(m string) error
	SetDefaultTag(t string) error
	SendWTag(t, m string) error
	SendAsync(m string) string
	SendWTagAsync(t, m string) string
	WaitForPendingAsyncMessages() error
	AsyncErrors() map[string]error
	AsyncErrorsNumber() int
	PurgeAsyncErrors()
	GetEntryPoint() string
	AreAsyncOps() bool
	AsyncIds() []string
	IsAsyncActive(id string) bool
	AsyncsNumber() int
	LastSendCallTimestamp() time.Time
	String() string
}

DevoSender interface define the minimum behaviour required for Send data to Devo

func NewDevoSender

func NewDevoSender(entrypoint string) (DevoSender, error)

NewDevoSender Create new DevoSender with clean comunication using ClientBuilder entrypoint is the Devo entrypoint where send events with protocol://fqdn:port format. You can use DevoCentralRelayXX constants to easy assign these value

func NewDevoSenderTLS

func NewDevoSenderTLS(entrypoint string, key []byte, cert []byte, chain []byte) (DevoSender, error)

NewDevoSenderTLS create TLS connection using ClientBuiler with minimal configuration

func NewDevoSenderTLSFiles

func NewDevoSenderTLSFiles(entrypoint string, keyFileName string, certFileName string, chainFileName *string) (DevoSender, error)

NewDevoSenderTLSFiles is similar to NewDevoSenderTLS but loading different certificates from files

type LazyClient

type LazyClient struct {
	*Client

	Stats LazyClientStats
	// contains filtered or unexported fields
}

LazyClient is a SwitchDevoSender that save events in a buffer when it is in "stand by" mode. Events are saved in a circular buffer, and when limit of buffer size is reached, new arrived events are saved at the begining of the buffer. when WakeUp is called, all events in buffer as send using Client.Async funcs in starting for first element in slide. This implies that arrived event order can not be maintained if buffer suffered an "oversize" situation.

Example
lc, err := NewLazyClientBuilder().
	ClientBuilder(
		NewClientBuilder().EntryPoint("udp://localhost:13000")). // udp protocol never return error
	Build()
if err != nil {
	panic(err)
}

fmt.Println("LazyClient", lc.String()[:146])

// send messages in connected mode
err = lc.SendWTag("test.keep.free", "message 1") // No error because client is connected
if err != nil {
	panic(err)
}
for i := 2; i <= 10; i++ {
	id := lc.SendWTagAsync(
		"test.keep.free",
		fmt.Sprintf("message %d", i),
	)
	fmt.Printf("ID of msg %d has non-conn- prefix: %v\n", i, strings.HasPrefix(id, "non-conn-"))
}
fmt.Println("Stats", lc.Stats)

// WakeUp should not have any effect
err = lc.WakeUp()
if err != nil {
	panic(err)
}
fmt.Println("Stats (after WakeUp)", lc.Stats)
fmt.Println("LazyClient (after WakeUp)", lc.String()[0:146])
fmt.Println("SwitchDevoSender.LastSendCallTimestamp (after WakeUp) is empty", lc.LastSendCallTimestamp() == time.Time{})

var sender SwitchDevoSender = lc
sender.Close()
fmt.Println("LazyClient as SwitchDevoSender closed", sender.String())

id := sender.SendWTagAsync("test.keep.free", "message after close is the same as after StandBy")
fmt.Printf("ID has non-conn- prefix: %v\n", strings.HasPrefix(id, "non-conn-"))
fmt.Println("SwitchDevoSender (pending events after close)", sender.String())
fmt.Println("Stats (pending events after close)", lc.Stats)
fmt.Println("SwitchDevoSender.LastSendCallTimestamp (pending events after close) is empty", lc.LastSendCallTimestamp() == time.Time{})
sender.Close()
fmt.Println("SwitchDevoSender (after last close)", sender.String())
fmt.Println("Stats (after last close)", lc.Stats)
Output:

LazyClient bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:13000'
ID of msg 2 has non-conn- prefix: false
ID of msg 3 has non-conn- prefix: false
ID of msg 4 has non-conn- prefix: false
ID of msg 5 has non-conn- prefix: false
ID of msg 6 has non-conn- prefix: false
ID of msg 7 has non-conn- prefix: false
ID of msg 8 has non-conn- prefix: false
ID of msg 9 has non-conn- prefix: false
ID of msg 10 has non-conn- prefix: false
Stats AsyncEvents: 9, TotalBuffered: 0, BufferedLost: 0, SendFromBuffer: 0 BufferCount: 0
Stats (after WakeUp) AsyncEvents: 9, TotalBuffered: 0, BufferedLost: 0, SendFromBuffer: 0 BufferCount: 0
LazyClient (after WakeUp) bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:13000'
SwitchDevoSender.LastSendCallTimestamp (after WakeUp) is empty false
LazyClient as SwitchDevoSender closed bufferSize: 256000, standByMode: true, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {<nil>}
ID has non-conn- prefix: true
SwitchDevoSender (pending events after close) bufferSize: 256000, standByMode: true, #eventsInBuffer: 1, flushTimeout: 2s, standByModeTimeout: 0s, Client: {<nil>}
Stats (pending events after close) AsyncEvents: 10, TotalBuffered: 1, BufferedLost: 0, SendFromBuffer: 0 BufferCount: 1
SwitchDevoSender.LastSendCallTimestamp (pending events after close) is empty true
SwitchDevoSender (after last close) bufferSize: 256000, standByMode: true, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {<nil>}
Stats (after last close) AsyncEvents: 10, TotalBuffered: 1, BufferedLost: 0, SendFromBuffer: 1 BufferCount: 0

func (*LazyClient) Close

func (lc *LazyClient) Close() error

Close send all events forcing WakeUp if it is in stand-by mode and then close the client

func (*LazyClient) Flush

func (lc *LazyClient) Flush() error

Flush send all events in buffer only if client is not in stand by mode (IsStandBy == false)

func (*LazyClient) IsLimitReachedLastFlush added in v1.0.0

func (lc *LazyClient) IsLimitReachedLastFlush() bool

IsLimitReachedLastFlush treturs true if there are pending events after flush because max limit was reached, or false in the other case.

Example
lc, err := NewLazyClientBuilder().
	ClientBuilder(
		NewClientBuilder().
			EntryPoint("udp://localhost:13000"). // udp does not return error
			DefaultDevoTag("test.keep.free")).
	MaxRecordsResendByFlush(1).
	Build()
if err != nil {
	panic(err)
}

// Pass to standby mode to queue events
err = lc.StandBy()
if err != nil {
	panic(err)
}

// Send events
lc.SendWTagAsync("test.keep.free", "event 1")
lc.SendWTagAsync("test.keep.free", "event 2")

fmt.Println("IsLimitReachedLastFlush after send events", lc.IsLimitReachedLastFlush())

// Wake up and
err = lc.WakeUp()
if err != nil {
	panic(err)
}
fmt.Println("IsLimitReachedLastFlush after Wakeup (Flush implicit)", lc.IsLimitReachedLastFlush())

//Flush one remaining event => limit reached too
err = lc.Flush()
if err != nil {
	panic(err)
}
fmt.Println("IsLimitReachedLastFlush after flush 1", lc.IsLimitReachedLastFlush())

err = lc.Flush()
if err != nil {
	panic(err)
}
fmt.Println("IsLimitReachedLastFlush after flush 2", lc.IsLimitReachedLastFlush())
Output:

IsLimitReachedLastFlush after send events false
IsLimitReachedLastFlush after Wakeup (Flush implicit) true
IsLimitReachedLastFlush after flush 1 true
IsLimitReachedLastFlush after flush 2 false

func (*LazyClient) IsStandBy

func (lc *LazyClient) IsStandBy() bool

IsStandBy returns true if client is in STandBy mode or false in otherwise

func (*LazyClient) OnlyInMemory added in v1.0.0

func (lc *LazyClient) OnlyInMemory() bool

OnlyInMemory is the implementation of SwitchDevoSender.OnlyInMemory. Allways return true

Example
lc, err := NewLazyClientBuilder().
	ClientBuilder(
		NewClientBuilder().
			EntryPoint("udp://localhost:13000")). // udp does not return error
	Build()
if err != nil {
	panic(err)
}

fmt.Printf("OnlyInMemory: %v\n", lc.OnlyInMemory())
Output:

OnlyInMemory: true

func (*LazyClient) PendingEventsNoConn added in v1.0.0

func (lc *LazyClient) PendingEventsNoConn() int

PendingEventsNoConn return the number of events that are pending to send and was created when the connection does not was available

Example
lc, err := NewLazyClientBuilder().
	ClientBuilder(
		NewClientBuilder().
			EntryPoint("udp://localhost:13000"). // udp does not return error
			DefaultDevoTag("test.keep.free")).
	Build()
if err != nil {
	panic(err)
}

// Pass to standby mode to queue events
err = lc.StandBy()
if err != nil {
	panic(err)
}

// Send events
lc.SendWTagAsync("test.keep.free", "event 1")
lc.SendWTagAsync("test.keep.free", "event 2")

fmt.Println("PendingEventsNoConn after StandBy", lc.PendingEventsNoConn())

// Wake up and
err = lc.WakeUp()
if err != nil {
	panic(err)
}
fmt.Println("PendingEventsNoConn after Wakeup (Flush implicit)", lc.PendingEventsNoConn())
Output:

PendingEventsNoConn after StandBy 2
PendingEventsNoConn after Wakeup (Flush implicit) 0

func (*LazyClient) SendAsync

func (lc *LazyClient) SendAsync(m string) string

SendAsync is the same as Client.SendAsync but if the Lazy Client is in stand-by mode then the event is saved in buffer

Example
lc, err := NewLazyClientBuilder().
	ClientBuilder(
		NewClientBuilder().EntryPoint("udp://localhost:13000")). // udp protocol never return error
	Build()
if err != nil {
	panic(err)
}

fmt.Println("LazyClient", lc.String()[0:143])

// send messages in connected mode
id := lc.SendAsync("message 1") // Empty default tag implies error
// Wait until id was processed
for lc.IsAsyncActive(id) {
	time.Sleep(time.Millisecond * 100)
}
fmt.Printf("AsyncErrors associated with first id: %v\n", lc.AsyncErrors()[id])

lc.SetDefaultTag("test.keep.free")
id2 := lc.SendAsync("message 2")
fmt.Printf(
	"Second msg id is equal to first id: %v, len(AsyncErrors): %d, AsyncErrors associated with first id: %v\n",
	id == id2,
	len(lc.AsyncErrors()),
	lc.AsyncErrors()[id],
)

fmt.Println("Stats", lc.Stats)
fmt.Println("LazyClient (after events)", lc.String()[0:143])

err = lc.Close()
if err != nil {
	panic(err)
}

fmt.Println("Stats (after close)", lc.Stats)
fmt.Println("LazyClient (after close)", lc)
Output:

LazyClient bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:130
AsyncErrors associated with first id: tag can not be empty
Second msg id is equal to first id: false, len(AsyncErrors): 1, AsyncErrors associated with first id: tag can not be empty
Stats AsyncEvents: 2, TotalBuffered: 0, BufferedLost: 0, SendFromBuffer: 0 BufferCount: 0
LazyClient (after events) bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:130
Stats (after close) AsyncEvents: 2, TotalBuffered: 0, BufferedLost: 0, SendFromBuffer: 0 BufferCount: 0
LazyClient (after close) bufferSize: 256000, standByMode: true, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {<nil>}

func (*LazyClient) SendWTagAndCompressorAsync

func (lc *LazyClient) SendWTagAndCompressorAsync(t, m string, c *compressor.Compressor) string

SendWTagAndCompressorAsync is the same as Client.SendWTagAndCompressorAsync but if the Lazy Client is in stand-by mode then the event is saved in buffer

func (*LazyClient) SendWTagAsync

func (lc *LazyClient) SendWTagAsync(t, m string) string

SendWTagAsync is the same as Client.SendWTagAsync but if the Lazy Client is in stand-by mode then the event is saved in buffer

func (*LazyClient) StandBy

func (lc *LazyClient) StandBy() error

StandBy closes connection to Devo and pass the Client to admit Async send events calls only These events will be saved in a buffer. Client will wait for Pending events before to pass to StandBy, if timeout is triggered during this operation, error will be returned and stand by mode will not be active

Example
lc, err := NewLazyClientBuilder().
	ClientBuilder(
		// udp protocol never return error
		NewClientBuilder().EntryPoint("udp://localhost:13000"),
	).
	BufferSize(2).                         // very small buffers of two events only
	EnableStandByModeTimeout(time.Second). // Set to 0 to wait for ever for async events when pass to stand by mode
	MaxRecordsResendByFlush(10).           // Max records can be set too
	Build()
if err != nil {
	panic(err)
}

fmt.Println("LazyClient", lc.String()[:141])

// Pass to stand by mode
err = lc.StandBy()
if err != nil {
	panic(err)
}
fmt.Println("IsStandBy", lc.IsStandBy())

// send messages in stand by mode
err = lc.SendWTag("test.keep.free", "message 1") // Should return error because client is in standby mode
fmt.Println("SendWTag error", err)

id := lc.SendWTagAsync("test.keep.free", "message 2")
fmt.Println("ID has non-conn- prefix", strings.HasPrefix(id, "non-conn-"))

fmt.Println("Stats", lc.Stats)

lc.SendWTagAsync("test.keep.free", "message 3")
lc.SendWTagAsync("test.keep.free", "message 4")
fmt.Println("Stats", lc.Stats)
fmt.Println("LazyClient", lc)

err = lc.WakeUp()
if err != nil {
	panic(err)
}

fmt.Println("Stats after WakeUp", lc.Stats)
fmt.Println("LazyClient after WakeUp", lc.String()[0:141])

var sender SwitchDevoSender = lc
sender.Close()
fmt.Println("LazyClient as SwitchDevoSender closed", sender.String())
Output:

LazyClient bufferSize: 2, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 1s, Client: {entryPoint: 'udp://localhost:13000'
IsStandBy true
SendWTag error receiver func call with nil pointer
ID has non-conn- prefix true
Stats AsyncEvents: 1, TotalBuffered: 1, BufferedLost: 0, SendFromBuffer: 0 BufferCount: 1
Stats AsyncEvents: 3, TotalBuffered: 3, BufferedLost: 1, SendFromBuffer: 0 BufferCount: 2
LazyClient bufferSize: 2, standByMode: true, #eventsInBuffer: 2, flushTimeout: 2s, standByModeTimeout: 1s, Client: {<nil>}
Stats after WakeUp AsyncEvents: 3, TotalBuffered: 3, BufferedLost: 1, SendFromBuffer: 2 BufferCount: 0
LazyClient after WakeUp bufferSize: 2, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 1s, Client: {entryPoint: 'udp://localhost:13000'
LazyClient as SwitchDevoSender closed bufferSize: 2, standByMode: true, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 1s, Client: {<nil>}

func (*LazyClient) String

func (lc *LazyClient) String() string

func (*LazyClient) WakeUp

func (lc *LazyClient) WakeUp() error

WakeUp leave stand by mode creating new connection to Devo. Any action will be done if Client is not in stand-by mode of if connection is not working. ClientBuilder.IsConnWorkingCheckPayload must be set with a valid value ("\x00") as example to allow check if connection is working.

type LazyClientBuilder

type LazyClientBuilder struct {
	// contains filtered or unexported fields
}

LazyClientBuilder is the builder to build LazyClient

Example
lcb := &LazyClientBuilder{}
lc, err := lcb.
	ClientBuilder(
		NewClientBuilder().EntryPoint("udp://localhost:13000")).
	BufferSize(256).
	FlushTimeout(time.Second).
	Build()
lc, err = lcb.Build()

// Only print first n characters to easy test with output
lcStr := lc.String()[:143]
fmt.Println("error", err, "- LazyClient", lcStr, "...")
Output:

error <nil> - LazyClient bufferSize: 256, standByMode: false, #eventsInBuffer: 0, flushTimeout: 1s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:13000' ...
Example (InitErrors)
lcb := &LazyClientBuilder{}
lc, err := lcb.Build()
fmt.Println("1: error", err, "- LazyClient", lc)

lcb.ClientBuilder(NewClientBuilder())
lc, err = lcb.Build()
fmt.Println("2: error", err, "- LazyClient", lc)

lcb.BufferSize(123)
lc, err = lcb.Build()
fmt.Println("3: error", err, "- LazyClient", lc)

lcb.FlushTimeout(time.Second)
lc, err = lcb.Build()
fmt.Println("4: error", err, "- LazyClient", lc)
Output:

1: error undefined inner client builder - LazyClient <nil>
2: error buffer size less than 1 - LazyClient <nil>
3: error flush timeout empty or negative - LazyClient <nil>
4: error while initialize client: while create new DevoSender (Clear): entrypoint can not be empty - LazyClient <nil>

func NewLazyClientBuilder

func NewLazyClientBuilder() *LazyClientBuilder

NewLazyClientBuilder is the factory method of LazyClientBuilder with defautl values set

Example
lc, err := NewLazyClientBuilder().Build()
fmt.Println("1: error", err, "- LazyClient", lc)

lc, err = NewLazyClientBuilder().
	ClientBuilder(
		NewClientBuilder().EntryPoint("udp://localhost:13000")).
	Build()
// Only print first n characters to easy test with output
lcStr := lc.String()[:146]
fmt.Println("2: error", err, "- LazyClient", lcStr, "...")
Output:

1: error undefined inner client builder - LazyClient <nil>
2: error <nil> - LazyClient bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:13000' ...

func (*LazyClientBuilder) AppLogger

AppLogger sets the applogger.SimpleAppLogger used to write log messages

func (*LazyClientBuilder) BufferSize

func (lcb *LazyClientBuilder) BufferSize(s uint32) *LazyClientBuilder

BufferSize sets the buffer size if s is greater than 0

func (*LazyClientBuilder) Build

func (lcb *LazyClientBuilder) Build() (*LazyClient, error)

Build creates new LazyClient instance

func (*LazyClientBuilder) ClientBuilder

func (lcb *LazyClientBuilder) ClientBuilder(cb *ClientBuilder) *LazyClientBuilder

ClientBuilder sets the ClientBuilder used to re-create connections after StandyBy - WakeUp situation

func (*LazyClientBuilder) EnableStandByModeTimeout

func (lcb *LazyClientBuilder) EnableStandByModeTimeout(d time.Duration) *LazyClientBuilder

EnableStandByModeTimeout sets and enable if value is greater than 0, the timeout to wait for pending async events in client when StandBy() func is called

func (*LazyClientBuilder) FlushTimeout

func (lcb *LazyClientBuilder) FlushTimeout(d time.Duration) *LazyClientBuilder

FlushTimeout sets the timeout when wait for pending async envents in client when Flush() func is called. Timeout is set only if parameter is greater than 0

func (*LazyClientBuilder) MaxRecordsResendByFlush added in v1.0.0

func (lcb *LazyClientBuilder) MaxRecordsResendByFlush(max int) *LazyClientBuilder

MaxRecordsResendByFlush sets the max number of pending events to be resend when Flush events is called. Zero or negative values deactivate the functionallity

type LazyClientStats

type LazyClientStats struct {
	AsyncEvents    uint
	BufferedLost   uint
	TotalBuffered  uint
	SendFromBuffer uint
	BufferCount    int
}

LazyClientStats is the metrics storage for LazyClient

func (LazyClientStats) String

func (lcs LazyClientStats) String() string

type ReliableClient

type ReliableClient struct {
	*Client
	// contains filtered or unexported fields
}

ReliableClient defines a Client with Reliable capatilities for Async operations only

Example
// Ensure path is clean
os.RemoveAll("/tmp/test")

rc, err := NewReliableClientBuilder().
	StatusBuilder(
		status.NewNutsDBStatusBuilder().DbPath("/tmp/test")).
	ClientBuilder(
		NewClientBuilder().
			EntryPoint("udp://example.com:80"),
	).Build()

fmt.Println("error", err)
fmt.Println("rc.GetEntryPoint", rc.GetEntryPoint())
fmt.Println("rc.IsStandBy", rc.IsStandBy())

rc.SendWTagAsync("tag", fmt.Sprintf("async msg at %s", time.Now()))
fmt.Printf("rc.Stats %+v\n", rc.Stats())

err = rc.Flush()
fmt.Printf("error flush: %+v\n", err)
fmt.Printf("rc.Stats after flush %+v\n", rc.Stats())

err = rc.Close()
fmt.Printf("error close: %+v\n", err)
Output:

error <nil>
rc.GetEntryPoint udp://example.com:80
rc.IsStandBy false
rc.Stats {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:1 DbMaxFileID:0 DbDataEntries:-1}
error flush: <nil>
rc.Stats after flush {BufferCount:0 Updated:0 Finished:1 Dropped:0 Evicted:0 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:-1}
error close: <nil>
Example (AppLoggerError)
buf := &bytes.Buffer{}
lg := &applogger.WriterAppLogger{Writer: buf, Level: applogger.ERROR}

// Ensure path is clean
os.RemoveAll("/tmp/test")
rc, err := NewReliableClientBuilder().
	StatusBuilder(
		status.NewNutsDBStatusBuilder().DbPath("/tmp/test")).
	ClientBuilder(
		NewClientBuilder(),
	).
	AppLogger(lg).
	Build()

fmt.Println("rc.IsStandBy()", rc.IsStandBy(), "err", err)

// close
err = rc.Close()
fmt.Println("rc.Close", err)

rc.SendAsync("test message")
rc.SendWTagAsync("tag", "test message")
rc.SendWTagAndCompressorAsync("tag", "test message", &compressor.Compressor{Algorithm: compressor.CompressorGzip})

// We hide the  ID to easy check the output
logString := buf.String()
re := regexp.MustCompile(`-\w{8}-\w{4}-\w{4}-\w{4}-\w{12}`)
logString = re.ReplaceAllString(logString, "-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX")

fmt.Println("Log:")
fmt.Println(logString)
Output:

rc.IsStandBy() false err <nil>
rc.Close <nil>
Log:
ERROR Uncontrolled error when create status record in SendAsync, ID: non-conn-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX: while update nutsdb: db is closed
ERROR Uncontrolled error when create status record in SendWTagAsync, ID: non-conn-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX: while update nutsdb: db is closed
ERROR Uncontrolled error when create status record in SendWTagAndCompressorAsync, ID: non-conn-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX: while update nutsdb: db is closed
Example (As_SwitchDevoSender)
// Ensure path is clean
os.RemoveAll("/tmp/test-reliable-client")

var sender SwitchDevoSender
var err error
sender, err = NewReliableClientBuilder().
	StatusBuilder(
		status.NewNutsDBStatusBuilder().DbPath("/tmp/test-reliable-client")).
	ClientBuilder(
		NewClientBuilder().
			EntryPoint("udp://localhost:13000"),
	).Build()

fmt.Println("error", err)
fmt.Println("SwitchDevoSender", sender.String()[0:61])
fmt.Println("rc.IsStandBy", sender.IsStandBy())

sender.SendWTagAsync("tag", fmt.Sprintf("async msg at %s", time.Now()))
rc, ok := sender.(*ReliableClient)
if ok {
	fmt.Printf("rc.Stats %+v\n", rc.Stats())
} else {
	panic(fmt.Sprintf("%v can not be casted to ReliableClient", sender))
}

err = sender.Flush()
fmt.Printf("error flush: %+v\n", err)
fmt.Printf("rc.Stats after flush %+v\n", rc.Stats())

err = sender.Close()
fmt.Printf("error close: %+v\n", err)

// Ensure path is clean
os.RemoveAll("/tmp/test-reliable-client")
Output:

error <nil>
SwitchDevoSender Client: {entryPoint: 'udp://localhost:13000', syslogHostname:
rc.IsStandBy false
rc.Stats {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:1 DbMaxFileID:0 DbDataEntries:-1}
error flush: <nil>
rc.Stats after flush {BufferCount:0 Updated:0 Finished:1 Dropped:0 Evicted:0 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:-1}
error close: <nil>
Example (Dropped)
// Ensure path is clean
os.RemoveAll("/tmp/test")

rc, err := NewReliableClientBuilder().
	StatusBuilder(
		status.NewNutsDBStatusBuilder().
			DbPath("/tmp/test").
			BufferSize(2)).
	ClientBuilder(
		NewClientBuilder().EntryPoint("udp://example.com:80"),
	).
	Build()
if err != nil {
	panic(err)
}

// Pass to inactive
err = rc.StandBy()
fmt.Println("StandBy error", err)

// Send data on stand by mode
rc.SendWTagAsync("tag", fmt.Sprintf("async msg 1 at %s", time.Now()))
rc.SendWTagAsync("tag", fmt.Sprintf("async msg 2 at %s", time.Now()))
rc.SendWTagAsync("tag", fmt.Sprintf("async msg 3 at %s", time.Now()))
rc.SendWTagAsync("tag", fmt.Sprintf("async msg 4 at %s", time.Now()))
rc.SendWTagAsync("tag", fmt.Sprintf("async msg 5 at %s", time.Now()))
fmt.Printf("rc.Stats %+v\n", rc.Stats())

err = rc.WakeUp()
fmt.Println("WakeUp error", err)
fmt.Printf("rc.Stats after Wakeup and wait %+v\n", rc.Stats())

// There should be Count: 2 after flush, because we do not have enough time
// to daemon to check that records were propertly resend
err = rc.Flush()
fmt.Println("Flush error", err)
fmt.Printf("rc.Stats after Flush and wait %+v\n", rc.Stats())

rc.Close()
fmt.Printf("rc.Stats after closed %+v\n", rc.Stats())
Output:

StandBy error <nil>
rc.Stats {BufferCount:2 Updated:0 Finished:0 Dropped:3 Evicted:0 DbIdxSize:2 DbMaxFileID:0 DbDataEntries:-1}
WakeUp error <nil>
rc.Stats after Wakeup and wait {BufferCount:2 Updated:0 Finished:0 Dropped:3 Evicted:0 DbIdxSize:2 DbMaxFileID:0 DbDataEntries:-1}
Flush error <nil>
rc.Stats after Flush and wait {BufferCount:2 Updated:2 Finished:0 Dropped:3 Evicted:0 DbIdxSize:2 DbMaxFileID:0 DbDataEntries:-1}
rc.Stats after closed {BufferCount:0 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:0}
Example (Evicted)
// Ensure path is clean
os.RemoveAll("/tmp/test")

rc, err := NewReliableClientBuilder().
	StatusBuilder(
		status.NewNutsDBStatusBuilder().
			DbPath("/tmp/test").
			EventsTTLSeconds(1)).
	ClientBuilder(
		NewClientBuilder().EntryPoint("udp://localhost:13000"),
	).
	RetryDaemonInitDelay(time.Minute). // Prevents retry daemon to waste CPU
	RetryDaemonWaitBtwChecks(time.Millisecond * 100).
	Build()
if err != nil {
	panic(err)
}

// Pass to inactive
err = rc.StandBy()
fmt.Println("StandBy error", err)

// Send data on stand by mode
rc.SendWTagAsync("tag", fmt.Sprintf("async msg at %s", time.Now()))
fmt.Printf("rc.Stats %+v\n", rc.Stats())

// Waiting for ensure event was expired
time.Sleep(time.Millisecond * 1100)

err = rc.WakeUp()
fmt.Println("WakeUp error", err)
fmt.Printf("rc.Stats after Wakeup and wait %+v\n", rc.Stats())

err = rc.Flush()
fmt.Println("Flush error", err)
fmt.Printf("rc.Stats after Flush and wait %+v\n", rc.Stats())

rc.Close()
fmt.Printf("rc.Stats after closed %+v\n", rc.Stats())
Output:

StandBy error <nil>
rc.Stats {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:1 DbMaxFileID:0 DbDataEntries:-1}
WakeUp error <nil>
rc.Stats after Wakeup and wait {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:1 DbMaxFileID:0 DbDataEntries:-1}
Flush error <nil>
rc.Stats after Flush and wait {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:1 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:-1}
rc.Stats after closed {BufferCount:0 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:0}
Example (ExtendedStats)
// Ensure path is clean
os.RemoveAll("/tmp/test")

// Extended stats are enabled using environment variable. We add here in code but
// you can set in your shell session as well

err := os.Setenv("DEVOGO_DEBUG_SENDER_STATS_COUNT_DATA", "yes")
if err != nil {
	panic(err)
}

rc, err := NewReliableClientBuilder().
	StatusBuilder(
		status.NewNutsDBStatusBuilder().DbPath("/tmp/test")).
	ClientBuilder(
		NewClientBuilder().
			EntryPoint("udp://example.com:80"),
	).Build()

fmt.Println("error", err)
fmt.Println("rc.GetEntryPoint", rc.GetEntryPoint())
fmt.Println("rc.IsStandBy", rc.IsStandBy())

rc.SendWTagAsync("tag", fmt.Sprintf("async msg at %s", time.Now()))
fmt.Printf("rc.Stats %+v\n", rc.Stats())

err = rc.Flush()
fmt.Printf("error flush: %+v\n", err)
fmt.Printf("rc.Stats after flush %+v\n", rc.Stats())

err = rc.Close()
fmt.Printf("error close: %+v\n", err)
Output:

error <nil>
rc.GetEntryPoint udp://example.com:80
rc.IsStandBy false
rc.Stats {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:1 DbMaxFileID:0 DbDataEntries:1}
error flush: <nil>
rc.Stats after flush {BufferCount:0 Updated:0 Finished:1 Dropped:0 Evicted:0 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:0}
error close: <nil>
Example (NilInnerClient)
// Ensure path is clean
os.RemoveAll("/tmp/test")

rc, err := NewReliableClientBuilder().
	StatusBuilder(
		status.NewNutsDBStatusBuilder().DbPath("/tmp/test")).
	ClientBuilder(
		NewClientBuilder(),
	).
	Build()

// Call all inherit funcs
err = rc.AddReplaceSequences("old", "new")
fmt.Println("AddReplaceSequences:", err)
rc.SetSyslogHostName("hostname")
fmt.Println("Nothing to do when call SetSyslogHostName")
err = rc.SetDefaultTag("old")
fmt.Println("SetDefaultTag:", err)
err = rc.Send("msg")
fmt.Println("Send:", err)
err = rc.SendWTag("tag", "msg")
fmt.Println("SendWTag:", err)
s := rc.SendAsync("msg")
fmt.Printf("SendAsync returns not empty id: '%v'\n", s != "")
s = rc.SendWTagAsync("tag", "msg")
fmt.Printf("SendWTagAsync returns not empty id: '%v'\n", s != "")
s = rc.SendWTagAndCompressorAsync("tag", "msg", nil)
fmt.Printf("SendWTagAndCompressorAsync returns not empty id: '%v'\n", s != "")
err = rc.WaitForPendingAsyncMessages()
fmt.Println("WaitForPendingAsyncMessages:", err)
err = rc.WaitForPendingAsyncMsgsOrTimeout(0)
fmt.Println("WaitForPendingAsyncMsgsOrTimeout:", err)
m := rc.AsyncErrors()
fmt.Println("AsyncErrors:", m)
i := rc.AsyncErrorsNumber()
fmt.Printf("AsyncErrorsNumber: '%d'\n", i)
s = rc.GetEntryPoint()
fmt.Printf("GetEntryPoint returns: '%s'\n", s)
ss := rc.AsyncIds()
fmt.Printf("AsyncIds returns nil: '%v'\n", ss == nil)
b := rc.IsAsyncActive("")
fmt.Printf("IsAsyncActive returns: %v\n", b)
i = rc.AsyncsNumber()
fmt.Printf("AsyncsNumber returns: '%d'\n", i)
t := rc.LastSendCallTimestamp()
fmt.Printf("LastSendCallTimestamp returns: '%v'\n", t)
i, err = rc.Write([]byte{})
fmt.Println("Write i:", i, "err:", err)

err = rc.Close()
fmt.Println("Close:", err)
fmt.Printf("rc.Stats after closed %+v\n", rc.Stats())
Output:

AddReplaceSequences: receiver func call with nil pointer
Nothing to do when call SetSyslogHostName
SetDefaultTag: receiver func call with nil pointer
Send: receiver func call with nil pointer
SendWTag: receiver func call with nil pointer
SendAsync returns not empty id: 'true'
SendWTagAsync returns not empty id: 'true'
SendWTagAndCompressorAsync returns not empty id: 'true'
WaitForPendingAsyncMessages: receiver func call with nil pointer
WaitForPendingAsyncMsgsOrTimeout: receiver func call with nil pointer
AsyncErrors: map[:receiver func call with nil pointer]
AsyncErrorsNumber: '0'
GetEntryPoint returns: ''
AsyncIds returns nil: 'true'
IsAsyncActive returns: false
AsyncsNumber returns: '0'
LastSendCallTimestamp returns: '0001-01-01 00:00:00 +0000 UTC'
Write i: 0 err: receiver func call with nil pointer
Close: <nil>
rc.Stats after closed {BufferCount:0 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:0}
Example (StandbyAndWakeUp)
// Ensure path is clean
os.RemoveAll("/tmp/test")

rc, err := NewReliableClientBuilder().
	StatusBuilder(
		status.NewNutsDBStatusBuilder().DbPath("/tmp/test")).
	ClientBuilder(
		NewClientBuilder().EntryPoint("udp://example.com:80"),
	).
	RetryDaemonWaitBtwChecks(time.Millisecond * 100).
	RetryDaemonInitDelay(time.Millisecond * 50).
	Build()
if err != nil {
	panic(err)
}

// Pass to inactive
err = rc.StandBy()
fmt.Println("StandBy error", err)

// Send data on stand by mode
rc.SendWTagAsync("tag", fmt.Sprintf("async msg at %s", time.Now()))
fmt.Printf("rc.Stats %+v\n", rc.Stats())

rc.Flush()
fmt.Printf("rc.Stats after flush %+v\n", rc.Stats())

err = rc.WakeUp()
fmt.Println("WakeUp error", err)
// Waiting for ensure damon is retrying
time.Sleep(time.Millisecond * 300)
fmt.Printf("rc.Stats after Wakeup and wait %+v\n", rc.Stats())

rc.Close()
fmt.Printf("rc.Stats after closed %+v\n", rc.Stats())
Output:

StandBy error <nil>
rc.Stats {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:1 DbMaxFileID:0 DbDataEntries:-1}
rc.Stats after flush {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:1 DbMaxFileID:0 DbDataEntries:-1}
WakeUp error <nil>
rc.Stats after Wakeup and wait {BufferCount:0 Updated:1 Finished:1 Dropped:0 Evicted:0 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:-1}
rc.Stats after closed {BufferCount:0 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:0}
Example (WithoutConnection)
// Ensure path is clean
os.RemoveAll("/tmp/test")

rc, err := NewReliableClientBuilder().
	StatusBuilder(
		status.NewNutsDBStatusBuilder().DbPath("/tmp/test")).
	ClientBuilder(NewClientBuilder()).
	Build()

fmt.Println("error", err)
fmt.Println("rc.Client", rc.Client)
fmt.Println("rc.IsStandBy", rc.IsStandBy())

rc.SendWTagAsync("tag", "async msg")
fmt.Printf("rc.Stats %+v\n", rc.Stats())

err = rc.Flush()
fmt.Printf("error flush: %+v\n", err)
fmt.Printf("rc.Stats after flush %+v\n", rc.Stats())

err = rc.Close()
fmt.Printf("error close: %+v\n", err)
fmt.Printf("rc.Stats after close %+v\n", rc.Stats())
Output:

error <nil>
rc.Client <nil>
rc.IsStandBy false
rc.Stats {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:1 DbMaxFileID:0 DbDataEntries:-1}
error flush: <nil>
rc.Stats after flush {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:1 DbMaxFileID:0 DbDataEntries:-1}
error close: <nil>
rc.Stats after close {BufferCount:0 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:0}

func (*ReliableClient) Close

func (dsrc *ReliableClient) Close() error

Close closes current client. This implies operations like shutdown daemons, call Flush func, etc.

func (*ReliableClient) Flush

func (dsrc *ReliableClient) Flush() error

Flush checks all pending messages (sent with Async funcs), waits for pending async messages and update status of all of them. This func can call on demand but it is called by internal retry send events daemon too

func (*ReliableClient) IsConnWorking added in v1.0.0

func (dsrc *ReliableClient) IsConnWorking() (bool, error)

IsConnWorking is the same as Client.IsConnWorking but check first if client is in stand by mode

func (*ReliableClient) IsLimitReachedLastFlush added in v1.0.0

func (dsrc *ReliableClient) IsLimitReachedLastFlush() bool

IsLimitReachedLastFlush treturs true if there are pending events after flush because max limit was reached, or false in the other case.

Example
// Ensure path is clean
os.RemoveAll("/tmp/test")

rc, err := NewReliableClientBuilder().
	StatusBuilder(
		status.NewNutsDBStatusBuilder().DbPath("/tmp/test")).
	ClientBuilder(
		NewClientBuilder().
			EntryPoint("udp://localhost:13000"),
	).
	MaxRecordsResendByFlush(1).
	Build()
if err != nil {
	panic(err)
}

// Pass to standby mode to queue events
err = rc.StandBy()
if err != nil {
	panic(err)
}

// Send events
rc.SendWTagAsync("test.keep.free", "event 1")
rc.SendWTagAsync("test.keep.free", "event 2")

fmt.Println("IsLimitReachedLastFlush after send events", rc.IsLimitReachedLastFlush())

// Wake up and
err = rc.WakeUp()
if err != nil {
	panic(err)
}
fmt.Println("IsLimitReachedLastFlush after Wakeup (Flush is not implicit)", rc.IsLimitReachedLastFlush())

//Flush
err = rc.Flush()
if err != nil {
	panic(err)
}
fmt.Println("IsLimitReachedLastFlush after flush", rc.IsLimitReachedLastFlush())
Output:

IsLimitReachedLastFlush after send events false
IsLimitReachedLastFlush after Wakeup (Flush is not implicit) false
IsLimitReachedLastFlush after flush true

func (*ReliableClient) IsStandBy

func (dsrc *ReliableClient) IsStandBy() bool

IsStandBy retursn true when client is in StandBy() mode

func (*ReliableClient) OnlyInMemory added in v1.0.0

func (dsrc *ReliableClient) OnlyInMemory() bool

OnlyInMemory is the implementation of SwitchDevoSender.OnlyInMemory. Allways return true

Example
// Ensure path is clean
os.RemoveAll("/tmp/test")

rc, err := NewReliableClientBuilder().
	StatusBuilder(
		status.NewNutsDBStatusBuilder().DbPath("/tmp/test")).
	ClientBuilder(
		NewClientBuilder().
			EntryPoint("udp://localhost:13000"),
	).
	Build()
if err != nil {
	panic(err)
}

fmt.Printf("OnlyInMemory: %v\n", rc.OnlyInMemory())

// Ensure path is clean
os.RemoveAll("/tmp/test")
Output:

OnlyInMemory: false

func (*ReliableClient) PendingEventsNoConn added in v1.0.0

func (dsrc *ReliableClient) PendingEventsNoConn() int

PendingEventsNoConn return the number of events that are pending to send and was created when the connection does not was available. -1 is returned when value could not be solved

Example
// Ensure path is clean
os.RemoveAll("/tmp/test")

rc, err := NewReliableClientBuilder().
	StatusBuilder(
		status.NewNutsDBStatusBuilder().DbPath("/tmp/test")).
	ClientBuilder(
		NewClientBuilder().
			EntryPoint("udp://localhost:13000"),
	).
	RetryDaemonInitDelay(50 * time.Millisecond).
	RetryDaemonInitDelay(75 * time.Millisecond). // Prevents retry daemon send events
	Build()
if err != nil {
	panic(err)
}

// Pass to standby mode to queue events
err = rc.StandBy()
if err != nil {
	panic(err)
}

// Send events
rc.SendWTagAsync("test.keep.free", "event 1")
rc.SendWTagAsync("test.keep.free", "event 2")

fmt.Println("PendingEventsNoConn after StandBy", rc.PendingEventsNoConn())

// Wake up and
err = rc.WakeUp()
if err != nil {
	panic(err)
}
time.Sleep(time.Millisecond * 100) // wait time to get resend daemon works

fmt.Println("PendingEventsNoConn after Wakeup (Resend daemon flushs data)", rc.PendingEventsNoConn())

err = rc.Close()
if err != nil {
	panic(err)
}

fmt.Println("PendingEventsNoConn after Close (-1 == error)", rc.PendingEventsNoConn())

// Ensure path is clean
os.RemoveAll("/tmp/test")
Output:

PendingEventsNoConn after StandBy 2
PendingEventsNoConn after Wakeup (Resend daemon flushs data) 0
PendingEventsNoConn after Close (-1 == error) -1

func (*ReliableClient) SendAsync

func (dsrc *ReliableClient) SendAsync(m string) string

SendAsync sends Async message in same way like Client.SendAsync but saving the message in status until can ensure, at certain level of confiance, that it was sent

func (*ReliableClient) SendWTagAndCompressorAsync

func (dsrc *ReliableClient) SendWTagAndCompressorAsync(t string, m string, c *compressor.Compressor) string

SendWTagAndCompressorAsync sends Async message in same way like Client.SendWTagAndCompressorAsync but saving the message,tag and Compressor in status until can ensure, at certain level of confiance, that it was sent

func (*ReliableClient) SendWTagAsync

func (dsrc *ReliableClient) SendWTagAsync(t, m string) string

SendWTagAsync sends Async message in same way like Client.SendWTagAsync but saving the message and tag in status until can ensure, at certain level of confiance, that it was sent

func (*ReliableClient) StandBy

func (dsrc *ReliableClient) StandBy() error

StandBy put current client in stand by mode closing active connection and saving new incoming events from Async operations in status. Note that after call StandBy, Send Sync operations will return errors.

func (*ReliableClient) Stats

func (dsrc *ReliableClient) Stats() status.Stats

Stats returns the curren stats (session + persisted). Erros when load stas are ignored DbDataEntries and DbKeysSize will be filled only if DEVOGO_DEBUG_SENDER_STATS_COUNT_DATA environment variable is set with "yes" value

func (*ReliableClient) String

func (dsrc *ReliableClient) String() string

func (*ReliableClient) WakeUp

func (dsrc *ReliableClient) WakeUp() error

WakeUp is the inverse oeration to call StandBy

type ReliableClientBuilder

type ReliableClientBuilder struct {
	// contains filtered or unexported fields
}

ReliableClientBuilder defines the Builder for build ReliableClient

Example (InitErrors)
rc, err := NewReliableClientBuilder().Build()
fmt.Println("1 error", err)
fmt.Println("1 rc", rc)

// Ensure path is clean
os.RemoveAll("/tmp/test")
rc, err = NewReliableClientBuilder().
	StatusBuilder(
		status.NewNutsDBStatusBuilder().DbPath("/tmp/test")).
	Build()
fmt.Println("2 error", err)
fmt.Println("2 rc", rc)

// No path permissions
os.RemoveAll("/this-is-not-valid-path")
rc, err = NewReliableClientBuilder().
	StatusBuilder(
		status.NewNutsDBStatusBuilder().DbPath(
			"/this-is-not-valid-path")).
	ClientBuilder(NewClientBuilder()).
	Build()
fmt.Println("3 error", errors.Unwrap(err)) // Unwrapped to decrese the verbose of the output
fmt.Println("3 rc", rc)
Output:

1 error undefined status builder
1 rc <nil>
2 error undefined inner client builder
2 rc <nil>
3 error while open nutsdb: mkdir /this-is-not-valid-path: permission denied
3 rc <nil>

func NewReliableClientBuilder

func NewReliableClientBuilder() *ReliableClientBuilder

NewReliableClientBuilder return ReliableClientBuilder with intialized to default values

func (*ReliableClientBuilder) AppLogger

AppLogger sets the AppLogger used to send logger messages in case that errors can not be returned. Debug traces can be saved usint this logger too.

func (*ReliableClientBuilder) Build

func (dsrcb *ReliableClientBuilder) Build() (*ReliableClient, error)

Build builds the ReliableClient based in current parameters.

func (*ReliableClientBuilder) ClientBuilder

func (dsrcb *ReliableClientBuilder) ClientBuilder(cb *ClientBuilder) *ReliableClientBuilder

ClientBuilder sets the ClientBuilder needed to build the underhood client. This is required to initial setup and it is used by reconnect daemon too. If IsConnWorkingCheckPayload is not defined in cb ClientBuilder then \x00 VALUE WILL BE fixed to ensure that ClientReconn daemon can recreate connection after any outage

func (*ReliableClientBuilder) ClientReconnDaemonInitDelay

func (dsrcb *ReliableClientBuilder) ClientReconnDaemonInitDelay(d time.Duration) *ReliableClientBuilder

ClientReconnDaemonInitDelay sets the initial time delay when reconnect daemon is started value is set only if d value is greater than 0

func (*ReliableClientBuilder) ClientReconnDaemonWaitBtwChecks

func (dsrcb *ReliableClientBuilder) ClientReconnDaemonWaitBtwChecks(d time.Duration) *ReliableClientBuilder

ClientReconnDaemonWaitBtwChecks sets the time wait interval between checks for reconnect daemon value is set only if d value is greater than 0

func (*ReliableClientBuilder) DaemonStopTimeout

func (dsrcb *ReliableClientBuilder) DaemonStopTimeout(d time.Duration) *ReliableClientBuilder

DaemonStopTimeout sets the timeout to wait for each daemon when ReliableClient is closed value is set only if d value is greater than 0

func (*ReliableClientBuilder) EnableStandByModeTimeout

func (dsrcb *ReliableClientBuilder) EnableStandByModeTimeout(d time.Duration) *ReliableClientBuilder

EnableStandByModeTimeout sets and enable if value is greter than 0, the timeout to wait for pending async events in client when StandBy() func is called

func (*ReliableClientBuilder) FlushTimeout

func (dsrcb *ReliableClientBuilder) FlushTimeout(d time.Duration) *ReliableClientBuilder

FlushTimeout sets the timeout when wait for pending async envents in clien when Flush() func is called

func (*ReliableClientBuilder) HouseKeepingDaemonInitDelay added in v1.0.0

func (dsrcb *ReliableClientBuilder) HouseKeepingDaemonInitDelay(d time.Duration) *ReliableClientBuilder

HouseKeepingDaemonInitDelay sets the initial time delay to wait before HouseKeepingDaemon launchs the first execution. Value is set only if d value is greater than 0

func (*ReliableClientBuilder) HouseKeepingDaemonWaitBtwChecks added in v1.0.0

func (dsrcb *ReliableClientBuilder) HouseKeepingDaemonWaitBtwChecks(d time.Duration) *ReliableClientBuilder

HouseKeepingDaemonWaitBtwChecks sets the interval time waited by daemon between executions of stattus HouseKeeping operations. Value is set only if d value is greater than 0

func (*ReliableClientBuilder) MaxRecordsResendByFlush added in v1.0.0

func (dsrcb *ReliableClientBuilder) MaxRecordsResendByFlush(max int) *ReliableClientBuilder

MaxRecordsResendByFlush sets the max number of pending events to be resend when Flush events is called. Zero or negative values deactivate the functionallity

func (*ReliableClientBuilder) RetryDaemonInitDelay

func (dsrcb *ReliableClientBuilder) RetryDaemonInitDelay(d time.Duration) *ReliableClientBuilder

RetryDaemonInitDelay sets the initial time delay when retry send events daemon is started value is set only if d value is greater than 0

func (*ReliableClientBuilder) RetryDaemonWaitBtwChecks

func (dsrcb *ReliableClientBuilder) RetryDaemonWaitBtwChecks(d time.Duration) *ReliableClientBuilder

RetryDaemonWaitBtwChecks sets the time wait interval between checks for retry send events daemon value is set only if d value is greater than 0

func (*ReliableClientBuilder) StatusBuilder added in v1.0.0

func (dsrcb *ReliableClientBuilder) StatusBuilder(sb status.Builder) *ReliableClientBuilder

StatusBuilder sets the required status builder engine to save load ans manage events in a reliable way

type SwitchDevoSender

type SwitchDevoSender interface {
	io.Closer
	String() string
	SendAsync(m string) string
	SendWTagAsync(t, m string) string
	SendWTagAndCompressorAsync(t string, m string, c *compressor.Compressor) string
	WaitForPendingAsyncMsgsOrTimeout(timeout time.Duration) error
	AsyncErrors() map[string]error
	AsyncErrorsNumber() int
	AsyncIds() []string
	AreAsyncOps() bool
	IsAsyncActive(id string) bool
	Flush() error
	IsLimitReachedLastFlush() bool
	PendingEventsNoConn() int
	StandBy() error
	WakeUp() error
	IsStandBy() bool
	LastSendCallTimestamp() time.Time
	OnlyInMemory() bool
}

SwitchDevoSender represents a Client that can be paused. That is that can close connection to Devo and save in a buffer the events that are recieved. When client is waked up pending events are send to Devo.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL