s3iot

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2024 License: Apache-2.0 Imports: 10 Imported by: 8

README

s3iot

Go Reference ci codecov License

News

November 2023: API breaking change in aws-sdk-go-v2

https://github.com/aws/aws-sdk-go-v2 made a API breaking change in v1.23.0 (service/s3 v1.42.2). If you want to use the earlier versions, import github.com/at-wat/s3iot/awss3v2-1.22.2 instead of github.com/at-wat/s3iot/awss3v2.

AWS S3 Uploader/Downloader for IoT-ish applications

Package s3iot provides S3 uploader/downloader applicable for unreliable and congestible network. Main features:

  • Programmable retry
  • Pause/resume
  • Bandwidth control (uploader only)

Examples

License

This package is licensed under Apache License Version 2.0.

Documentation

Overview

Package s3iot provides S3 uploader applicable for unreliable and congestible network. Object can be uploaded with retry, pause/resume, and bandwidth limit.

Package s3iot provides S3 uploader applicable for unreliable and congestible network. Object can be uploaded with retry, pause/resume, and bandwidth limit.

Index

Constants

View Source
const (
	DefaultExponentialBackoffWaitBase = time.Second
	DefaultExponentialBackoffWaitMax  = time.Minute
	DefaultRetryMax                   = 8
)

Default retry parameters.

View Source
const (
	DefaultUploadPartSize = 1024 * 1024 * 5
	MaxUploadParts        = 10000
)

Default upload slicing parametes.

View Source
const (
	DefaultDownloadPartSize = 1024 * 1024 * 5
)

Default download slicing parametes.

View Source
const (
	DefaultWaitReadInterceptorMaxChunkSize = 4 * 1024
)

Default ReadInterceptor parameters.

Variables

View Source
var (
	ErrChangedDuringDownload    = errors.New("object is changed during download")
	ErrUnexpectedServerResponse = errors.New("unexpected server response")
)

Download errors.

View Source
var DefaultErrorClassifier = &NaiveErrorClassifier{}

DefaultErrorClassifier is the default ErrorClassifier.

DefaultRetryer is the default Retryer used by Uploader.

View Source
var ErrForcePaused = &retryableError{errors.New("force paused")}

ErrForcePaused indicates part up/download is canceled due to force pause.

Functions

This section is empty.

Types

type BucketKeyer

type BucketKeyer interface {
	BucketKey() (_, _ string)
}

BucketKeyer provides getter of target bucket and key.

type DefaultDownloadSlicerFactory

type DefaultDownloadSlicerFactory struct {
	PartSize int64
}

DefaultDownloadSlicerFactory is a factory of the default slicing logic.

func (DefaultDownloadSlicerFactory) New

New creates DownloadSlicer for the given io.WriterAt.

type DefaultUploadSlicerFactory

type DefaultUploadSlicerFactory struct {
	PartSize       int64
	MaxUploadParts int
}

DefaultUploadSlicerFactory is a factory of the default slicing logic.

func (DefaultUploadSlicerFactory) New

New creates UploadSlicer for the given io.Reader.

type DoneNotifier

type DoneNotifier interface {
	// Done returns a channel which will be closed after complete.
	Done() <-chan struct{}
}

DoneNotifier provices completion notifier.

type DownloadContext

type DownloadContext interface {
	// Result reutrns the upload status or error.
	Status() (DownloadStatus, error)
	// Result reutrns the upload result or error.
	Result() (DownloadOutput, error)

	Pauser
	DoneNotifier
}

DownloadContext provides access to the download progress and the result.

type DownloadInput

type DownloadInput struct {
	Bucket    *string
	Key       *string
	VersionID *string
}

DownloadInput represents upload destination and data.

type DownloadOutput

type DownloadOutput struct {
	ContentType  *string
	ETag         *string
	LastModified *time.Time
	VersionID    *string
}

DownloadOutput represents download result.

type DownloadSlicer

type DownloadSlicer interface {
	NextWriter() (io.WriterAt, contentrange.Range)
}

DownloadSlicer splits input data stream into multiple io.WriterAt.

type DownloadSlicerFactory

type DownloadSlicerFactory interface {
	New(io.WriterAt) DownloadSlicer
}

DownloadSlicerFactory creates DownloadSlicer for given io.WriterAt. DownloadSlicer will be created for each Download() call.

type DownloadStatus

type DownloadStatus struct {
	Status
	DownloadOutput
}

DownloadStatus represents download status.

type Downloader

type Downloader struct {
	UpDownloaderBase

	DownloadSlicerFactory DownloadSlicerFactory
}

Downloader implements S3 downloader with configurable retry and bandwidth limit.

func (Downloader) Download

func (u Downloader) Download(ctx context.Context, w io.WriterAt, input *DownloadInput) (DownloadContext, error)

Download a file to S3.

type DownloaderOption

type DownloaderOption interface {
	ApplyToDownloader(*Downloader)
}

DownloaderOption sets optional parameter to the Downloader.

func WithDownloadSlicer

func WithDownloadSlicer(s DownloadSlicerFactory) DownloaderOption

WithDownloadSlicer sets DownloadSlicerFactory to Downloader.

type DownloaderOptionFn

type DownloaderOptionFn func(*Downloader)

DownloaderOptionFn is functional option for Downloader.

func (DownloaderOptionFn) ApplyToDownloader

func (f DownloaderOptionFn) ApplyToDownloader(d *Downloader)

ApplyToDownloader apply the option to the Downloader.

type ErrorClassifier

type ErrorClassifier interface {
	IsRetryable(error) bool
	IsThrottle(error) (time.Duration, bool)
}

ErrorClassifier distinguishes given error is retryable.

type ExponentialBackoffRetryerFactory

type ExponentialBackoffRetryerFactory struct {
	WaitBase time.Duration
	WaitMax  time.Duration
	RetryMax int
}

ExponentialBackoffRetryerFactory creates ExponentialBackoffRetryer. When raw s3 upload API call is failed, the API call will be retried after WaitBase. Wait duration is multiplied by 2 if it continuously failed up to WaitMax.

func (ExponentialBackoffRetryerFactory) New

New creates ExponentialBackoffRetryer.

type NaiveErrorClassifier

type NaiveErrorClassifier struct{}

NaiveErrorClassifier returns error as retryable and no need to throttle.

func (NaiveErrorClassifier) IsRetryable

func (NaiveErrorClassifier) IsRetryable(error) bool

IsRetryable implements ErrorClassifier.

func (NaiveErrorClassifier) IsThrottle

func (NaiveErrorClassifier) IsThrottle(error) (time.Duration, bool)

IsThrottle implements ErrorClassifier.

type NoRetryerFactory

type NoRetryerFactory struct{}

NoRetryerFactory disables retry.

func (NoRetryerFactory) New

New creates NoRetryer.

type PauseOnFailRetryerFactory

type PauseOnFailRetryerFactory struct {
	Base RetryerFactory
}

PauseOnFailRetryerFactory creates retryer to pause on failure instead of aborting.

func (PauseOnFailRetryerFactory) New

New creates PauseOnFail.

type Pauser

type Pauser interface {
	Pause()
	Resume()
}

Pauser provices pause/resume interface.

type ReadInterceptor

type ReadInterceptor interface {
	Reader(io.ReadSeeker) io.ReadSeeker
}

ReadInterceptor wraps io.ReadSeeker to intercept Read() calls.

type ReadInterceptorFactory

type ReadInterceptorFactory interface {
	New() ReadInterceptor
}

ReadInterceptorFactory creates ReadInterceptor. ReadInterceptor will be created for each Upload() call.

type RetryError

type RetryError struct {
	// contains filtered or unexported fields
}

RetryError is returned when retry is exceeded limit.

func (*RetryError) Error

func (e *RetryError) Error() string

Error implements error.

func (*RetryError) Unwrap

func (e *RetryError) Unwrap() error

Unwrap returns original error.

type Retryer

type Retryer interface {
	OnFail(ctx context.Context, id int64, err error) bool
	OnSuccess(id int64)
}

Retryer controls upload retrying logic. Retryer may sleep on OnFail to control retry interval, but can be canceled by ctx.

type RetryerFactory

type RetryerFactory interface {
	New(Pauser) Retryer
}

RetryerFactory creates Retryer. Retryer will be created for each Upload() call.

type RetryerHookFactory

type RetryerHookFactory struct {
	Base    RetryerFactory
	OnError func(bucket, key string, err error)
}

RetryerHookFactory adds hook callback to the base retryer. Base and OnError must be set, or cause panic.

func (RetryerHookFactory) New

New creates RetryerHook.

type Status

type Status struct {
	Size          int64
	CompletedSize int64
	NumRetries    int
	Paused        bool
}

Status represents upload/download status.

type UpDownloaderBase

type UpDownloaderBase struct {
	API             s3api.UpDownloadAPI
	RetryerFactory  RetryerFactory
	ErrorClassifier ErrorClassifier
	ForcePause      bool
}

UpDownloaderBase stores downloader/uploader base objects.

type UpDownloaderOption

type UpDownloaderOption interface {
	UploaderOption
	DownloaderOption
}

UpDownloaderOption sets optional parameter to the Uploader or Downloader.

func WithAPI

WithAPI sets S3 API.

func WithErrorClassifier

func WithErrorClassifier(ec ErrorClassifier) UpDownloaderOption

WithErrorClassifier sets ErrorClassifierFactor.

func WithForcePause added in v0.0.3

func WithForcePause(f bool) UpDownloaderOption

WithForcePause enables to forcefully pause. If ForcePause is enabled, ongoing API call will be canceled and retried on resume.

func WithRetryer

func WithRetryer(r RetryerFactory) UpDownloaderOption

WithRetryer sets RetryerFactor.

type UpDownloaderOptionFn

type UpDownloaderOptionFn func(*UpDownloaderBase)

UpDownloaderOptionFn is functional option for Uploader/Downloader.

func (UpDownloaderOptionFn) ApplyToDownloader

func (f UpDownloaderOptionFn) ApplyToDownloader(d *Downloader)

ApplyToDownloader apply the option to the Downloader.

func (UpDownloaderOptionFn) ApplyToUploader

func (f UpDownloaderOptionFn) ApplyToUploader(u *Uploader)

ApplyToUploader apply the option to the Uploader.

type UploadContext

type UploadContext interface {
	// Result reutrns the upload status or error.
	Status() (UploadStatus, error)
	// Result reutrns the upload result or error.
	Result() (UploadOutput, error)

	Pauser
	DoneNotifier
}

UploadContext provides access to the upload progress and the result.

type UploadInput

type UploadInput struct {
	Bucket      *string
	Key         *string
	ACL         *string
	Body        io.Reader
	ContentType *string
}

UploadInput represents upload destination and data.

type UploadOutput

type UploadOutput struct {
	VersionID *string
	ETag      *string
	Location  *string
}

UploadOutput represents upload result.

type UploadSlicer

type UploadSlicer interface {
	Len() int64
	NextReader() (io.ReadSeeker, func(), error)
}

UploadSlicer splits input data stream into multiple io.ReadSeekers.

type UploadSlicerFactory

type UploadSlicerFactory interface {
	New(io.Reader) (UploadSlicer, error)
}

UploadSlicerFactory creates UploadSlicer for given io.Reader. UploadSlicer will be created for each Upload() call.

type UploadStatus

type UploadStatus struct {
	Status

	UploadID string
}

UploadStatus represents upload status.

type Uploader

type Uploader struct {
	UpDownloaderBase

	UploadSlicerFactory    UploadSlicerFactory
	ReadInterceptorFactory ReadInterceptorFactory
}

Uploader implements S3 uploader with configurable retry and bandwidth limit.

func (Uploader) Upload

func (u Uploader) Upload(ctx context.Context, input *UploadInput) (UploadContext, error)

Upload a file to S3.

type UploaderOption

type UploaderOption interface {
	ApplyToUploader(*Uploader)
}

UploaderOption sets optional parameter to the Uploader.

func WithReadInterceptor

func WithReadInterceptor(i ReadInterceptorFactory) UploaderOption

WithReadInterceptor sets ReadInterceptorFactory to Uploader.

func WithUploadSlicer

func WithUploadSlicer(s UploadSlicerFactory) UploaderOption

WithUploadSlicer sets UploadSlicerFactory to Uploader.

type UploaderOptionFn

type UploaderOptionFn func(*Uploader)

UploaderOptionFn is functional option for Uploader.

func (UploaderOptionFn) ApplyToUploader

func (f UploaderOptionFn) ApplyToUploader(u *Uploader)

ApplyToUploader apply the option to the Uploader.

type WaitReadInterceptorFactory

type WaitReadInterceptorFactory struct {
	// contains filtered or unexported fields
}

WaitReadInterceptorFactory creates WaitInterceptor.

func NewWaitReadInterceptorFactory

func NewWaitReadInterceptorFactory(waitPerByte time.Duration, opts ...WaitReadInterceptorOption) *WaitReadInterceptorFactory

NewWaitReadInterceptorFactory creates WaitReadInterceptorFactory with wait/byte value.

func (*WaitReadInterceptorFactory) New

New creates WaitReadInterceptor.

func (*WaitReadInterceptorFactory) SetMaxChunkSize

func (f *WaitReadInterceptorFactory) SetMaxChunkSize(s int)

SetMaxChunkSize sets maximum size of the uploading data chunk. It can be changed during upload.

func (*WaitReadInterceptorFactory) SetWaitPerByte

func (f *WaitReadInterceptorFactory) SetWaitPerByte(w time.Duration)

SetWaitPerByte sets wait/byte parameter. It can be changed during upload.

type WaitReadInterceptorOption

type WaitReadInterceptorOption func(*WaitReadInterceptorFactory)

WaitReadInterceptorOption configures WaitReadInterceptorFactory.

func WaitReadInterceptorMaxChunkSize

func WaitReadInterceptorMaxChunkSize(s int) WaitReadInterceptorOption

WaitReadInterceptorMaxChunkSize sets MaxChunkSize.

Directories

Path Synopsis
awss3v1 module
awss3v2 module
awssdkv2 module
Package contentrange handles HTTP Range header.
Package contentrange handles HTTP Range header.
examples module
internal
iotest
Package iotest provides io interfaces for testing.
Package iotest provides io interfaces for testing.
Package s3api provides abstracted S3 API interface to support multiple major version of aws-sdk-go.
Package s3api provides abstracted S3 API interface to support multiple major version of aws-sdk-go.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL