am

package
v0.0.0-...-c1e6758 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const DeleteTransferActivityName = "DeleteTransferActivity"
View Source
const PollIngestActivityName = "poll-ingest-activity"
View Source
const PollTransferActivityName = "poll-transfer-activity"
View Source
const StartTransferActivityName = "start-transfer-activity"
View Source
const UploadTransferActivityName = "UploadTransferActivity"

Variables

View Source
var (
	// ErrWorkOngoing indicates work is ongoing and polling should continue.
	ErrWorkOngoing = errors.New("work ongoing")

	// ErrBadRequest respresents an AM "400 Bad request" response, which can
	// occur while a transfer or ingest is still processing and may require
	// special handling.
	ErrBadRequest = errors.New("Archivematica response: 400 Bad request")
)

Functions

func ConvertJobToPreservationTask

func ConvertJobToPreservationTask(job amclient.Job) datatypes.PreservationTask

ConvertJobToPreservationTask converts an amclient.Job to a datatypes.PreservationTask.

Types

type Config

type Config struct {
	// Archivematica server address.
	Address string

	// Archivematica API user.
	User string

	// Archivematica API key.
	APIKey string

	// Archivematica processing configuration to use (default: "automated").
	ProcessingConfig string

	// SFTP configuration for uploading transfers to Archivematica.
	SFTP sftp.Config

	// Capacity sets the maximum number of worker sessions the worker can
	// handle at one time (default: 1).
	Capacity int

	// PollInterval is the time to wait between poll requests to the AM API.
	PollInterval time.Duration

	// TransferDeadline is the maximum time to wait for a transfer to complete.
	// Set to zero for no deadline.
	TransferDeadline time.Duration
}

type DeleteTransferActivity

type DeleteTransferActivity struct {
	// contains filtered or unexported fields
}

func NewDeleteTransferActivity

func NewDeleteTransferActivity(logger logr.Logger, client sftp.Client) *DeleteTransferActivity

func (*DeleteTransferActivity) Execute

type DeleteTransferActivityParams

type DeleteTransferActivityParams struct {
	Destination string
}

type DeleteTransferActivityResult

type DeleteTransferActivityResult struct{}

type JobTracker

type JobTracker struct {
	// contains filtered or unexported fields
}

func NewJobTracker

func NewJobTracker(
	clock clockwork.Clock,
	jobSvc amclient.JobsService,
	pkgSvc package_.Service,
	presActionID uint,
) *JobTracker

func (*JobTracker) SavePreservationTasks

func (jt *JobTracker) SavePreservationTasks(ctx context.Context, unitID string) (int, error)

SavePreservationTasks queries the Archivematica jobs list endpoint to get a list of completed jobs related to the transfer or ingest identified by unitID, then saves any new jobs as preservation tasks.

type PollIngestActivity

type PollIngestActivity struct {
	// contains filtered or unexported fields
}

func NewPollIngestActivity

func NewPollIngestActivity(
	logger logr.Logger,
	cfg *Config,
	clock clockwork.Clock,
	ingSvc amclient.IngestService,
	jobSvc amclient.JobsService,
	pkgSvc package_.Service,
) *PollIngestActivity

func (*PollIngestActivity) Execute

Execute polls Archivematica for the status of an ingest and returns when ingest is complete or returns an error status. Execute sends an activity heartbeat after each poll.

A response status of "REJECTED", "FAILED", "USER_INPUT", or "BACKLOG" returns a temporal.NonRetryableApplicationError to indicate that processing can not continue.

type PollIngestActivityParams

type PollIngestActivityParams struct {
	PresActionID uint
	SIPID        string
}

type PollIngestActivityResult

type PollIngestActivityResult struct {
	Status        string
	PresTaskCount int
}

type PollTransferActivity

type PollTransferActivity struct {
	// contains filtered or unexported fields
}

func NewPollTransferActivity

func NewPollTransferActivity(
	logger logr.Logger,
	cfg *Config,
	clock clockwork.Clock,
	tfrSvc amclient.TransferService,
	jobSvc amclient.JobsService,
	pkgSvc package_.Service,
) *PollTransferActivity

func (*PollTransferActivity) Execute

Execute polls Archivematica for the status of a transfer and returns when the transfer is complete or returns an error status. Execute sends an activity heartbeat after each poll.

On each poll, Execute requests an updated list of AM jobs performed and saves the job data to the package service as preservation tasks.

A transfer status of "REJECTED", "FAILED", "USER_INPUT", or "BACKLOG" returns a temporal.NonRetryableApplicationError to indicate that processing can not continue.

type PollTransferActivityParams

type PollTransferActivityParams struct {
	PresActionID uint
	TransferID   string
}

type PollTransferActivityResult

type PollTransferActivityResult struct {
	SIPID         string
	Path          string
	PresTaskCount int
}

type StartTransferActivity

type StartTransferActivity struct {
	// contains filtered or unexported fields
}

func NewStartTransferActivity

func NewStartTransferActivity(logger logr.Logger, cfg *Config, amps amclient.PackageService) *StartTransferActivity

func (*StartTransferActivity) Execute

Execute sends a request to the Archivematica API to start a new "auto-approved" transfer. If the request is successful a transfer UUID is returned. An error response will return a retryable or non-retryable temporal.ApplicationError, depending on the nature of the error.

type StartTransferActivityParams

type StartTransferActivityParams struct {
	Name string
	Path string
}

type StartTransferActivityResult

type StartTransferActivityResult struct {
	TransferID string
}

type UploadTransferActivity

type UploadTransferActivity struct {
	// contains filtered or unexported fields
}

UploadTransferActivity uploads a transfer via the SFTP client, and sends a periodic Temporal Heartbeat at the given heartRate.

func NewUploadTransferActivity

func NewUploadTransferActivity(
	logger logr.Logger,
	client sftp.Client,
	heartRate time.Duration,
) *UploadTransferActivity

NewUploadTransferActivity initializes and returns a new UploadTransferActivity.

func (*UploadTransferActivity) Execute

Execute copies the source transfer to the destination via SFTP.

func (*UploadTransferActivity) Heartbeat

func (a *UploadTransferActivity) Heartbeat(ctx context.Context, upload sftp.AsyncUpload, fileSize int64) error

Heartbeat sends a periodic Temporal heartbeat, which includes the number of bytes uploaded, until the upload is complete, cancelled or returns an error.

type UploadTransferActivityParams

type UploadTransferActivityParams struct {
	// Local path of the source file.
	SourcePath string
}

type UploadTransferActivityResult

type UploadTransferActivityResult struct {
	// Bytes copied to the remote file over the SFTP connection.
	BytesCopied int64
	// Full path of the destination file including `remoteDir` config path.
	RemoteFullPath string
	// Path of the destination file relative to the `remoteDir` config path.
	RemoteRelativePath string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL