pool

package
v0.0.0-...-90deddd
Warning

This package is not in the latest version of its module.

Published: Oct 18, 2023 License: Apache-2.0 Imports: 16 Imported by: 9

Documentation

Overview

Package pool implements resource pools for reflow. Reflow manages resources in units of "allocs": an alloc is a resource allocation that exists on a single machine, and to which is attached a shared repository with the results of all execs within that Alloc. Allocs are leased: they must be kept alive to guarantee continuity, and they are collected as a unit.

Constants

const (
	MaxKeepaliveInterval = 5 * time.Minute
)

Variables

var (
	// KeepaliveRetryInitialWaitInterval is the initial duration to wait before
	// retrying if a keepalive attempt fails on an alloc (with a retryable failure)
	KeepaliveRetryInitialWaitInterval = 2 * time.Second
	// KeepaliveTimeout is the timeout for keepalive calls.
	KeepaliveTimeout = 10 * time.Second

	// Non-fatal keepalive failures will be retried using this policy. The policy is
	// configured such that the last retry will occur within the policy's max duration.
	// With a=KeepaliveRetryInitialWaitInterval, b=backoffFactor (1.5), n=keepaliveTries,
	// ivOffset should be such that: sum_{i=0 .. n-1} a*b^i < ivOffset
	KeepaliveRetryPolicy = retry.Jitter(retry.MaxRetries(retry.Backoff(KeepaliveRetryInitialWaitInterval, ivOffset, 1.5), keepaliveTries), 0.2)
)
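The constraint in the comment on KeepaliveRetryPolicy is a geometric sum, and it can be checked numerically. The following is a minimal sketch of that arithmetic only. The values of keepaliveTries and ivOffset are unexported and not shown on this page, so n = 5 below is purely an illustrative assumption, as is the helper name totalBackoff; jitter is ignored.

```go
package main

import (
	"fmt"
	"time"
)

// totalBackoff returns sum_{i=0..n-1} a*b^i: the total time spent waiting
// across n retries with initial wait a and backoff factor b.
func totalBackoff(a time.Duration, b float64, n int) time.Duration {
	total := 0.0
	wait := float64(a)
	for i := 0; i < n; i++ {
		total += wait
		wait *= b
	}
	return time.Duration(total)
}

func main() {
	a := 2 * time.Second // same value as KeepaliveRetryInitialWaitInterval
	// n = 5 is a hypothetical retry count, not the package's keepaliveTries.
	fmt.Println(totalBackoff(a, 1.5, 5))
}
```

Under these assumptions the individual waits are 2s, 3s, 4.5s, 6.75s and 10.125s, for a total of 26.375s, so ivOffset would have to exceed that total for the stated inequality to hold.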

Functions

func AllocExpired

func AllocExpired(a Alloc) bool

AllocExpired tells whether the alloc is expired.

func AllocExpiredBy

func AllocExpiredBy(a Alloc) time.Duration

AllocExpiredBy tells by how much the alloc is expired. Unexpired allocs will return a negative duration.
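The sign convention is easy to get backwards, so here is a small sketch of it. The function expiredBy is a local stand-in that works on plain timestamps; it is not the package's implementation, and how AllocExpiredBy obtains the expiry time (for example from AllocInspect.Expires) is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// ref is a fixed reference instant so the example is deterministic.
var ref = time.Date(2023, 10, 18, 12, 0, 0, 0, time.UTC)

// expiredBy follows the convention described above: positive when now is
// past expires, negative when the lease still has time left.
func expiredBy(expires, now time.Time) time.Duration {
	return now.Sub(expires)
}

func main() {
	fmt.Println(expiredBy(ref.Add(-30*time.Second), ref)) // lease lapsed 30s ago
	fmt.Println(expiredBy(ref.Add(2*time.Minute), ref))   // lease valid for 2m more
}
```

A caller that wants the remaining lease time can negate the result.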

func Keepalive

func Keepalive(ctx context.Context, log *log.Logger, alloc Alloc) error

Keepalive maintains the lease on alloc until it expires (e.g., because Free was called), or until the passed-in context is cancelled. Keepalive retries errors with exponential backoff, using a fixed configuration.
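To illustrate the shape of such a lease-maintenance loop, here is a simplified, self-contained sketch. The leaser interface and fakeAlloc type are local stand-ins, renewing at half the granted interval is an assumption, and the retry-with-backoff behaviour of the real function is deliberately omitted; this is not the package's implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// leaser is a local stand-in for the Keepalive method of pool.Alloc.
type leaser interface {
	Keepalive(ctx context.Context, interval time.Duration) (time.Duration, error)
}

// maintain renews the lease until ctx is cancelled or a renewal fails.
// It sleeps for half of each granted interval so that the next renewal
// lands well before expiry.
func maintain(ctx context.Context, l leaser, want time.Duration) error {
	for {
		granted, err := l.Keepalive(ctx, want)
		if err != nil {
			return err
		}
		select {
		case <-time.After(granted / 2):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// fakeAlloc grants short leases and fails once its renewals are used up.
type fakeAlloc struct{ left int }

func (f *fakeAlloc) Keepalive(ctx context.Context, iv time.Duration) (time.Duration, error) {
	if f.left == 0 {
		return 0, errors.New("alloc freed")
	}
	f.left--
	return 10 * time.Millisecond, nil
}

// run drives maintain against a fake that allows n renewals.
func run(n int) error {
	return maintain(context.Background(), &fakeAlloc{left: n}, time.Minute)
}

func main() {
	fmt.Println(run(3)) // the fake reports "alloc freed" after three renewals
}
```

In real use the loop would typically run in its own goroutine for the lifetime of the alloc, with cancellation of the context as the way to stop it.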

Types

type Alloc

type Alloc interface {
	reflow.Executor

	// Pool returns the pool from which the alloc is reserved.
	Pool() Pool

	// ID returns the ID of alloc in the pool. The format of the ID is opaque.
	ID() string

	// Keepalive maintains the lease of this Alloc. It must be called again
	// before the expiration of the returned duration. The user may also
	// request a maintenance interval. This is just a hint and may not be
	// respected by the Alloc.
	// Clients should preferably use `pool.Keepalive` (in a goroutine) instead
	// of calling this directly.
	Keepalive(ctx context.Context, interval time.Duration) (time.Duration, error)

	// Inspect returns Alloc metadata.
	Inspect(ctx context.Context) (AllocInspect, error)

	// Free frees the alloc. Pending tasks are killed but its Repository
	// is not collected. Some implementations may implement "zombie"
	// allocs so that they can be inspected after Free is called.
	Free(ctx context.Context) error
}

Alloc represents a resource allocation attached to a single executor: a reservation of resources on a single node.

func Allocate

func Allocate(ctx context.Context, pool Pool, req reflow.Requirements, labels Labels) (Alloc, error)

Allocate attempts to place an Alloc on a pool with the given resource requirements.

func Allocs

func Allocs(ctx context.Context, pool Pool, log *log.Logger) []Alloc

Allocs fetches all of the allocs from the provided pool. Any failures it encounters (e.g., due to a context timeout) are logged but otherwise ignored. The returned slice contains all the successfully fetched allocs.

type AllocInspect

type AllocInspect struct {
	ID            string
	TaskDBAllocID digest.Digest
	Resources     reflow.Resources
	Meta          AllocMeta
	Created       time.Time
	LastKeepalive time.Time
	Expires       time.Time
}

AllocInspect contains Alloc metadata.

type AllocManager

type AllocManager interface {
	// Name returns the name of the alloc manager
	Name() string

	// New creates a new alloc with the given metadata and an initial keepalive duration.
	New(ctx context.Context, id string, meta AllocMeta, keepalive time.Duration, existing []Alloc) (Alloc, error)

	// Kill kills the given alloc.
	Kill(a Alloc) error
}

AllocManager manages the creation and destruction of Allocs and is responsible for managing all the underlying resources necessary for an Alloc.

type AllocMeta

type AllocMeta struct {
	Want   reflow.Resources
	Owner  string
	Labels Labels
}

AllocMeta contains Alloc requester metadata.

type Labels

type Labels map[string]string

Labels represents a set of metadata labels for a run.

func (Labels) Add

func (l Labels) Add(k, v string) Labels

Add returns a copy of Labels l with an added key and value.

func (Labels) Copy

func (l Labels) Copy() Labels

Copy returns a copy of l.
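Because Add returns a copy, the receiver is left untouched. The sketch below redeclares Labels locally to show that behaviour in isolation; the method bodies are one plausible way to get the documented semantics, not necessarily the package's own code.

```go
package main

import "fmt"

// Labels is redeclared locally so the sketch is self-contained.
type Labels map[string]string

// Copy returns a new map with the same entries as l.
func (l Labels) Copy() Labels {
	m := make(Labels, len(l))
	for k, v := range l {
		m[k] = v
	}
	return m
}

// Add returns a copy of l with k set to v; l itself is not modified.
func (l Labels) Add(k, v string) Labels {
	m := l.Copy()
	m[k] = v
	return m
}

func main() {
	base := Labels{"user": "alice"}
	derived := base.Add("run", "r1")
	fmt.Println(len(base), len(derived)) // 1 2
}
```

This makes it safe to derive per-alloc label sets from a shared base without the sets aliasing each other.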

type Mux

type Mux struct {
	// contains filtered or unexported fields
}

Mux is a Pool implementation that multiplexes and aggregates multiple underlying pools. Mux uses a URI naming scheme to address allocs and offers: the ID of the underlying pool, followed by '/' and then the ID of the alloc or offer. For example, the URI

1.worker.us-west-2a.reflowy.eng.aws.grail.com:9000/4640204a5fd6ce42

names the alloc with ID "4640204a5fd6ce42" of the pool named 1.worker.us-west-2a.reflowy.eng.aws.grail.com:9000.
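A minimal sketch of how such a URI could be taken apart follows. The helper splitURI is hypothetical, and splitting at the first '/' is an assumption; the page does not say how Mux itself parses URIs.

```go
package main

import (
	"fmt"
	"strings"
)

// splitURI separates a Mux-style URI into the pool ID and the alloc (or
// offer) ID. Assumption: the split is made at the first '/'.
func splitURI(uri string) (poolID, id string, ok bool) {
	poolID, id, ok = strings.Cut(uri, "/")
	if !ok || poolID == "" || id == "" {
		return "", "", false
	}
	return poolID, id, true
}

func main() {
	uri := "1.worker.us-west-2a.reflowy.eng.aws.grail.com:9000/4640204a5fd6ce42"
	poolID, id, ok := splitURI(uri)
	fmt.Println(poolID, id, ok)
}
```

A Mux would then look up the pool named by the first part and ask it for the alloc or offer named by the second.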

func (*Mux) Alloc

func (m *Mux) Alloc(ctx context.Context, uri string) (Alloc, error)

Alloc returns an alloc named by a URI.

func (*Mux) Allocs

func (m *Mux) Allocs(ctx context.Context) ([]Alloc, error)

Allocs returns the current set of allocs over all underlying pools.

func (*Mux) ID

func (m *Mux) ID() string

ID returns the ID of this pool. It is always empty.

func (*Mux) Offer

func (m *Mux) Offer(ctx context.Context, uri string) (Offer, error)

Offer looks up the offer named by the given URI.

func (*Mux) Offers

func (m *Mux) Offers(ctx context.Context) ([]Offer, error)

Offers enumerates all the offers available from the underlying pools. Offers applies a timeout to the underlying requests; requests that do not meet the deadline are simply dropped.
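The fan-out-with-deadline pattern described here can be sketched as follows. Everything in the block is illustrative: offersFunc stands in for a pool's Offers method and returns plain offer IDs, the 50ms timeout is arbitrary, and the sketch relies on each pool honouring context cancellation. It is not the Mux implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// offersFunc is a hypothetical stand-in for one pool's Offers method.
type offersFunc func(ctx context.Context) ([]string, error)

// gather queries every pool concurrently under one deadline and returns
// the offers from pools that answered in time; failures are dropped.
func gather(ctx context.Context, timeout time.Duration, pools []offersFunc) []string {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	var (
		mu  sync.Mutex
		all []string
		wg  sync.WaitGroup
	)
	for _, p := range pools {
		wg.Add(1)
		go func(p offersFunc) {
			defer wg.Done()
			offers, err := p(ctx)
			if err != nil {
				return // timed out or failed: drop this pool's result
			}
			mu.Lock()
			all = append(all, offers...)
			mu.Unlock()
		}(p)
	}
	wg.Wait()
	return all
}

// fast answers immediately with a single offer.
func fast(ctx context.Context) ([]string, error) { return []string{"a/1"}, nil }

// slow never answers; it only returns once the deadline fires.
func slow(ctx context.Context) ([]string, error) {
	<-ctx.Done()
	return nil, ctx.Err()
}

// demo runs gather over one responsive and one unresponsive pool.
func demo() []string {
	return gather(context.Background(), 50*time.Millisecond, []offersFunc{fast, slow})
}

func main() {
	fmt.Println(demo()) // [a/1]
}
```

The caller sees a partial but prompt result, which matches the behaviour described above: an unresponsive pool costs at most one timeout and contributes no offers.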

func (*Mux) Pools

func (m *Mux) Pools() []Pool

Pools retrieves the Mux's underlying pools.

func (*Mux) SetCaching

func (m *Mux) SetCaching(b bool)

SetCaching sets the caching behavior (true turns caching on).

func (*Mux) SetPools

func (m *Mux) SetPools(pools []Pool)

SetPools sets the Mux's underlying pools.

func (*Mux) Size

func (m *Mux) Size() int

Size tells how many pools the Mux comprises.

type Offer

type Offer interface {
	// ID returns the ID of the offer. It is an opaque string.
	ID() string

	// Pool returns the pool from which this Offer is extended.
	Pool() Pool

	// Available returns the amount of total available resources
	// that can be accepted.
	Available() reflow.Resources

	// Accept accepts this Offer with the given Alloc metadata. The
	// metadata includes how many resources are requested. Accept may
	// return ErrOfferExpired if another client accepted the offer
	// first.
	Accept(ctx context.Context, meta AllocMeta) (Alloc, error)
}

Offer represents an offer of resources, from which an Alloc can be created.

type OfferJSON

type OfferJSON struct {
	// The ID of the offer.
	ID string
	// The amount of available resources the offer represents.
	Available reflow.Resources
}

OfferJSON is the JSON structure used to describe offers.

type Pool

type Pool interface {
	// ID returns the ID of the pool. It is an opaque string.
	ID() string

	// Alloc returns the Alloc named by an ID.
	Alloc(ctx context.Context, id string) (Alloc, error)

	// Allocs enumerates the available Allocs in this Pool.
	Allocs(ctx context.Context) ([]Alloc, error)

	// Offer returns the Offer identified by an id.
	Offer(ctx context.Context, id string) (Offer, error)

	// Offers returns the set of current Offers from this Pool.
	// TODO(marius): it would be good to have a scanning/long-poll
	// version of this so that clients do not have to do their own polling.
	Offers(ctx context.Context) ([]Offer, error)
}

Pool is a resource pool which manages a set of allocs.

func CachingPool

func CachingPool(p Pool) Pool

CachingPool returns a Pool which caches the offers from the given pool. If the underlying pool returned no offers, then the cached result expires after emptyOffersTtl. Cached offers expire if all of them are outdated (i.e., a discrepancy is detected between the cached offer and the underlying one).

type ResourcePool

type ResourcePool struct {
	// contains filtered or unexported fields
}

ResourcePool implements a resource pool backed by an alloc manager. ResourcePool simply manages a given set of resources by allowing the creation of allocs within it using a subset of the total resources. The underlying AllocManager is responsible for creating and destroying the actual allocs.

func NewResourcePool

func NewResourcePool(manager AllocManager, log *log.Logger) ResourcePool

func (*ResourcePool) Alive

func (p *ResourcePool) Alive(a Alloc) bool

Alive tells whether an alloc's lease is current.

func (*ResourcePool) Alloc

func (p *ResourcePool) Alloc(ctx context.Context, id string) (Alloc, error)

Alloc looks up an alloc by ID.

func (*ResourcePool) Allocs

func (p *ResourcePool) Allocs(ctx context.Context) ([]Alloc, error)

Allocs lists all the active allocs in this resource pool.

func (*ResourcePool) Available

func (p *ResourcePool) Available() reflow.Resources

Available returns the amount of currently available resources: The total less what is occupied by active allocs.

func (*ResourcePool) Free

func (p *ResourcePool) Free(a Alloc) error

Free frees alloc a from this ResourcePool and invokes `AllocManager.Kill` on it.

func (*ResourcePool) ID

func (p *ResourcePool) ID() string

ID returns the ID of the resource pool.

func (*ResourcePool) Init

func (p *ResourcePool) Init(r reflow.Resources, m map[string]Alloc)

func (*ResourcePool) New

func (p *ResourcePool) New(ctx context.Context, meta AllocMeta) (Alloc, error)

New creates a new alloc with the given meta. New collects expired allocs as needed to make room for the resource requirements indicated by meta.

func (*ResourcePool) Offer

func (p *ResourcePool) Offer(ctx context.Context, id string) (Offer, error)

Offer looks up an offer by ID.

func (*ResourcePool) Offers

func (p *ResourcePool) Offers(ctx context.Context) ([]Offer, error)

Offers enumerates all the current offers of this ResourcePool. It always returns either no offers, when there are no more available resources, or one offer comprising the entirety of available resources.
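The relationship between Available and Offers can be sketched with simplified types. Here resources is a local stand-in for reflow.Resources, and treating "no more available resources" as "no positive quantity left" is an assumption made for the example.

```go
package main

import "fmt"

// resources is a simplified local stand-in for reflow.Resources.
type resources map[string]float64

// available subtracts what each active alloc occupies from the total.
func available(total resources, active []resources) resources {
	avail := make(resources, len(total))
	for k, v := range total {
		avail[k] = v
	}
	for _, a := range active {
		for k, v := range a {
			avail[k] -= v
		}
	}
	return avail
}

// offers returns no offers when nothing is left, and otherwise exactly
// one offer covering all remaining resources.
func offers(avail resources) []resources {
	for _, v := range avail {
		if v > 0 {
			return []resources{avail}
		}
	}
	return nil
}

func main() {
	total := resources{"cpu": 4, "mem": 8}
	avail := available(total, []resources{{"cpu": 1, "mem": 2}})
	fmt.Println(avail["cpu"], avail["mem"], len(offers(avail))) // 3 6 1
}
```

Offering the whole remainder as a single offer leaves it to the accepting client to state, via AllocMeta, how much of it it actually wants.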

func (*ResourcePool) Resources

func (p *ResourcePool) Resources() reflow.Resources

func (*ResourcePool) StopIfIdleFor

func (p *ResourcePool) StopIfIdleFor(d time.Duration) (bool, time.Duration)

StopIfIdleFor stops the pool if it is idle. It returns whether the pool was stopped. If the pool was not stopped (i.e., it was not idle), it also returns the current maximum duration to expiry of all allocs in the resource pool. Note that further alloc keepalive calls can make the pool unstoppable after the given duration passes.

Directories

Path Synopsis
Package client implements a remoting client for reflow pools.
Package server exposes a pool implementation for remote access.
