stream

package
v0.0.0-...-f1ff951 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 3, 2024 License: MIT Imports: 27 Imported by: 108

Documentation

Index

Constants

View Source
const (
	WebsocketNotAuthenticatedUsingRest = "%v - Websocket not authenticated, using REST\n"
	Ping                               = "ping"
	Pong                               = "pong"
	UnhandledMessage                   = " - Unhandled websocket message: "
)

Websocket functionality list and state consts

Variables

View Source
var (
	ErrWebsocketNotEnabled      = errors.New("websocket not enabled")
	ErrSubscriptionNotFound     = errors.New("subscription not found")
	ErrSubscribedAlready        = errors.New("duplicate subscription")
	ErrSubscriptionFailure      = errors.New("subscription failure")
	ErrSubscriptionNotSupported = errors.New("subscription channel not supported ")
	ErrUnsubscribeFailure       = errors.New("unsubscribe failure")
	ErrChannelInStateAlready    = errors.New("channel already in state")
	ErrAlreadyDisabled          = errors.New("websocket already disabled")
	ErrNotConnected             = errors.New("websocket is not connected")
)

Public websocket errors

Functions

func IsDisconnectionError

func IsDisconnectionError(err error) bool

IsDisconnectionError Determines if the error sent over chan ReadMessageErrors is a disconnection error

func SetupGlobalReporter

func SetupGlobalReporter(r Reporter)

SetupGlobalReporter sets a reporter interface to be used for all exchange requests

Types

type Connection

type Connection interface {
	Dial(*websocket.Dialer, http.Header) error
	ReadMessage() Response
	SendJSONMessage(interface{}) error
	SetupPingHandler(PingHandler)
	GenerateMessageID(highPrecision bool) int64
	SendMessageReturnResponse(signature interface{}, request interface{}) ([]byte, error)
	SendRawMessage(messageType int, message []byte) error
	SetURL(string)
	SetProxy(string)
	GetURL() string
	Shutdown() error
}

Connection defines a streaming services connection

type ConnectionSetup

type ConnectionSetup struct {
	ResponseCheckTimeout    time.Duration
	ResponseMaxLimit        time.Duration
	RateLimit               int64
	URL                     string
	Authenticated           bool
	ConnectionLevelReporter Reporter
}

ConnectionSetup defines variables for an individual stream connection

type FundingData

type FundingData struct {
	Timestamp    time.Time
	CurrencyPair currency.Pair
	AssetType    asset.Item
	Exchange     string
	Amount       float64
	Rate         float64
	Period       int64
	Side         order.Side
}

FundingData defines funding data

type KlineData

type KlineData struct {
	Timestamp  time.Time
	Pair       currency.Pair
	AssetType  asset.Item
	Exchange   string
	StartTime  time.Time
	CloseTime  time.Time
	Interval   string
	OpenPrice  float64
	ClosePrice float64
	HighPrice  float64
	LowPrice   float64
	Volume     float64
}

KlineData defines kline feed

type Match

type Match struct {
	// contains filtered or unexported fields
}

Match is a distributed subtype that handles the matching of requests and responses in a timely manner, reducing the need to differentiate between connections. Stream systems fan in all incoming payloads to one routine for processing.

func NewMatch

func NewMatch() *Match

NewMatch returns a new Match

func (*Match) Incoming

func (m *Match) Incoming(signature interface{}) bool

Incoming matches with request, disregarding the returned payload

func (*Match) IncomingWithData

func (m *Match) IncomingWithData(signature interface{}, data []byte) bool

IncomingWithData matches with requests and takes in the returned payload, to be processed outside of a stream processing routine and returns true if a handler was found

func (*Match) Set

func (m *Match) Set(signature interface{}) (Matcher, error)

Set the signature response channel for incoming data

type Matcher

type Matcher struct {
	C chan []byte
	// contains filtered or unexported fields
}

Matcher defines a payload matching return mechanism

func (*Matcher) Cleanup

func (m *Matcher) Cleanup()

Cleanup closes underlying channel and deletes signature from map

type PingHandler

type PingHandler struct {
	Websocket         bool
	UseGorillaHandler bool
	MessageType       int
	Message           []byte
	Delay             time.Duration
}

PingHandler container for ping handler settings

type Reporter

type Reporter interface {
	Latency(name string, message []byte, t time.Duration)
}

Reporter interface groups observability functionality over Websocket request latency.

type Response

type Response struct {
	Type int
	Raw  []byte
}

Response defines generalised data from the stream connection

type UnhandledMessageWarning

type UnhandledMessageWarning struct {
	Message string
}

UnhandledMessageWarning defines a container for unhandled message warnings

type Websocket

type Websocket struct {
	Subscribe   chan []subscription.Subscription
	Unsubscribe chan []subscription.Subscription

	// Subscriber function for package defined websocket subscriber
	// functionality
	Subscriber func([]subscription.Subscription) error
	// Unsubscriber function for packaged defined websocket unsubscriber
	// functionality
	Unsubscriber func([]subscription.Subscription) error
	// GenerateSubs function for package defined websocket generate
	// subscriptions functionality
	GenerateSubs func() ([]subscription.Subscription, error)

	DataHandler chan interface{}
	ToRoutine   chan interface{}

	Match *Match

	// shutdown synchronises shutdown event across routines
	ShutdownC chan struct{}
	Wg        *sync.WaitGroup

	// Orderbook is a local buffer of orderbooks
	Orderbook buffer.Orderbook

	// Trade is a notifier of occurring trades
	Trade trade.Trade

	// Fills is a notifier of occurring fills
	Fills fill.Fills

	// trafficAlert monitors if there is a halt in traffic throughput
	TrafficAlert chan struct{}
	// ReadMessageErrors will received all errors from ws.ReadMessage() and
	// verify if its a disconnection
	ReadMessageErrors chan error

	// Standard stream connection
	Conn Connection
	// Authenticated stream connection
	AuthConn Connection

	// Latency reporter
	ExchangeLevelReporter Reporter

	// MaxSubScriptionsPerConnection defines the maximum number of
	// subscriptions per connection that is allowed by the exchange.
	MaxSubscriptionsPerConnection int
	// contains filtered or unexported fields
}

Websocket defines a return type for websocket connections via the interface wrapper for routine processing

func NewWebsocket

func NewWebsocket() *Websocket

NewWebsocket initialises the websocket struct

func (*Websocket) AddSubscription

func (w *Websocket) AddSubscription(c *subscription.Subscription) error

AddSubscription adds a subscription to the subscription lists Unlike AddSubscriptions this method will error if the subscription already exists

func (*Websocket) AddSuccessfulSubscriptions

func (w *Websocket) AddSuccessfulSubscriptions(channels ...subscription.Subscription)

AddSuccessfulSubscriptions adds subscriptions to the subscription lists that has been successfully subscribed

func (*Websocket) CanUseAuthenticatedEndpoints

func (w *Websocket) CanUseAuthenticatedEndpoints() bool

CanUseAuthenticatedEndpoints gets canUseAuthenticatedEndpoints val in a thread safe manner

func (*Websocket) CanUseAuthenticatedWebsocketForWrapper

func (w *Websocket) CanUseAuthenticatedWebsocketForWrapper() bool

CanUseAuthenticatedWebsocketForWrapper Handles a common check to verify whether a wrapper can use an authenticated websocket endpoint

func (*Websocket) Connect

func (w *Websocket) Connect() error

Connect initiates a websocket connection by using a package defined connection function

func (*Websocket) Disable

func (w *Websocket) Disable() error

Disable disables the exchange websocket protocol Note that connectionMonitor will be responsible for shutting down the websocket after disabling

func (*Websocket) Enable

func (w *Websocket) Enable() error

Enable enables the exchange websocket protocol

func (*Websocket) FlushChannels

func (w *Websocket) FlushChannels() error

FlushChannels flushes channel subscriptions when there is a pair/asset change

func (*Websocket) GetChannelDifference

func (w *Websocket) GetChannelDifference(genSubs []subscription.Subscription) (sub, unsub []subscription.Subscription)

GetChannelDifference finds the difference between the subscribed channels and the new subscription list when pairs are disabled or enabled.

func (*Websocket) GetName

func (w *Websocket) GetName() string

GetName returns exchange name

func (*Websocket) GetProxyAddress

func (w *Websocket) GetProxyAddress() string

GetProxyAddress returns the current websocket proxy

func (*Websocket) GetSubscription

func (w *Websocket) GetSubscription(key any) *subscription.Subscription

GetSubscription returns a pointer to a copy of the subscription at the key provided returns nil if no subscription is at that key or the key is nil

func (*Websocket) GetSubscriptions

func (w *Websocket) GetSubscriptions() []subscription.Subscription

GetSubscriptions returns a new slice of the subscriptions

func (*Websocket) GetWebsocketURL

func (w *Websocket) GetWebsocketURL() string

GetWebsocketURL returns the running websocket URL

func (*Websocket) IsConnected

func (w *Websocket) IsConnected() bool

IsConnected returns whether the websocket is connected

func (*Websocket) IsConnecting

func (w *Websocket) IsConnecting() bool

IsConnecting returns whether the websocket is connecting

func (*Websocket) IsConnectionMonitorRunning

func (w *Websocket) IsConnectionMonitorRunning() bool

IsConnectionMonitorRunning returns status of connection monitor

func (*Websocket) IsDataMonitorRunning

func (w *Websocket) IsDataMonitorRunning() bool

IsDataMonitorRunning returns status of data monitor

func (*Websocket) IsEnabled

func (w *Websocket) IsEnabled() bool

IsEnabled returns whether the websocket is enabled

func (*Websocket) IsInitialised

func (w *Websocket) IsInitialised() bool

IsInitialised returns whether the websocket has been Setup() already

func (*Websocket) IsTrafficMonitorRunning

func (w *Websocket) IsTrafficMonitorRunning() bool

IsTrafficMonitorRunning returns status of the traffic monitor

func (*Websocket) RemoveSubscriptions

func (w *Websocket) RemoveSubscriptions(channels ...subscription.Subscription)

RemoveSubscriptions removes subscriptions from the subscription list

func (*Websocket) ResubscribeToChannel

func (w *Websocket) ResubscribeToChannel(subscribedChannel *subscription.Subscription) error

ResubscribeToChannel resubscribes to channel

func (*Websocket) SetCanUseAuthenticatedEndpoints

func (w *Websocket) SetCanUseAuthenticatedEndpoints(b bool)

SetCanUseAuthenticatedEndpoints sets canUseAuthenticatedEndpoints val in a thread safe manner

func (*Websocket) SetProxyAddress

func (w *Websocket) SetProxyAddress(proxyAddr string) error

SetProxyAddress sets websocket proxy address

func (*Websocket) SetSubscriptionState

func (w *Websocket) SetSubscriptionState(c *subscription.Subscription, state subscription.State) error

SetSubscriptionState sets an existing subscription state returns an error if the subscription is not found, or the new state is already set

func (*Websocket) SetWebsocketURL

func (w *Websocket) SetWebsocketURL(url string, auth, reconnect bool) error

SetWebsocketURL sets websocket URL and can refresh underlying connections

func (*Websocket) Setup

func (w *Websocket) Setup(s *WebsocketSetup) error

Setup sets main variables for websocket connection

func (*Websocket) SetupNewConnection

func (w *Websocket) SetupNewConnection(c ConnectionSetup) error

SetupNewConnection sets up an auth or unauth streaming connection

func (*Websocket) Shutdown

func (w *Websocket) Shutdown() error

Shutdown attempts to shut down a websocket connection and associated routines by using a package defined shutdown function

func (*Websocket) SubscribeToChannels

func (w *Websocket) SubscribeToChannels(channels []subscription.Subscription) error

SubscribeToChannels appends supplied channels to channelsToSubscribe

func (*Websocket) UnsubscribeChannels

func (w *Websocket) UnsubscribeChannels(channels []subscription.Subscription) error

UnsubscribeChannels unsubscribes from a websocket channel

type WebsocketConnection

type WebsocketConnection struct {
	Verbose bool

	RateLimit    int64
	ExchangeName string
	URL          string
	ProxyURL     string
	Wg           *sync.WaitGroup
	Connection   *websocket.Conn
	ShutdownC    chan struct{}

	Match            *Match
	ResponseMaxLimit time.Duration
	Traffic          chan struct{}

	Reporter Reporter
	// contains filtered or unexported fields
}

WebsocketConnection contains all the data needed to send a message to a WS connection

func (*WebsocketConnection) Dial

func (w *WebsocketConnection) Dial(dialer *websocket.Dialer, headers http.Header) error

Dial sets proxy urls and then connects to the websocket

func (*WebsocketConnection) GenerateMessageID

func (w *WebsocketConnection) GenerateMessageID(highPrec bool) int64

GenerateMessageID Creates a random message ID

func (*WebsocketConnection) GetURL

func (w *WebsocketConnection) GetURL() string

GetURL returns the connection URL

func (*WebsocketConnection) IsConnected

func (w *WebsocketConnection) IsConnected() bool

IsConnected exposes websocket connection status

func (*WebsocketConnection) ReadMessage

func (w *WebsocketConnection) ReadMessage() Response

ReadMessage reads messages, can handle text, gzip and binary

func (*WebsocketConnection) SendJSONMessage

func (w *WebsocketConnection) SendJSONMessage(data interface{}) error

SendJSONMessage sends a JSON encoded message over the connection

func (*WebsocketConnection) SendMessageReturnResponse

func (w *WebsocketConnection) SendMessageReturnResponse(signature, request interface{}) ([]byte, error)

SendMessageReturnResponse will send a WS message to the connection and wait for response

func (*WebsocketConnection) SendRawMessage

func (w *WebsocketConnection) SendRawMessage(messageType int, message []byte) error

SendRawMessage sends a message over the connection without JSON encoding it

func (*WebsocketConnection) SetProxy

func (w *WebsocketConnection) SetProxy(proxy string)

SetProxy sets connection proxy

func (*WebsocketConnection) SetURL

func (w *WebsocketConnection) SetURL(url string)

SetURL sets connection URL

func (*WebsocketConnection) SetupPingHandler

func (w *WebsocketConnection) SetupPingHandler(handler PingHandler)

SetupPingHandler will automatically send ping or pong messages based on WebsocketPingHandler configuration

func (*WebsocketConnection) Shutdown

func (w *WebsocketConnection) Shutdown() error

Shutdown shuts down and closes specific connection

type WebsocketPositionUpdated

type WebsocketPositionUpdated struct {
	Timestamp time.Time
	Pair      currency.Pair
	AssetType asset.Item
	Exchange  string
}

WebsocketPositionUpdated reflects a change in orders/contracts on an exchange

type WebsocketSetup

type WebsocketSetup struct {
	ExchangeConfig        *config.Exchange
	DefaultURL            string
	RunningURL            string
	RunningURLAuth        string
	Connector             func() error
	Subscriber            func([]subscription.Subscription) error
	Unsubscriber          func([]subscription.Subscription) error
	GenerateSubscriptions func() ([]subscription.Subscription, error)
	Features              *protocol.Features

	// Local orderbook buffer config values
	OrderbookBufferConfig buffer.Config

	TradeFeed bool

	// Fill data config values
	FillsFeed bool

	// MaxWebsocketSubscriptionsPerConnection defines the maximum number of
	// subscriptions per connection that is allowed by the exchange.
	MaxWebsocketSubscriptionsPerConnection int
}

WebsocketSetup defines variables for setting up a websocket connection

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL