superwatcher

package module
v0.0.0-...-8c998e4

Published: Jul 2, 2023 License: BSD-3-Clause Imports: 10 Imported by: 0

README

superwatcher

superwatcher is a building block for filtering Ethereum logs, with chain reorganization handling baked in.

The code in this project is organized into the following packages:

  1. Top-level package "github.com/soyart/superwatcher" (public)

    This package exposes the core superwatcher interfaces that application code must implement. To use superwatcher, call functions in the pkg sub-packages.

  2. pkg (public)

    This package defines extra (non-core) interfaces and some implementations that help superwatcher users during development. Most code here wraps the internal package, or offers other convenience functions and examples.

    Development facilities such as servicetest (a fully integrated test suite for application code), reorgsim (chain reorg simulation), and mocked StateDataGateway types are provided here.

    One package, pkg/components, is especially important for users, because it provides the preferred way to initialize superwatcher components.

  3. internal (private)

    This private package defines the actual implementations of the interfaces defined in the top-level package. Users are not expected to interact directly with the code here.

  4. examples (public)

    This package provides some context and examples of how to use superwatcher to build services. You can try running the service with its main.go.

superwatcher components

For a more in-depth look at the components, see package components.

There are 3 main superwatcher components - (1) the emitter, (2) the emitter client, and (3) the engine. The flowchart below illustrates how the 3 components work together.

                  EthClient (blockchain)
                            │
                            │
                            │  logs []types.Log
                            │  blockHashes []common.Hash
                            │
                            │
                            ▼
                      Emitter+Poller
                            │
                            │
                            │  PollerResult {
                            │     GoodBlocks
                            │     ReorgedBlocks
                            │  }
                            │
                            │
                            │  error
                            ▼
                          Engine
┌─────────────────────────────────────────────────────────────┐
│                      EmitterClient                          │
│                           │                                 │
│                           ▼                                 │
│       ┌───────────────────┼─────────────────────┐           │
│       ▼                   ▼                     ▼           │
│  HandleGoodLogs    HandleReorgedLogs    HandleEmitterError  │
│                                                             │
└─────────────────────────────────────────────────────────────┘
  1. EmitterPoller

    The poller polls event logs from the blockchain and compares each log's block hash with the hash it saw previously. If the hash from this poll differs from the previous poll's, it assumes that the block has been reorged. The result of this polling is PollerResult.

  2. Emitter

    The emitter uses an infinite loop to filter an overlapping range of blocks. It filters the logs using addresses and log topics, and because each block range overlaps with the previous loop's, the poller is guaranteed to see every block more than once, catching any reorg events as they happen.

    After the poller returns, the emitter checks the result and error, and emits the result to its consumer. It then waits (blocks) for a signal from the consumer. If no signal is received, the emitter blocks forever (no timeout for now).

  3. EmitterClient

    The emitter client is embedded into Engine. It linearly receives PollerResult from the emitter and returns it to Engine. It also syncs with the emitter: the emitter will not proceed to the next loop until its client syncs.

  4. Engine

    The engine receives PollerResult from the emitter client and passes the result to the appropriate methods of ServiceEngine. It calls ServiceEngine.HandleReorgedLogs before ServiceEngine.HandleGoodLogs, so that the service can undo or fix any actions it performed on the now-removed blocks before it processes the new, good logs.

    After the result is processed, the engine writes some metadata with SetStateDataGateway and uses the emitter client to signal the emitter to start a new loop. In addition to passing data around, the engine also keeps track of log processing state to avoid processing the same data twice.

  5. ServiceEngine (example)

    The service engine is embedded into Engine, and it is what the user injects into Engine. Because it is an interface, you can treat it like HTTP handlers: you can have a router service engine that routes logs to other service sub-engines, which also implement ServiceEngine.

From the chart, EmitterClient may seem like extra bloat, but it's better (to me) to abstract the emitter data retrieval away from the engine. Having EmitterClient also makes testing Emitter easier, as we use the EmitterClient interface to verify the emitter's results.

Single emitter and engine, but with multiple service engines

See the demoservice for how this router implementation works.

We can use a middleware model on ServiceEngine to compose more complex services that handle multiple contracts or business-logic units.

An example of multiple ServiceEngines would be something like this:

                                                         ┌───►PoolFactoryEngine
                                                         │    (ServiceEngine)
                                    ┌──►UniswapV3Engine──┤
                                    │   (ServiceEngine)  │
                                    │                    └───►LiquidityPoolEngine
                                    │                         (ServiceEngine)
Engine ───► MainServiceRouter───────┤
           (ServiceEngine)          │
                                    │
                                    │
                                    └──►ENSEngine
                                        (ServiceEngine)
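
Below is a minimal Go sketch of such a router. The sub-engine fields, the address-based dispatch, and the artifact handling are illustrative assumptions, not superwatcher API; HandleReorgedBlocks and HandleEmitterError are omitted for brevity.

package service

import (
	"github.com/ethereum/go-ethereum/common"

	"github.com/soyart/superwatcher"
)

// MainServiceRouter is a hypothetical ServiceEngine that dispatches blocks
// to sub-engines based on the contract address that emitted each log.
type MainServiceRouter struct {
	// Both sub-engines also implement superwatcher.ServiceEngine.
	uniswapV3Engine superwatcher.ServiceEngine
	ensEngine       superwatcher.ServiceEngine

	uniswapV3Addr common.Address
	ensAddr       common.Address
}

func (r *MainServiceRouter) HandleGoodBlocks(
	blocks []*superwatcher.Block,
	artifacts []superwatcher.Artifact,
) (map[common.Hash][]superwatcher.Artifact, error) {
	out := make(map[common.Hash][]superwatcher.Artifact)

	for _, block := range blocks {
		for _, log := range block.Logs {
			switch log.Address {
			case r.uniswapV3Addr:
				// Forward to r.uniswapV3Engine.HandleGoodBlocks and merge
				// its artifacts into out (omitted for brevity).
			case r.ensAddr:
				// Forward to r.ensEngine.HandleGoodBlocks likewise.
			}
		}
	}

	return out, nil
}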

Using superwatcher

The most basic way to use superwatcher is to first implement ServiceEngine, and then call initsuperwatcher.New to initialize the emitter (with addresses and topics) and the engine (with the service engine implementation injected).

After you have successfully initialized both components, start them concurrently with Loop, as sketched below.
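
Below is a minimal sketch of the concurrency part, assuming the emitter and engine are already constructed (for example via pkg/components); errgroup here is just one convenient way to run the two blocking Loop calls.

package main

import (
	"context"

	"golang.org/x/sync/errgroup"

	"github.com/soyart/superwatcher"
)

// run starts the emitter and the engine concurrently,
// since both Loop methods block until shutdown or error.
func run(ctx context.Context, emitter superwatcher.Emitter, engine superwatcher.Engine) error {
	defer emitter.Shutdown()

	group, ctx := errgroup.WithContext(ctx)

	group.Go(func() error { return emitter.Loop(ctx) })
	group.Go(func() error { return engine.Loop(ctx) })

	return group.Wait()
}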

Understanding PollerResult

The data structure emitted by the emitter is PollerResult, which represents the result of each Poller.Poll call. The important structure fields are:

PollerResult.GoodBlocks holds any new blocks filtered. Duplicate good blocks reappear if they are still in range, but the engine skips such duplicates, so the ServiceEngine code only sees good blocks it has never seen before.

PollerResult.ReorgedBlocks holds any GoodBlocks from previous loops that the emitter now sees with different block hashes. This means that every block in ReorgedBlocks was once in GoodBlocks. ServiceEngine is expected to revert or fix any actions performed when the removed blocks were still considered canonical.

Let's say this is our pre-fork blockchain (annotated with block numbers and block hashes):

Pre-fork:

[{10:0x10}, {11:0x11}, {12:0x12}, {13:0x13}]

Reorg fork at block 12: {12:0x12} -> {12:0x1212}, {13:0x13} -> {13:0x1313}

New chain:

[{10:0x10}, {11:0x11}, {12:0x1212}, {13:0x1313}]

Now let's say the emitter filters with a range of 2 blocks and no look-back blocks. This means the engine will see the results as follows:

Loop 0 PollerResult: {GoodBlocks: [{10:0x10},{11:0x11}], ReorgedBlocks:[]}


                             a GoodBlock reappears
                                      ▼
Loop 1 PollerResult: {GoodBlocks: [{11:0x11},{12:0x12}], ReorgedBlocks:[]}


                                     block 13 was never seen before     was once a GoodBlock
                                                    ▼                          ▼
Loop 2 PollerResult: {GoodBlocks: [{12:0x1212},{13:0x1313}], ReorgedBlocks:[{12:0x12}]}


                            a GoodBlock reappears
                                      ▼
Loop 3 PollerResult: {GoodBlocks: [{13:0x1313},{14:0x14}], ReorgedBlocks:[]}

You can see that the once-good {12:0x12} comes back in ReorgedBlocks even after its hash changed on the chain, and the new blocks from the forked chain appear in GoodBlocks in that same loop.

After emitting the result, the emitter waits until the emitter client sends back a lightweight sync signal before continuing to the next loop.

Future

We may provide core-component wrappers to extend the base superwatcher functionality. For example, the router service engine discussed above may be provided in the future, along with wrappers for testing.

These wrappers will first be prototyped in the demo service.

Documentation

Index

Constants

This section is empty.

Variables

var (
	// Ethereum node fetch error
	ErrFetchError = errors.New("fetch from ethclient failed")

	// Chain is reorging - will not cause a return from emitter
	// e.g. when logs filtered from the same block have different hashes
	ErrChainIsReorging  = errors.New("chain is reorging and data is not usable for now")
	ErrFromBlockReorged = errors.Wrap(ErrChainIsReorging, "fromBlock reorged")

	// Bug from my own part
	ErrSuperwatcherBug = errors.New("superwatcher bug")
	ErrProcessReorg    = errors.Wrap(ErrSuperwatcherBug, "error in emitter reorg detection logic") // Bug in reorg detection logic

	// User violates some rules/policies, e.g. downgrading poller Policy
	ErrUserError = errors.New("user error")
	ErrBadPolicy = errors.Wrap(ErrUserError, "invalid policy")
)
var ErrRecordNotFound = errors.New("record not found")

ErrRecordNotFound is checked for in emitter.loopEmit. If the error is ErrRecordNotFound, the emitter assumes the service has never run on this host (hence no data in the database), and will not attempt to go back.
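
Below is a minimal sketch of a GetStateDataGateway honoring this behavior; the in-memory store is purely illustrative.

package service

import (
	"context"

	"github.com/soyart/superwatcher"
)

// memGateway is a hypothetical in-memory GetStateDataGateway.
type memGateway struct {
	lastRecordedBlock uint64
	hasRecord         bool
}

func (g *memGateway) GetLastRecordedBlock(ctx context.Context) (uint64, error) {
	if !g.hasRecord {
		// First run on this host: report ErrRecordNotFound so the emitter
		// starts from Config.StartBlock instead of going back.
		return 0, superwatcher.ErrRecordNotFound
	}

	return g.lastRecordedBlock, nil
}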

Functions

func LastGoodBlock

func LastGoodBlock(
	result *PollerResult,
) uint64

LastGoodBlock computes `PollerResult.LastGoodBlock` based on |result|.
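
A hedged usage sketch: once a batch has been processed, the computed value can be persisted with SetLastRecordedBlock so the emitter knows where to resume. The saveProgress helper below is illustrative, not part of superwatcher.

package service

import (
	"context"

	"github.com/soyart/superwatcher"
)

// saveProgress persists the last good block of a processed PollerResult.
func saveProgress(
	ctx context.Context,
	gateway superwatcher.SetStateDataGateway,
	result *superwatcher.PollerResult,
) error {
	return gateway.SetLastRecordedBlock(ctx, superwatcher.LastGoodBlock(result))
}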

func WrapErrRecordNotFound

func WrapErrRecordNotFound(err error, keyNotFound string) error
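
WrapErrRecordNotFound presumably wraps a store-specific miss (together with the missing key) so the emitter can still recognize the record-not-found condition. A hedged sketch using database/sql follows; the table, column, and key names are made up.

package service

import (
	"context"
	"database/sql"
	"errors"

	"github.com/soyart/superwatcher"
)

// sqlGateway is a hypothetical SQL-backed GetStateDataGateway.
type sqlGateway struct {
	db *sql.DB
}

func (g *sqlGateway) GetLastRecordedBlock(ctx context.Context) (uint64, error) {
	var block uint64
	err := g.db.
		QueryRowContext(ctx, "SELECT block FROM watcher_state WHERE key = ?", "lastRecordedBlock").
		Scan(&block)

	if errors.Is(err, sql.ErrNoRows) {
		// Translate the SQL miss into superwatcher's not-found error.
		return 0, superwatcher.WrapErrRecordNotFound(err, "lastRecordedBlock")
	}
	if err != nil {
		return 0, err
	}

	return block, nil
}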

Types

type Artifact

type Artifact any

type BaseServiceEngine

type BaseServiceEngine interface {
	HandleEmitterError(error) error
}

BaseServiceEngine is shared by ServiceEngine and ThinServiceEngine.

type Block

type Block struct {
	// LogsMigrated indicates whether all interesting logs were moved/migrated
	// _from_ this block after a chain reorg or not. The field is primarily used
	// by EmitterPoller to trigger the poller to get new, fresh block hash for a block.
	// The field should always be false if the Block is in PollerResult.GoodBlocks.
	LogsMigrated bool `json:"logsMigrated"`

	Number uint64       `json:"number"`
	Hash   common.Hash  `json:"hash"`
	Header BlockHeader  `json:"-"`
	Logs   []*types.Log `json:"logs"`
}

Block represents the minimum block info needed for superwatcher. Block data can be retrieved from Block itself or its Header field.

func (*Block) String

func (b *Block) String() string

String returns the block hash as a lowercase string with 0x prepended.

type BlockHeader

type BlockHeader interface {
	Number() uint64
	Hash() common.Hash
	Nonce() types.BlockNonce
	Time() uint64
	GasLimit() uint64
	GasUsed() uint64
}

BlockHeader is implemented by `blockHeaderWrapper` and `*reorgsim.Block`. It is used in place of *types.Header to make writing tests with reorgsim easier. More methods may be added as our needs for data from the headers grow, or we (i.e. you) can mock the actual *types.Header in reorgsim instead :)

type BlockHeaderWrapper

type BlockHeaderWrapper struct {
	Header *types.Header
}

BlockHeaderWrapper wraps *types.Header to implement BlockHeader.

func (BlockHeaderWrapper) GasLimit

func (h BlockHeaderWrapper) GasLimit() uint64

func (BlockHeaderWrapper) GasUsed

func (h BlockHeaderWrapper) GasUsed() uint64

func (BlockHeaderWrapper) Hash

func (h BlockHeaderWrapper) Hash() common.Hash

func (BlockHeaderWrapper) Nonce

func (h BlockHeaderWrapper) Nonce() types.BlockNonce

func (BlockHeaderWrapper) Number

func (h BlockHeaderWrapper) Number() uint64

func (BlockHeaderWrapper) Time

func (h BlockHeaderWrapper) Time() uint64

type Config

type Config struct {
	// External dependencies
	NodeURL string `mapstructure:"node_url" yaml:"node_url" json:"nodeURL"`

	// StartBlock is the lowest block height the emitter will consider as base, usually a contract's genesis block
	StartBlock uint64 `mapstructure:"start_block" yaml:"start_block" json:"startBlock"`

	// FilterRange is the forward range (number of new blocks) each call to emitter.poller.poll will perform
	FilterRange uint64 `mapstructure:"filter_range" yaml:"filter_range" json:"filterRange"`

	// DoReorg specifies whether superwatcher.EmitterPoller will process chain reorgs for PollerResult
	DoReorg bool `mapstructure:"do_reorg" yaml:"do_reorg" json:"doReorg"`

	// DoHeader specifies whether superwatcher.EmitterPoller should fetch block headers too
	DoHeader bool `mapstructure:"do_header" yaml:"do_header" json:"doHeader"`

	// MaxGoBackRetries is the maximum number of blocks the emitter will go back for. Once this is reached,
	// the emitter exits on error ErrMaxRetriesReached
	MaxGoBackRetries uint64 `mapstructure:"max_go_back_retries" yaml:"max_go_back_retries" json:"maxGoBackRetries"`

	// LoopInterval is the number of seconds the emitter sleeps after each call to emitter.poller.poll
	LoopInterval uint64 `mapstructure:"loop_interval" yaml:"loop_interval" json:"loopInterval"`

	// LogLevel for debugger.Debugger, the higher the more verbose
	LogLevel uint8 `mapstructure:"log_level" yaml:"log_level" json:"logLevel"`

	// Policy is for configuring EmitterPoller behavior
	Policy Policy `mapstructure:"policy" yaml:"policy" json:"policy"`
}

Config is superwatcher-wide configuration
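
A hedged example of filling Config in Go; the values are arbitrary, and the YAML/JSON keys follow the struct tags above.

package main

import "github.com/soyart/superwatcher"

var config = superwatcher.Config{
	NodeURL:          "wss://ethereum.example.com", // illustrative node endpoint
	StartBlock:       15_000_000,                   // e.g. the contract's genesis block
	FilterRange:      10,                           // poll 10 new blocks per loop
	DoReorg:          true,                         // detect and emit chain reorgs
	DoHeader:         false,                        // do not fetch block headers
	MaxGoBackRetries: 5,                            // exit with ErrMaxRetriesReached after 5 go-backs
	LoopInterval:     4,                            // sleep 4 seconds between polls
	LogLevel:         1,
	Policy:           superwatcher.PolicyNormal,
}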

type Controller

type Controller interface {
	// SetDoReorg makes the EmitterPoller engage chain reorg detection logic
	SetDoReorg(bool)
	// DoReorg returns if EmitterPoller is currently processing chain reorg inside EmitterPoller.Poll
	DoReorg() bool
	// SetDoHeader makes the EmitterPoller fetch block header for every block with interesting logs
	SetDoHeader(bool)
	// DoHeader returns if the EmitterPoller will fetch block headers for blocks with interesting logs
	DoHeader() bool
	// Addresses reads EmitterPoller's current event log addresses for filter query
	Addresses() []common.Address
	// Topics reads EmitterPoller's current event log topics for filter query
	Topics() [][]common.Hash
	// AddAddresses adds (appends) addresses to EmitterPoller's filter query
	AddAddresses(...common.Address)
	// AddTopics adds (appends) topics to EmitterPoller's filter query
	AddTopics(...[]common.Hash)
	// SetAddresses changes EmitterPoller's event log addresses on-the-fly
	SetAddresses([]common.Address)
	// SetTopics changes EmitterPoller's event log topics on-the-fly
	SetTopics([][]common.Hash)
}

Controller gives users the means to change some EmitterPoller parameters at runtime.
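
A brief sketch of using Controller to expand the filter query at runtime. Controller is embedded by EmitterPoller and SuperWatcher, so either can be passed here; the address and event signature are placeholders.

package service

import (
	"github.com/ethereum/go-ethereum/common"

	"github.com/soyart/superwatcher"
)

// watchNewContract appends a contract address and one of its event
// signatures to the poller's filter query on-the-fly.
func watchNewContract(ctrl superwatcher.Controller, contract common.Address, eventSig common.Hash) {
	ctrl.AddAddresses(contract)
	ctrl.AddTopics([]common.Hash{eventSig})
}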

type Emitter

type Emitter interface {
	// Loop is the entry point for Emitter.
	// Users will call Loop in a different goroutine than Engine.Loop
	// to make both components run concurrently.
	Loop(context.Context) error
	// SyncsEngine waits until engine is done processing the last batch
	SyncsEngine()
	// Shutdown closes emitter channels
	Shutdown()
	// Poller returns the current Poller in use by Emitter
	Poller() EmitterPoller
	// SetPoller overwrites emitter's Poller with a new one
	SetPoller(EmitterPoller)
}

Emitter receives results from Poller and emits them to Engine. Emitter is aware of the current service states (via StateDataGateway), and uses that information to determine fromBlock and toBlock for Poller.Poll.

type EmitterClient

type EmitterClient interface {
	// WatcherResult returns result from Emitter to caller
	WatcherResult() *PollerResult
	// WatcherError returns error sent by Emitter
	WatcherError() error
	// WatcherConfig returns config used to create its Emitter
	WatcherConfig() *Config
	// SyncsEmitter sends sync signal to Emitter so it can continue
	SyncsEmitter()
	// Shutdown closes Emitter comms channels
	Shutdown()
}

EmitterClient interfaces with Emitter. It helps abstract the complexity of receiving channel data away from Engine. It can be ignored by superwatcher users if they are not implementing their own Engine.

type EmitterPoller

type EmitterPoller interface {
	// Poll polls event logs from fromBlock to toBlock, and processes the logs into *PollerResult for Emitter
	Poll(ctx context.Context, fromBlock, toBlock uint64) (*PollerResult, error)
	// Policy gets current Policy
	Policy() Policy
	// SetPolicy sets new Policy (NOTE: changing Policy mid-run not tested)
	SetPolicy(Policy) error

	// EmitterPoller also implements Controller
	Controller
}

EmitterPoller fetches event logs and other blockchain data and maps them into *PollerResult. The result of EmitterPoller.Poll is later used by Emitter to emit to Engine.

type Engine

type Engine interface {
	// Loop is the entry point for Engine.
	// Call it in a different Goroutine than Emitter.Loop to make both run concurrently.
	Loop(context.Context) error
}

Engine receives PollerResult emitted from Emitter and executes business service logic on PollerResult with ServiceEngine.

type EthClient

type EthClient interface {
	BlockNumber(context.Context) (uint64, error)
	FilterLogs(context.Context, ethereum.FilterQuery) ([]types.Log, error)
	HeaderByNumber(context.Context, *big.Int) (BlockHeader, error)
	EthClientRPC // EthClient will need to be able to do batch RPC calls
}

EthClient defines all Ethereum client methods used in superwatcher. HeaderByNumber returns BlockHeader because if it used the actual *types.Header, then the mock client in `reorgsim` would have to mock types.Header too, which is overkill for now.

func NewEthClient

func NewEthClient(ctx context.Context, url string) EthClient
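
A brief usage sketch; the node URL is a placeholder.

package main

import (
	"context"
	"fmt"

	"github.com/soyart/superwatcher"
)

func main() {
	ctx := context.Background()

	// NewEthClient returns an EthClient ready for use by the poller.
	client := superwatcher.NewEthClient(ctx, "wss://ethereum.example.com")

	latest, err := client.BlockNumber(ctx)
	if err != nil {
		panic(err)
	}

	fmt.Println("latest block:", latest)
}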

type EthClientRPC

type EthClientRPC interface {
	BatchCallContext(context.Context, []rpc.BatchElem) error
}

EthClientRPC is used by the poller to get data from the client in batches, e.g. when getting blocks or block headers.

type FuncGetLastRecordedBlock

type FuncGetLastRecordedBlock func(context.Context) (uint64, error)

type FuncSetLastRecordedBlock

type FuncSetLastRecordedBlock func(context.Context, uint64) error

type GetStateDataGateway

type GetStateDataGateway interface {
	GetLastRecordedBlock(context.Context) (uint64, error)
}

GetStateDataGateway is used by the emitter to get last recorded block.

type Policy

type Policy uint8

Policy (enum) specifies how EmitterPoller considers which blocks to include in its _tracking list_. For every block in this _tracking list_, the poller compares the saved block hash with newly polled one.

const (
	// PolicyFast makes poller only process and track blocks with interesting logs.
	// Hashes from blocks without logs are not processed, unless they were reorged
	// and had their logs removed, in which case the poller gets their headers _once_
	// to check their newer hashes, and remove the empty block from tracking list.
	PolicyFast Policy = iota

	// PolicyNormal makes poller only process and track blocks with interesting logs,
	// but if the poller detects that a block has its logs removed, it will process
	// and start tracking that block until the block goes out of poller scope.
	// The difference between PolicyFast and PolicyNormal is
	// that PolicyNormal will keep tracking the reorged empty blocks.
	PolicyNormal

	// PolicyExpensive makes poller process and track all blocks' headers,
	// regardless of whether the blocks have interesting logs or not, or Config.DoHeader value.
	PolicyExpensive

	// PolicyExpensiveBlock behaves like PolicyExpensive, but instead of fetching
	// event logs and headers, the poller fetches event logs and blocks.
	PolicyExpensiveBlock
)
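
A hedged sketch of switching Policy at run time. Note the caveats above: changing Policy mid-run is not well tested, and downgrading a Policy is a user error (ErrBadPolicy).

package service

import "github.com/soyart/superwatcher"

// trackAllHeaders upgrades the poller to PolicyExpensive so that headers
// of all blocks are processed and tracked.
func trackAllHeaders(poller superwatcher.EmitterPoller) error {
	if poller.Policy() >= superwatcher.PolicyExpensive {
		return nil // already at least PolicyExpensive
	}

	return poller.SetPolicy(superwatcher.PolicyExpensive)
}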

func (Policy) String

func (level Policy) String() string

type PollerResult

type PollerResult struct {
	FromBlock     uint64   // The poller's `fromBlock`
	ToBlock       uint64   // The poller's `toBlock`
	LastGoodBlock uint64   // This number should be saved to StateDataGateway with SetLastRecordedBlock for the emitter
	GoodBlocks    []*Block // Can be either (1) fresh, new blocks, or (2) blocks whose hashes had not changed yet.
	ReorgedBlocks []*Block // Blocks that poller marked as removed. A service should undo/revert its actions done on the blocks.
}

PollerResult is created in Poller.Poll, and emitted by Emitter to Engine.

type ServiceEngine

type ServiceEngine interface {
	BaseServiceEngine
	// HandleGoodBlocks handles new, canonical `Block`s. The return type is a map of blockHash to []Artifact
	HandleGoodBlocks([]*Block, []Artifact) (map[common.Hash][]Artifact, error)
	// HandleReorgedBlocks handles reorged (removed) `Block`s. The return type is a map of blockHash to []Artifact
	HandleReorgedBlocks([]*Block, []Artifact) (map[common.Hash][]Artifact, error)
}

ServiceEngine is embedded and injected into Engine to perform business logic. It is the preferred way to use superwatcher
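
Below is a minimal ServiceEngine sketch that only logs what it receives. It returns nil artifact maps, which should be fine for a service that passes no state between handler calls (assumption); real services would return artifacts for downstream handlers.

package service

import (
	"log"

	"github.com/ethereum/go-ethereum/common"

	"github.com/soyart/superwatcher"
)

type loggerEngine struct{}

func (e *loggerEngine) HandleGoodBlocks(
	blocks []*superwatcher.Block,
	_ []superwatcher.Artifact,
) (map[common.Hash][]superwatcher.Artifact, error) {
	for _, b := range blocks {
		log.Printf("good block %d %s with %d logs", b.Number, b.String(), len(b.Logs))
	}
	return nil, nil
}

func (e *loggerEngine) HandleReorgedBlocks(
	blocks []*superwatcher.Block,
	_ []superwatcher.Artifact,
) (map[common.Hash][]superwatcher.Artifact, error) {
	for _, b := range blocks {
		log.Printf("reorged block %d %s: undo prior side effects here", b.Number, b.String())
	}
	return nil, nil
}

func (e *loggerEngine) HandleEmitterError(err error) error {
	// Returning a non-nil error would presumably stop the engine,
	// so this sketch just logs and keeps going (assumption).
	log.Println("emitter error:", err)
	return nil
}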

type SetStateDataGateway

type SetStateDataGateway interface {
	SetLastRecordedBlock(context.Context, uint64) error
}

SetStateDataGateway is used by the engine to set last recorded block.

type StateDataGateway

type StateDataGateway interface {
	GetStateDataGateway
	SetStateDataGateway
}

StateDataGateway is an interface that can both set and get lastRecordedBlock for superwatcher. Note: Graceful shutdowns for the StateDataGateway should be performed by service code.

type SuperWatcher

type SuperWatcher interface {
	// Run is the entry point for SuperWatcher
	Run(context.Context, context.CancelFunc) error
	Emitter() Emitter
	Engine() Engine
	Shutdown()

	Controller
}

type ThinServiceEngine

type ThinServiceEngine interface {
	BaseServiceEngine
	HandleFilterResult(*PollerResult) error
}

ThinServiceEngine is embedded and injected into thinEngine, a thin implementation of Engine without managed states. It is recommended for niche use cases and advanced users
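
A hedged sketch of a ThinServiceEngine working on the raw PollerResult, handling reorged blocks before good ones, mirroring what the managed Engine does for ServiceEngine. Because thinEngine keeps no managed states, the service itself must cope with duplicate blocks reappearing across overlapping poll ranges.

package service

import (
	"log"

	"github.com/soyart/superwatcher"
)

// thinLogger is a hypothetical ThinServiceEngine.
type thinLogger struct{}

func (e *thinLogger) HandleFilterResult(result *superwatcher.PollerResult) error {
	for _, b := range result.ReorgedBlocks {
		log.Printf("reorged block %d %s: undo prior side effects here", b.Number, b.String())
	}

	for _, b := range result.GoodBlocks {
		log.Printf("good block %d %s", b.Number, b.String())
	}

	return nil
}

func (e *thinLogger) HandleEmitterError(err error) error {
	log.Println("emitter error:", err)
	return nil
}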
