pipeline

package
v0.0.0-...-dbd657b
This package is not in the latest version of its module.
Published: Aug 12, 2019 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Channels

type Channels struct {
	// contains filtered or unexported fields
}

Channels holds the pipeline channels.

func NewChannels

func NewChannels() *Channels

NewChannels creates a new pipeline channels object
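The fields of Channels are unexported, so the sketch below reproduces the documented surface with hypothetical internals (buffered channels of *S3Object and *SQS stand-ins) to show how a producer and consumer might share the pair; it is illustrative only, not the package's actual implementation:

```go
package main

import "fmt"

// Hypothetical stand-ins for the package's pipeline element types;
// the real types embed AWS wrappers and reader information.
type S3Object struct{ Key string }
type SQS struct{ QueueURL string }

// Channels mirrors the documented API: a holder for the two pipeline
// channels with getters and closers.
type Channels struct {
	s3Channel  chan *S3Object
	sqsChannel chan *SQS
}

func NewChannels() *Channels {
	return &Channels{
		s3Channel:  make(chan *S3Object, 1),
		sqsChannel: make(chan *SQS, 1),
	}
}

func (c *Channels) GetS3Channel() chan *S3Object { return c.s3Channel }
func (c *Channels) GetSQSChannel() chan *SQS     { return c.sqsChannel }
func (c *Channels) CloseS3Channel()              { close(c.s3Channel) }
func (c *Channels) CloseSQSChannel()             { close(c.sqsChannel) }

func main() {
	ch := NewChannels()
	// A producer stage pushes into the channel; a consumer stage reads it.
	ch.GetSQSChannel() <- &SQS{QueueURL: "https://sqs.example/queue"}
	msg := <-ch.GetSQSChannel()
	fmt.Println(msg.QueueURL)
	ch.CloseSQSChannel()
	ch.CloseS3Channel()
}
```

Closing each channel separately lets each pipeline stage be shut down independently of the others.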

func (*Channels) CloseS3Channel

func (c *Channels) CloseS3Channel()

CloseS3Channel closes the S3 channel

func (*Channels) CloseSQSChannel

func (c *Channels) CloseSQSChannel()

CloseSQSChannel closes the SQS channel

func (*Channels) GetS3Channel

func (c *Channels) GetS3Channel() chan *S3Object

GetS3Channel returns the S3 channel

func (*Channels) GetSQSChannel

func (c *Channels) GetSQSChannel() chan *SQS

GetSQSChannel returns the SQS channel

type S3ImportsChannels

type S3ImportsChannels struct {
	// contains filtered or unexported fields
}

S3ImportsChannels holds the pipeline channels used by the s3imports command.

func NewS3ImportsChannels

func NewS3ImportsChannels() *S3ImportsChannels

NewS3ImportsChannels creates a new S3 list pipeline channels object

func (*S3ImportsChannels) CloseS3Channel

func (c *S3ImportsChannels) CloseS3Channel()

CloseS3Channel closes the S3 channel

func (*S3ImportsChannels) CloseS3ListChannel

func (c *S3ImportsChannels) CloseS3ListChannel()

CloseS3ListChannel closes the S3 list channel

func (*S3ImportsChannels) GetS3Channel

func (c *S3ImportsChannels) GetS3Channel() chan *S3Object

GetS3Channel returns the S3 channel

func (*S3ImportsChannels) GetS3ListChannel

func (c *S3ImportsChannels) GetS3ListChannel() chan *S3List

GetS3ListChannel returns the S3 list channel

type S3List

type S3List struct {
	*aws.S3
	*S3ReaderInformation
	// contains filtered or unexported fields
}

S3List is an S3 list object to send through the pipeline.

func NewS3List

func NewS3List(session *session.Session, s3prefix *aws.S3Object, ri *S3ReaderInformation, since, to time.Time) *S3List

NewS3List creates a new S3List to be sent through the pipeline

type S3ListerWorker

type S3ListerWorker struct {
	// contains filtered or unexported fields
}

S3ListerWorker is a worker that lists an S3 bucket and passes the objects to the output (out channel).

func NewS3ListerWorker

func NewS3ListerWorker(in <-chan *S3List, out chan<- *S3Object, wgS3Objects eventCounter) *S3ListerWorker

NewS3ListerWorker creates an S3ListerWorker

func (*S3ListerWorker) Start

func (w *S3ListerWorker) Start()

Start starts the S3 lister workers

func (*S3ListerWorker) Stop

func (w *S3ListerWorker) Stop()

Stop notifies the workers to stop and waits until all workers finish

func (*S3ListerWorker) Wait

func (w *S3ListerWorker) Wait()

Wait waits until all workers have finished
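The worker's internals are unexported; the following is a minimal sketch of the Start/Stop/Wait pattern the documentation describes, using a hypothetical listerWorker that fakes one listed object per prefix instead of calling S3:

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical element types standing in for *S3List and *S3Object.
type S3List struct{ Prefix string }
type S3Object struct{ Key string }

// listerWorker sketches the documented worker surface: read from in,
// write to out, until in is closed or Stop is called.
type listerWorker struct {
	in   <-chan *S3List
	out  chan<- *S3Object
	done chan struct{}
	wg   sync.WaitGroup
}

func newListerWorker(in <-chan *S3List, out chan<- *S3Object) *listerWorker {
	return &listerWorker{in: in, out: out, done: make(chan struct{})}
}

func (w *listerWorker) Start() {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		for {
			select {
			case <-w.done:
				return
			case l, ok := <-w.in:
				if !ok {
					return
				}
				// A real worker would list the bucket here; we fake one object.
				w.out <- &S3Object{Key: l.Prefix + "/object"}
			}
		}
	}()
}

// Stop asks the goroutines to exit and waits; Wait only waits.
func (w *listerWorker) Stop() { close(w.done); w.wg.Wait() }
func (w *listerWorker) Wait() { w.wg.Wait() }

func main() {
	in := make(chan *S3List, 1)
	out := make(chan *S3Object, 1)
	w := newListerWorker(in, out)
	w.Start()
	in <- &S3List{Prefix: "logs"}
	fmt.Println((<-out).Key) // logs/object
	close(in)
	w.Wait()
}
```

The same Start/Stop/Wait shape applies to S3ReaderWorker and SQSConsumerWorker below.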

type S3Object

type S3Object struct {
	*aws.S3Object
	*S3ReaderInformation
	// contains filtered or unexported fields
}

S3Object is an S3 object element to send through the pipeline.

func NewS3Object

func NewS3Object(awsS3Object *aws.S3Object, ri *S3ReaderInformation, s3ObjectProcessNotifications S3ObjectProcessNotifications) *S3Object

NewS3Object creates a new S3 object to be sent through the pipeline

type S3ObjectProcessNotifications

type S3ObjectProcessNotifications interface {
	// EventACKed executed when an event is ACKed
	EventACKed()

	// EventSent executed when an event is sent to beats pipeline to be published
	EventSent()

	// S3ObjectProcessed executed when an S3 object is completely processed
	S3ObjectProcessed()
}

S3ObjectProcessNotifications is the interface implemented by elements passed on event.Private.

func NewS3ObjectProcessNotificationsIgnorer

func NewS3ObjectProcessNotificationsIgnorer() S3ObjectProcessNotifications

NewS3ObjectProcessNotificationsIgnorer creates an S3ObjectProcessNotifications implementation that ignores all notifications
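One plausible shape for the returned ignorer (the actual implementation is unexported) is an empty struct whose methods are all no-ops, useful when events carry no SQS message that must be deleted on completion:

```go
package main

import "fmt"

// S3ObjectProcessNotifications as documented above.
type S3ObjectProcessNotifications interface {
	EventACKed()
	EventSent()
	S3ObjectProcessed()
}

// ignorer is a hypothetical no-op implementation: every notification
// is silently discarded.
type ignorer struct{}

func (ignorer) EventACKed()        {}
func (ignorer) EventSent()         {}
func (ignorer) S3ObjectProcessed() {}

func NewS3ObjectProcessNotificationsIgnorer() S3ObjectProcessNotifications {
	return ignorer{}
}

func main() {
	n := NewS3ObjectProcessNotificationsIgnorer()
	n.EventSent()
	n.EventACKed()
	n.S3ObjectProcessed()
	fmt.Println("all notifications ignored")
}
```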

type S3ReaderInformation

type S3ReaderInformation struct {
	// contains filtered or unexported fields
}

S3ReaderInformation holds the input information needed at the S3 reader stage.

func NewS3ReaderInformation

func NewS3ReaderInformation(logParser logparser.LogParser, keyRegexFields *regexp.Regexp, metadataType string) *S3ReaderInformation

NewS3ReaderInformation creates a new S3 reader information

func (S3ReaderInformation) GetKeyFields

func (ri S3ReaderInformation) GetKeyFields(key string) (*common.MapStr, error)

GetKeyFields extracts fields from the key if the input configured it

func (*S3ReaderInformation) GetLogParser

func (ri *S3ReaderInformation) GetLogParser() logparser.LogParser

GetLogParser obtains the log parser

func (*S3ReaderInformation) GetMetadataType

func (ri *S3ReaderInformation) GetMetadataType() string

GetMetadataType obtains metadata type

type S3ReaderWorker

type S3ReaderWorker struct {
	// contains filtered or unexported fields
}

S3ReaderWorker is a worker to read objects from S3, parse their content, and send events to output

func NewS3ReaderWorker

func NewS3ReaderWorker(in <-chan *S3Object, out beat.Client, wgEvents eventCounter, wgS3Objects eventCounter) *S3ReaderWorker

NewS3ReaderWorker creates a new S3ReaderWorker

func (*S3ReaderWorker) Start

func (w *S3ReaderWorker) Start()

Start starts the worker

func (*S3ReaderWorker) Stop

func (w *S3ReaderWorker) Stop()

Stop notifies the workers to stop and waits until all workers finish. No more S3 objects will be accepted.

func (*S3ReaderWorker) Wait

func (w *S3ReaderWorker) Wait()

Wait waits until all workers have finished

type SQS

type SQS struct {
	*aws.SQS
	*S3ReaderInformation
}

SQS is an SQS element to send through the pipeline.

func NewSQS

func NewSQS(session *session.Session, queueURL *string, ri *S3ReaderInformation) *SQS

NewSQS creates a new SQS to be sent through the pipeline

type SQSConsumerWorker

type SQSConsumerWorker struct {
	// contains filtered or unexported fields
}

SQSConsumerWorker is a worker that reads SQS notifications from AWS (present on the in channel), extracts the new S3 objects referenced by the messages, and passes them to the output (out channel).

func NewSQSConsumerWorker

func NewSQSConsumerWorker(in <-chan *SQS, out chan<- *S3Object, wgSQSMessages eventCounter, wgS3Objects eventCounter, keepSQSMessages bool) *SQSConsumerWorker

NewSQSConsumerWorker creates an SQSConsumerWorker

func (*SQSConsumerWorker) Start

func (w *SQSConsumerWorker) Start()

Start starts the SQS consumer workers

func (*SQSConsumerWorker) Stop

func (w *SQSConsumerWorker) Stop()

Stop notifies the workers to stop and waits until all workers finish

func (*SQSConsumerWorker) StopAcceptingMessages

func (w *SQSConsumerWorker) StopAcceptingMessages()

StopAcceptingMessages notifies the workers to stop accepting new messages and waits until all workers finish

func (*SQSConsumerWorker) Wait

func (w *SQSConsumerWorker) Wait()

Wait waits until all workers have finished

type SQSMessage

type SQSMessage struct {
	*aws.SQSMessage
	// contains filtered or unexported fields
}

SQSMessage is an SQS message to be passed through the pipeline. We must track how many S3 objects and how many events are generated from this message in order to delete it from SQS once processing finishes.

func NewSQSMessage

func NewSQSMessage(sqs *SQS, sqsMessage *aws.SQSMessage, keepOnCompleted bool) *SQSMessage

NewSQSMessage creates a new SQSMessage from an SQS element and an AWS SQS message

func (*SQSMessage) EventACKed

func (s *SQSMessage) EventACKed()

EventACKed decrements the counter of events pending ACK. If all events have been processed, the SQS message is deleted.

func (*SQSMessage) EventSent

func (s *SQSMessage) EventSent()

EventSent increments the counter of events pending ACK

func (*SQSMessage) ExtractNewS3Objects

func (s *SQSMessage) ExtractNewS3Objects(mh func(s3object *S3Object) error) error

ExtractNewS3Objects extracts the new S3 objects present on an SQS message. This function is executed under a mutex to avoid the following case:

Time 0: goroutine A (GA) executes ExtractNewS3Objects with the first S3 element and continues in the loop.
Time 1: goroutine B (GB) downloads the S3 object, which is empty; it executes DeleteOnJobCompleted and deletes the SQS message.
Time 2: the app crashes.

Problem: because the SQS message has already been deleted, it cannot be processed again.

func (*SQSMessage) OnDelete

func (s *SQSMessage) OnDelete(f func())

OnDelete adds callback for OnDelete event

func (*SQSMessage) S3ObjectProcessed

func (s *SQSMessage) S3ObjectProcessed()

S3ObjectProcessed decrements the counter of pending S3 objects to process and executes DeleteOnJobCompleted
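The counting scheme SQSMessage describes can be sketched as follows; names and internals are hypothetical, since the real counters are unexported. The message is "deleted" only once both the pending-S3-objects counter and the pending-events counter drain to zero:

```go
package main

import (
	"fmt"
	"sync"
)

// refcountedMessage tracks pending S3 objects and pending events, and
// deletes itself from the queue only when both counters reach zero.
type refcountedMessage struct {
	mu               sync.Mutex
	pendingS3Objects int
	pendingEvents    int
	deleted          bool
}

func (m *refcountedMessage) S3ObjectAdded() { m.mu.Lock(); m.pendingS3Objects++; m.mu.Unlock() }
func (m *refcountedMessage) EventSent()     { m.mu.Lock(); m.pendingEvents++; m.mu.Unlock() }

func (m *refcountedMessage) EventACKed() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.pendingEvents--
	m.maybeDelete()
}

func (m *refcountedMessage) S3ObjectProcessed() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.pendingS3Objects--
	m.maybeDelete()
}

// maybeDelete must be called with mu held.
func (m *refcountedMessage) maybeDelete() {
	if m.pendingS3Objects == 0 && m.pendingEvents == 0 {
		m.deleted = true // a real implementation would delete the SQS message here
	}
}

func main() {
	m := &refcountedMessage{}
	m.S3ObjectAdded()
	m.EventSent()
	m.EventSent()
	m.EventACKed()
	m.S3ObjectProcessed() // one event still pending, so not deleted yet
	fmt.Println(m.deleted) // false
	m.EventACKed()
	fmt.Println(m.deleted) // true
}
```

The mutex matters for the same reason ExtractNewS3Objects documents: without it, a fast consumer could drive both counters to zero and delete the message while the extractor is still emitting objects from it.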
