stream

package
v0.0.0-...-2e66602 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 5, 2021 License: GPL-3.0 Imports: 27 Imported by: 0

Documentation

Overview

sync

Index

Constants

View Source
const (
	Low uint8 = iota
	Mid
	High
	Top
	PriorityQueue    = 4   // number of priority queues - Low, Mid, High, Top
	PriorityQueueCap = 512 // queue capacity
	HashSize         = 32
)
View Source
const (
	BatchSize = 128
)
View Source
const (
	MAX_DELAY_CNT = 10
)

Variables

View Source
var ErrMaxPeerServers = errors.New("max peer servers")

ErrMaxPeerServers will be returned if peer server limit is reached. It will be sent in the SubscribeErrorMsg.

View Source
var (
	ErrorLightNodeRejectRetrieval = errors.New("Light nodes reject retrieval")
)

Functions

func FormatSyncBinKey

func FormatSyncBinKey(bin uint8) string

FormatSyncBinKey returns a string representation of Kademlia bin number to be used as key for SYNC stream.

func ParseSyncBinKey

func ParseSyncBinKey(s string) (uint8, error)

ParseSyncBinKey parses the string representation and returns the Kademlia bin number.

func RegisterSwarmSyncerClient

func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore)

RegisterSwarmSyncerClient registers the client constructor function for to handle incoming sync streams

func RegisterSwarmSyncerServer

func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore)

Types

type API

type API struct {
	// contains filtered or unexported fields
}

func NewAPI

func NewAPI(r *Registry) *API

func (*API) GetPeerSubscriptions

func (api *API) GetPeerSubscriptions() map[string][]string

GetPeerSubscriptions is a API function which allows to query a peer for stream subscriptions it has. It can be called via RPC. It returns a map of node IDs with an array of string representations of Stream objects.

func (*API) SubscribeStream

func (api *API) SubscribeStream(peerId enode.ID, s Stream, history *Range, priority uint8) error

func (*API) UnsubscribeStream

func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error

type ChunkDeliveryMsg

type ChunkDeliveryMsg struct {
	Addr  storage.Address
	SData []byte // the stored chunk Data (incl size)
	// contains filtered or unexported fields
}

Chunk delivery always uses the same message type....

type ChunkDeliveryMsgRetrieval

type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg

defines a chunk delivery for retrieval (with accounting)

type ChunkDeliveryMsgSyncing

type ChunkDeliveryMsgSyncing ChunkDeliveryMsg

defines a chunk delivery for syncing (without accounting)

type Client

type Client interface {
	NeedData(context.Context, []byte) func(context.Context) error
	BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error)
	Close()
}

Client interface for incoming peer Streamer

type Delivery

type Delivery struct {
	// contains filtered or unexported fields
}

func NewDelivery

func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore, receiptStore *state.ReceiptStore) *Delivery

func (*Delivery) AttachBzz

func (d *Delivery) AttachBzz(bzz *network.Bzz)

收到了某个节点来的查询数据的请求

func (*Delivery) GetConnectedNodes

func (d *Delivery) GetConnectedNodes() (int, int)

func (*Delivery) GetDataFromCentral

func (d *Delivery) GetDataFromCentral(ctx context.Context, address storage.Address)

* not used, read one chunk from center is a very inefficient routine We will research if read from another data-distribute network is very viable

func (*Delivery) GetReceiptsLogs

func (d *Delivery) GetReceiptsLogs() []state.Receipts

func (*Delivery) GetReceivedChunkInfo

func (d *Delivery) GetReceivedChunkInfo() map[common.Address]int64

func (*Delivery) IncreaseAccount

func (d *Delivery) IncreaseAccount(account common.Address)

func (*Delivery) RequestFromPeers

func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*enode.ID, chan struct{}, error)

RequestFromPeers sends a chunk retrieve request to 发送一个chunk读取请求,ctx保存超时之类的信息,req是一个请求指令,包含了源地址和数据哈希,

func (*Delivery) SetSyncBandlimit

func (d *Delivery) SetSyncBandlimit(syncBandLimit int)

*

设置允许的流同步的速度,
syncBandLimit,同步的带宽,以Bytes/s为单位

func (*Delivery) SyncEnabled

func (d *Delivery) SyncEnabled() bool

func (*Delivery) UpdateNodes

func (d *Delivery) UpdateNodes(nodes []string)

type Handover

type Handover struct {
	Stream     Stream // name of stream
	Start, End uint64 // index of hashes
	Root       []byte // Root hash for indexed segment inclusion proofs
}

Handover represents a statement that the upstream peer hands over the stream section

type HandoverProof

type HandoverProof struct {
	Sig []byte // Sign(Hash(Serialisation(Handover)))
	*Handover
}

HandoverProof represents a signed statement that the upstream peer handed over the stream section

type OfferedHashesMsg

type OfferedHashesMsg struct {
	Stream   Stream // name of Stream
	From, To uint64 // peer and db-specific entry count
	Hashes   []byte // stream of hashes (128)
	//	Delayed        uint64
	*HandoverProof // HandoverProof
}

OfferedHashesMsg is the protocol msg for offering to hand over a stream section

func (OfferedHashesMsg) String

func (m OfferedHashesMsg) String() string

String pretty prints OfferedHashesMsg

type Peer

type Peer struct {
	*protocols.Peer
	// contains filtered or unexported fields
}

Peer is the Peer extension for the streaming protocol

func NewPeer

func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer

NewPeer is the constructor for Peer

func (*Peer) Deliver

func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error

Deliver sends a storeRequestMsg protocol message to the peer Depending on the `syncing` parameter we send different message types

func (*Peer) EndRetrieve

func (p *Peer) EndRetrieve(address storage.Address)

func (*Peer) GetDelay

func (p *Peer) GetDelay() time.Duration

func (*Peer) HandleMsg

func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error

HandleMsg is the message handler that delegates incoming messages

func (*Peer) SendOfferedHashes

func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error

SendOfferedHashes sends OfferedHashesMsg protocol msg

func (*Peer) SendPriority

func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error

SendPriority sends message to the peer using the outgoing priority queue

func (*Peer) StartRetrieve

func (p *Peer) StartRetrieve(address storage.Address)

type QuitMsg

type QuitMsg struct {
	Stream Stream
}

type Range

type Range struct {
	From, To uint64
}

func NewRange

func NewRange(from, to uint64) *Range

func (*Range) String

func (r *Range) String() string

type ReceiptsMsg

type ReceiptsMsg struct {
	PA     [20]byte
	STime  uint32
	AMount uint32
	Sig    []byte
}

收据消息,客户端从服务端收到检索的回应数据后,通过此格式向服务端发送签名 如果客户端总是向服务端提交一个新签名,即STime为新的,AMount为0,那么服务端就可以断开该客户端的连接,并将该节点加入黑名单 签名的收据总是用最低优先级发送,并且如果有新的可覆盖签名出现时,使用新的签名,可以直接丢弃老的签名

type Registry

type Registry struct {
	NodeType uint8
	// contains filtered or unexported fields
}

Registry registry for outgoing and incoming streamer constructors

func NewRegistry

func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry

NewRegistry is Streamer constructor

func (*Registry) APIs

func (r *Registry) APIs() []rpc.API

func (*Registry) Close

func (r *Registry) Close() error

func (*Registry) GetClientFunc

func (r *Registry) GetClientFunc(stream string) (func(*Peer, string, bool) (Client, error), error)

GetClient accessor for incoming streamer constructors

func (*Registry) GetServerFunc

func (r *Registry) GetServerFunc(stream string) (func(*Peer, string, bool) (Server, error), error)

GetServer accessor for incoming streamer constructors

func (*Registry) GetSpec

func (r *Registry) GetSpec() *protocols.Spec

GetSpec returns the streamer spec to callers This used to be a global variable but for simulations with multiple nodes its fields (notably the Hook) would be overwritten

func (*Registry) Protocols

func (r *Registry) Protocols() []p2p.Protocol

func (*Registry) Quit

func (r *Registry) Quit(peerId enode.ID, s Stream) error

Quit sends the QuitMsg to the peer to remove the stream peer client and terminate the streaming.

func (*Registry) RegisterClientFunc

func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error))

RegisterClient registers an incoming streamer constructor

func (*Registry) RegisterServerFunc

func (r *Registry) RegisterServerFunc(stream string, f func(*Peer, string, bool) (Server, error))

RegisterServer registers an outgoing streamer constructor

func (*Registry) RequestSubscription

func (r *Registry) RequestSubscription(peerId enode.ID, s Stream, h *Range, prio uint8) error

注册一个订阅,方法是向peer发送需要一个订阅消息 从这个角度来说说,每次peer建立后,都应该调用这个函数

func (*Registry) Run

func (r *Registry) Run(p *network.BzzPeer) error

Run protocol run function

func (*Registry) Start

func (r *Registry) Start(server *p2p.Server) error

func (*Registry) Stop

func (r *Registry) Stop() error

func (*Registry) Subscribe

func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8) error

Subscribe initiates the streamer

func (*Registry) Unsubscribe

func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error

type RegistryOptions

type RegistryOptions struct {
	SkipCheck       bool
	NodeType        uint8
	SyncUpdateDelay time.Duration
	MaxPeerServers  int // The limit of servers for each peer in registry
}

RegistryOptions holds optional values for NewRegistry constructor.

type RequestSubscriptionMsg

type RequestSubscriptionMsg struct {
	Stream   Stream
	History  *Range `rlp:"nil"`
	Priority uint8  // delivered on priority channel
}

RequestSubscriptionMsg is the protocol msg for a node to request subscription to a specific stream

type RetrieveRequestMsg

type RetrieveRequestMsg struct {
	Addr        storage.Address
	SkipCheck   bool
	RetrieveNow bool
	HopCount    uint8
}

RetrieveRequestMsg is the protocol msg for chunk retrieve requests

type Server

type Server interface {
	// SessionIndex is called when a server is initialized
	// to get the current cursor state of the stream data.
	// Based on this index, live and history stream intervals
	// will be adjusted before calling SetNextBatch.
	SessionIndex() (uint64, error)
	SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
	GetData(context.Context, []byte) ([]byte, error)
	Close()
}

Server interface for outgoing peer Streamer

type Stream

type Stream struct {
	// Name is used for Client and Server functions identification.
	Name string
	// Key is the name of specific stream data.
	Key string
	// Live defines whether the stream delivers only new data
	// for the specific stream.
	Live bool
}

Stream defines a unique stream identifier.

func NewStream

func NewStream(name string, key string, live bool) Stream

func (Stream) String

func (s Stream) String() string

String return a stream id based on all Stream fields.

type StreamerPrices

type StreamerPrices struct {
	// contains filtered or unexported fields
}

An accountable message needs some meta information attached to it in order to evaluate the correct price

func (*StreamerPrices) Price

func (sp *StreamerPrices) Price(msg interface{}) *protocols.Price

Price implements the accounting interface and returns the price for a specific message

type SubscribeErrorMsg

type SubscribeErrorMsg struct {
	Error string
}

type SubscribeMsg

type SubscribeMsg struct {
	Stream   Stream
	History  *Range `rlp:"nil"`
	Priority uint8  // delivered on priority channel
}

SubcribeMsg is the protocol msg for requesting a stream(section)

type SwarmChunkServer

type SwarmChunkServer struct {
	// contains filtered or unexported fields
}

SwarmChunkServer implements Server

func NewSwarmChunkServer

func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer

NewSwarmChunkServer is SwarmChunkServer constructor

func (*SwarmChunkServer) Close

func (s *SwarmChunkServer) Close()

Close needs to be called on a stream server

func (*SwarmChunkServer) GetData

func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error)

GetData retrives chunk data from db store

func (*SwarmChunkServer) SessionIndex

func (s *SwarmChunkServer) SessionIndex() (uint64, error)

SessionIndex returns zero in all cases for SwarmChunkServer.

func (*SwarmChunkServer) SetNextBatch

func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)

SetNextBatch

type SwarmSyncerClient

type SwarmSyncerClient struct {
	// contains filtered or unexported fields
}

SwarmSyncerClient

func NewSwarmSyncerClient

func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error)

NewSwarmSyncerClient is a contructor for provable data exchange syncer

func (*SwarmSyncerClient) BatchDone

func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error)

BatchDone

func (*SwarmSyncerClient) Close

func (s *SwarmSyncerClient) Close()

func (*SwarmSyncerClient) NeedData

func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error)

NeedData需要检查数据地址是否更近,如果更近,返回fetch,否则,直接返回nil,

type SwarmSyncerServer

type SwarmSyncerServer struct {
	// contains filtered or unexported fields
}

SwarmSyncerServer implements an Server for history syncing on bins offered streams: * live request delivery with or without checkback * (live/non-live historical) chunk syncing per proximity bin

func NewSwarmSyncerServer

func NewSwarmSyncerServer(po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error)

NewSwarmSyncerServer is constructor for SwarmSyncerServer

func (*SwarmSyncerServer) Close

func (s *SwarmSyncerServer) Close()

Close needs to be called on a stream server

func (*SwarmSyncerServer) GetData

func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error)

GetData retrieves the actual chunk from netstore

func (*SwarmSyncerServer) SessionIndex

func (s *SwarmSyncerServer) SessionIndex() (uint64, error)

SessionIndex returns current storage bin (po) index.

func (*SwarmSyncerServer) SetNextBatch

func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error)

GetBatch retrieves the next batch of hashes from the dbstore

type Takeover

type Takeover Handover

Takeover represents a statement that downstream peer took over (stored all data) handed over

type TakeoverProof

type TakeoverProof struct {
	Sig []byte // Sign(Hash(Serialisation(Takeover)))
	*Takeover
}
TakeoverProof represents a signed statement that the downstream peer took over

the stream section

type TakeoverProofMsg

type TakeoverProofMsg TakeoverProof

TakeoverProofMsg is the protocol msg sent by downstream peer

func (TakeoverProofMsg) String

func (m TakeoverProofMsg) String() string

String pretty prints TakeoverProofMsg

type TrafficLoad

type TrafficLoad struct {
	// contains filtered or unexported fields
}

type UnsubscribeMsg

type UnsubscribeMsg struct {
	Stream Stream
}

type WantedHashesMsg

type WantedHashesMsg struct {
	Stream   Stream
	Want     []byte // bitvector indicating which keys of the batch needed  当前想要的哈希的位映射表
	From, To uint64 // next interval offset - empty if not to be continued  下一个要检查的区域
}

WantedHashesMsg is the protocol msg data for signaling which hashes offered in OfferedHashesMsg downstream peer actually wants sent over

func (WantedHashesMsg) String

func (m WantedHashesMsg) String() string

String pretty prints WantedHashesMsg

type WrappedPriorityMsg

type WrappedPriorityMsg struct {
	Context context.Context
	Msg     interface{}
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL