data

package
v1.0.13 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 15, 2015 License: AGPL-3.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
// Default values for the data settings. Per the inline notes, the max age
// is expressed in seconds (3600 = 1h) and the max size in bytes (10 MiB).
// NOTE(review): the unit of DEFAULT_DATA_SEND_INTERVAL is not shown here;
// presumably seconds — confirm against the code that consumes it.
const (
	DEFAULT_DATA_ENCODING      = "gzip"
	DEFAULT_DATA_SEND_INTERVAL = 63
	DEFAULT_DATA_MAX_AGE       = 3600             // 1h
	DEFAULT_DATA_MAX_SIZE      = 1024 * 1024 * 10 // 10 MiB
	DEFAULT_DATA_MAX_FILES     = 100
)
View Source
// Tuning constants related to send and connect errors.
// NOTE(review): their consumers are not visible here. Presumably
// MAX_SEND_ERRORS caps the number of send errors tolerated and
// CONNECT_ERROR_WAIT is a wait (in seconds?) after a connect error —
// confirm before relying on this.
const (
	MAX_SEND_ERRORS    = 3
	CONNECT_ERROR_WAIT = 3
)

Variables

View Source
// Report format strings in fmt verb syntax.
// BaseReportFormat takes one integer (the file count) followed by four
// strings; ErrorReportFormat takes four integers (errors, API errors,
// timeouts, bad files).
// NOTE(review): presumably consumed by FormatSentReport — confirm.
var (
	BaseReportFormat  string = "%d files, %s, %s, %s net util, %s net speed"
	ErrorReportFormat        = "%d errors, %d API errors, %d timeouts, %d bad files"
)
View Source
// DebugStats is a package-level flag that is off by default.
// NOTE(review): its effect is not visible here; presumably it enables extra
// debugging output for sender stats — confirm.
var DebugStats = false
View Source
// ErrSpoolTimeout is a sentinel error value with the message
// "Timeout spooling data".
// NOTE(review): which operation returns it is not visible here.
var ErrSpoolTimeout = errors.New("Timeout spooling data")

Functions

func FormatSentReport added in v1.0.10

func FormatSentReport(r SentReport) string

Types

type Config

// Config holds the settings of this package: an encoding name, a send
// interval, a blackhole switch and the data spool limits.
// NOTE(review): Encoding presumably names a Serializer encoding (compare
// DEFAULT_DATA_ENCODING = "gzip") and SendInterval is presumably in seconds
// (compare DEFAULT_DATA_SEND_INTERVAL) — confirm.
type Config struct {
	Encoding     string
	SendInterval uint
	Blackhole    bool // don't send if true
	Limits       proto.DataSpoolLimits
}

type DiskvSpooler

// DiskvSpooler provides every method of the Spooler interface (Start, Stop,
// Status, Write, Files, CancelFiles, Read, Remove, Reject, Purge) and
// additionally exposes PurgeChan. It is created with NewDiskvSpooler.
type DiskvSpooler struct {
	// contains filtered or unexported fields
}

http://godoc.org/github.com/peterbourgon/diskv

func NewDiskvSpooler

func NewDiskvSpooler(logger *pct.Logger, dataDir, trashDir, hostname string, limits proto.DataSpoolLimits) *DiskvSpooler

func (*DiskvSpooler) CancelFiles added in v1.0.13

func (s *DiskvSpooler) CancelFiles()

func (*DiskvSpooler) Files

func (s *DiskvSpooler) Files() <-chan string

func (*DiskvSpooler) Purge added in v1.0.13

func (s *DiskvSpooler) Purge(now time.Time, limits proto.DataSpoolLimits) (int, map[string][]string)

func (*DiskvSpooler) PurgeChan added in v1.0.13

func (s *DiskvSpooler) PurgeChan(c chan time.Time)

func (*DiskvSpooler) Read

func (s *DiskvSpooler) Read(file string) ([]byte, error)

func (*DiskvSpooler) Reject added in v1.0.7

func (s *DiskvSpooler) Reject(file string) error

func (*DiskvSpooler) Remove

func (s *DiskvSpooler) Remove(file string) error

func (*DiskvSpooler) Start

func (s *DiskvSpooler) Start(sz Serializer) error

func (*DiskvSpooler) Status

func (s *DiskvSpooler) Status() map[string]string

func (*DiskvSpooler) Stop

func (s *DiskvSpooler) Stop() error

func (*DiskvSpooler) Write

func (s *DiskvSpooler) Write(service string, data interface{}) error

type JsonGzipSerializer

// JsonGzipSerializer provides the Serializer method set (ToBytes, Encoding,
// Concurrent). It is created with NewJsonGzipSerializer.
// NOTE(review): presumably it JSON-encodes and then gzip-compresses the
// data, as the name suggests — the implementation is not visible here.
type JsonGzipSerializer struct {
	// contains filtered or unexported fields
}

func NewJsonGzipSerializer

func NewJsonGzipSerializer() *JsonGzipSerializer

func (*JsonGzipSerializer) Concurrent

func (s *JsonGzipSerializer) Concurrent() bool

func (*JsonGzipSerializer) Encoding

func (s *JsonGzipSerializer) Encoding() string

func (*JsonGzipSerializer) ToBytes

func (s *JsonGzipSerializer) ToBytes(data interface{}) ([]byte, error)

type JsonSerializer

// JsonSerializer is a field-less type that provides the Serializer method
// set (ToBytes, Encoding, Concurrent). It is created with NewJsonSerializer.
// NOTE(review): presumably it JSON-encodes the data, as the name suggests —
// the implementation is not visible here.
type JsonSerializer struct {
}

func NewJsonSerializer

func NewJsonSerializer() *JsonSerializer

func (*JsonSerializer) Concurrent

func (s *JsonSerializer) Concurrent() bool

func (*JsonSerializer) Encoding

func (s *JsonSerializer) Encoding() string

func (*JsonSerializer) ToBytes

func (j *JsonSerializer) ToBytes(data interface{}) ([]byte, error)

type Manager

// Manager exposes Start, Stop and Status, a Handle method that takes a
// *proto.Cmd and returns a *proto.Reply, a GetConfig method, and accessors
// for its Sender and Spooler. It is created with NewManager.
type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager(logger *pct.Logger, dataDir, trashDir, hostname string, client pct.WebsocketClient) *Manager

func (*Manager) GetConfig

func (m *Manager) GetConfig() ([]proto.AgentConfig, []error)

func (*Manager) Handle

func (m *Manager) Handle(cmd *proto.Cmd) *proto.Reply

func (*Manager) Sender

func (m *Manager) Sender() *Sender

func (*Manager) Spooler

func (m *Manager) Spooler() Spooler

func (*Manager) Start

func (m *Manager) Start() error

func (*Manager) Status

func (m *Manager) Status() map[string]string

func (*Manager) Stop

func (m *Manager) Stop() error

type Sender

// Sender exposes Start, Stop and Status. Start takes a Spooler, a ticker
// channel of time.Time values, a timeout and a blackhole flag.
// NOTE(review): presumably each tick triggers sending the spooled files and
// blackhole suppresses the actual send — confirm against the implementation.
type Sender struct {
	// contains filtered or unexported fields
}

func NewSender

func NewSender(logger *pct.Logger, client pct.WebsocketClient) *Sender

func (*Sender) Start

func (s *Sender) Start(spool Spooler, tickerChan <-chan time.Time, timeout uint, blackhole bool) error

func (*Sender) Status

func (s *Sender) Status() map[string]string

func (*Sender) Stop

func (s *Sender) Stop() error

type SenderStats added in v1.0.10

// SenderStats accepts SentInfo values via Sent and returns a []SentInfo from
// Dump and a SentReport from Report. It is created with NewSenderStats.
// NOTE(review): the time.Duration passed to NewSenderStats presumably bounds
// the window of retained stats — confirm.
type SenderStats struct {
	// contains filtered or unexported fields
}

func NewSenderStats added in v1.0.10

func NewSenderStats(d time.Duration) *SenderStats

func (*SenderStats) Dump added in v1.0.10

func (s *SenderStats) Dump() []SentInfo

func (*SenderStats) Report added in v1.0.10

func (s *SenderStats) Report() SentReport

func (*SenderStats) Sent added in v1.0.10

func (s *SenderStats) Sent(info SentInfo)

type SentInfo added in v1.0.10

// SentInfo is the value accepted by SenderStats.Sent and returned in bulk by
// SenderStats.Dump. It carries a Begin/End time span, a send time, and
// counts of files, bytes, errors, API errors, timeouts and bad files.
// NOTE(review): SendTime is presumably in seconds — confirm.
type SentInfo struct {
	Begin    time.Time
	End      time.Time
	SendTime float64
	Files    uint
	Bytes    uint64
	Errs     uint
	ApiErrs  uint
	Timeouts uint
	BadFiles uint
}

type SentReport added in v1.0.10

// SentReport is the value returned by SenderStats.Report and accepted by
// FormatSentReport. Bytes and Duration are humanized strings, Utilization
// and Throughput are rates noted as Mbps, and the remaining exported fields
// are the time span and unsigned counters.
type SentReport struct {

	// --
	Begin       time.Time
	End         time.Time
	Bytes       string // humanized bytes, e.g. 443.59 kB
	Duration    string // End - Begin, humanized
	Utilization string // bytes / (End - Begin), Mbps
	Throughput  string // bytes / sendTime, Mbps
	Files       uint
	Errs        uint
	ApiErrs     uint
	Timeouts    uint
	BadFiles    uint
	// contains filtered or unexported fields
}

type Serializer

// Serializer converts a value to bytes (ToBytes) and reports an encoding
// name (Encoding) and a boolean (Concurrent). JsonSerializer and
// JsonGzipSerializer both provide this method set.
// NOTE(review): Concurrent presumably reports whether ToBytes may be called
// concurrently — confirm.
type Serializer interface {
	ToBytes(data interface{}) ([]byte, error)
	Encoding() string
	Concurrent() bool
}

type Spooler

// Spooler is the spooling method set: it is started with a Serializer,
// accepts Write(service, data), yields file names over a channel (Files,
// with CancelFiles alongside it), and offers Read, Remove, Reject and Purge
// on files. DiskvSpooler provides all of these methods.
// NOTE(review): the meaning of Purge's return values (an int and a
// map[string][]string) is not visible here — confirm before documenting.
type Spooler interface {
	Start(Serializer) error
	Stop() error
	Status() map[string]string
	Write(service string, data interface{}) error
	Files() <-chan string
	CancelFiles()
	Read(file string) ([]byte, error)
	Remove(file string) error
	Reject(file string) error
	Purge(time.Time, proto.DataSpoolLimits) (int, map[string][]string)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL