firequeue

package module
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2024 License: MIT Imports: 15 Imported by: 0

README

firequeue

Test Status Coverage Status MIT License PkgGoDev

firequeue is to ensure putting items to Amazon Kinesis Data Firehose with an in-memory queue

Synopsis

import (
    "context"

    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/firehose"
)

func main() {
    sess := session.Must(session.NewSession())
    fh := firehose.New(sess)
    fq := firequeue.New(fh)

    ctx := context.Background()
    go fq.Loop(ctx) // We should start looping before sending items

    err := fq.Send(&firehose.PutRecordInput{...})
    ...
}

Description

The firequeue utilizes an in-memory queue to ensure input to Amazon Kinesis Data Firehose. When the looping process is cancelled by the context, the firequeue wait for the queue to be empty and then exit the loop.

Installation

% go get github.com/natureglobal/firequeue

Author

Songmu

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option func(*Queue)

Option is a type for constructor options

func BatchInterval added in v0.0.6

func BatchInterval(period time.Duration) Option

BatchInterval is an option to specify batch interval

func BatchSize added in v0.0.6

func BatchSize(length int) Option

BatchSize is an option to specify batch size

func ErrorHandler

func ErrorHandler(fn func(error, []*firehose.Record)) Option

ErrorHandler is an option to specify the error handler

func MaxQueueLength

func MaxQueueLength(length int) Option

MaxQueueLength is an option to specify max length of in-memory queue

func Parallel

func Parallel(i int) Option

Parallel is an option to specify the number of parallelism

type Queue

type Queue struct {
	DeliveryStreamName *string
	// contains filtered or unexported fields
}

Queue manages a sending list for firehose

func New

func New(fh firehoseiface.FirehoseAPI, DeliveryStreamName string, opts ...Option) *Queue

New return new Queue

func (*Queue) Enqueue added in v0.0.6

func (q *Queue) Enqueue(r *firehose.Record) error

Enqueue add data to queue

func (*Queue) Loop

func (q *Queue) Loop(ctx context.Context) error

Loop proceeds jobs in queue

func (*Queue) Stats

func (q *Queue) Stats() Stats

Stats return queue stats

type Stats

type Stats struct {
	QueueLength      int
	BatchLength      int
	Success          int64
	RetryCount       int64
	UnretryableError int64
	QueueFullError   int64
}

Stats contains firequeue statistics.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL