node

package
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2024 License: Apache-2.0 Imports: 82 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultComputeConfig = ComputeConfigParams{
	PhysicalResourcesProvider: compute_system.NewPhysicalCapacityProvider(),
	DefaultJobResourceLimits: models.Resources{
		CPU:    0.1,
		Memory: 100 * 1024 * 1024,
	},

	JobNegotiationTimeout:      3 * time.Minute,
	MinJobExecutionTimeout:     500 * time.Millisecond,
	MaxJobExecutionTimeout:     model.NoJobTimeout,
	DefaultJobExecutionTimeout: model.NoJobTimeout,

	LogRunningExecutionsInterval: 10 * time.Second,
	JobSelectionPolicy:           NewDefaultJobSelectionPolicy(),
	LocalPublisher: types.LocalPublisherConfig{
		Directory: path.Join(config.GetStoragePath(), "bacalhau-local-publisher"),
	},
}
View Source
var DefaultNodeInfoPublishConfig = routing.NodeInfoPublisherIntervalConfig{
	Interval:             30 * time.Second,
	EagerPublishInterval: 5 * time.Second,
	EagerPublishDuration: 30 * time.Second,
}
View Source
var DefaultRequesterConfig = RequesterConfigParams{
	JobDefaults: transformer.JobDefaults{
		ExecutionTimeout: model.NoJobTimeout,
	},

	HousekeepingBackgroundTaskInterval: 30 * time.Second,
	NodeRankRandomnessRange:            5,
	OverAskForBidsFactor:               3,

	MinBacalhauVersion: models.BuildVersionInfo{
		Major: "1", Minor: "0", GitVersion: "v1.0.4",
	},

	EvalBrokerVisibilityTimeout:    60 * time.Second,
	EvalBrokerInitialRetryDelay:    1 * time.Second,
	EvalBrokerSubsequentRetryDelay: 30 * time.Second,
	EvalBrokerMaxRetryCount:        10,

	WorkerCount:                  runtime.NumCPU(),
	WorkerEvalDequeueTimeout:     5 * time.Second,
	WorkerEvalDequeueBaseBackoff: 1 * time.Second,
	WorkerEvalDequeueMaxBackoff:  30 * time.Second,

	S3PreSignedURLDisabled:   false,
	S3PreSignedURLExpiration: 30 * time.Minute,

	TranslationEnabled: false,
}
View Source
var TestNodeInfoPublishConfig = routing.NodeInfoPublisherIntervalConfig{
	Interval:             30 * time.Second,
	EagerPublishInterval: 10 * time.Millisecond,
	EagerPublishDuration: 5 * time.Second,
}

TestNodeInfoPublishConfig speeds up node announcements for tests

View Source
var TestRequesterConfig = RequesterConfigParams{
	JobDefaults: transformer.JobDefaults{
		ExecutionTimeout: 30 * time.Second,
	},
	HousekeepingBackgroundTaskInterval: 30 * time.Second,
	NodeRankRandomnessRange:            5,
	OverAskForBidsFactor:               3,

	MinBacalhauVersion: models.BuildVersionInfo{
		Major: "1", Minor: "0", GitVersion: "v1.0.4",
	},

	EvalBrokerVisibilityTimeout:    5 * time.Second,
	EvalBrokerInitialRetryDelay:    100 * time.Millisecond,
	EvalBrokerSubsequentRetryDelay: 100 * time.Millisecond,
	EvalBrokerMaxRetryCount:        3,

	WorkerCount:                  3,
	WorkerEvalDequeueTimeout:     200 * time.Millisecond,
	WorkerEvalDequeueBaseBackoff: 20 * time.Millisecond,
	WorkerEvalDequeueMaxBackoff:  200 * time.Millisecond,

	TranslationEnabled: false,

	S3PreSignedURLDisabled:   false,
	S3PreSignedURLExpiration: 30 * time.Minute,
}

Functions

func GetNodeInfoPublishConfig added in v1.0.4

func GetNodeInfoPublishConfig() routing.NodeInfoPublisherIntervalConfig

Types

type AuthenticatorsFactory added in v1.2.1

type AuthenticatorsFactory = Factory[authn.Authenticator]

func NewStandardAuthenticatorsFactory added in v1.2.1

func NewStandardAuthenticatorsFactory() AuthenticatorsFactory

type AuthenticatorsFactoryFunc added in v1.2.1

type AuthenticatorsFactoryFunc = FactoryFunc[authn.Authenticator]

type Compute

type Compute struct {
	// Visible for testing
	ID               string
	LocalEndpoint    compute.Endpoint
	Capacity         capacity.Tracker
	ExecutionStore   store.ExecutionStore
	Executors        executor.ExecutorProvider
	Storages         storage.StorageProvider
	Bidder           compute.Bidder
	ManagementClient *compute.ManagementClient
	// contains filtered or unexported fields
}

func NewComputeNode

func NewComputeNode(
	ctx context.Context,
	nodeID string,
	cleanupManager *system.CleanupManager,
	apiServer *publicapi.Server,
	config ComputeConfig,
	storagePath string,
	storages storage.StorageProvider,
	executors executor.ExecutorProvider,
	publishers publisher.PublisherProvider,
	computeCallback compute.Callback,
	managementProxy compute.ManagementEndpoint,
	configuredLabels map[string]string,
) (*Compute, error)

func (*Compute) Cleanup added in v1.2.2

func (c *Compute) Cleanup(ctx context.Context)

type ComputeConfig

type ComputeConfig struct {
	// Capacity config
	TotalResourceLimits          models.Resources
	QueueResourceLimits          models.Resources
	JobResourceLimits            models.Resources
	DefaultJobResourceLimits     models.Resources
	IgnorePhysicalResourceLimits bool

	// JobNegotiationTimeout default timeout value to hold a bid for a job
	JobNegotiationTimeout time.Duration
	// MinJobExecutionTimeout default value for the minimum execution timeout this compute node supports. Jobs with
	// lower timeout requirements will not be bid on.
	MinJobExecutionTimeout time.Duration
	// MaxJobExecutionTimeout default value for the maximum execution timeout this compute node supports. Jobs with
	// higher timeout requirements will not be bid on.
	MaxJobExecutionTimeout time.Duration
	// DefaultJobExecutionTimeout default value for the execution timeout this compute node will assign to jobs with
	// no timeout requirement defined.
	DefaultJobExecutionTimeout time.Duration

	// JobExecutionTimeoutClientIDBypassList is the list of clients that are allowed to bypass the job execution timeout
	// check.
	JobExecutionTimeoutClientIDBypassList []string

	// Bid strategies config
	JobSelectionPolicy JobSelectionPolicy

	// logging running executions
	LogRunningExecutionsInterval time.Duration

	// How many messages to buffer in the log stream channel
	LogStreamBufferSize int

	FailureInjectionConfig model.FailureInjectionComputeConfig

	BidSemanticStrategy bidstrategy.SemanticBidStrategy

	BidResourceStrategy bidstrategy.ResourceBidStrategy

	ExecutionStore store.ExecutionStore

	LocalPublisher types.LocalPublisherConfig
}

func NewComputeConfigWith

func NewComputeConfigWith(params ComputeConfigParams) (ComputeConfig, error)

func NewComputeConfigWithDefaults

func NewComputeConfigWithDefaults() (ComputeConfig, error)

type ComputeConfigParams

type ComputeConfigParams struct {
	// Capacity config
	TotalResourceLimits          models.Resources
	QueueResourceLimits          models.Resources
	JobResourceLimits            models.Resources
	DefaultJobResourceLimits     models.Resources
	PhysicalResourcesProvider    capacity.Provider
	IgnorePhysicalResourceLimits bool

	// Timeout config
	JobNegotiationTimeout      time.Duration
	MinJobExecutionTimeout     time.Duration
	MaxJobExecutionTimeout     time.Duration
	DefaultJobExecutionTimeout time.Duration

	JobExecutionTimeoutClientIDBypassList []string

	// Bid strategies config
	JobSelectionPolicy JobSelectionPolicy

	// logging running executions
	LogRunningExecutionsInterval time.Duration

	// How many messages to buffer in the log stream channel
	LogStreamBufferSize int

	FailureInjectionConfig model.FailureInjectionComputeConfig

	BidSemanticStrategy bidstrategy.SemanticBidStrategy
	BidResourceStrategy bidstrategy.ResourceBidStrategy

	ExecutionStore store.ExecutionStore

	LocalPublisher types.LocalPublisherConfig
}

type ConfigLabelsProvider added in v1.2.1

type ConfigLabelsProvider struct {
	// contains filtered or unexported fields
}

func (*ConfigLabelsProvider) GetLabels added in v1.2.1

func (p *ConfigLabelsProvider) GetLabels(context.Context) map[string]string

type ExecutorsFactory

type ExecutorsFactory = Factory[executor.Executor]

func NewPluginExecutorFactory added in v1.0.4

func NewPluginExecutorFactory() ExecutorsFactory

func NewStandardExecutorsFactory

func NewStandardExecutorsFactory() ExecutorsFactory

type ExecutorsFactoryFunc

type ExecutorsFactoryFunc = FactoryFunc[executor.Executor]

type Factory added in v1.2.1

type Factory[P provider.Providable] interface {
	Get(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error)
}

Interfaces to inject dependencies into the stack

type FactoryFunc added in v1.2.1

type FactoryFunc[P provider.Providable] func(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error)

Functions that implement the factories for easier creation of new implementations

func (FactoryFunc[P]) Get added in v1.2.1

func (f FactoryFunc[P]) Get(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error)

type FeatureConfig added in v0.3.29

type FeatureConfig struct {
	Engines    []string
	Publishers []string
	Storages   []string
}

type JobSelectionPolicy added in v1.0.4

type JobSelectionPolicy struct {
	// this describes if we should run a job based on
	// where the data is located - i.e. if the data is "local"
	// or if the data is "anywhere"
	Locality semantic.JobSelectionDataLocality `json:"locality"`
	// should we reject jobs that don't specify any data
	// the default is "accept"
	RejectStatelessJobs bool `json:"reject_stateless_jobs"`
	// should we accept jobs that specify networking
	// the default is "reject"
	AcceptNetworkedJobs bool `json:"accept_networked_jobs"`
	// external hooks that decide if we should take on the job or not
	// if either of these are given they will override the data locality settings
	ProbeHTTP string `json:"probe_http,omitempty"`
	ProbeExec string `json:"probe_exec,omitempty"`
}

JobSelectionPolicy describe the rules for how a compute node selects an incoming job

func NewDefaultJobSelectionPolicy added in v1.0.4

func NewDefaultJobSelectionPolicy() JobSelectionPolicy

type NetworkConfig added in v1.2.1

type NetworkConfig struct {
	Type           string
	Libp2pHost     host.Host // only set if using libp2p transport, nil otherwise
	ReconnectDelay time.Duration

	// NATS config for requesters to be reachable by compute nodes
	Port              int
	AdvertisedAddress string
	Orchestrators     []string

	// Storage directory for NATS features that require it
	StoreDir string

	// AuthSecret is a secret string that clients must use to connect. NATS servers
	// must supply this config, while clients can also supply it as the user part
	// of their Orchestrator URL.
	AuthSecret string

	// NATS config for requester nodes to connect with each other
	ClusterName              string
	ClusterPort              int
	ClusterAdvertisedAddress string

	// When using NATS, never set this value unless you are connecting multiple requester
	// nodes together. This should never reference this current running instance (e.g.
	// don't use localhost).
	ClusterPeers []string
}

func (*NetworkConfig) Validate added in v1.2.1

func (c *NetworkConfig) Validate() error

type Node

type Node struct {
	// Visible for testing
	ID             string
	APIServer      *publicapi.Server
	ComputeNode    *Compute
	RequesterNode  *Requester
	CleanupManager *system.CleanupManager
	IPFSClient     ipfs.Client
	Libp2pHost     host.Host // only set if using libp2p transport, nil otherwise
}

func NewNode

func NewNode(
	ctx context.Context,
	config NodeConfig) (*Node, error)

func (*Node) IsComputeNode

func (n *Node) IsComputeNode() bool

IsComputeNode returns true if the node is a compute node

func (*Node) IsRequesterNode

func (n *Node) IsRequesterNode() bool

IsRequesterNode returns true if the node is a requester node

func (*Node) Start

func (n *Node) Start(ctx context.Context) error

type NodeConfig

type NodeConfig struct {
	NodeID                      string
	IPFSClient                  ipfs.Client
	CleanupManager              *system.CleanupManager
	HostAddress                 string
	APIPort                     uint16
	RequesterAutoCert           string
	RequesterAutoCertCache      string
	RequesterTLSCertificateFile string
	RequesterTLSKeyFile         string
	DisabledFeatures            FeatureConfig
	ComputeConfig               ComputeConfig
	RequesterNodeConfig         RequesterConfig
	APIServerConfig             publicapi.Config
	AuthConfig                  types.AuthConfig
	NodeType                    models.NodeType
	IsRequesterNode             bool
	IsComputeNode               bool
	Labels                      map[string]string
	NodeInfoPublisherInterval   routing.NodeInfoPublisherIntervalConfig
	DependencyInjector          NodeDependencyInjector
	AllowListedLocalPaths       []string
	NodeInfoStoreTTL            time.Duration

	NetworkConfig NetworkConfig
}

Node configuration

func (*NodeConfig) Validate added in v1.2.1

func (c *NodeConfig) Validate() error

type NodeDependencyInjector

type NodeDependencyInjector struct {
	StorageProvidersFactory StorageProvidersFactory
	ExecutorsFactory        ExecutorsFactory
	PublishersFactory       PublishersFactory
	AuthenticatorsFactory   AuthenticatorsFactory
}

Lazy node dependency injector that generate instances of different components on demand and based on the configuration provided.

func NewExecutorPluginNodeDependencyInjector added in v1.0.4

func NewExecutorPluginNodeDependencyInjector() NodeDependencyInjector

func NewStandardNodeDependencyInjector

func NewStandardNodeDependencyInjector() NodeDependencyInjector

type PublishersFactory

type PublishersFactory = Factory[publisher.Publisher]

func NewStandardPublishersFactory

func NewStandardPublishersFactory() PublishersFactory

type PublishersFactoryFunc

type PublishersFactoryFunc = FactoryFunc[publisher.Publisher]

type Requester

type Requester struct {
	// Visible for testing
	Endpoint   requester.Endpoint
	EndpointV2 *orchestrator.BaseEndpoint
	JobStore   jobstore.Store
	// We need a reference to the node info store until libp2p is removed
	NodeInfoStore  routing.NodeInfoStore
	NodeDiscoverer orchestrator.NodeDiscoverer
	// contains filtered or unexported fields
}

func NewRequesterNode

func NewRequesterNode(
	ctx context.Context,
	nodeID string,
	apiServer *publicapi.Server,
	requesterConfig RequesterConfig,
	storageProvider storage.StorageProvider,
	authnProvider authn.Provider,
	nodeInfoStore routing.NodeInfoStore,
	computeProxy compute.Endpoint,
	nodeManager *manager.NodeManager,
) (*Requester, error)

type RequesterConfig

type RequesterConfig struct {
	RequesterConfigParams
}

func NewRequesterConfigWith

func NewRequesterConfigWith(params RequesterConfigParams) (RequesterConfig, error)

func NewRequesterConfigWithDefaults

func NewRequesterConfigWithDefaults() (RequesterConfig, error)

type RequesterConfigParams

type RequesterConfigParams struct {
	JobDefaults transformer.JobDefaults

	HousekeepingBackgroundTaskInterval time.Duration
	NodeRankRandomnessRange            int
	OverAskForBidsFactor               uint
	JobSelectionPolicy                 JobSelectionPolicy
	ExternalValidatorWebhook           *url.URL
	FailureInjectionConfig             model.FailureInjectionRequesterConfig

	// minimum version of compute nodes that the requester will accept and route jobs to
	MinBacalhauVersion models.BuildVersionInfo

	RetryStrategy orchestrator.RetryStrategy

	// evaluation broker config
	EvalBrokerVisibilityTimeout    time.Duration
	EvalBrokerInitialRetryDelay    time.Duration
	EvalBrokerSubsequentRetryDelay time.Duration
	EvalBrokerMaxRetryCount        int

	// worker config
	WorkerCount                  int
	WorkerEvalDequeueTimeout     time.Duration
	WorkerEvalDequeueBaseBackoff time.Duration
	WorkerEvalDequeueMaxBackoff  time.Duration

	// Should the orchestrator attempt to translate jobs?
	TranslationEnabled bool

	S3PreSignedURLDisabled   bool
	S3PreSignedURLExpiration time.Duration

	JobStore jobstore.Store

	DefaultPublisher string
}

type RuntimeLabelsProvider added in v1.2.1

type RuntimeLabelsProvider struct{}

func (*RuntimeLabelsProvider) GetLabels added in v1.2.1

GetLabels implements models.LabelsProvider.

type StorageProvidersFactory

type StorageProvidersFactory = Factory[storage.Storage]

func NewStandardStorageProvidersFactory

func NewStandardStorageProvidersFactory() StorageProvidersFactory

Standard implementations used in prod and when testing prod behavior

type StorageProvidersFactoryFunc

type StorageProvidersFactoryFunc = FactoryFunc[storage.Storage]

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL