workers

package
v2.1.2+incompatible
Published: Sep 13, 2019 License: Apache-2.0 Imports: 27 Imported by: 11

README

DPN Workers

DPN workers perform the work required for DPN ingest and replication. The common.go file contains shared code. All other files have a corresponding "main" file in the exchange/apps directory. The main file compiles to a standalone binary, sets up the proper context, instantiates a worker, and then runs as a service.

Because the workers run as services, and because they depend on a number of external services, they require integration tests rather than unit tests. The integration tests for workers are in exchange/integration, and they are set up and run by the Ruby test scripts in the exchange/scripts directory.

Documentation

Constants

const DAYS_TO_KEEP_IN_S3 = 3

Keep the files in S3 up to X days, in case we're having system problems and we need to attempt the restore multiple times. We'll have other processes clean out the S3 bucket when necessary.

const HOURS_BETWEEN_CHECKS = 2

After a Glacier restore request has been accepted, we will check S3 periodically to see if the item has been restored. This is the interval between checks.
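As a minimal sketch of how an interval like this might be applied, the helper below decides whether enough time has passed since the last S3 check. The names dueForCheck and exampleLastCheck are hypothetical and do not come from this package; only the value 2 mirrors HOURS_BETWEEN_CHECKS.

```go
package main

import (
	"fmt"
	"time"
)

// hoursBetweenChecks mirrors HOURS_BETWEEN_CHECKS from this package.
const hoursBetweenChecks = 2

// exampleLastCheck is an arbitrary timestamp used for illustration.
var exampleLastCheck = time.Date(2019, time.September, 13, 8, 0, 0, 0, time.UTC)

// dueForCheck is a hypothetical helper. It reports whether enough time
// has passed since lastCheck to look in S3 for the restored item again.
func dueForCheck(lastCheck, now time.Time) bool {
	return !now.Before(lastCheck.Add(hoursBetweenChecks * time.Hour))
}

func main() {
	oneHourLater := exampleLastCheck.Add(1 * time.Hour)
	twoHoursLater := exampleLastCheck.Add(2 * time.Hour)
	fmt.Println(dueForCheck(exampleLastCheck, oneHourLater))
	fmt.Println(dueForCheck(exampleLastCheck, twoHoursLater))
}
```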

const MAX_FIXITY_CHECKS_PER_RUN = 20

As we're still in the probationary period, limit the number of fixity checks queued on each run.

const MINUTES_BETWEEN_RETRIES = 3

If S3 download fails with a non-fatal error, how many minutes should we wait before trying again?
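The following is a generic retry sketch, not the package's implementation. A worker built on NSQ might requeue the message with a delay rather than sleep, and this sketch does not distinguish fatal from non-fatal errors. The names downloadWithRetry, retryDelay and errTemporary are hypothetical; only the value 3 mirrors MINUTES_BETWEEN_RETRIES.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// minutesBetweenRetries mirrors MINUTES_BETWEEN_RETRIES from this package.
const minutesBetweenRetries = 3

// errTemporary stands in for a non-fatal S3 download error.
var errTemporary = errors.New("temporary S3 error")

// retryDelay returns the wait between download attempts.
func retryDelay() time.Duration {
	return minutesBetweenRetries * time.Minute
}

// downloadWithRetry is hypothetical. It calls download up to maxAttempts
// times, sleeping for delay after each failed attempt except the last.
func downloadWithRetry(maxAttempts int, delay time.Duration, download func() error) error {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = download(); err == nil {
			return nil
		}
		if attempt < maxAttempts {
			time.Sleep(delay)
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	// A stand-in download that fails twice, then succeeds.
	flaky := func() error {
		calls++
		if calls < 3 {
			return errTemporary
		}
		return nil
	}
	// A short delay keeps the demo quick; a worker would pass retryDelay().
	err := downloadWithRetry(5, 10*time.Millisecond, flaky)
	fmt.Println(calls, err, retryDelay())
}
```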

const RETRIEVAL_OPTION = "Bulk"

Standard retrieval is 3-5 hours. Bulk is 5-12 hours, and is cheaper. There's no rush on DPN fixity checking, so use the cheaper option. For retrieval options, see https://docs.aws.amazon.com/amazonglacier/latest/dev/downloading-an-archive-two-steps.html#api-downloading-an-archive-two-steps-retrieval-options. For retrieval pricing, see https://aws.amazon.com/glacier/pricing/

const SYNC_BATCH_SIZE = 50

SYNC_BATCH_SIZE describes how many records we should request per page from remote nodes when we're synching bags, replication requests, etc.
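To illustrate paging with a fixed batch size, here is a small sketch that walks an in-memory list page by page. The functions fetchPage and syncAll are hypothetical stand-ins for a remote node's paged endpoint and the sync loop; the real client calls are not shown on this page.

```go
package main

import "fmt"

// syncBatchSize mirrors SYNC_BATCH_SIZE from this package.
const syncBatchSize = 50

// fetchPage is a hypothetical stand-in for a remote node's paged list
// endpoint. Pages are 1-based; it returns at most perPage records.
func fetchPage(records []string, page, perPage int) []string {
	start := (page - 1) * perPage
	if start >= len(records) {
		return nil
	}
	end := start + perPage
	if end > len(records) {
		end = len(records)
	}
	return records[start:end]
}

// syncAll requests pages until a short page signals the end. It returns
// the number of records seen and the number of requests made.
func syncAll(records []string) (seen, requests int) {
	for page := 1; ; page++ {
		batch := fetchPage(records, page, syncBatchSize)
		requests++
		seen += len(batch)
		if len(batch) < syncBatchSize {
			return seen, requests
		}
	}
}

func main() {
	remote := make([]string, 120)
	for i := range remote {
		remote[i] = fmt.Sprintf("bag-%03d", i)
	}
	seen, requests := syncAll(remote)
	fmt.Println(seen, requests)
}
```

With this stopping rule, a record count that is an exact multiple of the batch size costs one extra, empty request.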

Variables

var DO_NOT_REPLICATE_TO = map[string]string{
	"7277cbab-d539-4a81-ac1e-70cefc28fb2e": "hathi",
}

DO_NOT_REPLICATE_TO describes which nodes we should NOT replicate member content to. The key is the member uuid, and the value is the node to avoid.
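A lookup against a map like this could be used to filter candidate nodes, as sketched below. The function allowedNodes is hypothetical, and the candidate list is illustrative; how the package actually selects replication targets is not documented here.

```go
package main

import "fmt"

// doNotReplicateTo mirrors DO_NOT_REPLICATE_TO: member UUID -> node to avoid.
var doNotReplicateTo = map[string]string{
	"7277cbab-d539-4a81-ac1e-70cefc28fb2e": "hathi",
}

// allowedNodes is hypothetical. It returns the candidate nodes minus
// the node, if any, that the given member's content must not go to.
func allowedNodes(memberUUID string, candidates []string) []string {
	avoid, restricted := doNotReplicateTo[memberUUID]
	allowed := make([]string, 0, len(candidates))
	for _, node := range candidates {
		if restricted && node == avoid {
			continue
		}
		allowed = append(allowed, node)
	}
	return allowed
}

func main() {
	// Illustrative node namespaces.
	nodes := []string{"hathi", "sdr", "tdr"}
	fmt.Println(allowedNodes("7277cbab-d539-4a81-ac1e-70cefc28fb2e", nodes))
	fmt.Println(allowedNodes("some-other-member-uuid", nodes))
}
```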

Functions

func EnsureItemIsMarkedCancelled

func EnsureItemIsMarkedCancelled(_context *context.Context, manifest *models.ReplicationManifest)

EnsureItemIsMarkedCancelled makes sure a stored replication is marked as cancelled in the DPNWorkItems table.

func EnsureItemIsMarkedComplete

func EnsureItemIsMarkedComplete(_context *context.Context, manifest *models.ReplicationManifest)

EnsureItemIsMarkedComplete makes sure a stored replication is marked as complete in the DPNWorkItems table. Normally, the workers handle this, but in some cases, a previous store attempt may have succeeded and then failed to update the DPNWorkItems table. That's usually the result of a message timeout or restarting a service.

func GetDPNBag

func GetDPNBag(dpnClient *network.DPNRestClient, manifest *models.ReplicationManifest, workSummary *apt_models.WorkSummary)

GetDPNBag gets the bag record from the DPN REST server that describes the bag we are being asked to copy. Param dpnClient is the client for that DPN REST server, manifest is a ReplicationManifest, and workSummary should be the WorkSummary pertinent to the current operation. So, on copy, workSummary should be manifest.CopySummary; on validation, it should be manifest.ValidationSummary; and on store it should be manifest.StoreSummary.

func GetDPNWorkItem

func GetDPNWorkItem(_context *context.Context, manifest *models.ReplicationManifest, workSummary *apt_models.WorkSummary)

GetDPNWorkItem fetches the DPNWorkItem associated with this message and attaches it to the manifest.

Param _context is a context object, manifest is a ReplicationManifest, and workSummary should be the WorkSummary pertinent to the current operation. So, on copy, workSummary should be manifest.CopySummary; on validation, it should be manifest.ValidationSummary; and on store it should be manifest.StoreSummary.
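The stage-to-summary mapping described above can be sketched as a simple switch. The types workSummary and replicationManifest are simplified stand-ins for apt_models.WorkSummary and models.ReplicationManifest, and summaryForStage is a hypothetical helper; the stage names are the ones listed for SetupReplicationManifest.

```go
package main

import "fmt"

// workSummary is a simplified stand-in for apt_models.WorkSummary.
type workSummary struct {
	Errors []string
}

// replicationManifest is a simplified stand-in for
// models.ReplicationManifest, with one summary per stage.
type replicationManifest struct {
	CopySummary       *workSummary
	ValidationSummary *workSummary
	StoreSummary      *workSummary
}

// summaryForStage is hypothetical. It picks the summary that matches
// the current stage: "copy", "validate" or "store".
func summaryForStage(m *replicationManifest, stage string) (*workSummary, error) {
	switch stage {
	case "copy":
		return m.CopySummary, nil
	case "validate":
		return m.ValidationSummary, nil
	case "store":
		return m.StoreSummary, nil
	}
	return nil, fmt.Errorf("unknown stage %q", stage)
}

func main() {
	m := &replicationManifest{
		CopySummary:       &workSummary{},
		ValidationSummary: &workSummary{},
		StoreSummary:      &workSummary{},
	}
	summary, err := summaryForStage(m, "copy")
	fmt.Println(summary == m.CopySummary, err)
}
```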

func GetRsyncCommand

func GetRsyncCommand(copyFrom, copyTo string, useSSH bool) *exec.Cmd

GetRsyncCommand returns a command object for copying from the remote location to the local filesystem. When useSSH is true, the copy is done via rsync over ssh. The command will capture stdout and stderr. The copyFrom param should be a valid scp target in this format:

remoteuser@remotehost:/remote/dir/bag.tar

The copyTo param should be an absolute path on a locally-accessible file system, such as:

/mnt/dpn/data/bag.tar

Using this assumes a few things:

  1. You have rsync installed.
  2. You have an ssh client installed.
  3. You have an entry in your ~/.ssh/config file specifying connection and key information for the remote host.

Usage:

command := GetRsyncCommand("aptrust@tdr:bag.tar", "/mnt/dpn/bag.tar", true)
err := command.Run()

if err != nil {
   ... do something ...
}

-- OR --

output, err := command.CombinedOutput()

if err != nil {
   fmt.Println(err.Error())
   fmt.Println(string(output))
}
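For readers unfamiliar with os/exec, the sketch below shows one way a command like this could be assembled. It is not the package's implementation. The function buildRsyncCommand is hypothetical, and the flags (-a, and -e ssh) are standard rsync options chosen for illustration; the flags the package actually passes are not documented on this page. The command is built but not run.

```go
package main

import (
	"fmt"
	"os/exec"
)

// buildRsyncCommand is hypothetical. It assembles an rsync command that
// copies copyFrom to copyTo, optionally using ssh as the remote shell.
func buildRsyncCommand(copyFrom, copyTo string, useSSH bool) *exec.Cmd {
	args := []string{"-a"} // archive mode: recurse and preserve attributes
	if useSSH {
		args = append(args, "-e", "ssh") // use ssh as the remote shell
	}
	args = append(args, copyFrom, copyTo)
	return exec.Command("rsync", args...)
}

func main() {
	cmd := buildRsyncCommand("remoteuser@remotehost:/remote/dir/bag.tar", "/mnt/dpn/data/bag.tar", true)
	// Print the argument list instead of running the command, so the
	// demo works on machines without rsync or a reachable remote host.
	fmt.Println(cmd.Args)
}
```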

func GetWorkItem

func GetWorkItem(_context *context.Context, manifest *models.DPNIngestManifest, workSummary *apt_models.WorkSummary)

GetWorkItem fetches the WorkItem associated with this message and attaches it to the manifest.

Param _context is a context object, manifest is a DPNIngestManifest, and workSummary should be the WorkSummary pertinent to the current operation. So, on package, workSummary should be manifest.PackageSummary; on store it should be manifest.StoreSummary, and on record, it should be manifest.RecordSummary.

func GetWorkItemState

func GetWorkItemState(_context *context.Context, manifest *models.DPNIngestManifest, workSummary *apt_models.WorkSummary)

GetWorkItemState fetches the WorkItemState associated with this message and attaches it to the manifest.

Param _context is a context object, manifest is a DPNIngestManifest, and workSummary should be the WorkSummary pertinent to the current operation. So, on package, workSummary should be manifest.PackageSummary; on store it should be manifest.StoreSummary, and on record, it should be manifest.RecordSummary.

func GetXferRequest

func GetXferRequest(dpnClient *network.DPNRestClient, manifest *models.ReplicationManifest, workSummary *apt_models.WorkSummary)

GetXferRequest gets the ReplicationTransfer request from the DPN REST server that describes the replication we're about to perform. Param dpnClient is the client for that DPN REST server, manifest is a ReplicationManifest, and workSummary should be the WorkSummary pertinent to the current operation. So, on copy, workSummary should be manifest.CopySummary; on validation, it should be manifest.ValidationSummary; and on store it should be manifest.StoreSummary.

func LoadDPNBagValidationConfig

func LoadDPNBagValidationConfig(_context *context.Context) *validation.BagValidationConfig

LoadDPNBagValidationConfig loads the bag validation config file specified in the general config options. This will die if the bag validation config cannot be loaded or is invalid.

func LogIngestJson

func LogIngestJson(manifest *models.DPNIngestManifest, jsonLog *log.Logger)

LogIngestJson dumps the WorkItemState.State into the JSON log, surrounded by markers that make it easy to find. This log gets big.

func LogReplicationJson

func LogReplicationJson(manifest *models.ReplicationManifest, jsonLog *log.Logger)

LogReplicationJson dumps the WorkItemState.State into the JSON log, surrounded by markers that make it easy to find. This log gets big.

func PushToQueue

func PushToQueue(_context *context.Context, manifest *models.DPNIngestManifest, activeSummary *apt_models.WorkSummary, queueTopic string)

PushToQueue pushes a WorkItem into the specified NSQ topic.

func ReserveSpaceOnVolume

func ReserveSpaceOnVolume(_context *context.Context, manifest *models.ReplicationManifest) bool

ReserveSpaceOnVolume does just what it says. It makes sure we have space to copy this item from the remote node. We will be validating this bag in a later step without untarring it, so we just have to reserve enough room for the tar file.
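The idea of reserving room before copying can be sketched with an in-memory model. This is an assumption-laden illustration: the volume type and its reserve and release methods are hypothetical, and the package may track disk space through an external service instead.

```go
package main

import (
	"fmt"
	"sync"
)

// volume is a hypothetical in-memory model of a staging volume.
type volume struct {
	mu       sync.Mutex
	capacity uint64 // total bytes available for staging
	reserved uint64 // bytes already promised to other items
}

// reserve claims size bytes and reports whether the claim succeeded.
func (v *volume) reserve(size uint64) bool {
	v.mu.Lock()
	defer v.mu.Unlock()
	if size > v.capacity-v.reserved {
		return false
	}
	v.reserved += size
	return true
}

// release returns size bytes to the pool once the item is done.
func (v *volume) release(size uint64) {
	v.mu.Lock()
	defer v.mu.Unlock()
	if size > v.reserved {
		size = v.reserved
	}
	v.reserved -= size
}

func main() {
	staging := &volume{capacity: 100}
	fmt.Println(staging.reserve(60)) // room for the first tar file
	fmt.Println(staging.reserve(50)) // not enough room left
	staging.release(60)
	fmt.Println(staging.reserve(50)) // fits after the release
}
```

Because the bag is validated without untarring, the size reserved only needs to cover the tar file itself.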

func SaveDPNWorkItemState

func SaveDPNWorkItemState(_context *context.Context, manifest *models.ReplicationManifest, workSummary *apt_models.WorkSummary)

SaveDPNWorkItemState saves the manifest.DPNWorkItem to Pharos, after setting its State property to a JSON serialization of the manifest.

func SaveWorkItem

func SaveWorkItem(_context *context.Context, manifest *models.DPNIngestManifest, workSummary *apt_models.WorkSummary)

SaveWorkItem saves the WorkItem in the manifest to Pharos. Param workSummary should be the WorkSummary from the manifest for the current stage of processing.

func SaveWorkItemState

func SaveWorkItemState(_context *context.Context, manifest *models.DPNIngestManifest, activeSummary *apt_models.WorkSummary)

SaveWorkItemState sends a copy of this process's WorkItemState back to Pharos. It also dumps the ingest manifest to the JSON log.

Param activeSummary will change, depending on what stage of processing we're in. It could be the DPNIngestState.PackageSummary, DPNIngestState.StoreSummary, etc.

func SetupIngestManifest

func SetupIngestManifest(message *nsq.Message, stage string, _context *context.Context, includeIntelObj bool) *models.DPNIngestManifest

SetupIngestManifest loads the existing DPNIngestManifest associated with the NSQ message, or creates a new one if necessary. Param message should be the NSQ message we're working on. Param stage should be one of "package", "store" or "record". Param _context is the context of the worker calling this function. The caller should check for errors in the manifest's Package, Store or Record summary (whichever is the current stage) before proceeding. If param includeIntelObj is true, this will load the IntellectualObject record from Pharos, which can be an expensive operation. The IntellectualObject is required for dpn_packager, and is not used at all in dpn_ingest_store or dpn_record.

func SetupReplicationManifest

func SetupReplicationManifest(message *nsq.Message, stage string, _context *context.Context, localClient *network.DPNRestClient, remoteClients map[string]*network.DPNRestClient) *models.ReplicationManifest

SetupReplicationManifest loads the existing ReplicationManifest associated with the NSQ message, or creates a new one if necessary. Param message should be the NSQ message we're working on. Param stage should be one of "copy", "validate" or "store". Param _context is the context of the worker calling this function.

func UpdateReplicationTransfer

func UpdateReplicationTransfer(_context *context.Context, remoteClient *network.DPNRestClient, manifest *models.ReplicationManifest)

UpdateReplicationTransfer updates manifest.ReplicationTransfer at the remote DPN node that remoteClient is connected to. That must be the FromNode of the ReplicationTransfer.

Types

type DPNCopier

type DPNCopier struct {
	CopyChannel        chan *models.ReplicationManifest
	ChecksumChannel    chan *models.ReplicationManifest
	PostProcessChannel chan *models.ReplicationManifest
	Context            *context.Context
	LocalClient        *network.DPNRestClient
	RemoteClients      map[string]*network.DPNRestClient
}

DPNCopier copies tarred bags from other nodes via rsync. This is used when replicating content from other nodes. For putting together DPN bags from APTrust files, see fetcher.go.

func NewDPNCopier

func NewDPNCopier(_context *context.Context) (*DPNCopier, error)

NewDPNCopier returns a new DPNCopier object.

func (*DPNCopier) HandleMessage

func (copier *DPNCopier) HandleMessage(message *nsq.Message) error

HandleMessage is the NSQ message handler. The NSQ consumer will pass each message in the subscribed channel to this function.
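The channel fields on DPNCopier suggest a staged pipeline in which the message handler only enqueues work and goroutines carry each manifest from stage to stage. The sketch below shows that general pattern with simplified, hypothetical types (manifest, pipeline); it is an assumption about the design, not the worker's actual code.

```go
package main

import "fmt"

// manifest is a simplified stand-in for models.ReplicationManifest.
type manifest struct {
	BagID string
	Steps []string // names of the stages this manifest has passed through
}

// pipeline is a hypothetical miniature of a worker with three stages.
type pipeline struct {
	copyChannel        chan *manifest
	checksumChannel    chan *manifest
	postProcessChannel chan *manifest
	done               chan *manifest
}

// newPipeline creates the channels and starts one goroutine per stage.
func newPipeline() *pipeline {
	p := &pipeline{
		copyChannel:        make(chan *manifest, 10),
		checksumChannel:    make(chan *manifest, 10),
		postProcessChannel: make(chan *manifest, 10),
		done:               make(chan *manifest, 10),
	}
	go p.stage("copy", p.copyChannel, p.checksumChannel)
	go p.stage("checksum", p.checksumChannel, p.postProcessChannel)
	go p.stage("postprocess", p.postProcessChannel, p.done)
	return p
}

// stage records its name on each manifest and hands the manifest on.
func (p *pipeline) stage(name string, in <-chan *manifest, out chan<- *manifest) {
	for m := range in {
		m.Steps = append(m.Steps, name)
		out <- m
	}
}

// handleMessage plays the role of HandleMessage: it only enqueues work.
func (p *pipeline) handleMessage(bagID string) {
	p.copyChannel <- &manifest{BagID: bagID}
}

func main() {
	p := newPipeline()
	p.handleMessage("bag-1")
	m := <-p.done
	fmt.Println(m.BagID, m.Steps)
}
```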

type DPNFixityChecker

type DPNFixityChecker struct {
	Context            *context.Context
	LocalDPNRestClient *network.DPNRestClient
	ValidationChannel  chan *DPNRestoreHelper
	RecordChannel      chan *DPNRestoreHelper
	CleanupChannel     chan *DPNRestoreHelper
	// PreTestChannel is used in testing only to set some properties
	// on the helper/manifest. PreTestChannel should push directly into
	// the ValidationChannel.
	PreTestChannel chan *DPNRestoreHelper
	// PostTestChannel is for testing only. It allows us to inspect the
	// state of our helper and manifest when processing completes.
	PostTestChannel     chan *DPNRestoreHelper
	BagValidationConfig *validation.BagValidationConfig
}

func NewDPNFixityChecker

func NewDPNFixityChecker(_context *context.Context) (*DPNFixityChecker, error)

NewDPNFixityChecker creates a new DPNFixityChecker.

func (*DPNFixityChecker) FinishWithError

func (checker *DPNFixityChecker) FinishWithError(helper *DPNRestoreHelper)

func (*DPNFixityChecker) FinishWithSuccess

func (checker *DPNFixityChecker) FinishWithSuccess(helper *DPNRestoreHelper)

func (*DPNFixityChecker) HandleMessage

func (checker *DPNFixityChecker) HandleMessage(message *nsq.Message) error

HandleMessage is the NSQ message handler. The NSQ consumer will pass each message in the subscribed channel to this function.

func (*DPNFixityChecker) SaveFixityRecord

func (checker *DPNFixityChecker) SaveFixityRecord(helper *DPNRestoreHelper)

Save the fixity record to the local DPN REST server.

func (*DPNFixityChecker) ValidateBag

func (checker *DPNFixityChecker) ValidateBag(helper *DPNRestoreHelper)

DPN fixity check requires us to validate the entire bag and extract the sha256 checksum of the tagmanifest-sha256.txt file. If the bag is valid, and the fixity value of the tag manifest matches what's in the DPN registry, the fixity check passes. DPN currently has no way of recording that a fixity check has failed, other than a human looking at the records. We can record the DPNWorkItem as failed in Pharos.
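The digest comparison at the heart of this check can be sketched with the standard library. The helpers sha256Hex and fixityMatches are hypothetical, and the sample tag manifest content is illustrative; full bag validation is outside the scope of this sketch.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// sha256Hex returns the lowercase hex SHA-256 digest of data.
func sha256Hex(data []byte) string {
	sum := sha256.Sum256(data)
	return hex.EncodeToString(sum[:])
}

// fixityMatches is hypothetical. It compares the digest of the tag
// manifest's bytes with the digest recorded in the registry, ignoring
// letter case and surrounding whitespace in the recorded value.
func fixityMatches(tagManifest []byte, registryDigest string) bool {
	return strings.EqualFold(sha256Hex(tagManifest), strings.TrimSpace(registryDigest))
}

func main() {
	// Illustrative content only; a real tagmanifest-sha256.txt lists a
	// digest and a path for each tag file in the bag.
	tagManifest := []byte("0123abcd  bag-info.txt\n")
	recorded := sha256Hex(tagManifest)
	fmt.Println(fixityMatches(tagManifest, recorded))
	fmt.Println(fixityMatches([]byte("tampered"), recorded))
}
```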

type DPNGlacierRestoreInit

type DPNGlacierRestoreInit struct {
	// Context includes logging, config, network connections, and
	// other general resources for the worker.
	Context *context.Context
	// LocalDPNRestClient lets us talk to our local DPN server.
	LocalDPNRestClient *dpn_network.DPNRestClient
	// RequestChannel is for requesting an item be moved from Glacier
	// into S3.
	RequestChannel chan *DPNRestoreHelper
	// CleanupChannel is for housekeeping, like updating NSQ.
	CleanupChannel chan *DPNRestoreHelper
	// PostTestChannel is for testing only. In production, nothing listens
	// on this channel.
	PostTestChannel chan *DPNRestoreHelper
	// S3Url is a custom URL that the S3 client should connect to.
	// We use this only in testing, when we want the client to talk
	// to a local test server. This should not be set in demo or
	// production.
	S3Url string
}

Requests that an object be restored from Glacier to S3. This is the first step toward performing fixity checks on DPN bags, and restoring DPN bags, all of which are stored in Glacier.

func DPNNewGlacierRestoreInit

func DPNNewGlacierRestoreInit(_context *context.Context) (*DPNGlacierRestoreInit, error)

func (*DPNGlacierRestoreInit) Cleanup

func (restorer *DPNGlacierRestoreInit) Cleanup()

func (*DPNGlacierRestoreInit) FinishWithError

func (restorer *DPNGlacierRestoreInit) FinishWithError(helper *DPNRestoreHelper)

func (*DPNGlacierRestoreInit) FinishWithSuccess

func (restorer *DPNGlacierRestoreInit) FinishWithSuccess(helper *DPNRestoreHelper)

func (*DPNGlacierRestoreInit) HandleMessage

func (restorer *DPNGlacierRestoreInit) HandleMessage(message *nsq.Message) error

This is the callback that NSQ workers use to handle messages from NSQ.

func (*DPNGlacierRestoreInit) InitializeRetrieval

func (restorer *DPNGlacierRestoreInit) InitializeRetrieval(helper *DPNRestoreHelper)

func (*DPNGlacierRestoreInit) RequestRestore

func (restorer *DPNGlacierRestoreInit) RequestRestore()

func (*DPNGlacierRestoreInit) RestoreRequestNeeded

func (restorer *DPNGlacierRestoreInit) RestoreRequestNeeded(helper *DPNRestoreHelper) (bool, error)

func (*DPNGlacierRestoreInit) SendToDownloadQueue

func (restorer *DPNGlacierRestoreInit) SendToDownloadQueue(helper *DPNRestoreHelper)

type DPNIngestRecorder

type DPNIngestRecorder struct {
	RecordChannel      chan *models.DPNIngestManifest
	PostProcessChannel chan *models.DPNIngestManifest
	Context            *context.Context
	LocalClient        *network.DPNRestClient
	RemoteClients      map[string]*network.DPNRestClient
}

DPNIngestRecorder records information about locally-ingested DPN bags in both APTrust and DPN.

func NewDPNIngestRecorder

func NewDPNIngestRecorder(_context *context.Context) (*DPNIngestRecorder, error)

NewDPNIngestRecorder returns a new DPNIngestRecorder.

func (*DPNIngestRecorder) HandleMessage

func (recorder *DPNIngestRecorder) HandleMessage(message *nsq.Message) error

HandleMessage is the NSQ message handler. The NSQ consumer will pass each message in the subscribed channel to this function.

type DPNIngestStorer

type DPNIngestStorer struct {
	StoreChannel       chan *models.DPNIngestManifest
	PostProcessChannel chan *models.DPNIngestManifest
	Context            *context.Context
	LocalClient        *network.DPNRestClient
	RemoteClients      map[string]*network.DPNRestClient
}

DPNIngestStorer copies bags ingested from APTrust into Glacier long-term storage.

func NewDPNIngestStorer

func NewDPNIngestStorer(_context *context.Context) (*DPNIngestStorer, error)

NewDPNIngestStorer returns a new DPNIngestStorer object.

func (*DPNIngestStorer) HandleMessage

func (storer *DPNIngestStorer) HandleMessage(message *nsq.Message) error

HandleMessage is the NSQ message handler. The NSQ consumer will pass each message in the subscribed channel to this function.

type DPNPackager

type DPNPackager struct {
	PackageChannel      chan *models.DPNIngestManifest
	TarChannel          chan *models.DPNIngestManifest
	ValidationChannel   chan *models.DPNIngestManifest
	PostProcessChannel  chan *models.DPNIngestManifest
	BagValidationConfig *validation.BagValidationConfig
	Context             *context.Context
	LocalClient         *network.DPNRestClient
	RemoteClients       map[string]*network.DPNRestClient
}

DPNPackager repackages APTrust bags as DPN bags so they can be copied into DPN.

func NewDPNPackager

func NewDPNPackager(_context *context.Context) (*DPNPackager, error)

NewDPNPackager creates a new DPNPackager.

func (*DPNPackager) HandleMessage

func (packager *DPNPackager) HandleMessage(message *nsq.Message) error

HandleMessage is the NSQ message handler. The NSQ consumer will pass each message in the subscribed channel to this function.

type DPNQueue

type DPNQueue struct {
	// LocalClient is the DPN REST client that talks to our own
	// local DPN REST server.
	LocalClient *network.DPNRestClient
	// RemoteNodes is a map of remote nodes. Key is the namespace
	// and value is the node.
	RemoteNodes map[string]*models.Node
	// RemoteClients is a collection of clients that talk to the
	// DPN REST servers on other nodes. The key is the namespace
	// of the remote node, and the value is the client that talks
	// to that node.
	RemoteClients map[string]*network.DPNRestClient
	// Context provides access to information about our environment
	// and config settings, and access to basic services like
	// logging and a Pharos client.
	Context *context.Context
	// ExamineItemsSince is a timestamp. We will examine any items
	// updated since this timestamp to see if they need to be queued.
	ExamineItemsSince time.Time
	// QueueResult contains information about which items were
	// queued during this run of the program.
	QueueResult *models.QueueResult
}

DPNQueue queues DPN ingest requests (found in the Pharos WorkItems table) and DPN replication requests (found in the Pharos DPNWorkItems table). These items will go into the proper NSQ topics for DPN ingest or replication.

func NewDPNQueue

func NewDPNQueue(_context *context.Context, hours int) (*DPNQueue, error)

NewDPNQueue creates a new DPNQueue object. Param _context is a Context object, and param hours tells the code to examine all Replication, Restore and DPN Ingest requests from the past N hours.
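One plausible reading of the hours parameter is that it sets the ExamineItemsSince cutoff to the current time minus N hours. The sketch below shows that arithmetic under that assumption; examineItemsSince, needsExamination and exampleNow are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// exampleNow is an arbitrary "current time" used for illustration.
var exampleNow = time.Date(2019, time.September, 13, 12, 0, 0, 0, time.UTC)

// examineItemsSince is hypothetical. It returns the cutoff timestamp
// that lies the given number of hours before now.
func examineItemsSince(now time.Time, hours int) time.Time {
	return now.Add(-time.Duration(hours) * time.Hour)
}

// needsExamination reports whether an item updated at updatedAt falls
// on or after the cutoff.
func needsExamination(updatedAt, cutoff time.Time) bool {
	return !updatedAt.Before(cutoff)
}

func main() {
	cutoff := examineItemsSince(exampleNow, 24)
	fmt.Println(cutoff.Format(time.RFC3339))
	fmt.Println(needsExamination(exampleNow.Add(-2*time.Hour), cutoff))
	fmt.Println(needsExamination(exampleNow.Add(-48*time.Hour), cutoff))
}
```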

func (*DPNQueue) Run

func (dpnQueue *DPNQueue) Run()

Run checks for ReplicationTransfers, RestoreTransfers and IngestRequests that need to be queued. It creates DPNWorkItems and NSQ entries for each request that needs to be queued.

type DPNReplicationStorer

type DPNReplicationStorer struct {
	StoreChannel       chan *models.ReplicationManifest
	PostProcessChannel chan *models.ReplicationManifest
	Context            *context.Context
	LocalClient        *network.DPNRestClient
	RemoteClients      map[string]*network.DPNRestClient
}

DPNReplicationStorer copies replicated bags from our staging area to Glacier long-term storage. We only copy bags that have been validated.

func NewDPNReplicationStorer

func NewDPNReplicationStorer(_context *context.Context) (*DPNReplicationStorer, error)

NewDPNReplicationStorer creates a new DPNReplicationStorer object.

func (*DPNReplicationStorer) HandleMessage

func (storer *DPNReplicationStorer) HandleMessage(message *nsq.Message) error

HandleMessage is the NSQ message handler. The NSQ consumer will pass each message in the subscribed channel to this function.

type DPNRestoreHelper

type DPNRestoreHelper struct {
	Manifest    *models.DPNRetrievalManifest
	WorkSummary *apt_models.WorkSummary
	// contains filtered or unexported fields
}

func NewDPNRestoreHelper

func NewDPNRestoreHelper(message *nsq.Message, _context *context.Context, dpnRestClient *network.DPNRestClient, action, summaryName string) (*DPNRestoreHelper, error)

func (*DPNRestoreHelper) FileExistsAndIsComplete

func (helper *DPNRestoreHelper) FileExistsAndIsComplete() bool

func (*DPNRestoreHelper) SaveDPNWorkItem

func (helper *DPNRestoreHelper) SaveDPNWorkItem()

type DPNS3Retriever

type DPNS3Retriever struct {
	// Context includes logging, config, network connections, and
	// other general resources for the worker.
	Context *context.Context
	// LocalDPNRestClient lets us talk to our local DPN server.
	LocalDPNRestClient *dpn_network.DPNRestClient
	// FetchChannel is for fetching files from S3.
	FetchChannel chan *DPNRestoreHelper
	// CleanupChannel is for post-fetch processing.
	CleanupChannel chan *DPNRestoreHelper
	// PostTestChannel is for testing only. In production, nothing listens
	// on this channel.
	PostTestChannel chan *DPNRestoreHelper
}

Fetches from S3 to local storage.

func NewDPNS3Retriever

func NewDPNS3Retriever(_context *context.Context) (*DPNS3Retriever, error)

func (*DPNS3Retriever) DownloadFile

func (fetcher *DPNS3Retriever) DownloadFile(helper *DPNRestoreHelper)

func (*DPNS3Retriever) FinishWithError

func (fetcher *DPNS3Retriever) FinishWithError(helper *DPNRestoreHelper)

func (*DPNS3Retriever) FinishWithSuccess

func (fetcher *DPNS3Retriever) FinishWithSuccess(helper *DPNRestoreHelper)

func (*DPNS3Retriever) HandleMessage

func (retriever *DPNS3Retriever) HandleMessage(message *nsq.Message) error

This is the callback that NSQ workers use to handle messages from NSQ.

func (*DPNS3Retriever) SendToFixityQueue

func (fetcher *DPNS3Retriever) SendToFixityQueue(helper *DPNRestoreHelper)

type DPNSync

type DPNSync struct {
	// LocalClient is the DPN REST client that talks to our own
	// local DPN REST server.
	LocalClient *network.DPNRestClient
	// RemoteNodes is a map of remote nodes. Key is the namespace
	// and value is the node.
	RemoteNodes map[string]*models.Node
	// RemoteClients is a collection of clients that talk to the
	// DPN REST servers on other nodes. The key is the namespace
	// of the remote node, and the value is the client that talks
	// to that node.
	RemoteClients map[string]*network.DPNRestClient
	// Context provides access to information about our environment
	// and config settings, and access to basic services like
	// logging and a Pharos client.
	Context *context.Context
	// Results contains information about the results of the sync
	// operations with each node. Key is the node namespace,
	// value is the SyncResult object for that node.
	Results map[string]*models.SyncResult
}

DPNSync copies data from remote DPN nodes to our local DPN node. Data includes information about bags, replication transfers, etc. Each node is the authority on bags where they are listed as the admin node, so when synching from DPN node X, we ask for all bags where X is the admin node, as well as all ReplicationTransfers, RestoreTransfers, FixityChecks, etc. We do NOT ask node X for info about or related to bags whose admin node is Y or Z.
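The admin-node rule can be illustrated with a local filter. In practice the filtering may happen in the query sent to the remote node; this sketch only shows the rule itself. The bag type and bagsOwnedBy function are hypothetical, and X, Y and Z are the placeholder node names used above.

```go
package main

import "fmt"

// bag is a simplified stand-in for a DPN bag record.
type bag struct {
	UUID      string
	AdminNode string
}

// bagsOwnedBy is hypothetical. It keeps only the bags whose admin node
// is the node we are syncing from.
func bagsOwnedBy(node string, bags []bag) []bag {
	var owned []bag
	for _, b := range bags {
		if b.AdminNode == node {
			owned = append(owned, b)
		}
	}
	return owned
}

func main() {
	all := []bag{
		{UUID: "bag-1", AdminNode: "X"},
		{UUID: "bag-2", AdminNode: "Y"},
		{UUID: "bag-3", AdminNode: "X"},
		{UUID: "bag-4", AdminNode: "Z"},
	}
	// When syncing from node X, only bags administered by X qualify.
	for _, b := range bagsOwnedBy("X", all) {
		fmt.Println(b.UUID)
	}
}
```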

func NewDPNSync

func NewDPNSync(_context *context.Context) (*DPNSync, error)

NewDPNSync creates a new DPNSync object.

func (*DPNSync) GetAllNodes

func (dpnSync *DPNSync) GetAllNodes() ([]*models.Node, error)

GetAllNodes returns a list of all the nodes that our node knows about.

func (*DPNSync) LocalNodeName

func (dpnSync *DPNSync) LocalNodeName() string

LocalNodeName returns the namespace of our local DPN node.

func (*DPNSync) RemoteNodeNames

func (dpnSync *DPNSync) RemoteNodeNames() []string

RemoteNodeNames returns the namespaces of all known remote DPN nodes.

func (*DPNSync) Run

func (dpnSync *DPNSync) Run() bool

Run runs all sync operations against all nodes. This is the only function your cron job needs to call. The boolean return value will be true if all sync operations completed without error, false otherwise. For errors, check the log.

func (*DPNSync) SyncBags

func (dpnSync *DPNSync) SyncBags(node *models.Node)

SyncBags syncs bags from the specified node to our own local DPN registry if the bags match these criteria:

  1. The node we are querying is the admin node for the bag.
  2. The bag was updated since the last time we queried the node.

Returns a list of the bags that were successfully updated. Even on error, this may still return a list with whatever bags were updated before the error occurred.

func (*DPNSync) SyncDigests

func (dpnSync *DPNSync) SyncDigests(remoteNode *models.Node)

func (*DPNSync) SyncEverythingFromNode

func (dpnSync *DPNSync) SyncEverythingFromNode(remoteNode *models.Node)

SyncEverythingFromNode syncs all bags, replication requests and restore requests from the specified remote node. Note that this is a pull-only sync. We are not writing any data to other nodes, just reading what they have and updating our own registry with their info.

func (*DPNSync) SyncFixities

func (dpnSync *DPNSync) SyncFixities(remoteNode *models.Node)

func (*DPNSync) SyncIngests

func (dpnSync *DPNSync) SyncIngests(bag *models.DPNBag)

func (*DPNSync) SyncMembers

func (dpnSync *DPNSync) SyncMembers(remoteNode *models.Node)

SyncMembers copies remote member records to our own node. This does not update existing records, it only creates new ones.

func (*DPNSync) SyncNode

func (dpnSync *DPNSync) SyncNode(remoteNode *models.Node)

SyncNode copies the latest node record from the node itself to our DPN registry. For example, it copies the SDR record from SDR to us, but only if the remote record is newer.

func (*DPNSync) SyncReplicationRequests

func (dpnSync *DPNSync) SyncReplicationRequests(remoteNode *models.Node)

SyncReplicationRequests copies ReplicationTransfer records from remote nodes to our own local node.

func (*DPNSync) SyncRestoreRequests

func (dpnSync *DPNSync) SyncRestoreRequests(remoteNode *models.Node)

SyncRestoreRequests copies RestoreTransfer records from remote nodes to our local node.

type DPNValidator

type DPNValidator struct {
	ValidationChannel   chan *dpn_models.ReplicationManifest
	PostProcessChannel  chan *dpn_models.ReplicationManifest
	BagValidationConfig *validation.BagValidationConfig
	Context             *context.Context
	LocalClient         *network.DPNRestClient
	RemoteClients       map[string]*network.DPNRestClient
}

DPNValidator validates DPN bags (tar files) before we send them off to long-term storage.

func NewDPNValidator

func NewDPNValidator(_context *context.Context) (*DPNValidator, error)

NewDPNValidator returns a new DPNValidator object.

func (*DPNValidator) HandleMessage

func (validator *DPNValidator) HandleMessage(message *nsq.Message) error

HandleMessage is the NSQ message handler. The NSQ consumer will pass each message in the subscribed channel to this function.
