Documentation ¶
Index ¶
- Constants
- func AddListener(name string, ...)
- type BaseJob
- func (e *BaseJob) CreatedAt() time.Time
- func (e *BaseJob) FromChainID() *big.Int
- func (e *BaseJob) GetBackOff() int
- func (e *BaseJob) GetData() []byte
- func (e *BaseJob) GetID() int32
- func (e *BaseJob) GetListener() Listener
- func (e *BaseJob) GetMaxTry() int
- func (e *BaseJob) GetNextTry() int64
- func (e *BaseJob) GetRetryCount() int
- func (e *BaseJob) GetSubscriptionName() string
- func (e *BaseJob) GetTransaction() Transaction
- func (e *BaseJob) GetType() int
- func (e *BaseJob) GetValue() *big.Int
- func (e *BaseJob) Hash() common.Hash
- func (e *BaseJob) IncreaseRetryCount()
- func (e *BaseJob) Process() ([]byte, error)
- func (e *BaseJob) Save() error
- func (e *BaseJob) SetID(id int32)
- func (e *BaseJob) String() string
- func (e *BaseJob) Update(status string) error
- func (e *BaseJob) UpdateNextTry(nextTry int64)
- func (e *BaseJob) Utils() utils.Utils
- type Block
- type BridgeWorker
- func (w *BridgeWorker) Channel() chan JobHandler
- func (w *BridgeWorker) Close()
- func (w *BridgeWorker) Context() context.Context
- func (w *BridgeWorker) FailedChannel() chan<- JobHandler
- func (w *BridgeWorker) IsClose() bool
- func (w *BridgeWorker) PoolChannel() chan<- JobHandler
- func (w *BridgeWorker) ProcessJob(job JobHandler) error
- func (w *BridgeWorker) String() string
- func (w *BridgeWorker) WorkersQueue() chan chan JobHandler
- type Config
- type Controller
- type EmptyTransaction
- type Handler
- type Job
- type JobHandler
- type Listener
- type Log
- type LsConfig
- type Pool
- type Receipt
- type Secret
- type Stats
- type Subscribe
- type TaskHandler
- type Transaction
- type Worker
Constants ¶
View Source
const ( ListenHandler = iota CallbackHandler )
View Source
const ( TxEvent = iota LogEvent )
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BaseJob ¶
type BaseJob struct {
// contains filtered or unexported fields
}
func NewBaseJob ¶
func (*BaseJob) FromChainID ¶
func (*BaseJob) GetBackOff ¶
func (*BaseJob) GetListener ¶
func (*BaseJob) GetNextTry ¶
func (*BaseJob) GetRetryCount ¶
func (*BaseJob) GetSubscriptionName ¶
func (*BaseJob) GetTransaction ¶
func (e *BaseJob) GetTransaction() Transaction
func (*BaseJob) IncreaseRetryCount ¶
func (e *BaseJob) IncreaseRetryCount()
func (*BaseJob) UpdateNextTry ¶
type BridgeWorker ¶
type BridgeWorker struct {
// contains filtered or unexported fields
}
func NewWorker ¶
func NewWorker(ctx context.Context, id int, mainChan, failedChan chan<- JobHandler, queue chan chan JobHandler, size int, listeners map[string]Listener) *BridgeWorker
func (*BridgeWorker) Channel ¶
func (w *BridgeWorker) Channel() chan JobHandler
func (*BridgeWorker) Close ¶
func (w *BridgeWorker) Close()
func (*BridgeWorker) Context ¶
func (w *BridgeWorker) Context() context.Context
func (*BridgeWorker) FailedChannel ¶
func (w *BridgeWorker) FailedChannel() chan<- JobHandler
func (*BridgeWorker) IsClose ¶
func (w *BridgeWorker) IsClose() bool
func (*BridgeWorker) PoolChannel ¶
func (w *BridgeWorker) PoolChannel() chan<- JobHandler
func (*BridgeWorker) ProcessJob ¶
func (w *BridgeWorker) ProcessJob(job JobHandler) error
func (*BridgeWorker) String ¶
func (w *BridgeWorker) String() string
func (*BridgeWorker) WorkersQueue ¶
func (w *BridgeWorker) WorkersQueue() chan chan JobHandler
type Config ¶
type Config struct { Listeners map[string]*LsConfig `json:"listeners"` SlackUrl string `json:"slackUrl"` ScanUrl string `json:"scanUrl"` NumberOfWorkers int `json:"numberOfWorkers"` MaxQueueSize int `json:"maxQueueSize"` MaxRetry int32 `json:"maxRetry"` BackOff int32 `json:"backoff"` DB *stores.Database `json:"database"` // this field is used for testing purposes Testing bool }
type Controller ¶
type Controller struct { HandlerABIs map[string]*abi.ABI Pool *Pool // contains filtered or unexported fields }
func (*Controller) Close ¶
func (c *Controller) Close()
func (*Controller) LoadABIsFromConfig ¶
func (c *Controller) LoadABIsFromConfig(lsConfig *LsConfig) (err error)
LoadABIsFromConfig loads all ABI paths and adds the results to Handler.ABI.
func (*Controller) Start ¶
func (c *Controller) Start() error
type EmptyTransaction ¶
type EmptyTransaction struct {
// contains filtered or unexported fields
}
func NewEmptyTransaction ¶
func (*EmptyTransaction) GetData ¶
func (b *EmptyTransaction) GetData() []byte
func (*EmptyTransaction) GetFromAddress ¶
func (b *EmptyTransaction) GetFromAddress() string
func (*EmptyTransaction) GetHash ¶
func (b *EmptyTransaction) GetHash() common.Hash
func (*EmptyTransaction) GetToAddress ¶
func (b *EmptyTransaction) GetToAddress() string
func (*EmptyTransaction) GetValue ¶
func (b *EmptyTransaction) GetValue() *big.Int
type Handler ¶
type Handler struct { // Contract Name that will be used to get ABI Contract string `json:"contract"` // Name is method/event name Name string `json:"name"` // ContractAddress is used in callback case ContractAddress string `json:"contractAddress"` // Listener who triggers callback event Listener string `json:"listener"` ABI *abi.ABI `json:"-"` // HandleMethod is used when processing listened job, do nothing if it is empty HandleMethod string `json:"handleMethod"` }
type Job ¶
type JobHandler ¶
type JobHandler interface { GetID() int32 GetType() int GetRetryCount() int GetNextTry() int64 GetMaxTry() int GetData() []byte GetValue() *big.Int GetBackOff() int Process() ([]byte, error) Hash() common.Hash IncreaseRetryCount() UpdateNextTry(int64) GetListener() Listener GetSubscriptionName() string GetTransaction() Transaction FromChainID() *big.Int Save() error Update(string) error CreatedAt() time.Time String() string }
type Listener ¶
type Listener interface { GetName() string GetStore() stores.MainStore Config() *LsConfig Period() time.Duration GetSafeBlockRange() uint64 GetPreventOmissionRange() uint64 GetCurrentBlock() Block GetLatestBlock() (Block, error) GetLatestBlockHeight() (uint64, error) GetBlock(height uint64) (Block, error) GetBlockWithLogs(height uint64) (Block, error) GetChainID() (*big.Int, error) GetReceipt(common.Hash) (*types.Receipt, error) Context() context.Context GetSubscriptions() map[string]*Subscribe UpdateCurrentBlock(block Block) error SaveCurrentBlockToDB() error SaveTransactionsToDB(txs []Transaction) error GetListenHandleJob(subscriptionName string, tx Transaction, eventId string, data []byte) JobHandler SendCallbackJobs(listeners map[string]Listener, subscriptionName string, tx Transaction, inputData []byte) NewJobFromDB(job *models.Job) (JobHandler, error) Start() Close() IsDisabled() bool SetInitHeight(uint64) GetInitHeight() uint64 GetEthClient() utils.EthClient GetTasks() []TaskHandler GetTask(index int) TaskHandler AddTask(handler TaskHandler) IsUpTodate() bool SetPrepareJobChan(chan JobHandler) GetValidatorSign() utils.ISign AddListeners(map[string]Listener) // GetListener returns listener by name GetListener(string) Listener }
type LsConfig ¶
type LsConfig struct { ChainId string `json:"chainId"` Name string `json:"name"` RpcUrl string `json:"rpcUrl"` SlackUrl string `json:"slackUrl"` ScanUrl string `json:"scanUrl"` LoadInterval time.Duration `json:"blockTime"` SafeBlockRange uint64 `json:"safeBlockRange"` PreventOmissionRange uint64 `json:"preventOmissionRange"` FromHeight uint64 `json:"fromHeight"` DomainSeparators map[uint64]string `json:"domainSeparators"` Decimals map[uint64]uint64 `json:"decimals"` TaskInterval time.Duration `json:"taskInterval"` Disabled bool `json:"disabled"` // TODO: apply more ways to get the private key, such as PLAINTEXT, KMS, etc. Secret *Secret `json:"secret"` Subscriptions map[string]*Subscribe `json:"subscriptions"` TransactionCheckPeriod time.Duration `json:"transactionCheckPeriod"` Contracts map[string]string `json:"contracts"` ProcessWithinBlocks uint64 `json:"processWithinBlocks"` MaxTasksQuery int `json:"maxTasksQuery"` MinTasksQuery int `json:"minTasksQuery"` // GetLogsBatchSize is used as the batch size when calling processBatchLogs GetLogsBatchSize int `json:"getLogsBatchSize"` // MaxProcessingTasks is used to specify the max processing tasks allowed while processing tasks // if the number of tasks reaches this number, it waits until this number decreases MaxProcessingTasks int `json:"maxProcessingTasks"` }
type Pool ¶
type Pool struct { Workers []Worker // message backoff MaxRetry int32 BackOff int32 // Queue holds a list of workers Queue chan chan JobHandler // JobChan receives new job JobChan chan JobHandler RetryJobChan chan JobHandler FailedJobChan chan JobHandler PrepareJobChan chan JobHandler MaxQueueSize int // contains filtered or unexported fields }
func (*Pool) AddWorkers ¶
func (*Pool) PrepareJob ¶
func (p *Pool) PrepareJob(job JobHandler) error
PrepareJob saves a new job to the database.
func (*Pool) PrepareRetryableJob ¶
func (p *Pool) PrepareRetryableJob(job JobHandler)
type Receipt ¶
type Receipt interface { GetTransaction() Transaction GetStatus() bool GetLogs() []Log }
type Secret ¶
type Secret struct {
Validator *utils.SignMethodConfig `json:"validator"`
}
type TaskHandler ¶
type Transaction ¶
type Worker ¶
type Worker interface { Context() context.Context Close() ProcessJob(job JobHandler) error IsClose() bool Channel() chan JobHandler PoolChannel() chan<- JobHandler WorkersQueue() chan chan JobHandler }
Click to show internal directories.
Click to hide internal directories.