actions

package
v1.21.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 6, 2024 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
const (
	LogOutputExtension = ".log"
	LogOutputLocation  = "_lakefs/actions/log"
)
View Source
const (
	HeadersPropertyKey = "headers"
)
View Source
const (
	PartitionKey = "actions"
)

Variables

View Source
var (
	ErrInvalidAction         = errors.New("invalid action")
	ErrInvalidEventParameter = errors.New("invalid event parameter")
)
View Source
var (
	ErrNotFound      = errors.New("not found")
	ErrNilValue      = errors.New("nil value")
	ErrIfExprNotBool = errors.New("hook 'if' expression should evaluate to a boolean")
)
View Source
var ErrParamConflict = errors.New("parameters conflict")
View Source
var ErrUnknownHookType = errors.New("unknown hook type")
View Source
var File_actions_actions_proto protoreflect.FileDescriptor

Functions

func DescendArgs added in v0.87.0

func DescendArgs(args interface{}, getter EnvGetter) (interface{}, error)

func FormatHookOutputPath

func FormatHookOutputPath(runID, hookRunID string) string

func FormatRunManifestOutputPath

func FormatRunManifestOutputPath(runID string) string

func LuaRun added in v0.87.0

func LuaRun(l *lua.State, code, name string) error

func NewHookRunID

func NewHookRunID(actionIdx, hookIdx int) string

func RunByBranchPath added in v0.67.0

func RunByBranchPath(repoID, branchID, runID string) []byte

func RunByCommitPath added in v0.67.0

func RunByCommitPath(repoID, commitID, runID string) []byte

func RunPath added in v0.67.0

func RunPath(repoID, runID string) []byte

func TasksPath added in v0.67.0

func TasksPath(repoID, runID string) string

Types

type Action

type Action struct {
	Name        string                           `yaml:"name"`
	Description string                           `yaml:"description"`
	On          map[graveler.EventType]*ActionOn `yaml:"on"`
	Hooks       []ActionHook                     `yaml:"hooks"`
}

func LoadActions

func LoadActions(ctx context.Context, source Source, record graveler.HookRecord) ([]*Action, error)

func MatchedActions

func MatchedActions(actions []*Action, spec MatchSpec) ([]*Action, error)

func ParseAction

func ParseAction(data []byte) (*Action, error)

ParseAction helper function to read, parse and validate Action from a reader

func (*Action) Match

func (a *Action) Match(spec MatchSpec) (bool, error)

func (*Action) Validate

func (a *Action) Validate() error

type ActionHook

type ActionHook struct {
	ID          string     `yaml:"id"`
	Type        HookType   `yaml:"type"`
	Description string     `yaml:"description"`
	If          string     `yaml:"if"`
	Properties  Properties `yaml:"properties"`
}

type ActionOn

type ActionOn struct {
	Branches []string `yaml:"branches"`
}

type Airflow added in v0.47.0

type Airflow struct {
	HookBase
	URL        string
	DagID      string
	Username   string
	Password   SecureString
	DAGConf    map[string]interface{}
	Timeout    time.Duration
	WaitForDAG bool
}

func (*Airflow) Run added in v0.47.0

func (a *Airflow) Run(ctx context.Context, record graveler.HookRecord, buf *bytes.Buffer) error

type Config added in v0.99.0

type Config struct {
	Enabled bool
	Lua     struct {
		NetHTTPEnabled bool
	}
	Env struct {
		Enabled bool
		Prefix  string
	}
}

type DagRunReq added in v0.47.0

type DagRunReq struct {
	// DagRunID Run ID. This together with DAG_ID are a unique key.
	DagRunID string `json:"dag_run_id,omitempty"`
	// Conf JSON object describing additional configuration parameters.
	Conf map[string]interface{} `json:"conf,omitempty"`
}

type DecreasingIDGenerator added in v0.67.0

type DecreasingIDGenerator struct{}

DecreasingIDGenerator creates IDs that are decreasing with time

func (*DecreasingIDGenerator) NewRunID added in v0.67.0

func (gen *DecreasingIDGenerator) NewRunID() string

type EnvGetter added in v1.3.1

type EnvGetter interface {
	Lookup(name string) (string, bool)
}

type EnvironmentVariableGetter added in v1.3.1

type EnvironmentVariableGetter struct {
	Enabled bool
	Prefix  string
}

func NewEnvironmentVariableGetter added in v1.3.1

func NewEnvironmentVariableGetter(envEnabled bool, prefix string) *EnvironmentVariableGetter

NewEnvironmentVariableGetter creates a new EnvironmentVariableGetter. If envEnabled is false, the value we evaluate is an empty string. If filterPrefix is not empty, we only evaluate environment variables that start with this prefix.

func (*EnvironmentVariableGetter) Lookup added in v1.3.1

func (o *EnvironmentVariableGetter) Lookup(name string) (string, bool)

Lookup retrieves the value of the environment variable named by the key. If the variable is present in the environment, the value (which may be empty) is returned and the boolean is true. This function doesn't provide a way to extract variables that can be used by viper.

type EventInfo added in v0.47.0

type EventInfo struct {
	EventType      string            `json:"event_type"`
	EventTime      string            `json:"event_time"`
	ActionName     string            `json:"action_name"`
	HookID         string            `json:"hook_id"`
	RepositoryID   string            `json:"repository_id"`
	BranchID       string            `json:"branch_id,omitempty"`
	SourceRef      string            `json:"source_ref,omitempty"`
	TagID          string            `json:"tag_id,omitempty"`
	CommitID       string            `json:"commit_id,omitempty"`
	CommitMessage  string            `json:"commit_message,omitempty"`
	Committer      string            `json:"committer,omitempty"`
	CommitMetadata map[string]string `json:"commit_metadata,omitempty"`
}

type Hook

type Hook interface {
	Run(ctx context.Context, record graveler.HookRecord, buf *bytes.Buffer) error
}

Hook is the abstraction of the basic user-configured runnable building-stone

func NewAirflowHook added in v0.47.0

func NewAirflowHook(h ActionHook, action *Action, cfg Config, endpoint *http.Server, _ string, _ stats.Collector) (Hook, error)

func NewHook

func NewHook(hook ActionHook, action *Action, cfg Config, server *http.Server, serverAddress string, collector stats.Collector) (Hook, error)

func NewLuaHook added in v0.87.0

func NewLuaHook(h ActionHook, action *Action, cfg Config, e *http.Server, serverAddress string, collector stats.Collector) (Hook, error)

func NewWebhook

func NewWebhook(h ActionHook, action *Action, cfg Config, e *http.Server, _ string, _ stats.Collector) (Hook, error)

type HookBase added in v0.47.0

type HookBase struct {
	ID         string
	ActionName string
	Config     Config
	Endpoint   *http.Server
}

type HookOutputWriter

type HookOutputWriter struct {
	StorageNamespace string
	RunID            string
	HookRunID        string
	ActionName       string
	HookID           string
	Writer           OutputWriter
}

func (*HookOutputWriter) OutputWrite

func (h *HookOutputWriter) OutputWrite(ctx context.Context, reader io.Reader, size int64) error

type HookType

type HookType string
const (
	HookTypeWebhook HookType = "webhook"
	HookTypeAirflow HookType = "airflow"
	HookTypeLua     HookType = "lua"
)

type IDGenerator added in v0.67.0

type IDGenerator interface {
	// NewRunID creates IDs for Runs.
	NewRunID() string
}

type KVRunResultIterator added in v0.67.0

type KVRunResultIterator struct {
	// contains filtered or unexported fields
}

func NewKVRunResultIterator added in v0.67.0

func NewKVRunResultIterator(ctx context.Context, store kv.Store, repositoryID, branchID, commitID, after string) (*KVRunResultIterator, error)

NewKVRunResultIterator returns a new iterator over actions run results 'after' determines the runID which we should start the scan from, used for pagination

func (*KVRunResultIterator) Close added in v0.67.0

func (i *KVRunResultIterator) Close()

func (*KVRunResultIterator) Err added in v0.67.0

func (i *KVRunResultIterator) Err() error

func (*KVRunResultIterator) Next added in v0.67.0

func (i *KVRunResultIterator) Next() bool

func (*KVRunResultIterator) Value added in v0.67.0

func (i *KVRunResultIterator) Value() *RunResult

type KVTaskResultIterator added in v0.67.0

type KVTaskResultIterator struct {
	// contains filtered or unexported fields
}

func NewKVTaskResultIterator added in v0.67.0

func NewKVTaskResultIterator(ctx context.Context, store kv.Store, repositoryID, runID, after string) (*KVTaskResultIterator, error)

NewKVTaskResultIterator returns a new iterator over actions task results of a specific run 'after' determines the hook run ID which we should start the scan from, used for pagination

func (*KVTaskResultIterator) Close added in v0.67.0

func (i *KVTaskResultIterator) Close()

func (*KVTaskResultIterator) Err added in v0.67.0

func (i *KVTaskResultIterator) Err() error

func (*KVTaskResultIterator) Next added in v0.67.0

func (i *KVTaskResultIterator) Next() bool

func (*KVTaskResultIterator) Value added in v0.67.0

func (i *KVTaskResultIterator) Value() *TaskResult

type LuaHook added in v0.87.0

type LuaHook struct {
	HookBase
	Script     string
	ScriptPath string
	Args       map[string]interface{}
	// contains filtered or unexported fields
}

func (*LuaHook) Run added in v0.87.0

func (h *LuaHook) Run(ctx context.Context, record graveler.HookRecord, buf *bytes.Buffer) error

type MatchSpec

type MatchSpec struct {
	EventType graveler.EventType
	BranchID  graveler.BranchID
}

type NewHookFunc

type NewHookFunc func(ActionHook, *Action, Config, *http.Server, string, stats.Collector) (Hook, error)

type OutputWriter

type OutputWriter interface {
	OutputWrite(ctx context.Context, storageNamespace, name string, reader io.Reader, size int64) error
}

type Properties added in v0.47.0

type Properties map[string]interface{}

type RunManifest

type RunManifest struct {
	Run      RunResult    `json:"run"`
	HooksRun []TaskResult `json:"hooks,omitempty"`
}

type RunResult

type RunResult struct {
	RunID     string    `db:"run_id" json:"run_id"`
	BranchID  string    `db:"branch_id" json:"branch_id"`
	SourceRef string    `db:"source_ref" json:"source_ref"`
	EventType string    `db:"event_type" json:"event_type"`
	CommitID  string    `db:"commit_id" json:"commit_id,omitempty"`
	StartTime time.Time `db:"start_time" json:"start_time"`
	EndTime   time.Time `db:"end_time" json:"end_time"`
	Passed    bool      `db:"passed" json:"passed"`
}

func RunResultFromProto added in v0.67.0

func RunResultFromProto(pb *RunResultData) *RunResult

type RunResultData added in v0.67.0

type RunResultData struct {
	RunId     string                 `protobuf:"bytes,1,opt,name=run_id,json=runId,proto3" json:"run_id,omitempty"`
	BranchId  string                 `protobuf:"bytes,2,opt,name=branch_id,json=branchId,proto3" json:"branch_id,omitempty"`
	CommitId  string                 `protobuf:"bytes,3,opt,name=commit_id,json=commitId,proto3" json:"commit_id,omitempty"`
	SourceRef string                 `protobuf:"bytes,4,opt,name=source_ref,json=sourceRef,proto3" json:"source_ref,omitempty"`
	EventType string                 `protobuf:"bytes,5,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"`
	StartTime *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"`
	EndTime   *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=end_time,json=endTime,proto3" json:"end_time,omitempty"`
	Passed    bool                   `protobuf:"varint,8,opt,name=passed,proto3" json:"passed,omitempty"`
	// contains filtered or unexported fields
}

message data model for RunResult struct

func (*RunResultData) Descriptor deprecated added in v0.67.0

func (*RunResultData) Descriptor() ([]byte, []int)

Deprecated: Use RunResultData.ProtoReflect.Descriptor instead.

func (*RunResultData) GetBranchId added in v0.67.0

func (x *RunResultData) GetBranchId() string

func (*RunResultData) GetCommitId added in v0.67.0

func (x *RunResultData) GetCommitId() string

func (*RunResultData) GetEndTime added in v0.67.0

func (x *RunResultData) GetEndTime() *timestamppb.Timestamp

func (*RunResultData) GetEventType added in v0.67.0

func (x *RunResultData) GetEventType() string

func (*RunResultData) GetPassed added in v0.67.0

func (x *RunResultData) GetPassed() bool

func (*RunResultData) GetRunId added in v0.67.0

func (x *RunResultData) GetRunId() string

func (*RunResultData) GetSourceRef added in v0.67.0

func (x *RunResultData) GetSourceRef() string

func (*RunResultData) GetStartTime added in v0.67.0

func (x *RunResultData) GetStartTime() *timestamppb.Timestamp

func (*RunResultData) ProtoMessage added in v0.67.0

func (*RunResultData) ProtoMessage()

func (*RunResultData) ProtoReflect added in v0.67.0

func (x *RunResultData) ProtoReflect() protoreflect.Message

func (*RunResultData) Reset added in v0.67.0

func (x *RunResultData) Reset()

func (*RunResultData) String added in v0.67.0

func (x *RunResultData) String() string

type RunResultIterator

type RunResultIterator interface {
	Next() bool
	Value() *RunResult
	Err() error
	Close()
}

type SecureString added in v0.48.0

type SecureString struct {
	// contains filtered or unexported fields
}

SecureString is a string that may be populated from an environment variable. If constructed with a string of the form {{ ENV.EXAMPLE_VARIABLE }}, the value is populated from EXAMPLE_VARIABLE and is considered a secret. Otherwise the value is taken from the string as-is, and is not considered a secret.

func NewSecureString added in v0.48.0

func NewSecureString(s string, envGetter EnvGetter) (SecureString, error)

NewSecureString creates a new SecureString, reading env var if needed. If the string is not of the form {{ ENV.EXAMPLE_VARIABLE }}, the value is not considered a secret. If the string is of the form {{ ENV.EXAMPLE_VARIABLE }}, the value is populated from EXAMPLE_VARIABLE and is considered a secret. If the environment variable is not found, an error is returned. IF envEnabled is false, the value we evaluate is an empty string.

func (SecureString) String added in v0.48.0

func (s SecureString) String() string

Returns the string for non-secrets, or asterisks otherwise.

type Service

type Service interface {
	Stop()
	Run(ctx context.Context, record graveler.HookRecord) error
	UpdateCommitID(ctx context.Context, repositoryID string, storageNamespace string, runID string, commitID string) error
	GetRunResult(ctx context.Context, repositoryID string, runID string) (*RunResult, error)
	GetTaskResult(ctx context.Context, repositoryID string, runID string, hookRunID string) (*TaskResult, error)
	ListRunResults(ctx context.Context, repositoryID string, branchID, commitID string, after string) (RunResultIterator, error)
	ListRunTaskResults(ctx context.Context, repositoryID string, runID string, after string) (TaskResultIterator, error)
	graveler.HooksHandler
}

type Source

type Source interface {
	List(ctx context.Context, record graveler.HookRecord) ([]string, error)
	Load(ctx context.Context, record graveler.HookRecord, name string) ([]byte, error)
}

type Store added in v0.67.0

type Store interface {
	// UpdateCommitID will update an already stored run with the commit results
	UpdateCommitID(ctx context.Context, repositoryID string, runID string, commitID string) (*RunManifest, error)

	GetRunResult(ctx context.Context, repositoryID string, runID string) (*RunResult, error)
	GetTaskResult(ctx context.Context, repositoryID string, runID string, hookRunID string) (*TaskResult, error)
	ListRunResults(ctx context.Context, repositoryID string, branchID, commitID string, after string) (RunResultIterator, error)
	ListRunTaskResults(ctx context.Context, repositoryID string, runID string, after string) (TaskResultIterator, error)
	// contains filtered or unexported methods
}

Store is an abstraction over our datasource (key-value store) that provides actions operations

func NewActionsKVStore added in v0.67.0

func NewActionsKVStore(store kv.Store) Store

type StoreService added in v0.67.0

type StoreService struct {
	Store Store

	Source Source
	Writer OutputWriter
	// contains filtered or unexported fields
}

StoreService is an implementation of actions.Service that saves the run data to the blockstore and to the actions.Store (which is a fancy name for a DB - kv style or postgres directly)

func NewService

func NewService(ctx context.Context, store Store, source Source, writer OutputWriter, idGen IDGenerator, stats stats.Collector, cfg Config, serverAddress string) *StoreService

func (*StoreService) GetRunResult added in v0.67.0

func (s *StoreService) GetRunResult(ctx context.Context, repositoryID string, runID string) (*RunResult, error)

func (*StoreService) GetTaskResult added in v0.67.0

func (s *StoreService) GetTaskResult(ctx context.Context, repositoryID string, runID string, hookRunID string) (*TaskResult, error)

func (*StoreService) ListRunResults added in v0.67.0

func (s *StoreService) ListRunResults(ctx context.Context, repositoryID string, branchID, commitID string, after string) (RunResultIterator, error)

func (*StoreService) ListRunTaskResults added in v0.67.0

func (s *StoreService) ListRunTaskResults(ctx context.Context, repositoryID string, runID string, after string) (TaskResultIterator, error)

func (*StoreService) NewRunID added in v0.67.0

func (s *StoreService) NewRunID() string

func (*StoreService) PostCommitHook added in v0.67.0

func (s *StoreService) PostCommitHook(ctx context.Context, record graveler.HookRecord) error

func (*StoreService) PostCreateBranchHook added in v0.67.0

func (s *StoreService) PostCreateBranchHook(ctx context.Context, record graveler.HookRecord)

func (*StoreService) PostCreateTagHook added in v0.67.0

func (s *StoreService) PostCreateTagHook(ctx context.Context, record graveler.HookRecord)

func (*StoreService) PostDeleteBranchHook added in v0.67.0

func (s *StoreService) PostDeleteBranchHook(ctx context.Context, record graveler.HookRecord)

func (*StoreService) PostDeleteTagHook added in v0.67.0

func (s *StoreService) PostDeleteTagHook(ctx context.Context, record graveler.HookRecord)

func (*StoreService) PostMergeHook added in v0.67.0

func (s *StoreService) PostMergeHook(ctx context.Context, record graveler.HookRecord) error

func (*StoreService) PreCommitHook added in v0.67.0

func (s *StoreService) PreCommitHook(ctx context.Context, record graveler.HookRecord) error

func (*StoreService) PreCreateBranchHook added in v0.67.0

func (s *StoreService) PreCreateBranchHook(ctx context.Context, record graveler.HookRecord) error

func (*StoreService) PreCreateTagHook added in v0.67.0

func (s *StoreService) PreCreateTagHook(ctx context.Context, record graveler.HookRecord) error

func (*StoreService) PreDeleteBranchHook added in v0.67.0

func (s *StoreService) PreDeleteBranchHook(ctx context.Context, record graveler.HookRecord) error

func (*StoreService) PreDeleteTagHook added in v0.67.0

func (s *StoreService) PreDeleteTagHook(ctx context.Context, record graveler.HookRecord) error

func (*StoreService) PreMergeHook added in v0.67.0

func (s *StoreService) PreMergeHook(ctx context.Context, record graveler.HookRecord) error

func (*StoreService) Run added in v0.67.0

func (s *StoreService) Run(ctx context.Context, record graveler.HookRecord) error

Run load and run actions based on the event information

func (*StoreService) SetEndpoint added in v0.87.0

func (s *StoreService) SetEndpoint(h *http.Server)

func (*StoreService) Stop added in v0.67.0

func (s *StoreService) Stop()

func (*StoreService) UpdateCommitID added in v0.67.0

func (s *StoreService) UpdateCommitID(ctx context.Context, repositoryID string, storageNamespace string, runID string, commitID string) error

UpdateCommitID assume record is a post event, we use the PreRunID to update the commit_id and save the run manifest again

type Task

type Task struct {
	RunID     string
	HookRunID string
	Action    *Action
	HookID    string
	Hook      Hook
	If        string
	Err       error
	StartTime time.Time
	EndTime   time.Time
}

type TaskResult

type TaskResult struct {
	RunID      string    `db:"run_id" json:"run_id"`
	HookRunID  string    `db:"hook_run_id" json:"hook_run_id"`
	HookID     string    `db:"hook_id" json:"hook_id"`
	ActionName string    `db:"action_name" json:"action_name"`
	StartTime  time.Time `db:"start_time" json:"start_time"`
	EndTime    time.Time `db:"end_time" json:"end_time"`
	Passed     bool      `db:"passed" json:"passed"`
}

func (*TaskResult) LogPath

func (r *TaskResult) LogPath() string

type TaskResultData added in v0.67.0

type TaskResultData struct {
	RunId      string                 `protobuf:"bytes,1,opt,name=run_id,json=runId,proto3" json:"run_id,omitempty"`
	HookRunId  string                 `protobuf:"bytes,2,opt,name=hook_run_id,json=hookRunId,proto3" json:"hook_run_id,omitempty"`
	HookId     string                 `protobuf:"bytes,3,opt,name=hook_id,json=hookId,proto3" json:"hook_id,omitempty"`
	ActionName string                 `protobuf:"bytes,4,opt,name=action_name,json=actionName,proto3" json:"action_name,omitempty"`
	StartTime  *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"`
	EndTime    *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=end_time,json=endTime,proto3" json:"end_time,omitempty"`
	Passed     bool                   `protobuf:"varint,9,opt,name=passed,proto3" json:"passed,omitempty"`
	// contains filtered or unexported fields
}

message data model for TaskResult struct

func (*TaskResultData) Descriptor deprecated added in v0.67.0

func (*TaskResultData) Descriptor() ([]byte, []int)

Deprecated: Use TaskResultData.ProtoReflect.Descriptor instead.

func (*TaskResultData) GetActionName added in v0.67.0

func (x *TaskResultData) GetActionName() string

func (*TaskResultData) GetEndTime added in v0.67.0

func (x *TaskResultData) GetEndTime() *timestamppb.Timestamp

func (*TaskResultData) GetHookId added in v0.67.0

func (x *TaskResultData) GetHookId() string

func (*TaskResultData) GetHookRunId added in v0.67.0

func (x *TaskResultData) GetHookRunId() string

func (*TaskResultData) GetPassed added in v0.67.0

func (x *TaskResultData) GetPassed() bool

func (*TaskResultData) GetRunId added in v0.67.0

func (x *TaskResultData) GetRunId() string

func (*TaskResultData) GetStartTime added in v0.67.0

func (x *TaskResultData) GetStartTime() *timestamppb.Timestamp

func (*TaskResultData) ProtoMessage added in v0.67.0

func (*TaskResultData) ProtoMessage()

func (*TaskResultData) ProtoReflect added in v0.67.0

func (x *TaskResultData) ProtoReflect() protoreflect.Message

func (*TaskResultData) Reset added in v0.67.0

func (x *TaskResultData) Reset()

func (*TaskResultData) String added in v0.67.0

func (x *TaskResultData) String() string

type TaskResultIterator

type TaskResultIterator interface {
	Next() bool
	Value() *TaskResult
	Err() error
	Close()
}

type Webhook

type Webhook struct {
	HookBase
	URL         string
	Timeout     time.Duration
	QueryParams map[string][]SecureString
	Headers     map[string]SecureString
}

func (*Webhook) Run

func (w *Webhook) Run(ctx context.Context, record graveler.HookRecord, buf *bytes.Buffer) (err error)

Directories

Path Synopsis
lua
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL