channels

package
v2.0.0-rc7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2023 License: Apache-2.0, MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ChannelEvents is the table of events accepted by the channel state machine.
// Each entry names an event, the source state(s) it is accepted from, the
// destination for each source, and an action that updates the
// internal.ChannelState.
//
// NOTE(review): the builder semantics come from the fsm package, which is not
// shown here. Presumably To moves to the named state, ToNoChange and
// ToJustRecord both stay in the current state (differing in whether the state
// handler is re-triggered), and a specific From/FromMany clause overrides
// FromAny for the same event -- confirm against the fsm documentation.
//
// Actions that take an err argument call err.Error() unconditionally, so the
// event must be triggered with a non-nil error.
var ChannelEvents = fsm.Events{

	// Open: any state -> Requested.
	fsm.Event(datatransfer.Open).FromAny().To(datatransfer.Requested).Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),

	// Accept: Requested -> Queued, AwaitingAcceptance -> Ongoing.
	fsm.Event(datatransfer.Accept).
		From(datatransfer.Requested).To(datatransfer.Queued).
		From(datatransfer.AwaitingAcceptance).To(datatransfer.Ongoing).
		Action(func(chst *internal.ChannelState) error {
			chst.AddLog("")
			return nil
		}),

	// TransferInitiated: Requested -> AwaitingAcceptance, Queued -> Ongoing;
	// when already Ongoing the event is only recorded.
	fsm.Event(datatransfer.TransferInitiated).
		From(datatransfer.Requested).To(datatransfer.AwaitingAcceptance).
		From(datatransfer.Queued).To(datatransfer.Ongoing).
		From(datatransfer.Ongoing).ToJustRecord().
		Action(func(chst *internal.ChannelState) error {
			chst.AddLog("")
			return nil
		}),

	// Restart: accepted in any state, recorded only; clears Message.
	fsm.Event(datatransfer.Restart).FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
		chst.Message = ""
		chst.AddLog("")
		return nil
	}),

	// Cancel: any state -> Cancelling.
	fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling).Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),

	// Opened: accepted in any state, recorded only; clears Message.
	fsm.Event(datatransfer.Opened).FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
		chst.Message = ""
		chst.AddLog("")
		return nil
	}),

	// DataReceived: accepted in any state without a state change.
	// ReceivedBlocksTotal is only ever raised, never lowered.
	fsm.Event(datatransfer.DataReceived).FromAny().ToNoChange().
		Action(func(chst *internal.ChannelState, rcvdBlocksTotal int64) error {
			if rcvdBlocksTotal > chst.ReceivedBlocksTotal {
				chst.ReceivedBlocksTotal = rcvdBlocksTotal
			}
			chst.AddLog("")
			return nil
		}),
	// DataReceivedProgress: accepted only from datatransfer.TransferringStates;
	// adds delta to Received.
	fsm.Event(datatransfer.DataReceivedProgress).FromMany(datatransfer.TransferringStates.AsFSMStates()...).ToNoChange().
		Action(func(chst *internal.ChannelState, delta uint64) error {
			chst.Received += delta
			chst.AddLog("received data")
			return nil
		}),

	// DataSent: accepted from datatransfer.TransferringStates and from
	// TransferFinished without a state change. SentBlocksTotal is only ever
	// raised, never lowered.
	fsm.Event(datatransfer.DataSent).
		FromMany(datatransfer.TransferringStates.AsFSMStates()...).ToNoChange().
		From(datatransfer.TransferFinished).ToNoChange().
		Action(func(chst *internal.ChannelState, sentBlocksTotal int64) error {
			if sentBlocksTotal > chst.SentBlocksTotal {
				chst.SentBlocksTotal = sentBlocksTotal
			}
			chst.AddLog("")
			return nil
		}),

	// DataSentProgress: accepted only from datatransfer.TransferringStates;
	// adds delta to Sent.
	fsm.Event(datatransfer.DataSentProgress).FromMany(datatransfer.TransferringStates.AsFSMStates()...).ToNoChange().
		Action(func(chst *internal.ChannelState, delta uint64) error {
			chst.Sent += delta
			chst.AddLog("sending data")
			return nil
		}),

	// DataQueued: accepted from datatransfer.TransferringStates and from
	// TransferFinished without a state change. QueuedBlocksTotal is only ever
	// raised, never lowered.
	fsm.Event(datatransfer.DataQueued).
		FromMany(datatransfer.TransferringStates.AsFSMStates()...).ToNoChange().
		From(datatransfer.TransferFinished).ToNoChange().
		Action(func(chst *internal.ChannelState, queuedBlocksTotal int64) error {
			if queuedBlocksTotal > chst.QueuedBlocksTotal {
				chst.QueuedBlocksTotal = queuedBlocksTotal
			}
			chst.AddLog("")
			return nil
		}),
	// DataQueuedProgress: accepted only from datatransfer.TransferringStates;
	// adds delta to Queued.
	fsm.Event(datatransfer.DataQueuedProgress).FromMany(datatransfer.TransferringStates.AsFSMStates()...).ToNoChange().
		Action(func(chst *internal.ChannelState, delta uint64) error {
			chst.Queued += delta
			chst.AddLog("")
			return nil
		}),
	// SetDataLimit: accepted in any state, recorded only; stores DataLimit.
	fsm.Event(datatransfer.SetDataLimit).FromAny().ToJustRecord().
		Action(func(chst *internal.ChannelState, dataLimit uint64) error {
			chst.DataLimit = dataLimit
			chst.AddLog("")
			return nil
		}),
	// SetRequiresFinalization: accepted in any state, recorded only; stores
	// the RequiresFinalization flag.
	fsm.Event(datatransfer.SetRequiresFinalization).FromAny().ToJustRecord().
		Action(func(chst *internal.ChannelState, RequiresFinalization bool) error {
			chst.RequiresFinalization = RequiresFinalization
			chst.AddLog("")
			return nil
		}),
	// Disconnected: accepted in any state without a state change; stores the
	// error text in Message and logs it.
	fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
		chst.Message = err.Error()
		chst.AddLog("data transfer disconnected: %s", chst.Message)
		return nil
	}),
	// SendDataError: accepted in any state without a state change; stores the
	// error text in Message and logs it.
	fsm.Event(datatransfer.SendDataError).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
		chst.Message = err.Error()
		chst.AddLog("data transfer send error: %s", chst.Message)
		return nil
	}),
	// ReceiveDataError: accepted in any state without a state change; stores
	// the error text in Message and logs it.
	fsm.Event(datatransfer.ReceiveDataError).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
		chst.Message = err.Error()
		chst.AddLog("data transfer receive error: %s", chst.Message)
		return nil
	}),
	// RequestCancelled: accepted in any state without a state change; stores
	// the error text in Message and logs it.
	fsm.Event(datatransfer.RequestCancelled).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
		chst.Message = err.Error()
		chst.AddLog("data transfer request cancelled: %s", chst.Message)
		return nil
	}),
	// Error: any state -> Failing; stores the error text in Message and logs it.
	fsm.Event(datatransfer.Error).FromAny().To(datatransfer.Failing).Action(func(chst *internal.ChannelState, err error) error {
		chst.Message = err.Error()
		chst.AddLog("data transfer erred: %s", chst.Message)
		return nil
	}),

	// NewVoucher: accepted in any state without a state change; appends the
	// voucher (type plus wrapped node) to Vouchers.
	fsm.Event(datatransfer.NewVoucher).FromAny().ToNoChange().
		Action(func(chst *internal.ChannelState, voucher datatransfer.TypedVoucher) error {
			chst.Vouchers = append(chst.Vouchers, internal.EncodedVoucher{Type: voucher.Type, Voucher: internal.CborGenCompatibleNode{Node: voucher.Voucher}})
			chst.AddLog("got new voucher")
			return nil
		}),

	// NewVoucherResult: accepted in any state without a state change; appends
	// the voucher result (type plus wrapped node) to VoucherResults.
	fsm.Event(datatransfer.NewVoucherResult).FromAny().ToNoChange().
		Action(func(chst *internal.ChannelState, voucherResult datatransfer.TypedVoucher) error {
			chst.VoucherResults = append(chst.VoucherResults,
				internal.EncodedVoucherResult{Type: voucherResult.Type, VoucherResult: internal.CborGenCompatibleNode{Node: voucherResult.Voucher}})
			chst.AddLog("got new voucher result")
			return nil
		}),

	// PauseInitiator: accepted from Ongoing, Requested, Queued and
	// AwaitingAcceptance, recorded only; sets InitiatorPaused.
	fsm.Event(datatransfer.PauseInitiator).
		FromMany(datatransfer.Ongoing, datatransfer.Requested, datatransfer.Queued, datatransfer.AwaitingAcceptance).ToJustRecord().
		Action(func(chst *internal.ChannelState) error {
			chst.InitiatorPaused = true
			chst.AddLog("")
			return nil
		}),

	// PauseResponder: accepted from the same states as PauseInitiator plus
	// TransferFinished, recorded only; sets ResponderPaused.
	fsm.Event(datatransfer.PauseResponder).
		FromMany(datatransfer.Ongoing, datatransfer.Requested, datatransfer.Queued, datatransfer.AwaitingAcceptance, datatransfer.TransferFinished).ToJustRecord().
		Action(func(chst *internal.ChannelState) error {
			chst.ResponderPaused = true
			chst.AddLog("")
			return nil
		}),

	// DataLimitExceeded: recorded only; has the same effect on the channel
	// state as a responder pause (sets ResponderPaused).
	fsm.Event(datatransfer.DataLimitExceeded).
		FromMany(datatransfer.Ongoing, datatransfer.Requested, datatransfer.Queued, datatransfer.AwaitingAcceptance, datatransfer.ResponderCompleted, datatransfer.ResponderFinalizing).ToJustRecord().
		Action(func(chst *internal.ChannelState) error {
			chst.ResponderPaused = true
			chst.AddLog("")
			return nil
		}),

	// ResumeInitiator: recorded only; clears InitiatorPaused.
	fsm.Event(datatransfer.ResumeInitiator).
		FromMany(datatransfer.Ongoing, datatransfer.Requested, datatransfer.Queued, datatransfer.AwaitingAcceptance, datatransfer.ResponderCompleted, datatransfer.ResponderFinalizing).ToJustRecord().
		Action(func(chst *internal.ChannelState) error {
			chst.InitiatorPaused = false
			chst.AddLog("")
			return nil
		}),

	// ResumeResponder: clears ResponderPaused. Recorded only from the listed
	// in-flight states; from Finalizing it moves the channel to Completing.
	fsm.Event(datatransfer.ResumeResponder).
		FromMany(datatransfer.Ongoing, datatransfer.Requested, datatransfer.Queued, datatransfer.AwaitingAcceptance, datatransfer.TransferFinished).ToJustRecord().
		From(datatransfer.Finalizing).To(datatransfer.Completing).
		Action(func(chst *internal.ChannelState) error {
			chst.ResponderPaused = false
			chst.AddLog("")
			return nil
		}),

	// FinishTransfer: default is any state -> TransferFinished, with these
	// specific cases listed after it:
	//   Failing, Cancelling  -> recorded only
	//   ResponderCompleted   -> Completing
	//   ResponderFinalizing  -> ResponderFinalizingTransferFinished
	//   AwaitingAcceptance   -> Completing
	fsm.Event(datatransfer.FinishTransfer).
		FromAny().To(datatransfer.TransferFinished).
		FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
		From(datatransfer.ResponderCompleted).To(datatransfer.Completing).
		From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderFinalizingTransferFinished).
		From(datatransfer.AwaitingAcceptance).To(datatransfer.Completing).
		Action(func(chst *internal.ChannelState) error {
			chst.AddLog("")
			return nil
		}),

	// ResponderBeginsFinalization: default is any state -> ResponderFinalizing,
	// with these specific cases listed after it:
	//   Failing, Cancelling -> recorded only
	//   TransferFinished    -> ResponderFinalizingTransferFinished
	//   ResponderFinalizing, ResponderFinalizingTransferFinished -> recorded only
	fsm.Event(datatransfer.ResponderBeginsFinalization).
		FromAny().To(datatransfer.ResponderFinalizing).
		FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
		From(datatransfer.TransferFinished).To(datatransfer.ResponderFinalizingTransferFinished).
		FromMany(datatransfer.ResponderFinalizing, datatransfer.ResponderFinalizingTransferFinished).ToJustRecord().Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),

	// ResponderCompletes: default is any state -> ResponderCompleted, with
	// these specific cases listed after it:
	//   Failing, Cancelling                 -> recorded only
	//   TransferFinished                    -> Completing
	//   ResponderFinalizingTransferFinished -> Completing
	fsm.Event(datatransfer.ResponderCompletes).
		FromAny().To(datatransfer.ResponderCompleted).
		FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
		From(datatransfer.TransferFinished).To(datatransfer.Completing).
		From(datatransfer.ResponderFinalizingTransferFinished).To(datatransfer.Completing).Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),

	// BeginFinalizing: any state -> Finalizing.
	fsm.Event(datatransfer.BeginFinalizing).FromAny().To(datatransfer.Finalizing).Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),

	// Complete: any state -> Completing.
	fsm.Event(datatransfer.Complete).FromAny().To(datatransfer.Completing).Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),

	// CleanupComplete: moves each cleanup state to its terminal counterpart:
	// Cancelling -> Cancelled, Failing -> Failed, Completing -> Completed.
	// No other source state is listed for this event.
	fsm.Event(datatransfer.CleanupComplete).
		From(datatransfer.Cancelling).To(datatransfer.Cancelled).
		From(datatransfer.Failing).To(datatransfer.Failed).
		From(datatransfer.Completing).To(datatransfer.Completed).Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),

	// CompleteCleanupOnRestart: accepted in any state without a state change.
	// NOTE(review): presumably relies on ToNoChange re-triggering the state
	// handler so cleanup is re-run after a restart -- confirm.
	fsm.Event(datatransfer.CompleteCleanupOnRestart).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),
}

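ChannelEvents describes the events that can occur on a data transfer channel, the state transitions each event triggers, and the action each applies to the channel state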

ChannelFinalityStates are the final states for a channel

View Source
// ChannelStateEntryFuncs maps a state to the handler registered for it.
// Cancelling, Failing and Completing all share the same handler,
// cleanupConnection (defined elsewhere in the package).
var ChannelStateEntryFuncs = fsm.StateEntryFuncs{
	datatransfer.Cancelling: cleanupConnection,
	datatransfer.Failing:    cleanupConnection,
	datatransfer.Completing: cleanupConnection,
}

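ChannelStateEntryFuncs are handlers called as we enter different states. The Cancelling, Failing and Completing states each run cleanupConnection on entry. (An earlier version of this comment described the map as "currently unused for this fsm", which no longer matches the declaration.)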

CleanupStates are the penultimate states for a channel

View Source
// EmptyChannelState is an empty channelState literal, used to mean "no channel present".
var EmptyChannelState = channelState{}

EmptyChannelState is the zero value for channel state, meaning not present

View Source
// ErrWrongType is the sentinel error for an attempt to change the type of
// implementation specific data after it has been set.
var ErrWrongType = errors.New("Cannot change type of implementation specific data after setting it")

ErrWrongType is returned when a caller attempts to change the type of implementation data after setting it

Functions

func IsChannelCleaningUp

func IsChannelCleaningUp(st datatransfer.Status) bool

IsChannelCleaningUp returns true if channel was being cleaned up and finished

func IsChannelTerminated

func IsChannelTerminated(st datatransfer.Status) bool

IsChannelTerminated returns true if the channel is in a finality state

func NewErrNotFound

func NewErrNotFound(chid datatransfer.ChannelID) error

Types

type ChannelEnvironment

// ChannelEnvironment is the environment dependency passed to New as env.
// The package documentation describes it as just a proxy for DTNetwork for now.
type ChannelEnvironment interface {
	// Protect takes a peer ID and a tag.
	// NOTE(review): presumably marks the connection to that peer as protected
	// under the given tag -- confirm against the DTNetwork implementation.
	Protect(id peer.ID, tag string)
	// Unprotect takes a peer ID and a tag and reports a bool.
	// NOTE(review): presumably removes the tag and reports whether the peer is
	// still protected by other tags -- confirm.
	Unprotect(id peer.ID, tag string) bool
	// ID returns a peer ID.
	// NOTE(review): presumably the local peer's ID -- confirm.
	ID() peer.ID
	// CleanupChannel takes the ID of a channel.
	// NOTE(review): presumably releases resources held for that channel -- confirm.
	CleanupChannel(chid datatransfer.ChannelID)
}

ChannelEnvironment -- just a proxy for DTNetwork for now

type Channels

// Channels is a thread safe list of channels. Construct one with New.
type Channels struct {
	// contains filtered or unexported fields
}

Channels is a thread safe list of channels

func New

func New(ds datastore.Batching,
	notifier Notifier,
	env ChannelEnvironment,
	selfPeer peer.ID) (*Channels, error)

New returns a new thread safe list of channels

func (*Channels) Accept

func (c *Channels) Accept(chid datatransfer.ChannelID) error

Accept marks a data transfer as accepted

func (*Channels) BeginFinalizing

func (c *Channels) BeginFinalizing(chid datatransfer.ChannelID) error

BeginFinalizing indicates a responder has finished processing but is awaiting confirmation from the initiator

func (*Channels) Cancel

func (c *Channels) Cancel(chid datatransfer.ChannelID) error

Cancel indicates a channel was cancelled prematurely

func (*Channels) ChannelOpened

func (c *Channels) ChannelOpened(chid datatransfer.ChannelID) error

func (*Channels) Complete

func (c *Channels) Complete(chid datatransfer.ChannelID) error

Complete indicates responder has completed sending/receiving data

func (*Channels) CompleteCleanupOnRestart

func (c *Channels) CompleteCleanupOnRestart(chid datatransfer.ChannelID) error

func (*Channels) CreateNew

func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, baseCid cid.Cid, selector datamodel.Node, voucher datatransfer.TypedVoucher, initiator, dataSender, dataReceiver peer.ID) (datatransfer.ChannelID, error)

CreateNew creates a new channel id and channel state and saves to channels. returns error if the channel exists already.

func (*Channels) DataQueued

func (c *Channels) DataQueued(chid datatransfer.ChannelID, k cid.Cid, delta uint64, index int64, unique bool) error

func (*Channels) DataReceived

func (c *Channels) DataReceived(chid datatransfer.ChannelID, k cid.Cid, delta uint64, index int64, unique bool) error

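DataReceived's signature returns only an error. (An earlier version of this comment read "Returns true if this is the first time the block has been received"; the current signature returns no boolean.)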

func (*Channels) DataSent

func (c *Channels) DataSent(chid datatransfer.ChannelID, k cid.Cid, delta uint64, index int64, unique bool) error

func (*Channels) Disconnected

func (c *Channels) Disconnected(chid datatransfer.ChannelID, err error) error

Disconnected indicates that the connection went down and it was not possible to restart it

func (*Channels) Error

func (c *Channels) Error(chid datatransfer.ChannelID, err error) error

Error indicates something that went wrong on a channel

func (*Channels) FinishTransfer

func (c *Channels) FinishTransfer(chid datatransfer.ChannelID) error

FinishTransfer indicates an initiator has finished sending/receiving data

func (*Channels) GetByID

GetByID searches for a channel in the slice of channels with id `chid`. Returns datatransfer.EmptyChannelState if there is no channel with that id

func (*Channels) HasChannel

func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error)

HasChannel returns true if the given channel id is being tracked

func (*Channels) InProgress

InProgress returns a list of in progress channels

func (*Channels) NewVoucher

func (c *Channels) NewVoucher(chid datatransfer.ChannelID, voucher datatransfer.TypedVoucher) error

NewVoucher records a new voucher for this channel

func (*Channels) NewVoucherResult

func (c *Channels) NewVoucherResult(chid datatransfer.ChannelID, voucherResult datatransfer.TypedVoucher) error

NewVoucherResult records a new voucher result for this channel

func (*Channels) Open

func (c *Channels) Open(chid datatransfer.ChannelID) error

func (*Channels) PauseInitiator

func (c *Channels) PauseInitiator(chid datatransfer.ChannelID) error

PauseInitiator pauses the initiator of this channel

func (*Channels) PauseResponder

func (c *Channels) PauseResponder(chid datatransfer.ChannelID) error

PauseResponder pauses the responder of this channel

func (*Channels) ReceiveDataError

func (c *Channels) ReceiveDataError(chid datatransfer.ChannelID, err error) error

ReceiveDataError indicates that the transport layer had an error receiving data from the remote peer

func (*Channels) RequestCancelled

func (c *Channels) RequestCancelled(chid datatransfer.ChannelID, err error) error

RequestCancelled indicates that a transport layer request was cancelled by the request opener

func (*Channels) ResponderBeginsFinalization

func (c *Channels) ResponderBeginsFinalization(chid datatransfer.ChannelID) error

ResponderBeginsFinalization indicates a responder has finished processing but is awaiting confirmation from the initiator

func (*Channels) ResponderCompletes

func (c *Channels) ResponderCompletes(chid datatransfer.ChannelID) error

ResponderCompletes indicates an initiator has finished receiving data from a responder

func (*Channels) Restart

func (c *Channels) Restart(chid datatransfer.ChannelID) error

Restart marks a data transfer as restarted

func (*Channels) ResumeInitiator

func (c *Channels) ResumeInitiator(chid datatransfer.ChannelID) error

ResumeInitiator resumes the initiator of this channel

func (*Channels) ResumeResponder

func (c *Channels) ResumeResponder(chid datatransfer.ChannelID) error

ResumeResponder resumes the responder of this channel

func (*Channels) SendDataError

func (c *Channels) SendDataError(chid datatransfer.ChannelID, err error) error

SendDataError indicates that the transport layer had an error trying to send data to the remote peer

func (*Channels) SetDataLimit

func (c *Channels) SetDataLimit(chid datatransfer.ChannelID, dataLimit uint64) error

SetDataLimit means a data limit has been set on this channel

func (*Channels) SetRequiresFinalization

func (c *Channels) SetRequiresFinalization(chid datatransfer.ChannelID, RequiresFinalization bool) error

SetRequiresFinalization sets the state of whether a data transfer can complete

func (*Channels) Start

func (c *Channels) Start(ctx context.Context) error

Start migrates the channel data store as needed

func (*Channels) Stop

func (c *Channels) Stop(ctx context.Context) error

Stop stops the channel statemachine

func (*Channels) TransferInitiated

func (c *Channels) TransferInitiated(chid datatransfer.ChannelID) error

type ErrNotFound

// ErrNotFound is returned when a channel cannot be found with a given channel ID.
type ErrNotFound struct {
	// ChannelID is the channel ID carried by this error.
	// NOTE(review): presumably the ID that was looked up and not found -- confirm.
	ChannelID datatransfer.ChannelID
}

ErrNotFound is returned when a channel cannot be found with a given channel ID

func (*ErrNotFound) Error

func (e *ErrNotFound) Error() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL