client

package
v0.0.0-...-cc24d94 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 25, 2023 License: AGPL-3.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	HttpClient = http.Client{
		Transport: &http.Transport{MaxConnsPerHost: constants.MaxConnectionsPerExchange},
	}
)

Functions

func RunMarketParamUpdaterTaskLoop

func RunMarketParamUpdaterTaskLoop(
	ctx context.Context,
	configs types.PricefeedMutableMarketConfigs,
	pricesQueryClient pricetypes.QueryClient,
	logger log.Logger,
	isPastGracePeriod bool,
)

RunMarketParamUpdaterTaskLoop queries all market params from the query client, and then updates the shared, in-memory `PricefeedMutableMarketConfigs` object with the latest market params.

func RunPriceUpdaterTaskLoop

func RunPriceUpdaterTaskLoop(
	ctx context.Context,
	exchangeToMarketPrices types.ExchangeToMarketPrices,
	priceFeedServiceClient api.PriceFeedServiceClient,
	logger log.Logger,
) error

RunPriceUpdaterTaskLoop copies the map of current `exchangeId -> MarketPriceTimestamp`, transforms the map values into a market price update request and sends the request to the socket where the pricefeed server is listening.

Types

type Client

type Client struct {
	// include HealthCheckable to track the health of the daemon.
	daemontypes.HealthCheckable
	// contains filtered or unexported fields
}

Client encapsulates the logic for executing and cleanly stopping all subtasks associated with the pricefeed client daemon. Access to the client's internal state is synchronized. The pricefeed daemon is a job that periodically queries external exchanges and transmits price data to the pricefeed service, which is then used by the application to compute index prices for proposing and validating oracle price updates on the blockchain. Note: price fetchers manage their own subtasks by blocking on their completion on every subtask run. When the price fetcher is stopped, it will wait for all of its own subtasks to complete before returning.

func StartNewClient

func StartNewClient(
	ctx context.Context,
	daemonFlags flags.DaemonFlags,
	appFlags appflags.Flags,
	logger log.Logger,
	grpcClient daemontypes.GrpcClient,
	exchangeIdToQueryConfig map[types.ExchangeId]*types.ExchangeQueryConfig,
	exchangeIdToExchangeDetails map[types.ExchangeId]types.ExchangeQueryDetails,
	subTaskRunner SubTaskRunner,
) (client *Client)

StartNewClient initializes and starts a new pricefeed daemon as a subtask of the calling process. The pricefeed daemon is a job that periodically queries external exchanges and transmits price data to the pricefeed service, which is then used by the application to compute index prices for proposing and validating oracle price updates on the blockchain. Note: the daemon will panic if it fails to start up.

func (*Client) Stop

func (c *Client) Stop()

Stop stops the daemon and all running subtasks. This method is synchronized by the daemonStartup WaitGroup.

type SubTaskRunner

type SubTaskRunner interface {
	StartPriceUpdater(
		c *Client,
		ctx context.Context,
		ticker *time.Ticker,
		stop <-chan bool,
		exchangeToMarketPrices types.ExchangeToMarketPrices,
		priceFeedServiceClient api.PriceFeedServiceClient,
		logger log.Logger,
	)
	StartPriceEncoder(
		exchangeId types.ExchangeId,
		configs types.PricefeedMutableMarketConfigs,
		exchangeToMarketPrices types.ExchangeToMarketPrices,
		logger log.Logger,
		bCh <-chan *price_fetcher.PriceFetcherSubtaskResponse,
	)
	StartPriceFetcher(
		ticker *time.Ticker,
		stop <-chan bool,
		configs types.PricefeedMutableMarketConfigs,
		exchangeQueryConfig types.ExchangeQueryConfig,
		exchangeDetails types.ExchangeQueryDetails,
		queryHandler handler.ExchangeQueryHandler,
		logger log.Logger,
		bCh chan<- *price_fetcher.PriceFetcherSubtaskResponse,
	)
	StartMarketParamUpdater(
		ctx context.Context,
		ticker *time.Ticker,
		stop <-chan bool,
		configs types.PricefeedMutableMarketConfigs,
		pricesQueryClient pricetypes.QueryClient,
		logger log.Logger,
	)
}

SubTaskRunner is the interface for running pricefeed client task functions.

type SubTaskRunnerImpl

type SubTaskRunnerImpl struct{}

SubTaskRunnerImpl is the struct that implements the `SubTaskRunner` interface.

func (*SubTaskRunnerImpl) StartMarketParamUpdater

func (s *SubTaskRunnerImpl) StartMarketParamUpdater(
	ctx context.Context,
	ticker *time.Ticker,
	stop <-chan bool,
	configs types.PricefeedMutableMarketConfigs,
	pricesQueryClient pricetypes.QueryClient,
	logger log.Logger,
)

StartMarketParamUpdater periodically starts a goroutine to update the market parameters that control which markets the daemon queries and how they are queried and computed from each exchange.

func (*SubTaskRunnerImpl) StartPriceEncoder

func (s *SubTaskRunnerImpl) StartPriceEncoder(
	exchangeId types.ExchangeId,
	configs types.PricefeedMutableMarketConfigs,
	exchangeToMarketPrices types.ExchangeToMarketPrices,
	logger log.Logger,
	bCh <-chan *price_fetcher.PriceFetcherSubtaskResponse,
)

StartPriceEncoder continuously reads from a buffered channel, reading encoded API responses for exchange requests and inserting them into an `ExchangeToMarketPrices` cache, performing currency conversions based on the index price of other markets as necessary. StartPriceEncoder reads price fetcher responses from a shared channel, and does not need a ticker or stop signal from the daemon to exit. It marks itself as done in the daemon's wait group when the price fetcher closes the shared channel.

func (*SubTaskRunnerImpl) StartPriceFetcher

func (s *SubTaskRunnerImpl) StartPriceFetcher(
	ticker *time.Ticker,
	stop <-chan bool,
	configs types.PricefeedMutableMarketConfigs,
	exchangeQueryConfig types.ExchangeQueryConfig,
	exchangeDetails types.ExchangeQueryDetails,
	queryHandler handler.ExchangeQueryHandler,
	logger log.Logger,
	bCh chan<- *price_fetcher.PriceFetcherSubtaskResponse,
)

StartPriceFetcher periodically starts goroutines to "fetch" market prices from a specific exchange. Each goroutine does the following: 1) query a single market price from a specific exchange 2) transform response to `MarketPriceTimestamp` 3) send transformed response to a buffered channel that's shared across multiple goroutines NOTE: the subtask response shared channel has a buffer size and goroutines will block if the buffer is full. NOTE: the price fetcher kicks off 1 to n go routines every time the subtask loop runs, but the subtask loop blocks until all go routines are done. This means that these go routines are not tracked by the wait group.

func (*SubTaskRunnerImpl) StartPriceUpdater

func (s *SubTaskRunnerImpl) StartPriceUpdater(
	c *Client,
	ctx context.Context,
	ticker *time.Ticker,
	stop <-chan bool,
	exchangeToMarketPrices types.ExchangeToMarketPrices,
	priceFeedServiceClient api.PriceFeedServiceClient,
	logger log.Logger,
)

StartPriceUpdater periodically runs a task loop to send price updates to the pricefeed server via: 1) Get `MarketPriceTimestamps` for all exchanges in an `ExchangeToMarketPrices` struct. 2) Transform `MarketPriceTimestamps` and exchange ids into an `UpdateMarketPricesRequest` struct. StartPriceUpdater runs in the daemon's main goroutine and does not need access to the daemon's wait group to signal task completion.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL