xact

package
v1.3.22
Published: Feb 25, 2024 License: MIT Imports: 24 Imported by: 1

README

This is the top eXtended Action (xaction) directory containing much of the common functionality and interfaces used by the rest of the code.

In addition, it contains subdirectories:

  • xreg - xaction registry
  • xs - concrete named xactions, e.g. apc.ActRebalance, apc.ActPromote, apc.ActSummaryBck and other enumerated kinds.

For all supported xactions, their kinds and static properties, see xact.Table.

Xaction kinds are generally consistent with the API constants from api/apc/const.go.

Extended Actions (xactions)

Batch operations that may take many seconds (minutes, hours) to execute are called eXtended actions or xactions.

Xactions run asynchronously, have one of the enumerated kinds, start/stop times, and xaction-specific statistics. Xactions start running based on a wide variety of runtime conditions that include:

  • periodic (defined by a configured interval of time)
  • resource utilization (e.g., usable capacity falling below configured watermark)
  • certain type of workload (e.g., PUT into a mirrored or erasure-coded bucket)
  • user request (e.g., to reduce the number of local object copies in a given bucket)
  • adding or removing storage targets (the events that trigger cluster-wide rebalancing)
  • adding or removing local disks (the events that cause resilver to start moving stored content between mountpaths - see Managing mountpaths)
  • and more...

Further, to reduce congestion and minimize interference with user-generated workload, extended actions throttle themselves based on configurable watermarks. These include disk_util_low_wm and disk_util_high_wm (see configuration). Roughly speaking, when local disk utilization falls below the low watermark (disk_util_low_wm), extended actions that utilize local storage can run at full throttle. Conversely, when utilization rises above the high watermark (disk_util_high_wm), they slow themselves down.

The amount of throttling that a given xaction imposes on itself is always defined by a combination of dynamic factors. To give concrete examples, an extended action that runs LRU evictions performs its "balancing act" by taking into account the remaining storage capacity and the current utilization of the local filesystems. The mirroring (xaction) takes into account congestion on its communication channel that callers use for posting requests to create local replicas.
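The watermark idea described above can be sketched as a small function. This is only an illustration: the name `throttlePct`, the 0 to 100 scale, and the linear ramp between the two watermarks are assumptions, not the algorithm AIStore actually uses.

```go
package main

import "fmt"

// throttlePct maps current disk utilization (in percent) to a throttling
// level in [0, 100]: 0 means "run at full speed", 100 means "back off as much
// as possible". The linear ramp between the watermarks is an assumption made
// for illustration only.
func throttlePct(util, lowWM, highWM int64) int64 {
	switch {
	case util <= lowWM:
		return 0
	case util >= highWM:
		return 100
	default:
		return (util - lowWM) * 100 / (highWM - lowWM)
	}
}

func main() {
	// hypothetical values for disk_util_low_wm and disk_util_high_wm
	const lowWM, highWM = 20, 80
	for _, util := range []int64{10, 50, 95} {
		fmt.Printf("util=%d%% -> throttle=%d%%\n", util, throttlePct(util, lowWM, highWM))
	}
}
```

A real xaction would combine such a value with other dynamic factors (remaining capacity, channel congestion), as the paragraph above notes.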


NOTE (Dec 2021): rest of this document is somewhat outdated and must be revisited. For the most recently updated information on running and monitoring xactions, please see:


Supported extended actions are enumerated in the user-facing API and include:

  • cluster-wide rebalancing (denoted as ActGlobalReb in the API) that gets triggered when storage targets join or leave the cluster
  • LRU-based cache eviction (see LRU) that depends on the remaining free capacity and configuration
  • prefetching batches of objects (of arbitrary size) from the Cloud (see List/Range Operations)
  • consensus voting (when conducting new leader election)
  • erasure-encoding objects in an EC-configured bucket (see Erasure coding)
  • creating additional local replicas, and reducing number of object replicas in a given locally-mirrored bucket (see Storage Services)
  • and more...

Several actions can be performed on an xaction: stats, start, and stop. The list of supported actions can be found in the API.

Xaction requests are generic for all xactions, but responses from each xaction are different. See below. The request looks as follows:

  1. Single target request:

    $ curl -i -X GET  -H 'Content-Type: application/json' -d '{"action": "actiontype", "name": "xactionname", "value":{"bucket":"bucketname"}}' 'http://T/v1/daemon?what=xaction'
    

    To simplify the logic, the result is always an array, even if it contains only one element.

  2. Proxy request, which executes a request on all targets within the cluster, and responds with list of targets' responses:

    $ curl -i -X GET  -H 'Content-Type: application/json' -d '{"action": "actiontype", "name": "xactionname", "value":{"bucket":"bucketname"}}' 'http://G/v1/cluster?what=xaction'
    

    The response to a proxy query is a map of daemonID -> target's response. If any target responds with an error status code, the proxy responds with the same error.
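As a rough sketch of the two message shapes described above, the following Go program builds the JSON request body from the curl examples and decodes a proxy-style reply. The type and function names (`xactRequest`, `proxyResponse`, `encodeRequest`, `decodeProxyReply`), the daemon IDs, and the sample reply are all made up for illustration; they are not part of the AIStore API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// xactRequest mirrors the JSON body shown in the curl examples.
type xactRequest struct {
	Action string            `json:"action"`
	Name   string            `json:"name"`
	Value  map[string]string `json:"value"`
}

// proxyResponse maps daemonID -> that target's array of results. Each result
// stays raw JSON because the per-xaction response format differs.
type proxyResponse map[string][]json.RawMessage

func encodeRequest(action, name, bucket string) ([]byte, error) {
	return json.Marshal(xactRequest{
		Action: action,
		Name:   name,
		Value:  map[string]string{"bucket": bucket},
	})
}

func decodeProxyReply(raw []byte) (proxyResponse, error) {
	var resp proxyResponse
	err := json.Unmarshal(raw, &resp)
	return resp, err
}

func main() {
	body, err := encodeRequest("stats", "rebalance", "bucketname")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))

	// hypothetical proxy reply with two made-up daemon IDs
	resp, err := decodeProxyReply([]byte(`{"t1":[{"kind":"rebalance"}],"t2":[]}`))
	if err != nil {
		panic(err)
	}
	for id, results := range resp {
		fmt.Printf("%s: %d result(s)\n", id, len(results))
	}
}
```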

Start and Stop

For a successful request, the response contains only the HTTP status code and the response body is omitted. If the request was sent to the proxy and all targets responded with a successful HTTP code, the proxy responds with a successful HTTP code as well.

For an unsuccessful request, the target's response contains the error code and error message. If the request was sent to the proxy and at least one of the targets responded with an error code, the proxy responds with the same error code and error message.

As always, G above (and throughout this entire README) serves as a placeholder for the real gateway's hostname/IP address, and T serves as a placeholder for the target's hostname/IP address. More information is available in the notation section.

The corresponding RESTful API includes support for querying all xactions including global-rebalancing and prefetch operations.

Stats

A stats request returns a list of the requested xactions. The statistics of each xaction share a common base format, which looks as follows:

[
   {
      "id":1,
      "kind":"ec-get",
      "bucket":"test",
      "startTime":"2019-04-15T12:40:18.721697505-07:00",
      "endTime":"0001-01-01T00:00:00Z",
      "status":"InProgress"
   },
   {
      "id":2,
      "kind":"ec-put",
      "bucket":"test",
      "startTime":"2019-04-15T12:40:18.721723865-07:00",
      "endTime":"0001-01-01T00:00:00Z",
      "status":"InProgress"
   }
]

Any xaction can have additional, xaction-specific fields; these are grouped under an extra field called "ext".

Example rebalance stats response:

[
    {
      "id": 3,
      "kind": "rebalance",
      "bucket": "",
      "start_time": "2019-04-15T13:38:51.556388821-07:00",
      "end_time": "0001-01-01T00:00:00Z",
      "status": "InProgress",
      "count": 0,
      "ext": {
        "tx.n": 0,
        "tx.size": 0,
        "rx.n": 0,
        "rx.size": 0
      }
    }
]
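A client could decode such a response along the following lines. This is a minimal sketch: the `xactStats` type and `parseStats` helper are hypothetical, the JSON keys follow the rebalance example (`start_time`, `end_time`) rather than the base-format example (`startTime`, `endTime`), and treating "ext" as a map of integer counters is an assumption based on the sample above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// xactStats is a hypothetical client-side view of one stats entry.
type xactStats struct {
	ID        int64            `json:"id"`
	Kind      string           `json:"kind"`
	Bucket    string           `json:"bucket"`
	StartTime time.Time        `json:"start_time"`
	EndTime   time.Time        `json:"end_time"`
	Status    string           `json:"status"`
	Ext       map[string]int64 `json:"ext,omitempty"` // xaction-specific counters
}

// parseStats decodes the array that a stats request returns.
func parseStats(raw []byte) ([]xactStats, error) {
	var out []xactStats
	err := json.Unmarshal(raw, &out)
	return out, err
}

// sampleReply is the rebalance example from above, compacted to one line.
const sampleReply = `[{"id":3,"kind":"rebalance","bucket":"",` +
	`"start_time":"2019-04-15T13:38:51.556388821-07:00",` +
	`"end_time":"0001-01-01T00:00:00Z","status":"InProgress","count":0,` +
	`"ext":{"tx.n":0,"tx.size":0,"rx.n":0,"rx.size":0}}]`

func main() {
	stats, err := parseStats([]byte(sampleReply))
	if err != nil {
		panic(err)
	}
	for _, s := range stats {
		// a zero end time means the xaction has not finished yet
		fmt.Printf("%s (id=%d) unfinished=%v ext=%v\n", s.Kind, s.ID, s.EndTime.IsZero(), s.Ext)
	}
}
```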

If the --all flag is provided, the stats command displays old, finished xactions along with the currently running ones. If --all is not set (the default), only the most recent xactions are displayed for each bucket, kind, or (bucket, kind) pair.

References

For xaction-related CLI documentation and examples, supported multi-object (batch) operations, and more, please see:

Documentation

Overview

Package xact provides core functionality for the AIStore eXtended Actions (xactions).

  • Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.

Constants

const (
	ScopeG  = iota + 1 // cluster
	ScopeB             // bucket
	ScopeGB            // (one bucket) | (all buckets)
	ScopeT             // target
)
const (
	SepaID = ","

	LeftID  = "["
	RightID = "]"
)
const (
	DefWaitTimeShort = time.Minute        // zero `ArgsMsg.Timeout` defaults to
	DefWaitTimeLong  = 7 * 24 * time.Hour // when `ArgsMsg.Timeout` is negative
	MaxProbingFreq   = 30 * time.Second   // as the name implies
	MinPollTime      = 2 * time.Second    // ditto
	MaxPollTime      = 2 * time.Minute    // can grow up to

	// number of consecutive 'idle' xaction states, with possible numeric
	// values translating as follows:
	// 1: fully rely on xact.IsIdle() logic with no extra checks whatsoever
	// 2: one additional IsIdle() call after MinPollTime
	// 3: two additional IsIdle() calls spaced at MinPollTime interval, and so on.
	NumConsecutiveIdle = 2
)

global waiting tunables (used in: `api.WaitForXactionIC` and `api.WaitForXactionNode`)
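The comments on these tunables suggest how a caller-supplied timeout and the polling interval might be derived. The sketch below copies the constant values from the block above; the helper names `effectiveTimeout` and `nextPoll` are hypothetical, and doubling the interval is an assumption (the source only says the poll time "can grow up to" `MaxPollTime`).

```go
package main

import (
	"fmt"
	"time"
)

// values copied from the constants documented above
const (
	DefWaitTimeShort = time.Minute
	DefWaitTimeLong  = 7 * 24 * time.Hour
	MinPollTime      = 2 * time.Second
	MaxPollTime      = 2 * time.Minute
)

// effectiveTimeout applies the documented defaults: a zero timeout falls back
// to DefWaitTimeShort, a negative one to DefWaitTimeLong.
func effectiveTimeout(d time.Duration) time.Duration {
	switch {
	case d == 0:
		return DefWaitTimeShort
	case d < 0:
		return DefWaitTimeLong
	default:
		return d
	}
}

// nextPoll grows the polling interval, never going below MinPollTime or above
// MaxPollTime. The doubling strategy is an assumption for illustration.
func nextPoll(cur time.Duration) time.Duration {
	if cur < MinPollTime {
		return MinPollTime
	}
	next := cur * 2
	if next > MaxPollTime {
		next = MaxPollTime
	}
	return next
}

func main() {
	fmt.Println(effectiveTimeout(0), effectiveTimeout(-1), effectiveTimeout(5*time.Second))
	poll := time.Duration(0)
	for i := 0; i < 8; i++ {
		poll = nextPoll(poll)
		fmt.Println("poll every", poll)
	}
}
```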

const (
	IdleDefault = time.Minute // hk -> idle tick
)

Variables

var IncFinished func()
var Table = map[string]Descriptor{

	apc.ActElection:  {DisplayName: "elect-primary", Scope: ScopeG, Startable: false},
	apc.ActRebalance: {Scope: ScopeG, Startable: true, Metasync: true, Rebalance: true},

	apc.ActETLInline: {Scope: ScopeG, Startable: false, AbortRebRes: true},

	apc.ActLRU:          {DisplayName: "lru-eviction", Scope: ScopeGB, Startable: true},
	apc.ActStoreCleanup: {DisplayName: "cleanup", Scope: ScopeGB, Startable: true},
	apc.ActSummaryBck: {
		DisplayName: "summary",
		Scope:       ScopeGB,
		Access:      apc.AceObjLIST | apc.AceBckHEAD,
		Startable:   false,
		Metasync:    false,
	},

	apc.ActResilver: {Scope: ScopeT, Startable: true, Resilver: true},

	apc.ActECGet:     {Scope: ScopeB, Startable: false, Idles: true, ExtendedStats: true},
	apc.ActECPut:     {Scope: ScopeB, Startable: false, RefreshCap: true, Idles: true, ExtendedStats: true},
	apc.ActECRespond: {Scope: ScopeB, Startable: false, Idles: true},
	apc.ActPutCopies: {Scope: ScopeB, Startable: false, RefreshCap: true, Idles: true},

	apc.ActArchive: {Scope: ScopeB, Access: apc.AccessRW, Startable: false, RefreshCap: true, Idles: true},
	apc.ActCopyObjects: {
		DisplayName: "copy-objects",
		Scope:       ScopeB,
		Access:      apc.AccessRW,
		Startable:   false,
		RefreshCap:  true,
		Idles:       true,
	},
	apc.ActETLObjects: {
		DisplayName: "etl-objects",
		Scope:       ScopeB,
		Access:      apc.AccessRW,
		Startable:   false,
		RefreshCap:  true,
		Idles:       true,
		AbortRebRes: true,
	},

	apc.ActBlobDl: {Access: apc.AccessRW, Scope: ScopeB, Startable: true, AbortRebRes: true, RefreshCap: true},

	apc.ActDownload: {Access: apc.AccessRW, Scope: ScopeG, Startable: false, Idles: true, AbortRebRes: true},

	apc.ActDsort: {
		DisplayName:    "dsort",
		Scope:          ScopeB,
		Access:         apc.AccessRW,
		Startable:      false,
		RefreshCap:     true,
		ConflictRebRes: true,
		ExtendedStats:  true,
		AbortRebRes:    true,
	},

	apc.ActPromote: {
		DisplayName: "promote-files",
		Scope:       ScopeB,
		Access:      apc.AcePromote,
		Startable:   false,
		RefreshCap:  true,
	},
	apc.ActEvictObjects: {
		DisplayName: "evict-objects",
		Scope:       ScopeB,
		Access:      apc.AceObjDELETE,
		Startable:   false,
		RefreshCap:  true,
	},
	apc.ActDeleteObjects: {
		DisplayName: "delete-objects",
		Scope:       ScopeB,
		Access:      apc.AceObjDELETE,
		Startable:   false,
		RefreshCap:  true,
	},
	apc.ActPrefetchObjects: {
		DisplayName: "prefetch-objects",
		Scope:       ScopeB,
		Access:      apc.AccessRW,
		Startable:   true,
		RefreshCap:  true,
	},

	apc.ActECEncode: {
		DisplayName:    "ec-bucket",
		Scope:          ScopeB,
		Access:         apc.AccessRW,
		Startable:      true,
		Metasync:       true,
		RefreshCap:     true,
		ConflictRebRes: true,
	},
	apc.ActMakeNCopies: {
		DisplayName: "mirror",
		Scope:       ScopeB,
		Access:      apc.AccessRW,
		Startable:   true,
		Metasync:    true,
		RefreshCap:  true,
	},
	apc.ActMoveBck: {
		DisplayName:    "rename-bucket",
		Scope:          ScopeB,
		Access:         apc.AceMoveBucket,
		Startable:      false,
		Metasync:       true,
		Rebalance:      true,
		ConflictRebRes: true,
	},
	apc.ActCopyBck: {
		DisplayName:    "copy-bucket",
		Scope:          ScopeB,
		Access:         apc.AccessRW,
		Startable:      false,
		Metasync:       true,
		RefreshCap:     true,
		ConflictRebRes: true,
	},
	apc.ActETLBck: {
		DisplayName: "etl-bucket",
		Scope:       ScopeB,
		Access:      apc.AccessRW,
		Startable:   false,
		Metasync:    true,
		RefreshCap:  true,
		AbortRebRes: true,
	},

	apc.ActList: {Scope: ScopeB, Access: apc.AceObjLIST, Startable: false, Metasync: false, Idles: true},

	apc.ActLoadLomCache:   {DisplayName: "warm-up-metadata", Scope: ScopeB, Startable: true},
	apc.ActInvalListCache: {Scope: ScopeB, Access: apc.AceObjLIST, Startable: false},
}

`xact.Table` is a static, public, and global Kind=>[Xaction Descriptor] map that contains xaction kinds and their static properties, such as `Startable`, `Metasync`, etc. In particular, "startability" is narrowly defined as the ability to start an xaction via `api.StartXaction` (copying a bucket, for instance, requires a separate `api.CopyBucket` call, and so on).
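To illustrate how a kind-or-display-name lookup over such a table might work, here is a self-contained sketch. It does not import the real package: `descriptor`, `table`, and `getDescriptor` are local stand-ins, the map keys are placeholder strings standing in for the `apc.Act*` constants, and the two-step lookup (kind first, then display name) is an assumption about how `GetDescriptor` behaves.

```go
package main

import (
	"errors"
	"fmt"
)

// descriptor is a cut-down, local stand-in for xact.Descriptor.
type descriptor struct {
	DisplayName string
	Startable   bool
}

// table keys are placeholders standing in for apc.Act* constant values.
var table = map[string]descriptor{
	"lru":       {DisplayName: "lru-eviction", Startable: true},
	"rebalance": {Startable: true},
	"copy-bck":  {DisplayName: "copy-bucket", Startable: false},
}

// getDescriptor resolves either a kind (map key) or a display name.
func getDescriptor(kindOrName string) (string, descriptor, error) {
	if d, ok := table[kindOrName]; ok {
		return kindOrName, d, nil
	}
	for kind, d := range table {
		// entries without a display name can only be found by kind
		if d.DisplayName != "" && d.DisplayName == kindOrName {
			return kind, d, nil
		}
	}
	return "", descriptor{}, errors.New("unknown xaction kind or name: " + kindOrName)
}

func main() {
	for _, s := range []string{"lru-eviction", "rebalance", "copy-bucket", "nope"} {
		kind, d, err := getDescriptor(s)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("%q -> kind=%q startable=%v\n", s, kind, d.Startable)
	}
}
```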

Functions

func Cname added in v1.3.22

func Cname(kind, uuid string) string

func CompareRebIDs

func CompareRebIDs(someID, fltID string) int

func GetKindName added in v1.3.16

func GetKindName(kindOrName string) (kind, name string)

func GoRunW

func GoRunW(xctn core.Xact)

common helper to go-run and wait until it actually starts running

func IdlesBeforeFinishing added in v1.3.16

func IdlesBeforeFinishing(kindOrName string) bool

func IsSameScope added in v1.3.16

func IsSameScope(kindOrName string, scs ...int) bool

func IsValidKind

func IsValidKind(kind string) bool

func IsValidRebID

func IsValidRebID(id string) (valid bool)

func IsValidUUID added in v1.3.16

func IsValidUUID(id string) bool

func ListDisplayNames added in v1.3.16

func ListDisplayNames(onlyStartable bool) (names []string)

func ParseCname added in v1.3.22

func ParseCname(cname string) (xactKind, xactID string, _ error)
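The page does not show what a "cname" looks like. Judging by the `LeftID` and `RightID` constants, one plausible layout is `kind[uuid]`; the sketch below assumes exactly that. The lowercase helpers `cname` and `parseCname` are hypothetical and are not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// delimiters copied from the constants documented above
const (
	LeftID  = "["
	RightID = "]"
)

// cname joins kind and ID as kind[uuid] (assumed layout).
func cname(kind, uuid string) string {
	return kind + LeftID + uuid + RightID
}

// parseCname splits kind[uuid] back into its two parts.
func parseCname(s string) (kind, uuid string, err error) {
	l := strings.Index(s, LeftID)
	if l <= 0 || !strings.HasSuffix(s, RightID) {
		return "", "", errors.New("malformed cname: " + s)
	}
	return s[:l], s[l+len(LeftID) : len(s)-len(RightID)], nil
}

func main() {
	c := cname("rebalance", "g7")
	kind, uuid, err := parseCname(c)
	fmt.Println(c, kind, uuid, err)

	_, _, err = parseCname("rebalance")
	fmt.Println(err)
}
```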

func RebID2S

func RebID2S(id int64) string

func RefcntQuiCB

func RefcntQuiCB(refc *atomic.Int32, maxTimeout, totalSoFar time.Duration) core.QuiRes

common ref-counted quiescence

func S2RebID

func S2RebID(id string) (int64, error)

Types

type ArgsMsg added in v1.3.16

type ArgsMsg struct {
	ID   string // xaction UUID
	Kind string // xaction kind _or_ name (see `xact.Table`)

	// optional parameters
	DaemonID    string        // node that runs this xaction
	Bck         cmn.Bck       // bucket
	Buckets     []cmn.Bck     // list of buckets (e.g., copy-bucket, lru-evict, etc.)
	Timeout     time.Duration // max time to wait
	Force       bool          // force
	OnlyRunning bool          // only for running xactions
}

Either the xaction ID or Kind must be specified. ArgsMsg is passed via ActMsg.Value with MorphMarshal extraction.

func (*ArgsMsg) String added in v1.3.16

func (args *ArgsMsg) String() (s string)

type Base

type Base struct {
	// contains filtered or unexported fields
}

func (*Base) Abort

func (xctn *Base) Abort(err error) bool

func (*Base) AbortErr

func (xctn *Base) AbortErr() error

func (*Base) AbortedAfter

func (xctn *Base) AbortedAfter(d time.Duration) (err error)

func (*Base) AddErr added in v1.3.18

func (xctn *Base) AddErr(err error, logExtra ...int)

func (*Base) AddNotif

func (xctn *Base) AddNotif(n core.Notif)

func (*Base) Bck

func (xctn *Base) Bck() *meta.Bck

func (*Base) Bytes

func (xctn *Base) Bytes() int64

func (*Base) ChanAbort

func (xctn *Base) ChanAbort() <-chan error

func (*Base) Cname added in v1.3.22

func (xctn *Base) Cname() string

func (*Base) EndTime

func (xctn *Base) EndTime() time.Time

func (*Base) Err added in v1.3.18

func (xctn *Base) Err() error

func (*Base) ErrCnt added in v1.3.18

func (xctn *Base) ErrCnt() int

func (*Base) Finish

func (xctn *Base) Finish()

atomically set end-time

func (*Base) Finished

func (xctn *Base) Finished() bool

func (*Base) FromTo

func (*Base) FromTo() (*meta.Bck, *meta.Bck)

func (*Base) ID

func (xctn *Base) ID() string

func (*Base) InBytes

func (xctn *Base) InBytes() int64

func (*Base) InObjs

func (xctn *Base) InObjs() int64

base stats: receive

func (*Base) InObjsAdd

func (xctn *Base) InObjsAdd(cnt int, size int64)

func (*Base) InitBase

func (xctn *Base) InitBase(id, kind string, bck *meta.Bck)

func (*Base) IsAborted

func (xctn *Base) IsAborted() bool

func (*Base) IsIdle added in v1.3.16

func (xctn *Base) IsIdle() bool

func (*Base) JoinErr added in v1.3.18

func (xctn *Base) JoinErr() (int, error)

func (*Base) Kind

func (xctn *Base) Kind() string

func (*Base) LomAdd added in v1.3.16

func (xctn *Base) LomAdd(lom *core.LOM)

oft. used

func (*Base) Name

func (xctn *Base) Name() (s string)

func (*Base) Objs

func (xctn *Base) Objs() int64

base stats: locally processed

func (*Base) ObjsAdd

func (xctn *Base) ObjsAdd(cnt int, size int64)

func (*Base) OutBytes

func (xctn *Base) OutBytes() int64

func (*Base) OutObjs

func (xctn *Base) OutObjs() int64

base stats: transmit

func (*Base) OutObjsAdd

func (xctn *Base) OutObjsAdd(cnt int, size int64)

func (*Base) Quiesce

func (xctn *Base) Quiesce(d time.Duration, cb core.QuiCB) core.QuiRes

count all the way to duration; reset and adjust every time activity is detected

func (*Base) Running

func (xctn *Base) Running() (yes bool)

func (*Base) StartTime

func (xctn *Base) StartTime() time.Time

func (*Base) String

func (xctn *Base) String() string

func (*Base) ToSnap

func (xctn *Base) ToSnap(snap *core.Snap)

provided for external use to fill-in xaction-specific `SnapExt` part

func (*Base) ToStats

func (xctn *Base) ToStats(stats *core.Stats)

type BckJog

type BckJog struct {
	Config *cmn.Config

	Base
	// contains filtered or unexported fields
}

func (*BckJog) Init

func (r *BckJog) Init(id, kind string, bck *meta.Bck, opts *mpather.JgroupOpts, config *cmn.Config)

func (*BckJog) Run

func (r *BckJog) Run()

func (*BckJog) Wait

func (r *BckJog) Wait() error

type Demand

type Demand interface {
	core.Xact
	IdleTimer() <-chan struct{}
	IncPending()
	DecPending()
	SubPending(n int)
}

xaction that self-terminates after staying idle for a while with an added capability to renew itself and ref-count its pending work
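The idle-then-terminate behavior can be modeled with a pending-work counter and a periodic housekeeping tick. The following is a simplified sketch, not the `DemandBase` implementation: the `demand` type, the `hkTick` method, counting idleness in ticks rather than wall-clock time, and the single-goroutine ticking are all assumptions made to keep the example deterministic.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// demand is a toy model of an on-demand xaction that ref-counts pending work.
type demand struct {
	pending   atomic.Int64
	idleTicks int // consecutive idle ticks required before signaling
	idle      int
	done      bool
	idleCh    chan struct{}
}

func newDemand(idleTicks int) *demand {
	return &demand{idleTicks: idleTicks, idleCh: make(chan struct{})}
}

func (d *demand) IncPending()                { d.pending.Add(1) }
func (d *demand) DecPending()                { d.pending.Add(-1) }
func (d *demand) SubPending(n int)           { d.pending.Add(-int64(n)) }
func (d *demand) IdleTimer() <-chan struct{} { return d.idleCh }

// hkTick is a hypothetical housekeeping callback (one caller assumed).
// Pending work resets the idle countdown; enough idle ticks close the channel.
func (d *demand) hkTick() {
	if d.pending.Load() > 0 {
		d.idle = 0
		return
	}
	d.idle++
	if !d.done && d.idle >= d.idleTicks {
		d.done = true
		close(d.idleCh)
	}
}

// fired reports whether the idle signal has been delivered.
func fired(d *demand) bool {
	select {
	case <-d.IdleTimer():
		return true
	default:
		return false
	}
}

func main() {
	d := newDemand(2)
	d.IncPending()
	d.hkTick()
	d.hkTick()
	fmt.Println("busy, fired:", fired(d))
	d.DecPending()
	d.hkTick()
	d.hkTick()
	fmt.Println("idle, fired:", fired(d))
}
```

An xaction's run loop would typically select on `IdleTimer()` alongside its work and abort channels, and finish when the idle signal arrives.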

type DemandBase

type DemandBase struct {
	Base
	// contains filtered or unexported fields
}

func (*DemandBase) Abort

func (r *DemandBase) Abort(err error) (ok bool)

func (*DemandBase) DecPending

func (r *DemandBase) DecPending()

func (*DemandBase) IdleTimer

func (r *DemandBase) IdleTimer() <-chan struct{}

func (*DemandBase) IncPending

func (r *DemandBase) IncPending()

func (*DemandBase) Init

func (r *DemandBase) Init(uuid, kind string, bck *meta.Bck, idleDur time.Duration)

func (*DemandBase) IsIdle added in v1.3.16

func (r *DemandBase) IsIdle() bool

NOTE: override `Base.IsIdle`

func (*DemandBase) Pending

func (r *DemandBase) Pending() (cnt int64)

func (*DemandBase) Reset added in v1.3.22

func (r *DemandBase) Reset(idleTime time.Duration)

(e.g. usage: listed last page)

func (*DemandBase) Stop

func (r *DemandBase) Stop()

func (*DemandBase) SubPending

func (r *DemandBase) SubPending(n int)

type Descriptor

type Descriptor struct {
	DisplayName string          // as implied
	Access      apc.AccessAttrs // access permissions (see: apc.Access*)
	Scope       int             // ScopeG (global), etc. - the enum above
	Startable   bool            // true if user can start this xaction (e.g., via `api.StartXaction`)
	Metasync    bool            // true if this xaction changes (and metasyncs) cluster metadata
	RefreshCap  bool            // refresh capacity stats upon completion

	// see xreg for "limited coexistence"
	Rebalance      bool // moves data between nodes
	Resilver       bool // moves data between mountpaths
	ConflictRebRes bool // conflicts with rebalance/resilver
	AbortRebRes    bool // gets aborted upon rebalance/resilver - currently, all `ext`-ensions

	// xaction has an intermediate `idle` state whereby it "idles" between requests
	// (see related: xact/demand.go)
	Idles bool

	// xaction returns extended xaction-specific stats
	// (see related: `Snap.Ext` in core/xaction.go)
	ExtendedStats bool
}

func GetDescriptor added in v1.3.16

func GetDescriptor(kindOrName string) (string, Descriptor, error)

type Marked

type Marked struct {
	Xact        core.Xact
	Interrupted bool // (rebalance | resilver) interrupted
	Restarted   bool // node restarted
}

type MultiSnap added in v1.3.16

type MultiSnap map[string][]*core.Snap // by target ID (tid)

primarily: `api.QueryXactionSnaps`

func (MultiSnap) ByteCounts added in v1.3.16

func (xs MultiSnap) ByteCounts(xid string) (locBytes, outBytes, inBytes int64)

func (MultiSnap) GetUUIDs added in v1.3.16

func (xs MultiSnap) GetUUIDs() []string

func (MultiSnap) IsAborted added in v1.3.16

func (xs MultiSnap) IsAborted(xid string) (bool, error)

func (MultiSnap) IsIdle added in v1.3.16

func (xs MultiSnap) IsIdle(xid string) (aborted, running, notstarted bool)

(all targets, all xactions)

func (MultiSnap) ObjCounts added in v1.3.16

func (xs MultiSnap) ObjCounts(xid string) (locObjs, outObjs, inObjs int64)

func (MultiSnap) RunningTarget added in v1.3.16

func (xs MultiSnap) RunningTarget(xid string) (string, *core.Snap, error)

func (MultiSnap) TotalRunningTime added in v1.3.16

func (xs MultiSnap) TotalRunningTime(xid string) (time.Duration, error)
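Since `MultiSnap` is keyed by target ID, per-xaction totals come from summing over all targets. A rough sketch of that aggregation follows; the `snap` type is a simplified stand-in for `core.Snap` (its real field layout is not shown on this page), and `objCounts` and `sample` are hypothetical helpers.

```go
package main

import "fmt"

// snap is a simplified, hypothetical stand-in for core.Snap.
type snap struct {
	ID      string
	Objs    int64 // locally processed
	OutObjs int64 // transmitted
	InObjs  int64 // received
}

// multiSnap mirrors the documented shape: target ID -> snapshots.
type multiSnap map[string][]*snap

// objCounts sums object counters for one xaction ID across all targets.
func (xs multiSnap) objCounts(xid string) (locObjs, outObjs, inObjs int64) {
	for _, snaps := range xs {
		for _, s := range snaps {
			if s.ID != xid {
				continue
			}
			locObjs += s.Objs
			outObjs += s.OutObjs
			inObjs += s.InObjs
		}
	}
	return locObjs, outObjs, inObjs
}

// sample returns made-up snapshots from two made-up targets.
func sample() multiSnap {
	return multiSnap{
		"t1": {{ID: "x1", Objs: 10, OutObjs: 4, InObjs: 1}, {ID: "x2", Objs: 100}},
		"t2": {{ID: "x1", Objs: 5, OutObjs: 1, InObjs: 4}},
	}
}

func main() {
	loc, out, in := sample().objCounts("x1")
	fmt.Println(loc, out, in)
}
```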

type NotifXact

type NotifXact struct {
	Xact core.Xact
	nl.Base
}

func (*NotifXact) ToNotifMsg

func (nx *NotifXact) ToNotifMsg(aborted bool) core.NotifMsg

type NotifXactListener

type NotifXactListener struct {
	nl.ListenerBase
}

func NewXactNL

func NewXactNL(uuid, kind string, smap *meta.Smap, srcs meta.NodeMap, bck ...*cmn.Bck) *NotifXactListener

func (*NotifXactListener) QueryArgs

func (nxb *NotifXactListener) QueryArgs() cmn.HreqArgs

func (*NotifXactListener) UnmarshalStats

func (*NotifXactListener) UnmarshalStats(rawMsg []byte) (stats any, finished, aborted bool, err error)

func (*NotifXactListener) WithCause added in v1.3.21

func (nxb *NotifXactListener) WithCause(cause string) *NotifXactListener

type QueryMsg

type QueryMsg struct {
	OnlyRunning *bool     `json:"show_active"`
	Bck         cmn.Bck   `json:"bck"`
	ID          string    `json:"id"`
	Kind        string    `json:"kind"`
	DaemonID    string    `json:"node,omitempty"`
	Buckets     []cmn.Bck `json:"buckets,omitempty"`
}

simplified JSON-tagged version of ArgsMsg
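The JSON tags determine what a query looks like on the wire. The sketch below re-declares the struct locally with the same tags to show the encoding; `bck` is a simplified stand-in for `cmn.Bck` whose JSON field names are assumptions, and `encodeQuery` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bck is a simplified stand-in for cmn.Bck; its JSON field names are assumed.
type bck struct {
	Name     string `json:"name"`
	Provider string `json:"provider"`
}

// queryMsg repeats the JSON tags of xact.QueryMsg shown above.
type queryMsg struct {
	OnlyRunning *bool  `json:"show_active"`
	Bck         bck    `json:"bck"`
	ID          string `json:"id"`
	Kind        string `json:"kind"`
	DaemonID    string `json:"node,omitempty"`
	Buckets     []bck  `json:"buckets,omitempty"`
}

// encodeQuery builds a query for the given kind with an explicit
// "only running" filter.
func encodeQuery(kind string, onlyRunning bool) (string, error) {
	b, err := json.Marshal(queryMsg{OnlyRunning: &onlyRunning, Kind: kind})
	return string(b), err
}

func main() {
	s, err := encodeQuery("rebalance", true)
	if err != nil {
		panic(err)
	}
	fmt.Println(s)

	// a nil OnlyRunning pointer encodes as null, so "not specified" stays
	// distinguishable from an explicit false
	b, _ := json.Marshal(queryMsg{Kind: "rebalance"})
	fmt.Println(string(b))
}
```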

func (*QueryMsg) String

func (msg *QueryMsg) String() (s string)

Directories

Path  Synopsis
xreg  Package xreg provides registry and (renew, find) functions for AIS eXtended Actions (xactions).
xs    Package xs is a collection of eXtended actions (xactions), including multi-object operations, list-objects, (cluster) rebalance and (target) resilver, ETL, and more.