job

package
v1.6.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2024 License: LGPL-3.0 Imports: 24 Imported by: 0

Documentation

Overview

Package job provides subcommands to manage the jobs of an user.

USAGE:

dps job command [command options] [arguments...]

COMMANDS:

get      Get job.
panic    Panic a job (need a METASCHEDULER role).
topup    Top up a job.
help, h  Shows a list of commands or help for one command

Index

Constants

This section is empty.

Variables

View Source
var Command = cli.Command{
	Name:  "job",
	Usage: "Manage jobs.",
	Subcommands: []*cli.Command{
		{
			Name:      "get",
			Usage:     "Get job.",
			Flags:     flags,
			ArgsUsage: "<job ID>",
			Action: func(cCtx *cli.Context) error {
				if cCtx.NArg() < 1 {
					return errors.New("missing arguments")
				}
				jobIDBig, ok := new(big.Int).SetString(cCtx.Args().First(), 10)
				if !ok {
					return errors.New("failed to parse job ID")
				}
				var jobID [32]byte
				jobIDBig.FillBytes(jobID[:])
				ctx := cCtx.Context
				rpcClient, err := rpc.DialOptions(
					ctx,
					ethEndpointRPC,
					rpc.WithHTTPClient(http.DefaultClient),
				)
				if err != nil {
					return err
				}
				defer rpcClient.Close()
				ethClientRPC := ethclient.NewClient(rpcClient)
				chainID, err := ethClientRPC.ChainID(ctx)
				if err != nil {
					return err
				}
				clientset := metascheduler.NewRPCClientSet(metascheduler.Backend{
					EthereumBackend:      ethClientRPC,
					MetaschedulerAddress: common.HexToAddress(metaschedulerSmartContract),
					ChainID:              chainID,
				})
				job, err := clientset.JobFetcher().GetJob(ctx, jobID)
				if err != nil {
					return err
				}
				jobJSON, err := json.MarshalIndent(job, "", "  ")
				if err != nil {
					return err
				}
				fmt.Println(string(jobJSON))
				return nil
			},
		},
		{
			Name:      "logs",
			Usage:     "Watch job logs.",
			Flags:     logsFlags,
			ArgsUsage: "<job ID>",
			Action: func(cCtx *cli.Context) error {
				if cCtx.NArg() < 1 {
					return errors.New("missing arguments")
				}
				pk, err := utils.GetPrivateKey(ethHexPK, ethHexPKPath)
				if err != nil {
					return err
				}
				jobIDBig, ok := new(big.Int).SetString(cCtx.Args().First(), 10)
				if !ok {
					return errors.New("failed to parse job ID")
				}
				var jobID [32]byte
				jobIDBig.FillBytes(jobID[:])
				ctx := cCtx.Context
				client, err := deepsquare.NewClient(ctx, &deepsquare.ClientConfig{
					MetaschedulerAddress: common.HexToAddress(metaschedulerSmartContract),
					RPCEndpoint:          ethEndpointRPC,
					LoggerEndpoint:       loggerEndpoint,
					UserPrivateKey:       pk,
				})
				if err != nil {
					return err
				}
				defer func() {
					if err := client.Close(); err != nil {
						internallog.I.Error("failed to close client", zap.Error(err))
					}
				}()
				watcher, err := deepsquare.NewWatcher(ctx, &deepsquare.WatcherConfig{
					MetaschedulerAddress: common.HexToAddress(metaschedulerSmartContract),
					RPCEndpoint:          ethEndpointRPC,
					WSEndpoint:           ethEndpointWS,
					UserPrivateKey:       pk,
				})
				if err != nil {
					return err
				}
				defer func() {
					if err := watcher.Close(); err != nil {
						internallog.I.Error("failed to close watcher", zap.Error(err))
					}
				}()
				transitions := make(chan types.JobTransition, 1)
				sub, err := watcher.SubscribeEvents(
					ctx,
					event.FilterJobTransition(transitions),
				)
				if err != nil {
					return err
				}
				defer sub.Unsubscribe()

				job, err := client.GetJob(ctx, jobID)
				if err != nil {
					return err
				}

				switch metascheduler.JobStatus(job.Status) {
				case metascheduler.JobStatusCancelled,
					metascheduler.JobStatusFailed,
					metascheduler.JobStatusFinished,
					metascheduler.JobStatusPanicked,
					metascheduler.JobStatusOutOfCredits,
					metascheduler.JobStatusRunning:
				default:
					_, err = waitUntilJobRunningOrFinished(sub, transitions, jobID)
					if err != nil {
						fmt.Printf("---Waiting for job running failed---\n%s\n", err)
						return err
					}
				}

				stream, err := client.WatchLogs(ctx, jobID)
				if err != nil {
					fmt.Printf("---Watching logs has unexpectedly failed---\n%s\n", err)
					return err
				}
				defer func() {
					_ = stream.CloseSend()
				}()
				for {
					req, err := stream.Recv()
					if err == io.EOF || errors.Is(err, context.Canceled) {
						fmt.Println("---Connection to logging server closed---")
						return nil
					}
					if err != nil {
						fmt.Printf(
							"---Connection to logging server closed unexpectedly---\n%s\n",
							err,
						)
						return err
					}
					if noTimestamp {
						fmt.Printf("%s\n", string(req.GetData()))
					} else {
						fmt.Printf("%s:\t%s\n", time.Unix(0, req.GetTimestamp()), forbiddenReplacer.Replace(string(req.GetData())))
					}
				}
			},
		},
		{
			Name:      "cancel",
			Usage:     "Cancel job.",
			Flags:     authFlags,
			ArgsUsage: "<job ID>",
			Action: func(cCtx *cli.Context) error {
				if cCtx.NArg() < 1 {
					return errors.New("missing arguments")
				}
				pk, err := utils.GetPrivateKey(ethHexPK, ethHexPKPath)
				if err != nil {
					return err
				}
				jobIDBig, ok := new(big.Int).SetString(cCtx.Args().First(), 10)
				if !ok {
					return errors.New("failed to parse job ID")
				}
				var jobID [32]byte
				jobIDBig.FillBytes(jobID[:])
				ctx := cCtx.Context
				rpcClient, err := rpc.DialOptions(
					ctx,
					ethEndpointRPC,
					rpc.WithHTTPClient(http.DefaultClient),
				)
				if err != nil {
					return err
				}
				defer rpcClient.Close()
				ethClientRPC := ethclient.NewClient(rpcClient)
				chainID, err := ethClientRPC.ChainID(ctx)
				if err != nil {
					return err
				}
				clientset := metascheduler.NewRPCClientSet(metascheduler.Backend{
					EthereumBackend:      ethClientRPC,
					MetaschedulerAddress: common.HexToAddress(metaschedulerSmartContract),
					ChainID:              chainID,
					UserPrivateKey:       pk,
				})
				if err := clientset.JobScheduler(nil).CancelJob(ctx, jobID); err != nil {
					return err
				}
				fmt.Println("Done.")
				return nil
			},
		},
		{
			Name:      "panic",
			Usage:     "Panic a job (need a METASCHEDULER role).",
			Flags:     panicFlags,
			ArgsUsage: "<job ID>",
			Action: func(cCtx *cli.Context) error {
				if cCtx.NArg() < 1 {
					return errors.New("missing arguments")
				}
				pk, err := utils.GetPrivateKey(ethHexPK, ethHexPKPath)
				if err != nil {
					return err
				}
				jobIDBig, ok := new(big.Int).SetString(cCtx.Args().First(), 10)
				if !ok {
					return errors.New("failed to parse job ID")
				}
				var jobID [32]byte
				jobIDBig.FillBytes(jobID[:])
				ctx := cCtx.Context
				rpcClient, err := rpc.DialOptions(
					ctx,
					ethEndpointRPC,
					rpc.WithHTTPClient(http.DefaultClient),
				)
				if err != nil {
					return err
				}
				defer rpcClient.Close()
				ethClientRPC := ethclient.NewClient(rpcClient)
				chainID, err := ethClientRPC.ChainID(ctx)
				if err != nil {
					return err
				}
				clientset := metascheduler.NewRPCClientSet(metascheduler.Backend{
					EthereumBackend:      ethClientRPC,
					MetaschedulerAddress: common.HexToAddress(metaschedulerSmartContract),
					ChainID:              chainID,
					UserPrivateKey:       pk,
				})
				if err := clientset.JobScheduler(nil).PanicJob(ctx, jobID, panicReason); err != nil {
					return err
				}
				fmt.Println("Done.")
				return nil
			},
		},
		{
			Name:      "topup",
			Usage:     "Top up a job.",
			Flags:     topupFlags,
			ArgsUsage: "<job ID> <amount (use --time to topup with a duration)>",
			Action: func(cCtx *cli.Context) error {
				if cCtx.NArg() < 2 {
					return errors.New("missing arguments")
				}
				pk, err := utils.GetPrivateKey(ethHexPK, ethHexPKPath)
				if err != nil {
					return err
				}
				jobIDBig, ok := new(big.Int).SetString(cCtx.Args().First(), 10)
				if !ok {
					return fmt.Errorf("couldn't parse job ID: %s", cCtx.Args().First())
				}
				var jobID [32]byte
				jobIDBig.FillBytes(jobID[:])

				ctx := cCtx.Context
				rpcClient, err := rpc.DialOptions(
					ctx,
					ethEndpointRPC,
					rpc.WithHTTPClient(http.DefaultClient),
				)
				if err != nil {
					return err
				}
				defer rpcClient.Close()
				ethClientRPC := ethclient.NewClient(rpcClient)
				chainID, err := ethClientRPC.ChainID(ctx)
				if err != nil {
					return err
				}
				clientset := metascheduler.NewRPCClientSet(metascheduler.Backend{
					EthereumBackend:      ethClientRPC,
					MetaschedulerAddress: common.HexToAddress(metaschedulerSmartContract),
					ChainID:              chainID,
					UserPrivateKey:       pk,
				})

				var creditsWei *big.Int
				var credits *big.Float
				if !useTime {
					if wei {
						c, ok := new(big.Int).SetString(cCtx.Args().Get(1), 10)
						if !ok {
							return fmt.Errorf("couldn't parse amount: %s", cCtx.Args().Get(1))
						}
						creditsWei = c
						credits = ether.FromWei(creditsWei)
					} else {
						c, ok := new(big.Float).SetString(cCtx.Args().Get(1))
						if !ok {
							return fmt.Errorf("couldn't parse amount: %s", cCtx.Args().Get(1))
						}
						credits = c
						creditsWei = ether.ToWei(credits)
					}
				} else {
					c, ok := new(big.Int).SetString(cCtx.Args().Get(1), 10)
					if !ok {
						return errors.New("couldn't parse duration")
					}
					job, err := clientset.JobFetcher().GetJob(ctx, jobID)
					if err != nil {
						return err
					}
					p, err := clientset.ProviderManager().GetProvider(ctx, job.ProviderAddr)
					if err != nil {
						return err
					}
					creditsWei = metascheduler.DurationToCredit(p.ProviderPrices, job.Definition, c)
					credits = ether.FromWei(creditsWei)
				}

				if !force {
					jobIDBig := new(big.Int).SetBytes(jobID[:])
					msg := fmt.Sprintf(
						"Confirm topup of %s credits (%s wei) to job %s?",
						credits.String(),
						creditsWei.String(),
						jobIDBig.String(),
					)
					input := confirmation.New(msg, confirmation.No)
					ok, err := input.RunPrompt()
					if err != nil {
						return err
					}
					if !ok {
						fmt.Println("Cancelled.")
						return nil
					}
				}

				curr, err := clientset.AllowanceManager().GetAllowance(ctx)
				if err != nil {
					return err
				}
				if err = clientset.AllowanceManager().SetAllowance(ctx, curr.Add(curr, creditsWei)); err != nil {
					return err
				}

				if err := clientset.JobScheduler(nil).TopUpJob(ctx, jobID, creditsWei); err != nil {
					return err
				}
				fmt.Println("done")
				return nil
			},
		},
	},
}

Command is the job subcommand used to manage jobs.

Functions

This section is empty.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL