dpn

package
v0.0.0-...-9369c8b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2016 License: Apache-2.0 Imports: 30 Imported by: 13

Documentation

Index

Constants

View Source
const (
	STAGE_PRE_COPY  = "Pre Copy"
	STAGE_COPY      = "Copying from ingest node"
	STAGE_PACKAGE   = "Packaging"
	STAGE_RECEIVE   = "Receiving"
	STAGE_VALIDATE  = "Validation"
	STAGE_STORE     = "Storage"
	STAGE_RECORD    = "Record"
	STAGE_COMPLETE  = "Complete"
	STAGE_CANCELLED = "Cancelled"

	DEFAULT_TOKEN_FORMAT_STRING = "token %s"

	BAG_TYPE_DATA         = "data"
	BAG_TYPE_RIGHTS       = "rights"
	BAG_TYPE_INTERPRETIVE = "interpretive"

	PATH_TYPE_LOCAL = "Local Filesystem"
	PATH_TYPE_S3    = "S3 Bucket"

	// These values are part of the published APTrust spec.
	APTRUST_BAGIT_VERSION  = "0.97"
	APTRUST_BAGIT_ENCODING = "UTF-8"
)
View Source
const MAX_ERR_MSG_SIZE = 2048

Don't log error messages longer than this

Variables

View Source
var BAGIT_TAGS = []string{
	"BagIt-Version",
	"Tag-File-Character-Encoding",
}

BAGIT_TAGS contains a list of tags required in the bagit file.

View Source
var BAG_INFO_TAGS = []string{
	"Source-Organization",
	"Organization-Address",
	"Contact-Name",
	"Contact-Phone",
	"Contact-Email",
	"Bagging-Date",
	"Bag-Size",
	"Bag-Group-Identifier",
	"Bag-Count",
}

BAG_INFO_TAGS contains a list of tags required in the bag-info file.

View Source
var DPN_INFO_TAGS = []string{
	"DPN-Object-ID",
	"Local-ID",
	"Ingest-Node-Name",
	"Ingest-Node-Address",
	"Ingest-Node-Contact-Name",
	"Ingest-Node-Contact-Email",
	"Version-Number",
	"First-Version-Object-ID",
	"Interpretive-Object-ID",
	"Rights-Object-ID",
	"Bag-Type",
}

DPN_INFO_TAGS contains a list tags required in the dpn-info file.

View Source
var TAGS_FOR_FILE = map[string][]string{
	"bagit.txt":    BAGIT_TAGS,
	"bag-info.txt": BAG_INFO_TAGS,
	filepath.Join("dpn-tags", "dpn-info.txt"): DPN_INFO_TAGS,
}

TAGS_FOR_FILE maps a tag file to the list of tags it should contain.

Functions

func CalculateSha256Digest

func CalculateSha256Digest(filePath string) (string, error)

Run the sha256 checksum on the bag we just copied from the remote node.

func GetRemoteClients

func GetRemoteClients(localClient *DPNRestClient, config *DPNConfig, logger *logging.Logger) (map[string]*DPNRestClient, error)

TODO: Fix this. This forces us to have empty entries in RemoteNodeTokens to ensure that we build remote node clients. Not good!

func GetRsyncCommand

func GetRsyncCommand(copyFrom, copyTo string, useSSH bool) *exec.Cmd

func HackNullDates

func HackNullDates(jsonBytes []byte) []byte

This hack works around the JSON decoding bug in Golang's core time and json libraries. The bug is described here: https://go-review.googlesource.com/#/c/9376/ We could do a "proper" work-around by changing all our structs to use pointers to time.Time instead of time.Time values, but then we have to check for nil in many places. There's already a patch in to fix this bug in the next release of Golang, so I'd rather have this hack for now and remove it when the next version of Golang comes out. That's better than changing to pointers, checking for nil in a hundred places, and then reverting ALL THAT when the bug is fixed. In practice the only null dates that should ever come out of our REST services are Node.LastPullDate, and that should only happen on the first day a new node is up and runnning. We just have to set these back to a reasonably old timestamp so we can ask the node for all items updated since that time. "Reasonably old" is anything before about June 1, 2015.

func PathWithinArchive

func PathWithinArchive(result *DPNResult, filePath, bagDir string) (string, error)

func RedirectHandler

func RedirectHandler(req *http.Request, via []*http.Request) error

By default, the Go HTTP client does not send headers from the original request to the redirect location. See the issue at https://code.google.com/p/go/issues/detail?id=4800&q=request%20header

We want to send all headers from the original request, but we'll send the auth header only if the host of the redirect URL matches the host of the original URL.

func SendToRecordQueue

func SendToRecordQueue(result *DPNResult, procUtil *bagman.ProcessUtil)

func SendToStorageQueue

func SendToStorageQueue(result *DPNResult, procUtil *bagman.ProcessUtil)

func SendToTroubleQueue

func SendToTroubleQueue(result *DPNResult, procUtil *bagman.ProcessUtil)

func SendToValidationQueue

func SendToValidationQueue(result *DPNResult, procUtil *bagman.ProcessUtil)

func TagFiles

func TagFiles() []string

TagFiles() returns a list of tag files we should check while performing validation.

Types

type BagBuilder

type BagBuilder struct {
	// LocalPath is the full, absolute path the the untarred bag
	// the builder will create. It will end with the bag's UUID,
	// so it should look something like this:
	// /mnt/dpn/bags/00000000-0000-0000-0000-000000000000.
	LocalPath string

	// IntellectualObject is the APTrust IntellectualObject that
	// we'll be repackaging as a DPN bag.
	IntellectualObject *bagman.IntellectualObject

	// DefaultMetadata is some metadata that goes into EVERY DPN
	// bag we create. This includes our name and address in the
	// DPN data section that describes who packaged the bag.
	// DefaultMetadata should be loaded from a JSON file using
	// the dpn.LoadConfig() function.
	DefaultMetadata *DefaultMetadata

	// UUID is the DPN identifier for this bag. This has nothing to
	// do with any APTrust UUID. It's generated in the constructor.
	UUID string

	// ErrorMessage describes what when wrong while trying to
	// package this bag. If it's an empty string, packaging
	// succeeded.
	ErrorMessage string

	// What type of bag is this? Data, rights or interpretive?
	BagType string

	// The underlying bag object.
	Bag *bagins.Bag
	// contains filtered or unexported fields
}

BagBuilder builds a DPN bag from an APTrust intellectual object.

func NewBagBuilder

func NewBagBuilder(localPath string, obj *bagman.IntellectualObject, defaultMetadata *DefaultMetadata) (*BagBuilder, error)

NewBagBuilder returns a new BagBuilder. Param localPath is the path to which the bag builder should write the DPN bag. Param obj is an IntellectualObject containing metadata about the APTrust bag that we'll be repackaging. Param defaultMetadata contains default metadata, such as the BagIt version, ingest node name, etc.

The BagBuilder just creates the skeleton of a valid DPN bag, with the required files. After you create this, call the following for each file you want to put in the bag's data directory:

err := builder.Bag.AddFile("/abs/path/to/source.txt", "rel/path/to/dest.txt")

That will copy the file at "/abs/path/to/source.txt" into the data directory at "rel/path/to/dest.txt", so its full relative path inside the bag would be "data/rel/path/to/dest.txt"

You can also add non-payload files outside the data directory. That usually means adding custom tag files to custom tag directories.

err := builder.Bag.AddCustomTagFile("/abs/path/to/source.txt", "rel/path/to/dest.txt", true)

That adds "/abs/path/to/source.txt" into "rel/path/to/dest.txt" inside the bag, but notice it's not in the data directory. The final param to AddCustomTagFile indicates whether you want to put the tag file's checksum in the tag manifest.

You should not have to add any of the DPN standard tag files or manifests. BagBuilder does that for you.

When you're done adding files to the bag, call this to write it all out to disk:

errors := builder.Bag.Save()

func (*BagBuilder) AddTagFile

func (builder *BagBuilder) AddTagFile(tagFileName string) (*bagins.TagFile, error)

func (*BagBuilder) BagTime

func (builder *BagBuilder) BagTime() string

BagTime returns the datetime the bag was created, in RFC3339 format (e.g. "2015-03-05T10:10:00Z")

type BagListResult

type BagListResult struct {
	Count    int32     `json:count`
	Next     *string   `json:next`
	Previous *string   `json:previous`
	Results  []*DPNBag `json:results`
}

BagListResult is what the REST service returns when we ask for a list of bags.

type Cleanup

type Cleanup struct {
	DPNConfig   *DPNConfig
	ProcUtil    *bagman.ProcessUtil
	LocalClient *DPNRestClient
}

func NewCleanup

func NewCleanup(procUtil *bagman.ProcessUtil, dpnConfig *DPNConfig) (*Cleanup, error)

func (*Cleanup) DeleteReplicatedBags

func (cleanup *Cleanup) DeleteReplicatedBags()

type Copier

type Copier struct {
	LookupChannel      chan *DPNResult
	CopyChannel        chan *DPNResult
	PostProcessChannel chan *DPNResult
	DPNConfig          *DPNConfig
	ProcUtil           *bagman.ProcessUtil
	LocalClient        *DPNRestClient
	RemoteClients      map[string]*DPNRestClient
	// WaitGroup is for running local tests only.
	WaitGroup sync.WaitGroup
}

func NewCopier

func NewCopier(procUtil *bagman.ProcessUtil, dpnConfig *DPNConfig) (*Copier, error)

func (*Copier) HandleMessage

func (copier *Copier) HandleMessage(message *nsq.Message) error

func (*Copier) RunTest

func (copier *Copier) RunTest(dpnResult *DPNResult)

type CopyResult

type CopyResult struct {
	LocalPath    string
	ErrorMessage string
	RsyncStdout  string
	RsyncStderr  string
	InfoMessage  string
	BagWasCopied bool
}

type DPNBag

type DPNBag struct {

	// UUID is the unique identifier for a bag
	UUID string `json:"uuid"`

	// LocalId is the depositor's local identifier for a bag.
	LocalId string `json:"local_id"`

	// Member is the UUID of the member who deposited this bag.
	Member string `json:"member"`

	// Size is the size, in bytes of the bag.
	Size uint64 `json:"size"`

	// FirstVersionUUID is the UUID of the first version
	// of this bag.
	FirstVersionUUID string `json:"first_version_uuid"`

	// Version is the version or revision number of the bag. Starts at 1.
	Version uint32 `json:"version"`

	// IngestNode is the node that first ingested or produced the bag.
	IngestNode string `json:"ingest_node"`

	// AdminNode is the authoritative node for this bag. If various nodes
	// have conflicting registry info for this bag, the admin node wins.
	// The admin node also has some authority in restoring and (if its ever
	// possible) deleting bags.
	AdminNode string `json:"admin_node"`

	// BagType is one of 'D' (Data), 'R' (Rights) or 'I' (Interpretive)
	BagType string `json:"bag_type"`

	// Rights is a list of UUIDs of rights objects for this bag.
	Rights []string `json:"rights"`

	// Interpretive is a list of UUIDs of interpretive objects for this bag.
	Interpretive []string `json:"interpretive"`

	// ReplicatingNodes is a list of one more nodes that has stored
	// copies of this bag. The items in the list are node namespaces,
	// which are strings. E.g. ['aptrust', 'chron', 'tdr']
	ReplicatingNodes []string `json:"replicating_nodes"`

	// Fixities are the checksum/fixity values for this bag.
	Fixities *DPNFixity `json:"fixities"`

	// CreatedAt is when this record was created.
	CreatedAt time.Time `json:"created_at"`

	// UpdatedAt is when this record was last updated.
	UpdatedAt time.Time `json:"updated_at"`
}

DPNBag represents a Bag object in the DPN REST service. Like all of the DPN REST objects, it contains metadata only.

type DPNConfig

type DPNConfig struct {
	// EnvironmentName is the name of this configuration: "dev", "test",
	// "production", etc.
	EnvironmentName string
	// LocalNode is the namespace of the node this code is running on.
	// E.g. "aptrust", "chron", "hathi", "tdr", "sdr"
	LocalNode string
	// Where should DPN service logs go?
	LogDirectory string
	// Log level (4 = debug)
	LogLevel logging.Level
	// Should we log to Stderr in addition to writing to
	// the log file?
	LogToStderr bool
	// Number of nodes we should replicate bags to.
	ReplicateToNumNodes int
	// Should we accept self-signed and otherwise invalid SSL
	// certificates? We need to do this in testing, but it
	// should not be allowed in production. Bools in Go default
	// to false, so if this is not set in config, we should be
	// safe.
	AcceptInvalidSSLCerts bool
	// When copying bags from remote nodes, should we use rsync
	// over SSH (true) or just plain rsync (false)?
	UseSSHWithRsync bool
	// Default metadata that goes into bags produced at our node.
	DefaultMetadata *DefaultMetadata
	// Settings for connecting to our own REST service
	RestClient *RestClientConfig
	// Standard Auth token header format for REST services
	// is "token %s", where "%s" will be the token. Rails
	// REST services require the format "Token token=%s".
	// This map of formats lets us override the standard
	// "token %s" with whatever the remote REST service
	// expects. Since the default is "token %s", there is no
	// need to create entries in this map for most nodes.
	AuthTokenHeaderFormats map[string]string
	// RemoteNodeAdminTokensForTesting are used in integration
	// tests only, when we want to perform admin-only operations,
	// such as creating bags and replication requests on a remote
	// node in the test cluster.
	RemoteNodeAdminTokensForTesting map[string]string
	// API Tokens for connecting to remote nodes
	RemoteNodeTokens map[string]string
	// URLs for remote nodes. Set these only if you want to
	// override the node URLs we get back from our local
	// DPN REST server.
	RemoteNodeURLs map[string]string
}

func LoadConfig

func LoadConfig(pathToFile, requestedConfig string) (*DPNConfig, error)

func (*DPNConfig) TokenFormatStringFor

func (dpnConfig *DPNConfig) TokenFormatStringFor(nodeNamespace string) string

type DPNFetchResult

type DPNFetchResult struct {
	FetchResult *bagman.FetchResult
	GenericFile *bagman.GenericFile
}

func (*DPNFetchResult) Succeeded

func (result *DPNFetchResult) Succeeded() bool

type DPNFixity

type DPNFixity struct {

	// The algorithm used to check the fixity. Usually 'sha256',
	// but others may be valid in the future.
	Sha256 string `json:"sha256"`
}

DPNFixity represents a checksum for a bag in the DPN REST service.

type DPNMember

type DPNMember struct {

	// UUID is the unique identifier for a member
	UUID string `json:"uuid"`

	// Name is the member's name
	Name string `json:"name"`

	// Email is the member's email address
	Email string `json:"email"`

	// CreatedAt is when this record was created.
	CreatedAt time.Time `json:"created_at"`

	// UpdatedAt is when this record was last updated.
	UpdatedAt time.Time `json:"updated_at"`
}

DPNMember describes an institution or depositor that owns a bag.

type DPNNode

type DPNNode struct {
	Name             string      `json:"name"`
	Namespace        string      `json:"namespace"`
	APIRoot          string      `json:"api_root"`
	SSHPubKey        string      `json:"ssh_pubkey"`
	ReplicateFrom    []string    `json:"replicate_from"`
	ReplicateTo      []string    `json:"replicate_to"`
	RestoreFrom      []string    `json:"restore_from"`
	RestoreTo        []string    `json:"restore_to"`
	Protocols        []string    `json:"protocols"`
	FixityAlgorithms []string    `json:"fixity_algorithms"`
	CreatedAt        time.Time   `json:"created_at"`
	UpdatedAt        time.Time   `json:"updated_at"`
	LastPullDate     time.Time   `json:"last_pull_date"`
	Storage          *DPNStorage `json:"storage"`
}

func (*DPNNode) ChooseNodesForReplication

func (node *DPNNode) ChooseNodesForReplication(howMany int) []string

This randomly chooses nodes for replication, returning a slice of strings. Each string is the namespace of a node we should replicate to. This may return fewer nodes than you specified in the howMany param if this node replicates to fewer nodes.

We may have to revisit this in the future, if DPN specifies logic for how to choose remote nodes. For now, we can choose any node, because they are all geographically diverse and all use different storage backends.

type DPNReplicationTransfer

type DPNReplicationTransfer struct {

	// FromNode is the node where the bag is coming from.
	// The FromNode initiates the replication request.
	FromNode string `json:"from_node"`

	// ToNode is the node the bag is being transfered to
	ToNode string `json:"to_node"`

	// Bag is the UUID of the bag to be replicated
	BagId string `json:"uuid"`

	// ReplicationId is a unique id for this replication request.
	// It's a UUID in string format.
	ReplicationId string `json:"replication_id"`

	// FixityAlgorithm is the algorithm used to calculate the fixity digest.
	FixityAlgorithm string `json:"fixity_algorithm"`

	// FixityNonce is an optional nonce used to calculate the fixity digest.
	FixityNonce *string `json:"fixity_nonce"`

	// FixityValue is the fixity value calculated by the ToNode after
	// it receives the bag. This will be null/empty until the replicating
	// node sends the info back to the FromNode.
	FixityValue *string `json:"fixity_value"`

	// FixityAccept describes whether the FromNode accepts the fixity
	// value calculated by the ToNode. This is a nullable boolean,
	// so it has to be a pointer.
	FixityAccept *bool `json:"fixity_accept"`

	// BagValid is a value set by the ToNode to indicate whether
	// the bag it received was valid. This is a nullable boolean,
	// so it has to be a pointer.
	BagValid *bool `json:"bag_valid"`

	// Status is the status of the request, which can be any of:
	//
	// "requested"  - The FromNode has requested this transfer.
	//                This means the transfer is new, and no
	//                action has been taken yet.
	// "rejected"   - Set by the ToNode when it will not or cannot
	//                accept this transfer. (Usually due to disk space.)
	// "received"   - Set by the ToNode to indicate it has received the
	//                the bag.
	// "confirmed"  - Set by the FromNode after the bag has been confirmed
	//                valid, the fixity value has been approved, and the bag
	//                has been stored by the ToNode.
	// "stored"     - Set by the ToNode after the bag has been copied to
	//                long-term storage.
	// "cancelled"  - Can be set by either node for any reason. No further
	//                processing should occur on a cancelled request.
	Status string `json:"status"`

	// Protocol is the network protocol used to transfer the bag.
	// At launch, the only valid value for this is 'R' for rsync.
	Protocol string `json:"protocol"`

	// Link is a URL that the ToNode can use to copy the bag from the
	// FromNode. This value is set by the FromNode.
	Link string `json:"link"`

	// CreatedAt is the datetime when this record was created.
	CreatedAt time.Time `json:"created_at"`

	// UpdatedAt is the datetime when this record was last updated.
	UpdatedAt time.Time `json:"updated_at"`
}

type DPNRestClient

type DPNRestClient struct {
	HostUrl    string
	APIVersion string
	APIKey     string
	Node       string
	// contains filtered or unexported fields
}

DPNRestClient is a client for the DPN REST API.

func NewDPNRestClient

func NewDPNRestClient(hostUrl, apiVersion, apiKey, node string, dpnConfig *DPNConfig, logger *logging.Logger) (*DPNRestClient, error)

Creates a new DPN REST client.

func (*DPNRestClient) BuildUrl

func (client *DPNRestClient) BuildUrl(relativeUrl string, queryParams *url.Values) string

BuildUrl combines the host and protocol in client.HostUrl with relativeUrl to create an absolute URL. For example, if client.HostUrl is "http://localhost:3456", then client.BuildUrl("/path/to/action.json") would return "http://localhost:3456/path/to/action.json".

func (*DPNRestClient) DPNBagCreate

func (client *DPNRestClient) DPNBagCreate(bag *DPNBag) (*DPNBag, error)

func (*DPNRestClient) DPNBagGet

func (client *DPNRestClient) DPNBagGet(identifier string) (*DPNBag, error)

func (*DPNRestClient) DPNBagListGet

func (client *DPNRestClient) DPNBagListGet(queryParams *url.Values) (*BagListResult, error)

func (*DPNRestClient) DPNBagUpdate

func (client *DPNRestClient) DPNBagUpdate(bag *DPNBag) (*DPNBag, error)

func (*DPNRestClient) DPNMemberCreate

func (client *DPNRestClient) DPNMemberCreate(bag *DPNMember) (*DPNMember, error)

func (*DPNRestClient) DPNMemberGet

func (client *DPNRestClient) DPNMemberGet(identifier string) (*DPNMember, error)

func (*DPNRestClient) DPNMemberGetByName

func (client *DPNRestClient) DPNMemberGetByName(name string) (*DPNMember, error)

Returns the DPN Member with the specified name, or an error if there is not exactly one DPN member with that name.

func (*DPNRestClient) DPNMemberListGet

func (client *DPNRestClient) DPNMemberListGet(queryParams *url.Values) (*MemberListResult, error)

func (*DPNRestClient) DPNMemberUpdate

func (client *DPNRestClient) DPNMemberUpdate(bag *DPNMember) (*DPNMember, error)

func (*DPNRestClient) DPNNodeGet

func (client *DPNRestClient) DPNNodeGet(identifier string) (*DPNNode, error)

func (*DPNRestClient) DPNNodeGetLastPullDate

func (client *DPNRestClient) DPNNodeGetLastPullDate(identifier string) (time.Time, error)

Returns the last time we pulled data from the specified node.

func (*DPNRestClient) DPNNodeListGet

func (client *DPNRestClient) DPNNodeListGet(queryParams *url.Values) (*NodeListResult, error)

func (*DPNRestClient) DPNNodeUpdate

func (client *DPNRestClient) DPNNodeUpdate(node *DPNNode) (*DPNNode, error)

DPNNodeUpdate updates a DPN Node record. You can update node records only if you are the admin on the server where you're updating the record. Though this method lets you update any attributes related to the node, you should update only the LastPullDate attribute through this client. Use the web admin interface to perform more substantive node updates.

func (*DPNRestClient) DPNReplicationListGet

func (client *DPNRestClient) DPNReplicationListGet(queryParams *url.Values) (*ReplicationListResult, error)

func (*DPNRestClient) DPNRestoreListGet

func (client *DPNRestClient) DPNRestoreListGet(queryParams *url.Values) (*RestoreListResult, error)

func (*DPNRestClient) GetRemoteClient

func (client *DPNRestClient) GetRemoteClient(remoteNodeNamespace string, dpnConfig *DPNConfig, logger *logging.Logger) (*DPNRestClient, error)

Returns a DPN REST client that can talk to a remote node. This function has to connect to out local DPN node to get information about the remote node. It returns a new client that can connect to the remote node with the correct URL and API key. We use this function to get a client that can update a replication request or a restore request on the originating node.

func (*DPNRestClient) NewJsonRequest

func (client *DPNRestClient) NewJsonRequest(method, targetUrl string, body io.Reader) (*http.Request, error)

newJsonGet returns a new request with headers indicating JSON request and response formats.

func (*DPNRestClient) ReplicationTransferCreate

func (client *DPNRestClient) ReplicationTransferCreate(xfer *DPNReplicationTransfer) (*DPNReplicationTransfer, error)

func (*DPNRestClient) ReplicationTransferGet

func (client *DPNRestClient) ReplicationTransferGet(identifier string) (*DPNReplicationTransfer, error)

func (*DPNRestClient) ReplicationTransferUpdate

func (client *DPNRestClient) ReplicationTransferUpdate(xfer *DPNReplicationTransfer) (*DPNReplicationTransfer, error)

func (*DPNRestClient) RestoreTransferCreate

func (client *DPNRestClient) RestoreTransferCreate(xfer *DPNRestoreTransfer) (*DPNRestoreTransfer, error)

func (*DPNRestClient) RestoreTransferGet

func (client *DPNRestClient) RestoreTransferGet(identifier string) (*DPNRestoreTransfer, error)

func (*DPNRestClient) RestoreTransferUpdate

func (client *DPNRestClient) RestoreTransferUpdate(xfer *DPNRestoreTransfer) (*DPNRestoreTransfer, error)

type DPNRestoreTransfer

type DPNRestoreTransfer struct {

	// RestoreId is a unique id for this restoration request.
	RestoreId string `json:"restore_id"`

	// FromNode is the node from which the bag should be restored.
	FromNode string `json:"from_node"`

	// ToNode is the node to which the bag should be restored.
	// The ToNode initiates a restoration request.
	ToNode string `json:"to_node"`

	// Bag is the unique identifier of the bag to be restored.
	BagId string `json:"uuid"`

	// Status is the status of the restoration operation. It can
	// have any of the following values:
	//
	// "requested" - Default status used when record first created.
	// "accepted"  - Indicates the FromNode has accepted the request to
	//               restore the bag.
	// "rejected"  - Set by the FromNode if it cannot or will not restore
	//               the bag.
	// "prepared"  - Set by the FromNode when the content has been restored
	//               locally and staged for transfer back to the to_node.
	// "finished"  - Set by the ToNode after it has retrieved the restored
	//               bag from the FromNode.
	// "cancelled" - Set by either node to indicate the restore operation
	//               was cancelled.
	Status string `json:"status"`

	// Protocol is the network protocol used to transfer the bag.
	// At launch, the only valid value for this is 'R' for rsync.
	Protocol string `json:"protocol"`

	// Link is a URL that the ToNode can use to copy the bag from the
	// FromNode. This value is set by the FromNode.
	Link string `json:"link"`

	// CreatedAt is the datetime when this record was created.
	CreatedAt time.Time `json:"created_at"`

	// UpdatedAt is the datetime when this record was last updated.
	UpdatedAt time.Time `json:"updated_at"`
}

type DPNResult

type DPNResult struct {
	// BagIdentifier is the APTrust bag identifier, composed of
	// the institution domain name, a slash, and the institution's
	// internal bag identifier. E.g. "test.edu/ncsu.1840.16-1004"
	// For bags coming from other nodes, this will be blank.
	BagIdentifier string

	// If this is an APTrust bag that we're ingesting into DPN,
	// this is the ID of the "send to DPN" request in the
	// ProcessedItem queue. For bags being replicated from
	// other nodes, this will be nil.
	ProcessedItemId int

	// LocalPath is where this bag is stored on disk. The bag
	// may be a file ending in .tar or a directory if the bag
	// is not tarred.
	LocalPath string

	// The bag's md5 digest. We need this to copy to Amazon S3/Glacier.
	BagMd5Digest string

	// Sha256Digest of the tarred bag file. No longer used.
	BagSha256Digest string

	// Sha256 digest of the tagmanifest-sha256.txt file. We use this
	// to verify that the bag was correctly copied by replicating
	// nodes.
	TagManifestDigest string

	// The size of the bag. Used when copying to S3/Glacier.
	BagSize int64

	// The NSQ message being processed. May be nil if we're
	// running tests.
	NsqMessage *nsq.Message `json:"-"`

	// The current stage of processing for this bag.
	Stage string

	// A general error message describing what went wrong with
	// processing. More specific errors will appear in the
	// PackageResult or ValidationResult, depending
	// on the stage where processing failed. If this is empty,
	// there was no error.
	ErrorMessage string

	// The DPN bag record for this object. This will be nil for
	// bags ingested through APTrust and in the packaging stage,
	// since the bag won't have a UUID until after it's packaged.
	DPNBag *DPNBag

	// The DPN transfer request associated with this bag. This will
	// be nil if it's a bag created at our own node. It will be
	// non-nil for bags we're replicating from other nodes.
	TransferRequest *DPNReplicationTransfer

	// The results of attempts to fetch the S3 files that make up this
	// bag. This will be nil for bags that we're replicating from
	// other nodes, because in those cases, we just copy the whole
	// tar file from the remote node. For bags ingested at APTrust,
	// this should contain a number of records, once we've reached
	// the fetch stage of the process. Before that stage, it will
	// be nil.
	//
	// It would be nice to serialize this, but we wind up with
	// a lot of JSON.
	FetchResults *FetchResultCollection `json:"-"`

	// The result of the attempt to package this object as a DPN
	// bag. We only package APTrust bags that we ingested and that
	// the depositor has indicated should go to DPN. Bags we
	// replicate from other nodes will already have been packaged
	// by the ingesting node, so the PackageResult for those will
	// be nil. On successful copy, check DPNResult.LocalPath to
	// find where we stored the file.
	PackageResult *PackageResult

	// The result of the attempt to copy the bag from its admin
	// or ingest node (typically ingest node). When a remote
	// node asks us to replicate a bag, we have to copy it from the
	// remote node to our staging area, usually via rsync. This
	// structure records the result of that copy. For bags that we
	// created at APTrust, this will be nil because we don't have
	// to copy from ourselves.
	CopyResult *CopyResult

	// The URL of this item in long-term storage. This will be an
	// AWS S3 or Glacier URL. An empty string indicates the bag
	// has not yet been copied to storage.
	StorageURL string

	// The result of the attempt to record information about the bag
	// in DPN and in APTrust. This object is defined in recorder.go.
	RecordResult *RecordResult

	// The result of the attempt to validate the bag. This includes
	// information about whether the bag's structure is valid, whether
	// all required tags are present, checksums checked out, etc.
	ValidationResult *ValidationResult

	// Indicates whether we should try to process this bag again.
	// For transient problems, such as network outages and lack of
	// disk space, this will be true. For fatal problems, such as
	// an invalid bag, this will be false.
	Retry bool
	// contains filtered or unexported fields
}

func NewDPNResult

func NewDPNResult(bagIdentifier string) *DPNResult

func (*DPNResult) OriginalBagName

func (result *DPNResult) OriginalBagName() (string, error)

func (*DPNResult) TarFilePath

func (result *DPNResult) TarFilePath() string

type DPNStorage

type DPNStorage struct {
	Region string `json:"region"`
	Type   string `json:"type"`
}

type DPNSync

type DPNSync struct {
	LocalClient   *DPNRestClient
	RemoteClients map[string]*DPNRestClient
	Logger        *logging.Logger
	Config        *DPNConfig
}

func NewDPNSync

func NewDPNSync(config *DPNConfig) (*DPNSync, error)

func (*DPNSync) GetAllNodes

func (dpnSync *DPNSync) GetAllNodes() ([]*DPNNode, error)

Returns a list of all the nodes that our node knows about.

func (*DPNSync) LocalNodeName

func (dpnSync *DPNSync) LocalNodeName() string

func (*DPNSync) RemoteNodeNames

func (dpnSync *DPNSync) RemoteNodeNames() []string

func (*DPNSync) SyncBags

func (dpnSync *DPNSync) SyncBags(remoteNode *DPNNode) ([]*DPNBag, error)

Syncs bags from the specified node to our own local DPN registry if the bags match these critieria:

1. The node we are querying is the admin node for the bag. 2. The bag was updated since the last time we queried the node.

Returns a list of the bags that were successfully updated. Even on error, this may still return a list with whatever bags were updated before the error occurred.

func (*DPNSync) SyncEverythingFromNode

func (dpnSync *DPNSync) SyncEverythingFromNode(remoteNode *DPNNode) *SyncResult

Sync all bags, replication requests and restore requests from the specified remote node. Note that this is a pull-only sync. We are not writing any data to other nodes, just reading what they have and updating our own registry with their info.

func (*DPNSync) SyncReplicationRequests

func (dpnSync *DPNSync) SyncReplicationRequests(remoteNode *DPNNode) ([]*DPNReplicationTransfer, error)

func (*DPNSync) SyncRestoreRequests

func (dpnSync *DPNSync) SyncRestoreRequests(remoteNode *DPNNode) ([]*DPNRestoreTransfer, error)

type DefaultMetadata

type DefaultMetadata struct {
	Comment                string
	BagItVersion           string
	BagItEncoding          string
	IngestNodeName         string
	IngestNodeAddress      string
	IngestNodeContactName  string
	IngestNodeContactEmail string
}

DefaultMetadata includes mostly static information about bags that APTrust packages for DPN. You can specify this information in a json config file and load it with LoadConfig.

type FetchResultCollection

type FetchResultCollection struct {
	Items []*DPNFetchResult
	// contains filtered or unexported fields
}

func FetchObjectFiles

func FetchObjectFiles(s3Client *bagman.S3Client, genericFiles []*bagman.GenericFile, dir string) (*FetchResultCollection, error)

FetchFiles fetches remote S3 files that make up the specified IntellectualObject into the specified directory.

func NewFetchResultCollection

func NewFetchResultCollection() *FetchResultCollection

func (*FetchResultCollection) Add

func (results *FetchResultCollection) Add(result *DPNFetchResult)

func (*FetchResultCollection) Errors

func (results *FetchResultCollection) Errors() []string

func (*FetchResultCollection) FindByIdentifier

func (results *FetchResultCollection) FindByIdentifier(identifier string) *DPNFetchResult

Finds a DPNFetchResult by GenericFile.Identifier

func (*FetchResultCollection) SuccessCount

func (results *FetchResultCollection) SuccessCount() int

type MemberListResult

type MemberListResult struct {
	Count    int32        `json:count`
	Next     *string      `json:next`
	Previous *string      `json:previous`
	Results  []*DPNMember `json:results`
}

type NodeListResult

type NodeListResult struct {
	Count    int32      `json:count`
	Next     *string    `json:next`
	Previous *string    `json:previous`
	Results  []*DPNNode `json:results`
}

type PackageResult

type PackageResult struct {
	BagBuilder      *BagBuilder
	DPNFetchResults []*DPNFetchResult
	TarFilePath     string
	ErrorMessage    string
}

PackageResult maintains information about the state of the packaging process. This struct is passed from channel to channel, accumulating information along the way. If packaging fails after max attempts, this struct will be dumped into the trouble queue as JSON.

func (*PackageResult) Errors

func (result *PackageResult) Errors() []string

func (*PackageResult) Succeeded

func (result *PackageResult) Succeeded() bool

type Packager

type Packager struct {
	LookupChannel      chan *DPNResult
	FetchChannel       chan *DPNResult
	BuildChannel       chan *DPNResult
	TarChannel         chan *DPNResult
	CleanupChannel     chan *DPNResult
	PostProcessChannel chan *DPNResult
	DPNConfig          *DPNConfig
	ProcUtil           *bagman.ProcessUtil
	// WaitGroup is for running local tests only.
	WaitGroup sync.WaitGroup
}

func NewPackager

func NewPackager(procUtil *bagman.ProcessUtil, dpnConfig *DPNConfig) *Packager

func (*Packager) DPNBagDirectory

func (packager *Packager) DPNBagDirectory(result *DPNResult) (string, error)

Returns the path to the directory where we will build the DPN bag. If the DPN staging dir is at /mnt/dpn, and the bag we're restoring has the identifier test.edu/my_bag, this will return /mnt/dpn/test.edu/my_bag

func (*Packager) FilesAlreadyFetched

func (packager *Packager) FilesAlreadyFetched(result *DPNResult) (map[string]bool, error)

func (*Packager) FilesToFetch

func (packager *Packager) FilesToFetch(result *DPNResult) ([]*bagman.GenericFile, error)

func (*Packager) HandleMessage

func (packager *Packager) HandleMessage(message *nsq.Message) error

MessageHandler handles messages from NSQ, putting each item into the pipleline.

func (*Packager) RunTest

func (packager *Packager) RunTest(bagIdentifier string) *DPNResult

Packages the bag identified by bagIdentifier. This is for local dev testing. You still need to have Fluctus running to retrieve bag info, and you need to have your S3 environment or config vars set up. Run: `dpn_package_devtest -config=dev`

type RecordResult

type RecordResult struct {
	// Did we create the DPN bag record in our local DPN registry?
	// We do this only if APTrust was the ingest node. If we created
	// the bag, this should be set to the bag's CreatedAt timestamp,
	// as returned by the server.
	DPNBagCreatedAt time.Time
	// If we created replication requests for this bag, the
	// namespaces of the replicating nodes should go here.
	// Note that this just means we created replication requests;
	// it does not mean those requests have been fulfilled.
	DPNReplicationRequests []string
	// If this was a local APTrust bag, we create a PREMIS
	// event saying that the bag has been ingested to DPN.
	// The PREMIS event identifier is a UUID string.
	PremisIngestEventId string
	// PREMIS identifier assignment event ID for bags ingested
	// by APTrust.
	PremisIdentifierEventId string
	// What time did we update the processed item request for this bag?
	// This lets Fluctus know that the task is complete.
	ProcessedItemUpdatedAt time.Time
	// If this is not an APTrust bag, did we send the copy receipt
	// the remote node that asked us to replicate this bag?
	// If sent the copy receipt, this should be set to the
	// ReplicationTransfer object's UpdatedAt timestamp, as returned
	// by the remote DPN REST server.
	CopyReceiptSentAt time.Time
	// If this is not an APTrust bag, did we send a message to the
	// remote node describing the outcome of our attempt to copy
	// this bag into long-term storage? If so, set this to the
	// UpdatedAt timestamp of the ReplicationTransfer object, as
	// returned by the remote DPN REST server.
	StorageResultSentAt time.Time
	// ErrorMessage contains information about an error that occurred
	// at any step during the recording process. If ErrorMessage is
	// an empty string, no error occurred.
	ErrorMessage string
}

func NewRecordResult

func NewRecordResult() *RecordResult

type Recorder

type Recorder struct {
	RecordChannel      chan *DPNResult
	PostProcessChannel chan *DPNResult
	ProcUtil           *bagman.ProcessUtil
	DPNConfig          *DPNConfig
	LocalRESTClient    *DPNRestClient
	RemoteClients      map[string]*DPNRestClient
	// WaitGroup is for running local tests only.
	WaitGroup sync.WaitGroup
}

func NewRecorder

func NewRecorder(procUtil *bagman.ProcessUtil, dpnConfig *DPNConfig) (*Recorder, error)
func (recorder *Recorder) CreateSymLink(result *DPNResult, toNode string) (string, error)

func (*Recorder) HandleMessage

func (recorder *Recorder) HandleMessage(message *nsq.Message) error

func (*Recorder) MakeReplicationTransfer

func (recorder *Recorder) MakeReplicationTransfer(result *DPNResult, toNode string) *DPNReplicationTransfer

func (*Recorder) RecordAPTrustDPNData

func (recorder *Recorder) RecordAPTrustDPNData(result *DPNResult)

Records data for DPN bags ingested at APTrust. 1. Create a new bag record in our local DPN node. 2. Create a PREMIS event in Fluctus saying this bag has been copied to DPN. 3. Create replication requests for this bag in our local DPN node.

func (*Recorder) RecordCopyReceipt

func (recorder *Recorder) RecordCopyReceipt(result *DPNResult)

Tell the remote node that we succeeded or failed in copying the bag from the remote node to our local staging area. (This is about the rsync copy, not the copy to long-term storage.)

We update the remote node for transfer requests only. We don't to this for bags we packaged locally.

When we receive a valid bag, tell the remote node that we got the bag and it looks OK. If the remote node accepts the checksum, we'll send the bag off to storage. There could be one of two problems here:

  1. We determined that the bag was not valid. (Bad checksum, missing files, or some similar issue.)
  2. The remote node did not accept the checksum we calculated on the tag manifest.

In either case, the remote node will set the status of the transfer request to 'Cancelled'. If that happens, we'll set the error message on the result and we will delete the bag without sending it to storage.

If the bag is valid and the remote node accepts our tag manifest checksum, this bag will go into the storage queue.

func (*Recorder) RecordStorageResult

func (recorder *Recorder) RecordStorageResult(result *DPNResult)

Tell the remote node that we managed to copy the bag successfully into long-term storage, or that we failed to store it.

Set result.ErrorMessage and result.Retry if there are problems.

func (*Recorder) RunTest

func (recorder *Recorder) RunTest(result *DPNResult)

type ReplicationListResult

type ReplicationListResult struct {
	Count    int32                     `json:count`
	Next     *string                   `json:next`
	Previous *string                   `json:previous`
	Results  []*DPNReplicationTransfer `json:results`
}

ReplicationListResult is what the REST service returns when we ask for a list of transfer requests.

type RestClientConfig

type RestClientConfig struct {
	Comment         string
	LocalServiceURL string
	LocalAPIRoot    string
	LocalAuthToken  string
}

type RestoreListResult

type RestoreListResult struct {
	Count    int32                 `json:count`
	Next     *string               `json:next`
	Previous *string               `json:previous`
	Results  []*DPNRestoreTransfer `json:results`
}

RestoreListResult is what the REST service returns when we ask for a list of restore requests.

type Storer

type Storer struct {
	StorageChannel     chan *DPNResult
	CleanupChannel     chan *DPNResult
	BagCreateChannel   chan *DPNResult
	PostProcessChannel chan *DPNResult
	ProcUtil           *bagman.ProcessUtil
	DPNConfig          *DPNConfig
	LocalRESTClient    *DPNRestClient
	// WaitGroup is for running local tests only.
	WaitGroup sync.WaitGroup
}

func NewStorer

func NewStorer(procUtil *bagman.ProcessUtil, dpnConfig *DPNConfig) (*Storer, error)

func (*Storer) GetS3Options

func (storer *Storer) GetS3Options(result *DPNResult) (s3.Options, error)

func (*Storer) HandleMessage

func (storer *Storer) HandleMessage(message *nsq.Message) error

func (*Storer) RunTest

func (storer *Storer) RunTest(result *DPNResult)

type SyncResult

type SyncResult struct {
	// Node is the node we are pulling information from.
	RemoteNode *DPNNode
	// Bags is a list of bags successfully synched.
	Bags []*DPNBag
	// ReplicationTransfers successfully synched.
	ReplicationTransfers []*DPNReplicationTransfer
	// RestoreTransfers successfully synched.
	RestoreTransfers []*DPNRestoreTransfer
	// BagSyncError contains the error (if any) that occurred
	// during the bag sync process. The first error will stop
	// the synching of all subsquent bags.
	BagSyncError error
	// ReplicationSyncError contains the error (if any) that occurred
	// during the synching of Replication Transfers. The first error
	// will stop the synching of all subsquent replication requests.
	ReplicationSyncError error
	// RestoreSyncError contains the error (if any) that occurred
	// during the synching of Restore Transfers. The first error
	// will stop the synching of all subsquent restore requests.
	RestoreSyncError error
}

SyncResult describes the result of an operation where we pull info about all updated bags, replication requests and restore requests from a remote node and copy that data into our own local DPN registry.

func (*SyncResult) HasSyncErrors

func (syncResult *SyncResult) HasSyncErrors() bool

type TroubleProcessor

type TroubleProcessor struct {
	ProcUtil *bagman.ProcessUtil
	// WaitGroup is for running local tests only.
	WaitGroup sync.WaitGroup
}

TroubleProcessor dumps the ProcessResult structure of items that failed the ingest process into JSON files. The JSON is formatted and human-readable, and may be deserialized and loaded into other processes in the future. The ProcessResult structure contains fairly detailed information about every stage of the ingest process.

func NewTroubleProcessor

func NewTroubleProcessor(procUtil *bagman.ProcessUtil) *TroubleProcessor

func (*TroubleProcessor) HandleMessage

func (troubleProcessor *TroubleProcessor) HandleMessage(message *nsq.Message) error

func (*TroubleProcessor) RunTest

func (troubleProcessor *TroubleProcessor) RunTest(result *DPNResult)

type ValidationResult

type ValidationResult struct {
	// TarFilePath is the path to the tarred bag we'll be validating.
	TarFilePath string

	// UntarredPath is the path to the untarred version of this bag.
	UntarredPath string

	// The NSQ message we're currently working on. This will be nil
	// outside of production. In production, we need to touch the
	// message periodically to keep it from timing out, especially
	// on very large bags.
	NsqMessage *nsq.Message `json:"-"`

	// TagManifestChecksum is the sha256 digest (calculated with a nonce)
	// that we need to send back to the originating node as a receipt
	// when we're fulfilling replication requests. Outside of fulfilling
	// replication requests, we don't need to even calculate this value.
	TagManifestChecksum string

	// ErrorMessages contains a list of everything that's wrong with the
	// bag. If this list is empty, the bag is valid.
	ErrorMessages []string

	// Warning messages about non-fatal issues we might want to look into.
	Warnings []string
}

ValidationResult stores information about whether a DPN bag is valid.

func NewValidationResult

func NewValidationResult(pathToFile string, nsqMessage *nsq.Message) (*ValidationResult, error)

func (*ValidationResult) AddError

func (validator *ValidationResult) AddError(message string)

AddError adds a message to the list of validation errors.

func (*ValidationResult) AddWarning

func (validator *ValidationResult) AddWarning(message string)

AddWarning adds a message to the list of validation errors.

func (*ValidationResult) BagNameValid

func (validator *ValidationResult) BagNameValid() bool

func (*ValidationResult) CalculateTagManifestDigest

func (validator *ValidationResult) CalculateTagManifestDigest(nonce string)

func (*ValidationResult) DeleteUntarredBag

func (validator *ValidationResult) DeleteUntarredBag()

We had to untar the bag to validate it, but once validation is done, all we need is the tarred bag, which we'll send to storage. Delete the untarred dir if we're not in test mode. We know we're in test mode if there's no validator.NsqMessage.

func (*ValidationResult) IsValid

func (validator *ValidationResult) IsValid() bool

IsValid() returns true if the bag is valid.

func (*ValidationResult) PathToFileInBag

func (validator *ValidationResult) PathToFileInBag(relativePath string) string

PathToFileInBag returns the path the to file within a bag. If your bag is untarred to /mnt/data/my_bag and you call this function with param 'dpn-tags/dpn-info.txt', you'll get /mnt/data/my_bag/dpn-tags/dpn-info.txt

func (*ValidationResult) ValidateBag

func (validator *ValidationResult) ValidateBag()

Run all validation checks on the bag.

type Validator

type Validator struct {
	ValidationChannel  chan *DPNResult
	PostProcessChannel chan *DPNResult
	ProcUtil           *bagman.ProcessUtil
	DPNConfig          *DPNConfig
	LocalRESTClient    *DPNRestClient
	// WaitGroup is for running local tests only.
	WaitGroup sync.WaitGroup
}

func NewValidator

func NewValidator(procUtil *bagman.ProcessUtil, dpnConfig *DPNConfig) (*Validator, error)

func (*Validator) HandleMessage

func (validator *Validator) HandleMessage(message *nsq.Message) error

func (*Validator) RunTest

func (validator *Validator) RunTest(result *DPNResult)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL