streaming

package
v0.2.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 9, 2021 License: MIT Imports: 29 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to streaming.ServiceConstructors

Functions

func CreateHashCidLink(hash []byte) cidlink.Link

func GetCurrentCidRoot

func GetCurrentCidRoot(hash []byte) cidlink.Link

func GetHeaderType

func GetHeaderType() schema.Type

func GetLinkPrototype

func GetLinkPrototype() ipld.LinkPrototype

Types

type DagCosmosIntermediateWriter

type DagCosmosIntermediateWriter struct {
	// contains filtered or unexported fields
}

DagCosmosIntermediateWriter is used so that we do not need to update the underlying io.Writer inside the StoreKVPairWriteListener every time we begin writing to a new file.

func NewDagCosmosIntermediateWriter

func NewDagCosmosIntermediateWriter(outChan chan<- []byte) *DagCosmosIntermediateWriter

NewDagCosmosIntermediateWriter creates an instance of a DagCosmosIntermediateWriter that sends to the provided channel.

func (*DagCosmosIntermediateWriter) Write

func (iw *DagCosmosIntermediateWriter) Write(b []byte) (int, error)

Write satisfies io.Writer

type DagCosmosStreamingService

type DagCosmosStreamingService struct {
	// contains filtered or unexported fields
}

DagCosmosStreamingService is a concrete implementation of StreamingService that writes state changes out to files.

func NewDagCosmosStreamingService

func NewDagCosmosStreamingService(writeDir, filePrefix string, storeKeys []sdk.StoreKey, c codec.BinaryCodec) (*DagCosmosStreamingService, error)

NewDagCosmosStreamingService creates a new DagCosmosStreamingService for the provided writeDir, (optional) filePrefix, and storeKeys.

func (*DagCosmosStreamingService) BuildHeaderMap

func (*DagCosmosStreamingService) BuildLastCommitMap

func (fss *DagCosmosStreamingService) BuildLastCommitMap(req *abci.RequestBeginBlock) datamodel.Node

func (*DagCosmosStreamingService) BuildTx

func (*DagCosmosStreamingService) Close

func (fss *DagCosmosStreamingService) Close() error

Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface

func (*DagCosmosStreamingService) ListenBeginBlock

ListenBeginBlock satisfies the Hook interface. It writes the received BeginBlock request and response, and the resulting state changes, out to a file as described in the naming schema above.

func (*DagCosmosStreamingService) ListenDeliverTx

ListenDeliverTx satisfies the Hook interface. It writes the received DeliverTx request and response, and the resulting state changes, out to a file as described in the naming schema above.

func (*DagCosmosStreamingService) ListenEndBlock

ListenEndBlock satisfies the Hook interface. It writes the received EndBlock request and response, and the resulting state changes, out to a file as described in the naming schema above.

func (*DagCosmosStreamingService) Listeners

func (fss *DagCosmosStreamingService) Listeners() map[sdk.StoreKey][]types.WriteListener

Listeners returns the StreamingService's underlying WriteListeners; use it to register them with the BaseApp.

func (*DagCosmosStreamingService) Stream

func (fss *DagCosmosStreamingService) Stream(wg *sync.WaitGroup)

Stream spins up a goroutine select loop which awaits length-prefixed binary encoded KV pairs and caches them in the order they were received. Open question: do we need both this and an intermediate writer? We could just write directly to the buffer on calls to Write, but then we would not support a Stream interface, which could be needed for other types of streamers.

func (*DagCosmosStreamingService) WriteCAR

func (fss *DagCosmosStreamingService) WriteCAR(root cid.Cid, filename string) error

WriteCAR

type DagEthIntermediateWriter

type DagEthIntermediateWriter struct {
	// contains filtered or unexported fields
}

DagEthIntermediateWriter is used so that we do not need to update the underlying io.Writer inside the StoreKVPairWriteListener every time we begin writing to a new file.

func NewDagEthIntermediateWriter

func NewDagEthIntermediateWriter(outChan chan<- []byte) *DagEthIntermediateWriter

NewDagEthIntermediateWriter creates an instance of a DagEthIntermediateWriter that sends to the provided channel.

func (*DagEthIntermediateWriter) Write

func (iw *DagEthIntermediateWriter) Write(b []byte) (int, error)

Write satisfies io.Writer

type DagEthStreamingService

type DagEthStreamingService struct {
	// contains filtered or unexported fields
}

DagEthStreamingService is a concrete implementation of StreamingService that writes state changes out to files.

func NewDagEthStreamingService

func NewDagEthStreamingService(writeDir, filePrefix string, storeKeys []sdk.StoreKey, c codec.BinaryCodec) (*DagEthStreamingService, error)

NewDagEthStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys

func (*DagEthStreamingService) Close

func (fss *DagEthStreamingService) Close() error

Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface

func (*DagEthStreamingService) ListenBeginBlock

ListenBeginBlock satisfies the Hook interface. It writes the received BeginBlock request and response, and the resulting state changes, out to a file as described in the naming schema above.

func (*DagEthStreamingService) ListenDeliverTx

func (fss *DagEthStreamingService) ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error

ListenDeliverTx satisfies the Hook interface. It writes the received DeliverTx request and response, and the resulting state changes, out to a file as described in the naming schema above.

func (*DagEthStreamingService) ListenEndBlock

func (fss *DagEthStreamingService) ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error

ListenEndBlock satisfies the Hook interface. It writes the received EndBlock request and response, and the resulting state changes, out to a file as described in the naming schema above.

func (*DagEthStreamingService) Listeners

func (fss *DagEthStreamingService) Listeners() map[sdk.StoreKey][]types.WriteListener

Listeners returns the StreamingService's underlying WriteListeners; use it to register them with the BaseApp.

func (*DagEthStreamingService) Stream

func (fss *DagEthStreamingService) Stream(wg *sync.WaitGroup)

Stream spins up a goroutine select loop which awaits length-prefixed binary encoded KV pairs and caches them in the order they were received. Open question: do we need both this and an intermediate writer? We could just write directly to the buffer on calls to Write, but then we would not support a Stream interface, which could be needed for other types of streamers.

type FileStreamingService

type FileStreamingService struct {
	// contains filtered or unexported fields
}

FileStreamingService is a concrete implementation of StreamingService that writes state changes out to files.

func NewStreamingService

func NewStreamingService(writeDir, filePrefix string, storeKeys []sdk.StoreKey, c codec.BinaryCodec) (*FileStreamingService, error)

NewStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys

func (*FileStreamingService) Close

func (fss *FileStreamingService) Close() error

Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface

func (*FileStreamingService) ListenBeginBlock

func (fss *FileStreamingService) ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error

ListenBeginBlock satisfies the Hook interface. It writes the received BeginBlock request and response, and the resulting state changes, out to a file as described in the naming schema above.

func (*FileStreamingService) ListenDeliverTx

func (fss *FileStreamingService) ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error

ListenDeliverTx satisfies the Hook interface. It writes the received DeliverTx request and response, and the resulting state changes, out to a file as described in the naming schema above.

func (*FileStreamingService) ListenEndBlock

func (fss *FileStreamingService) ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error

ListenEndBlock satisfies the Hook interface. It writes the received EndBlock request and response, and the resulting state changes, out to a file as described in the naming schema above.

func (*FileStreamingService) Listeners

func (fss *FileStreamingService) Listeners() map[sdk.StoreKey][]types.WriteListener

Listeners returns the StreamingService's underlying WriteListeners; use it to register them with the BaseApp.

func (*FileStreamingService) Stream

func (fss *FileStreamingService) Stream(wg *sync.WaitGroup)

Stream satisfies the baseapp.StreamingService interface. It spins up a goroutine select loop which awaits length-prefixed binary encoded KV pairs and caches them in the order they were received.

type IntermediateWriter

type IntermediateWriter struct {
	// contains filtered or unexported fields
}

IntermediateWriter is used so that we do not need to update the underlying io.Writer inside the StoreKVPairWriteListener every time we begin writing to a new file.

func NewIntermediateWriter

func NewIntermediateWriter(outChan chan<- []byte) *IntermediateWriter

NewIntermediateWriter creates an instance of an IntermediateWriter that sends to the provided channel.

func (*IntermediateWriter) Write

func (iw *IntermediateWriter) Write(b []byte) (int, error)

Write satisfies io.Writer

type ServiceConstructor

type ServiceConstructor func(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryCodec) (StreamingService, error)

ServiceConstructor is used to construct a streaming service

func NewServiceConstructor

func NewServiceConstructor(name string) (ServiceConstructor, error)

NewServiceConstructor returns the streaming.ServiceConstructor corresponding to the provided name

type ServiceType

type ServiceType int

ServiceType enum for specifying the type of StreamingService

const (
	Unknown ServiceType = iota
	File
	// add more in the future
	DagCosmos
	DagEth
)

func NewStreamingServiceType

func NewStreamingServiceType(name string) ServiceType

NewStreamingServiceType returns the streaming.ServiceType corresponding to the provided name

func (ServiceType) String

func (sst ServiceType) String() string

String returns the string name of a streaming.ServiceType

type StreamingHooks

type StreamingHooks struct {
}

type StreamingListener

type StreamingListener interface {
	// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
	ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
	// ListenEndBlock updates the streaming service with the latest EndBlock messages
	ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
	// ListenDeliverTx updates the streaming service with the latest DeliverTx messages
	ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
}

StreamingListener interface used to hook into the ABCI message processing of the BaseApp

type StreamingService

type StreamingService interface {
	// Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file
	Stream(wg *sync.WaitGroup)
	// Listeners returns the streaming service's listeners for the BaseApp to register
	Listeners() map[types.StoreKey][]store.WriteListener
	// StreamingListener interface for hooking into the ABCI messages from inside the BaseApp
	StreamingListener
	// Closer interface
	io.Closer
}

StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks

func DagCosmosStreamingConstructor

func DagCosmosStreamingConstructor(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryCodec) (StreamingService, error)

DagCosmosStreamingConstructor is the streaming.ServiceConstructor function for creating a DagCosmosStreamingService

func DagEthStreamingConstructor

func DagEthStreamingConstructor(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryCodec) (StreamingService, error)

DagEthStreamingConstructor is the streaming.ServiceConstructor function for creating a DagEthStreamingService

func FileStreamingConstructor

func FileStreamingConstructor(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryCodec) (StreamingService, error)

FileStreamingConstructor is the streaming.ServiceConstructor function for creating a FileStreamingService

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL