awslambdaeventsources

package
v2.144.0
Published: May 31, 2024 License: Apache-2.0 Imports: 15 Imported by: 15

README

AWS Lambda Event Sources

An event source mapping is an AWS Lambda resource that reads from an event source and invokes a Lambda function. You can use event source mappings to process items from a stream or queue in services that don't invoke Lambda functions directly. Lambda provides event source mappings for the following services. Read more about Lambda event sources in the AWS Lambda Developer Guide.

This module includes classes that allow using various AWS services as event sources for AWS Lambda via the high-level lambda.addEventSource(source) API.

NOTE: In most cases, it is also possible to use the resource APIs to invoke an AWS Lambda function. This library provides a uniform API for all Lambda event sources regardless of the underlying mechanism they use.

The following code sets up a Lambda function with an SQS queue event source:

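import (
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
	"github.com/aws/aws-cdk-go/awscdk/v2/awssqs"
	"github.com/aws/jsii-runtime-go"
)

var fn awslambda.Function

queue := awssqs.NewQueue(this, jsii.String("MyQueue"), nil)
eventSource := awslambdaeventsources.NewSqsEventSource(queue, nil)
fn.AddEventSource(eventSource)

// In Go both values are exposed as methods that return a token (*string).
eventSourceId := eventSource.EventSourceMappingId()
eventSourceMappingArn := eventSource.EventSourceMappingArn()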

The eventSourceId property contains the event source id. This will be a token that will resolve to the final value at the time of deployment.

The eventSourceMappingArn property contains the event source mapping ARN. This will be a token that will resolve to the final value at the time of deployment.

SQS

Amazon Simple Queue Service (Amazon SQS) allows you to build asynchronous workflows. For more information about Amazon SQS, see Amazon Simple Queue Service. You can configure AWS Lambda to poll for these messages as they arrive and then pass the event to a Lambda function invocation. To view a sample event, see Amazon SQS Event.

To set up Amazon Simple Queue Service as an event source for AWS Lambda, you first create or update an Amazon SQS queue and select custom values for the queue parameters. The following parameters will impact Amazon SQS's polling behavior:

  • visibilityTimeout: May impact the period between retries.
  • batchSize: Determines how many records are buffered before invoking your lambda function.
  • maxBatchingWindow: The maximum amount of time to gather records before invoking the lambda. This increases the likelihood of a full batch at the cost of delayed processing.
  • maxConcurrency: The maximum concurrency setting limits the number of concurrent instances of the function that an Amazon SQS event source can invoke.
  • enabled: If the SQS event source mapping should be enabled. The default is true.
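
import (
	"github.com/aws/aws-cdk-go/awscdk/v2"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
	"github.com/aws/aws-cdk-go/awscdk/v2/awssqs"
	"github.com/aws/jsii-runtime-go"
)

var fn awslambda.Function

queue := awssqs.NewQueue(this, jsii.String("MyQueue"), &awssqs.QueueProps{
	VisibilityTimeout: awscdk.Duration_Seconds(jsii.Number(30)),
})

fn.AddEventSource(awslambdaeventsources.NewSqsEventSource(queue, &awslambdaeventsources.SqsEventSourceProps{
	BatchSize: jsii.Number(10), // default
	MaxBatchingWindow: awscdk.Duration_Minutes(jsii.Number(5)),
	ReportBatchItemFailures: jsii.Bool(true),
}))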
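
With ReportBatchItemFailures enabled, the function is expected to report which messages in a batch failed so that only those are retried. The following is a minimal sketch of that handler-side logic using only the standard library. The types message, batchItemFailure and batchResponse and the functions rejectEmpty, handleBatch and encode are hypothetical names for this illustration; the JSON field names batchItemFailures and itemIdentifier follow the Lambda partial batch response format.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// message is a simplified, hypothetical stand-in for one SQS record.
type message struct {
	ID   string
	Body string
}

// batchItemFailure and batchResponse mirror the JSON shape of a
// partial batch response.
type batchItemFailure struct {
	ItemIdentifier string `json:"itemIdentifier"`
}

type batchResponse struct {
	BatchItemFailures []batchItemFailure `json:"batchItemFailures"`
}

// rejectEmpty is a placeholder processor that fails on empty bodies.
func rejectEmpty(m message) error {
	if m.Body == "" {
		return errors.New("empty body")
	}
	return nil
}

// handleBatch processes every message and records the IDs of those that failed.
func handleBatch(msgs []message, process func(message) error) batchResponse {
	resp := batchResponse{BatchItemFailures: []batchItemFailure{}}
	for _, m := range msgs {
		if err := process(m); err != nil {
			resp.BatchItemFailures = append(resp.BatchItemFailures, batchItemFailure{ItemIdentifier: m.ID})
		}
	}
	return resp
}

// encode renders the response as the JSON document the function would return.
func encode(r batchResponse) string {
	out, err := json.Marshal(r)
	if err != nil {
		return ""
	}
	return string(out)
}

func main() {
	msgs := []message{{ID: "a", Body: "ok"}, {ID: "b", Body: ""}, {ID: "c", Body: "ok"}}
	fmt.Println(encode(handleBatch(msgs, rejectEmpty)))
}
```

An empty batchItemFailures list signals that the whole batch succeeded.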

S3

You can write Lambda functions to process S3 bucket events, such as the object-created or object-deleted events. For example, when a user uploads a photo to a bucket, you might want Amazon S3 to invoke your Lambda function so that it reads the image and creates a thumbnail for the photo.

You can use the bucket notification configuration feature in Amazon S3 to configure the event source mapping, identifying the bucket events that you want Amazon S3 to publish and which Lambda function to invoke.

import (
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
	"github.com/aws/aws-cdk-go/awscdk/v2/awss3"
	"github.com/aws/jsii-runtime-go"
)

var fn awslambda.Function

bucket := awss3.NewBucket(this, jsii.String("mybucket"), nil)

fn.AddEventSource(awslambdaeventsources.NewS3EventSource(bucket, &awslambdaeventsources.S3EventSourceProps{
	Events: &[]awss3.EventType{
		awss3.EventType_OBJECT_CREATED,
		awss3.EventType_OBJECT_REMOVED,
	},
	Filters: &[]*awss3.NotificationKeyFilter{
		&awss3.NotificationKeyFilter{
			Prefix: jsii.String("subdir/"),
		},
	},
}))

In the example above, S3EventSource accepts a Bucket as its parameter. However, functions such as Bucket_FromBucketName and Bucket_FromBucketArn return an IBucket, which S3EventSource does not accept. In that case, use S3EventSourceV2 instead; this class accepts an IBucket.

import (
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
	"github.com/aws/aws-cdk-go/awscdk/v2/awss3"
	"github.com/aws/jsii-runtime-go"
)

var fn awslambda.Function

bucket := awss3.Bucket_FromBucketName(this, jsii.String("Bucket"), jsii.String("bucket-name"))

fn.AddEventSource(awslambdaeventsources.NewS3EventSourceV2(bucket, &awslambdaeventsources.S3EventSourceProps{
	Events: &[]awss3.EventType{
		awss3.EventType_OBJECT_CREATED,
		awss3.EventType_OBJECT_REMOVED,
	},
	Filters: &[]*awss3.NotificationKeyFilter{
		&awss3.NotificationKeyFilter{
			Prefix: jsii.String("subdir/"),
		},
	},
}))
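
The Filters entry in the S3 examples restricts notifications by object key. As a rough illustration of how a prefix (and optional suffix) rule selects keys, here is a standard-library sketch. The keyFilter type and its matches method are hypothetical and only model the matching rule; they are not part of this package.

```go
package main

import (
	"fmt"
	"strings"
)

// keyFilter is a hypothetical stand-in for a prefix/suffix key filter.
type keyFilter struct {
	Prefix string
	Suffix string
}

// matches reports whether key satisfies both rules.
// An empty Prefix or Suffix matches every key.
func (f keyFilter) matches(key string) bool {
	return strings.HasPrefix(key, f.Prefix) && strings.HasSuffix(key, f.Suffix)
}

func main() {
	f := keyFilter{Prefix: "subdir/"}
	for _, key := range []string{"subdir/photo.jpg", "other/photo.jpg"} {
		fmt.Printf("%s -> %v\n", key, f.matches(key))
	}
}
```

Only events for keys that pass the filter would reach the function, so a narrow prefix keeps unrelated uploads from triggering invocations.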

SNS

You can write Lambda functions to process Amazon Simple Notification Service notifications. When a message is published to an Amazon SNS topic, the service can invoke your Lambda function by passing the message payload as a parameter. Your Lambda function code can then process the event, for example publish the message to other Amazon SNS topics, or send the message to other AWS services.

This also enables you to trigger a Lambda function in response to Amazon CloudWatch alarms and other AWS services that use Amazon SNS.

For an example event, see Appendix: Message and JSON Formats and Amazon SNS Sample Event. For an example use case, see Using AWS Lambda with Amazon SNS from Different Accounts.

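import (
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
	"github.com/aws/aws-cdk-go/awscdk/v2/awssns"
	"github.com/aws/aws-cdk-go/awscdk/v2/awssqs"
	"github.com/aws/jsii-runtime-go"
)

var topic awssns.Topic
var fn awslambda.Function

deadLetterQueue := awssqs.NewQueue(this, jsii.String("deadLetterQueue"), nil)
fn.AddEventSource(awslambdaeventsources.NewSnsEventSource(topic, &awslambdaeventsources.SnsEventSourceProps{
	FilterPolicy: &map[string]awssns.SubscriptionFilter{},
	DeadLetterQueue: deadLetterQueue,
}))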

When a user calls the SNS Publish API on a topic that your Lambda function is subscribed to, Amazon SNS will call Lambda to invoke your function asynchronously. Lambda will then return a delivery status. If there was an error calling Lambda, Amazon SNS will retry invoking the Lambda function up to three times. After three tries, if Amazon SNS still could not successfully invoke the Lambda function, then Amazon SNS will send a delivery status failure message to CloudWatch.

DynamoDB Streams

You can write Lambda functions to process change events from a DynamoDB table. An event is emitted to a DynamoDB stream (if configured) whenever a write (Put, Delete, Update) operation is performed against the table. See Using AWS Lambda with Amazon DynamoDB for more information about configuring Lambda function event sources with DynamoDB.

To process events with a Lambda function, first create or update a DynamoDB table and enable a stream specification. Then, create a DynamoEventSource and add it to your Lambda function. The following parameters will impact Amazon DynamoDB's polling behavior:

  • batchSize: Determines how many records are buffered before invoking your lambda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low).
  • bisectBatchOnError: If a batch encounters an error, this will cause the batch to be split in two and have each new smaller batch retried, allowing the records in error to be isolated.
  • reportBatchItemFailures: Allow functions to return partially successful responses for a batch of records.
  • maxBatchingWindow: The maximum amount of time to gather records before invoking the lambda. This increases the likelihood of a full batch at the cost of delayed processing.
  • maxRecordAge: The maximum age of a record that will be sent to the function for processing. Records that exceed the max age will be treated as failures.
  • onFailure: If a record fails after all retries, or if the record age has exceeded the configured value, the record will be sent to the SQS queue or SNS topic specified here.
  • parallelizationFactor: The number of batches to concurrently process on each shard.
  • retryAttempts: The maximum number of times a record should be retried in the event of failure.
  • startingPosition: Determines where to begin consumption, either at the most recent record ('LATEST') or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source.
  • tumblingWindow: The duration in seconds of a processing window when using streams.
  • enabled: If the DynamoDB Streams event source mapping should be enabled. The default is true.

import (
	"github.com/aws/aws-cdk-go/awscdk/v2/awsdynamodb"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
	"github.com/aws/aws-cdk-go/awscdk/v2/awssqs"
	"github.com/aws/jsii-runtime-go"
)

var table awsdynamodb.Table
var fn awslambda.Function

deadLetterQueue := awssqs.NewQueue(this, jsii.String("deadLetterQueue"), nil)
fn.AddEventSource(awslambdaeventsources.NewDynamoEventSource(table, &awslambdaeventsources.DynamoEventSourceProps{
	StartingPosition: awslambda.StartingPosition_TRIM_HORIZON,
	BatchSize: jsii.Number(5),
	BisectBatchOnError: jsii.Bool(true),
	OnFailure: awslambdaeventsources.NewSqsDlq(deadLetterQueue),
	RetryAttempts: jsii.Number(10),
}))
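
The effect of bisectBatchOnError can be pictured as a recursive split: a failing batch is retried as two halves until the records in error are isolated. The following is a conceptual simulation only, written with the standard library; processBatch and bisectOnError are hypothetical names, records are modelled as plain integers, and the real service additionally applies retryAttempts and maxRecordAge, which this sketch ignores.

```go
package main

import (
	"errors"
	"fmt"
)

// processBatch is a placeholder handler: it fails if the batch
// contains a negative number, which stands in for a bad record.
func processBatch(batch []int) error {
	for _, r := range batch {
		if r < 0 {
			return errors.New("bad record in batch")
		}
	}
	return nil
}

// bisectOnError runs handler on the batch. If the batch fails, it is split
// in two and each half is retried, until failing records stand alone.
// It returns the records that still fail on their own.
func bisectOnError(batch []int, handler func([]int) error) []int {
	if len(batch) == 0 || handler(batch) == nil {
		return nil
	}
	if len(batch) == 1 {
		return []int{batch[0]}
	}
	mid := len(batch) / 2
	failed := bisectOnError(batch[:mid], handler)
	return append(failed, bisectOnError(batch[mid:], handler)...)
}

func main() {
	fmt.Println(bisectOnError([]int{1, 2, -3, 4, 5}, processBatch))
}
```

In this run the halves [1 2] and [4 5] succeed as whole batches, so only the single bad record is left over for the failure path.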

Kinesis

You can write Lambda functions to process streaming data in Amazon Kinesis Streams. For more information about Amazon Kinesis, see Amazon Kinesis Service. To learn more about configuring Lambda function event sources with Kinesis and to view a sample event, see Amazon Kinesis Event.

To set up Amazon Kinesis as an event source for AWS Lambda, you first create or update an Amazon Kinesis stream and select custom values for the event source parameters. The following parameters will impact Amazon Kinesis's polling behavior:

  • batchSize: Determines how many records are buffered before invoking your lambda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low).
  • bisectBatchOnError: If a batch encounters an error, this will cause the batch to be split in two and have each new smaller batch retried, allowing the records in error to be isolated.
  • reportBatchItemFailures: Allow functions to return partially successful responses for a batch of records.
  • maxBatchingWindow: The maximum amount of time to gather records before invoking the lambda. This increases the likelihood of a full batch at the cost of possibly delaying processing.
  • maxRecordAge: The maximum age of a record that will be sent to the function for processing. Records that exceed the max age will be treated as failures.
  • onFailure: If a record fails and consumes all retries, the record will be sent to the SQS queue or SNS topic specified here.
  • parallelizationFactor: The number of batches to concurrently process on each shard.
  • retryAttempts: The maximum number of times a record should be retried in the event of failure.
  • startingPosition: Will determine where to begin consumption. 'LATEST' will start at the most recent record and ignore all records that arrived prior to attaching the event source, 'TRIM_HORIZON' will start at the oldest record and ensure you process all available data, while 'AT_TIMESTAMP' will start reading records from a specified time stamp. Note that 'AT_TIMESTAMP' is only supported for Amazon Kinesis streams.
  • startingPositionTimestamp: The time stamp from which to start reading. Used in conjunction with startingPosition when set to 'AT_TIMESTAMP'.
  • tumblingWindow: The duration in seconds of a processing window when using streams.
  • enabled: If the Kinesis event source mapping should be enabled. The default is true.

import (
	"github.com/aws/aws-cdk-go/awscdk/v2/awskinesis"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
	"github.com/aws/jsii-runtime-go"
)

var myFunction awslambda.Function

stream := awskinesis.NewStream(this, jsii.String("MyStream"), nil)
myFunction.AddEventSource(awslambdaeventsources.NewKinesisEventSource(stream, &awslambdaeventsources.KinesisEventSourceProps{
	BatchSize: jsii.Number(100), // default
	StartingPosition: awslambda.StartingPosition_TRIM_HORIZON,
}))
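
To make the three startingPosition values concrete, the sketch below picks the first record that would be consumed from an in-memory list. It is a simplified model under stated assumptions: the record type, the firstIndex function, and the use of Unix-second timestamps are all hypothetical and are not part of this package or of the Kinesis API.

```go
package main

import "fmt"

// record is a hypothetical stream record; Arrival is a Unix timestamp in seconds.
type record struct {
	Seq     int
	Arrival int64
}

// firstIndex returns the index of the first record to consume.
// records must be ordered oldest first. attachedAt is the time the event
// source was attached; timestamp is only used for AT_TIMESTAMP.
// It returns len(records) when no record qualifies (or the position is unknown).
func firstIndex(records []record, position string, attachedAt, timestamp int64) int {
	switch position {
	case "TRIM_HORIZON":
		// Start at the oldest available record.
		return 0
	case "AT_TIMESTAMP":
		// Start at the first record at or after the given timestamp.
		for i, r := range records {
			if r.Arrival >= timestamp {
				return i
			}
		}
	case "LATEST":
		// Ignore everything that arrived before the source was attached.
		for i, r := range records {
			if r.Arrival > attachedAt {
				return i
			}
		}
	}
	return len(records)
}

func main() {
	records := []record{{1, 100}, {2, 200}, {3, 300}, {4, 400}}
	for _, pos := range []string{"TRIM_HORIZON", "LATEST", "AT_TIMESTAMP"} {
		fmt.Println(pos, firstIndex(records, pos, 250, 200))
	}
}
```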

Kafka

You can write Lambda functions to process data either from Amazon MSK or a self managed Kafka cluster.

The following code sets up Amazon MSK as an event source for a lambda function. Credentials will need to be configured to access the MSK cluster, as described in Username/Password authentication.

import (
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
	"github.com/aws/aws-cdk-go/awscdk/v2/awssecretsmanager"
	"github.com/aws/jsii-runtime-go"
)

var myFunction awslambda.Function

// Your MSK cluster arn
clusterArn := "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4"

// The Kafka topic you want to subscribe to
topic := "some-cool-topic"

// The secret that allows access to your MSK cluster
// You still have to make sure that it is associated with your cluster as described in the documentation
secret := awssecretsmanager.NewSecret(this, jsii.String("Secret"), &awssecretsmanager.SecretProps{
	SecretName: jsii.String("AmazonMSK_KafkaSecret"),
})
myFunction.AddEventSource(awslambdaeventsources.NewManagedKafkaEventSource(&awslambdaeventsources.ManagedKafkaEventSourceProps{
	ClusterArn: jsii.String(clusterArn),
	Topic: jsii.String(topic),
	Secret: secret,
	BatchSize: jsii.Number(100), // default
	StartingPosition: awslambda.StartingPosition_TRIM_HORIZON,
}))

The following code sets up a self managed Kafka cluster as an event source. Username and password based authentication will need to be set up as described in Managing access and permissions.

import (
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
	"github.com/aws/aws-cdk-go/awscdk/v2/awssecretsmanager"
	"github.com/aws/jsii-runtime-go"
)

// The secret that allows access to your self hosted Kafka cluster
var secret awssecretsmanager.Secret

var myFunction awslambda.Function

// The list of Kafka brokers
bootstrapServers := jsii.Strings("kafka-broker:9092")

// The Kafka topic you want to subscribe to
topic := "some-cool-topic"

// (Optional) The consumer group id to use when connecting to the Kafka broker. If omitted the UUID of the event source mapping will be used.
consumerGroupId := "my-consumer-group-id"
myFunction.AddEventSource(awslambdaeventsources.NewSelfManagedKafkaEventSource(&awslambdaeventsources.SelfManagedKafkaEventSourceProps{
	BootstrapServers: bootstrapServers,
	Topic: jsii.String(topic),
	ConsumerGroupId: jsii.String(consumerGroupId),
	Secret: secret,
	BatchSize: jsii.Number(100), // default
	StartingPosition: awslambda.StartingPosition_TRIM_HORIZON,
}))

If your self managed Kafka cluster is only reachable via VPC, also configure vpc, vpcSubnets and securityGroup.

You can specify event filtering for managed and self managed Kafka clusters using the filters property:

import (
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
	"github.com/aws/jsii-runtime-go"
)

var myFunction awslambda.Function

// Your MSK cluster arn
clusterArn := "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4"

// The Kafka topic you want to subscribe to
topic := "some-cool-topic"
myFunction.AddEventSource(awslambdaeventsources.NewManagedKafkaEventSource(&awslambdaeventsources.ManagedKafkaEventSourceProps{
	ClusterArn: jsii.String(clusterArn),
	Topic: jsii.String(topic),
	StartingPosition: awslambda.StartingPosition_TRIM_HORIZON,
	Filters: &[]*map[string]interface{}{
		awslambda.FilterCriteria_Filter(&map[string]interface{}{
			"stringEquals": awslambda.FilterRule_IsEqual(jsii.String("test")),
		}),
	},
}))

You can also specify an S3 bucket as an "on failure" destination:

import (
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
	"github.com/aws/aws-cdk-go/awscdk/v2/awss3"
	"github.com/aws/jsii-runtime-go"
)

var bucket awss3.IBucket
var myFunction awslambda.Function

// Your MSK cluster arn
clusterArn := "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4"

// The Kafka topic you want to subscribe to
topic := "some-cool-topic"

s3OnFailureDestination := awslambdaeventsources.NewS3OnFailureDestination(bucket)

myFunction.AddEventSource(awslambdaeventsources.NewManagedKafkaEventSource(&awslambdaeventsources.ManagedKafkaEventSourceProps{
	ClusterArn: jsii.String(clusterArn),
	Topic: jsii.String(topic),
	StartingPosition: awslambda.StartingPosition_TRIM_HORIZON,
	OnFailure: s3OnFailureDestination,
}))

Roadmap

Eventually, this module will support all the event sources described under Supported Event Sources in the AWS Lambda Developer Guide.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewApiEventSource_Override

func NewApiEventSource_Override(a ApiEventSource, method *string, path *string, options *awsapigateway.MethodOptions)

func NewDynamoEventSource_Override

func NewDynamoEventSource_Override(d DynamoEventSource, table awsdynamodb.ITable, props *DynamoEventSourceProps)

func NewKinesisEventSource_Override

func NewKinesisEventSource_Override(k KinesisEventSource, stream awskinesis.IStream, props *KinesisEventSourceProps)

func NewManagedKafkaEventSource_Override

func NewManagedKafkaEventSource_Override(m ManagedKafkaEventSource, props *ManagedKafkaEventSourceProps)

func NewS3EventSourceV2_Override added in v2.127.0

func NewS3EventSourceV2_Override(s S3EventSourceV2, bucket awss3.IBucket, props *S3EventSourceProps)

func NewS3EventSource_Override

func NewS3EventSource_Override(s S3EventSource, bucket awss3.Bucket, props *S3EventSourceProps)

func NewS3OnFailureDestination_Override added in v2.109.0

func NewS3OnFailureDestination_Override(s S3OnFailureDestination, bucket awss3.IBucket)

func NewSnsDlq_Override

func NewSnsDlq_Override(s SnsDlq, topic awssns.ITopic)

func NewSnsEventSource_Override

func NewSnsEventSource_Override(s SnsEventSource, topic awssns.ITopic, props *SnsEventSourceProps)

func NewSqsDlq_Override

func NewSqsDlq_Override(s SqsDlq, queue awssqs.IQueue)

func NewSqsEventSource_Override

func NewSqsEventSource_Override(s SqsEventSource, queue awssqs.IQueue, props *SqsEventSourceProps)

func NewStreamEventSource_Override

func NewStreamEventSource_Override(s StreamEventSource, props *StreamEventSourceProps)

Types

type ApiEventSource

type ApiEventSource interface {
	awslambda.IEventSource
	// Called by `lambda.addEventSource` to allow the event source to bind to this function.
	Bind(target awslambda.IFunction)
}

Example:

// The code below shows an example of how to instantiate this type.
// The values are placeholders you should change.
import (
	"github.com/aws/aws-cdk-go/awscdk/v2/awsapigateway"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
	"github.com/aws/jsii-runtime-go"
)

var authorizer awsapigateway.Authorizer
var model awsapigateway.Model
var requestValidator awsapigateway.RequestValidator

apiEventSource := awslambdaeventsources.NewApiEventSource(jsii.String("method"), jsii.String("path"), &awsapigateway.MethodOptions{
	ApiKeyRequired: jsii.Bool(false),
	AuthorizationScopes: &[]*string{
		jsii.String("authorizationScopes"),
	},
	AuthorizationType: awsapigateway.AuthorizationType_NONE,
	Authorizer: authorizer,
	MethodResponses: &[]*awsapigateway.MethodResponse{
		&awsapigateway.MethodResponse{
			StatusCode: jsii.String("statusCode"),

			// the properties below are optional
			ResponseModels: &map[string]awsapigateway.IModel{
				"responseModelsKey": model,
			},
			ResponseParameters: &map[string]*bool{
				"responseParametersKey": jsii.Bool(false),
			},
		},
	},
	OperationName: jsii.String("operationName"),
	RequestModels: &map[string]awsapigateway.IModel{
		"requestModelsKey": model,
	},
	RequestParameters: &map[string]*bool{
		"requestParametersKey": jsii.Bool(false),
	},
	RequestValidator: requestValidator,
	RequestValidatorOptions: &awsapigateway.RequestValidatorOptions{
		RequestValidatorName: jsii.String("requestValidatorName"),
		ValidateRequestBody: jsii.Bool(false),
		ValidateRequestParameters: jsii.Bool(false),
	},
})

func NewApiEventSource

func NewApiEventSource(method *string, path *string, options *awsapigateway.MethodOptions) ApiEventSource

type AuthenticationMethod

type AuthenticationMethod string

The authentication method to use with SelfManagedKafkaEventSource.

const (
	// SASL_SCRAM_512_AUTH authentication method for your Kafka cluster.
	AuthenticationMethod_SASL_SCRAM_512_AUTH AuthenticationMethod = "SASL_SCRAM_512_AUTH"
	// SASL_SCRAM_256_AUTH authentication method for your Kafka cluster.
	AuthenticationMethod_SASL_SCRAM_256_AUTH AuthenticationMethod = "SASL_SCRAM_256_AUTH"
	// BASIC_AUTH (SASL/PLAIN) authentication method for your Kafka cluster.
	AuthenticationMethod_BASIC_AUTH AuthenticationMethod = "BASIC_AUTH"
	// CLIENT_CERTIFICATE_TLS_AUTH (mTLS) authentication method for your Kafka cluster.
	AuthenticationMethod_CLIENT_CERTIFICATE_TLS_AUTH AuthenticationMethod = "CLIENT_CERTIFICATE_TLS_AUTH"
)

type BaseStreamEventSourceProps added in v2.7.0

type BaseStreamEventSourceProps struct {
	// Where to begin consuming the stream.
	StartingPosition awslambda.StartingPosition `field:"required" json:"startingPosition" yaml:"startingPosition"`
	// The largest number of records that AWS Lambda will retrieve from your event source at the time of invoking your function.
	//
	// Your function receives an
	// event with all the retrieved records.
	//
	// Valid Range:
	// * Minimum value of 1
	// * Maximum value of:
	//   * 1000 for `DynamoEventSource`
	//   * 10000 for `KinesisEventSource`, `ManagedKafkaEventSource` and `SelfManagedKafkaEventSource`.
	// Default: 100.
	//
	BatchSize *float64 `field:"optional" json:"batchSize" yaml:"batchSize"`
	// If the stream event source mapping should be enabled.
	// Default: true.
	//
	Enabled *bool `field:"optional" json:"enabled" yaml:"enabled"`
	// The maximum amount of time to gather records before invoking the function.
	//
	// Maximum of Duration.minutes(5).
	// See: https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html#invocation-eventsourcemapping-batching
	//
	// Default: - Duration.seconds(0) for Kinesis, DynamoDB, and SQS event sources, Duration.millis(500) for MSK, self-managed Kafka, and Amazon MQ.
	//
	MaxBatchingWindow awscdk.Duration `field:"optional" json:"maxBatchingWindow" yaml:"maxBatchingWindow"`
}

The set of properties for streaming event sources shared by Dynamo, Kinesis and Kafka.

Example:

// The code below shows an example of how to instantiate this type.
// The values are placeholders you should change.
import (
	"github.com/aws/aws-cdk-go/awscdk/v2"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
	"github.com/aws/jsii-runtime-go"
)

baseStreamEventSourceProps := &awslambdaeventsources.BaseStreamEventSourceProps{
	StartingPosition: awslambda.StartingPosition_TRIM_HORIZON,

	// the properties below are optional
	BatchSize: jsii.Number(123),
	Enabled: jsii.Bool(false),
	MaxBatchingWindow: awscdk.Duration_Minutes(jsii.Number(5)), // documented maximum is 5 minutes
}
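
The ranges documented for BatchSize and MaxBatchingWindow can be checked before synthesis. The following standard-library sketch encodes only the limits stated in the field comments above (minimum 1; maximum 1000 for DynamoDB and 10000 for Kinesis and Kafka; window at most 5 minutes). The names maxWindow, maxBatchSize and validate, and the source labels such as "dynamodb", are hypothetical and chosen for this illustration.

```go
package main

import (
	"fmt"
	"time"
)

// maxWindow is the documented upper bound for MaxBatchingWindow.
const maxWindow = 5 * time.Minute

// maxBatchSize returns the documented upper bound for BatchSize.
// The source labels are hypothetical names used only in this sketch.
func maxBatchSize(source string) (int, error) {
	switch source {
	case "dynamodb":
		return 1000, nil
	case "kinesis", "managed-kafka", "self-managed-kafka":
		return 10000, nil
	}
	return 0, fmt.Errorf("unknown source %q", source)
}

// validate checks batchSize and window against the documented ranges.
func validate(source string, batchSize int, window time.Duration) error {
	limit, err := maxBatchSize(source)
	if err != nil {
		return err
	}
	if batchSize < 1 || batchSize > limit {
		return fmt.Errorf("batchSize %d out of range [1, %d] for %s", batchSize, limit, source)
	}
	if window < 0 || window > maxWindow {
		return fmt.Errorf("maxBatchingWindow %s out of range [0s, %s]", window, maxWindow)
	}
	return nil
}

func main() {
	fmt.Println(validate("dynamodb", 123, maxWindow))
	fmt.Println(validate("dynamodb", 5000, 0))
	fmt.Println(validate("kinesis", 5000, 30*time.Minute))
}
```

The second call fails on the batch size and the third on the batching window, which is why a placeholder value of 30 minutes would not be accepted for MaxBatchingWindow.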

type DynamoEventSource

type DynamoEventSource interface {
	StreamEventSource
	// The ARN for this EventSourceMapping.
	EventSourceMappingArn() *string
	// The identifier for this EventSourceMapping.
	EventSourceMappingId() *string
	Props() *StreamEventSourceProps
	// Called by `lambda.addEventSource` to allow the event source to bind to this function.
	Bind(target awslambda.IFunction)
	EnrichMappingOptions(options *awslambda.EventSourceMappingOptions) *awslambda.EventSourceMappingOptions
}

Use an Amazon DynamoDB stream as an event source for AWS Lambda.

Example:

import (
	"github.com/aws/aws-cdk-go/awscdk/v2/awsdynamodb"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
	"github.com/aws/jsii-runtime-go"
)

var fn awslambda.Function

table := awsdynamodb.NewTable(this, jsii.String("Table"), &awsdynamodb.TableProps{
	PartitionKey: &awsdynamodb.Attribute{
		Name: jsii.String("id"),
		Type: awsdynamodb.AttributeType_STRING,
	},
	Stream: awsdynamodb.StreamViewType_NEW_IMAGE,
})
fn.AddEventSource(awslambdaeventsources.NewDynamoEventSource(table, &awslambdaeventsources.DynamoEventSourceProps{
	StartingPosition: awslambda.StartingPosition_LATEST,
	Filters: &[]*map[string]interface{}{
		awslambda.FilterCriteria_Filter(&map[string]interface{}{
			"eventName": awslambda.FilterRule_IsEqual(jsii.String("INSERT")),
		}),
	},
}))
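
The filter in this example keeps only stream events whose eventName equals INSERT. As an illustration of what such an equality filter means, the sketch below models a pattern as a map from a top-level field to its allowed values and tests events against it. This is a deliberately simplified model and an assumption on my part: the pattern type and its methods are hypothetical, nested fields and numeric or prefix rules are not covered, and the rendered JSON is only my understanding of the shape an equality rule takes.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pattern maps a top-level event field to the list of values it may equal.
type pattern map[string][]string

// matches reports whether every field named in the pattern is present in
// the event and equals one of the allowed values.
func (p pattern) matches(event map[string]string) bool {
	for field, allowed := range p {
		value, ok := event[field]
		if !ok {
			return false
		}
		found := false
		for _, a := range allowed {
			if a == value {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}

// render returns the pattern as a JSON document.
func (p pattern) render() string {
	out, err := json.Marshal(p)
	if err != nil {
		return ""
	}
	return string(out)
}

func main() {
	p := pattern{"eventName": {"INSERT"}}
	fmt.Println(p.render())
	fmt.Println(p.matches(map[string]string{"eventName": "INSERT"}))
	fmt.Println(p.matches(map[string]string{"eventName": "REMOVE"}))
}
```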

func NewDynamoEventSource

func NewDynamoEventSource(table awsdynamodb.ITable, props *DynamoEventSourceProps) DynamoEventSource

type DynamoEventSourceProps

type DynamoEventSourceProps struct {
	// Where to begin consuming the stream.
	StartingPosition awslambda.StartingPosition `field:"required" json:"startingPosition" yaml:"startingPosition"`
	// The largest number of records that AWS Lambda will retrieve from your event source at the time of invoking your function.
	//
	// Your function receives an
	// event with all the retrieved records.
	//
	// Valid Range:
	// * Minimum value of 1
	// * Maximum value of:
	//   * 1000 for `DynamoEventSource`
	//   * 10000 for `KinesisEventSource`, `ManagedKafkaEventSource` and `SelfManagedKafkaEventSource`.
	// Default: 100.
	//
	BatchSize *float64 `field:"optional" json:"batchSize" yaml:"batchSize"`
	// If the stream event source mapping should be enabled.
	// Default: true.
	//
	Enabled *bool `field:"optional" json:"enabled" yaml:"enabled"`
	// The maximum amount of time to gather records before invoking the function.
	//
	// Maximum of Duration.minutes(5).
	// See: https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html#invocation-eventsourcemapping-batching
	//
	// Default: - Duration.seconds(0) for Kinesis, DynamoDB, and SQS event sources, Duration.millis(500) for MSK, self-managed Kafka, and Amazon MQ.
	//
	MaxBatchingWindow awscdk.Duration `field:"optional" json:"maxBatchingWindow" yaml:"maxBatchingWindow"`
	// If the function returns an error, split the batch in two and retry.
	// Default: false.
	//
	BisectBatchOnError *bool `field:"optional" json:"bisectBatchOnError" yaml:"bisectBatchOnError"`
	// Add filter criteria option.
	// Default: - None.
	//
	Filters *[]*map[string]interface{} `field:"optional" json:"filters" yaml:"filters"`
	// The maximum age of a record that Lambda sends to a function for processing.
	//
	// Valid Range:
	// * Minimum value of 60 seconds
	// * Maximum value of 7 days
	//
	// The default value is -1, which sets the maximum age to infinite.
	// When the value is set to infinite, Lambda never discards old records.
	// Records are valid until they expire in the event source.
	// Default: -1.
	//
	MaxRecordAge awscdk.Duration `field:"optional" json:"maxRecordAge" yaml:"maxRecordAge"`
	// An Amazon SQS queue or Amazon SNS topic destination for discarded records.
	// Default: - discarded records are ignored.
	//
	OnFailure awslambda.IEventSourceDlq `field:"optional" json:"onFailure" yaml:"onFailure"`
	// The number of batches to process from each shard concurrently.
	//
	// Valid Range:
	// * Minimum value of 1
	// * Maximum value of 10.
	// Default: 1.
	//
	ParallelizationFactor *float64 `field:"optional" json:"parallelizationFactor" yaml:"parallelizationFactor"`
	// Allow functions to return partially successful responses for a batch of records.
	// See: https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting
	//
	// Default: false.
	//
	ReportBatchItemFailures *bool `field:"optional" json:"reportBatchItemFailures" yaml:"reportBatchItemFailures"`
	// Maximum number of retry attempts.
	//
	// Valid Range:
	// * Minimum value of 0
	// * Maximum value of 10000.
	//
	// The default value is -1, which sets the maximum number of retries to infinite.
	// When MaximumRetryAttempts is infinite, Lambda retries failed records until
	// the record expires in the event source.
	// Default: -1.
	//
	RetryAttempts *float64 `field:"optional" json:"retryAttempts" yaml:"retryAttempts"`
	// The size of the tumbling windows to group records sent to DynamoDB or Kinesis.
	//
	// Valid Range: 0 - 15 minutes.
	// Default: - None.
	//
	TumblingWindow awscdk.Duration `field:"optional" json:"tumblingWindow" yaml:"tumblingWindow"`
}

Example:

import (
	"github.com/aws/aws-cdk-go/awscdk/v2/awsdynamodb"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
	"github.com/aws/jsii-runtime-go"
)

var fn awslambda.Function

table := awsdynamodb.NewTable(this, jsii.String("Table"), &awsdynamodb.TableProps{
	PartitionKey: &awsdynamodb.Attribute{
		Name: jsii.String("id"),
		Type: awsdynamodb.AttributeType_STRING,
	},
	Stream: awsdynamodb.StreamViewType_NEW_IMAGE,
})
fn.AddEventSource(awslambdaeventsources.NewDynamoEventSource(table, &awslambdaeventsources.DynamoEventSourceProps{
	StartingPosition: awslambda.StartingPosition_LATEST,
	Filters: &[]*map[string]interface{}{
		awslambda.FilterCriteria_Filter(&map[string]interface{}{
			"eventName": awslambda.FilterRule_IsEqual(jsii.String("INSERT")),
		}),
	},
}))
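
MaxRecordAge and RetryAttempts together decide when a failing record is given up on and handed to the OnFailure destination. The sketch below models that decision with the standard library, using the documented default of -1 to mean "no limit". It is a simplified model: unlimited, exampleMaxAge, retryPolicy and shouldDiscard are hypothetical names, and the actual service behavior has more detail than shown here.

```go
package main

import (
	"fmt"
	"time"
)

// unlimited mirrors the documented default of -1 for both settings.
const unlimited = -1

// exampleMaxAge is an arbitrary value used for the demonstration below.
const exampleMaxAge = time.Hour

// retryPolicy holds the two limits. A value of unlimited disables that check.
type retryPolicy struct {
	MaxRecordAge  time.Duration
	RetryAttempts int
}

// shouldDiscard reports whether a failed record is given up on (and would be
// sent to the on-failure destination, if one is configured) instead of being
// retried again. age is the record's current age and retriesSoFar is the
// number of retries already made.
func (p retryPolicy) shouldDiscard(age time.Duration, retriesSoFar int) bool {
	if p.MaxRecordAge != unlimited && age > p.MaxRecordAge {
		return true
	}
	if p.RetryAttempts != unlimited && retriesSoFar >= p.RetryAttempts {
		return true
	}
	return false
}

func main() {
	p := retryPolicy{MaxRecordAge: exampleMaxAge, RetryAttempts: 10}
	fmt.Println(p.shouldDiscard(30*time.Minute, 3))
	fmt.Println(p.shouldDiscard(2*time.Hour, 3))
	fmt.Println(p.shouldDiscard(30*time.Minute, 10))
}
```

With both limits left at their defaults, the check never fires and a record is retried until it expires in the event source.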

type KafkaEventSourceProps

type KafkaEventSourceProps struct {
	// Where to begin consuming the stream.
	StartingPosition awslambda.StartingPosition `field:"required" json:"startingPosition" yaml:"startingPosition"`
	// The largest number of records that AWS Lambda will retrieve from your event source at the time of invoking your function.
	//
	// Your function receives an
	// event with all the retrieved records.
	//
	// Valid Range:
	// * Minimum value of 1
	// * Maximum value of:
	//   * 1000 for `DynamoEventSource`
	//   * 10000 for `KinesisEventSource`, `ManagedKafkaEventSource` and `SelfManagedKafkaEventSource`.
	// Default: 100.
	//
	BatchSize *float64 `field:"optional" json:"batchSize" yaml:"batchSize"`
	// If the stream event source mapping should be enabled.
	// Default: true.
	//
	Enabled *bool `field:"optional" json:"enabled" yaml:"enabled"`
	// The maximum amount of time to gather records before invoking the function.
	//
	// Maximum of Duration.minutes(5).
	// See: https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html#invocation-eventsourcemapping-batching
	//
	// Default: - Duration.seconds(0) for Kinesis, DynamoDB, and SQS event sources, Duration.millis(500) for MSK, self-managed Kafka, and Amazon MQ.
	//
	MaxBatchingWindow awscdk.Duration `field:"optional" json:"maxBatchingWindow" yaml:"maxBatchingWindow"`
	// The Kafka topic to subscribe to.
	Topic *string `field:"required" json:"topic" yaml:"topic"`
	// The identifier for the Kafka consumer group to join.
	//
	// The consumer group ID must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value. The value must have a length between 1 and 200 and match the pattern '[a-zA-Z0-9-\/*:_+=.@-]*'.
	// See: https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html#services-msk-consumer-group-id
	//
	// Default: - none.
	//
	ConsumerGroupId *string `field:"optional" json:"consumerGroupId" yaml:"consumerGroupId"`
	// Add filter criteria to Event Source.
	// See: https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html
	//
	// Default: - none.
	//
	Filters *[]*map[string]interface{} `field:"optional" json:"filters" yaml:"filters"`
	// Add an on Failure Destination for this Kafka event.
	//
	// SNS/SQS/S3 are supported.
	// Default: - discarded records are ignored.
	//
	OnFailure awslambda.IEventSourceDlq `field:"optional" json:"onFailure" yaml:"onFailure"`
	// The secret with the Kafka credentials, see https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html for details. This field is required if your Kafka brokers are accessed over the Internet.
	// Default: none.
	//
	Secret awssecretsmanager.ISecret `field:"optional" json:"secret" yaml:"secret"`
}

Properties for a Kafka event source.

Example:

// The code below shows an example of how to instantiate this type.
// The values are placeholders you should change.
import (
	"github.com/aws/aws-cdk-go/awscdk/v2"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
	"github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
	"github.com/aws/aws-cdk-go/awscdk/v2/awssecretsmanager"
	"github.com/aws/jsii-runtime-go"
)

var eventSourceDlq awslambda.IEventSourceDlq
var filters interface{}
var secret awssecretsmanager.Secret

kafkaEventSourceProps := &awslambdaeventsources.KafkaEventSourceProps{
	StartingPosition: awslambda.StartingPosition_TRIM_HORIZON,
	Topic: jsii.String("topic"),

	// the properties below are optional
	BatchSize: jsii.Number(123),
	ConsumerGroupId: jsii.String("consumerGroupId"),
	Enabled: jsii.Bool(false),
	Filters: &[]*map[string]interface{}{
		&map[string]interface{}{
			"filtersKey": filters,
		},
	},
	MaxBatchingWindow: awscdk.Duration_Minutes(jsii.Number(5)), // documented maximum is 5 minutes
	OnFailure: eventSourceDlq,
	Secret: secret,
}
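
Because ConsumerGroupId cannot be changed after the mapping is created, it can be worth checking a candidate value against the documented constraints first (length between 1 and 200, characters from the set given in the field comment). The following standard-library sketch does that. The names consumerGroupIDPattern and validConsumerGroupID are hypothetical, and the regular expression is my own rewrite of the documented character class with the length bound folded in, not a pattern taken from the library.

```go
package main

import (
	"fmt"
	"regexp"
)

// consumerGroupIDPattern accepts 1 to 200 characters drawn from letters,
// digits and the punctuation - / * : _ + = . @ (the hyphen is placed last
// in the class so that it is read literally).
var consumerGroupIDPattern = regexp.MustCompile(`^[a-zA-Z0-9/*:_+=.@-]{1,200}$`)

// validConsumerGroupID reports whether id satisfies the documented constraints.
func validConsumerGroupID(id string) bool {
	return consumerGroupIDPattern.MatchString(id)
}

func main() {
	for _, id := range []string{"my-consumer-group-id", "", "bad group"} {
		fmt.Printf("%q -> %v\n", id, validConsumerGroupID(id))
	}
}
```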

type KinesisEventSource

type KinesisEventSource interface {
	StreamEventSource
	// The ARN for this EventSourceMapping.
	EventSourceMappingArn() *string
	// The identifier for this EventSourceMapping.
	EventSourceMappingId() *string
	Props() *StreamEventSourceProps
	Stream() awskinesis.IStream
	// Called by `lambda.addEventSource` to allow the event source to bind to this function.
	Bind(target awslambda.IFunction)
	EnrichMappingOptions(options *awslambda.EventSourceMappingOptions) *awslambda.EventSourceMappingOptions
}

Use an Amazon Kinesis stream as an event source for AWS Lambda.

Example:

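import kinesis "github.com/aws/aws-cdk-go/awscdk/v2/awskinesis"
import lambda "github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
import eventsources "github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
import "github.com/aws/jsii-runtime-go"

var myFunction lambda.Function

stream := kinesis.NewStream(this, jsii.String("MyStream"), nil)
myFunction.AddEventSource(eventsources.NewKinesisEventSource(stream, &eventsources.KinesisEventSourceProps{
	BatchSize: jsii.Number(100), // default
	StartingPosition: lambda.StartingPosition_TRIM_HORIZON,
}))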

func NewKinesisEventSource

func NewKinesisEventSource(stream awskinesis.IStream, props *KinesisEventSourceProps) KinesisEventSource

type KinesisEventSourceProps

type KinesisEventSourceProps struct {
	// Where to begin consuming the stream.
	StartingPosition awslambda.StartingPosition `field:"required" json:"startingPosition" yaml:"startingPosition"`
	// The largest number of records that AWS Lambda will retrieve from your event source at the time of invoking your function.
	//
	// Your function receives an
	// event with all the retrieved records.
	//
	// Valid Range:
	// * Minimum value of 1
	// * Maximum value of:
	//   * 1000 for `DynamoEventSource`
	//   * 10000 for `KinesisEventSource`, `ManagedKafkaEventSource` and `SelfManagedKafkaEventSource`.
	// Default: 100.
	//
	BatchSize *float64 `field:"optional" json:"batchSize" yaml:"batchSize"`
	// If the stream event source mapping should be enabled.
	// Default: true.
	//
	Enabled *bool `field:"optional" json:"enabled" yaml:"enabled"`
	// The maximum amount of time to gather records before invoking the function.
	//
	// Maximum of Duration.minutes(5).
	// See: https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html#invocation-eventsourcemapping-batching
	//
	// Default: - Duration.seconds(0) for Kinesis, DynamoDB, and SQS event sources, Duration.millis(500) for MSK, self-managed Kafka, and Amazon MQ.
	//
	MaxBatchingWindow awscdk.Duration `field:"optional" json:"maxBatchingWindow" yaml:"maxBatchingWindow"`
	// If the function returns an error, split the batch in two and retry.
	// Default: false.
	//
	BisectBatchOnError *bool `field:"optional" json:"bisectBatchOnError" yaml:"bisectBatchOnError"`
	// Add filter criteria option.
	// Default: - None.
	//
	Filters *[]*map[string]interface{} `field:"optional" json:"filters" yaml:"filters"`
	// The maximum age of a record that Lambda sends to a function for processing.
	//
	// Valid Range:
	// * Minimum value of 60 seconds
	// * Maximum value of 7 days
	//
	// The default value is -1, which sets the maximum age to infinite.
	// When the value is set to infinite, Lambda never discards old records.
	// Records are valid until they expire in the event source.
	// Default: -1.
	//
	MaxRecordAge awscdk.Duration `field:"optional" json:"maxRecordAge" yaml:"maxRecordAge"`
	// An Amazon SQS queue or Amazon SNS topic destination for discarded records.
	// Default: - discarded records are ignored.
	//
	OnFailure awslambda.IEventSourceDlq `field:"optional" json:"onFailure" yaml:"onFailure"`
	// The number of batches to process from each shard concurrently.
	//
	// Valid Range:
	// * Minimum value of 1
	// * Maximum value of 10.
	// Default: 1.
	//
	ParallelizationFactor *float64 `field:"optional" json:"parallelizationFactor" yaml:"parallelizationFactor"`
	// Allow functions to return partially successful responses for a batch of records.
	// See: https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting
	//
	// Default: false.
	//
	ReportBatchItemFailures *bool `field:"optional" json:"reportBatchItemFailures" yaml:"reportBatchItemFailures"`
	// Maximum number of retry attempts.
	//
	// Valid Range:
	// * Minimum value of 0
	// * Maximum value of 10000.
	//
	// The default value is -1, which sets the maximum number of retries to infinite.
	// When MaximumRetryAttempts is infinite, Lambda retries failed records until
	// the record expires in the event source.
	// Default: -1.
	//
	RetryAttempts *float64 `field:"optional" json:"retryAttempts" yaml:"retryAttempts"`
	// The size of the tumbling windows to group records sent to DynamoDB or Kinesis.
	//
	// Valid Range: 0 - 15 minutes.
	// Default: - None.
	//
	TumblingWindow awscdk.Duration `field:"optional" json:"tumblingWindow" yaml:"tumblingWindow"`
	// The time from which to start reading, in Unix time seconds.
	// Default: - no timestamp.
	//
	StartingPositionTimestamp *float64 `field:"optional" json:"startingPositionTimestamp" yaml:"startingPositionTimestamp"`
}

Example:

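import kinesis "github.com/aws/aws-cdk-go/awscdk/v2/awskinesis"
import lambda "github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
import eventsources "github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
import "github.com/aws/jsii-runtime-go"

var myFunction lambda.Function

stream := kinesis.NewStream(this, jsii.String("MyStream"), nil)
myFunction.AddEventSource(eventsources.NewKinesisEventSource(stream, &eventsources.KinesisEventSourceProps{
	BatchSize: jsii.Number(100), // default
	StartingPosition: lambda.StartingPosition_TRIM_HORIZON,
}))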
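
The valid ranges quoted in the KinesisEventSourceProps field comments can be collected into a single check. The following is a minimal sketch using only the standard library; kinesisSettings and validate are hypothetical names, not part of this package, and nothing on this page states whether or when the library performs such checks itself.

```go
package main

import (
	"fmt"
	"time"
)

// kinesisSettings is a hypothetical, simplified stand-in for a few of the
// optional KinesisEventSourceProps fields. It is not part of the library.
type kinesisSettings struct {
	BatchSize             int
	MaxBatchingWindow     time.Duration
	MaxRecordAge          time.Duration // 0 means "not set" in this sketch
	ParallelizationFactor int
	RetryAttempts         int // -1 means "retry until the record expires"
}

// validate checks the values against the ranges quoted in the field comments.
func (s kinesisSettings) validate() error {
	if s.BatchSize < 1 || s.BatchSize > 10000 {
		return fmt.Errorf("batch size %d outside 1..10000", s.BatchSize)
	}
	if s.MaxBatchingWindow < 0 || s.MaxBatchingWindow > 5*time.Minute {
		return fmt.Errorf("batching window %v outside 0..5m", s.MaxBatchingWindow)
	}
	if s.MaxRecordAge != 0 && (s.MaxRecordAge < 60*time.Second || s.MaxRecordAge > 7*24*time.Hour) {
		return fmt.Errorf("record age %v outside 60s..7 days", s.MaxRecordAge)
	}
	if s.ParallelizationFactor < 1 || s.ParallelizationFactor > 10 {
		return fmt.Errorf("parallelization factor %d outside 1..10", s.ParallelizationFactor)
	}
	if s.RetryAttempts < -1 || s.RetryAttempts > 10000 {
		return fmt.Errorf("retry attempts %d outside -1..10000", s.RetryAttempts)
	}
	return nil
}

func main() {
	ok := kinesisSettings{BatchSize: 100, ParallelizationFactor: 1, RetryAttempts: -1}
	fmt.Println(ok.validate())

	bad := ok
	bad.BatchSize = 20000
	fmt.Println(bad.validate())
}
```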

type ManagedKafkaEventSource

type ManagedKafkaEventSource interface {
	StreamEventSource
	// The ARN for this EventSourceMapping.
	EventSourceMappingArn() *string
	// The identifier for this EventSourceMapping.
	EventSourceMappingId() *string
	Props() *StreamEventSourceProps
	// Called by `lambda.addEventSource` to allow the event source to bind to this function.
	Bind(target awslambda.IFunction)
	EnrichMappingOptions(options *awslambda.EventSourceMappingOptions) *awslambda.EventSourceMappingOptions
}

Use an MSK cluster as a streaming source for AWS Lambda.

Example:

import lambda "github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
import eventsources "github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
import secretsmanager "github.com/aws/aws-cdk-go/awscdk/v2/awssecretsmanager"
import "github.com/aws/jsii-runtime-go"

var myFunction lambda.Function

// Your MSK cluster arn
clusterArn := "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4"

// The Kafka topic you want to subscribe to
topic := "some-cool-topic"

// The secret that allows access to your MSK cluster
// You still have to make sure that it is associated with your cluster as described in the documentation
secret := secretsmanager.NewSecret(this, jsii.String("Secret"), &secretsmanager.SecretProps{
	SecretName: jsii.String("AmazonMSK_KafkaSecret"),
})
myFunction.AddEventSource(eventsources.NewManagedKafkaEventSource(&eventsources.ManagedKafkaEventSourceProps{
	ClusterArn: jsii.String(clusterArn),
	Topic: jsii.String(topic),
	Secret: secret,
	BatchSize: jsii.Number(100), // default
	StartingPosition: lambda.StartingPosition_TRIM_HORIZON,
}))

type ManagedKafkaEventSourceProps

type ManagedKafkaEventSourceProps struct {
	// Where to begin consuming the stream.
	StartingPosition awslambda.StartingPosition `field:"required" json:"startingPosition" yaml:"startingPosition"`
	// The largest number of records that AWS Lambda will retrieve from your event source at the time of invoking your function.
	//
	// Your function receives an
	// event with all the retrieved records.
	//
	// Valid Range:
	// * Minimum value of 1
	// * Maximum value of:
	//   * 1000 for `DynamoEventSource`
	//   * 10000 for `KinesisEventSource`, `ManagedKafkaEventSource` and `SelfManagedKafkaEventSource`.
	// Default: 100.
	//
	BatchSize *float64 `field:"optional" json:"batchSize" yaml:"batchSize"`
	// If the stream event source mapping should be enabled.
	// Default: true.
	//
	Enabled *bool `field:"optional" json:"enabled" yaml:"enabled"`
	// The maximum amount of time to gather records before invoking the function.
	//
	// Maximum of Duration.minutes(5).
	// See: https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html#invocation-eventsourcemapping-batching
	//
	// Default: - Duration.seconds(0) for Kinesis, DynamoDB, and SQS event sources, Duration.millis(500) for MSK, self-managed Kafka, and Amazon MQ.
	//
	MaxBatchingWindow awscdk.Duration `field:"optional" json:"maxBatchingWindow" yaml:"maxBatchingWindow"`
	// The Kafka topic to subscribe to.
	Topic *string `field:"required" json:"topic" yaml:"topic"`
	// The identifier for the Kafka consumer group to join.
	//
	// The consumer group ID must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value. The value must have a length between 1 and 200 and match the pattern '[a-zA-Z0-9-\/*:_+=.@-]*'.
	// See: https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html#services-msk-consumer-group-id
	//
	// Default: - none.
	//
	ConsumerGroupId *string `field:"optional" json:"consumerGroupId" yaml:"consumerGroupId"`
	// Add filter criteria to Event Source.
	// See: https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html
	//
	// Default: - none.
	//
	Filters *[]*map[string]interface{} `field:"optional" json:"filters" yaml:"filters"`
	// Add an on Failure Destination for this Kafka event.
	//
	// SNS/SQS/S3 are supported.
	// Default: - discarded records are ignored.
	//
	OnFailure awslambda.IEventSourceDlq `field:"optional" json:"onFailure" yaml:"onFailure"`
	// The secret with the Kafka credentials, see https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html for details. This field is required if your Kafka brokers are accessed over the Internet.
	// Default: none.
	//
	Secret awssecretsmanager.ISecret `field:"optional" json:"secret" yaml:"secret"`
	// The ARN of the MSK cluster.
	ClusterArn *string `field:"required" json:"clusterArn" yaml:"clusterArn"`
}

Properties for an MSK event source.

Example:

import lambda "github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
import eventsources "github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
import secretsmanager "github.com/aws/aws-cdk-go/awscdk/v2/awssecretsmanager"
import "github.com/aws/jsii-runtime-go"

var myFunction lambda.Function

// Your MSK cluster arn
clusterArn := "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4"

// The Kafka topic you want to subscribe to
topic := "some-cool-topic"

// The secret that allows access to your MSK cluster
// You still have to make sure that it is associated with your cluster as described in the documentation
secret := secretsmanager.NewSecret(this, jsii.String("Secret"), &secretsmanager.SecretProps{
	SecretName: jsii.String("AmazonMSK_KafkaSecret"),
})
myFunction.AddEventSource(eventsources.NewManagedKafkaEventSource(&eventsources.ManagedKafkaEventSourceProps{
	ClusterArn: jsii.String(clusterArn),
	Topic: jsii.String(topic),
	Secret: secret,
	BatchSize: jsii.Number(100), // default
	StartingPosition: lambda.StartingPosition_TRIM_HORIZON,
}))
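
The ConsumerGroupId constraints documented above (a length between 1 and 200 and the character set '[a-zA-Z0-9-\/*:_+=.@-]') can be checked before synthesis. The following is a minimal sketch using only the standard library; validConsumerGroupID is a hypothetical helper, not part of this package, and the regular expression is a transcription of the documented pattern with the length bounds folded in.

```go
package main

import (
	"fmt"
	"regexp"
)

// consumerGroupIDPattern transcribes the documented character set
// [a-zA-Z0-9-\/*:_+=.@-] and applies the documented length of 1 to 200.
var consumerGroupIDPattern = regexp.MustCompile(`^[a-zA-Z0-9\-/*:_+=.@]{1,200}$`)

// validConsumerGroupID reports whether id satisfies the documented
// length and character constraints. It is a hypothetical helper.
func validConsumerGroupID(id string) bool {
	return consumerGroupIDPattern.MatchString(id)
}

func main() {
	for _, id := range []string{"my-consumer-group-id", "", "has space"} {
		fmt.Printf("%q valid: %v\n", id, validConsumerGroupID(id))
	}
}
```

Because the value cannot be updated once the event source mapping exists, catching an invalid ID early is cheaper than recreating the mapping.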

type S3EventSource

type S3EventSource interface {
	awslambda.IEventSource
	Bucket() awss3.Bucket
	// Called by `lambda.addEventSource` to allow the event source to bind to this function.
	Bind(target awslambda.IFunction)
}

Use S3 bucket notifications as an event source for AWS Lambda.

Example:

import lambda "github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
import eventsources "github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
import s3 "github.com/aws/aws-cdk-go/awscdk/v2/awss3"
import "github.com/aws/jsii-runtime-go"

var fn lambda.Function

bucket := s3.NewBucket(this, jsii.String("Bucket"), nil)
fn.AddEventSource(eventsources.NewS3EventSource(bucket, &eventsources.S3EventSourceProps{
	Events: &[]s3.EventType{
		s3.EventType_OBJECT_CREATED,
		s3.EventType_OBJECT_REMOVED,
	},
	Filters: &[]*s3.NotificationKeyFilter{
		&s3.NotificationKeyFilter{
			Prefix: jsii.String("subdir/"),
		},
	},
}))

func NewS3EventSource

func NewS3EventSource(bucket awss3.Bucket, props *S3EventSourceProps) S3EventSource

type S3EventSourceProps

type S3EventSourceProps struct {
	// The s3 event types that will trigger the notification.
	Events *[]awss3.EventType `field:"required" json:"events" yaml:"events"`
	// S3 object key filter rules to determine which objects trigger this event.
	//
	// Each filter must include a `prefix` and/or `suffix` that will be matched
	// against the s3 object key. Refer to the S3 Developer Guide for details
	// about allowed filter rules.
	Filters *[]*awss3.NotificationKeyFilter `field:"optional" json:"filters" yaml:"filters"`
}

Example:

import lambda "github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
import eventsources "github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
import s3 "github.com/aws/aws-cdk-go/awscdk/v2/awss3"
import "github.com/aws/jsii-runtime-go"

var fn lambda.Function

bucket := s3.NewBucket(this, jsii.String("mybucket"), nil)

fn.AddEventSource(eventsources.NewS3EventSource(bucket, &eventsources.S3EventSourceProps{
	Events: &[]s3.EventType{
		s3.EventType_OBJECT_CREATED,
		s3.EventType_OBJECT_REMOVED,
	},
	Filters: &[]*s3.NotificationKeyFilter{
		&s3.NotificationKeyFilter{
			Prefix: jsii.String("subdir/"),
		},
	},
}))
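
Each key filter matches the object key by prefix and/or suffix. The matching itself is performed by S3, not by this library; the following is only a minimal sketch of that documented semantics using the standard library. keyFilter and matches are hypothetical names standing in for awss3.NotificationKeyFilter.

```go
package main

import (
	"fmt"
	"strings"
)

// keyFilter is a hypothetical stand-in for awss3.NotificationKeyFilter.
type keyFilter struct {
	Prefix string
	Suffix string
}

// matches reports whether an object key satisfies the filter. An empty
// Prefix or Suffix places no constraint on that end of the key.
func (f keyFilter) matches(key string) bool {
	return strings.HasPrefix(key, f.Prefix) && strings.HasSuffix(key, f.Suffix)
}

func main() {
	f := keyFilter{Prefix: "subdir/", Suffix: ".png"}
	for _, key := range []string{"subdir/a.png", "subdir/a.jpg", "other/a.png"} {
		fmt.Printf("%s matches: %v\n", key, f.matches(key))
	}
}
```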

type S3EventSourceV2 added in v2.127.0

type S3EventSourceV2 interface {
	awslambda.IEventSource
	// Called by `lambda.addEventSource` to allow the event source to bind to this function.
	Bind(target awslambda.IFunction)
}

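S3EventSourceV2 uses S3 bucket notifications as an event source for AWS Lambda. Unlike S3EventSource, its constructor accepts any awss3.IBucket, such as a bucket imported by name.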

Example:

import lambda "github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
import eventsources "github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
import s3 "github.com/aws/aws-cdk-go/awscdk/v2/awss3"
import "github.com/aws/jsii-runtime-go"

var fn lambda.Function

bucket := s3.Bucket_FromBucketName(this, jsii.String("Bucket"), jsii.String("bucket-name"))

fn.AddEventSource(eventsources.NewS3EventSourceV2(bucket, &eventsources.S3EventSourceProps{
	Events: &[]s3.EventType{
		s3.EventType_OBJECT_CREATED,
		s3.EventType_OBJECT_REMOVED,
	},
	Filters: &[]*s3.NotificationKeyFilter{
		&s3.NotificationKeyFilter{
			Prefix: jsii.String("subdir/"),
		},
	},
}))

func NewS3EventSourceV2 added in v2.127.0

func NewS3EventSourceV2(bucket awss3.IBucket, props *S3EventSourceProps) S3EventSourceV2

type S3OnFailureDestination added in v2.109.0

type S3OnFailureDestination interface {
	awslambda.IEventSourceDlq
	// Returns a destination configuration for the DLQ.
	Bind(_target awslambda.IEventSourceMapping, targetHandler awslambda.IFunction) *awslambda.DlqDestinationConfig
}

An S3 dead letter bucket destination configuration for a Lambda event source.

Example:

import lambda "github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
import eventsources "github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
import s3 "github.com/aws/aws-cdk-go/awscdk/v2/awss3"
import "github.com/aws/jsii-runtime-go"

var bucket s3.IBucket
var myFunction lambda.Function

// Your MSK cluster arn
clusterArn := "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4"

// The Kafka topic you want to subscribe to
topic := "some-cool-topic"

s3OnFailureDestination := eventsources.NewS3OnFailureDestination(bucket)

myFunction.AddEventSource(eventsources.NewManagedKafkaEventSource(&eventsources.ManagedKafkaEventSourceProps{
	ClusterArn: jsii.String(clusterArn),
	Topic: jsii.String(topic),
	StartingPosition: lambda.StartingPosition_TRIM_HORIZON,
	OnFailure: s3OnFailureDestination,
}))

func NewS3OnFailureDestination added in v2.109.0

func NewS3OnFailureDestination(bucket awss3.IBucket) S3OnFailureDestination

type SelfManagedKafkaEventSource

type SelfManagedKafkaEventSource interface {
	StreamEventSource
	Props() *StreamEventSourceProps
	// Called by `lambda.addEventSource` to allow the event source to bind to this function.
	Bind(target awslambda.IFunction)
	EnrichMappingOptions(options *awslambda.EventSourceMappingOptions) *awslambda.EventSourceMappingOptions
}

Use a self hosted Kafka installation as a streaming source for AWS Lambda.

Example:

import lambda "github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
import eventsources "github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
import secretsmanager "github.com/aws/aws-cdk-go/awscdk/v2/awssecretsmanager"
import "github.com/aws/jsii-runtime-go"

// The secret that allows access to your self hosted Kafka cluster
var secret secretsmanager.Secret

var myFunction lambda.Function

// The list of Kafka brokers
bootstrapServers := jsii.Strings("kafka-broker:9092")

// The Kafka topic you want to subscribe to
topic := "some-cool-topic"

// (Optional) The consumer group id to use when connecting to the Kafka broker. If omitted the UUID of the event source mapping will be used.
consumerGroupId := "my-consumer-group-id"
myFunction.AddEventSource(eventsources.NewSelfManagedKafkaEventSource(&eventsources.SelfManagedKafkaEventSourceProps{
	BootstrapServers: bootstrapServers,
	Topic: jsii.String(topic),
	ConsumerGroupId: jsii.String(consumerGroupId),
	Secret: secret,
	BatchSize: jsii.Number(100), // default
	StartingPosition: lambda.StartingPosition_TRIM_HORIZON,
}))

type SelfManagedKafkaEventSourceProps

type SelfManagedKafkaEventSourceProps struct {
	// Where to begin consuming the stream.
	StartingPosition awslambda.StartingPosition `field:"required" json:"startingPosition" yaml:"startingPosition"`
	// The largest number of records that AWS Lambda will retrieve from your event source at the time of invoking your function.
	//
	// Your function receives an
	// event with all the retrieved records.
	//
	// Valid Range:
	// * Minimum value of 1
	// * Maximum value of:
	//   * 1000 for `DynamoEventSource`
	//   * 10000 for `KinesisEventSource`, `ManagedKafkaEventSource` and `SelfManagedKafkaEventSource`.
	// Default: 100.
	//
	BatchSize *float64 `field:"optional" json:"batchSize" yaml:"batchSize"`
	// If the stream event source mapping should be enabled.
	// Default: true.
	//
	Enabled *bool `field:"optional" json:"enabled" yaml:"enabled"`
	// The maximum amount of time to gather records before invoking the function.
	//
	// Maximum of Duration.minutes(5).
	// See: https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html#invocation-eventsourcemapping-batching
	//
	// Default: - Duration.seconds(0) for Kinesis, DynamoDB, and SQS event sources, Duration.millis(500) for MSK, self-managed Kafka, and Amazon MQ.
	//
	MaxBatchingWindow awscdk.Duration `field:"optional" json:"maxBatchingWindow" yaml:"maxBatchingWindow"`
	// The Kafka topic to subscribe to.
	Topic *string `field:"required" json:"topic" yaml:"topic"`
	// The identifier for the Kafka consumer group to join.
	//
	// The consumer group ID must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value. The value must have a length between 1 and 200 and match the pattern '[a-zA-Z0-9-\/*:_+=.@-]*'.
	// See: https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html#services-msk-consumer-group-id
	//
	// Default: - none.
	//
	ConsumerGroupId *string `field:"optional" json:"consumerGroupId" yaml:"consumerGroupId"`
	// Add filter criteria to Event Source.
	// See: https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html
	//
	// Default: - none.
	//
	Filters *[]*map[string]interface{} `field:"optional" json:"filters" yaml:"filters"`
	// Add an on Failure Destination for this Kafka event.
	//
	// SNS/SQS/S3 are supported.
	// Default: - discarded records are ignored.
	//
	OnFailure awslambda.IEventSourceDlq `field:"optional" json:"onFailure" yaml:"onFailure"`
	// The secret with the Kafka credentials, see https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html for details. This field is required if your Kafka brokers are accessed over the Internet.
	// Default: none.
	//
	Secret awssecretsmanager.ISecret `field:"optional" json:"secret" yaml:"secret"`
	// The list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself.
	//
	// They are in the format `abc.xyz.com:xxxx`.
	BootstrapServers *[]*string `field:"required" json:"bootstrapServers" yaml:"bootstrapServers"`
	// The authentication method for your Kafka cluster.
	// Default: AuthenticationMethod.SASL_SCRAM_512_AUTH
	//
	AuthenticationMethod AuthenticationMethod `field:"optional" json:"authenticationMethod" yaml:"authenticationMethod"`
	// The secret with the root CA certificate used by your Kafka brokers for TLS encryption. This field is required if your Kafka brokers use certificates signed by a private CA.
	// Default: - none.
	//
	RootCACertificate awssecretsmanager.ISecret `field:"optional" json:"rootCACertificate" yaml:"rootCACertificate"`
	// If your Kafka brokers are only reachable via VPC, provide the security group here.
	// Default: - none, required if setting vpc.
	//
	SecurityGroup awsec2.ISecurityGroup `field:"optional" json:"securityGroup" yaml:"securityGroup"`
	// If your Kafka brokers are only reachable via VPC, provide the VPC here.
	// Default: none.
	//
	Vpc awsec2.IVpc `field:"optional" json:"vpc" yaml:"vpc"`
	// If your Kafka brokers are only reachable via VPC, provide the subnets selection here.
	// Default: - none, required if setting vpc.
	//
	VpcSubnets *awsec2.SubnetSelection `field:"optional" json:"vpcSubnets" yaml:"vpcSubnets"`
}

Properties for a self managed Kafka cluster event source.

If your Kafka cluster is only reachable via VPC, make sure to configure the Vpc property together with VpcSubnets and SecurityGroup, which are required when Vpc is set.

Example:

import lambda "github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
import eventsources "github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
import secretsmanager "github.com/aws/aws-cdk-go/awscdk/v2/awssecretsmanager"
import "github.com/aws/jsii-runtime-go"

// The secret that allows access to your self hosted Kafka cluster
var secret secretsmanager.Secret

var myFunction lambda.Function

// The list of Kafka brokers
bootstrapServers := jsii.Strings("kafka-broker:9092")

// The Kafka topic you want to subscribe to
topic := "some-cool-topic"

// (Optional) The consumer group id to use when connecting to the Kafka broker. If omitted the UUID of the event source mapping will be used.
consumerGroupId := "my-consumer-group-id"
myFunction.AddEventSource(eventsources.NewSelfManagedKafkaEventSource(&eventsources.SelfManagedKafkaEventSourceProps{
	BootstrapServers: bootstrapServers,
	Topic: jsii.String(topic),
	ConsumerGroupId: jsii.String(consumerGroupId),
	Secret: secret,
	BatchSize: jsii.Number(100), // default
	StartingPosition: lambda.StartingPosition_TRIM_HORIZON,
}))
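
BootstrapServers entries are documented as host and port pairs in the form abc.xyz.com:xxxx. As a minimal sketch, assuming you want to reject malformed entries before passing them to the construct, the standard library can check that shape. checkBootstrapServer is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// checkBootstrapServer is a hypothetical helper that verifies an address
// has the host:port form described for BootstrapServers.
func checkBootstrapServer(addr string) error {
	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		return err
	}
	if host == "" {
		return fmt.Errorf("missing host in %q", addr)
	}
	n, err := strconv.Atoi(port)
	if err != nil || n < 1 || n > 65535 {
		return fmt.Errorf("invalid port in %q", addr)
	}
	return nil
}

func main() {
	for _, addr := range []string{"kafka-broker:9092", "kafka-broker", "kafka-broker:abc"} {
		fmt.Printf("%s: %v\n", addr, checkBootstrapServer(addr))
	}
}
```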

type SnsDlq

type SnsDlq interface {
	awslambda.IEventSourceDlq
	// Returns a destination configuration for the DLQ.
	Bind(_target awslambda.IEventSourceMapping, targetHandler awslambda.IFunction) *awslambda.DlqDestinationConfig
}

An SNS dead letter queue destination configuration for a Lambda event source.

Example:

// The code below shows an example of how to instantiate this type.
// The values are placeholders you should change.
import eventsources "github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
import sns "github.com/aws/aws-cdk-go/awscdk/v2/awssns"

var topic sns.Topic

snsDlq := eventsources.NewSnsDlq(topic)

func NewSnsDlq

func NewSnsDlq(topic awssns.ITopic) SnsDlq

type SnsEventSource

type SnsEventSource interface {
	awslambda.IEventSource
	Topic() awssns.ITopic
	// Called by `lambda.addEventSource` to allow the event source to bind to this function.
	Bind(target awslambda.IFunction)
}

Use an Amazon SNS topic as an event source for AWS Lambda.

Example:

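import lambda "github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
import eventsources "github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
import sns "github.com/aws/aws-cdk-go/awscdk/v2/awssns"
import sqs "github.com/aws/aws-cdk-go/awscdk/v2/awssqs"
import "github.com/aws/jsii-runtime-go"

var topic sns.Topic

var fn lambda.Function

deadLetterQueue := sqs.NewQueue(this, jsii.String("deadLetterQueue"), nil)
fn.AddEventSource(eventsources.NewSnsEventSource(topic, &eventsources.SnsEventSourceProps{
	FilterPolicy: &map[string]sns.SubscriptionFilter{},
	DeadLetterQueue: deadLetterQueue,
}))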

func NewSnsEventSource

func NewSnsEventSource(topic awssns.ITopic, props *SnsEventSourceProps) SnsEventSource

type SnsEventSourceProps

type SnsEventSourceProps struct {
	// Queue to be used as dead letter queue.
	//
	// If not passed no dead letter queue is enabled.
	// Default: - No dead letter queue enabled.
	//
	DeadLetterQueue awssqs.IQueue `field:"optional" json:"deadLetterQueue" yaml:"deadLetterQueue"`
	// The filter policy.
	// Default: - all messages are delivered.
	//
	FilterPolicy *map[string]awssns.SubscriptionFilter `field:"optional" json:"filterPolicy" yaml:"filterPolicy"`
	// The filter policy that is applied on the message body.
	//
	// To apply a filter policy to the message attributes, use `filterPolicy`. A maximum of one of `filterPolicyWithMessageBody` and `filterPolicy` may be used.
	// Default: - all messages are delivered.
	//
	FilterPolicyWithMessageBody *map[string]awssns.FilterOrPolicy `field:"optional" json:"filterPolicyWithMessageBody" yaml:"filterPolicyWithMessageBody"`
}

Properties forwarded to the Lambda Subscription.

Example:

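import lambda "github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
import eventsources "github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
import sns "github.com/aws/aws-cdk-go/awscdk/v2/awssns"
import sqs "github.com/aws/aws-cdk-go/awscdk/v2/awssqs"
import "github.com/aws/jsii-runtime-go"

var topic sns.Topic

var fn lambda.Function

deadLetterQueue := sqs.NewQueue(this, jsii.String("deadLetterQueue"), nil)
fn.AddEventSource(eventsources.NewSnsEventSource(topic, &eventsources.SnsEventSourceProps{
	FilterPolicy: &map[string]sns.SubscriptionFilter{},
	DeadLetterQueue: deadLetterQueue,
}))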

type SqsDlq

type SqsDlq interface {
	awslambda.IEventSourceDlq
	// Returns a destination configuration for the DLQ.
	Bind(_target awslambda.IEventSourceMapping, targetHandler awslambda.IFunction) *awslambda.DlqDestinationConfig
}

An SQS dead letter queue destination configuration for a Lambda event source.

Example:

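import dynamodb "github.com/aws/aws-cdk-go/awscdk/v2/awsdynamodb"
import lambda "github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
import eventsources "github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
import sqs "github.com/aws/aws-cdk-go/awscdk/v2/awssqs"
import "github.com/aws/jsii-runtime-go"

var table dynamodb.Table

var fn lambda.Function

deadLetterQueue := sqs.NewQueue(this, jsii.String("deadLetterQueue"), nil)
fn.AddEventSource(eventsources.NewDynamoEventSource(table, &eventsources.DynamoEventSourceProps{
	StartingPosition: lambda.StartingPosition_TRIM_HORIZON,
	BatchSize: jsii.Number(5),
	BisectBatchOnError: jsii.Bool(true),
	OnFailure: eventsources.NewSqsDlq(deadLetterQueue),
	RetryAttempts: jsii.Number(10),
}))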

func NewSqsDlq

func NewSqsDlq(queue awssqs.IQueue) SqsDlq

type SqsEventSource

type SqsEventSource interface {
	awslambda.IEventSource
	// The ARN for this EventSourceMapping.
	EventSourceMappingArn() *string
	// The identifier for this EventSourceMapping.
	EventSourceMappingId() *string
	Queue() awssqs.IQueue
	// Called by `lambda.addEventSource` to allow the event source to bind to this function.
	Bind(target awslambda.IFunction)
}

Use an Amazon SQS queue as an event source for AWS Lambda.

Example:

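import lambda "github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
import eventsources "github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
import sqs "github.com/aws/aws-cdk-go/awscdk/v2/awssqs"
import "github.com/aws/jsii-runtime-go"

var fn lambda.Function

queue := sqs.NewQueue(this, jsii.String("Queue"), nil)
// props is optional; pass nil to accept the defaults.
fn.AddEventSource(eventsources.NewSqsEventSource(queue, nil))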

func NewSqsEventSource

func NewSqsEventSource(queue awssqs.IQueue, props *SqsEventSourceProps) SqsEventSource

type SqsEventSourceProps

type SqsEventSourceProps struct {
	// The largest number of records that AWS Lambda will retrieve from your event source at the time of invoking your function.
	//
	// Your function receives an
	// event with all the retrieved records.
	//
	// Valid Range: Minimum value of 1. Maximum value of 10.
	// If `maxBatchingWindow` is configured, this value can go up to 10,000.
	// Default: 10.
	//
	BatchSize *float64 `field:"optional" json:"batchSize" yaml:"batchSize"`
	// If the SQS event source mapping should be enabled.
	// Default: true.
	//
	Enabled *bool `field:"optional" json:"enabled" yaml:"enabled"`
	// Add filter criteria option.
	// Default: - None.
	//
	Filters *[]*map[string]interface{} `field:"optional" json:"filters" yaml:"filters"`
	// The maximum amount of time to gather records before invoking the function.
	//
	// Valid Range: Minimum value of 0 minutes. Maximum value of 5 minutes.
	// Default: - no batching window. The lambda function will be invoked immediately with the records that are available.
	//
	MaxBatchingWindow awscdk.Duration `field:"optional" json:"maxBatchingWindow" yaml:"maxBatchingWindow"`
	// The maximum concurrency setting limits the number of concurrent instances of the function that an Amazon SQS event source can invoke.
	// See: https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#events-sqs-max-concurrency
	//
	// Valid Range: Minimum value of 2. Maximum value of 1000.
	//
	// Default: - No specific limit.
	//
	MaxConcurrency *float64 `field:"optional" json:"maxConcurrency" yaml:"maxConcurrency"`
	// Allow functions to return partially successful responses for a batch of records.
	// See: https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
	//
	// Default: false.
	//
	ReportBatchItemFailures *bool `field:"optional" json:"reportBatchItemFailures" yaml:"reportBatchItemFailures"`
}

Example:

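import cdk "github.com/aws/aws-cdk-go/awscdk/v2"
import lambda "github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
import eventsources "github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
import sqs "github.com/aws/aws-cdk-go/awscdk/v2/awssqs"
import "github.com/aws/jsii-runtime-go"

var fn lambda.Function

queue := sqs.NewQueue(this, jsii.String("MyQueue"), &sqs.QueueProps{
	VisibilityTimeout: cdk.Duration_Seconds(jsii.Number(30)),
})

fn.AddEventSource(eventsources.NewSqsEventSource(queue, &eventsources.SqsEventSourceProps{
	BatchSize: jsii.Number(10), // default
	MaxBatchingWindow: cdk.Duration_Minutes(jsii.Number(5)),
	ReportBatchItemFailures: jsii.Bool(true),
}))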
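
When ReportBatchItemFailures is enabled, the function handler reports which messages failed so that only those are retried. The following is a minimal sketch of that response shape using only the standard library; the type and function names are hypothetical, and a real handler would typically use the types from the Lambda Go SDK instead.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// batchItemFailure and batchResponse mirror the JSON shape of a partial
// batch response. The type names are hypothetical.
type batchItemFailure struct {
	ItemIdentifier string `json:"itemIdentifier"`
}

type batchResponse struct {
	BatchItemFailures []batchItemFailure `json:"batchItemFailures"`
}

// buildResponse lists the IDs of the messages that failed processing.
// An empty list means the whole batch succeeded.
func buildResponse(failedIDs []string) batchResponse {
	resp := batchResponse{BatchItemFailures: []batchItemFailure{}}
	for _, id := range failedIDs {
		resp.BatchItemFailures = append(resp.BatchItemFailures, batchItemFailure{ItemIdentifier: id})
	}
	return resp
}

// responseJSON encodes the response the way a handler would return it.
func responseJSON(failedIDs []string) (string, error) {
	out, err := json.Marshal(buildResponse(failedIDs))
	return string(out), err
}

func main() {
	out, err := responseJSON([]string{"msg-2"})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Initialising the slice to an empty value rather than nil keeps the encoded field as an empty JSON array instead of null when nothing failed.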

type StreamEventSource

type StreamEventSource interface {
	awslambda.IEventSource
	Props() *StreamEventSourceProps
	// Called by `lambda.addEventSource` to allow the event source to bind to this function.
	Bind(_target awslambda.IFunction)
	EnrichMappingOptions(options *awslambda.EventSourceMappingOptions) *awslambda.EventSourceMappingOptions
}

Use a stream as an event source for AWS Lambda.

type StreamEventSourceProps

type StreamEventSourceProps struct {
	// Where to begin consuming the stream.
	StartingPosition awslambda.StartingPosition `field:"required" json:"startingPosition" yaml:"startingPosition"`
	// The largest number of records that AWS Lambda will retrieve from your event source at the time of invoking your function.
	//
	// Your function receives an
	// event with all the retrieved records.
	//
	// Valid Range:
	// * Minimum value of 1
	// * Maximum value of:
	//   * 1000 for `DynamoEventSource`
	//   * 10000 for `KinesisEventSource`, `ManagedKafkaEventSource` and `SelfManagedKafkaEventSource`.
	// Default: 100.
	//
	BatchSize *float64 `field:"optional" json:"batchSize" yaml:"batchSize"`
	// If the stream event source mapping should be enabled.
	// Default: true.
	//
	Enabled *bool `field:"optional" json:"enabled" yaml:"enabled"`
	// The maximum amount of time to gather records before invoking the function.
	//
	// Maximum of Duration.minutes(5).
	// See: https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html#invocation-eventsourcemapping-batching
	//
	// Default: - Duration.seconds(0) for Kinesis, DynamoDB, and SQS event sources, Duration.millis(500) for MSK, self-managed Kafka, and Amazon MQ.
	//
	MaxBatchingWindow awscdk.Duration `field:"optional" json:"maxBatchingWindow" yaml:"maxBatchingWindow"`
	// If the function returns an error, split the batch in two and retry.
	// Default: false.
	//
	BisectBatchOnError *bool `field:"optional" json:"bisectBatchOnError" yaml:"bisectBatchOnError"`
	// Add filter criteria option.
	// Default: - None.
	//
	Filters *[]*map[string]interface{} `field:"optional" json:"filters" yaml:"filters"`
	// The maximum age of a record that Lambda sends to a function for processing.
	//
	// Valid Range:
	// * Minimum value of 60 seconds
	// * Maximum value of 7 days
	//
	// The default value is -1, which sets the maximum age to infinite.
	// When the value is set to infinite, Lambda never discards old records.
	// Records are valid until they expire in the event source.
	// Default: -1.
	//
	MaxRecordAge awscdk.Duration `field:"optional" json:"maxRecordAge" yaml:"maxRecordAge"`
	// An Amazon SQS queue or Amazon SNS topic destination for discarded records.
	// Default: - discarded records are ignored.
	//
	OnFailure awslambda.IEventSourceDlq `field:"optional" json:"onFailure" yaml:"onFailure"`
	// The number of batches to process from each shard concurrently.
	//
	// Valid Range:
	// * Minimum value of 1
	// * Maximum value of 10.
	// Default: 1.
	//
	ParallelizationFactor *float64 `field:"optional" json:"parallelizationFactor" yaml:"parallelizationFactor"`
	// Allow functions to return partially successful responses for a batch of records.
	// See: https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting
	//
	// Default: false.
	//
	ReportBatchItemFailures *bool `field:"optional" json:"reportBatchItemFailures" yaml:"reportBatchItemFailures"`
	// Maximum number of retry attempts.
	//
	// Valid Range:
	// * Minimum value of 0
	// * Maximum value of 10000.
	//
	// The default value is -1, which sets the maximum number of retries to infinite.
	// When MaximumRetryAttempts is infinite, Lambda retries failed records until
	// the record expires in the event source.
	// Default: -1.
	//
	RetryAttempts *float64 `field:"optional" json:"retryAttempts" yaml:"retryAttempts"`
	// The size of the tumbling windows to group records sent to DynamoDB or Kinesis.
	//
	// Valid Range: 0 - 15 minutes.
	// Default: - None.
	//
	TumblingWindow awscdk.Duration `field:"optional" json:"tumblingWindow" yaml:"tumblingWindow"`
}

The set of properties for streaming event sources shared by Dynamo and Kinesis.

Example:

// The code below shows an example of how to instantiate this type.
// The values are placeholders you should change.
import cdk "github.com/aws/aws-cdk-go/awscdk/v2"
import lambda "github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
import eventsources "github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
import "github.com/aws/jsii-runtime-go"

var eventSourceDlq lambda.IEventSourceDlq
var filters interface{}

streamEventSourceProps := &eventsources.StreamEventSourceProps{
	StartingPosition: lambda.StartingPosition_TRIM_HORIZON,

	// the properties below are optional
	BatchSize: jsii.Number(123),
	BisectBatchOnError: jsii.Bool(false),
	Enabled: jsii.Bool(false),
	Filters: &[]*map[string]interface{}{
		&map[string]interface{}{
			"filtersKey": filters,
		},
	},
	MaxBatchingWindow: cdk.Duration_Minutes(jsii.Number(30)),
	MaxRecordAge: cdk.Duration_Minutes(jsii.Number(30)),
	OnFailure: eventSourceDlq,
	ParallelizationFactor: jsii.Number(123),
	ReportBatchItemFailures: jsii.Bool(false),
	RetryAttempts: jsii.Number(123),
	TumblingWindow: cdk.Duration_Minutes(jsii.Number(30)),
}
