Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Channels ¶
type Channels struct {
// contains filtered or unexported fields
}
Channels Pipeline channels
func (*Channels) CloseS3Channel ¶
func (c *Channels) CloseS3Channel()
CloseS3Channel closes S3 channel
func (*Channels) CloseSQSChannel ¶
func (c *Channels) CloseSQSChannel()
CloseSQSChannel closes SQS channel
func (*Channels) GetS3Channel ¶
func (c *Channels) GetS3Channel() chan *S3Object
GetS3Channel gets S3 channel
func (*Channels) GetSQSChannel ¶
GetSQSChannel gets SQS channel
type S3ImportsChannels ¶
type S3ImportsChannels struct {
// contains filtered or unexported fields
}
S3ImportsChannels Pipeline channels used on s3imports command
func NewS3ImportsChannels ¶
func NewS3ImportsChannels() *S3ImportsChannels
NewS3ImportsChannels creates a new s3 list pipeline channels object
func (*S3ImportsChannels) CloseS3Channel ¶
func (c *S3ImportsChannels) CloseS3Channel()
CloseS3Channel closes S3 channel
func (*S3ImportsChannels) CloseS3ListChannel ¶
func (c *S3ImportsChannels) CloseS3ListChannel()
CloseS3ListChannel closes S3 list channel
func (*S3ImportsChannels) GetS3Channel ¶
func (c *S3ImportsChannels) GetS3Channel() chan *S3Object
GetS3Channel gets S3 channel
func (*S3ImportsChannels) GetS3ListChannel ¶
func (c *S3ImportsChannels) GetS3ListChannel() chan *S3List
GetS3ListChannel gets S3 list channel
type S3List ¶
type S3List struct { *aws.S3 *S3ReaderInformation // contains filtered or unexported fields }
S3List S3 list object to send thru pipeline
type S3ListerWorker ¶
type S3ListerWorker struct {
// contains filtered or unexported fields
}
S3ListerWorker is a worker to list an S3 bucket and pass them to the output (out channel)
func NewS3ListerWorker ¶
func NewS3ListerWorker(in <-chan *S3List, out chan<- *S3Object, wgS3Objects eventCounter) *S3ListerWorker
NewS3ListerWorker creates an S3ListerWorker
func (*S3ListerWorker) Start ¶
func (w *S3ListerWorker) Start()
Start starts the SQS consumer workers
func (*S3ListerWorker) Stop ¶
func (w *S3ListerWorker) Stop()
Stop sends notification to stop to workers and wait untill all workers finish
func (*S3ListerWorker) Wait ¶
func (w *S3ListerWorker) Wait()
Wait waits until all workers have finished
type S3Object ¶
type S3Object struct { *aws.S3Object *S3ReaderInformation // contains filtered or unexported fields }
S3Object S3 object element to send thru pipeline
func NewS3Object ¶
func NewS3Object(awsS3Object *aws.S3Object, ri *S3ReaderInformation, s3ObjectProcessNotifications S3ObjectProcessNotifications) *S3Object
NewS3Object creates a new S3 object to be sent thru pipeline
type S3ObjectProcessNotifications ¶
type S3ObjectProcessNotifications interface { // EventACKed executed when an event is ACKed EventACKed() // EventSent executed when an event is sent to beats pipeline to be published EventSent() // S3ObjectProcessed executed when an S3 object is completely processed S3ObjectProcessed() }
S3ObjectProcessNotifications interface implemented by elements passed on event.Private
func NewS3ObjectProcessNotificationsIgnorer ¶
func NewS3ObjectProcessNotificationsIgnorer() S3ObjectProcessNotifications
NewS3ObjectProcessNotificationsIgnorer creates an S3 object process notifications which ignores all events
type S3ReaderInformation ¶
type S3ReaderInformation struct {
// contains filtered or unexported fields
}
S3ReaderInformation information present on inputs needed at S3 reader stage
func NewS3ReaderInformation ¶
func NewS3ReaderInformation(logParser logparser.LogParser, keyRegexFields *regexp.Regexp, metadataType string) *S3ReaderInformation
NewS3ReaderInformation creates a new S3 reader information
func (S3ReaderInformation) GetKeyFields ¶
func (ri S3ReaderInformation) GetKeyFields(key string) (*common.MapStr, error)
GetKeyFields extract fields from key if input set it
func (*S3ReaderInformation) GetLogParser ¶
func (ri *S3ReaderInformation) GetLogParser() logparser.LogParser
GetLogParser obtains the log parser
func (*S3ReaderInformation) GetMetadataType ¶
func (ri *S3ReaderInformation) GetMetadataType() string
GetMetadataType obtains metadata type
type S3ReaderWorker ¶
type S3ReaderWorker struct {
// contains filtered or unexported fields
}
S3ReaderWorker is a worker to read objects from S3, parse their content, and send events to output
func NewS3ReaderWorker ¶
func NewS3ReaderWorker(in <-chan *S3Object, out beat.Client, wgEvents eventCounter, wgS3Objects eventCounter) *S3ReaderWorker
NewS3ReaderWorker creates a new S3ReaderWorker
func (*S3ReaderWorker) Stop ¶
func (w *S3ReaderWorker) Stop()
Stop sends notification to stop to workers and wait untill all workers finish. We will not accept more S3 objects
func (*S3ReaderWorker) Wait ¶
func (w *S3ReaderWorker) Wait()
Wait waits until all workers have finished
type SQSConsumerWorker ¶
type SQSConsumerWorker struct {
// contains filtered or unexported fields
}
SQSConsumerWorker is a worker to read SQS notifications for reading messages from AWS (present on in channel), extract new S3 objects present on messages and pass to the output (out channel)
func NewSQSConsumerWorker ¶
func NewSQSConsumerWorker(in <-chan *SQS, out chan<- *S3Object, wgSQSMessages eventCounter, wgS3Objects eventCounter, keepSQSMessages bool) *SQSConsumerWorker
NewSQSConsumerWorker creates an SQSConsumerWorker
func (*SQSConsumerWorker) Start ¶
func (w *SQSConsumerWorker) Start()
Start starts the SQS consumer workers
func (*SQSConsumerWorker) Stop ¶
func (w *SQSConsumerWorker) Stop()
Stop sends notification to stop to workers and wait untill all workers finish
func (*SQSConsumerWorker) StopAcceptingMessages ¶
func (w *SQSConsumerWorker) StopAcceptingMessages()
StopAcceptingMessages sends notification to stop to workers and wait until all workers finish
func (*SQSConsumerWorker) Wait ¶
func (w *SQSConsumerWorker) Wait()
Wait waits until all workers have finished
type SQSMessage ¶
type SQSMessage struct { *aws.SQSMessage // contains filtered or unexported fields }
SQSMessage SQS message to be passed thru pipeline. We have to keep how much S3 objects and how much events are generated from this message in order to delete it from SQS once it finishes
func NewSQSMessage ¶
func NewSQSMessage(sqs *SQS, sqsMessage *aws.SQSMessage, keepOnCompleted bool) *SQSMessage
NewSQSMessage is a construct function for creating the object with session and url of the queue as arguments
func (*SQSMessage) EventACKed ¶
func (s *SQSMessage) EventACKed()
EventACKed reduces the number of events to the counter (to know the number of events pending to ACK). If all events have been processed, the SQS message is deleted.
func (*SQSMessage) EventSent ¶
func (s *SQSMessage) EventSent()
EventSent adds the number of events to the counter (to know the number of events pending to ACK)
func (*SQSMessage) ExtractNewS3Objects ¶
func (s *SQSMessage) ExtractNewS3Objects(mh func(s3object *S3Object) error) error
ExtractNewS3Objects extracts those new S3 objects present on an SQS message This function is executed on a mutex to avoid the following case: Time 0 -> Goroutine A (GA) : executes ExtractNewS3Objects with first S3 element and keeps on the loop Time 1 -> Goroutine B (GB) : downloads S3 object and is empty. It executes DeleteOnJobCompleted and deletes SQS message Time 2 -> app crashes Problem: as SQS message has already been deleted, it can not be processed again
func (*SQSMessage) OnDelete ¶
func (s *SQSMessage) OnDelete(f func())
OnDelete adds callback for OnDelete event
func (*SQSMessage) S3ObjectProcessed ¶
func (s *SQSMessage) S3ObjectProcessed()
S3ObjectProcessed reduces the number of pending S3 objects to process and executed DeleteOnJobCompleted