etherstream

v0.1.0
Published: Jul 18, 2022 License: CC0-1.0 Imports: 10 Imported by: 1

README

EtherStream is a library for event-sourcing with Ethereum blockchains.

This is free and unencumbered software released into the public domain.

Your feedback and code are welcome.

Use

Fetch a live stream plus all previous entries with something like the following.

ethereumRPCURL := "wss://eth-goerli.g.alchemy.com/v2/8DXx4tv3-IXWBgwrtyhhmElxV0_VM9YK"
ethereumRPCClient, err := ethclient.DialContext(ctx, ethereumRPCURL)
if err != nil {
	log.Fatal("Ethereum RPC API unavailable:", err)
}
etherReader := etherstream.Reader{Backend: ethereumRPCClient}

exampleQ := ethereum.FilterQuery{Topics: [][]ether.Hash{{
	ether.HexToHash("0xeb6c7d1cd53bd4a9d7c4478386be075d97a6372e435e72cb37313dfa17ad00d7"),
}}}
stream, sub, history, err := etherReader.QueryWithHistory(ctx, &exampleQ)
if err != nil {
	log.Print(err)
	return
}
defer sub.Unsubscribe()

Documentation

Overview

Package etherstream provides event streams from the blockchain.

Functions

func Order

func Order(a, b *chain.Log) int

Order reports whether b follows a. The result is positive if it does, zero if a and b are equal, and negative if it does not.
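The sign convention is the reverse of the usual "negative means less" comparator, which matters when sorting. The following is a minimal sketch, not the package's implementation: logPos is a hypothetical stand-in for a log entry, and comparing by block number and then by log index within the block is an assumption about what "follows" means.

```go
package main

import (
	"fmt"
	"sort"
)

// logPos is a hypothetical stand-in for the position of a log entry:
// the block number and the log index within that block.
type logPos struct {
	BlockNumber uint64
	Index       uint
}

// order mirrors the documented sign convention of Order: positive when b
// follows a, zero when both share a position, and negative otherwise.
// The comparison keys are an assumption, not the package's actual code.
func order(a, b *logPos) int {
	switch {
	case a.BlockNumber < b.BlockNumber:
		return 1
	case a.BlockNumber > b.BlockNumber:
		return -1
	case a.Index < b.Index:
		return 1
	case a.Index > b.Index:
		return -1
	}
	return 0
}

// sortAscending sorts entries in place, oldest first.
func sortAscending(entries []logPos) {
	sort.Slice(entries, func(i, j int) bool {
		// entries[i] sorts first when entries[j] follows it.
		return order(&entries[i], &entries[j]) > 0
	})
}

func main() {
	entries := []logPos{
		{BlockNumber: 9, Index: 2},
		{BlockNumber: 7, Index: 5},
		{BlockNumber: 9, Index: 0},
	}
	sortAscending(entries)
	fmt.Println(entries)
}
```

With this convention, a "less" function for an ascending sort tests for a positive result rather than a negative one.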

Types

type Reader

type Reader struct {
	Backend ethereum.LogFilterer // blockchain connection

	// limit for a subscription request (defaults to 2 s)
	SubscribeTimeout time.Duration
	// limit for history retrieval (defaults to 7 s)
	FetchTimeout time.Duration

	// idle time after which no new content is assumed
	// (defaults to 500 ms)
	ReceiveExpire time.Duration
}

Reader configures the blockchain connectivity.

Note that users need WebSockets when calling ethclient.DialContext, because subscriptions won't work with regular HTTP RPC.
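One way to catch a wrong endpoint early is to check the URL scheme before dialing. The sketch below assumes a remote URL endpoint. requireWebSocketURL and the example.com addresses are hypothetical and not part of etherstream.

```go
package main

import (
	"fmt"
	"net/url"
)

// requireWebSocketURL is a hypothetical helper that rejects RPC endpoints
// whose scheme is neither ws nor wss.
func requireWebSocketURL(rawURL string) error {
	u, err := url.Parse(rawURL)
	if err != nil {
		return fmt.Errorf("malformed RPC URL: %w", err)
	}
	switch u.Scheme {
	case "ws", "wss":
		return nil
	}
	return fmt.Errorf("RPC URL scheme %q does not support subscriptions; want ws or wss", u.Scheme)
}

func main() {
	endpoints := []string{
		"wss://node.example.com/v2/key",   // placeholder address
		"https://node.example.com/v2/key", // placeholder address
	}
	for _, s := range endpoints {
		fmt.Println(s, "check:", requireWebSocketURL(s))
	}
}
```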

Example
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

ethereumRPCURL := "wss://eth-goerli.g.alchemy.com/v2/8DXx4tv3-IXWBgwrtyhhmElxV0_VM9YK"
ethereumRPCClient, err := ethclient.DialContext(ctx, ethereumRPCURL)
if err != nil {
	fmt.Println("Ethereum RPC API unavailable:", err)
	return
}
etherReader := etherstream.Reader{Backend: ethereumRPCClient}

exampleQ := ethereum.FilterQuery{Topics: [][]ether.Hash{{
	ether.HexToHash("0xeb6c7d1cd53bd4a9d7c4478386be075d97a6372e435e72cb37313dfa17ad00d7"),
}}}
stream, sub, history, err := etherReader.QueryWithHistory(ctx, &exampleQ)
if err != nil {
	fmt.Println(err)
	return
}
defer sub.Unsubscribe()

if len(history) != 0 {
	fmt.Println("historic entries ✓")
}
fmt.Print(cap(stream), "-slot stream buffer ✓\n")
Output:

historic entries ✓
60-slot stream buffer ✓

func (Reader) EventsWithHistory

func (r Reader) EventsWithHistory(ctx context.Context, eventType *abi.Event) (stream <-chan chain.Log, _ ethereum.Subscription, history []chain.Log, _ error)

EventsWithHistory resolves all logs matching eventType. The history is sorted in ascending order. The first receive from stream directly follows the last entry from history, if any.

func (Reader) QueryWithHistory

func (r Reader) QueryWithHistory(ctx context.Context, q *ethereum.FilterQuery) (stream <-chan chain.Log, _ ethereum.Subscription, history []chain.Log, _ error)

QueryWithHistory resolves all logs matching q. The history is sorted in ascending order. The first receive from stream directly follows the last entry from history, if any.
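Because the stream continues right after the last history entry, a consumer can replay history first and then switch to the stream without a gap. The sketch below shows that pattern with plain Go channels. replayThenFollow and demo are hypothetical, the int entries stand in for chain.Log, and subErr stands in for the channel returned by the subscription's Err method.

```go
package main

import (
	"context"
	"fmt"
)

// replayThenFollow is a hypothetical helper. It applies handle to each
// history entry in order, then to each live entry until the stream closes,
// the subscription reports on subErr, or ctx expires.
func replayThenFollow[T any](ctx context.Context, history []T, stream <-chan T, subErr <-chan error, handle func(T)) error {
	for _, entry := range history {
		handle(entry)
	}
	for {
		select {
		case entry, ok := <-stream:
			if !ok {
				return nil // stream closed
			}
			handle(entry)
		case err := <-subErr:
			return err // nil when the error channel was closed
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// demo feeds two historic and two live entries through replayThenFollow
// and returns them in the order they were handled.
func demo() ([]int, error) {
	history := []int{1, 2}
	stream := make(chan int, 2)
	stream <- 3
	stream <- 4
	close(stream)
	subErr := make(chan error) // never signals in this demo

	var seen []int
	err := replayThenFollow(context.Background(), history, stream, subErr, func(n int) {
		seen = append(seen, n)
	})
	return seen, err
}

func main() {
	seen, err := demo()
	fmt.Println(seen, err)
}
```

With the real API, the history slice and stream channel would come from QueryWithHistory or EventsWithHistory, and sub.Err() would take the place of subErr.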
