kinesis

package module
v0.0.0-...-94552da Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2022 License: MIT Imports: 15 Imported by: 1

README

go-kinesis

Build Status

GO-lang library for AWS Kinesis API.

Documentation

Example

Example you can find in folder examples.

Command line interface

You can find a tool for interacting with kinesis from the command line in folder kinesis-cli.

Testing

Local Kinesis Server

The tests require a local Kinesis server such as Kinesalite to be running and reachable at http://127.0.0.1:4567.

To make the tests complete faster, you might want to have Kinesalite perform stream creation and deletion faster than the default of 500ms, like so:

kinesalite --createStreamMs 5 --deleteStreamMs 5 &

The & runs Kinesalite in the background, which is probably what you want.

go test

Some of the tests are marked as safe to be run in parallel, so to speed up test execution you might want to run go test with the -parallel n flag.

Documentation

Overview

Package kinesis provide GOlang API for http://aws.amazon.com/kinesis/

Index

Constants

View Source
const (
	AccessEnvKey       = "AWS_ACCESS_KEY"
	AccessEnvKeyId     = "AWS_ACCESS_KEY_ID"
	SecretEnvKey       = "AWS_SECRET_KEY"
	SecretEnvAccessKey = "AWS_SECRET_ACCESS_KEY"

	AWSMetadataServer = "169.254.169.254"
	AWSIAMCredsPath   = "/latest/meta-data/iam/security-credentials"
	AWSIAMCredsURL    = "http://" + AWSMetadataServer + "/" + AWSIAMCredsPath
)
View Source
const (
	ActionKey     = "Action"
	RegionEnvName = "AWS_REGION_NAME"

	// Regions
	USEast1      = "us-east-1"
	USWest2      = "us-west-2"
	EUWest1      = "eu-west-1"
	EUCentral1   = "eu-central-1"
	APSouthEast1 = "ap-southeast-1"
	APSouthEast2 = "ap-southeast-2"
	APNortheast1 = "ap-northeast-1"
)
View Source
const (
	AWS4_URL = "aws4_request"
)
View Source
const AWSSecurityTokenHeader = "X-Amz-Security-Token"

Variables

This section is empty.

Functions

func NewRegionFromEnv

func NewRegionFromEnv() string

NewRegionFromEnv creates a region from the an expected environment variable

func Sign

func Sign(authKeys Auth, r *http.Request) error

Sign signs a request with a Service derived from r.Host

Types

type Auth

type Auth interface {
	GetToken() string
	GetExpiration() time.Time
	GetSecretKey() string
	GetAccessKey() string
	HasExpiration() bool
	Renew() error
	Sign(*Service, time.Time) []byte
}

Auth interface for authentication credentials and information

type AuthCredentials

type AuthCredentials struct {
	// contains filtered or unexported fields
}

AuthCredentials holds the AWS credentials and metadata

func NewAuth

func NewAuth(accessKey, secretKey string) *AuthCredentials

NewAuth creates a *AuthCredentials struct that adheres to the Auth interface to dynamically retrieve AWS credentials

func NewAuthFromEnv

func NewAuthFromEnv() (*AuthCredentials, error)

NewAuthFromEnv retrieves auth credentials from environment vars

func NewAuthFromMetadata

func NewAuthFromMetadata() (*AuthCredentials, error)

NewAuthFromMetadata retrieves auth credentials from the metadata server. If an IAM role is associated with the instance we are running on, the metadata server will expose credentials for that role under a known endpoint.

TODO: specify custom network (connect, read) timeouts, else this will block for the default timeout durations.

func (*AuthCredentials) GetAccessKey

func (a *AuthCredentials) GetAccessKey() string

GetAccessKey returns the access key

func (*AuthCredentials) GetExpiration

func (a *AuthCredentials) GetExpiration() time.Time

GetExpiration retrieves the current expiration time

func (*AuthCredentials) GetSecretKey

func (a *AuthCredentials) GetSecretKey() string

GetSecretKey returns the secret key

func (*AuthCredentials) GetToken

func (a *AuthCredentials) GetToken() string

GetToken returns the token

func (*AuthCredentials) HasExpiration

func (a *AuthCredentials) HasExpiration() bool

HasExpiration returns true if the expiration time is non-zero and false otherwise

func (*AuthCredentials) Renew

func (a *AuthCredentials) Renew() error

Renew retrieves a new token and mutates it on an instance of the Auth struct

func (*AuthCredentials) Sign

func (a *AuthCredentials) Sign(s *Service, t time.Time) []byte

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is like http.Client, but signs all requests using Auth.

func NewClient

func NewClient(auth Auth) *Client

NewClient creates a new Client that uses the credentials in the specified Auth object.

This function assumes the Auth object has been sanely initialized. If you wish to infer auth credentials from the environment, refer to NewAuth

func NewClientWithHTTPClient

func NewClientWithHTTPClient(auth Auth, httpClient *http.Client) *Client

NewClientWithHTTPClient creates a client with a non-default http client ie. a timeout could be set on the HTTP client to timeout if Kinesis doesn't response in a timely manner like after the 5 minute mark where the current shard iterator expires

func (*Client) Do

func (c *Client) Do(req *http.Request) (*http.Response, error)

Do some request, but sign it before sending

type DescribeStreamResp

type DescribeStreamResp struct {
	StreamDescription struct {
		HasMoreShards bool
		Shards        []DescribeStreamShards
		StreamARN     string
		StreamName    string
		StreamStatus  string
	}
}

DescribeStreamResp stores the information that provides by DescribeStream API call

type DescribeStreamShards

type DescribeStreamShards struct {
	AdjacentParentShardId string
	HashKeyRange          struct {
		EndingHashKey   string
		StartingHashKey string
	}
	ParentShardId       string
	SequenceNumberRange struct {
		EndingSequenceNumber   string
		StartingSequenceNumber string
	}
	ShardId string
}

DescribeStreamShards stores the information about list of shards inside DescribeStreamResp

type Error

type Error struct {
	// HTTP status code (200, 403, ...)
	StatusCode int
	// error code ("UnsupportedOperation", ...)
	Code string
	// The human-oriented error message
	Message   string
	RequestId string
}

Error represent error from Kinesis API

func (*Error) Error

func (err *Error) Error() string

Return error message from error object

type GetRecordsRecords

type GetRecordsRecords struct {
	ApproximateArrivalTimestamp float64
	Data                        []byte
	PartitionKey                string
	SequenceNumber              string
}

GetNextRecordsRecords stores the information that provides by GetNextRecordsResp

func (GetRecordsRecords) GetData

func (r GetRecordsRecords) GetData() []byte

type GetRecordsResp

type GetRecordsResp struct {
	MillisBehindLatest int64
	NextShardIterator  string
	Records            []GetRecordsRecords
}

GetNextRecordsResp stores the information that provides by GetNextRecords API call

type GetShardIteratorResp

type GetShardIteratorResp struct {
	ShardIterator string
}

GetShardIteratorResp stores the information that provides by GetShardIterator API call

type Kinesis

type Kinesis struct {
	// contains filtered or unexported fields
}

Structure for kinesis client

func New

func New(auth Auth, region string) *Kinesis

New returns an initialized AWS Kinesis client using the canonical live “production” endpoint for AWS Kinesis, i.e. https://kinesis.{region}.amazonaws.com

func NewWithClient

func NewWithClient(region string, client *Client) *Kinesis

NewWithClient returns an initialized AWS Kinesis client using the canonical live “production” endpoint for AWS Kinesis, i.e. https://kinesis.{region}.amazonaws.com but with the ability to create a custom client with specific configurations like a timeout

func NewWithEndpoint

func NewWithEndpoint(auth Auth, region string, endpoint string) *Kinesis

NewWithEndpoint returns an initialized AWS Kinesis client using the specified endpoint. This is generally useful for testing, so a local Kinesis server can be used.

func (*Kinesis) CreateStream

func (kinesis *Kinesis) CreateStream(StreamName string, ShardCount int) error

CreateStream adds a new Amazon Kinesis stream to your AWS account StreamName is a name of stream, ShardCount is number of shards more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html

func (*Kinesis) DeleteStream

func (kinesis *Kinesis) DeleteStream(StreamName string) error

DeleteStream deletes a stream and all of its shards and data from your AWS account StreamName is a name of stream more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeleteStream.html

func (*Kinesis) DescribeStream

func (kinesis *Kinesis) DescribeStream(args *RequestArgs) (resp *DescribeStreamResp, err error)

DescribeStream returns the following information about the stream: the current status of the stream, the stream Amazon Resource Name (ARN), and an array of shard objects that comprise the stream. For each shard object there is information about the hash key and sequence number ranges that the shard spans, and the IDs of any earlier shards that played in a role in a MergeShards or SplitShard operation that created the shard more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html

func (*Kinesis) DescribeStreamAllShards

func (kinesis *Kinesis) DescribeStreamAllShards(streamName string) (resp *DescribeStreamResp, err error)

DescribeStreamAllShards returns the same information as DescribeStream, but also returns all of the shards if hitting limits.

func (*Kinesis) GetRecords

func (kinesis *Kinesis) GetRecords(args *RequestArgs) (resp *GetRecordsResp, err error)

GetRecords returns one or more data records from a shard more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html

func (*Kinesis) GetShardIterator

func (kinesis *Kinesis) GetShardIterator(args *RequestArgs) (resp *GetShardIteratorResp, err error)

GetShardIterator returns a shard iterator more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html

func (*Kinesis) ListStreams

func (kinesis *Kinesis) ListStreams(args *RequestArgs) (resp *ListStreamsResp, err error)

ListStreams returns an array of the names of all the streams that are associated with the AWS account making the ListStreams request more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreams.html

func (*Kinesis) MergeShards

func (kinesis *Kinesis) MergeShards(args *RequestArgs) error

MergeShards merges two adjacent shards in a stream and combines them into a single shard to reduce the stream's capacity to ingest and transport data more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_MergeShards.html

func (*Kinesis) PutRecord

func (kinesis *Kinesis) PutRecord(args *RequestArgs) (resp *PutRecordResp, err error)

PutRecord puts a data record into an Amazon Kinesis stream from a producer. args must contain a single record added with AddRecord. More info: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html

func (*Kinesis) PutRecords

func (kinesis *Kinesis) PutRecords(args *RequestArgs) (resp *PutRecordsResp, err error)

PutRecords puts multiple data records into an Amazon Kinesis stream from a producer more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html

func (*Kinesis) SplitShard

func (kinesis *Kinesis) SplitShard(args *RequestArgs) error

SplitShard splits a shard into two new shards in the stream, to increase the stream's capacity to ingest and transport data more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_SplitShard.html

type KinesisClient

type KinesisClient interface {
	CreateStream(StreamName string, ShardCount int) error
	DeleteStream(StreamName string) error
	DescribeStream(args *RequestArgs) (resp *DescribeStreamResp, err error)
	DescribeStreamAllShards(StreamName string) (resp *DescribeStreamResp, err error)
	GetRecords(args *RequestArgs) (resp *GetRecordsResp, err error)
	GetShardIterator(args *RequestArgs) (resp *GetShardIteratorResp, err error)
	ListStreams(args *RequestArgs) (resp *ListStreamsResp, err error)
	MergeShards(args *RequestArgs) error
	PutRecord(args *RequestArgs) (resp *PutRecordResp, err error)
	PutRecords(args *RequestArgs) (resp *PutRecordsResp, err error)
	SplitShard(args *RequestArgs) error
}

KinesisClient interface implemented by Kinesis

type ListStreamsResp

type ListStreamsResp struct {
	HasMoreStreams bool
	StreamNames    []string
}

ListStreamsResp stores the information that provides by ListStreams API call

type PutRecordResp

type PutRecordResp struct {
	SequenceNumber string
	ShardId        string
}

PutRecordResp stores the information that provides by PutRecord API call

type PutRecordsResp

type PutRecordsResp struct {
	FailedRecordCount int
	Records           []PutRecordsRespRecord
}

PutRecordsResp stores the information that provides by PutRecord API call

type PutRecordsRespRecord

type PutRecordsRespRecord struct {
	ErrorCode      string
	ErrorMessage   string
	SequenceNumber string
	ShardId        string
}

RecordResp stores individual Record information provided by PutRecords API call

type RequestArgs

type RequestArgs struct {
	Records []map[string]interface{}
	// contains filtered or unexported fields
}

RequestArgs store params for request

func NewArgs

func NewArgs() *RequestArgs

NewFilter creates a new Filter.

func (*RequestArgs) Add

func (f *RequestArgs) Add(name string, value interface{})

Add appends a filtering parameter with the given name and value(s).

func (*RequestArgs) AddData

func (f *RequestArgs) AddData(value []byte)

func (*RequestArgs) AddRecord

func (f *RequestArgs) AddRecord(value []byte, partitionKey string)

Add data and partition for sending multiple Records to Kinesis in one API call

func (*RequestArgs) AddRecordWithArgs

func (f *RequestArgs) AddRecordWithArgs(value []byte, partitionKey string, args map[string]interface{})

Add data and partition for sending multiple Records to Kinesis in one API call

type Service

type Service struct {
	// Name is the name of the service being used (i.e. iam, etc)
	Name string

	// Region is the region you want to communicate with the service through. (i.e. us-east-1)
	Region string
}

Service represents an AWS-compatible service.

func (*Service) Sign

func (s *Service) Sign(authKeys Auth, r *http.Request) error

Sign signs an HTTP request with the given AWS keys for use on service s.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL