publisher

package
v0.0.0-...-bf1eb11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2023 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
// Timing and size limits, all fixed at compile time:
//   - WRITE_WAIT is 10 seconds.
//   - PONG_WAIT is 60 seconds.
//   - PING_PERIOD is nine tenths of PONG_WAIT, i.e. 54 seconds.
//   - MAX_MESSAGE_SIZE is the bare number 512.
//
// NOTE(review): the names suggest websocket write/pong deadlines, a ping
// interval and a read-size cap in bytes, but the use sites are not visible
// here — confirm before relying on that.
const (
	WRITE_WAIT       = 10 * time.Second
	PONG_WAIT        = 60 * time.Second
	PING_PERIOD      = PONG_WAIT * 9 / 10
	MAX_MESSAGE_SIZE = 512
)
View Source
// Typed string constants.
//
// The LEVEL_2_TYPE_* values are of type Level2Type and the CHANNEL_* values
// are of type Channel; each is declared with an explicit constant type
// rather than a conversion expression, which yields the same typed constant.
const (
	LEVEL_2_TYPE_SNAPSHOT Level2Type = "snapshot"
	LEVEL_2_TYPE_UPDATE   Level2Type = "l2update"

	CHANNEL_TICKER  Channel = "ticker"
	CHANNEL_MATCH   Channel = "match"
	CHANNEL_LEVEL_2 Channel = "level2"
	CHANNEL_FUNDS   Channel = "funds"
	CHANNEL_ORDER   Channel = "order"
)
View Source
// String prefixes for snapshot keys: one for level-2 order book snapshots
// and one for full order book snapshots.
//
// NOTE(review): presumably a product ID is appended to form the storage key
// used by RedisSnapshotStore — the use sites are not visible here; confirm.
const (
	ORDER_BOOK_L2_SNAPSHOT_KEY_PREFIX   = "order_book_level2_snapshot_"
	ORDER_BOOK_FULL_SNAPSHOT_KEY_PREFIX = "order_book_full_snapshot_"
)

Variables

Functions

func StartServer

func StartServer()

Types

type Channel

// Channel is a string type used for the CHANNEL_* constants ("ticker",
// "match", "level2", "funds", "order"). Its Format, FormatWithProductId and
// FormatWithUserId methods each return a string built from a product ID
// and/or user ID; the exact format is defined in the method bodies.
type Channel string

func (Channel) Format

func (t Channel) Format(productId string, userId int64) string

func (Channel) FormatWithProductId

func (t Channel) FormatWithProductId(productId string) string

func (Channel) FormatWithUserId

func (t Channel) FormatWithUserId(userId int64) string

type Client

// Client holds the state for one websocket connection: a numeric Id, the
// connection itself, two outbound Go channels (WriteCh for arbitrary values,
// L2ChangeCh for *Level2Change), the Subscription it was created with, and a
// string-keyed set in Channels.
//
// Client contains a sync.Mutex and therefore must not be copied; all of its
// methods use pointer receivers.
//
// NOTE(review): Channels presumably holds the channel names this client is
// subscribed to, and Mu presumably guards it — confirm against Subscribe and
// Unsubscribe.
type Client struct {
	Id         int64
	Conn       *websocket.Conn
	WriteCh    chan interface{}
	L2ChangeCh chan *Level2Change
	Sub        *Subscription
	Channels   map[string]struct{}
	Mu         sync.Mutex
}

func NewClient

func NewClient(conn *websocket.Conn, sub *Subscription) *Client

func (*Client) Close

func (c *Client) Close()

func (*Client) OnMessage

func (c *Client) OnMessage(req *Request)

func (*Client) OnSub

func (c *Client) OnSub(currencyIds []string, productIds []string, channels []string, token string)

func (*Client) OnUnSub

func (c *Client) OnUnSub(currencyIds []string, productIds []string, channels []string, token string)

func (*Client) RunL2ChangeWriter

func (c *Client) RunL2ChangeWriter(ctx context.Context)

func (*Client) RunReader

func (c *Client) RunReader()

func (*Client) RunWriter

func (c *Client) RunWriter()

func (*Client) StartServe

func (c *Client) StartServe()

func (*Client) Subscribe

func (c *Client) Subscribe(channel string) bool

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(channel string)

type FundsMessage

// FundsMessage is a JSON-serializable message with camelCase keys.
//
// The Currency field is serialized under the key "currencyCode". UserId is a
// string here, whereas OrderMessage.UserId is an int64.
//
// NOTE(review): Available and Hold are presumably decimal amounts encoded as
// strings — confirm against the producer.
type FundsMessage struct {
	Type      string `json:"type"`
	Sequence  int64  `json:"sequence"`
	UserId    string `json:"userId"`
	Currency  string `json:"currencyCode"`
	Available string `json:"available"`
	Hold      string `json:"hold"`
}

type Level2Change

// Level2Change carries a sequence number, product ID, side, price and size.
// It is the value returned by OrderBook.SaveOrder and the element type of
// Client.L2ChangeCh. It has no JSON tags.
//
// NOTE(review): Price and Size are presumably decimal values rendered as
// strings, and Size presumably the new aggregate size at that price —
// confirm against SaveOrder.
type Level2Change struct {
	Seq       int64
	ProductId string
	Side      string
	Price     string
	Size      string
}

type Level2SnapshotMessage

// Level2SnapshotMessage is a JSON-serializable message containing a type, a
// product ID (key "productId") and two lists, Bids and Asks, whose entries
// are 3-element arrays of arbitrary values.
//
// NOTE(review): the meaning of the three elements per entry is not shown
// here (possibly price, size, order count) — confirm against the code that
// fills OrderBookLevel2Snapshot, which uses the same entry shape.
type Level2SnapshotMessage struct {
	Type      Level2Type       `json:"type"`
	ProductId string           `json:"productId"`
	Bids      [][3]interface{} `json:"bids"`
	Asks      [][3]interface{} `json:"asks"`
}

type Level2Type

// Level2Type is the string type of the Type field in Level2SnapshotMessage
// and Level2UpdateMessage. The declared values are LEVEL_2_TYPE_SNAPSHOT
// ("snapshot") and LEVEL_2_TYPE_UPDATE ("l2update").
type Level2Type string

type Level2UpdateMessage

// Level2UpdateMessage is a JSON-serializable message containing a type, a
// product ID and a list of changes, each a 3-element array.
//
// NOTE(review): the ProductId tag is spelled "procduct_id", which looks like
// a typo and differs from the "productId" key used by Level2SnapshotMessage.
// It is the key actually emitted on the wire, so correcting it is a
// client-visible change — coordinate with consumers before fixing.
type Level2UpdateMessage struct {
	Type      Level2Type       `json:"type"`
	ProductId string           `json:"procduct_id"`
	Changes   [][3]interface{} `json:"changes"` // one entry per change, e.g. ["buy", "6500.09", "0.847023376"]
}

type LogOffset

// LogOffset pairs an arbitrary log value with an int64 offset. It is the
// element type of OrderBookStream.LogCh.
//
// NOTE(review): Log presumably holds one of the matching package's log types
// (open/match/done) — confirm against the OrderBookStream handlers.
type LogOffset struct {
	Log    interface{}
	Offset int64
}

type MatchMessage

// MatchMessage is a JSON-serializable message with camelCase keys. TradeId
// and Sequence are integers; every other field, including Time, Price, Size
// and both order IDs, is a string.
//
// NOTE(review): presumably published on the "match" channel for each trade —
// confirm against MatchStream.
type MatchMessage struct {
	Type         string `json:"type"`
	TradeId      int64  `json:"tradeId"`
	Sequence     int64  `json:"sequence"`
	Time         string `json:"time"`
	ProductId    string `json:"productId"`
	Price        string `json:"price"`
	Size         string `json:"size"`
	MakerOrderId string `json:"makerOrderId"`
	TakerOrderId string `json:"takerOrderId"`
	Side         string `json:"side"`
}

type MatchStream

// MatchStream holds per-product state: the product ID, the Subscription, the
// best bid and ask as decimals, two tick records (Tick24h, Tick30d) and a
// matching.LogReader. NewMatchStream takes the product ID, Subscription and
// log reader as arguments.
//
// NOTE(review): the OnOpenLOg/OnMatchLog/OnDoneLog methods presumably make
// this type an observer of LogReader — confirm against Start.
type MatchStream struct {
	ProductId string
	Sub       *Subscription
	BestBid   decimal.Decimal
	BestAsk   decimal.Decimal
	Tick24h   *entities.Tick
	Tick30d   *entities.Tick
	LogReader matching.LogReader
}

func NewMatchStream

func NewMatchStream(productId string, sub *Subscription, logReader matching.LogReader) *MatchStream

func (*MatchStream) OnDoneLog

func (s *MatchStream) OnDoneLog(log *matching.DoneLog, offset int64)

func (*MatchStream) OnMatchLog

func (s *MatchStream) OnMatchLog(log *matching.MatchLog, offset int64)

func (*MatchStream) OnOpenLOg

func (s *MatchStream) OnOpenLOg(log *matching.OpenLog, offset int64)

func (*MatchStream) Start

func (s *MatchStream) Start()

type OrderBook

// OrderBook is the in-memory book for one product. Depths maps each side to
// a treemap, and Orders maps an int64 key to a *matching.BookOrder. Seq,
// LogOffset and LogSeq are int64 counters that also appear in
// OrderBookFullSnapshot.
//
// NOTE(review): Orders is presumably keyed by order ID and the treemaps by
// price; LogOffset/LogSeq presumably track the last log entry applied via
// SaveOrder — confirm against the method bodies.
type OrderBook struct {
	ProductId string
	Seq       int64
	LogOffset int64
	LogSeq    int64
	Depths    map[entities.Side]*treemap.Map
	Orders    map[int64]*matching.BookOrder
}

func NewOrderBook

func NewOrderBook(productId string) *OrderBook

func (*OrderBook) Restore

func (s *OrderBook) Restore(snapshot *OrderBookFullSnapshot)

func (*OrderBook) SaveOrder

func (s *OrderBook) SaveOrder(logOffset, logSeq int64, orderId int64, newSize, price decimal.Decimal, side entities.Side) *Level2Change

func (*OrderBook) SnapshotFull

func (s *OrderBook) SnapshotFull() *OrderBookFullSnapshot

func (*OrderBook) SnapshotLevel2

func (s *OrderBook) SnapshotLevel2(levels int) *OrderBookLevel2Snapshot

type OrderBookFullSnapshot

// OrderBookFullSnapshot is the value returned by OrderBook.SnapshotFull and
// accepted by OrderBook.Restore; RedisSnapshotStore stores and loads it via
// StoreFull and GetLastFull. It carries the product ID, the Seq, LogOffset
// and LogSeq counters, and the orders as a slice of matching.BookOrder
// values (not pointers).
type OrderBookFullSnapshot struct {
	ProductId string
	Seq       int64
	LogOffset int64
	LogSeq    int64
	Orders    []matching.BookOrder
}

type OrderBookLevel2Snapshot

// OrderBookLevel2Snapshot is the value returned by OrderBook.SnapshotLevel2;
// RedisSnapshotStore stores and loads it via StoreLevel2 and GetLastLevel2.
// Asks and Bids are lists of 3-element arrays of arbitrary values.
//
// NOTE(review): the meaning of the three elements per entry is not shown
// here (possibly price, size, order count) — confirm against SnapshotLevel2.
type OrderBookLevel2Snapshot struct {
	ProductId string
	Seq       int64
	Asks      [][3]interface{}
	Bids      [][3]interface{}
}

type OrderBookStream

// OrderBookStream holds per-product state: the product ID, a
// matching.LogReader, a channel of *LogOffset, the OrderBook, the
// Subscription and a channel of arbitrary snapshot values.
// NewOrderBookStream takes the product ID, Subscription and log reader as
// arguments.
//
// NOTE(review): LogCh presumably queues log entries received by the
// OnOpenLOg/OnMatchLog/OnDoneLog handlers and SnapshotCh presumably carries
// snapshots to be persisted — confirm against Start.
type OrderBookStream struct {
	ProductId  string
	LogReader  matching.LogReader
	LogCh      chan *LogOffset
	OrderBook  *OrderBook
	Sub        *Subscription
	SnapshotCh chan interface{}
}

func NewOrderBookStream

func NewOrderBookStream(productId string, sub *Subscription, logReader matching.LogReader) *OrderBookStream

func (*OrderBookStream) OnDoneLog

func (s *OrderBookStream) OnDoneLog(log *matching.DoneLog, offset int64)

func (*OrderBookStream) OnMatchLog

func (s *OrderBookStream) OnMatchLog(log *matching.MatchLog, offset int64)

func (*OrderBookStream) OnOpenLOg

func (s *OrderBookStream) OnOpenLOg(log *matching.OpenLog, offset int64)

func (*OrderBookStream) Start

func (s *OrderBookStream) Start()

type OrderMessage

// OrderMessage is a JSON-serializable message with camelCase keys. UserId
// and Sequence are integers and Settled is a boolean; every other field,
// including Id, is a string. UserId is an int64 here, whereas
// FundsMessage.UserId is a string.
//
// NOTE(review): Price, Size, Funds, FillFees, FilledSize and ExecutedValue
// are presumably decimal values encoded as strings — confirm against the
// producer.
type OrderMessage struct {
	UserId        int64  `json:"userId"`
	Type          string `json:"type"`
	Sequence      int64  `json:"sequence"`
	Id            string `json:"id"`
	Price         string `json:"price"`
	Size          string `json:"size"`
	Funds         string `json:"funds"`
	ProductId     string `json:"productId"`
	Side          string `json:"side"`
	OrderType     string `json:"orderType"`
	CreatedAt     string `json:"createdAt"`
	FillFees      string `json:"fillFees"`
	FilledSize    string `json:"filledSize"`
	ExecutedValue string `json:"executedValue"`
	Status        string `json:"status"`
	Settled       bool   `json:"settled"`
}

type PriceLevel

// PriceLevel holds a decimal price, a decimal size and an int64 order count.
//
// NOTE(review): presumably the aggregate stored per price in
// OrderBook.Depths — confirm against the OrderBook methods.
type PriceLevel struct {
	Price      decimal.Decimal
	Size       decimal.Decimal
	OrderCount int64
}

type RedisSnapshotStore

type RedisSnapshotStore struct {
	// RedisClient is the only state of the store.
	// NOTE(review): presumably every Get*/Store* method goes through it — confirm.
	RedisClient *redis.Client
}

RedisSnapshotStore is a Redis-backed snapshot store: it saves and loads level-2 and full order book snapshots per product ID.

func (*RedisSnapshotStore) GetLastFull

func (s *RedisSnapshotStore) GetLastFull(productId string) (*OrderBookFullSnapshot, error)

func (*RedisSnapshotStore) GetLastLevel2

func (s *RedisSnapshotStore) GetLastLevel2(productId string) (*OrderBookLevel2Snapshot, error)

func (*RedisSnapshotStore) StoreFull

func (s *RedisSnapshotStore) StoreFull(productId string, snapshot *OrderBookFullSnapshot) error

func (*RedisSnapshotStore) StoreLevel2

func (s *RedisSnapshotStore) StoreLevel2(productId string, snapshot *OrderBookLevel2Snapshot) error

type RedisStream

// RedisStream holds a Subscription and a mutex. NewRedisStream takes the
// Subscription as its argument.
//
// RedisStream contains a sync.Mutex and therefore must not be copied.
//
// NOTE(review): what Mutex protects is not visible here — confirm against
// Start.
type RedisStream struct {
	Sub   *Subscription
	Mutex sync.Mutex
}

func NewRedisStream

func NewRedisStream(sub *Subscription) *RedisStream

func (*RedisStream) Start

func (r *RedisStream) Start()

type Request

// Request is the JSON message handled by Client.OnMessage. Its ProductIds,
// CurrencyIds, Channels and Token fields correspond to the arguments of
// Client.OnSub and Client.OnUnSub.
//
// JSON keys are snake_case: "type", "product_ids", "currency_ids",
// "channels" and "token". The Channels tag was previously malformed
// (`json"channels"`, missing the colon), so encoding/json ignored it and
// marshaled the field as "Channels". Decoding is unaffected by the fix
// because encoding/json matches object keys case-insensitively.
//
// NOTE(review): Type presumably selects between subscribe and unsubscribe —
// confirm against OnMessage.
type Request struct {
	Type        string   `json:"type"`
	ProductIds  []string `json:"product_ids"`
	CurrencyIds []string `json:"currency_ids"`
	Channels    []string `json:"channels"`
	Token       string   `json:"token"`
}

type Response

// Response is a JSON-serializable message with snake_case keys. It has the
// same fields and keys as Request except that it has no CurrencyIds.
//
// NOTE(review): presumably sent back to the client to acknowledge a
// subscribe/unsubscribe request — confirm against Client.OnSub and OnUnSub.
type Response struct {
	Type       string   `json:"type"`
	ProductIds []string `json:"product_ids"`
	Channels   []string `json:"channels"`
	Token      string   `json:"token"`
}

type Server

// Server holds an address, a path and a Subscription; NewServer takes the
// same three values as arguments. Its Ws method is a gin handler (it takes a
// *gin.Context).
//
// NOTE(review): Addr is presumably the listen address and Path the route on
// which Ws is registered — confirm against Run.
type Server struct {
	Addr string
	Path string
	Sub  *Subscription
}

func NewServer

func NewServer(addr, path string, sub *Subscription) *Server

func (*Server) Run

func (s *Server) Run()

func (*Server) Ws

func (s *Server) Ws(c *gin.Context)

type Subscription

// Subscription holds a two-level map from a string key to an int64 key to a
// *Client, together with a read/write mutex.
//
// Subscription contains a sync.RWMutex and therefore must not be copied; all
// of its methods use pointer receivers.
//
// NOTE(review): the outer key is presumably the channel name passed to
// Subscribe/Unsubscribe/Publish, the inner key presumably Client.Id, and Mu
// presumably guards Subscribers — confirm against the method bodies.
type Subscription struct {
	Subscribers map[string]map[int64]*Client
	Mu          sync.RWMutex
}

func NewSubscription

func NewSubscription() *Subscription

func (*Subscription) Publish

func (s *Subscription) Publish(channel string, msg interface{})

func (*Subscription) Subscribe

func (s *Subscription) Subscribe(channel string, client *Client) bool

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe(channel string, client *Client) bool

type TickerMessage

// TickerMessage is a JSON-serializable message with camelCase keys. TradeId
// and Sequence are integers; every other field, including Time, Price and
// the volume/low/open figures, is a string.
//
// NOTE(review): presumably published on the "ticker" channel by
// TickerStream, with numeric figures encoded as decimal strings — confirm
// against the producer.
type TickerMessage struct {
	Type      string `json:"type"`
	TradeId   int64  `json:"tradeId"`
	Sequence  int64  `json:"sequence"`
	Time      string `json:"time"`
	ProductId string `json:"productId"`
	Price     string `json:"price"`
	Side      string `json:"side"`
	LastSize  string `json:"lastSize"`
	BestBid   string `json:"bestBid"`
	BestAsk   string `json:"bestAsk"`
	Volume24h string `json:"volume24h"`
	Volume30d string `json:"volume30d"`
	Low24h    string `json:"low24h"`
	Open24h   string `json:"open24h"`
}

type TickerStream

// TickerStream holds per-product state: the product ID, the Subscription,
// the best bid and ask as decimals, a matching.LogReader and an int64
// LastTickerTime. NewTickerStream takes the product ID, Subscription and log
// reader as arguments.
//
// NOTE(review): the unit of LastTickerTime (seconds, milliseconds, …) and
// whether it is used to throttle ticker output are not visible here —
// confirm against OnMatchLog.
type TickerStream struct {
	ProductId      string
	Sub            *Subscription
	BestBid        decimal.Decimal
	BestAsk        decimal.Decimal
	LogReader      matching.LogReader
	LastTickerTime int64
}

func NewTickerStream

func NewTickerStream(productId string, sub *Subscription, logReader matching.LogReader) *TickerStream

func (*TickerStream) OnDoneLog

func (s *TickerStream) OnDoneLog(log *matching.DoneLog, offset int64)

func (*TickerStream) OnMatchLog

func (s *TickerStream) OnMatchLog(log *matching.MatchLog, offset int64)

func (*TickerStream) OnOpenLOg

func (s *TickerStream) OnOpenLOg(log *matching.OpenLog, offset int64)

func (*TickerStream) Start

func (s *TickerStream) Start()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL