job

package
v0.18.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2024 License: Apache-2.0, MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var GapFillCmd = &cli.Command{
	Name:  "fill",
	Usage: "fill gaps in the database for a given range and set of tasks.",
	Description: `
The fill job queries the visor_gap_reports table for gaps to fill and indexes the data reported to have gaps.
A gap in the visor_gap_reports table is any row with status 'GAP'.
fill will index gaps based on the list of tasks (--tasks) provided over the specified range (--from --to).
Each epoch and its corresponding list of tasks found in the visor_gap_reports table will be indexed independently.
When the gap is successfully filled its corresponding entry in the visor_gap_reports table will be updated with status 'FILLED'.

As an example, the below command:
  $ lily job run --tasks=block_header,message fill --from=10 --to=20
fills gaps for block_header and messages tasks from epoch 10 to 20 (inclusive)

Constraints:
- the fill job must be executed AFTER a find job. These jobs must NOT be executed simultaneously.
`,
	Flags: []cli.Flag{
		RangeFromFlag,
		RangeToFlag,
	},
	Subcommands: []*cli.Command{
		GapFillNotifyCmd,
	},
	Before: func(_ *cli.Context) error {
		tasks := RunFlags.Tasks.Value()
		for _, taskName := range tasks {
			if _, found := tasktype.TaskLookup[taskName]; found {
				continue
			} else if _, found := tasktype.TableLookup[taskName]; found {
				continue
			} else {
				return fmt.Errorf("unknown task: %s", taskName)
			}
		}
		return rangeFlags.validate()
	},
	Action: func(cctx *cli.Context) error {
		ctx := lotuscli.ReqContext(cctx)

		api, closer, err := commands.GetAPI(ctx)
		if err != nil {
			return err
		}
		defer closer()

		res, err := api.LilyGapFill(ctx, &lily.LilyGapFillConfig{
			JobConfig: RunFlags.ParseJobConfig("fill"),
			To:        rangeFlags.to,
			From:      rangeFlags.from,
		})
		if err != nil {
			return err
		}

		return commands.PrintNewJob(os.Stdout, res)
	},
}
View Source
var GapFillNotifyCmd = &cli.Command{
	Name:  "notify",
	Usage: "notify the provided queueing system of gaps to index allowing tipset-workers to perform the indexing.",
	Description: `
The notify command will insert tasks into the provided queueing system for consumption by tipset-workers.
This command should be used when lily is configured to perform distributed indexing.
`,
	Flags: []cli.Flag{
		NotifyQueueFlag,
	},
	Action: func(cctx *cli.Context) error {
		ctx := lotuscli.ReqContext(cctx)

		api, closer, err := commands.GetAPI(ctx)
		if err != nil {
			return err
		}
		defer closer()

		cfg := &lily.LilyGapFillNotifyConfig{
			GapFillConfig: lily.LilyGapFillConfig{
				JobConfig: RunFlags.ParseJobConfig("fill-notify"),
				From:      rangeFlags.from,
				To:        rangeFlags.to,
			},
			Queue: notifyFlags.queue,
		}

		res, err := api.LilyGapFillNotify(ctx, cfg)
		if err != nil {
			return err
		}

		return commands.PrintNewJob(os.Stdout, res)

	},
}
View Source
var GapFindCmd = &cli.Command{
	Name:  "find",
	Usage: "find gaps in the database for a given range and a set of tasks.",
	Description: `
The find job searches for gaps in a database storage system by executing the SQL gap_find() function over the visor_processing_reports table.
find will query the database for gaps based on the list of tasks (--tasks) provided over the specified range (--from --to).
An epoch is considered to have gaps iff:
- a task specified by the --task flag is not present at each epoch within the specified range.
- a task specified by the --task flag does not have status 'OK' at each epoch within the specified range.
The results of the find job are written to the visor_gap_reports table with status 'GAP'.

As an example, the below command:
 $ lily job run --tasks=block_header,messages find --from=10 --to=20
searches for gaps in block_header and messages tasks from epoch 10 to 20 (inclusive). 

Constraints:
- the find job must NOT be executed against heights that were imported from historical data dumps: https://lilium.sh/data/dumps/ 
since visor_processing_report entries will not be present for imported data (meaning the entire range will be considered to have gaps).
- the find job must be executed BEFORE a fill job. These jobs must NOT be executed simultaneously.
`,
	Flags: []cli.Flag{
		RangeFromFlag,
		RangeToFlag,
	},
	Before: func(_ *cli.Context) error {
		tasks := RunFlags.Tasks.Value()
		for _, taskName := range tasks {
			if _, found := tasktype.TaskLookup[taskName]; found {
				continue
			} else if _, found := tasktype.TableLookup[taskName]; found {
				continue
			} else {
				return fmt.Errorf("unknown task: %s", taskName)
			}
		}
		return rangeFlags.validate()
	},
	Action: func(cctx *cli.Context) error {
		ctx := lotuscli.ReqContext(cctx)

		api, closer, err := commands.GetAPI(ctx)
		if err != nil {
			return err
		}
		defer closer()

		res, err := api.LilyGapFind(ctx, &lily.LilyGapFindConfig{
			JobConfig: RunFlags.ParseJobConfig("find"),
			To:        rangeFlags.to,
			From:      rangeFlags.from,
		})
		if err != nil {
			return err
		}
		return commands.PrintNewJob(os.Stdout, res)
	},
}
View Source
var IndexCmd = &cli.Command{
	Name:  "index",
	Usage: "Index the state of a tipset from the filecoin blockchain.",
	Description: `
The index command may be used to index a single tipset from the filecoin blockchain specified either by height or by tipset key.
`,
	Subcommands: []*cli.Command{
		IndexTipSetCmd,
		IndexHeightCmd,
	},
	Before: func(_ *cli.Context) error {
		tasks := RunFlags.Tasks.Value()
		for _, taskName := range tasks {
			if _, found := tasktype.TaskLookup[taskName]; found {
				continue
			} else if _, found := tasktype.TableLookup[taskName]; found {
				continue
			} else {
				return fmt.Errorf("unknown task: %s", taskName)
			}
		}
		return nil
	},
}
View Source
var IndexHeightCmd = &cli.Command{
	Name:  "height",
	Usage: "Index the state of a tipset from the filecoin blockchain by height.",
	Description: `
	Index the state of a tipset from the filecoin blockchain by height. If the provided height is a null-round an error will be returned.
`,
	Flags: []cli.Flag{
		&cli.Int64Flag{
			Name:        "height",
			Usage:       "Height to index",
			Destination: &indexFlags.height,
			Required:    true,
		},
	},
	Subcommands: []*cli.Command{
		IndexNotifyCmd,
	},
	Before: func(cctx *cli.Context) error {
		ctx := lotuscli.ReqContext(cctx)

		api, closer, err := commands.GetAPI(ctx)
		if err != nil {
			return err
		}
		defer closer()

		ts, err := api.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(indexFlags.height), types.EmptyTSK)
		if err != nil {
			return err
		}

		if indexFlags.height != int64(ts.Height()) {
			return fmt.Errorf("height (%d) is null round, next non-null round height: %d", indexFlags.height, ts.Height())
		}
		indexFlags.tipsetKey = ts.Key()

		return nil
	},
	Action: func(cctx *cli.Context) error {
		ctx := lotuscli.ReqContext(cctx)

		api, closer, err := commands.GetAPI(ctx)
		if err != nil {
			return err
		}
		defer closer()

		_, err = api.LilyIndex(ctx, &lily.LilyIndexConfig{
			JobConfig: RunFlags.ParseJobConfig("index-height"),
			TipSet:    indexFlags.tipsetKey,
		})
		if err != nil {
			return err
		}

		return nil
	},
}
View Source
var IndexNotifyCmd = &cli.Command{
	Name:  "notify",
	Usage: "notify the provided queueing system of the tipset to index allowing tipset-workers to perform the indexing.",
	Description: `
The notify command will insert tasks into the provided queueing system for consumption by tipset-workers.
This command should be used when lily is configured to perform distributed indexing.
`,
	Flags: []cli.Flag{
		NotifyQueueFlag,
	},
	Action: func(cctx *cli.Context) error {
		ctx := lotuscli.ReqContext(cctx)

		api, closer, err := commands.GetAPI(ctx)
		if err != nil {
			return err
		}
		defer closer()

		cfg := &lily.LilyIndexNotifyConfig{
			IndexConfig: lily.LilyIndexConfig{
				JobConfig: RunFlags.ParseJobConfig("index-notify"),
				TipSet:    indexFlags.tipsetKey,
			},
			Queue: notifyFlags.queue,
		}

		_, err = api.LilyIndexNotify(ctx, cfg)
		if err != nil {
			return err
		}

		return nil
	},
}
View Source
var IndexTipSetCmd = &cli.Command{
	Name:  "tipset",
	Usage: "Index the state of a tipset from the filecoin blockchain by tipset key.",
	Subcommands: []*cli.Command{
		IndexNotifyCmd,
	},
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:        "tipset",
			Usage:       "TipSetKey to index",
			Destination: &indexFlags.tsKey,
			Required:    true,
		},
	},
	Before: func(_ *cli.Context) error {
		tsk, err := parseTipSetKey(indexFlags.tsKey)
		if err != nil {
			return fmt.Errorf("failed to parse tipset key: %w", err)
		}
		indexFlags.tipsetKey = tsk

		return nil
	},
	Action: func(cctx *cli.Context) error {
		ctx := lotuscli.ReqContext(cctx)

		api, closer, err := commands.GetAPI(ctx)
		if err != nil {
			return err
		}
		defer closer()

		_, err = api.LilyIndex(ctx, &lily.LilyIndexConfig{
			JobConfig: RunFlags.ParseJobConfig("index-tipset"),
			TipSet:    indexFlags.tipsetKey,
		})
		if err != nil {
			return err
		}

		return nil
	},
}
View Source
var IntervalFlag = &cli.IntFlag{
	Name:        "interval",
	Usage:       "The interval for specific task",
	Value:       120,
	Destination: &watchFlags.interval,
}
View Source
var JobCmd = &cli.Command{
	Name:  "job",
	Usage: "Manage jobs being run by the daemon.",
	Subcommands: []*cli.Command{
		JobRunCmd,
		JobStartCmd,
		JobStopCmd,
		JobWaitCmd,
		JobListCmd,
	},
}
View Source
var JobListCmd = &cli.Command{
	Name:  "list",
	Usage: "list all jobs and their status",
	Action: func(cctx *cli.Context) error {
		ctx := lotuscli.ReqContext(cctx)
		api, closer, err := commands.GetAPI(ctx)
		if err != nil {
			return err
		}
		defer closer()

		jobs, err := api.LilyJobList(ctx)
		if err != nil {
			return err
		}
		prettyJobs, err := json.MarshalIndent(jobs, "", "\t")
		if err != nil {
			return err
		}
		if _, err := fmt.Fprintf(os.Stdout, "%s\n", prettyJobs); err != nil {
			return err
		}
		return nil
	},
}
View Source
var JobRunCmd = &cli.Command{
	Name:  "run",
	Usage: "run a job",
	Flags: []cli.Flag{
		RunWindowFlag,
		RunTaskFlag,
		RunStorageFlag,
		RunNameFlag,
		RunRestartDelayFlag,
		RunRestartFailure,
		RunRestartCompletion,
		StopOnError,
	},
	Subcommands: []*cli.Command{
		WalkCmd,
		WatchCmd,
		IndexCmd,
		SurveyCmd,
		GapFillCmd,
		GapFindCmd,
		TipSetWorkerCmd,
	},
}
View Source
var JobStartCmd = &cli.Command{
	Name:  "start",
	Usage: "start a job.",
	Flags: []cli.Flag{
		&cli.IntFlag{
			Name:        "id",
			Usage:       "Identifier of job to start",
			Required:    true,
			Destination: &jobControlFlags.ID,
		},
	},
	Action: func(cctx *cli.Context) error {
		ctx := lotuscli.ReqContext(cctx)
		api, closer, err := commands.GetAPI(ctx)
		if err != nil {
			return err
		}
		defer closer()

		return api.LilyJobStart(ctx, schedule.JobID(jobControlFlags.ID))
	},
}
View Source
var JobStopCmd = &cli.Command{
	Name:  "stop",
	Usage: "stop a job.",
	Flags: []cli.Flag{
		&cli.IntFlag{
			Name:        "id",
			Usage:       "Identifier of job to stop",
			Required:    true,
			Destination: &jobControlFlags.ID,
		},
	},
	Action: func(cctx *cli.Context) error {
		ctx := lotuscli.ReqContext(cctx)
		api, closer, err := commands.GetAPI(ctx)
		if err != nil {
			return err
		}
		defer closer()

		return api.LilyJobStop(ctx, schedule.JobID(jobControlFlags.ID))
	},
}
View Source
var JobWaitCmd = &cli.Command{
	Name:  "wait",
	Usage: "wait on a job to complete.",
	Flags: []cli.Flag{
		&cli.IntFlag{
			Name:        "id",
			Usage:       "Identifier of job to wait on",
			Required:    true,
			Destination: &jobControlFlags.ID,
		},
	},
	Action: func(cctx *cli.Context) error {
		ctx := lotuscli.ReqContext(cctx)
		api, closer, err := commands.GetAPI(ctx)
		if err != nil {
			return err
		}
		defer closer()

		res, err := api.LilyJobWait(ctx, schedule.JobID(jobControlFlags.ID))
		if err != nil {
			return err
		}
		prettyJob, err := json.MarshalIndent(res, "", "\t")
		if err != nil {
			return err
		}
		if _, err := fmt.Fprintf(os.Stdout, "%s\n", prettyJob); err != nil {
			return err
		}
		return nil
	},
}
View Source
var NotifyQueueFlag = &cli.StringFlag{
	Name:        "queue",
	Usage:       "Name of queue system that job will notify.",
	EnvVars:     []string{"LILY_JOB_QUEUE"},
	Value:       "",
	Destination: &notifyFlags.queue,
}
View Source
var RangeFromFlag = &cli.Int64Flag{
	Name:        "from",
	Usage:       "Limit actor and message processing to tipsets at or above `HEIGHT`",
	EnvVars:     []string{"LILY_FROM"},
	Destination: &rangeFlags.from,
	Required:    true,
}
View Source
var RangeToFlag = &cli.Int64Flag{
	Name:        "to",
	Usage:       "Limit actor and message processing to tipsets at or below `HEIGHT`",
	EnvVars:     []string{"LILY_TO"},
	Destination: &rangeFlags.to,
	Required:    true,
}
View Source
var RunFlags runOpts
View Source
var RunNameFlag = &cli.StringFlag{
	Name:        "name",
	Usage:       "Name of job for easy identification later. Will appear as 'reporter' in the visor_processing_reports table.",
	EnvVars:     []string{"LILY_JOB_NAME"},
	Value:       "",
	Destination: &RunFlags.Name,
}
View Source
var RunRestartCompletion = &cli.BoolFlag{
	Name:        "restart-on-completion",
	Usage:       "Restart the job after it completes.",
	EnvVars:     []string{"LILY_JOB_RESTART_COMPLETION"},
	Value:       false,
	Destination: &RunFlags.RestartCompletion,
}
View Source
var RunRestartDelayFlag = &cli.DurationFlag{
	Name:        "restart-delay",
	Usage:       "Duration to wait before restarting job after it ends execution",
	EnvVars:     []string{"LILY_JOB_RESTART_DELAY"},
	Value:       0,
	Destination: &RunFlags.RestartDelay,
}
View Source
var RunRestartFailure = &cli.BoolFlag{
	Name:        "restart-on-failure",
	Usage:       "Restart the job if it fails.",
	EnvVars:     []string{"LILY_JOB_RESTART_FAILURE"},
	Value:       false,
	Destination: &RunFlags.RestartFailure,
}
View Source
var RunStorageFlag = &cli.StringFlag{
	Name:        "storage",
	Usage:       "Name of storage backend the job will write result to.",
	EnvVars:     []string{"LILY_JOB_STORAGE"},
	Value:       "",
	Destination: &RunFlags.Storage,
}
View Source
var RunTaskFlag = &cli.StringSliceFlag{
	Name:        "tasks",
	Usage:       "Comma separated list of tasks to run in job. Each task is reported separately in the storage backend.",
	EnvVars:     []string{"LILY_JOB_TASKS"},
	Value:       cli.NewStringSlice(tasktype.AllTableTasks...),
	Destination: &RunFlags.Tasks,
}
View Source
var RunWindowFlag = &cli.DurationFlag{
	Name:        "window",
	Usage:       "Duaration after which job execution will be canceled",
	EnvVars:     []string{"LILY_JOB_WINDOW"},
	Value:       0,
	Destination: &RunFlags.Window,
}
View Source
var StopOnError = &cli.BoolFlag{
	Name:        "stop-on-error",
	Usage:       "Stop the job if it encounters an error.",
	EnvVars:     []string{"LILY_JOB_STOP_ON_ERROR"},
	Value:       false,
	Destination: &RunFlags.StopOnError,
}
View Source
var SurveyCmd = &cli.Command{
	Name:  "survey",
	Usage: "Start a daemon job to survey the node and its environment.",
	Flags: []cli.Flag{
		&cli.DurationFlag{
			Name:        "interval",
			Usage:       "Interval to wait between each survey",
			Value:       10 * time.Minute,
			Destination: &surveyFlags.interval,
		},
	},
	Action: func(cctx *cli.Context) error {
		ctx := lotuscli.ReqContext(cctx)

		api, closer, err := commands.GetAPI(ctx)
		if err != nil {
			return err
		}
		defer closer()

		res, err := api.LilySurvey(ctx, &lily.LilySurveyConfig{
			JobConfig: RunFlags.ParseJobConfig("survey"),
			Interval:  surveyFlags.interval,
		})
		if err != nil {
			return err
		}

		return commands.PrintNewJob(os.Stdout, res)
	},
}
View Source
var TipSetWorkerCmd = &cli.Command{
	Name:  "tipset-worker",
	Usage: "start a tipset-worker that consumes tasks from the provided queuing system and performs indexing",
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:        "queue",
			Usage:       "Name of queue system worker will consume work from.",
			EnvVars:     []string{"LILY_TSWORKER_QUEUE"},
			Value:       "",
			Destination: &tipsetWorkerFlags.queue,
		},
	},
	Action: func(cctx *cli.Context) error {
		ctx := lotuscli.ReqContext(cctx)

		api, closer, err := commands.GetAPI(ctx)
		if err != nil {
			return err
		}
		defer closer()

		res, err := api.StartTipSetWorker(ctx, &lily.LilyTipSetWorkerConfig{
			JobConfig: RunFlags.ParseJobConfig("tipset-worker"),
			Queue:     tipsetWorkerFlags.queue,
		})
		if err != nil {
			return err
		}

		return commands.PrintNewJob(os.Stdout, res)
	},
}
View Source
var WalkCmd = &cli.Command{
	Name:  "walk",
	Usage: "walk and index a range of the filecoin blockchain.",
	Description: `
The walk command will index state based on the list of tasks (--tasks) provided over the specified range (--from --to).
Each epoch will be indexed serially, starting from the heaviest tipset at the upper height (--to) to the lower height (--from).

As and example, the below command:
  $ lily job run --tasks=block_header,messages walk --from=10 --to=20
walks epochs 20 through 10 (inclusive) executing the block_header and messages task for each epoch.
The status of each epoch and its set of tasks can be observed in the visor_processing_reports table.
`,
	Flags: []cli.Flag{
		RangeFromFlag,
		RangeToFlag,
		WalkIntervalFlag,
	},
	Subcommands: []*cli.Command{
		WalkNotifyCmd,
	},
	Before: func(_ *cli.Context) error {
		tasks := RunFlags.Tasks.Value()
		for _, taskName := range tasks {
			if _, found := tasktype.TaskLookup[taskName]; found {
				continue
			} else if _, found := tasktype.TableLookup[taskName]; found {
				continue
			} else {
				return fmt.Errorf("unknown task: %s", taskName)
			}
		}
		return rangeFlags.validate()
	},
	Action: func(cctx *cli.Context) error {
		ctx := lotuscli.ReqContext(cctx)

		api, closer, err := commands.GetAPI(ctx)
		if err != nil {
			return err
		}
		defer closer()

		cfg := &lily.LilyWalkConfig{
			JobConfig: RunFlags.ParseJobConfig("walk"),
			From:      rangeFlags.from,
			To:        rangeFlags.to,
			Interval:  walkFlags.interval,
		}

		res, err := api.LilyWalk(ctx, cfg)
		if err != nil {
			return err
		}

		err = commands.PrintNewJob(os.Stdout, res)

		return err
	},
}
View Source
var WalkIntervalFlag = &cli.IntFlag{
	Name:        "interval",
	Usage:       "The interval for specific task",
	Value:       120,
	Destination: &walkFlags.interval,
}
View Source
var WalkNotifyCmd = &cli.Command{
	Name:  "notify",
	Usage: "notify the provided queueing system of epochs to index allowing tipset-workers to perform the indexing.",
	Description: `
The notify command will insert tasks into the provided queueing system for consumption by tipset-workers.
This command should be used when lily is configured to perform distributed indexing.
`,
	Flags: []cli.Flag{
		NotifyQueueFlag,
	},
	Action: func(cctx *cli.Context) error {
		ctx := lotuscli.ReqContext(cctx)

		api, closer, err := commands.GetAPI(ctx)
		if err != nil {
			return err
		}
		defer closer()

		cfg := &lily.LilyWalkNotifyConfig{
			WalkConfig: lily.LilyWalkConfig{
				JobConfig: RunFlags.ParseJobConfig("walk-notify"),
				From:      rangeFlags.from,
				To:        rangeFlags.to,
			},
			Queue: notifyFlags.queue,
		}

		res, err := api.LilyWalkNotify(ctx, cfg)
		if err != nil {
			return err
		}

		return commands.PrintNewJob(os.Stdout, res)

	},
}
View Source
var WatchBufferSizeFlag = &cli.IntFlag{
	Name:        "buffer-size",
	Usage:       "Set the number of tipsets the watcher will buffer while waiting for a worker to accept the work.",
	EnvVars:     []string{"LILY_WATCH_BUFFER"},
	Value:       5,
	Destination: &watchFlags.bufferSize,
}
View Source
var WatchCmd = &cli.Command{
	Name:  "watch",
	Usage: "watch the head of the filecoin blockchain and index each new head as it becomes available",
	Description: `
The watch command subscribes to incoming tipsets from the filecoin blockchain and indexes them as the arrive.

Since it may be the case that tipsets arrive at a rate greater than their rate of indexing the watch job maintains a
queue of tipsets to index. Consumption of this queue can be configured via the --workers flag. Increasing the value provided
to the --workers flag will allow the watch job to index tipsets simultaneously (Note: this will use a significant amount of system resources).

Since it may be the case that lily experiences a reorg while the watch job is observing the head of the chain
the --confidence flag may be used to buffer the amount of tipsets observed before it begins indexing - illustrated by the below diagram:

             *unshift*        *unshift*      *unshift*       *unshift*
                │  │            │  │            │  │            │  │
             ┌──▼──▼──┐      ┌──▼──▼──┐      ┌──▼──▼──┐      ┌──▼──▼──┐
             │        │      │  ts10  │      │  ts11  │      │  ts12  │
   ...  ---> ├────────┤ ---> ├────────┤ ---> ├────────┤ ---> ├────────┤ --->  ...
             │  ts09  │      │  ts09  │      │  ts10  │      │  ts11  │
             ├────────┤      ├────────┤      ├────────┤      ├────────┤
             │  ts08  │      │  ts08  │      │  ts09  │      │  ts10  │
             ├────────┤      ├────────┤      ├────────┤      ├────────┤
             │  ...   │      │  ...   │      │  ...   │      │  ...   │
             ├────────┤      ├────────┤      ├────────┤      ├────────┤
             │  ts02  │      │  ts02  │      │  ts03  │      │  ts04  │
             ├────────┤      ├────────┤      ├────────┤      ├────────┤
             │  ts01  │      │  ts01  │      │  ts02  │      │  ts03  │
             ├────────┤      ├────────┤      ├────────┤      ├────────┤
             │  ts00  │      │  ts00  │      │  ts01  │      │  ts02  │
             └────────┘      └────────┘      └──│──│──┘      └──│──│──┘
                                                ▼  ▼  *pop*     ▼  ▼  *pop*
                                             ┌────────┐      ┌────────┐
              (confidence=10 :: length=10)   │  ts00  │      │  ts01  │
                                             └────────┘      └────────┘
                                              (process)       (process)

As and example, the below command:
  $ lily job run --tasks-block_header,messages watch --confidence=10 --workers=2
watches the chain head and only indexes a tipset after observing 10 subsequent tipsets indexing at most two tipset simultaneously.
`,
	Flags: []cli.Flag{
		WatchConfidenceFlag,
		WatchWorkersFlag,
		WatchBufferSizeFlag,
		WatchIntervalFlag,
	},
	Before: func(cctx *cli.Context) error {
		tasks := RunFlags.Tasks.Value()
		for _, taskName := range tasks {
			if _, found := tasktype.TaskLookup[taskName]; found {
				continue
			} else if _, found := tasktype.TableLookup[taskName]; found {
				continue
			} else {
				return fmt.Errorf("unknown task: %s", taskName)
			}
		}
		return nil
	},
	Subcommands: []*cli.Command{
		WatchNotifyCmd,
	},
	Action: func(cctx *cli.Context) error {
		ctx := lotuscli.ReqContext(cctx)

		api, closer, err := commands.GetAPI(ctx)
		if err != nil {
			return err
		}
		defer closer()

		var res *schedule.JobSubmitResult
		cfg := &lily.LilyWatchConfig{
			JobConfig:  RunFlags.ParseJobConfig("watch"),
			BufferSize: watchFlags.bufferSize,
			Confidence: watchFlags.confidence,
			Workers:    watchFlags.workers,
			Interval:   watchFlags.interval,
		}

		res, err = api.LilyWatch(ctx, cfg)
		if err != nil {
			return err
		}

		if err := commands.PrintNewJob(os.Stdout, res); err != nil {
			return err
		}

		return nil
	},
}
View Source
var WatchConfidenceFlag = &cli.IntFlag{
	Name:        "confidence",
	Usage:       "Sets the size of the cache used to hold tipsets for possible reversion before being committed to the database.",
	EnvVars:     []string{"LILY_CONFIDENCE"},
	Value:       2,
	Destination: &watchFlags.confidence,
}
View Source
var WatchIntervalFlag = &cli.IntFlag{
	Name:        "interval",
	Usage:       "The interval for specific task",
	Value:       120,
	Destination: &watchFlags.interval,
}
View Source
var WatchNotifyCmd = &cli.Command{
	Name:  "notify",
	Usage: "notify the provided queueing system of epochs to index allowing tipset-workers to perform the indexing.",
	Description: `
The notify command will insert tasks into the provided queueing system for consumption by tipset-workers.
This command should be used when lily is configured to perform distributed indexing.
`,
	Flags: []cli.Flag{
		NotifyQueueFlag,
	},
	Action: func(cctx *cli.Context) error {
		ctx := lotuscli.ReqContext(cctx)

		api, closer, err := commands.GetAPI(ctx)
		if err != nil {
			return err
		}
		defer closer()

		cfg := &lily.LilyWatchNotifyConfig{
			JobConfig: RunFlags.ParseJobConfig("watch-notify"),

			Confidence: watchFlags.confidence,
			BufferSize: watchFlags.bufferSize,

			Queue: notifyFlags.queue,
		}

		res, err := api.LilyWatchNotify(ctx, cfg)
		if err != nil {
			return err
		}

		return commands.PrintNewJob(os.Stdout, res)

	},
}
View Source
var WatchWorkersFlag = &cli.IntFlag{
	Name:        "workers",
	Usage:       "Sets the number of tipsets that may be simultaneous indexed while watching.",
	EnvVars:     []string{"LILY_WATCH_WORKERS"},
	Value:       2,
	Destination: &watchFlags.workers,
}

Functions

This section is empty.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL