package services

import ""

Package services contains the key components of the Chainlink node. This includes the Application, JobRunner, LogListener, and Scheduler.


The Application is the main component used for starting and stopping the Chainlink node.


The JobRunner keeps track of Runs within a Job and ensures that they're executed in order. Within each Run, the tasks are also executed from the JobRunner.


The JobSubscriber coordinates running job events with the EventLog in the Store, and also subscribes to the given address on the Ethereum blockchain.


The Scheduler ensures that recurring events are executed according to their schedule, and one-time events occur only when the specified time has passed.


Package Files

balance_monitor.go doc.go gas_updater.go head_tracker.go job_subscriber.go reaper.go run_executor.go run_manager.go run_queue.go runs.go scheduler.go sleeper_task.go subscription.go validators.go

func ExpectedRecurringScheduleJobError

func ExpectedRecurringScheduleJobError(err error) bool

func NewRun

func NewRun(
    job *models.JobSpec,
    initiator *models.Initiator,
    currentHeight *big.Int,
    runRequest *models.RunRequest,
    config orm.ConfigReader,
    orm *orm.ORM,
    now time.Time) (*models.JobRun, []*adapters.PipelineAdapter)

NewRun returns a complete run from a JobSpec

func ReceiveLogRequest

func ReceiveLogRequest(runManager RunManager, le models.LogRequest)

ReceiveLogRequest parses the log and runs the job indicated by its GetJobSpecID method.

func ValidateBridgeType

func ValidateBridgeType(bt *models.BridgeTypeRequest, store *store.Store) error

ValidateBridgeType checks that the bridge type doesn't have a duplicate or invalid name or an invalid URL.

func ValidateBridgeTypeNotExist

func ValidateBridgeTypeNotExist(bt *models.BridgeTypeRequest, store *store.Store) error

ValidateBridgeTypeNotExist checks that a bridge has not already been created

func ValidateExternalInitiator

func ValidateExternalInitiator(
    exi *models.ExternalInitiatorRequest,
    store *store.Store,
) error

ValidateExternalInitiator checks whether External Initiator parameters are safe for processing.

func ValidateInitiator

func ValidateInitiator(i models.Initiator, j models.JobSpec, store *store.Store) error

ValidateInitiator checks the Initiator for any application logic errors.

func ValidateJob

func ValidateJob(j models.JobSpec, store *store.Store) error

ValidateJob checks the job and its associated Initiators and Tasks for any application logic errors.

func ValidateRun

func ValidateRun(run *models.JobRun, contractCost *assets.Link)

ValidateRun ensures that a run's initial preconditions have been met

func ValidateServiceAgreement

func ValidateServiceAgreement(sa models.ServiceAgreement, store *store.Store) error

ValidateServiceAgreement checks the ServiceAgreement for any application logic errors.

type BalanceMonitor

type BalanceMonitor interface {
    GetEthBalance(gethCommon.Address) *assets.Eth
}

BalanceMonitor checks the balance for each key on every new head

func NewBalanceMonitor

func NewBalanceMonitor(store *store.Store, gethClientWrapper store.GethClientWrapper) BalanceMonitor

NewBalanceMonitor returns a new balanceMonitor

type Cron

type Cron interface {
    Stop() context.Context
    AddFunc(string, func()) (cron.EntryID, error)
}

type GasUpdater

type GasUpdater interface {
    RollingBlockHistory() []models.Block
}

GasUpdater listens for new heads and updates the base gas price dynamically based on the configured percentile of gas prices in that block
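
As a rough illustration of the percentile idea, the sketch below picks the price at a given percentile from the gas prices of a single block. The names `percentileGasPrice` and `weiPrices`, and the choice of the nearest-rank method, are assumptions made for this example; the package's own calculation is not shown on this page.

```go
package main

import (
	"fmt"
	"math/big"
	"sort"
)

// percentileGasPrice is a hypothetical helper, not part of the package.
// It returns the gas price at the given percentile (0-100) of one block's
// transaction gas prices using the nearest-rank method, or nil if the
// block has no transactions.
func percentileGasPrice(prices []*big.Int, percentile int) *big.Int {
	if len(prices) == 0 {
		return nil
	}
	// Sort a copy so the caller's slice is left untouched.
	sorted := make([]*big.Int, len(prices))
	copy(sorted, prices)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Cmp(sorted[j]) < 0 })

	// Nearest rank is ceil(percentile/100 * n), clamped to [1, n].
	rank := (percentile*len(sorted) + 99) / 100
	if rank < 1 {
		rank = 1
	}
	if rank > len(sorted) {
		rank = len(sorted)
	}
	return sorted[rank-1]
}

// weiPrices converts plain integers to *big.Int to keep the example short.
func weiPrices(values ...int64) []*big.Int {
	out := make([]*big.Int, len(values))
	for i, v := range values {
		out[i] = big.NewInt(v)
	}
	return out
}

func main() {
	block := weiPrices(30, 10, 50, 20, 40)
	// The 60th percentile is the third of the five sorted prices.
	fmt.Println(percentileGasPrice(block, 60))
}
```

A higher percentile tracks the more expensive transactions in a block, so it trades cost for a better chance of timely inclusion.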

func NewGasUpdater

func NewGasUpdater(store *store.Store) GasUpdater

NewGasUpdater returns a new gas updater.

type HeadTracker

type HeadTracker struct {
    // contains filtered or unexported fields
}

HeadTracker holds and stores the latest block number experienced by this particular node in a thread safe manner. Reconstitutes the last block number from the data store on reboot.

func NewHeadTracker

func NewHeadTracker(store *strpkg.Store, callbacks []strpkg.HeadTrackable, sleepers ...utils.Sleeper) *HeadTracker

NewHeadTracker instantiates a new HeadTracker using the orm to persist new block numbers. Can be passed in an optional sleeper object that will dictate how often it tries to reconnect.

func (*HeadTracker) Connected

func (ht *HeadTracker) Connected() bool

Connected returns whether or not this HeadTracker is connected.

func (*HeadTracker) GetChainWithBackfill

func (ht *HeadTracker) GetChainWithBackfill(ctx context.Context, head models.Head, depth uint) (models.Head, error)

GetChainWithBackfill returns a chain of the given length, backfilling any heads that may be missing from the database
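
One way to picture backfilling is a linked list of heads in which missing parents are fetched on demand. The sketch below is a simplified model under that assumption: the `head` type, `chainLength`, `chainWithBackfill`, and the `fetch` callback are all hypothetical names, and the real method takes a `context.Context` and works with `models.Head`.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is a made-up error used when a parent cannot be fetched.
var errNotFound = errors.New("head not found")

// head is a hypothetical, simplified block header: a number plus a link
// to its parent. It stands in for the package's real head model.
type head struct {
	Number int64
	Parent *head
}

// chainLength counts the heads reachable from h by following Parent links.
func chainLength(h *head) uint {
	var n uint
	for ; h != nil; h = h.Parent {
		n++
	}
	return n
}

// chainWithBackfill walks back from h and, wherever a parent is missing,
// asks fetch for it until the chain holds at least depth heads or block 0
// is reached. fetch is an assumed callback (for example an RPC lookup).
// Missing parents are linked into the existing chain in place.
func chainWithBackfill(h *head, depth uint, fetch func(number int64) (*head, error)) (*head, error) {
	if h == nil {
		return nil, errors.New("nil head")
	}
	cur := h
	for length := uint(1); length < depth && cur.Number > 0; length++ {
		if cur.Parent == nil {
			parent, err := fetch(cur.Number - 1)
			if err == nil && parent == nil {
				err = errNotFound
			}
			if err != nil {
				return h, fmt.Errorf("backfilling head %d: %w", cur.Number-1, err)
			}
			cur.Parent = parent
		}
		cur = cur.Parent
	}
	return h, nil
}

func main() {
	fetch := func(number int64) (*head, error) { return &head{Number: number}, nil }
	// Head 9 is already known; heads 8 and 7 have to be fetched.
	tip := &head{Number: 10, Parent: &head{Number: 9}}
	chain, err := chainWithBackfill(tip, 4, fetch)
	fmt.Println(chainLength(chain), err)
}
```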

func (*HeadTracker) HighestSeenHead

func (ht *HeadTracker) HighestSeenHead() *models.Head

HighestSeenHead returns the block header with the highest number that has been seen, or nil

func (*HeadTracker) Save

func (ht *HeadTracker) Save(h models.Head) error

Save updates the latest block number, if indeed the latest, and persists this number in case of reboot. Thread safe.
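
The "update only if it is the latest" rule combined with thread safety can be sketched with a mutex around a single number. This is a minimal model, not the package's code: `highestTracker`, `save`, and `latest` are hypothetical names, and persisting the number to the store is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// highestTracker is a hypothetical stand-in for the behaviour described
// above: it remembers the highest block number seen so far and guards it
// with a mutex. Persisting the number to a data store is left out.
type highestTracker struct {
	mu      sync.RWMutex
	highest int64
	seen    bool
}

// save stores number only if it is higher than every number seen so far,
// and reports whether it was stored.
func (t *highestTracker) save(number int64) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.seen && number <= t.highest {
		return false
	}
	t.highest, t.seen = number, true
	return true
}

// latest returns the highest number seen, and false if nothing was saved yet.
func (t *highestTracker) latest() (int64, bool) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.highest, t.seen
}

func main() {
	tracker := &highestTracker{}
	var wg sync.WaitGroup
	// Heads may arrive out of order and from several goroutines at once.
	for n := int64(1); n <= 100; n++ {
		wg.Add(1)
		go func(n int64) {
			defer wg.Done()
			tracker.save(n)
		}(n)
	}
	wg.Wait()
	fmt.Println(tracker.latest())
}
```

Because the comparison and the assignment happen under the same lock, a lower number can never overwrite a higher one, whatever order the goroutines run in.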

func (*HeadTracker) Start

func (ht *HeadTracker) Start() error

Start retrieves the last persisted block number from the HeadTracker, subscribes to new heads, and if successful fires Connect on the HeadTrackable argument.

func (*HeadTracker) Stop

func (ht *HeadTracker) Stop() error

Stop unsubscribes all connections and fires Disconnect.

type InitiatorSubscription

type InitiatorSubscription struct {

    Initiator models.Initiator
    // contains filtered or unexported fields
}

InitiatorSubscription encapsulates all functionality needed to wrap an ethereum subscription for use with a Chainlink Initiator. Initiator specific functionality is delegated to the callback.

func NewInitiatorSubscription

func NewInitiatorSubscription(
    initr models.Initiator,
    client eth.Client,
    runManager RunManager,
    nextHead *big.Int,
    callback func(RunManager, models.LogRequest),
) (InitiatorSubscription, error)

NewInitiatorSubscription creates a new InitiatorSubscription that feeds received logs to the callback func parameter.

type JobSubscriber

type JobSubscriber interface {
    AddJob(job models.JobSpec, bn *models.Head) error
    RemoveJob(ID *models.ID) error
    Jobs() []models.JobSpec
    Stop() error
}

JobSubscriber listens for push notifications of event logs from the ethereum node's websocket for specific jobs by subscribing to ethLogs.

func NewJobSubscriber

func NewJobSubscriber(store *store.Store, runManager RunManager) JobSubscriber

NewJobSubscriber returns a new job subscriber.

type JobSubscription

type JobSubscription struct {
    Job models.JobSpec
    // contains filtered or unexported fields
}

JobSubscription listens to event logs being pushed from the Ethereum Node to a job.

func StartJobSubscription

func StartJobSubscription(job models.JobSpec, head *models.Head, store *strpkg.Store, runManager RunManager) (JobSubscription, error)

StartJobSubscription constructs a JobSubscription which listens for and tracks event logs corresponding to the specified job. Ignores any errors if there is at least one successful subscription to an initiator log.
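
The "ignore errors if at least one subscription succeeds" rule can be sketched as follows. The helper `subscribeAll`, the `subscribe` callback, and `errNoEndpoint` are hypothetical and exist only for this example; how the package reports the failure case is an assumption here.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoEndpoint is a made-up failure used only by this example.
var errNoEndpoint = errors.New("no endpoint for initiator")

// subscribeAll is a hypothetical helper. It tries subscribe for every
// initiator name and returns the names that succeeded. Individual failures
// are dropped as long as at least one subscription worked; if none did,
// the first failure is returned, wrapped with the failure count.
func subscribeAll(initiators []string, subscribe func(name string) error) ([]string, error) {
	var subscribed []string
	var failures []error
	for _, name := range initiators {
		if err := subscribe(name); err != nil {
			failures = append(failures, fmt.Errorf("%s: %w", name, err))
			continue
		}
		subscribed = append(subscribed, name)
	}
	if len(subscribed) == 0 && len(failures) > 0 {
		return nil, fmt.Errorf("all %d subscriptions failed, first error: %w", len(failures), failures[0])
	}
	return subscribed, nil
}

func main() {
	subscribe := func(name string) error {
		if name == "ethlog" {
			return errNoEndpoint
		}
		return nil
	}
	// One success is enough, so the ethlog failure is ignored.
	fmt.Println(subscribeAll([]string{"runlog", "ethlog"}, subscribe))
	// With no success at all, the error surfaces.
	fmt.Println(subscribeAll([]string{"ethlog"}, subscribe))
}
```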

func (JobSubscription) Unsubscribe

func (js JobSubscription) Unsubscribe()

Unsubscribe stops the subscription and cleans up associated resources.

type ManagedSubscription

type ManagedSubscription struct {
    // contains filtered or unexported fields
}

ManagedSubscription encapsulates the connecting, backfilling, and clean up of an ethereum node subscription.

func NewManagedSubscription

func NewManagedSubscription(
    logSubscriber eth.LogSubscriber,
    filter ethereum.FilterQuery,
    callback func(models.Log),
) (*ManagedSubscription, error)

NewManagedSubscription subscribes to the ethereum node with the passed filter and delegates incoming logs to callback.
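
Delegating incoming logs to a callback is commonly done with a goroutine that reads from a channel until it is told to stop. The sketch below shows that pattern with plain strings in place of `models.Log`; `logFeed` and `newLogFeed` are hypothetical names, and connecting to a node and backfilling are deliberately left out.

```go
package main

import (
	"fmt"
	"sync"
)

// logFeed is a hypothetical subscription: it forwards every log received
// on a channel to a callback until Unsubscribe is called.
type logFeed struct {
	done chan struct{}
	wg   sync.WaitGroup
}

// newLogFeed starts a goroutine that delivers logs to callback one at a time.
func newLogFeed(logs <-chan string, callback func(string)) *logFeed {
	f := &logFeed{done: make(chan struct{})}
	f.wg.Add(1)
	go func() {
		defer f.wg.Done()
		for {
			select {
			case l, ok := <-logs:
				if !ok {
					return // the source closed the channel
				}
				callback(l)
			case <-f.done:
				return
			}
		}
	}()
	return f
}

// Unsubscribe stops delivery and waits for the goroutine to exit, so the
// callback is never running once Unsubscribe returns. Call it only once.
func (f *logFeed) Unsubscribe() {
	close(f.done)
	f.wg.Wait()
}

func main() {
	logs := make(chan string)
	var received []string
	feed := newLogFeed(logs, func(l string) { received = append(received, l) })
	logs <- "log 1"
	logs <- "log 2"
	feed.Unsubscribe()
	fmt.Println(received)
}
```

Waiting for the goroutine inside `Unsubscribe` is a design choice of this sketch: it makes it safe to read anything the callback wrote as soon as `Unsubscribe` returns.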

func (ManagedSubscription) Unsubscribe

func (sub ManagedSubscription) Unsubscribe()

Unsubscribe closes channels and cleans up resources.

type OneTime

type OneTime struct {
    Store      *store.Store
    Clock      utils.Afterer
    RunManager RunManager
    // contains filtered or unexported fields
}

OneTime represents runs that are to be executed only once.

func (*OneTime) AddJob

func (ot *OneTime) AddJob(job models.JobSpec)

AddJob runs the job at the time specified for the "runat" initiator.

func (*OneTime) RunJobAt

func (ot *OneTime) RunJobAt(initiator models.Initiator, job models.JobSpec)

RunJobAt waits until either Stop() has been called or the time specified for the run has arrived.

func (*OneTime) Start

func (ot *OneTime) Start() error

Start allocates a channel for the "done" field with an empty struct.

func (*OneTime) Stop

func (ot *OneTime) Stop()

Stop closes the "done" field's channel.
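
The interplay of a "done" channel with a timed wait, as described for Start, Stop, and RunJobAt, can be sketched with a `select` statement. The type `oneShot` and its methods are hypothetical; the sketch uses the standard library timer directly, whereas the real type takes its clock from the `Clock` field.

```go
package main

import (
	"fmt"
	"time"
)

// oneShot is a hypothetical, stripped-down scheduler for one-time runs.
type oneShot struct {
	done chan struct{}
}

// start allocates the done channel.
func (o *oneShot) start() { o.done = make(chan struct{}) }

// stop closes the done channel, which releases every goroutine waiting in runAt.
func (o *oneShot) stop() { close(o.done) }

// runAt blocks until wait has elapsed, then calls run and returns true.
// If stop is called first, it returns false without calling run.
func (o *oneShot) runAt(wait time.Duration, run func()) bool {
	timer := time.NewTimer(wait)
	defer timer.Stop()
	select {
	case <-o.done:
		return false
	case <-timer.C:
		run()
		return true
	}
}

func main() {
	due := &oneShot{}
	due.start()
	fmt.Println(due.runAt(10*time.Millisecond, func() { fmt.Println("job ran") }))

	cancelled := &oneShot{}
	cancelled.start()
	cancelled.stop()
	// The done channel is already closed, so this returns without waiting an hour.
	fmt.Println(cancelled.runAt(time.Hour, func() { fmt.Println("never printed") }))
}
```

Closing a channel, rather than sending on it, wakes every waiter at once, which is why a single Stop can cancel any number of pending one-time runs.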

type RawHead

type RawHead struct {
    Number     string
    Hash       string
    ParentHash string
    Timestamp  string
}

type Recurring

type Recurring struct {
    Cron  Cron
    Clock utils.Nower
    // contains filtered or unexported fields
}

Recurring is used for runs that need to execute on a schedule, and is configured with cron. Instances of Recurring must be initialized using NewRecurring().

func NewRecurring

func NewRecurring(runManager RunManager) *Recurring

NewRecurring creates a new instance of Recurring, ready to use.

func (*Recurring) AddJob

func (r *Recurring) AddJob(job models.JobSpec)

AddJob looks for "cron" initiators and adds them to cron's schedule for execution when specified.

func (*Recurring) Start

func (r *Recurring) Start() error

Start for Recurring types executes tasks with a "cron" initiator based on the configured schedule for the run.

func (*Recurring) Stop

func (r *Recurring) Stop()

Stop stops the cron scheduler and waits for running jobs to finish.

type RecurringScheduleJobError

type RecurringScheduleJobError struct {
    // contains filtered or unexported fields
}

RecurringScheduleJobError contains the field for the error message.

func (RecurringScheduleJobError) Error

func (err RecurringScheduleJobError) Error() string

Error returns the error message for the run.
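
A dedicated error type lets callers tell an expected scheduling failure apart from other errors. The sketch below shows one common way to write such a type and a matching check; `scheduleJobError`, `isScheduleJobError`, `wrap`, and `errUnrelated` are hypothetical names, and the use of `errors.As` is an assumption, since the body of ExpectedRecurringScheduleJobError is not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// scheduleJobError is a hypothetical error type carrying only a message.
type scheduleJobError struct {
	msg string
}

// Error implements the error interface.
func (e scheduleJobError) Error() string { return e.msg }

// isScheduleJobError reports whether err is, or wraps, a scheduleJobError.
func isScheduleJobError(err error) bool {
	var target scheduleJobError
	return errors.As(err, &target)
}

// errUnrelated and wrap exist only to exercise the check.
var errUnrelated = errors.New("database unavailable")

func wrap(err error) error { return fmt.Errorf("adding job: %w", err) }

func main() {
	scheduleErr := scheduleJobError{msg: "job has already ended"}
	fmt.Println(isScheduleJobError(scheduleErr))       // direct match
	fmt.Println(isScheduleJobError(wrap(scheduleErr))) // match through wrapping
	fmt.Println(isScheduleJobError(errUnrelated))      // different error
}
```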

type RunExecutor

type RunExecutor interface {
    Execute(*models.ID) error
}

RunExecutor handles the actual running of the job tasks

func NewRunExecutor

func NewRunExecutor(store *store.Store, statsPusher synchronization.StatsPusher) RunExecutor

NewRunExecutor initializes a RunExecutor.

type RunManager

type RunManager interface {
        jobSpecID *models.ID,
        initiator *models.Initiator,
        creationHeight *big.Int,
        runRequest *models.RunRequest) (*models.JobRun, error)
        jobSpecID *models.ID,
        initiator models.Initiator,
        err error) (*models.JobRun, error)
        runID *models.ID,
        input models.BridgeRunResult) error
    Cancel(runID *models.ID) (*models.JobRun, error)

    ResumeAllInProgress() error
    ResumeAllPendingNextBlock(currentBlockHeight *big.Int) error
    ResumeAllPendingConnection() error
}

RunManager supplies methods for queueing, resuming and cancelling jobs in the RunQueue

func NewRunManager

func NewRunManager(
    runQueue RunQueue,
    config orm.ConfigReader,
    orm *orm.ORM,
    statsPusher synchronization.StatsPusher,
    txManager store.TxManager,
    clock utils.AfterNower) RunManager

NewRunManager returns a new RunManager.

type RunQueue

type RunQueue interface {
    Start() error

    WorkerCount() int
}

RunQueue safely handles coordinating job runs.

func NewRunQueue

func NewRunQueue(runExecutor RunExecutor) RunQueue

NewRunQueue initializes a RunQueue.

type Scheduler

type Scheduler struct {
    Recurring *Recurring
    OneTime   *OneTime
    // contains filtered or unexported fields
}

Scheduler contains Recurring and OneTime fields for scheduled occurrences, a pointer to the store, and a started field that indicates whether the Scheduler has started.

func NewScheduler

func NewScheduler(store *store.Store, runManager RunManager) *Scheduler

NewScheduler initializes the Scheduler instances with both Recurring and OneTime fields since jobs can contain tasks which utilize both.

func (*Scheduler) AddJob

func (s *Scheduler) AddJob(job models.JobSpec)

AddJob is the governing function for Recurring and OneTime, and will only execute if the Scheduler has not already started.

func (*Scheduler) Start

func (s *Scheduler) Start() error

Start checks to ensure the Scheduler has not already started, calls the Start function for both Recurring and OneTime types, sets the started field to true, and adds jobs relevant to its initiator ("cron" and "runat").
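
The started-flag pattern described for Start and Stop can be sketched as below. Everything here is hypothetical: the `part` interface stands in for Recurring and OneTime, the mutex is an assumption about how the flag is protected, and loading jobs for the "cron" and "runat" initiators is left out.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// part is the assumed shape of a sub-scheduler such as Recurring or OneTime.
type part interface {
	Start() error
	Stop()
}

// scheduler is a hypothetical sketch of the started-flag pattern.
type scheduler struct {
	mu      sync.Mutex
	started bool
	parts   []part
}

// Start refuses to run twice, starts every part, then records that it started.
func (s *scheduler) Start() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.started {
		return errors.New("scheduler already started")
	}
	for _, p := range s.parts {
		if err := p.Start(); err != nil {
			return err
		}
	}
	s.started = true
	return nil
}

// Stop stops every part and clears the flag so Start can be called again.
func (s *scheduler) Stop() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.started {
		return
	}
	for _, p := range s.parts {
		p.Stop()
	}
	s.started = false
}

// countingPart records how often it was started and stopped.
type countingPart struct{ starts, stops int }

func (c *countingPart) Start() error { c.starts++; return nil }
func (c *countingPart) Stop()        { c.stops++ }

func main() {
	p := &countingPart{}
	s := &scheduler{parts: []part{p}}
	fmt.Println(s.Start()) // first start succeeds
	fmt.Println(s.Start()) // second start is rejected
	s.Stop()
	fmt.Println(p.starts, p.stops)
}
```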

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop is the governing function for the Stop functions of both Recurring and OneTime. It sets the started field to false.

type SleeperTask

type SleeperTask interface {
    Stop() error
}

SleeperTask represents a task that waits in the background to process some work.

func NewSleeperTask

func NewSleeperTask(worker Worker) SleeperTask

NewSleeperTask takes a worker and returns a SleeperTask.

SleeperTask is guaranteed to call Work on the worker at least once for every WakeUp call. If the Worker is busy when WakeUp is called, the Worker will be called again immediately after it is finished. For this reason you should take care to make sure that Worker is idempotent. WakeUp does not block.
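
One way to get the non-blocking WakeUp and the "at least once per WakeUp" behaviour is a signal channel with a buffer of one. The sketch below is an assumption about how such a task could be built, not the package's implementation; `sleeperTask`, `newSleeperTask`, and `countingWorker` are hypothetical, and this `Stop` returns nothing, unlike the interface above.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// worker mirrors the idea of the Worker interface: one unit of repeatable work.
type worker interface {
	Work()
}

// sleeperTask is a hypothetical sketch of a background task.
type sleeperTask struct {
	worker  worker
	wake    chan struct{} // capacity 1: at most one pending wake-up
	stop    chan struct{}
	stopped chan struct{}
}

func newSleeperTask(w worker) *sleeperTask {
	s := &sleeperTask{
		worker:  w,
		wake:    make(chan struct{}, 1),
		stop:    make(chan struct{}),
		stopped: make(chan struct{}),
	}
	go s.loop()
	return s
}

// WakeUp never blocks. If a wake-up is already pending, this call is folded
// into it, because the pending run will start after this call anyway.
func (s *sleeperTask) WakeUp() {
	select {
	case s.wake <- struct{}{}:
	default:
	}
}

// Stop ends the loop and waits for any running Work call to finish.
// A wake-up still pending at that moment may be dropped.
func (s *sleeperTask) Stop() {
	close(s.stop)
	<-s.stopped
}

func (s *sleeperTask) loop() {
	defer close(s.stopped)
	for {
		select {
		case <-s.wake:
			s.worker.Work()
		case <-s.stop:
			return
		}
	}
}

// countingWorker counts Work calls and signals each one on ran.
type countingWorker struct {
	calls int32
	ran   chan struct{}
}

func (c *countingWorker) Work() {
	atomic.AddInt32(&c.calls, 1)
	c.ran <- struct{}{}
}

func (c *countingWorker) count() int32 { return atomic.LoadInt32(&c.calls) }

func main() {
	w := &countingWorker{ran: make(chan struct{}, 8)}
	task := newSleeperTask(w)
	task.WakeUp()
	<-w.ran // wait until the worker has run
	task.Stop()
	fmt.Println(w.count())
}
```

Because several WakeUp calls can collapse into a single run, and a call made during a run triggers another run, the worker has to tolerate being invoked more or fewer times than WakeUp was called, which is why idempotency matters.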

func NewStoreReaper

func NewStoreReaper(store *store.Store) SleeperTask

NewStoreReaper creates a reaper that cleans stale objects from the store.

type Unsubscriber

type Unsubscriber interface {
    Unsubscribe()
}

Unsubscriber is the interface for all subscriptions, allowing one to unsubscribe.

type Worker

type Worker interface {
    Work()
}

Worker is a simple interface that represents some work to do repeatedly


Directories

signatures/cryptotest: package cryptotest provides convenience functions for kyber-based APIs.
signatures/ethdss: Package ethdss implements the Distributed Schnorr Signature protocol from the ... XXX: Do not use in production until this code has been audited.
signatures/ethschnorr: Package ethschnorr implements a version of the Schnorr signature which is ... XXX: Do not use in production until this code has been audited.
signatures/secp256k1: Package secp256k1 is an implementation of the kyber.{Group,Point,Scalar} ... XXX: Do not use in production until this code has been audited.
vrf: Numbers are deterministically generated from seeds and a secret key, and are statistically indistinguishable from uniform sampling from {0,...,2**256-1}, to computationally-bounded observers who know the seeds, don't know the key, and only see the generated numbers.

Package services imports 32 packages and is imported by 2 packages. Updated 2020-07-15.