Documentation ¶
Index ¶
- Variables
- func GetStreamKey(streamName, shardId, clientName string) string
- type Client
- func (c *Client) CreateStream(streamName string, shardCount int) error
- func (c *Client) DeleteStream(streamName string) error
- func (c *Client) ListStreams() ([]string, error)
- func (c *Client) NewLockedReader(streamName string, shardId string, clientName string) (*LockedReader, error)
- func (c *Client) NewLockedReaderWithParameters(streamName string, shardId string, clientName string, ...) (*LockedReader, error)
- func (c *Client) NewReader(streamName string, shardId string, clientName string) (*Reader, error)
- func (c *Client) NewReaderWithParameters(streamName string, shardId string, clientName string, ...) (*Reader, error)
- func (c *Client) NewSharedReader(streamName string, clientName string) (*SharedReader, error)
- func (c *Client) NewSharedReaderWithParameters(streamName string, clientName string, streamReadInterval time.Duration, ...) (*SharedReader, error)
- func (c *Client) PutRecord(streamName, partitionKey string, record []byte) error
- func (c *Client) PutRecords(streamName string, records []*kinesis.PutRecordsRequestEntry) error
- func (c *Client) StreamDescription(streamName string) (*kinesis.StreamDescription, error)
- func (c *Client) UpdateStream(streamName string, shardsCount int) error
- type Kinesis
- type LockedReader
- type Reader
- type SharedReader
Constants ¶
This section is empty.
Variables ¶
Functions ¶
func GetStreamKey ¶
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func New ¶
func New(awsConfig *aws.Config, distlock locker.Locker, checkpoint checkpointer.Checkpointer, snitch snitcher.Snitcher) *Client
func (*Client) CreateStream ¶
func (*Client) DeleteStream ¶
func (*Client) ListStreams ¶
func (*Client) NewLockedReader ¶
func (c *Client) NewLockedReader(streamName string, shardId string, clientName string) (*LockedReader, error)
NewLockedReader creates a new reader with default parameters and locks it so no other instance of clientName can create a new one on this shard.
func (*Client) NewLockedReaderWithParameters ¶
func (c *Client) NewLockedReaderWithParameters(streamName string, shardId string, clientName string, streamReadInterval time.Duration, readBatchSize int, channelBufferSize int) (*LockedReader, error)
NewLockedReaderWithParameters creates a new reader with the specified parameters and locks it so no other instance of clientName can create a new one on this shard.
func (*Client) NewReader ¶
NewReader initializes a reader on a shard with default parameters. It reads a batch of 100 records from a shard every 100 ms.
func (*Client) NewReaderWithParameters ¶
func (c *Client) NewReaderWithParameters(streamName string, shardId string, clientName string, streamReadInterval time.Duration, readBatchSize int, channelBufferSize int) (*Reader, error)
NewReaderWithParameters initializes a reader on a shard, defining how often the shard should be read, the size of a read batch and the channel buffer size.
func (*Client) NewSharedReader ¶
func (c *Client) NewSharedReader(streamName string, clientName string) (*SharedReader, error)
func (*Client) NewSharedReaderWithParameters ¶
func (*Client) PutRecords ¶
func (c *Client) PutRecords(streamName string, records []*kinesis.PutRecordsRequestEntry) error
func (*Client) StreamDescription ¶
func (c *Client) StreamDescription(streamName string) (*kinesis.StreamDescription, error)
type Kinesis ¶
type Kinesis interface { PutRecord(streamName, partitionKey string, record []byte) error PutRecords(streamName string, records []*kinesis.PutRecordsRequestEntry) error StreamDescription(streamName string) (*kinesis.StreamDescription, error) CreateStream(streamName string, shardCount int) error UpdateStream(streamName string, shardsCount int) error DeleteStream(streamName string) error ListStreams() ([]string, error) NewReader(streamName string, shardId string, clientName string) (*Reader, error) NewReaderWithParameters(streamName string, shardId string, clientName string, streamReadInterval time.Duration, readBatchSize int, channelBufferSize int) (*Reader, error) NewLockedReader(streamName string, shardId string, clientName string) (*LockedReader, error) NewLockedReaderWithParameters(streamName string, shardId string, clientName string, streamReadInterval time.Duration, readBatchSize int, channelBufferSize int) (*LockedReader, error) }
type LockedReader ¶
type LockedReader struct { *Reader // contains filtered or unexported fields }
func (*LockedReader) CloseAndRelease ¶
func (lr *LockedReader) CloseAndRelease(wg *sync.WaitGroup) error
CloseAndRelease closes the reader and releases the lock AFTER wg.Done() was called.
IMPORTANT: You should call wg.Done() when the channel is closed and consumed. Failing to call wg.Done() will result in this call hanging indefinitely.
func (*LockedReader) CloseUpdateCheckpointAndRelease ¶
func (lr *LockedReader) CloseUpdateCheckpointAndRelease(wg *sync.WaitGroup) error
CloseUpdateCheckpointAndRelease closes the reader, updates the reader's checkpoint, and releases the lock AFTER wg.Done() was called.
IMPORTANT: You should call wg.Done() when the channel is closed and consumed. Failing to call wg.Done() will result in this call hanging indefinitely.
func (*LockedReader) Release ¶
func (lr *LockedReader) Release() error
Release releases the lock that was created when creating this reader. Calling this function again after it has succeeded is a no-op.
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
func (*Reader) BlockReading ¶
func (r *Reader) BlockReading()
BlockReading stops reading from the stream after the current batch is processed. This could be used to safely update checkpoints before the reader is closed.
func (*Reader) Close ¶
Close waits for the current batch to be read and pushed to the channel, then stops reading and closes the channel. No further records will be read from the stream. After calling Close and consuming the channel, it is safe to call UpdateCheckpoint.
func (*Reader) Records ¶
Records consumes a shard from the last checkpoint if any, otherwise starts at the last untrimmed record. It returns a read-only buffered channel via which results are delivered. NOTE: checkpoints are NOT set automatically; you have to set them via the UpdateCheckpoint function, ideally after you call Close and consume all messages from the channel. If you want to make checkpoints while consuming the stream and be 100% sure that no messages are left unprocessed, you should call BlockReading, consume the channel, call UpdateCheckpoint and then call ResumeReading.
func (*Reader) UpdateCheckpoint ¶
UpdateCheckpoint sets the checkpoint to the last record that was read. It waits for the current batch to be processed and pushed to the channel.
type SharedReader ¶
type SharedReader struct {
// contains filtered or unexported fields
}
func (*SharedReader) Close ¶
func (sr *SharedReader) Close() error
func (*SharedReader) Records ¶
func (sr *SharedReader) Records() chan *kinesis.Record
func (*SharedReader) UpdateCheckpoint ¶
func (sr *SharedReader) UpdateCheckpoint() error