metamorphosis

package module
v0.1.0-beta.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	GroupIDKey        = "groupID"
	ShardIDKey        = "shardID"
	WorkerIDKey       = "workerID"
	ExpiresAtKey      = "expiresAt"
	LatestSequenceKey = "latestSequence"
)

Variables

View Source
var (
	ErrNotFound          = errors.New("reservation missing")
	ErrShardReserved     = errors.New("shard is already reserved")
	ErrAllShardsReserved = errors.New("all shards are reserved")
	Now                  = time.Now
)
View Source
var (
	ErrMissingReservation = errors.New("missing reservation")
	ErrStreamError        = errors.New("stream error")
)
View Source
var (
	ErrInvalidConfiguration = errors.New("invalid metamorphosis config")
)

Functions

This section is empty.

Types

type API

type API interface {
	CommitRecord(ctx context.Context, record *metamorphosisv1.Record) error
	FetchRecord(ctx context.Context) (*metamorphosisv1.Record, error)
	FetchRecords(ctx context.Context, max int32) ([]*metamorphosisv1.Record, error)
	Init(ctx context.Context) error
	PutRecords(ctx context.Context, request *PutRecordsRequest) error
	CurrentReservation() *Reservation
	ListReservations(ctx context.Context) ([]Reservation, error)
	IsReserved(reservations []Reservation, shard ktypes.Shard) bool
}

type Actor

type Actor struct {
	SleepAfterProcessing time.Duration
	// contains filtered or unexported fields
}

func (*Actor) Work

func (a *Actor) Work(ctx context.Context) error

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(config *Config) *Client

func (*Client) CommitRecord

func (m *Client) CommitRecord(ctx context.Context, record *metamorphosisv1.Record) error

func (*Client) CurrentReservation

func (m *Client) CurrentReservation() *Reservation

func (*Client) FetchRecord

func (m *Client) FetchRecord(ctx context.Context) (*metamorphosisv1.Record, error)

func (*Client) FetchRecords

func (m *Client) FetchRecords(ctx context.Context, max int32) ([]*metamorphosisv1.Record, error)

func (*Client) Init

func (c *Client) Init(ctx context.Context) error

func (*Client) IsReserved

func (m *Client) IsReserved(reservations []Reservation, shard ktypes.Shard) bool

func (*Client) ListReservations

func (m *Client) ListReservations(ctx context.Context) ([]Reservation, error)

func (*Client) PutRecords

func (m *Client) PutRecords(ctx context.Context, req *PutRecordsRequest) error

func (*Client) ReleaseReservation

func (m *Client) ReleaseReservation(ctx context.Context) error

func (*Client) RenewReservation

func (m *Client) RenewReservation(ctx context.Context) error

func (*Client) ReserveShard

func (c *Client) ReserveShard(ctx context.Context) error

type ClientContextKey

type ClientContextKey struct{}

type Config

type Config struct {
	// required fields
	GroupID          string
	WorkerID         string
	StreamARN        string
	ReservationTable string

	// optional fields
	// StartingSequence   string
	ShardID            string
	ReservationTimeout time.Duration
	RenewTime          time.Duration
	MangerLoopWaitTime time.Duration

	SleepAfterProcessing time.Duration
	Seed                 int
	// contains filtered or unexported fields
}

func NewConfig

func NewConfig() *Config

func (*Config) Bootstrap

func (c *Config) Bootstrap(ctx context.Context) error

func (*Config) Copy

func (c *Config) Copy() *Config

func (*Config) Validate

func (c *Config) Validate() error

func (*Config) WithDynamoClient

func (c *Config) WithDynamoClient(client DynamoDBAPI) *Config

func (*Config) WithGroup

func (c *Config) WithGroup(id string) *Config

func (*Config) WithKinesisClient

func (c *Config) WithKinesisClient(client KinesisAPI) *Config

func (*Config) WithLogger

func (c *Config) WithLogger(l *slog.Logger) *Config

func (*Config) WithMaxActorCount

func (c *Config) WithMaxActorCount(actors int) *Config

func (*Config) WithPrefix

func (c *Config) WithPrefix(id string) *Config

func (*Config) WithProcessor

func (c *Config) WithProcessor(p RecordProcessor) *Config

func (*Config) WithRenewTime

func (c *Config) WithRenewTime(d time.Duration) *Config

func (*Config) WithReservationTimeout

func (c *Config) WithReservationTimeout(d time.Duration) *Config

func (*Config) WithSeed

func (c *Config) WithSeed(seed int) *Config

func (*Config) WithShardCacheDuration

func (c *Config) WithShardCacheDuration(d time.Duration) *Config

func (*Config) WithShardID

func (c *Config) WithShardID(id string) *Config

func (*Config) WithStreamArn

func (c *Config) WithStreamArn(arn string) *Config

func (*Config) WithTableName

func (c *Config) WithTableName(table string) *Config

func (*Config) WithWorkerID

func (c *Config) WithWorkerID(id string) *Config

type DynamoDBAPI

type DynamoDBAPI interface {
	Scan(ctx context.Context, params *dynamodb.ScanInput, optFns ...func(*dynamodb.Options)) (*dynamodb.ScanOutput, error)
	DescribeTable(ctx context.Context, params *dynamodb.DescribeTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DescribeTableOutput, error)
	PutItem(ctx context.Context, params *dynamodb.PutItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error)
	GetItem(ctx context.Context, params *dynamodb.GetItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error)
	UpdateItem(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error)
	DeleteItem(ctx context.Context, params *dynamodb.DeleteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error)
	Query(ctx context.Context, params *dynamodb.QueryInput, optFns ...func(*dynamodb.Options)) (*dynamodb.QueryOutput, error)
	CreateTable(ctx context.Context, params *dynamodb.CreateTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.CreateTableOutput, error)
	DeleteTable(ctx context.Context, params *dynamodb.DeleteTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteTableOutput, error)
}

type KinesisAPI

type KinesisAPI interface {
	GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error)
	GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error)
	PutRecords(ctx context.Context, params *kinesis.PutRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.PutRecordsOutput, error)
	ListShards(ctx context.Context, params *kinesis.ListShardsInput, optFns ...func(*kinesis.Options)) (*kinesis.ListShardsOutput, error)
	CreateStream(ctx context.Context, params *kinesis.CreateStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.CreateStreamOutput, error)
	DeleteStream(ctx context.Context, params *kinesis.DeleteStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DeleteStreamOutput, error)
	DescribeStream(ctx context.Context, params *kinesis.DescribeStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error)
}

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, config *Config) *Manager

func (*Manager) CheckForAvailableShards

func (m *Manager) CheckForAvailableShards(ctx context.Context) ([]types.Shard, error)

func (*Manager) Loop

func (m *Manager) Loop(ctx context.Context) error

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

type PutRecordsRequest

type PutRecordsRequest struct {
	Records    []*metamorphosisv1.Record
	StreamName *string
	StreamArn  *string
}

type RecordProcessor

type RecordProcessor = func(context.Context, *metamorphosisv1.Record) error

type Reservation

type Reservation struct {
	// primary key
	GroupID string `dynamodbav:"groupID"`
	// secondary key
	ShardID string `dynamodbav:"shardID"`

	WorkerID       string `dynamodbav:"workerID"`
	ExpiresAt      int64  `dynamodbav:"expiresAt"`
	LatestSequence string `dynamodbav:"latestSequence"`
}

func (*Reservation) Expires

func (r *Reservation) Expires() time.Time

Directories

Path Synopsis
gen

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL