Documentation ¶
Index ¶
Constants ¶
View Source
const (
	// FromBlockLatest is the special string that means subscribe from the current block
	FromBlockLatest = "latest"
	// ErrorHandlingBlock blocks the event stream until the handler can accept the event
	ErrorHandlingBlock = "block"
	// ErrorHandlingSkip processes up to the retry behavior on the stream, then skips to the next event
	ErrorHandlingSkip = "skip"
	// MaxBatchSize is the maximum that a user can specify for their batch size
	MaxBatchSize = 1000
	// DefaultExponentialBackoffInitial is the initial delay for backoff retry
	DefaultExponentialBackoffInitial = time.Duration(1) * time.Second
	// DefaultExponentialBackoffFactor is the factor we use between retries
	DefaultExponentialBackoffFactor = float64(2.0)
	// DefaultTimestampCacheSize is the number of entries we will hold in an LRU cache for block timestamps
	DefaultTimestampCacheSize = 1000
)
View Source
const (
	// SubPathPrefix is the path prefix for subscriptions
	SubPathPrefix = "/subscriptions"
	// StreamPathPrefix is the path prefix for event streams
	StreamPathPrefix = "/eventstreams"
)
Variables ¶
This section is empty.
Functions ¶
func CobraInitSubscriptionManager ¶
func CobraInitSubscriptionManager(cmd *cobra.Command, conf *SubscriptionManagerConf)
CobraInitSubscriptionManager standard naming for cobra command params
Types ¶
type ABIRefOrInline ¶
// ABIRefOrInline embeds a contractregistry.ABILocation and additionally
// carries an ABI supplied directly in the payload under the "inline" JSON key.
// NOTE(review): the name suggests callers set either the location reference
// or the inline ABI, not both — confirm against the code that consumes it.
type ABIRefOrInline struct {
	contractregistry.ABILocation
	// Inline is omitted from the JSON when empty.
	Inline ethbinding.ABIMarshaling `json:"inline,omitempty"`
}
type DistributionMode ¶
// DistributionMode is a string-typed enumeration; the values declared for it
// are listed in the const block below.
// NOTE(review): presumably selects how events are distributed to connected
// consumers — confirm where this type is read.
type DistributionMode string

const (
	// DistributionModeBroadcast is the "broadcast" mode value.
	DistributionModeBroadcast DistributionMode = "broadcast"
	// DistributionModeWLD is the "workloadDistribution" mode value.
	DistributionModeWLD DistributionMode = "workloadDistribution"
)
type StreamInfo ¶
// StreamInfo configures the stream to perform an action for each event.
// The JSON tags define the field names used when the stream is serialized.
type StreamInfo struct {
	messages.TimeSorted
	ID        string `json:"id"`
	Name      string `json:"name,omitempty"`
	Path      string `json:"path"`
	Suspended bool   `json:"suspended"`
	Type      string `json:"type,omitempty"`
	// NOTE(review): presumably bounded by MaxBatchSize — confirm in the validation code.
	BatchSize      uint64 `json:"batchSize,omitempty"`
	BatchTimeoutMS uint64 `json:"batchTimeoutMS,omitempty"`
	// NOTE(review): presumably one of ErrorHandlingBlock / ErrorHandlingSkip — confirm.
	ErrorHandling   string `json:"errorHandling,omitempty"`
	RetryTimeoutSec uint64 `json:"retryTimeoutSec,omitempty"`
	// TypoReryDelaySec is serialized under the misspelled key "blockedReryDelaySec"
	// (missing the 't' in "Retry"), alongside the correctly spelled
	// BlockedRetryDelaySec below.
	// NOTE(review): looks like a backwards-compatibility shim for the old
	// misspelled key — confirm before removing or renaming.
	TypoReryDelaySec     uint64               `json:"blockedReryDelaySec,omitempty"`
	BlockedRetryDelaySec *uint64              `json:"blockedRetryDelaySec,omitempty"`
	Webhook              *webhookActionInfo   `json:"webhook,omitempty"`
	WebSocket            *webSocketActionInfo `json:"websocket,omitempty"`
	Timestamps           bool                 `json:"timestamps,omitempty"` // Include block timestamps in the events generated
	// NOTE(review): presumably defaults to DefaultTimestampCacheSize when unset — confirm.
	TimestampCacheSize int  `json:"timestampCacheSize,omitempty"`
	Inputs             bool `json:"inputs,omitempty"` // Include input args in the events generated
}
StreamInfo configures the stream to perform an action for each event
func (*StreamInfo) GetID ¶
func (spec *StreamInfo) GetID() string
GetID returns the ID (for sorting)
type SubscriptionCreateDTO ¶
// SubscriptionCreateDTO is the payload accepted by
// SubscriptionManager.AddSubscriptionDirect to create a subscription.
// Every field is optional at the JSON level (all tags carry omitempty).
type SubscriptionCreateDTO struct {
	Name    string                           `json:"name,omitempty"`
	Stream  string                           `json:"stream,omitempty"`
	Event   *ethbinding.ABIElementMarshaling `json:"event,omitempty"`
	Methods ethbinding.ABIMarshaling         `json:"methods,omitempty"` // an inline set of methods that might emit the event
	// NOTE(review): presumably accepts FromBlockLatest ("latest") or a block number string — confirm.
	FromBlock string              `json:"fromBlock,omitempty"`
	Address   *ethbinding.Address `json:"address,omitempty"`
}
type SubscriptionInfo ¶
// SubscriptionInfo is the persisted data for the subscription.
type SubscriptionInfo struct {
	messages.TimeSorted
	ID   string `json:"id,omitempty"`
	Path string `json:"path"`
	// Summary is excluded from JSON output (tag "-").
	Summary   string                           `json:"-"`    // System generated name for the subscription
	Name      string                           `json:"name"` // User provided name for the subscription, set to Summary if missing
	Stream    string                           `json:"stream"`
	Filter    persistedFilter                  `json:"filter"`
	Event     *ethbinding.ABIElementMarshaling `json:"event"`
	FromBlock string                           `json:"fromBlock,omitempty"`
	ABI       *ABIRefOrInline                  `json:"abi,omitempty"`
	// Synchronized is always written to JSON (no omitempty).
	Synchronized bool `json:"synchronized"`
}
SubscriptionInfo is the persisted data for the subscription
func (*SubscriptionInfo) GetID ¶
func (info *SubscriptionInfo) GetID() string
GetID returns the ID (for sorting)
type SubscriptionManager ¶
// SubscriptionManager provides REST APIs for managing events.
// Its methods fall into two groups: event streams (StreamInfo) and
// subscriptions (SubscriptionInfo), plus Init and Close.
type SubscriptionManager interface {
	Init() error

	// Event stream operations.
	AddStream(ctx context.Context, spec *StreamInfo) (*StreamInfo, error)
	Streams(ctx context.Context) []*StreamInfo
	StreamByID(ctx context.Context, id string) (*StreamInfo, error)
	UpdateStream(ctx context.Context, id string, spec *StreamInfo) (*StreamInfo, error)
	SuspendStream(ctx context.Context, id string) error
	ResumeStream(ctx context.Context, id string) error
	DeleteStream(ctx context.Context, id string) error

	// Subscription operations.
	AddSubscription(ctx context.Context, addr *ethbinding.Address, abi *contractregistry.ABILocation, event *ethbinding.ABIElementMarshaling, streamID, initialBlock, name string) (*SubscriptionInfo, error)
	AddSubscriptionDirect(ctx context.Context, newSub *SubscriptionCreateDTO) (*SubscriptionInfo, error)
	Subscriptions(ctx context.Context) []*SubscriptionInfo
	SubscriptionByID(ctx context.Context, id string) (*SubscriptionInfo, error)
	ResetSubscription(ctx context.Context, id, initialBlock string) error
	DeleteSubscription(ctx context.Context, id string) error

	// NOTE(review): wait presumably makes Close block until shutdown completes — confirm in the implementation.
	Close(wait bool)
}
SubscriptionManager provides REST APIs for managing events
func NewSubscriptionManager ¶
func NewSubscriptionManager(conf *SubscriptionManagerConf, rpc eth.RPCClient, cr contractregistry.ContractResolver, wsChannels ws.WebSocketChannels) (s SubscriptionManager, err error)
NewSubscriptionManager constructor
type SubscriptionManagerConf ¶
// SubscriptionManagerConf configuration.
// It is the conf argument taken by NewSubscriptionManager and
// CobraInitSubscriptionManager.
type SubscriptionManagerConf struct {
	// EventLevelDBPath is serialized under the "eventsDB" key.
	// NOTE(review): presumably a filesystem path to a LevelDB store — confirm.
	EventLevelDBPath        string          `json:"eventsDB"`
	EventPollingIntervalSec uint64          `json:"eventPollingIntervalSec,omitempty"`
	CatchupModeBlockGap     int64           `json:"catchupModeBlockGap,omitempty"`
	CatchupModePageSize     int64           `json:"catchupModePageSize,omitempty"`
	WebhooksAllowPrivateIPs bool            `json:"webhooksAllowPrivateIPs,omitempty"`
	DecimalTransactionIndex bool            `json:"decimalTransactionIndex,omitempty"`
	Confirmations           bcmConfExternal `json:"confirmations,omitempty"`
}
SubscriptionManagerConf configuration
Click to show internal directories.
Click to hide internal directories.