pachyderm: Index | Files

package ppsutil

import ""

Package ppsutil contains utilities for various PPS-related tasks, which are shared by both the PPS API and the worker binary. These utilities include: - Getting the RC name and querying k8s reguarding pipelines - Reading and writing pipeline resource requests and limits - Reading and writing EtcdPipelineInfos and PipelineInfos[1]

[1] Note that PipelineInfo in particular is complicated because it contains fields that are not always set or are stored in multiple places ('job_state', for example, is not stored in PFS along with the rest of each PipelineInfo, because this field is volatile and we cannot commit to PFS every time it changes. 'job_counts' is the same, and 'reason' is in etcd because it is only updated alongside 'job_state'). As of 12/7/2017, these are the only fields not stored in PFS.


Package Files

decoder.go util.go

func ContainsS3Inputs Uses

func ContainsS3Inputs(in *pps.Input) bool

ContainsS3Inputs returns 'true' if 'in' is or contains any PFS inputs with 'S3' set to true. Any pipelines with s3 inputs lj

func CrashingPipeline Uses

func CrashingPipeline(ctx context.Context, etcdClient *etcd.Client, pipelinesCollection col.Collection, pipelineName string, reason string) error

CrashingPipeline updates the pipeline's state to crashing and sets the reason

func ErrorState Uses

func ErrorState(s pps.PipelineState) bool

ErrorState returns true if s is an error state for a pipeline, that is, a state that users should be aware of and one which will have a "Reason" set for why it's in this state.

func FailPipeline Uses

func FailPipeline(ctx context.Context, etcdClient *etcd.Client, pipelinesCollection col.Collection, pipelineName string, reason string) error

FailPipeline updates the pipeline's state to failed and sets the failure reason

func GetExpectedNumHashtrees Uses

func GetExpectedNumHashtrees(spec *pps.HashtreeSpec) (int64, error)

GetExpectedNumHashtrees computes the expected number of hashtrees that Pachyderm will create given the HashtreeSpec 'spec'.

func GetLimitsResourceList Uses

func GetLimitsResourceList(limits *pps.ResourceSpec) (*v1.ResourceList, error)

GetLimitsResourceList returns a list of resources from a pipeline ResourceSpec that it is maximally limited to.

func GetPipelineInfo Uses

func GetPipelineInfo(pachClient *client.APIClient, name string, ptr *pps.EtcdPipelineInfo) (*pps.PipelineInfo, error)

GetPipelineInfo retrieves and returns a valid PipelineInfo from PFS. It does the PFS read/unmarshalling of bytes as well as filling in missing fields

func GetRequestsResourceListFromPipeline Uses

func GetRequestsResourceListFromPipeline(pipelineInfo *pps.PipelineInfo) (*v1.ResourceList, error)

GetRequestsResourceListFromPipeline returns a list of resources that the pipeline, minimally requires.

func IsTerminal Uses

func IsTerminal(state pps.JobState) bool

IsTerminal returns 'true' if 'state' indicates that the job is done (i.e. the state will not change later: SUCCESS, FAILURE, KILLED) and 'false' otherwise.

func JobInput Uses

func JobInput(pipelineInfo *pps.PipelineInfo, outputCommitInfo *pfs.CommitInfo) *pps.Input

JobInput fills in the commits for a JobInfo

func PipelineRcName Uses

func PipelineRcName(name string, version uint64) string

PipelineRcName generates the name of the k8s replication controller that manages a pipeline's workers

func PipelineRepo Uses

func PipelineRepo(pipeline *pps.Pipeline) *pfs.Repo

PipelineRepo creates a pfs repo for a given pipeline.

func PipelineReqFromInfo Uses

func PipelineReqFromInfo(pipelineInfo *pps.PipelineInfo) *pps.CreatePipelineRequest

PipelineReqFromInfo converts a PipelineInfo into a CreatePipelineRequest.

func SetPipelineState Uses

func SetPipelineState(ctx context.Context, etcdClient *etcd.Client, pipelinesCollection col.Collection, pipeline string, from *pps.PipelineState, to pps.PipelineState, reason string) (retErr error)

SetPipelineState is a helper that moves the state of 'pipeline' from 'from' (if not nil) to 'to'. It will annotate any trace in 'ctx' with information about 'pipeline' that it reads.

func SidecarS3GatewayService Uses

func SidecarS3GatewayService(jobID string) string

SidecarS3GatewayService returns the name of the kubernetes service created for the job 'jobID' to hand sidecar s3 gateway requests. This helper is in ppsutil because both PPS (which creates the service, in the s3 gateway sidecar server) and the worker (which passes the endpoint to the user code) need to know it.

func UpdateJobState Uses

func UpdateJobState(pipelines col.ReadWriteCollection, jobs col.ReadWriteCollection, jobPtr *pps.EtcdJobInfo, state pps.JobState, reason string) error

UpdateJobState performs the operations involved with a job state transition.

type PipelineManifestReader Uses

type PipelineManifestReader struct {
    // contains filtered or unexported fields

PipelineManifestReader helps with unmarshalling pipeline configs from JSON. It's used by 'create pipeline' and 'update pipeline'

func NewPipelineManifestReader Uses

func NewPipelineManifestReader(path string) (result *PipelineManifestReader, retErr error)

NewPipelineManifestReader creates a new manifest reader from a path.

func (*PipelineManifestReader) NextCreatePipelineRequest Uses

func (r *PipelineManifestReader) NextCreatePipelineRequest() (*ppsclient.CreatePipelineRequest, error)

NextCreatePipelineRequest gets the next request from the manifest reader.

type PipelineTransitionError Uses

type PipelineTransitionError struct {
    Pipeline                  string
    Expected, Target, Current pps.PipelineState

PipelineTransitionError represents an error transitioning a pipeline from one state to another.

func (PipelineTransitionError) Error Uses

func (p PipelineTransitionError) Error() string

Package ppsutil imports 26 packages (graph) and is imported by 12 packages. Updated 2020-07-23. Refresh now. Tools for package owners.