kinesis

package module
v0.0.0-...-8de9069 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2019 License: MIT Imports: 18 Imported by: 31

README

go-kinesis

Build Status

GO-lang library for AWS Kinesis API.

Documentation

Example

Example you can find in folder examples.

Command line interface

You can find a tool for interacting with kinesis from the command line in folder kinesis-cli.

Testing

Local Kinesis Server

The tests require a local Kinesis server such as Kinesalite to be running and reachable at http://127.0.0.1:4567.

To make the tests complete faster, you might want to have Kinesalite perform stream creation and deletion faster than the default of 500ms, like so:

kinesalite --createStreamMs 5 --deleteStreamMs 5 &

The & runs Kinesalite in the background, which is probably what you want.

go test

Some of the tests are marked as safe to be run in parallel, so to speed up test execution you might want to run go test with the -parallel n flag.

Documentation

Overview

Package kinesis provide GOlang API for http://aws.amazon.com/kinesis/

Index

Constants

View Source
const (
	AWSMetadataServer = "169.254.169.254"
	AWSIAMCredsPath   = "/latest/meta-data/iam/security-credentials"
	AWSIAMCredsURL    = "http://" + AWSMetadataServer + "/" + AWSIAMCredsPath
)
View Source
const (
	AccessEnvKey        = "AWS_ACCESS_KEY"
	AccessEnvKeyId      = "AWS_ACCESS_KEY_ID"
	SecretEnvKey        = "AWS_SECRET_KEY"
	SecretEnvAccessKey  = "AWS_SECRET_ACCESS_KEY"
	SecurityTokenEnvKey = "AWS_SECURITY_TOKEN"
)
View Source
const (
	ActionKey     = "Action"
	RegionEnvName = "AWS_REGION_NAME"

	// Regions
	USEast1      = "us-east-1"
	USWest2      = "us-west-2"
	EUWest1      = "eu-west-1"
	EUCentral1   = "eu-central-1"
	APSouthEast1 = "ap-southeast-1"
	APSouthEast2 = "ap-southeast-2"
	APNortheast1 = "ap-northeast-1"

	KinesisVersion  = "20131202"
	FirehoseVersion = "20150804"
)
View Source
const (
	AWS4_URL = "aws4_request"
)
View Source
const (
	AWSSecurityTokenHeader = "X-Amz-Security-Token"
)

Variables

This section is empty.

Functions

func NewRegionFromEnv

func NewRegionFromEnv() string

NewRegionFromEnv creates a region from the an expected environment variable

func Sign

func Sign(authKeys Auth, r *http.Request) error

Sign signs a request with a Service derived from r.Host

Types

type Auth

type Auth interface {
	// KeyForSigning return an access key / secret / token appropriate for signing at time now,
	// which as the name suggests, is usually now.
	KeyForSigning(now time.Time) (*SigningKey, error)
}

Auth interface for authentication credentials and information

func NewAuth

func NewAuth(accessKey, secretKey, token string) Auth

NewAuth creates return an auth object that uses static credentials which do not automatically renew.

func NewAuthFromEnv

func NewAuthFromEnv() (Auth, error)

NewAuthFromEnv retrieves auth credentials from environment vars

func NewAuthFromMetadata

func NewAuthFromMetadata() (Auth, error)

NewAuthFromMetadata retrieves auth credentials from the metadata server. If an IAM role is associated with the instance we are running on, the metadata server will expose credentials for that role under a known endpoint.

TODO: specify custom network (connect, read) timeouts, else this will block for the default timeout durations.

func NewAuthWithAssumedRole

func NewAuthWithAssumedRole(roleArn, sessionName, region string, stsAuth Auth) (Auth, error)

NewAuthWithAssumedRole will call STS in a given region to assume a role stsAuth object is used to authenticate to STS to fetch temporary credentials for the desired role.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is like http.Client, but signs all requests using Auth.

func NewClient

func NewClient(auth Auth) *Client

NewClient creates a new Client that uses the credentials in the specified Auth object.

This function assumes the Auth object has been sanely initialized. If you wish to infer auth credentials from the environment, refer to NewAuth

func NewClientWithHTTPClient

func NewClientWithHTTPClient(auth Auth, httpClient *http.Client) *Client

NewClientWithHTTPClient creates a client with a non-default http client ie. a timeout could be set on the HTTP client to timeout if Kinesis doesn't response in a timely manner like after the 5 minute mark where the current shard iterator expires

func (*Client) Do

func (c *Client) Do(req *http.Request) (*http.Response, error)

Do some request, but sign it before sending

type DescribeDeliveryStreamResp

type DescribeDeliveryStreamResp struct {
	DeliveryStreamDescription struct {
		CreateTimestamp      float32
		DeliveryStreamARN    string
		DeliveryStreamName   string
		DeliveryStreamStatus string
		Destinations         []DestinationsResp
		HasMoreDestinations  bool
		LastUpdatedTimestamp int
		VersionId            string
	}
}

DescribeDeliveryStreamResp stores the information that provides by the Firehose DescribeDeliveryStream API call

type DescribeStreamResp

type DescribeStreamResp struct {
	StreamDescription struct {
		HasMoreShards bool
		Shards        []DescribeStreamShards
		StreamARN     string
		StreamName    string
		StreamStatus  string
	}
}

DescribeStreamResp stores the information that provides by DescribeStream API call

type DescribeStreamShards

type DescribeStreamShards struct {
	AdjacentParentShardId string
	HashKeyRange          struct {
		EndingHashKey   string
		StartingHashKey string
	}
	ParentShardId       string
	SequenceNumberRange struct {
		EndingSequenceNumber   string
		StartingSequenceNumber string
	}
	ShardId string
}

DescribeStreamShards stores the information about list of shards inside DescribeStreamResp

type DestinationsResp

type DestinationsResp struct {
	DestinationId                  string
	RedshiftDestinationDescription RedshiftDestinationDescriptionResp
	S3DestinationDescription       S3DestinationDescriptionResp
}

type Error

type Error struct {
	// HTTP status code (200, 403, ...)
	StatusCode int
	// error code ("UnsupportedOperation", ...)
	Code string
	// The human-oriented error message
	Message   string
	RequestId string
}

Error represent error from Kinesis API

func (*Error) Error

func (err *Error) Error() string

Error returns error message from error object

type GetRecordsRecords

type GetRecordsRecords struct {
	ApproximateArrivalTimestamp float64
	Data                        []byte
	PartitionKey                string
	SequenceNumber              string
}

GetNextRecordsRecords stores the information that provides by GetNextRecordsResp

func (GetRecordsRecords) GetData

func (r GetRecordsRecords) GetData() []byte

type GetRecordsResp

type GetRecordsResp struct {
	MillisBehindLatest int64
	NextShardIterator  string
	Records            []GetRecordsRecords
}

GetNextRecordsResp stores the information that provides by GetNextRecords API call

type GetShardIteratorResp

type GetShardIteratorResp struct {
	ShardIterator string
}

GetShardIteratorResp stores the information that provides by GetShardIterator API call

type Kinesis

type Kinesis struct {
	// contains filtered or unexported fields
}

Structure for kinesis client

func New

func New(auth Auth, region string) *Kinesis

New returns an initialized AWS Kinesis client using the canonical live “production” endpoint for AWS Kinesis, i.e. https://kinesis.{region}.amazonaws.com

func NewWithClient

func NewWithClient(region string, client *Client) *Kinesis

NewWithClient returns an initialized AWS Kinesis client using the canonical live “production” endpoint for AWS Kinesis, i.e. https://kinesis.{region}.amazonaws.com but with the ability to create a custom client with specific configurations like a timeout

func NewWithEndpoint

func NewWithEndpoint(auth Auth, region, endpoint string) *Kinesis

NewWithEndpoint returns an initialized AWS Kinesis client using the specified endpoint. This is generally useful for testing, so a local Kinesis server can be used.

func (*Kinesis) CreateStream

func (kinesis *Kinesis) CreateStream(StreamName string, ShardCount int) error

CreateStream adds a new Amazon Kinesis stream to your AWS account StreamName is a name of stream, ShardCount is number of shards more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html

func (*Kinesis) DeleteStream

func (kinesis *Kinesis) DeleteStream(StreamName string) error

DeleteStream deletes a stream and all of its shards and data from your AWS account StreamName is a name of stream more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeleteStream.html

func (*Kinesis) DescribeStream

func (kinesis *Kinesis) DescribeStream(args *RequestArgs) (resp *DescribeStreamResp, err error)

DescribeStream returns the following information about the stream: the current status of the stream, the stream Amazon Resource Name (ARN), and an array of shard objects that comprise the stream. For each shard object there is information about the hash key and sequence number ranges that the shard spans, and the IDs of any earlier shards that played in a role in a MergeShards or SplitShard operation that created the shard more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html

func (*Kinesis) Firehose

func (k *Kinesis) Firehose()

func (*Kinesis) GetRecords

func (kinesis *Kinesis) GetRecords(args *RequestArgs) (resp *GetRecordsResp, err error)

GetRecords returns one or more data records from a shard more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html

func (*Kinesis) GetShardIterator

func (kinesis *Kinesis) GetShardIterator(args *RequestArgs) (resp *GetShardIteratorResp, err error)

GetShardIterator returns a shard iterator more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html

func (*Kinesis) ListStreams

func (kinesis *Kinesis) ListStreams(args *RequestArgs) (resp *ListStreamsResp, err error)

ListStreams returns an array of the names of all the streams that are associated with the AWS account making the ListStreams request more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreams.html

func (*Kinesis) MergeShards

func (kinesis *Kinesis) MergeShards(args *RequestArgs) error

MergeShards merges two adjacent shards in a stream and combines them into a single shard to reduce the stream's capacity to ingest and transport data more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_MergeShards.html

func (*Kinesis) PutRecord

func (kinesis *Kinesis) PutRecord(args *RequestArgs) (resp *PutRecordResp, err error)

PutRecord puts a data record into an Amazon Kinesis stream from a producer. args must contain a single record added with AddRecord. More info: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html

func (*Kinesis) PutRecords

func (kinesis *Kinesis) PutRecords(args *RequestArgs) (resp *PutRecordsResp, err error)

PutRecords puts multiple data records into an Amazon Kinesis stream from a producer more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html

func (*Kinesis) SplitShard

func (kinesis *Kinesis) SplitShard(args *RequestArgs) error

SplitShard splits a shard into two new shards in the stream, to increase the stream's capacity to ingest and transport data more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_SplitShard.html

type KinesisClient

type KinesisClient interface {
	CreateStream(StreamName string, ShardCount int) error
	DeleteStream(StreamName string) error
	DescribeStream(args *RequestArgs) (resp *DescribeStreamResp, err error)
	DescribeDeliveryStream(args *RequestArgs) (resp *DescribeDeliveryStreamResp, err error)
	GetRecords(args *RequestArgs) (resp *GetRecordsResp, err error)
	GetShardIterator(args *RequestArgs) (resp *GetShardIteratorResp, err error)
	ListStreams(args *RequestArgs) (resp *ListStreamsResp, err error)
	MergeShards(args *RequestArgs) error
	PutRecord(args *RequestArgs) (resp *PutRecordResp, err error)
	PutRecords(args *RequestArgs) (resp *PutRecordsResp, err error)
	PutRecordBatch(args *RequestArgs) (resp *PutRecordBatchResp, err error)
	SplitShard(args *RequestArgs) error
}

KinesisClient interface implemented by Kinesis

type ListStreamsResp

type ListStreamsResp struct {
	HasMoreStreams bool
	StreamNames    []string
}

ListStreamsResp stores the information that provides by ListStreams API call

type PutRecordBatchResp

type PutRecordBatchResp struct {
	FailedPutCount   int
	RequestResponses []PutRecordBatchResponses
}

PutRecordBatchResp stores the information that provides by PutRecordBatch API call

type PutRecordBatchResponses

type PutRecordBatchResponses struct {
	ErrorCode    string
	ErrorMessage string
	RecordId     string
}

RecordBatchResponses stores individual Record information provided by PutRecordBatch API call

type PutRecordResp

type PutRecordResp struct {
	SequenceNumber string
	ShardId        string
}

PutRecordResp stores the information that provides by PutRecord API call

type PutRecordsResp

type PutRecordsResp struct {
	FailedRecordCount int
	Records           []PutRecordsRespRecord
}

PutRecordsResp stores the information that provides by PutRecord API call

type PutRecordsRespRecord

type PutRecordsRespRecord struct {
	ErrorCode      string
	ErrorMessage   string
	SequenceNumber string
	ShardId        string
}

RecordResp stores individual Record information provided by PutRecords API call

type Record

type Record struct {
	Data         []byte
	PartitionKey string
}

Record stores the Data and PartitionKey for PutRecord or PutRecords calls to Kinesis API

type RedshiftDestinationDescriptionResp

type RedshiftDestinationDescriptionResp struct {
	ClusterJDBCURL string
	CopyCommand    struct {
		CopyOptions      string
		DataTableColumns string
		DataTableName    string
	}
	RoleARN                  string
	S3DestinationDescription S3DestinationDescriptionResp
	Username                 string
}

type RequestArgs

type RequestArgs struct {
	Records []Record
	// contains filtered or unexported fields
}

RequestArgs store params for request

func NewArgs

func NewArgs() *RequestArgs

NewArgs creates a new Filter.

func (*RequestArgs) Add

func (f *RequestArgs) Add(name string, value interface{})

Add appends a filtering parameter with the given name and value(s).

func (*RequestArgs) AddData

func (f *RequestArgs) AddData(value []byte)

func (*RequestArgs) AddRecord

func (f *RequestArgs) AddRecord(value []byte, partitionKey string)

AddRecord adds data and partition for sending multiple Records to Kinesis in one API call

type S3DestinationDescriptionResp

type S3DestinationDescriptionResp struct {
	BucketARN      string
	BufferingHints struct {
		IntervalInSeconds int
		SizeInMBs         int
	}
	CompressionFormat       string
	EncryptionConfiguration struct {
		KMSEncryptionConfig struct {
			AWSKMSKeyARN string
		}
		NoEncryptionConfig string
	}
	Prefix  string
	RoleARN string
}

type Service

type Service struct {
	// Name is the name of the service being used (i.e. iam, etc)
	Name string

	// Region is the region you want to communicate with the service through. (i.e. us-east-1)
	Region string
}

Service represents an AWS-compatible service.

func (*Service) Sign

func (s *Service) Sign(authKeys Auth, r *http.Request) error

Sign signs an HTTP request with the given AWS keys for use on service s.

type SigningKey

type SigningKey struct {
	AccessKeyId     string
	SecretAccessKey string
	SessionToken    string
}

SigningKey returns a set of data needed for signing

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL