Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added spot ws service, moved client to common, refactoring #633

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 56 additions & 95 deletions v2/futures/client_ws.go → v2/common/websocket/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package futures
package websocket

import (
"context"
Expand All @@ -13,11 +13,9 @@ import (

"github.com/gorilla/websocket"
"github.com/jpillora/backoff"

"github.com/adshao/go-binance/v2/common"
)

//go:generate mockgen -source client_ws.go -destination mock/client_ws.go -package mock
//go:generate mockgen -source client.go -destination mock/client.go -package mock

const (
// reconnectMinInterval define reconnect min interval
Expand All @@ -33,22 +31,21 @@ var (

// ErrorWsIdAlreadySent defines that request with the same id was already sent
ErrorWsIdAlreadySent = errors.New("ws error: request with same id already sent")

// KeepAlivePingDeadline defines deadline to send ping frame
KeepAlivePingDeadline = 10 * time.Second
)

// messageId define id field of request/response
type messageId struct {
Id string `json:"id"`
}

// ClientWs define API websocket client
type ClientWs struct {
APIKey string
SecretKey string
// client define API websocket client
type client struct {
Debug bool
KeyType string
TimeOffset int64
logger *log.Logger
conn wsConnection
conn Connection
connMu sync.Mutex
reconnectSignal chan struct{}
connectionEstablishedSignal chan struct{}
Expand All @@ -58,18 +55,15 @@ type ClientWs struct {
reconnectCount int64
}

func (c *ClientWs) debug(format string, v ...interface{}) {
func (c *client) debug(format string, v ...interface{}) {
if c.Debug {
c.logger.Println(fmt.Sprintf(format, v...))
}
}

// NewClientWs init ClientWs
func NewClientWs(conn wsConnection, apiKey, secretKey string) (*ClientWs, error) {
client := &ClientWs{
APIKey: apiKey,
SecretKey: secretKey,
KeyType: common.KeyTypeHmac,
// NewClient init client
func NewClient(conn Connection) (Client, error) {
client := &client{
logger: log.New(os.Stderr, "Binance-golang ", log.LstdFlags),
conn: conn,
connMu: sync.Mutex{},
Expand All @@ -86,21 +80,17 @@ func NewClientWs(conn wsConnection, apiKey, secretKey string) (*ClientWs, error)
return client, nil
}

type wsClient interface {
type Client interface {
Write(id string, data []byte) error
WriteSync(id string, data []byte, timeout time.Duration) ([]byte, error)
GetReadChannel() <-chan []byte
GetReadErrorChannel() <-chan error
GetApiKey() string
GetSecretKey() string
GetTimeOffset() int64
GetKeyType() string
GetReconnectCount() int64
Wait(timeout time.Duration)
}

// Write sends data into websocket connection
func (c *ClientWs) Write(id string, data []byte) error {
func (c *client) Write(id string, data []byte) error {
c.connMu.Lock()
defer c.connMu.Unlock()

Expand All @@ -120,7 +110,7 @@ func (c *ClientWs) Write(id string, data []byte) error {

// WriteSync sends data to the websocket connection and waits for a response synchronously
// Should be used separately from the asynchronous Write method (do not send anything in parallel)
func (c *ClientWs) WriteSync(id string, data []byte, timeout time.Duration) ([]byte, error) {
func (c *client) WriteSync(id string, data []byte, timeout time.Duration) ([]byte, error) {
c.connMu.Lock()
defer c.connMu.Unlock()

Expand Down Expand Up @@ -157,36 +147,20 @@ func (c *ClientWs) WriteSync(id string, data []byte, timeout time.Duration) ([]b
}
}

func (c *ClientWs) GetReadChannel() <-chan []byte {
func (c *client) GetReadChannel() <-chan []byte {
return c.readC
}

func (c *ClientWs) GetReadErrorChannel() <-chan error {
func (c *client) GetReadErrorChannel() <-chan error {
return c.readErrChan
}

func (c *ClientWs) GetApiKey() string {
return c.APIKey
}

func (c *ClientWs) GetSecretKey() string {
return c.SecretKey
}

func (c *ClientWs) GetTimeOffset() int64 {
return c.TimeOffset
}

func (c *ClientWs) GetKeyType() string {
return c.KeyType
}

func (c *ClientWs) Wait(timeout time.Duration) {
func (c *client) Wait(timeout time.Duration) {
c.wait(timeout)
}

// read data from connection
func (c *ClientWs) read() {
func (c *client) read() {
defer func() {
// reading from closed connection 1000 times caused panic
// prevent panic for any case
Expand Down Expand Up @@ -231,7 +205,7 @@ func (c *ClientWs) read() {

// wait until all responses received
// make sure that you are not sending requests
func (c *ClientWs) wait(timeout time.Duration) {
func (c *client) wait(timeout time.Duration) {
doneC := make(chan struct{})

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -260,7 +234,7 @@ func (c *ClientWs) wait(timeout time.Duration) {
}

// handleReconnect waits for reconnect signal and starts reconnect
func (c *ClientWs) handleReconnect() {
func (c *client) handleReconnect() {
for _ = range c.reconnectSignal {
c.debug("reconnect: received signal")

Expand All @@ -285,10 +259,10 @@ func (c *ClientWs) handleReconnect() {
}

// startReconnect starts reconnect loop with increasing delay
func (c *ClientWs) startReconnect(b *backoff.Backoff) *connection {
func (c *client) startReconnect(b *backoff.Backoff) Connection {
for {
atomic.AddInt64(&c.reconnectCount, 1)
conn, err := newConnection()
conn, err := c.conn.RestoreConnection()
if err != nil {
delay := b.Duration()
c.debug("reconnect: error while reconnecting. try in %s", delay.Round(time.Millisecond))
Expand All @@ -301,7 +275,9 @@ func (c *ClientWs) startReconnect(b *backoff.Backoff) *connection {
}

// GetReconnectCount returns reconnect counter value
func (c *ClientWs) GetReconnectCount() int64 { return atomic.LoadInt64(&c.reconnectCount) }
func (c *client) GetReconnectCount() int64 {
return atomic.LoadInt64(&c.reconnectCount)
}

// NewRequestList creates request list
func NewRequestList() RequestList {
Expand Down Expand Up @@ -356,37 +332,47 @@ func (l *RequestList) IsAlreadyInList(id string) bool {
return false
}

// constructor for connection
func newConnection() (*connection, error) {
conn, err := WsApiInitReadWriteConn()
// NewConnection constructor for connection
func NewConnection(
initUnderlyingWsConnFn func() (*websocket.Conn, error),
isKeepAliveNeeded bool,
keepaliveTimeout time.Duration,
) (Connection, error) {
underlyingWsConn, err := initUnderlyingWsConnFn()
if err != nil {
return nil, err
}

wsConn := &connection{
conn: conn,
connectionMu: sync.Mutex{},
lastResponseMu: sync.Mutex{},
conn: underlyingWsConn,
connectionMu: sync.Mutex{},
lastResponseMu: sync.Mutex{},
initUnderlyingWsConnFn: initUnderlyingWsConnFn,
keepaliveTimeout: keepaliveTimeout,
}

if WebsocketKeepalive {
go wsConn.keepAlive(WebsocketTimeoutReadWriteConnection)
if isKeepAliveNeeded {
go wsConn.keepAlive(keepaliveTimeout)
}

return wsConn, nil
}

// instance of single connection with keepalive handler
// connection is an instance of single ws connection with keepalive handler
type connection struct {
conn *websocket.Conn
connectionMu sync.Mutex
lastResponse time.Time
lastResponseMu sync.Mutex
conn *websocket.Conn
connectionMu sync.Mutex
lastResponse time.Time
lastResponseMu sync.Mutex
initUnderlyingWsConnFn func() (*websocket.Conn, error)
keepaliveTimeout time.Duration
isKeepAliveNeeded bool
}

type wsConnection interface {
type Connection interface {
WriteMessage(messageType int, data []byte) error
ReadMessage() (messageType int, p []byte, err error)
RestoreConnection() (Connection, error)
}

// WriteMessage is a thread-safe method for conn.WriteMessage
Expand All @@ -401,6 +387,11 @@ func (c *connection) ReadMessage() (int, []byte, error) {
return c.conn.ReadMessage()
}

// RestoreConnection recreates ws connection with the same underlying connection callback and keepalive timeout
func (c *connection) RestoreConnection() (Connection, error) {
return NewConnection(c.initUnderlyingWsConnFn, c.isKeepAliveNeeded, c.keepaliveTimeout)
}

// keepAlive handles ping-pong for connection
func (c *connection) keepAlive(timeout time.Duration) {
ticker := time.NewTicker(timeout)
Expand Down Expand Up @@ -455,41 +446,11 @@ func (c *connection) ping() error {
c.connectionMu.Lock()
defer c.connectionMu.Unlock()

deadline := time.Now().Add(10 * time.Second)
deadline := time.Now().Add(KeepAlivePingDeadline)
err := c.conn.WriteControl(websocket.PingMessage, []byte{}, deadline)
if err != nil {
return err
}

return nil
}

// NewOrderPlaceWsService init OrderPlaceWsService
func NewOrderPlaceWsService(apiKey, secretKey string) (*OrderPlaceWsService, error) {
conn, err := newConnection()
if err != nil {
return nil, err
}

client, err := NewClientWs(conn, apiKey, secretKey)
if err != nil {
return nil, err
}

return &OrderPlaceWsService{c: client}, nil
}

// NewOrderCancelWsService init OrderCancelWsService
func NewOrderCancelWsService(apiKey, secretKey string) (*OrderCancelWsService, error) {
conn, err := newConnection()
if err != nil {
return nil, err
}

client, err := NewClientWs(conn, apiKey, secretKey)
if err != nil {
return nil, err
}

return &OrderCancelWsService{c: client}, nil
}
Loading
Loading