outputs

package
v0.0.0-...-16240a8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 6, 2022 License: MIT Imports: 38 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

Functions

func NewTLSConfig

func NewTLSConfig(config *Configuration, clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error)

func SignalExitCond

func SignalExitCond(outputDone *sync.Cond)

Types

type BaseOutput

type BaseOutput struct {
	OutputHandler
}

func NewNGS3OutputFromConfig

func NewNGS3OutputFromConfig(cfg *Configuration) *BaseOutput

func (*BaseOutput) Go

func (baseOutputHandler *BaseOutput) Go(messages <-chan string, signalChan <-chan os.Signal, exitCond *sync.Cond) error

type BufferOutput

type BufferOutput struct {
	// contains filtered or unexported fields
}

type BundleBehavior

type BundleBehavior interface {
	Upload(fileName string, fp *os.File) UploadStatus
	Initialize(connString string) error
	Statistics() interface{}
	Key() string
	String() string
}

Each bundled output plugin must implement the BundleBehavior interface, specifying how to upload files, initialize itself, and report back statistics.

type BundleStatistics

type BundleStatistics struct {
	FilesUploaded        int64       `json:"files_uploaded"`
	UploadErrors         int64       `json:"upload_errors"`
	LastErrorTime        time.Time   `json:"last_error_time"`
	LastErrorText        string      `json:"last_error_text"`
	LastSuccessfulUpload time.Time   `json:"last_successful_upload"`
	HoldingArea          interface{} `json:"file_holding_area"`
	StorageStatistics    interface{} `json:"storage_statistics"`
	BundleSendTimeout    int64       `json:"bundle_send_timeout"`
	BundleSizeMax        int64       `json:"bundle_size_max"`
	UploadEmptyFiles     bool        `json:"upload_empty_files"`
}

type BundledOutput

type BundledOutput struct {
	Behavior BundleBehavior

	Config *Configuration

	// TODO: make this thread-safe from the status page
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewHTTPOutputFromConfig

func NewHTTPOutputFromConfig(cfg *Configuration) *BundledOutput

func NewS3OutputFromConfig

func NewS3OutputFromConfig(cfg *Configuration) *BundledOutput

func NewSplunkOutputFromConfig

func NewSplunkOutputFromConfig(cfg *Configuration) *BundledOutput

func (*BundledOutput) Go

func (o *BundledOutput) Go(messages <-chan string, signals <-chan os.Signal, exitCond *sync.Cond) error

func (*BundledOutput) Initialize

func (o *BundledOutput) Initialize(connString string) error

func (*BundledOutput) Key

func (o *BundledOutput) Key() string

func (*BundledOutput) Statistics

func (o *BundledOutput) Statistics() interface{}

func (*BundledOutput) String

func (o *BundledOutput) String() string

type FileOutput

type FileOutput struct {
	Config *Configuration

	sync.RWMutex
	// contains filtered or unexported fields
}

func NewFileOutputFromConfig

func NewFileOutputFromConfig(cfg *Configuration) *FileOutput

func (*FileOutput) Go

func (o *FileOutput) Go(messages <-chan string, signalChan <-chan os.Signal, exitCond *sync.Cond) error

func (*FileOutput) Initialize

func (o *FileOutput) Initialize(fileName string) error

func (*FileOutput) Key

func (o *FileOutput) Key() string

func (*FileOutput) Statistics

func (o *FileOutput) Statistics() interface{}

func (*FileOutput) String

func (o *FileOutput) String() string

type FileStatistics

type FileStatistics struct {
	LastOpenTime time.Time `json:"last_open_time"`
	FileName     string    `json:"file_name"`
}

type HTTPBehavior

type HTTPBehavior struct {
	Config *Configuration

	HTTPPostTemplate *template.Template
	// contains filtered or unexported fields
}

HTTPBehavior is the HTTP implementation of the BundleBehavior interface defined in this package.

func (*HTTPBehavior) CreateTransport

func (this *HTTPBehavior) CreateTransport() http.RoundTripper

CreateTransport returns the Transport which will be used by the http.Client.

func (*HTTPBehavior) Initialize

func (this *HTTPBehavior) Initialize(dest string) error

Initialize constructs the HTTPBehavior object.

func (*HTTPBehavior) Key

func (this *HTTPBehavior) Key() string

func (*HTTPBehavior) Statistics

func (this *HTTPBehavior) Statistics() interface{}

func (*HTTPBehavior) String

func (this *HTTPBehavior) String() string

func (*HTTPBehavior) Upload

func (this *HTTPBehavior) Upload(fileName string, fp *os.File) UploadStatus

Upload does a POST of the given event to this.dest. Upload is called from within its own goroutine, so we can do some expensive work here.

type HTTPStatistics

type HTTPStatistics struct {
	Destination string `json:"destination"`
}

type KafkaOutput

type KafkaOutput struct {
	Config *Configuration

	EventSent    metrics.Meter
	DroppedEvent metrics.Meter
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewKafkaOutputFromConfig

func NewKafkaOutputFromConfig(cfg *Configuration) *KafkaOutput

func (*KafkaOutput) Go

func (o *KafkaOutput) Go(messages <-chan string, signals <-chan os.Signal, exitCond *sync.Cond) error

func (*KafkaOutput) Initialize

func (o *KafkaOutput) Initialize(unused string) (err error)

func (*KafkaOutput) Key

func (o *KafkaOutput) Key() string

func (*KafkaOutput) Statistics

func (o *KafkaOutput) Statistics() interface{}

func (*KafkaOutput) String

func (o *KafkaOutput) String() string

type KafkaStatistics

type KafkaStatistics struct {
	DroppedEventCount int64 `json:"dropped_event_count"`
	EventSentCount    int64 `json:"event_sent_count"`
}

type NGS3Output

type NGS3Output struct {
	Config *Configuration
	// contains filtered or unexported fields
}

func (*NGS3Output) ExitCleanup

func (so *NGS3Output) ExitCleanup()

func (*NGS3Output) HandleHup

func (so *NGS3Output) HandleHup() error

func (*NGS3Output) HandleMessage

func (so *NGS3Output) HandleMessage(message string) error

func (*NGS3Output) HandleTerm

func (so *NGS3Output) HandleTerm()

func (*NGS3Output) HandleTick

func (so *NGS3Output) HandleTick() error

func (*NGS3Output) Initialize

func (so *NGS3Output) Initialize(connString string) error

func (*NGS3Output) Key

func (so *NGS3Output) Key() string

func (*NGS3Output) Start

func (so *NGS3Output) Start() (err error)

func (*NGS3Output) Statistics

func (so *NGS3Output) Statistics() interface{}

func (*NGS3Output) String

func (so *NGS3Output) String() string

type NetOutput

type NetOutput struct {
	Config *Configuration

	sync.RWMutex
	// contains filtered or unexported fields
}

func NewNetOutputfromConfig

func NewNetOutputfromConfig(cfg *Configuration) *NetOutput

func (*NetOutput) Go

func (o *NetOutput) Go(messages <-chan string, signals <-chan os.Signal, exitCond *sync.Cond) error

func (*NetOutput) Initialize

func (o *NetOutput) Initialize(netConn string) error

Initialize expects a connection string in the following format: (protocol):(hostname/IP):(port), for example: tcp:destination.server.example.com:512.

func (*NetOutput) Key

func (o *NetOutput) Key() string

func (*NetOutput) Statistics

func (o *NetOutput) Statistics() interface{}

func (*NetOutput) String

func (o *NetOutput) String() string

type NetStatistics

type NetStatistics struct {
	LastOpenTime      time.Time `json:"last_open_time"`
	Protocol          string    `json:"connection_protocol"`
	RemoteHostname    string    `json:"remote_hostname"`
	DroppedEventCount int64     `json:"dropped_event_count"`
	Connected         bool      `json:"connected"`
}

type Output

type Output interface {
	Go(messages <-chan string, signalChan <-chan os.Signal, exitCond *sync.Cond) error
	OutputKeys
	OutputInitializer
}

type OutputHandler

type OutputHandler interface {
	Start() error
	HandleMessage(message string) error
	HandleHup() error
	HandleTick() error
	ExitCleanup()
	HandleTerm()
	OutputKeys
	OutputInitializer
}

type OutputInitializer

type OutputInitializer interface {
	Initialize(string) error
}

type OutputKeys

type OutputKeys interface {
	String() string
	Statistics() interface{}
	Key() string
}

type S3Behavior

type S3Behavior struct {
	Config *Configuration
	// contains filtered or unexported fields
}

func (*S3Behavior) Initialize

func (o *S3Behavior) Initialize(connString string) error

func (*S3Behavior) Key

func (o *S3Behavior) Key() string

func (*S3Behavior) Statistics

func (o *S3Behavior) Statistics() interface{}

func (*S3Behavior) String

func (o *S3Behavior) String() string

func (*S3Behavior) Upload

func (o *S3Behavior) Upload(fileName string, fp *os.File) UploadStatus

type S3ChunkingPublisher

type S3ChunkingPublisher struct {
	S3Publisher

	Input chan string
	// contains filtered or unexported fields
}

func NewS3ChunkingPublisher

func NewS3ChunkingPublisher(cfg *Configuration, uploader WrappedUploader, bucketName string) *S3ChunkingPublisher

func (*S3ChunkingPublisher) LaunchInputWorkers

func (chunkingPublisher *S3ChunkingPublisher) LaunchInputWorkers(workerNum int) error

func (*S3ChunkingPublisher) RollChunkIf

func (chunkingPublisher *S3ChunkingPublisher) RollChunkIf(uploadEmpty bool) (err error)

func (*S3ChunkingPublisher) RollChunkIfTimeElapsed

func (chunkingPublisher *S3ChunkingPublisher) RollChunkIfTimeElapsed(uploadEmpty bool, duration time.Duration) (err error)

func (*S3ChunkingPublisher) Start

func (chunkingPublisher *S3ChunkingPublisher) Start() error

func (*S3ChunkingPublisher) Stop

func (chunkingPublisher *S3ChunkingPublisher) Stop()

type S3OutputChunk

type S3OutputChunk struct {
	Closed bool
	// contains filtered or unexported fields
}

func NewS3OutputChunk

func NewS3OutputChunk(cfg *Configuration, chunkSize, flushSize int64, fileName, bucketName string) (*S3OutputChunk, error)

func (*S3OutputChunk) CloseChunkReader

func (chunk *S3OutputChunk) CloseChunkReader() error

func (*S3OutputChunk) CloseChunkWriters

func (chunk *S3OutputChunk) CloseChunkWriters() error

func (*S3OutputChunk) FlushIfNeeded

func (chunk *S3OutputChunk) FlushIfNeeded() error

func (*S3OutputChunk) Full

func (chunk *S3OutputChunk) Full() bool

func (*S3OutputChunk) MarkSent

func (chunk *S3OutputChunk) MarkSent()

func (*S3OutputChunk) NeedsFlush

func (chunk *S3OutputChunk) NeedsFlush() bool

func (*S3OutputChunk) PrepareS3UploadInput

func (chunk *S3OutputChunk) PrepareS3UploadInput(workerId int) *s3manager.UploadInput

func (*S3OutputChunk) Read

func (chunk *S3OutputChunk) Read(buffer []byte) (n int, err error)

Read is called by the S3 uploader. It is repeatedly called to pull data off the pipe, so it can be written to S3. Once the writer end of the pipe is closed, the current S3 upload will be completed and no more reading will be performed on that pipe.

func (*S3OutputChunk) Write

func (chunk *S3OutputChunk) Write(message string) error

type S3OutputChunkWorker

type S3OutputChunkWorker struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewS3ChunkWorker

func NewS3ChunkWorker(cfg *Configuration, uploads chan<- *S3OutputChunk, chunkSize, flushSize int64, fileName, bucketName string) (*S3OutputChunkWorker, error)

func (*S3OutputChunkWorker) CloseCurrentChunk

func (chunkWorker *S3OutputChunkWorker) CloseCurrentChunk() error

func (*S3OutputChunkWorker) CurrentChunkTime

func (chunkWorker *S3OutputChunkWorker) CurrentChunkTime() time.Time

func (*S3OutputChunkWorker) RollChunk

func (chunkWorker *S3OutputChunkWorker) RollChunk() error

RollChunk ends the current chunk and creates another. Ending it is accomplished by closing the pipe writer which results in the read and then the S3 upload completing. The chunk is created before any data is received.

func (*S3OutputChunkWorker) RollChunkAndSend

func (chunkWorker *S3OutputChunkWorker) RollChunkAndSend() error

RollChunkAndSend creates a new chunk and sends it to the thread doing the S3 uploading via a channel.

func (*S3OutputChunkWorker) RollChunkIf

func (chunkWorker *S3OutputChunkWorker) RollChunkIf(emptyOk bool) error

func (*S3OutputChunkWorker) RollChunkIfTimeElapsed

func (chunkWorker *S3OutputChunkWorker) RollChunkIfTimeElapsed(emptyOk bool, duration time.Duration) error

func (*S3OutputChunkWorker) SendChunk

func (chunkWorker *S3OutputChunkWorker) SendChunk()

SendChunk - sends the details of a chunk (which includes the pipe being used to send data) and is one of the first things that is done. Doing this triggers the creation of a new S3 uploader, which will continuously read from the pipe until it is closed. This does not send RabbitMQ data, but rather a chunk object.

func (*S3OutputChunkWorker) Work

func (chunkWorker *S3OutputChunkWorker) Work(workerId int, wg *sync.WaitGroup, input <-chan string)

type S3Publisher

type S3Publisher struct {
	WrappedUploader
	// contains filtered or unexported fields
}

func NewS3Publisher

func NewS3Publisher(waitGroup *sync.WaitGroup, uploader WrappedUploader, uploadInputs <-chan *S3OutputChunk) S3Publisher

func (*S3Publisher) LaunchUploadWorkers

func (publisher *S3Publisher) LaunchUploadWorkers(workerNum int)

type S3PublisherWorker

type S3PublisherWorker struct {
	// contains filtered or unexported fields
}

func NewS3PublisherWorker

func NewS3PublisherWorker(workerId int, waitGroup *sync.WaitGroup, uploads <-chan *S3OutputChunk, publisher WrappedUploader) *S3PublisherWorker

func (*S3PublisherWorker) Work

func (worker *S3PublisherWorker) Work()

Work waits for a new chunk on the channel and then uses it to set up a new S3 uploader. That uploader will make calls to the above S3OutputChunk.Read() to pull data off the internal pipe where we are also writing data that is received from RabbitMQ. Once the pipe's writer is closed, the data will be uploaded and this will again wait for the next chunk. Note that chunks are created before any data is received.

type S3Statistics

type S3Statistics struct {
	BucketName        string `json:"bucket_name"`
	Region            string `json:"region"`
	EncryptionEnabled bool   `json:"encryption_enabled"`
}

type SplunkBehavior

type SplunkBehavior struct {
	Config *Configuration

	HTTPPostTemplate *template.Template
	// contains filtered or unexported fields
}

SplunkBehavior is the Splunk HTTP Event Collector (HEC) implementation of the BundleBehavior interface defined in this package.

func (*SplunkBehavior) Initialize

func (this *SplunkBehavior) Initialize(dest string) error

Initialize constructs the SplunkBehavior object.

func (*SplunkBehavior) Key

func (this *SplunkBehavior) Key() string

func (*SplunkBehavior) Statistics

func (this *SplunkBehavior) Statistics() interface{}

func (*SplunkBehavior) String

func (this *SplunkBehavior) String() string

func (*SplunkBehavior) Upload

func (this *SplunkBehavior) Upload(fileName string, fp *os.File) UploadStatus

Upload does a POST of the given event to this.dest. Upload is called from within its own goroutine, so we can do some expensive work here.

type SplunkStatistics

type SplunkStatistics struct {
	Destination string `json:"destination"`
}

type SyslogOutput

type SyslogOutput struct {
	Config *Configuration

	sync.RWMutex
	// contains filtered or unexported fields
}

func NewSyslogOutputFromConfig

func NewSyslogOutputFromConfig(cfg *Configuration) *SyslogOutput

func (*SyslogOutput) Go

func (o *SyslogOutput) Go(messages <-chan string, signals <-chan os.Signal, exitCond *sync.Cond) error

func (*SyslogOutput) Initialize

func (o *SyslogOutput) Initialize(netConn string) error

Initialize expects a connection string in the following format: [protocol]:host[:port], for example: tcp+tls:destination.server.example.com:512. The protocol is optional (but the colon before host is not) and should be something like:

  • tcp, udp, tcp+tls

The port is optional and, if not provided, will default to 514.

func (*SyslogOutput) Key

func (o *SyslogOutput) Key() string

func (*SyslogOutput) Statistics

func (o *SyslogOutput) Statistics() interface{}

func (*SyslogOutput) String

func (o *SyslogOutput) String() string

type SyslogStatistics

type SyslogStatistics struct {
	LastOpenTime       time.Time `json:"last_open_time"`
	Protocol           string    `json:"protocol"`
	RemoteHostnamePort string    `json:"remote_hostname_port"`
	DroppedEventCount  int64     `json:"dropped_event_count"`
	Connected          bool      `json:"connected"`
}

type UploadData

type UploadData struct {
	FileName string
	FileSize int64
	Events   chan UploadEvent
}

type UploadEvent

type UploadEvent struct {
	EventSeq  int64
	EventText string
}

func NewUploadEvent

func NewUploadEvent(eventSeq int64, eventText string, eventTextAsJsonByteArray bool) UploadEvent

NewUploadEvent creates an instance of UploadEvent.

type UploadStatus

type UploadStatus struct {
	// contains filtered or unexported fields
}

type WrappedUploader

type WrappedUploader interface {
	Upload(input *s3manager.UploadInput, options ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error)
}

type XDGSCRAMClient

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL