lambdastore

package
v1.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2023 License: MIT Imports: 35 Imported by: 1

Documentation

Index

Constants

View Source
// Connection states. The values are consecutive uint32s starting at 0:
// ConnectionOpen = 0, ConnectionClosing = 1, ConnectionClosed = 2.
// NOTE(review): presumably the lifecycle state of a Connection - confirm against usage.
const (
	ConnectionOpen uint32 = iota
	ConnectionClosing
	ConnectionClosed
)
View Source
// Instance status constants, limits, descriptions, dispatch flags and
// request-counter layout.
const (
	// Bit masks, each selecting one 4-bit field of a packed status word.
	// START, CONNECTION, BACKING and LIFECYCLE occupy bits 0-15 in that order;
	// FAILURE occupies bits 28-31. Bits 16-27 are not covered by any mask here.
	// NOTE(review): presumably these apply to the value returned by
	// Instance.Status() - confirm.
	INSTANCE_MASK_STATUS_START      = 0x0000000F
	INSTANCE_MASK_STATUS_CONNECTION = 0x000000F0
	INSTANCE_MASK_STATUS_BACKING    = 0x00000F00
	INSTANCE_MASK_STATUS_LIFECYCLE  = 0x0000F000
	INSTANCE_MASK_STATUS_FAILURE    = 0xF0000000

	// Start status
	// INSTANCE_SHADOW (15) is the largest value that fits in a 4-bit field.
	INSTANCE_UNSTARTED = 0
	INSTANCE_RUNNING   = 1
	INSTANCE_CLOSED    = 2
	INSTANCE_SHADOW    = 15

	// Connection status
	// State transitions:
	// Activate: Sleeping -> Activating -> Active
	// Retry:    Activating/Active -> Activate (validating)
	// Abandon:  Activating/Active -> Sleeping (validating, warmup)
	// Sleep:    Active -> Sleeping
	// Switch:   Active -> Maybe (Unmanaged)
	// Sleep:    Maybe -> Sleeping
	INSTANCE_SLEEPING   = 0
	INSTANCE_ACTIVATING = 1
	INSTANCE_ACTIVE     = 2
	INSTANCE_MAYBE      = 3

	// Backing status
	// No named constant is declared for the value 0.
	INSTANCE_RECOVERING = 1
	INSTANCE_BACKING    = 2

	// Lifecycle status
	PHASE_ACTIVE       = 0 // Instance is actively serving main repository and backup
	PHASE_BACKING_ONLY = 1 // Instance is expiring and serving backup only, warmup should be degraded.
	PHASE_RECLAIMED    = 2 // Instance has been reclaimed.
	PHASE_EXPIRED      = 3 // Instance is expired, no invocation will be made, and it is safe to recycle.

	// Abnormal status
	FAILURE_MAX_QUEUE_REACHED = 1

	// Concurrency settings, temporary map size and backing options.
	// NOTE(review): the exact meaning of each value is not visible from this
	// declaration - confirm against the code that reads them.
	MAX_CONCURRENCY  = 2
	IN_CONCURRENCY   = 1
	OUT_CONCURRENCY  = 2
	TEMP_MAP_SIZE    = 10
	BACKING_DISABLED = 0
	BACKING_RESERVED = 1
	BACKING_ENABLED  = 2
	BACKING_FORBID   = 3

	// Human-readable status descriptions. Note that DESCRIPTION_MAYBE reads
	// "unmanaged", matching the "Maybe (Unmanaged)" transition listed above.
	DESCRIPTION_UNSTARTED  = "unstarted"
	DESCRIPTION_CLOSED     = "closed"
	DESCRIPTION_SLEEPING   = "sleeping"
	DESCRIPTION_ACTIVATING = "activating"
	DESCRIPTION_ACTIVE     = "active"
	DESCRIPTION_MAYBE      = "unmanaged"
	DESCRIPTION_UNDEFINED  = "undefined"

	// Dispatch option flags. Each occupies a distinct bit, so they can be
	// combined with bitwise OR.
	// NOTE(review): presumably the opts argument of Instance.DispatchWithOptions - confirm.
	DISPATCH_OPT_BUSY_CHECK = 0x0001
	DISPATCH_OPT_RELOCATED  = 0x0002

	// Layout of a 64-bit word split into two 32-bit halves:
	// NUM_REQUEST_MASK selects the low 32 bits and NUM_WRITE_REQUEST_MASK the
	// high 32 bits. NUM_REQUEST_UNIT is 1 and NUM_WRITE_REQUEST_UNIT is
	// 1 << NUM_WRITE_REQUEST_BITS, i.e. one unit in each half.
	// NOTE(review): presumably a packed counter of all requests (low half) and
	// write requests (high half) - confirm.
	NUM_REQUEST_UNIT       = 0x0000000000000001
	NUM_WRITE_REQUEST_UNIT = 0x0000000100000000
	NUM_REQUEST_MASK       = 0x00000000FFFFFFFF
	NUM_WRITE_REQUEST_MASK = 0xFFFFFFFF00000000
	NUM_WRITE_REQUEST_BITS = 32
)
View Source
// Link management settings.
const (
	LinkBucketSize       = 10
	UnlimitedActiveLinks = 0
	// Max spare links.
	// Set to 2 to reserve 1 for ready and 1 for buffering.
	// Set to MAX_CONCURRENCY + 1 to avoid links being frequently closed and re-created.
	// Can be overridden dynamically by instance.SetMaxActiveDataLinks().
	ActiveLinks = 2
)

Variables

View Source
// Sentinel errors created with errors.New.
// NOTE(review): judging by the names, these relate to Connection handling -
// confirm which functions return them.
var (
	ErrConnectionClosed      = errors.New("connection closed")
	ErrMissingResponse       = errors.New("missing response")
	ErrUnexpectedCommand     = errors.New("unexpected command")
	ErrUnexpectedType        = errors.New("unexpected type")
	ErrMissingRequest        = errors.New("missing request")
	ErrUnexpectedSendRequest = errors.New("unexpected SendRequest call")
)
View Source
// Package-level settings and instance-related errors. All of these are
// variables, not constants, so they can be reassigned at runtime.
var (
	// CM has no initializer here, so it starts as a nil ClusterManager.
	CM             ClusterManager
	WarmTimeout    = config.InstanceWarmTimeout
	TriggerTimeout = 1 * time.Second // Triggering costs about 20ms; set large enough to avoid exceeding the timeout.
	// TODO: Make RTT dynamic, global or per instance.
	RTT                   = 20 * time.Millisecond
	DefaultConnectTimeout = 1000 * time.Millisecond // Decided by RTT. NOTE(review): the value is a fixed 1000ms, not derived from RTT - confirm intent.
	MaxConnectTimeout     = 1 * time.Second
	PromisedGoodDue       = 1 * time.Second // Keep consistent with lambda/lifetime/timeout.TICK_ERROR_EXTEND
	MinValidationInterval = RTT             // MinValidationInterval is the minimum interval between validations.
	MaxValidationFailure  = 3
	BackoffFactor         = 2
	MaxControlRequestSize = int64(200000) // 200KB, which can be transmitted in 20ms.
	DefaultPingPayload    = []byte{}
	// AwsSession is built once, when package-level variables are initialized.
	// NOTE(review): if awsSession is the aws-sdk-go session package, Must panics
	// when session creation fails, which would happen during package
	// initialization - confirm.
	AwsSession            = awsSession.Must(awsSession.NewSessionWithOptions(awsSession.Options{
		SharedConfigState: awsSession.SharedConfigEnable,
	}))

	// Errors
	// Most are plain errors.New values. ErrValidationTimeout and ErrQueueTimeout
	// are *LambdaError values whose typ is LambdaErrorTimeout.
	ErrInstanceClosed     = errors.New("instance closed")
	ErrInstanceReclaimed  = errors.New("instance reclaimed")
	ErrInstanceSleeping   = errors.New("instance is sleeping")
	ErrInstanceRecovering = errors.New("instance is recovering")
	ErrReservationFailed  = errors.New("reservation failed")
	ErrDuplicatedSession  = errors.New("session has started")
	ErrNotCtrlLink        = errors.New("not control link")
	ErrInstanceValidated  = errors.New("instance has been validated by another connection")
	ErrInstanceBusy       = errors.New("instance busy")
	ErrWarmupReturn       = errors.New("return from warmup")
	ErrUnknown            = errors.New("unknown error")
	// NOTE(review): the message below is misspelled ("funciton"). It is left
	// unchanged here because callers or log matchers may depend on the exact text.
	ErrValidationTimeout  = &LambdaError{error: errors.New("funciton validation timeout"), typ: LambdaErrorTimeout}
	ErrCapacityExceeded   = errors.New("capacity exceeded")
	ErrQueueTimeout       = &LambdaError{error: errors.New("queue timeout"), typ: LambdaErrorTimeout}
	ErrRelocationFailed   = errors.New("relocation failed")
)
View Source
// Link-related errors and a unit-test switch.
// All four errors are *LambdaError values. Only ErrLinkRequestTimeout sets typ
// (to LambdaErrorTimeout); the others leave typ at its zero value.
// NOTE(review): if typ is a LambdaErrorType, that zero value is
// LambdaErrorUncategoried - confirm.
var (
	ErrLinkRequestClosed  = &LambdaError{error: errors.New("link request closed")}
	ErrLinkRequestTimeout = &LambdaError{error: errors.New("link request timeout"), typ: LambdaErrorTimeout}
	ErrLinkManagerReset   = &LambdaError{error: errors.New("link manager reset")}
	ErrNilLink            = &LambdaError{error: errors.New("unexpected nil link")}

	// Keep the following variables as false. They are only for unit tests.
	UnitTestMTC1 = false
)

Functions

func IsLambdaTimeout

func IsLambdaTimeout(err error) bool

Types

type AvailableLink struct {
	// contains filtered or unexported fields
}

func (*AvailableLink) Close

func (l *AvailableLink) Close()

func (*AvailableLink) Closed

func (l *AvailableLink) Closed() <-chan struct{}

func (*AvailableLink) Error

func (l *AvailableLink) Error() error

func (*AvailableLink) Request

func (l *AvailableLink) Request() chan<- *types.Request

func (*AvailableLink) SetTimeout

func (l *AvailableLink) SetTimeout(d time.Duration)
type AvailableLinks struct {
	// contains filtered or unexported fields
}

func (*AvailableLinks) AddAvailable

func (l *AvailableLinks) AddAvailable(link manageableLink, nolimit bool) bool

func (*AvailableLinks) GetRequestPipe

func (l *AvailableLinks) GetRequestPipe() *AvailableLink

func (*AvailableLinks) Len

func (l *AvailableLinks) Len() int

func (*AvailableLinks) OffsetLimit

func (l *AvailableLinks) OffsetLimit(offset int, max int) int

func (*AvailableLinks) Reset

func (l *AvailableLinks) Reset()

func (*AvailableLinks) SetLimit

func (l *AvailableLinks) SetLimit(limit int) int

type Backer

// Backer is the set of operations for reserving, starting and stopping backing.
// *Instance declares methods with these three signatures.
type Backer interface {
	// ReserveBacking returns an error when the reservation does not succeed.
	ReserveBacking() error
	// StartBacking takes the target instance and two ints and reports success.
	// Instance.StartBacking names these parameters bakIns, bakId and total.
	StartBacking(*Instance, int, int) bool
	// StopBacking takes the target instance and returns nothing.
	StopBacking(*Instance)
}

type BackerGetter

type BackerGetter interface {
	// contains filtered or unexported methods
}

type BackupIterator

type BackupIterator struct {
	// contains filtered or unexported fields
}

func (*BackupIterator) Len

func (iter *BackupIterator) Len() int

func (*BackupIterator) Next

func (iter *BackupIterator) Next() bool

func (*BackupIterator) Value

func (iter *BackupIterator) Value() (int, interface{})

type Backups

type Backups struct {
	// contains filtered or unexported fields
}

Backups for an instance. Unless otherwise specified, operations are not thread safe.

func NewBackups

func NewBackups(ins *Instance, backups []Backer) *Backups

func NewBackupsFromInstances

func NewBackupsFromInstances(ins *Instance, backups []*Instance, adapter BackerGetter) *Backups

func (*Backups) Availables

func (b *Backups) Availables() int

func (*Backups) GetByHash

func (b *Backups) GetByHash(hash uint64) (*Instance, bool)

func (*Backups) GetByKey

func (b *Backups) GetByKey(key string) (*Instance, bool)

This function is thread safe

func (*Backups) GetByLocation

func (b *Backups) GetByLocation(loc int, required int) (*Instance, bool)

func (*Backups) Invalidate

func (b *Backups) Invalidate()

func (*Backups) Iter

func (b *Backups) Iter() *BackupIterator

func (*Backups) Len

func (b *Backups) Len() int

func (*Backups) Locator

func (b *Backups) Locator() *protocol.BackupLocator

func (*Backups) Reserve

func (b *Backups) Reserve(fallback mapreduce.Iterator) int

func (*Backups) ResetCandidates

func (b *Backups) ResetCandidates(required int, candidates []*Instance)

func (*Backups) Start

func (b *Backups) Start(target *Instance) int

Helper functions

func (*Backups) StartByIndex

func (b *Backups) StartByIndex(i int, target *Instance) (*Instance, bool)

func (*Backups) Stop

func (b *Backups) Stop(target *Instance)

type CandidateProvider

// CandidateProvider is a function that takes no arguments and returns an *Instance.
// Not to be confused with the CandidatesProvider interface.
type CandidateProvider func() *Instance

type CandidateQueue

type CandidateQueue struct {
	// contains filtered or unexported fields
}

func NewCandidateQueue

func NewCandidateQueue(bufsize int, provider CandidatesProvider) *CandidateQueue

func (*CandidateQueue) Candidates

func (q *CandidateQueue) Candidates() <-chan *Instance

func (*CandidateQueue) Close

func (q *CandidateQueue) Close()

func (*CandidateQueue) Start

func (q *CandidateQueue) Start() bool

Start the autoload service

type CandidatesProvider

// CandidatesProvider is the provider type accepted by NewCandidateQueue.
type CandidatesProvider interface {
	// LoadCandidates receives the queue and a slice of instances and returns an int.
	// NOTE(review): presumably fills the slice and returns the number of
	// candidates loaded - confirm against an implementation.
	LoadCandidates(*CandidateQueue, []*Instance) int
}

type ClusterManager

// ClusterManager combines the InstanceManager and Relocator interfaces by
// embedding both. It is the type of the package-level variable CM.
type ClusterManager interface {
	InstanceManager
	Relocator
}

type Connection

type Connection struct {
	Id uint32
	net.Conn
	// contains filtered or unexported fields
}

TODO: use bsm/pool

func NewConnection

func NewConnection(cn net.Conn) *Connection

func (*Connection) BindInstance

func (conn *Connection) BindInstance(ins *Instance) *Connection

func (*Connection) ClearResponses

func (conn *Connection) ClearResponses()

func (*Connection) Close

func (conn *Connection) Close() error

func (*Connection) CloseAndWait

func (conn *Connection) CloseAndWait() error

func (*Connection) CloseWithReason

func (conn *Connection) CloseWithReason(reason string, block bool) error

Close: signal that the connection should be closed. Function close() will be called later for the actual operation.

func (*Connection) IsClosed

func (conn *Connection) IsClosed() bool

func (*Connection) IsSameWorker

func (conn *Connection) IsSameWorker(another *Connection) bool

func (*Connection) SendControl

func (conn *Connection) SendControl(ctrl *types.Control) error

func (*Connection) SendPing

func (conn *Connection) SendPing(payload []byte) error

SendPing sends a ping with piggybacked info.

func (*Connection) SendRequest

func (conn *Connection) SendRequest(req *types.Request, args ...interface{}) error

func (*Connection) ServeLambda

func (conn *Connection) ServeLambda()

Blocks on peeking the response type from the lambda, then handles the incoming lambda store response.

Fields: field 0: conn id; field 1: req id; field 2: chunk id; field 3: obj val.

func (*Connection) String

func (conn *Connection) String() string

func (*Connection) Writer

func (conn *Connection) Writer() *resp.RequestWriter

type DefaultInstanceEnumerator

// DefaultInstanceEnumerator embeds a mapreduce.Enumerator and adds an
// Instance(i int) *Instance method. It is created by NewInstanceEnumerator.
type DefaultInstanceEnumerator struct {
	mapreduce.Enumerator
}

func NewInstanceEnumerator

func NewInstanceEnumerator(instances []*Instance) *DefaultInstanceEnumerator

func (*DefaultInstanceEnumerator) Instance

func (enum *DefaultInstanceEnumerator) Instance(i int) *Instance

type Delegate

// Delegate embeds *Instance, so it inherits the Instance methods, and declares
// its own StartBacking method that takes the place of Instance.StartBacking.
type Delegate struct {
	*Instance
}

Delegate offers a new backup implementation for delegation.

func (*Delegate) StartBacking

func (ins *Delegate) StartBacking(deleIns *Instance, bakId int, total int) bool

Start serving as the delegate for specified instance. Return true always.

type DelegateBackerAdapter

// DelegateBackerAdapter is an empty struct; it carries no state.
// NOTE(review): presumably implements BackerGetter for delegates - confirm.
type DelegateBackerAdapter struct {
}

type Deployment

type Deployment struct {
	Block int
	// contains filtered or unexported fields
}

func NewDeployment

func NewDeployment(name string, id uint64) *Deployment

func (*Deployment) Id

func (d *Deployment) Id() uint64

func (*Deployment) Name

func (d *Deployment) Name() string

type Instance

type Instance struct {
	*Deployment
	Meta
	// contains filtered or unexported fields
}

func NewInstance

func NewInstance(name string, id uint64) *Instance

NewInstance creates a new lambda instance.

func NewInstanceFromDeployment

func NewInstanceFromDeployment(dp *Deployment, id uint64) *Instance

func (*Instance) AbandonLambda

func (ins *Instance) AbandonLambda()

func (*Instance) AssignBackups

func (ins *Instance) AssignBackups(numBak int, candidates []*Instance)

func (*Instance) Close

func (ins *Instance) Close()

func (*Instance) CollectData

func (ins *Instance) CollectData()

func (*Instance) Degrade

func (ins *Instance) Degrade()

func (*Instance) Description

func (ins *Instance) Description() string

func (*Instance) Dispatch

func (ins *Instance) Dispatch(cmd types.Command) error

func (*Instance) DispatchWithOptions

func (ins *Instance) DispatchWithOptions(cmd types.Command, opts int) error

func (*Instance) Expire

func (ins *Instance) Expire()

func (*Instance) FlagDataCollected

func (ins *Instance) FlagDataCollected(ok string)

func (*Instance) ForbidBacking

func (ins *Instance) ForbidBacking() bool

func (*Instance) GetShadowInstance

func (ins *Instance) GetShadowInstance() *Instance

func (*Instance) HandleRequests

func (ins *Instance) HandleRequests()

Handle incoming client requests. This is the lambda-facing goroutine.

func (*Instance) IsActive

func (ins *Instance) IsActive() bool

func (*Instance) IsBacking

func (ins *Instance) IsBacking(includingPrepare bool) bool

func (*Instance) IsBusy

func (ins *Instance) IsBusy(cmd types.Command) (uint64, bool)

func (*Instance) IsClosed

func (ins *Instance) IsClosed() bool

func (*Instance) IsReclaimed

func (ins *Instance) IsReclaimed() bool

func (*Instance) IsRecovering

func (ins *Instance) IsRecovering() bool

func (*Instance) Migrate

func (ins *Instance) Migrate() error

TODO: Add sid support, proxy now need sid to connect.

func (*Instance) Occupancy

func (ins *Instance) Occupancy(mode types.InstanceOccupancyMode) float64

func (*Instance) Phase

func (ins *Instance) Phase() uint32

func (*Instance) ReserveBacking

func (ins *Instance) ReserveBacking() error

Check if the instance is available for serving as a backup for the specified instance. Returns an error if the instance is not available, e.g. when it is backing another instance.

func (*Instance) ResetDue

func (ins *Instance) ResetDue(delay bool, reason string)

func (*Instance) ResumeServing

func (ins *Instance) ResumeServing()

Resume serving

func (*Instance) SetDue

func (ins *Instance) SetDue(due int64, delay bool, reason string, args ...interface{})

func (*Instance) StartBacking

func (ins *Instance) StartBacking(bakIns *Instance, bakId int, total int) bool

Start serving as the backup for specified instance. Return false if the instance is backing another instance.

func (*Instance) StartDelegation

func (ins *Instance) StartDelegation() int

StartDelegation delegates the instance to parallel buffer instances. Return # of delegates

func (*Instance) StartRecovery

func (ins *Instance) StartRecovery() int

StartRecovery starts parallel recovery mode. Return # of ready backups

func (*Instance) Status

func (ins *Instance) Status() uint64

InstantStats implementation

func (*Instance) StatusDescription

func (ins *Instance) StatusDescription() string

func (*Instance) StopBacking

func (ins *Instance) StopBacking(bakIns *Instance)

Stop serving as a backup

func (*Instance) String

func (ins *Instance) String() string

func (*Instance) TryFlagValidated

func (ins *Instance) TryFlagValidated(conn *Connection, sid string, flags int64) (*Connection, time.Duration, error)

Flag the instance as validated by the specified connection. This also validates that the connection belongs to the instance by setting the instance field of the connection.

func (*Instance) Validate

func (ins *Instance) Validate(opts ...*ValidateOption) (*Connection, time.Duration, error)

func (*Instance) WarmUp

func (ins *Instance) WarmUp()

type InstanceEnumerator

// InstanceEnumerator extends mapreduce.Enumerator with typed access to instances.
type InstanceEnumerator interface {
	mapreduce.Enumerator
	// Instance returns the *Instance for the int argument i.
	// NOTE(review): presumably i is an index into the enumerated set - confirm.
	Instance(i int) *Instance
}

type InstanceManager

// InstanceManager is the instance-management half of ClusterManager.
// NOTE(review): the uint64 arguments below are presumably instance ids - confirm.
type InstanceManager interface {
	// Instance returns the *Instance for the given uint64.
	Instance(uint64) *Instance
	// Recycle takes a types.LambdaDeployment and returns an error on failure.
	Recycle(types.LambdaDeployment) error
	// GetBackupCandidates returns an iterator.
	// NOTE(review): presumably over candidate backup instances - confirm.
	GetBackupCandidates() mapreduce.Iterator
	// GetDelegates returns a slice of instances.
	GetDelegates() []*Instance
	// GetServePort returns an int for the given uint64.
	GetServePort(uint64) int
	// GetPersistCache returns a types.PersistCache.
	GetPersistCache() types.PersistCache
}

type LambdaError

type LambdaError struct {
	// contains filtered or unexported fields
}

func (*LambdaError) IsTimeout

func (e *LambdaError) IsTimeout() bool

type LambdaErrorType

// LambdaErrorType is an int-based category used as the typ value in
// LambdaError literals such as ErrQueueTimeout.
type LambdaErrorType int
// Known categories: LambdaErrorUncategoried is 0 (the zero value of
// LambdaErrorType) and LambdaErrorTimeout is 1.
const (
	LambdaErrorUncategoried LambdaErrorType = iota
	LambdaErrorTimeout
)

type LinkBucket

type LinkBucket struct {
	// contains filtered or unexported fields
}

func (*LinkBucket) Reset

func (b *LinkBucket) Reset()

type LinkManager

type LinkManager struct {
	// contains filtered or unexported fields
}

func NewLinkManager

func NewLinkManager(ins *Instance) *LinkManager
func (m *LinkManager) AddDataLink(link *Connection) bool

func (*LinkManager) Close

func (m *LinkManager) Close()
func (m *LinkManager) DataLinks() *AvailableLinks

func (*LinkManager) FlagAvailableForRequest

func (m *LinkManager) FlagAvailableForRequest(link *Connection) bool

func (*LinkManager) GetAvailableForRequest

func (m *LinkManager) GetAvailableForRequest() *AvailableLink

func (*LinkManager) GetControl

func (m *LinkManager) GetControl() *Connection

func (*LinkManager) GetLastControl

func (m *LinkManager) GetLastControl() *Connection

func (*LinkManager) InvalidateControl

func (m *LinkManager) InvalidateControl(link manageableLink)
func (m *LinkManager) RemoveDataLink(link *Connection)

func (*LinkManager) Reset

func (m *LinkManager) Reset()

func (*LinkManager) SetControl

func (m *LinkManager) SetControl(link *Connection)
func (m *LinkManager) SetMaxActiveDataLinks(num int)

type Meta

type Meta struct {
	// Sequence of the last confirmed log. Logs store by sequence.
	Term uint64

	// Total transmission size for restoring all confirmed logs.
	Updates uint64

	// Rank for lambda to decide if a fast recovery is required.
	DiffRank float64

	// Hash of the last confirmed log.
	Hash string

	// Sequence of snapshot.
	SnapshotTerm uint64

	// Total transmission size for restoring all confirmed logs from start to SnapShotSeq.
	SnapshotUpdates uint64

	// Total size of snapshot for transmission.
	SnapshotSize uint64

	// Flag shows that if meta is out of sync with the corresponding lambda.
	Stale bool

	// Capacity of the instance.
	Capacity uint64
	// contains filtered or unexported fields
}

Recovery cost estimates:

	FULL        = (Updates - SnapshotUpdates + SnapshotSize) / Bandwidth + (Term - SnapshotTerm + 1) * RTT
	INCREMENTAL = (Updates - LastUpdates) / Bandwidth + (Seq - LastSeq) * RTT

Condition compared: FULL < INCREMENTAL

func (*Meta) AddChunk

func (m *Meta) AddChunk(key string, sz int64) (num int, size uint64)

func (*Meta) DecreaseSize

func (m *Meta) DecreaseSize(dec int64) uint64

func (*Meta) EffectiveCapacity

func (m *Meta) EffectiveCapacity() uint64

func (*Meta) FromProtocolMeta

func (m *Meta) FromProtocolMeta(meta *protocol.Meta) (bool, error)

func (*Meta) IncreaseSize

func (m *Meta) IncreaseSize(inc int64) uint64

func (*Meta) ModifiedOccupancy

func (m *Meta) ModifiedOccupancy(adjustment uint64) float64

func (*Meta) NumChunks

func (m *Meta) NumChunks() int

func (*Meta) Reconcile

func (m *Meta) Reconcile(meta *protocol.ShortMeta)

func (*Meta) RemoveChunk

func (m *Meta) RemoveChunk(key string, sz int64) (num int, size uint64)

func (*Meta) ReservedCapacity

func (m *Meta) ReservedCapacity() uint64

func (*Meta) ResetCapacity

func (m *Meta) ResetCapacity(capacity uint64, effective uint64)

func (*Meta) Size

func (m *Meta) Size() uint64

func (*Meta) ToBackupPayload

func (m *Meta) ToBackupPayload(id uint64, key int, total int, maxChunkSize uint64) ([]byte, error)

func (*Meta) ToDelegatePayload

func (m *Meta) ToDelegatePayload(id uint64, key int, total int, maxChunkSize uint64) (*protocol.Meta, []byte, error)

func (*Meta) ToPayload

func (m *Meta) ToPayload(id uint64) ([]byte, error)

func (*Meta) ToProtocolMeta

func (m *Meta) ToProtocolMeta(id uint64) *protocol.Meta

type Relocator

// Relocator is the chunk-relocation half of ClusterManager.
type Relocator interface {
	// Relocate relocates the chunk specified by the meta (interface{}) and chunkId (int).
	// It returns the instance the chunk is relocated to.
	Relocate(interface{}, int, types.Command) (*Instance, error)

	// TryRelocate tests and relocates the chunk specified by the meta (interface{}) and chunkId (int).
	// It returns the instance, whether relocation was triggered (bool), and an error if it was triggered.
	TryRelocate(interface{}, int, types.Command) (*Instance, bool, error)
}

type ValidateOption

// ValidateOption is the option type accepted by Instance.Validate
// (opts ...*ValidateOption).
// NOTE(review): Notifier, Validated and Error look like result fields filled in
// by validation, with Notifier signalling completion - confirm against
// Instance.Validate.
type ValidateOption struct {
	Notifier  chan struct{}
	Validated *Connection
	Error     error

	// Options
	// NOTE(review): WarmUp presumably marks a warm-up validation and Command the
	// command that triggered it - confirm.
	WarmUp  bool
	Command types.Command
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL