channels

package
v1.7.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2021 License: Apache-2.0, MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ChannelEvents = fsm.Events{
	fsm.Event(datatransfer.Open).FromAny().To(datatransfer.Requested),
	fsm.Event(datatransfer.Accept).From(datatransfer.Requested).To(datatransfer.Ongoing),
	fsm.Event(datatransfer.Restart).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
		chst.Message = ""
		return nil
	}),

	fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling),

	fsm.Event(datatransfer.DataReceived).FromMany(
		datatransfer.Requested,
		datatransfer.Ongoing,
		datatransfer.InitiatorPaused,
		datatransfer.ResponderPaused,
		datatransfer.BothPaused,
		datatransfer.ResponderCompleted,
		datatransfer.ResponderFinalizing).ToNoChange().Action(func(chst *internal.ChannelState, delta uint64) error {
		chst.Received += delta
		return nil
	}),

	fsm.Event(datatransfer.DataSent).FromMany(
		datatransfer.Requested,
		datatransfer.Ongoing,
		datatransfer.InitiatorPaused,
		datatransfer.ResponderPaused,
		datatransfer.BothPaused,
		datatransfer.ResponderCompleted,
		datatransfer.ResponderFinalizing).ToNoChange().Action(func(chst *internal.ChannelState, delta uint64) error {
		chst.Sent += delta
		return nil
	}),
	fsm.Event(datatransfer.DataQueued).FromMany(
		datatransfer.Requested,
		datatransfer.Ongoing,
		datatransfer.InitiatorPaused,
		datatransfer.ResponderPaused,
		datatransfer.BothPaused,
		datatransfer.ResponderCompleted,
		datatransfer.ResponderFinalizing).ToNoChange().Action(func(chst *internal.ChannelState, delta uint64) error {
		chst.Queued += delta
		return nil
	}),
	fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
		chst.Message = datatransfer.ErrDisconnected.Error()
		return nil
	}),

	fsm.Event(datatransfer.Error).FromAny().To(datatransfer.Failing).Action(func(chst *internal.ChannelState, err error) error {
		chst.Message = err.Error()
		return nil
	}),
	fsm.Event(datatransfer.NewVoucher).FromAny().ToNoChange().
		Action(func(chst *internal.ChannelState, vtype datatransfer.TypeIdentifier, voucherBytes []byte) error {
			chst.Vouchers = append(chst.Vouchers, internal.EncodedVoucher{Type: vtype, Voucher: &cbg.Deferred{Raw: voucherBytes}})
			return nil
		}),
	fsm.Event(datatransfer.NewVoucherResult).FromAny().ToNoChange().
		Action(func(chst *internal.ChannelState, vtype datatransfer.TypeIdentifier, voucherResultBytes []byte) error {
			chst.VoucherResults = append(chst.VoucherResults,
				internal.EncodedVoucherResult{Type: vtype, VoucherResult: &cbg.Deferred{Raw: voucherResultBytes}})
			return nil
		}),
	fsm.Event(datatransfer.PauseInitiator).
		FromMany(datatransfer.Requested, datatransfer.Ongoing).To(datatransfer.InitiatorPaused).
		From(datatransfer.ResponderPaused).To(datatransfer.BothPaused).
		FromAny().ToJustRecord(),
	fsm.Event(datatransfer.PauseResponder).
		FromMany(datatransfer.Requested, datatransfer.Ongoing).To(datatransfer.ResponderPaused).
		From(datatransfer.InitiatorPaused).To(datatransfer.BothPaused).
		FromAny().ToJustRecord(),
	fsm.Event(datatransfer.ResumeInitiator).
		From(datatransfer.InitiatorPaused).To(datatransfer.Ongoing).
		From(datatransfer.BothPaused).To(datatransfer.ResponderPaused).
		FromAny().ToJustRecord(),
	fsm.Event(datatransfer.ResumeResponder).
		From(datatransfer.ResponderPaused).To(datatransfer.Ongoing).
		From(datatransfer.BothPaused).To(datatransfer.InitiatorPaused).
		From(datatransfer.Finalizing).To(datatransfer.Completing).
		FromAny().ToJustRecord(),
	fsm.Event(datatransfer.FinishTransfer).
		FromAny().To(datatransfer.TransferFinished).
		FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
		From(datatransfer.ResponderCompleted).To(datatransfer.Completing).
		From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderFinalizingTransferFinished),
	fsm.Event(datatransfer.ResponderBeginsFinalization).
		FromAny().To(datatransfer.ResponderFinalizing).
		FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
		From(datatransfer.TransferFinished).To(datatransfer.ResponderFinalizingTransferFinished),
	fsm.Event(datatransfer.ResponderCompletes).
		FromAny().To(datatransfer.ResponderCompleted).
		FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
		From(datatransfer.ResponderPaused).To(datatransfer.ResponderFinalizing).
		From(datatransfer.TransferFinished).To(datatransfer.Completing).
		From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderCompleted).
		From(datatransfer.ResponderFinalizingTransferFinished).To(datatransfer.Completing),
	fsm.Event(datatransfer.BeginFinalizing).FromAny().To(datatransfer.Finalizing),
	fsm.Event(datatransfer.Complete).FromAny().To(datatransfer.Completing),
	fsm.Event(datatransfer.CleanupComplete).
		From(datatransfer.Cancelling).To(datatransfer.Cancelled).
		From(datatransfer.Failing).To(datatransfer.Failed).
		From(datatransfer.Completing).To(datatransfer.Completed),

	fsm.Event(datatransfer.CompleteCleanupOnRestart).FromAny().ToNoChange(),
}

ChannelEvents describe the events taht can

ChannelFinalityStates are the final states for a channel

View Source
var ChannelStateEntryFuncs = fsm.StateEntryFuncs{
	datatransfer.Cancelling: cleanupConnection,
	datatransfer.Failing:    cleanupConnection,
	datatransfer.Completing: cleanupConnection,
}

ChannelStateEntryFuncs are handlers called as we enter different states (currently unused for this fsm)

CleanupStates are the penultimate states for a channel

View Source
var EmptyChannelState = channelState{}

EmptyChannelState is the zero value for channel state, meaning not present

View Source
var ErrWrongType = errors.New("Cannot change type of implementation specific data after setting it")

ErrWrongType is returned when a caller attempts to change the type of implementation data after setting it

Functions

func IsChannelCleaningUp

func IsChannelCleaningUp(st datatransfer.Status) bool

IsChannelCleaningUp returns true if channel was being cleaned up and finished

func IsChannelTerminated

func IsChannelTerminated(st datatransfer.Status) bool

IsChannelTerminated returns true if the channel is in a finality state

func NewErrNotFound

func NewErrNotFound(chid datatransfer.ChannelID) error

Types

type ChannelCIDsReader

type ChannelCIDsReader func(chid datatransfer.ChannelID) ([]cid.Cid, error)

type ChannelEnvironment

type ChannelEnvironment interface {
	Protect(id peer.ID, tag string)
	Unprotect(id peer.ID, tag string) bool
	ID() peer.ID
	CleanupChannel(chid datatransfer.ChannelID)
}

ChannelEnvironment -- just a proxy for DTNetwork for now

type Channels

type Channels struct {
	// contains filtered or unexported fields
}

Channels is a thread safe list of channels

func New

func New(ds datastore.Batching,
	cidLists cidlists.CIDLists,
	notifier Notifier,
	voucherDecoder DecoderByTypeFunc,
	voucherResultDecoder DecoderByTypeFunc,
	env ChannelEnvironment,
	selfPeer peer.ID) (*Channels, error)

New returns a new thread safe list of channels

func (*Channels) Accept

func (c *Channels) Accept(chid datatransfer.ChannelID) error

Accept marks a data transfer as accepted

func (*Channels) BeginFinalizing

func (c *Channels) BeginFinalizing(chid datatransfer.ChannelID) error

BeginFinalizing indicates a responder has finished processing but is awaiting confirmation from the initiator

func (*Channels) Cancel

func (c *Channels) Cancel(chid datatransfer.ChannelID) error

Cancel indicates a channel was cancelled prematurely

func (*Channels) Complete

func (c *Channels) Complete(chid datatransfer.ChannelID) error

Complete indicates responder has completed sending/receiving data

func (*Channels) CompleteCleanupOnRestart

func (c *Channels) CompleteCleanupOnRestart(chid datatransfer.ChannelID) error

func (*Channels) CreateNew

func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, baseCid cid.Cid, selector ipld.Node, voucher datatransfer.Voucher, initiator, dataSender, dataReceiver peer.ID) (datatransfer.ChannelID, error)

CreateNew creates a new channel id and channel state and saves to channels. returns error if the channel exists already.

func (*Channels) DataQueued

func (c *Channels) DataQueued(chid datatransfer.ChannelID, cid cid.Cid, delta uint64) error

func (*Channels) DataReceived

func (c *Channels) DataReceived(chid datatransfer.ChannelID, cid cid.Cid, delta uint64) error

func (*Channels) DataSent

func (c *Channels) DataSent(chid datatransfer.ChannelID, cid cid.Cid, delta uint64) error

func (*Channels) Disconnected

func (c *Channels) Disconnected(chid datatransfer.ChannelID) error

func (*Channels) Error

func (c *Channels) Error(chid datatransfer.ChannelID, err error) error

Error indicates something that went wrong on a channel

func (*Channels) FinishTransfer

func (c *Channels) FinishTransfer(chid datatransfer.ChannelID) error

FinishTransfer an initiator has finished sending/receiving data

func (*Channels) GetByID

GetByID searches for a channel in the slice of channels with id `chid`. Returns datatransfer.EmptyChannelState if there is no channel with that id

func (*Channels) HasChannel

func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error)

HasChannel returns true if the given channel id is being tracked

func (*Channels) InProgress

InProgress returns a list of in progress channels

func (*Channels) NewVoucher

func (c *Channels) NewVoucher(chid datatransfer.ChannelID, voucher datatransfer.Voucher) error

NewVoucher records a new voucher for this channel

func (*Channels) NewVoucherResult

func (c *Channels) NewVoucherResult(chid datatransfer.ChannelID, voucherResult datatransfer.VoucherResult) error

NewVoucherResult records a new voucher result for this channel

func (*Channels) PauseInitiator

func (c *Channels) PauseInitiator(chid datatransfer.ChannelID) error

PauseInitiator pauses the initator of this channel

func (*Channels) PauseResponder

func (c *Channels) PauseResponder(chid datatransfer.ChannelID) error

PauseResponder pauses the responder of this channel

func (*Channels) ResponderBeginsFinalization

func (c *Channels) ResponderBeginsFinalization(chid datatransfer.ChannelID) error

ResponderBeginsFinalization indicates a responder has finished processing but is awaiting confirmation from the initiator

func (*Channels) ResponderCompletes

func (c *Channels) ResponderCompletes(chid datatransfer.ChannelID) error

ResponderCompletes indicates an initator has finished receiving data from a responder

func (*Channels) Restart

func (c *Channels) Restart(chid datatransfer.ChannelID) error

Restart marks a data transfer as restarted

func (*Channels) ResumeInitiator

func (c *Channels) ResumeInitiator(chid datatransfer.ChannelID) error

ResumeInitiator resumes the initator of this channel

func (*Channels) ResumeResponder

func (c *Channels) ResumeResponder(chid datatransfer.ChannelID) error

ResumeResponder resumes the responder of this channel

func (*Channels) Start

func (c *Channels) Start(ctx context.Context) error

Start migrates the channel data store as needed

type DecoderByTypeFunc

type DecoderByTypeFunc func(identifier datatransfer.TypeIdentifier) (encoding.Decoder, bool)

type ErrNotFound

type ErrNotFound struct {
	ChannelID datatransfer.ChannelID
}

ErrNotFound is returned when a channel cannot be found with a given channel ID

func (*ErrNotFound) Error

func (e *ErrNotFound) Error() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL