events

package
v0.0.0-...-d3d27b2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 1, 2024 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// FromBlockLatest is the special string that means subscribe from the current block
	FromBlockLatest = "latest"
	// ErrorHandlingBlock blocks the event stream until the handler can accept the event
	ErrorHandlingBlock = "block"
	// ErrorHandlingSkip processes up to the retry behavior on the stream, then skips to the next event
	ErrorHandlingSkip = "skip"
	// MaxBatchSize is the maximum that a user can specific for their batch size
	MaxBatchSize = 1000
	// DefaultExponentialBackoffInitial  is the initial delay for backoff retry
	DefaultExponentialBackoffInitial = time.Duration(1) * time.Second
	// DefaultExponentialBackoffFactor is the factor we use between retries
	DefaultExponentialBackoffFactor = float64(2.0)
	// DefaultTimestampCacheSize is the number of entries we will hold in a LRU cache for block timestamps
	DefaultTimestampCacheSize = 1000
)
View Source
const (
	// SubPathPrefix is the path prefix for subscriptions
	SubPathPrefix = "/subscriptions"
	// StreamPathPrefix is the path prefix for event streams
	StreamPathPrefix = "/eventstreams"
)

Variables

This section is empty.

Functions

func CobraInitSubscriptionManager

func CobraInitSubscriptionManager(cmd *cobra.Command, conf *SubscriptionManagerConf)

CobraInitSubscriptionManager standard naming for cobra command params

Types

type ABIRefOrInline

type ABIRefOrInline struct {
	contractregistry.ABILocation
	Inline ethbinding.ABIMarshaling `json:"inline,omitempty"`
}

type DistributionMode

type DistributionMode string
const (
	DistributionModeBroadcast DistributionMode = "broadcast"
	DistributionModeWLD       DistributionMode = "workloadDistribution"
)

type StreamInfo

type StreamInfo struct {
	messages.TimeSorted
	ID                   string               `json:"id"`
	Name                 string               `json:"name,omitempty"`
	Path                 string               `json:"path"`
	Suspended            bool                 `json:"suspended"`
	Type                 string               `json:"type,omitempty"`
	BatchSize            uint64               `json:"batchSize,omitempty"`
	BatchTimeoutMS       uint64               `json:"batchTimeoutMS,omitempty"`
	ErrorHandling        string               `json:"errorHandling,omitempty"`
	RetryTimeoutSec      uint64               `json:"retryTimeoutSec,omitempty"`
	TypoReryDelaySec     uint64               `json:"blockedReryDelaySec,omitempty"`
	BlockedRetryDelaySec *uint64              `json:"blockedRetryDelaySec,omitempty"`
	Webhook              *webhookActionInfo   `json:"webhook,omitempty"`
	WebSocket            *webSocketActionInfo `json:"websocket,omitempty"`
	Timestamps           bool                 `json:"timestamps,omitempty"` // Include block timestamps in the events generated
	TimestampCacheSize   int                  `json:"timestampCacheSize,omitempty"`
	Inputs               bool                 `json:"inputs,omitempty"` // Include input args in the events generated
}

StreamInfo configures the stream to perform an action for each event

func (*StreamInfo) GetID

func (spec *StreamInfo) GetID() string

GetID returns the ID (for sorting)

type SubscriptionCreateDTO

type SubscriptionCreateDTO struct {
	Name      string                           `json:"name,omitempty"`
	Stream    string                           `json:"stream,omitempty"`
	Event     *ethbinding.ABIElementMarshaling `json:"event,omitempty"`
	Methods   ethbinding.ABIMarshaling         `json:"methods,omitempty"` // an inline set of methods that might emit the event
	FromBlock string                           `json:"fromBlock,omitempty"`
	Address   *ethbinding.Address              `json:"address,omitempty"`
}

type SubscriptionInfo

type SubscriptionInfo struct {
	messages.TimeSorted
	ID           string                           `json:"id,omitempty"`
	Path         string                           `json:"path"`
	Summary      string                           `json:"-"`    // System generated name for the subscription
	Name         string                           `json:"name"` // User provided name for the subscription, set to Summary if missing
	Stream       string                           `json:"stream"`
	Filter       persistedFilter                  `json:"filter"`
	Event        *ethbinding.ABIElementMarshaling `json:"event"`
	FromBlock    string                           `json:"fromBlock,omitempty"`
	ABI          *ABIRefOrInline                  `json:"abi,omitempty"`
	Synchronized bool                             `json:"synchronized"`
}

SubscriptionInfo is the persisted data for the subscription

func (*SubscriptionInfo) GetID

func (info *SubscriptionInfo) GetID() string

GetID returns the ID (for sorting)

type SubscriptionManager

type SubscriptionManager interface {
	Init() error
	AddStream(ctx context.Context, spec *StreamInfo) (*StreamInfo, error)
	Streams(ctx context.Context) []*StreamInfo
	StreamByID(ctx context.Context, id string) (*StreamInfo, error)
	UpdateStream(ctx context.Context, id string, spec *StreamInfo) (*StreamInfo, error)
	SuspendStream(ctx context.Context, id string) error
	ResumeStream(ctx context.Context, id string) error
	DeleteStream(ctx context.Context, id string) error
	AddSubscription(ctx context.Context, addr *ethbinding.Address, abi *contractregistry.ABILocation, event *ethbinding.ABIElementMarshaling, streamID, initialBlock, name string) (*SubscriptionInfo, error)
	AddSubscriptionDirect(ctx context.Context, newSub *SubscriptionCreateDTO) (*SubscriptionInfo, error)
	Subscriptions(ctx context.Context) []*SubscriptionInfo
	SubscriptionByID(ctx context.Context, id string) (*SubscriptionInfo, error)
	ResetSubscription(ctx context.Context, id, initialBlock string) error
	DeleteSubscription(ctx context.Context, id string) error
	Close(wait bool)
}

SubscriptionManager provides REST APIs for managing events

func NewSubscriptionManager

func NewSubscriptionManager(conf *SubscriptionManagerConf, rpc eth.RPCClient, cr contractregistry.ContractResolver, wsChannels ws.WebSocketChannels) (s SubscriptionManager, err error)

NewSubscriptionManager constructor

type SubscriptionManagerConf

type SubscriptionManagerConf struct {
	EventLevelDBPath        string          `json:"eventsDB"`
	EventPollingIntervalSec uint64          `json:"eventPollingIntervalSec,omitempty"`
	CatchupModeBlockGap     int64           `json:"catchupModeBlockGap,omitempty"`
	CatchupModePageSize     int64           `json:"catchupModePageSize,omitempty"`
	WebhooksAllowPrivateIPs bool            `json:"webhooksAllowPrivateIPs,omitempty"`
	DecimalTransactionIndex bool            `json:"decimalTransactionIndex,omitempty"`
	Confirmations           bcmConfExternal `json:"confirmations,omitempty"`
}

SubscriptionManagerConf configuration

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL