sync

package
v0.0.0-...-667e438
Warning

This package is not in the latest version of its module.

Published: Oct 17, 2019 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var AccessStubHelper func(*pb.StreamStub, func(*SyncStub)) = func(*pb.StreamStub, func(*SyncStub)) {}

This helper lets us access the stub object from a stream stub.

var SyncPacketCommHelper = packageMsgHelper

Functions

func BlocSyncSimplePlan

func BlocSyncSimplePlan(l *ledger.Ledger, targetHeight uint64, targetBlock []byte) []*pb.SyncBlockRange

Simple mode for the sync client: starting from the top, it generates only one task.

func CheckpointToSyncPlan

func CheckpointToSyncPlan(checkpoints map[uint64][]byte) (ret []*pb.SyncBlockRange)

Builds new plans from a bunch of checkpoints: n checkpoints are turned into a corresponding array of n tasks adjacent to each other, and the last of the tasks ends at 0. NOTICE: the input map of checkpoints will be cleared after execution.
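The conversion described above can be illustrated with a minimal sketch. It does not use the package's own types: `blockRange` is a hypothetical stand-in for `pb.SyncBlockRange` (whose fields are not shown in this documentation), `checkpointsToPlan` is an illustrative name, and treating both ends of a range as inclusive is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// blockRange is a hypothetical stand-in for pb.SyncBlockRange; the real
// message's field names and semantics are not shown in this documentation.
type blockRange struct {
	Start     uint64 // highest block of the task (assumed inclusive)
	End       uint64 // lowest block of the task (assumed inclusive)
	StartHash []byte // hash recorded by the checkpoint at Start
}

// checkpointsToPlan turns n checkpoints into n adjacent ranges, ordered from
// the highest checkpoint down; the last range ends at block 0. Like the
// documented function, it empties the input map before returning.
func checkpointsToPlan(checkpoints map[uint64][]byte) []blockRange {
	heights := make([]uint64, 0, len(checkpoints))
	for h := range checkpoints {
		heights = append(heights, h)
	}
	sort.Slice(heights, func(i, j int) bool { return heights[i] > heights[j] })

	plan := make([]blockRange, 0, len(heights))
	for i, h := range heights {
		end := uint64(0) // the lowest task runs down to block 0
		if i+1 < len(heights) {
			end = heights[i+1] + 1 // stop just above the next checkpoint
		}
		plan = append(plan, blockRange{Start: h, End: end, StartHash: checkpoints[h]})
	}

	// Clear the caller's map, mirroring the NOTICE in the documentation.
	for h := range checkpoints {
		delete(checkpoints, h)
	}
	return plan
}

func main() {
	cps := map[uint64][]byte{100: []byte("a"), 50: []byte("b"), 10: []byte("c")}
	for _, r := range checkpointsToPlan(cps) {
		fmt.Printf("%d -> %d\n", r.Start, r.End)
	}
	fmt.Println("checkpoints left:", len(cps))
}
```

Because the input map is emptied, a caller that still needs the checkpoints afterwards would have to copy the map before passing it in.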

func DefaultClientOption

func DefaultClientOption() *clientOpts

func DefaultSyncOption

func DefaultSyncOption() *syncOpt

func ExecuteSyncTask

func ExecuteSyncTask(ctx context.Context, cf ClientFactory, sstub *pb.StreamStub) error

func GenTestSyncHub

func GenTestSyncHub(ctx context.Context, l *ledger.Ledger, opt *syncOpt) *pb.SimuPeerStub

func NewBlockSyncClient

func NewBlockSyncClient(pf func(uint64, *pb.Block) error, tsk []*pb.SyncBlockRange) *sessionCliAdapter

func NewStateSyncClient

func NewStateSyncClient(ctx context.Context, syncer StateSyncer) (*sessionCliAdapter, func())

An additional cancel func is returned and should be called before the sync task is stopped.

func NewStateSyncDetector

func NewStateSyncDetector(l *ledger.Ledger, stateRange int) *stateSyncDetector

func NewTxSyncClient

func NewTxSyncClient(opt *clientOpts, txids []string) *txCliFactory

func PruneSyncPlan

func PruneSyncPlan(l *ledger.Ledger, tasks []*pb.SyncBlockRange) []*pb.SyncBlockRange

Prunes a plan using the ledger's data to avoid overhead traffic (duplicated transfer of blocks that already exist). This function supposes that the same syncing task (often generated by the same group of checkpoints) may be replayed multiple times. Notice: tasks is updated in place, and unnecessary tasks are removed.
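A minimal sketch of in-place pruning follows. It is not the package's implementation: `blockRange` is a hypothetical stand-in for `pb.SyncBlockRange`, and the ledger lookup is replaced by the simplifying assumption that the local ledger already holds every block below `localHeight`.

```go
package main

import "fmt"

// blockRange is a hypothetical stand-in for pb.SyncBlockRange.
type blockRange struct {
	Start uint64 // highest block of the task (assumed inclusive)
	End   uint64 // lowest block of the task (assumed inclusive)
}

// pruneByHeight assumes the local ledger holds every block below localHeight.
// Tasks that are fully covered are dropped, and partially covered tasks are
// shrunk in place. The returned slice reuses the backing array of tasks.
func pruneByHeight(localHeight uint64, tasks []*blockRange) []*blockRange {
	kept := tasks[:0]
	for _, t := range tasks {
		if t.Start < localHeight {
			continue // every block of this task already exists locally
		}
		if t.End < localHeight {
			t.End = localHeight // skip the part that already exists
		}
		kept = append(kept, t)
	}
	return kept
}

func main() {
	tasks := []*blockRange{{100, 51}, {50, 11}, {10, 0}}
	tasks = pruneByHeight(30, tasks)
	for _, t := range tasks {
		fmt.Printf("%d -> %d\n", t.Start, t.End)
	}
	// Replaying the same plan against the same ledger changes nothing further.
	fmt.Println("after replay:", len(pruneByHeight(30, tasks)))
}
```

Pruning this way is idempotent, which matches the stated expectation that the same plan may be replayed several times.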

func PushLedgerStatusOfStub

func PushLedgerStatusOfStub(tb testing.TB, ctx context.Context, simustub *pb.SimuPeerStub, st *pb.LedgerState)

func TotalSyncBlocks

func TotalSyncBlocks(tsks []*pb.SyncBlockRange) (val int64)

Types

type ClientFactory

type ClientFactory interface {
	Tag() string
	// Opts is called only at the beginning of ExecuteSyncTask, so the
	// returned object can be created at the moment it is called.
	Opts() *clientOpts
	PreFilter(rledger *pb.LedgerState) bool
	// Notice: this should provide a handling function, and the factory is
	// responsible for scheduling tasks among its multiple handling
	// functions.
	// The handling function MUST return nil to indicate that the current
	// task is finished, and non-nil to indicate that the scheduler should
	// retry with another peer.
	AssignHandling() func(context.Context, *pb.StreamHandler, *syncCore) error
}

type FatalEnd

type FatalEnd struct {
	// contains filtered or unexported fields
}

type ForceAck

type ForceAck struct{}

func (ForceAck) Error

func (ForceAck) Error() string

type ISessionHandler

type ISessionHandler interface {
	// contains filtered or unexported methods
}

type ISyncHandler

type ISyncHandler interface {
	// contains filtered or unexported methods
}

type NewPeerHandshake

type NewPeerHandshake struct{}

func (NewPeerHandshake) NotifyNewPeer

func (NewPeerHandshake) NotifyNewPeer(peer *pb.PeerID, stub *pb.StreamStub)

type NormalEnd

type NormalEnd struct{}

func (NormalEnd) Error

func (NormalEnd) Error() string

type SessionClientImpl

type SessionClientImpl interface {
	PreFilter(rledger *pb.LedgerState) bool
	// OnConnected and Next are given an id to distinguish the
	// different tasks. Next can generate a user-custom object,
	// which is then passed to OnData/OnFail.
	OnConnected(int, *pb.AcceptSession) error
	Next(int) (*pb.TransferRequest, interface{})
	// OnData can return ForceAck or NormalEnd.
	// If NormalEnd is returned, Next() will be
	// called, and the whole session is ended
	// if Next() returns nil.
	// Notice 1: if OnData returns any error except
	// for ForceAck, the session will end without
	// OnFail being called.
	// Notice 2: we always ack on a normal end.
	OnData(interface{}, *pb.TransferResponse) error
	OnFail(interface{}, error)
}

type StateSyncer

type StateSyncer interface {
	GetTarget() []byte
	ApplySyncData(data *pb.SyncStateChunk) error
	AssignTasks() ([]*pb.SyncOffset, error)
}

type StreamFilter

type StreamFilter struct {
	*pb.PeerEndpoint
}

func (StreamFilter) QualitifiedPeer

func (self StreamFilter) QualitifiedPeer(ep *pb.PeerEndpoint) bool

type SyncMsgPrefilter

type SyncMsgPrefilter interface {
	ForSimple(*pb.SimpleReq) bool
	ForSession(*pb.OpenSession) bool
}

type SyncStub

type SyncStub struct {
	// contains filtered or unexported fields
}

func NewSyncStub

func NewSyncStub(ctx context.Context, l *ledger.Ledger) *SyncStub

func (*SyncStub) BroadcastLedgerStatus

func (s *SyncStub) BroadcastLedgerStatus(sstub *pb.StreamStub)

func (*SyncStub) NewStreamHandlerImpl

func (s *SyncStub) NewStreamHandlerImpl(id *pb.PeerID, _ *pb.StreamStub, _ bool) (pb.StreamHandlerImpl, error)

Also helps implement the main entry of the stream factory.

func (*SyncStub) SetExternalPrefilter

func (s *SyncStub) SetExternalPrefilter(f SyncMsgPrefilter)

func (*SyncStub) SetServerOption

func (s *SyncStub) SetServerOption(opt *syncOpt)

Accepts the incoming option as the default option template, except for the prefilter.

func (*SyncStub) StubContext

func (s *SyncStub) StubContext() context.Context
