s32cs

package module
v1.2.0
Published: Aug 13, 2021 License: MIT Imports: 23 Imported by: 0

README

s32cs

Amazon CloudSearch document uploader via S3 event notification.

Usage

Example Lambda function configuration.

{
  "FunctionName": "s32cs",
  "Description": "Amazon Cloudsearch uploader via S3 event notification.",
  "Environment": {
    "Variables": {
      "ENDPOINT": "<YOUR CloudSearch document default endpoint>",
      "KEY_REGEXP": "Regexp to extract an endpoint from S3 object key"
    }
  },
  "Handler": "s32cs",
  "MemorySize": 128,
  "Role": "<YOUR role ARN>",
  "Runtime": "provided.al2",
  "Timeout": 60
}

Configure your S3 bucket. Set an event notification to the Lambda function.

Example
{
  "FunctionName": "s32cs",
  "Description": "",
  "Environment": {
    "Variables": {
      "ENDPOINT": "example-nregueirgrehbuigre.ap-northeast-1.cloudsearch.amazonaws.com",
      "KEY_REGEXP": "example/(.+?)/"
    }
  },
  "Handler": "s32cs",
  "MemorySize": 128,
  "Role": "arn:aws:iam::xxxxxxxxxxxx:role/s32cs_lambda_function",
  "Runtime": "provided.al2",
  "Timeout": 60
}

This configuration works as follows.

  • S3
    1. Event notification invokes the function.
  • Lambda
    1. Read notified objects from S3.
    2. Convert the object (line-delimited JSON) to SDF.
    3. Upload SDF to CloudSearch.
    • The endpoint is determined by the ENDPOINT environment value (default) or extracted from the object key by KEY_REGEXP.

Source object file format

Line-delimited JSON only.

{"id":"123","type":"add","fields":{"foo":"bar","bar":["A","B"]}}
{"id":"123","type":"delete"}

The id and type (add or delete) columns are required.
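Parsing and checking such a file can be sketched with the package's SDFRecord type (mirrored here; the validate helper is an assumed illustration of the required-column rule, not the package's exported Validate):

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

// SDFRecord mirrors the type exported by this package.
type SDFRecord struct {
	ID     string                 `json:"id"`
	Type   string                 `json:"type"`
	Fields map[string]interface{} `json:"fields,omitempty"`
}

// validate enforces the required columns: id must be set and
// type must be "add" or "delete".
func validate(r *SDFRecord) error {
	if r.ID == "" {
		return fmt.Errorf("id is required")
	}
	if r.Type != "add" && r.Type != "delete" {
		return fmt.Errorf("type must be add or delete, got %q", r.Type)
	}
	return nil
}

func main() {
	src := `{"id":"123","type":"add","fields":{"foo":"bar","bar":["A","B"]}}
{"id":"123","type":"delete"}`
	// Read one JSON record per line, as in the source object format.
	sc := bufio.NewScanner(strings.NewReader(src))
	for sc.Scan() {
		var r SDFRecord
		if err := json.Unmarshal(sc.Bytes(), &r); err != nil {
			fmt.Println("parse error:", err)
			continue
		}
		fmt.Println(r.ID, r.Type, validate(&r) == nil)
	}
	// → 123 add true
	// → 123 delete true
}
```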

Using with a Dead Letter Queue (DLQ)

If the Lambda function has a DLQ (SQS) configured, s32cs can also process jobs from SQS.

When s32cs is invoked with a payload like {"queue_url":"..."}, it fetches the jobs from the SQS queue and uploads them.

// example payload
{"queue_url":"https://sqs.ap-northeast-1.amazonaws.com/xxxxxxx/upload"}

You can run this invocation periodically with CloudWatch Events scheduled jobs.
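The dispatch between the two invocation modes can be sketched with the package's SQSEvent type (mirrored here; the dispatch function itself is a hypothetical illustration of how the payload shape selects a mode):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SQSEvent mirrors the package's type for the {"queue_url": ...} payload.
type SQSEvent struct {
	QueueURL string `json:"queue_url"`
}

// dispatch sketches handler dispatch: a payload carrying queue_url
// means "fetch jobs from SQS"; anything else is treated as an S3
// event notification.
func dispatch(payload []byte) string {
	var e SQSEvent
	if err := json.Unmarshal(payload, &e); err == nil && e.QueueURL != "" {
		return "sqs:" + e.QueueURL
	}
	return "s3"
}

func main() {
	fmt.Println(dispatch([]byte(`{"queue_url":"https://sqs.ap-northeast-1.amazonaws.com/xxxxxxx/upload"}`)))
	fmt.Println(dispatch([]byte(`{"Records":[]}`)))
	// → sqs:https://sqs.ap-northeast-1.amazonaws.com/xxxxxxx/upload
	// → s3
}
```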

Requirements

  • Go

LICENSE

The MIT License (MIT)

Copyright (c) 2017 FUJIWARA Shunichiro / (c) 2017 KAYAC Inc.

Documentation

Index

Constants

const MaxUploadSize = 5 * 1024 * 1024

Variables

var (
	DEBUG = false
)
var InvalidChars = regexp.MustCompile("[^\u0009\u000a\u000d\u0020-\uD7FF\uE000-\uFFFD]")

http://docs.aws.amazon.com/cloudsearch/latest/developerguide/preparing-data.html
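The InvalidChars pattern matches characters outside the range CloudSearch accepts (see the linked preparing-data guide). Sanitizing a field with it can be sketched as:

```go
package main

import (
	"fmt"
	"regexp"
)

// InvalidChars mirrors the package variable: anything outside the
// character ranges CloudSearch accepts in document data.
var InvalidChars = regexp.MustCompile("[^\u0009\u000a\u000d\u0020-\uD7FF\uE000-\uFFFD]")

func main() {
	// NUL and backspace are stripped; ordinary text is preserved.
	dirty := "ok\u0000text\u0008"
	fmt.Println(InvalidChars.ReplaceAllString(dirty, ""))
	// → oktext
}
```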

Functions

func Run added in v1.2.0

func Run()

Types

type Buffer

type Buffer struct {
	bytes.Buffer
}

func NewBuffer

func NewBuffer() *Buffer

func (*Buffer) Close

func (b *Buffer) Close()

func (*Buffer) Init

func (b *Buffer) Init()

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(sess *session.Session, endpoint string, reg *regexp.Regexp) *Client

func (*Client) BuildAndFlush

func (c *Client) BuildAndFlush(src io.Reader, flush Flusher) error

func (*Client) Process

func (c *Client) Process(event S3Event) error

func (*Client) ProcessSQS

func (d *Client) ProcessSQS(queueURL string) error

func (*Client) Upload

func (c *Client) Upload(src io.Reader, endpoint string) error

type Flusher

type Flusher func(*Buffer) error

type S3Event

type S3Event struct {
	Records []S3EventRecord `json:"Records"`
}

func (S3Event) String

func (e S3Event) String() string

type S3EventRecord added in v1.0.4

type S3EventRecord struct {
	EventVersion string    `json:"eventVersion"`
	EventSource  string    `json:"eventSource"`
	AwsRegion    string    `json:"awsRegion"`
	EventTime    time.Time `json:"eventTime"`
	EventName    string    `json:"eventName"`
	UserIdentity struct {
		PrincipalID string `json:"principalId"`
	} `json:"userIdentity"`
	RequestParameters struct {
		SourceIPAddress string `json:"sourceIPAddress"`
	} `json:"requestParameters"`
	ResponseElements struct {
		XAmzRequestID string `json:"x-amz-request-id"`
		XAmzID2       string `json:"x-amz-id-2"`
	} `json:"responseElements"`
	S3 struct {
		S3SchemaVersion string `json:"s3SchemaVersion"`
		ConfigurationID string `json:"configurationId"`
		Bucket          struct {
			Name          string `json:"name"`
			OwnerIdentity struct {
				PrincipalID string `json:"principalId"`
			} `json:"ownerIdentity"`
			Arn string `json:"arn"`
		} `json:"bucket"`
		Object struct {
			Key       string `json:"key"`
			Size      int    `json:"size"`
			ETag      string `json:"eTag"`
			VersionID string `json:"versionId"`
			Sequencer string `json:"sequencer"`
		} `json:"object"`
	} `json:"s3"`
}

func (S3EventRecord) Parse added in v1.0.4

func (r S3EventRecord) Parse() (bucket, key string, err error)

type SDFRecord

type SDFRecord struct {
	ID     string                 `json:"id"`
	Type   string                 `json:"type"`
	Fields map[string]interface{} `json:"fields,omitempty"`
}

func (*SDFRecord) Validate

func (r *SDFRecord) Validate() error

type SQSEvent

type SQSEvent struct {
	QueueURL string `json:"queue_url"`
}

Directories

Path Synopsis
cmd
functions
