Documentation ¶
Index ¶
- Constants
- Variables
- type API
- type Actor
- type Client
- func (m *Client) CommitRecord(ctx context.Context, record *metamorphosisv1.Record) error
- func (m *Client) CurrentReservation() *Reservation
- func (m *Client) FetchRecord(ctx context.Context) (*metamorphosisv1.Record, error)
- func (m *Client) FetchRecords(ctx context.Context, max int32) ([]*metamorphosisv1.Record, error)
- func (c *Client) Init(ctx context.Context) error
- func (m *Client) IsReserved(reservations []Reservation, shard ktypes.Shard) bool
- func (m *Client) ListReservations(ctx context.Context) ([]Reservation, error)
- func (m *Client) PutRecords(ctx context.Context, req *PutRecordsRequest) error
- func (m *Client) ReleaseReservation(ctx context.Context) error
- func (m *Client) RenewReservation(ctx context.Context) error
- func (c *Client) ReserveShard(ctx context.Context) error
- type ClientContextKey
- type Config
- func (c *Config) Bootstrap(ctx context.Context) error
- func (c *Config) Copy() *Config
- func (c *Config) Validate() error
- func (c *Config) WithDynamoClient(client DynamoDBAPI) *Config
- func (c *Config) WithGroup(id string) *Config
- func (c *Config) WithKinesisClient(client KinesisAPI) *Config
- func (c *Config) WithLogger(l *slog.Logger) *Config
- func (c *Config) WithMaxActorCount(actors int) *Config
- func (c *Config) WithPrefix(id string) *Config
- func (c *Config) WithProcessor(p RecordProcessor) *Config
- func (c *Config) WithRenewTime(d time.Duration) *Config
- func (c *Config) WithReservationTimeout(d time.Duration) *Config
- func (c *Config) WithSeed(seed int) *Config
- func (c *Config) WithShardCacheDuration(d time.Duration) *Config
- func (c *Config) WithShardID(id string) *Config
- func (c *Config) WithStreamArn(arn string) *Config
- func (c *Config) WithTableName(table string) *Config
- func (c *Config) WithWorkerID(id string) *Config
- type DynamoDBAPI
- type KinesisAPI
- type Manager
- type PutRecordsRequest
- type RecordProcessor
- type Reservation
Constants ¶
View Source
const ( GroupIDKey = "groupID" ShardIDKey = "shardID" WorkerIDKey = "workerID" ExpiresAtKey = "expiresAt" LatestSequenceKey = "latestSequence" )
Variables ¶
View Source
var ( ErrNotFound = errors.New("reservation missing") ErrShardReserved = errors.New("shard is already reserved") ErrAllShardsReserved = errors.New("all shards are reserved") Now = time.Now )
View Source
var ( ErrMissingReservation = errors.New("missing reservation") ErrStreamError = errors.New("stream error") )
View Source
var (
ErrInvalidConfiguration = errors.New("invalid metamorphosis config")
)
Functions ¶
This section is empty.
Types ¶
type API ¶
type API interface { CommitRecord(ctx context.Context, record *metamorphosisv1.Record) error FetchRecord(ctx context.Context) (*metamorphosisv1.Record, error) FetchRecords(ctx context.Context, max int32) ([]*metamorphosisv1.Record, error) Init(ctx context.Context) error PutRecords(ctx context.Context, request *PutRecordsRequest) error CurrentReservation() *Reservation ListReservations(ctx context.Context) ([]Reservation, error) IsReserved(reservations []Reservation, shard ktypes.Shard) bool }
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) CommitRecord ¶
func (*Client) CurrentReservation ¶
func (m *Client) CurrentReservation() *Reservation
func (*Client) FetchRecord ¶
func (*Client) FetchRecords ¶
func (*Client) IsReserved ¶
func (m *Client) IsReserved(reservations []Reservation, shard ktypes.Shard) bool
func (*Client) ListReservations ¶
func (m *Client) ListReservations(ctx context.Context) ([]Reservation, error)
func (*Client) PutRecords ¶
func (m *Client) PutRecords(ctx context.Context, req *PutRecordsRequest) error
type ClientContextKey ¶
type ClientContextKey struct{}
type Config ¶
type Config struct { // required fields GroupID string WorkerID string StreamARN string ReservationTable string // optional fields // StartingSequence string ShardID string ReservationTimeout time.Duration RenewTime time.Duration MangerLoopWaitTime time.Duration SleepAfterProcessing time.Duration Seed int // contains filtered or unexported fields }
func (*Config) WithDynamoClient ¶
func (c *Config) WithDynamoClient(client DynamoDBAPI) *Config
func (*Config) WithKinesisClient ¶
func (c *Config) WithKinesisClient(client KinesisAPI) *Config
func (*Config) WithMaxActorCount ¶
func (*Config) WithPrefix ¶
func (*Config) WithProcessor ¶
func (c *Config) WithProcessor(p RecordProcessor) *Config
func (*Config) WithReservationTimeout ¶
func (*Config) WithShardCacheDuration ¶
func (*Config) WithShardID ¶
func (*Config) WithStreamArn ¶
func (*Config) WithTableName ¶
func (*Config) WithWorkerID ¶
type DynamoDBAPI ¶
type DynamoDBAPI interface { Scan(ctx context.Context, params *dynamodb.ScanInput, optFns ...func(*dynamodb.Options)) (*dynamodb.ScanOutput, error) DescribeTable(ctx context.Context, params *dynamodb.DescribeTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DescribeTableOutput, error) PutItem(ctx context.Context, params *dynamodb.PutItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error) GetItem(ctx context.Context, params *dynamodb.GetItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) UpdateItem(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) DeleteItem(ctx context.Context, params *dynamodb.DeleteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error) Query(ctx context.Context, params *dynamodb.QueryInput, optFns ...func(*dynamodb.Options)) (*dynamodb.QueryOutput, error) CreateTable(ctx context.Context, params *dynamodb.CreateTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.CreateTableOutput, error) DeleteTable(ctx context.Context, params *dynamodb.DeleteTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteTableOutput, error) }
type KinesisAPI ¶
type KinesisAPI interface { GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error) GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error) PutRecords(ctx context.Context, params *kinesis.PutRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.PutRecordsOutput, error) ListShards(ctx context.Context, params *kinesis.ListShardsInput, optFns ...func(*kinesis.Options)) (*kinesis.ListShardsOutput, error) CreateStream(ctx context.Context, params *kinesis.CreateStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.CreateStreamOutput, error) DeleteStream(ctx context.Context, params *kinesis.DeleteStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DeleteStreamOutput, error) DescribeStream(ctx context.Context, params *kinesis.DescribeStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) }
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
func (*Manager) CheckForAvailableShards ¶
type PutRecordsRequest ¶
type PutRecordsRequest struct { Records []*metamorphosisv1.Record StreamName *string StreamArn *string }
type RecordProcessor ¶
type RecordProcessor = func(context.Context, *metamorphosisv1.Record) error
type Reservation ¶
type Reservation struct { // primary key GroupID string `dynamodbav:"groupID"` // secondary key ShardID string `dynamodbav:"shardID"` WorkerID string `dynamodbav:"workerID"` ExpiresAt int64 `dynamodbav:"expiresAt"` LatestSequence string `dynamodbav:"latestSequence"` }
func (*Reservation) Expires ¶
func (r *Reservation) Expires() time.Time
Click to show internal directories.
Click to hide internal directories.