gcssink

package
v0.15.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 13, 2023 License: BSD-3-Clause Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Error codes and sentinel errors exposed by the gcssink package.
var (
	// CodeWrongTypeSinkMessage is returned when the sink message is not a gcssink.SinkMessage.
	CodeWrongTypeSinkMessage = errors.Code("WRONG_TYPE_SINK_MESSAGE")
	// CodeEmptyDataSinkMessage is returned when the sink message data is empty.
	CodeEmptyDataSinkMessage = errors.Code("EMPTY_DATA_MESSAGE")
	// CodeFailedToWriteAtBucket is returned when an error occurs while writing to GCS.
	CodeFailedToWriteAtBucket = errors.Code("FAILED_TO_WRITE_AT_BUCKET")
	// CodeFailedToCloseBucket is returned when an error occurs while closing the GCS bucket.
	CodeFailedToCloseBucket = errors.Code("FAILED_TO_CLOSE_BUCKET")
	// CodePanic is returned when a panic occurs.
	CodePanic = errors.Code("PANIC_TO_WRITE_AT_BUCKET")

	// ErrFailedToStoreMessages is returned when any error occurs while the GcsClientGateway is writing.
	ErrFailedToStoreMessages = errors.New("failed to store messages")
	// ErrInvalidSinkMessage is returned when the sink message is invalid.
	ErrInvalidSinkMessage = errors.New("invalid sink message")
	// ErrPanic is returned when a panic occurs.
	ErrPanic = errors.New("panic occurred while storing to gcs")
)

Functions

func MakeGcsRetrierOptions added in v0.14.4

func MakeGcsRetrierOptions(
	initialIntervalInSeconds int,
	maxIntervalInSeconds int,
	multiplier float64,
	policy storage.RetryPolicy,
) []storage.RetryOption

MakeGcsRetrierOptions returns a []storage.RetryOption that allows users to configure non-default retry behavior for API calls made to GCS.

func MustNewParallel

func MustNewParallel(
	clientGateway GcsClientGateway,
	contentType string,
	chunkSize int,
	retrierOption []storage.RetryOption,
) (pipeline.Sink, func() error)

MustNewParallel creates a new pipeline sink that writes messages to GCS. It panics if clientGateway, contentType or chunkSize is nil or invalid. The order of the messages is not guaranteed.

Types

type GcsClientGateway

// GcsClientGateway represents a gateway to the GCS client.
type GcsClientGateway interface {
	// GetWriter returns a writer to a GCS object identified by the given bucket and object,
	// using the given contentType, chunkSize and retrierOption.
	// If retrierOption is not empty, the writer will be configured with the given retry options.
	// NOTE(review): chunkSize is presumably in bytes, as in storage.Writer.ChunkSize — confirm
	// against the implementation.
	GetWriter(ctx context.Context, bucket, object, contentType string, chunkSize int, retrierOption ...storage.RetryOption) *storage.Writer
	// Write writes the given message to the given writer and reports any failure as an error.
	Write(writer *storage.Writer, message SinkMessage) error
	// Close closes the GCS client.
	Close() error
}

GcsClientGateway represents a gateway to the GCS client.

func NewGcsGateway

func NewGcsGateway(client *storage.Client) GcsClientGateway

NewGcsGateway creates a new GcsClientGateway.

type SinkMessage

// SinkMessage is the input for the GCS Sink.
type SinkMessage struct {
	// Data is the raw payload of the message.
	// NOTE(review): presumably must be non-empty (see CodeEmptyDataSinkMessage) — confirm.
	Data []byte
	// StoragePath is presumably the object name/path inside the bucket — TODO confirm.
	StoragePath string
	// Bucket is presumably the name of the destination GCS bucket — TODO confirm.
	Bucket string
	// Metadata holds string key/value pairs attached to the message.
	// NOTE(review): likely stored as object metadata — verify against the gateway implementation.
	Metadata map[string]string
}

SinkMessage is the input for the GCS Sink

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL