stubborn

package module
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 28, 2021 License: Apache-2.0 Imports: 17 Imported by: 3

README

Stubborn

Stubborn is a framework written in Go for clients that use duplex protocols such as WebSocket.

Status

The Stubborn framework provides a complete and tested persistent connection for the duplex protocols using various types of clients.

Installation
go get github.com/filinvadim/stubborn
Stubborn advantages
  1. Agnostic to any clients by DuplexConnector interface.
  2. Therefore, it is fully testable with a mock that implements three methods:
    Close() error
    ReadMessage() (messageType int, p []byte, err error)
    WriteMessage(messageType int, data []byte) error
    
  3. Persistence. The Stubborn framework keeps fetching data even if a panic() occurs (useful when, for example, Gorilla WebSocket is used as the client).
  4. Fairly configurable for any API requirements.
  5. No external dependencies whatsoever.
Stubborn disadvantages
  1. The framework has not been benchmarked yet; average performance is expected.
Protocol examples
Crypto exchange pseudocode examples
  • Binance
func BinanceStream(key, secret string) (err error) {
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, syscall.SIGTERM, syscall.SIGINT)

	var stream binance.BinanceAPIStreamer

	stub := stubborn.NewStubborn(stubborn.Config{
		URL:             "wss://stream.binance.com:9443/ws/",
		IsReconnectable: true,
		MessageType:     stubborn.TextMessage,
		Print: func(args ...interface{}) {
			fmt.Println(args)
			return
		},
		Dialerf: func(ctx context.Context) (stubborn.DuplexConnector, error) {
			stream, err = b.StartUserDataStream(key, secret)
			if err != nil {
				return nil, err
			}
			conn, _, err := websocket.DefaultDialer.DialContext(
				ctx,
				fmt.Sprintf("%s%s", "wss://stream.binance.com:9443/ws/", stream.ListenKey()),
				nil)
			return conn, err
		},
		UnimportantErrs: []error{io.EOF},
	})

	stub.SetErrorHandler(func(err error) { fmt.Println("binance websocket error:", err) })

	stub.SetKeepAliveHandler(stubborn.KeepAlive{
		Tick: time.Tick(time.Hour - time.Minute),
		CustomPing: func() (msgType int, payload []byte) {
			stream.KeepAliveStream()
			return stubborn.PingMessage, []byte{}
		},
	})

	stub.SetMessageHandler(func(resp []byte) {
		fmt.Println("RESPONSE:", string(resp))
	})

	err = stub.Connect(b.ctx)
	if err != nil {
		return err
	}
	<-interrupt
    
	defer func() {
		stub.Close()
	}
	return nil
}
  • Huobi
func HuobiTradesSubscribe(key, secret string) (err error) {
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, syscall.SIGTERM, syscall.SIGINT)
	
	sub := "{\"action\":\"sub\", \"ch\":\"trade.clearing#*#0\", \"cid\": \"someId\"}"
	unsub := "{\"action\":\"unsub\", \"ch\":\"trade.clearing#*\", \"cid\": \"someId\"}"

	path := "/ws/v2"
	wsUrl := fmt.Sprintf("wss://%s%s", "api-aws.huobi.pro", path)

	stub := stubborn.NewStubborn(stubborn.Config{
		URL:             wsUrl,
		IsReconnectable: true,
		MessageType:     stubborn.TextMessage,
		Print: func(args ...interface{}) {
			fmt.Println(args)
			return
		},
		Dialerf: func(ctx context.Context) (stubborn.DuplexConnector, error) {
			conn, _, err := websocket.DefaultDialer.DialContext(
				ctx,
				wsUrl,
				nil)

			return conn, err
		},
		AuthTimeOut:     5 * time.Second,
		UnimportantErrs: []error{io.EOF},
	})
	stub.SetAuthHandler(func() (req []byte, resp []byte, err error) {
		req, err = new(huobi.WebSocketV2RequestBuilder).Build(key, secret, "api-aws.huobi.pro", path)
		if err != nil {
			return nil, nil, err
		}

		resp = []byte(`{
			"action": "req",
				"code": 200,
				"ch": "auth",
				"data": {}
		}`)
		return
	})

	stub.SetErrorHandler(func(err error) {
		fmt.Println("huobi websocket error:", err)

		if errors.Is(err, stubborn.ErrCritical) {
			// resubscribe
			err = stub.Send(stubborn.TextMessage, []byte(sub))
			if err != nil {
				fmt.Println("huobi websocket error:", err)
			}
		}
	})

	stub.SetKeepAliveHandler(stubborn.KeepAlive{
		Tick: time.After(1 * time.Hour), 
		CustomPong: func(typ int, payload []byte) (int, []byte) {
			ping := struct {
				Action string `json:"action"`
				Data   *struct {
					Timestamp int64 `json:"ts"`
				} `json:"data"`
			}{}
			err := json.Unmarshal(payload, &ping)
			if err != nil {
				return 0, nil
			}
			
			pong := fmt.Sprintf("{\"action\": \"pong\", \"data\": { \"ts\": %d } }", ping.Data.Timestamp)
			return stubborn.TextMessage, []byte(pong)
		},
	})

	stub.SetMessageHandler(func(resp []byte) {
		fmt.Println("RESPONSE:", string(resp))
	})

	err = stub.Connect(hc.ctx)
	if err != nil {
		return err
	}

	// subscribe
	err = stub.Send(stubborn.TextMessage, []byte(sub))
	if err != nil {
		return err
	}
	<-interrupt
	defer func() {
		stub.Send(stubborn.TextMessage, []byte(unsub))
		stub.Close()
	}
	return nil
}
  • Okex
func OkexOrdersSubscribe(key, secret, pass string) (err error) {
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, syscall.SIGTERM, syscall.SIGINT)
	
	var url  = "wss://real.okex.com:8443/ws/v3?compress=true"
	
	subOp := BaseOp{
		Op:   "subscribe",
		Args: []string{"spot/order:ETH-USDT"},
	}

	subMsg, _ := json.Marshal(subOp)

	stub := stubborn.NewStubborn(stubborn.Config{
		URL:             url,
		IsReconnectable: true,
		MessageType:     stubborn.TextMessage,
		Print: func(args ...interface{}) {
			fmt.Println(args)
			return
		},
		Dialerf: func(ctx context.Context) (stubborn.DuplexConnector, error) {
			conn, _, err := websocket.DefaultDialer.DialContext(
				ctx,
				url,
				nil,
			)

			return conn, err
		},
		AuthTimeOut:     5 * time.Second,
		UnimportantErrs: []error{io.EOF},
	})

	stub.SetAuthHandler(func() (req []byte, resp []byte, err error) {
		resp = []byte(`{"event":"login","success":true}`)
        et := epochTime()
		signingString := et + http.MethodGet + "/users/self/verify"
		signed, err := hmacSha256Base64Sign(signingString, secret)
		if err != nil {
			return nil, nil, err
		}

		b := BaseOp{
			Op:   "login",
			Args: []string{key, pass, et, signed},
		}
		req, err = json.Marshal(b)
		return
	})

	stub.SetErrorHandler(func(err error) {
		fmt.Println("okex websocket error:", err)
		if errors.Is(err, stubborn.ErrCritical) {
			// resubscribe
			err = stub.Send(stubborn.TextMessage, subMsg)
			if err != nil {
				fmt.Println("okex websocket error:", err)
			}
		}
	})

	stub.SetKeepAliveHandler(stubborn.KeepAlive{
		Tick: time.Tick(25 * time.Second),
		CustomPing: func() (msgType int, payload []byte) {
			return stubborn.TextMessage, []byte("ping")
		},
	})

	stub.SetMessageHandler(func(resp []byte) {
		fmt.Println("RESPONSE:", string(resp))
	})

	err = stub.Connect(oc.ctx)
	if err != nil {
		return err
	}

	// subscribe
	err = stub.Send(stubborn.TextMessage, subMsg)
	if err != nil {
		return err
	}
	<-interrupt
	defer func() {
		unsubOp := BaseOp{
			Op:   "unsubcribe",
			Args: args,
		}
		unsubMsg, _ := json.Marshal(unsubOp)
		stub.Send(stubborn.TextMessage, unsubMsg)
		stub.Close()
	}
	return nil
}

Documentation

Overview

Author of this file is https://github.com/nsf https://github.com/nsf/jsondiff/blob/master/LICENSE

Index

Constants

View Source
// Message type identifiers used as the msgType argument of Client.Send, in
// Config.MessageType and in the KeepAlive callbacks. The numeric values
// coincide with the RFC 6455 frame opcodes for text, binary, ping and pong.
const (
	TextMessage   = 1
	BinaryMessage = 2
	PingMessage   = 9
	PongMessage   = 10
)

Variables

View Source
// Sentinel errors named after three severity levels. Match them with
// errors.Is, as the README examples do for ErrCritical inside an ErrorHandler.
// NOTE(review): the rules that assign a severity to a failure are not
// visible here — confirm against the implementation.
var (
	ErrCritical = errors.New("critical")
	ErrMajor    = errors.New("major")
	ErrMinor    = errors.New("minor")
)

Functions

func FlateCompress added in v1.1.1

func FlateCompress(input string) ([]byte, error)

func FlateDecompress added in v1.1.1

func FlateDecompress(input []byte) ([]byte, error)

func GZipCompress

func GZipCompress(input string) ([]byte, error)

func GZipDecompress

func GZipDecompress(input []byte) ([]byte, error)

Types

type AuthHandler

// AuthHandler is the callback type installed with Client.SetAuthHandler. It
// returns an authentication request (req), a response (resp) and an error.
// NOTE(review): presumably req is sent after connecting and the server reply
// is compared with resp — confirm against the implementation.
type AuthHandler func() (req []byte, resp []byte, err error)

type Client added in v1.1.1

// Client is the connection client returned by NewStubborn. All of its state
// is unexported; it is driven through its methods.
type Client struct {
	// contains filtered or unexported fields
}

func NewStubborn

func NewStubborn(
	conf Config,
) *Client

func (*Client) Close added in v1.1.1

func (s *Client) Close()

func (*Client) Connect added in v1.1.1

func (s *Client) Connect(ctx context.Context) (err error)

func (*Client) ManualReconnect added in v1.1.1

func (s *Client) ManualReconnect()

func (*Client) Send added in v1.1.1

func (s *Client) Send(msgType int, message []byte) (err error)

func (*Client) SetAuthHandler added in v1.1.1

func (s *Client) SetAuthHandler(authH AuthHandler)

Set auth callback handler

func (*Client) SetErrorHandler added in v1.1.1

func (s *Client) SetErrorHandler(errH ErrorHandler)

Set error handler excluding messages

func (*Client) SetKeepAliveHandler added in v1.1.1

func (s *Client) SetKeepAliveHandler(keep KeepAlive)

SetKeepAliveHandler keeps the connection intact and tracks its state

func (*Client) SetMessageHandler added in v1.1.1

func (s *Client) SetMessageHandler(msgH MessageHandler)

Set message handler excluding errors

type Config

// Config is the set of options passed to NewStubborn.
type Config struct {
	// IsReconnectable presumably enables automatic reconnection — TODO confirm.
	IsReconnectable bool
	// MessageType is the default message type (e.g. TextMessage).
	MessageType int
	// Logger is the logging backend; any CustomLogger implementation can be used.
	Logger CustomLogger
	// Dialerf creates the connection; any client that satisfies
	// DuplexConnector can be used.
	Dialerf     DialerFunc
	// AuthTimeOut is presumably the time limit for the authentication
	// exchange configured via SetAuthHandler — TODO confirm.
	AuthTimeOut time.Duration
	// UnimportantErrs lists the errors that are expected (the README examples
	// pass io.EOF); declaring the expected errors is easier than enumerating
	// the unexpected ones.
	// NOTE(review): how listed errors are treated is not visible here.
	UnimportantErrs []error
}

type CustomLogger added in v1.1.1

// CustomLogger is the logger interface accepted by Config.Logger. It consists
// of three variadic, line-oriented methods named after the info, debug and
// error levels.
type CustomLogger interface {
	Infoln(args ...interface{})
	Debugln(args ...interface{})
	Errorln(args ...interface{})
}

type DialerFunc

// DialerFunc is the type of Config.Dialerf: given a context it returns a
// DuplexConnector or an error.
type DialerFunc func(ctx context.Context) (DuplexConnector, error)

type Difference

// Difference is the kind of result reported by CompareJSON.
type Difference int
// Possible Difference values.
const (
	// FullMatch means the provided arguments are deeply equal.
	FullMatch Difference = iota
	// SupersetMatch means the first argument is a superset of the second.
	SupersetMatch
	// NoMatch means there is no match.
	NoMatch
	// FirstArgIsInvalidJson means the first document is invalid JSON.
	FirstArgIsInvalidJson
	// SecondArgIsInvalidJson means the second document is invalid JSON.
	SecondArgIsInvalidJson
	// BothArgsAreInvalidJson means both documents are invalid JSON.
	BothArgsAreInvalidJson
)

func CompareJSON

func CompareJSON(a, b []byte, opts *Options) (Difference, string)

Compares two JSON documents using given options. Returns difference type and a string describing differences.

FullMatch means provided arguments are deeply equal.

SupersetMatch means the first argument is a superset of the second argument. In this context, being a superset means that for each object or array in the hierarchy which doesn't match exactly, it must be a superset of the other one. For example:

{"a": 123, "b": 456, "c": [7, 8, 9]}

Is a superset of:

{"a": 123, "c": [7, 8]}

NoMatch means there is no match.

The rest of the difference types mean that one of or both JSON documents are invalid JSON.

Returned string uses a format similar to pretty printed JSON to show the human-readable difference between provided JSON documents. It is important to understand that returned format is not a valid JSON and is not meant to be machine readable.

func (Difference) String

func (d Difference) String() string

type DuplexConnector added in v0.3.0

// DuplexConnector is the connection abstraction returned by a DialerFunc. It
// consists of three methods: close the connection, read one message and
// write one message. The README examples satisfy it with the connection
// returned by websocket.DefaultDialer, and a mock implementing these three
// methods is enough for tests.
type DuplexConnector interface {
	// Close closes the connection.
	Close() error
	// ReadMessage returns the type and payload of one message, or an error.
	ReadMessage() (messageType int, p []byte, err error)
	// WriteMessage writes data as one message of the given type.
	WriteMessage(messageType int, data []byte) error
}

type ErrorHandler

// ErrorHandler is the callback type installed with Client.SetErrorHandler;
// it receives an error.
type ErrorHandler func(err error)

type KeepAlive

// KeepAlive is the configuration passed to Client.SetKeepAliveHandler.
type KeepAlive struct {
	// Tick is a duration; presumably the interval between keep-alive
	// pings — TODO confirm.
	Tick       time.Duration
	// CustomPing returns the message type and payload of a custom ping.
	CustomPing func() (msgType int, payload []byte)
	// CustomPong receives a message type and its data and returns the message
	// type and payload of the reply; presumably invoked for pings initiated
	// by the server — TODO confirm.
	CustomPong func(msgTp int, data []byte) (msgType int, payload []byte)
}

type MessageHandler

// MessageHandler is the callback type installed with
// Client.SetMessageHandler; it receives a message payload as raw bytes.
type MessageHandler func(resp []byte)

type Options

// Options is the configuration accepted by CompareJSON for the textual
// description of differences it returns. Ready-made values are provided by
// DefaultConsoleOptions, DefaultHTMLOptions and DefaultJSONOptions.
// NOTE(review): the per-field notes below are inferred from the field names
// and types only — confirm against the implementation.
type Options struct {
	// Normal, Added, Removed and Changed are Tag pairs, presumably wrapped
	// around unchanged, added, removed and changed parts of the output.
	Normal           Tag
	Added            Tag
	Removed          Tag
	Changed          Tag
	// Prefix and Indent are presumably the per-line prefix and the
	// indentation unit of the output.
	Prefix           string
	Indent           string
	// PrintTypes presumably toggles printing of value types.
	PrintTypes       bool
	// ChangedSeparator is presumably placed between the old and the new
	// value of a changed entry.
	ChangedSeparator string
}

func DefaultConsoleOptions

func DefaultConsoleOptions() Options

Provides a set of options that are well suited for console output. Options use ANSI foreground color escape sequences to highlight changes.

func DefaultHTMLOptions

func DefaultHTMLOptions() Options

Provides a set of options that are well suited for HTML output. Works best inside <pre> tag.

func DefaultJSONOptions

func DefaultJSONOptions() Options

Provides a set of options in JSON format that are fully parseable.

type Tag

// Tag is a pair of strings used by the Tag-typed fields of Options.
// NOTE(review): presumably Begin is emitted before and End after the tagged
// portion of the output (e.g. ANSI escape sequences or HTML markup) — confirm.
type Tag struct {
	Begin string
	End   string
}

Directories

Path Synopsis
example
http2 Module
wss Module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL