Skip to content
Open
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f9e15a2
Deribit: Switch to string IDs
gbjk Aug 4, 2025
c02a864
Deribit: Dedup wsResponse / wsResponse
gbjk Aug 5, 2025
38966fa
Deribit: Use uuid v7 for IDs
gbjk Aug 5, 2025
838ea89
Deribit: Handle errors from StartHeartbeat
gbjk Aug 9, 2025
4e67ca9
Deribit: Simplify WS ID matching
gbjk Aug 5, 2025
8459589
Exchanges: Add MessageID function to base
gbjk Aug 13, 2025
fb16417
fixup! Exchanges: Add MessageID function to base
gbjk Aug 13, 2025
0651bc1
fixup! Exchanges: Add MessageID function to base
gbjk Aug 14, 2025
9a5db82
fixup! Exchanges: Add MessageID function to base
gbjk Aug 14, 2025
b6e1abc
fixup! fixup! Exchanges: Add MessageID function to base
gbjk Aug 14, 2025
47a3f99
Exchanges: Remove example BespokeGenerateMessageID
gbjk Aug 14, 2025
352cdb0
Okx: Replace conn.RequestIDGenerator with MesssageID
gbjk Aug 14, 2025
596b230
Exchanges: Add MessageSequence
gbjk Aug 15, 2025
edf410c
GateIO: Split usage of MessageID and MessageSequence
gbjk Aug 15, 2025
e9323c2
Binance: Switch to UUID message IDs
gbjk Aug 16, 2025
26fa4ea
Kraken: Switch to e.MessageSequence
gbjk Aug 17, 2025
ed96b77
Kucoin: Switch to MessageID
gbjk Aug 17, 2025
33947e9
HitBTC: Switch to UUIDv7 for ws message ID
gbjk Aug 18, 2025
9be550c
Bybit: Switch to UUIDv7 for ws message ID
gbjk Aug 18, 2025
936e1e6
Bitfinex: Switch to UUIDv7 and MessageSequence
gbjk Aug 18, 2025
d93f7cd
Websocket: Remove now unused MessageID function
gbjk Aug 18, 2025
b285785
fixup! Exchanges: Add MessageSequence
gbjk Aug 18, 2025
ba10289
fixup! Bybit: Switch to UUIDv7 for ws message ID
gbjk Aug 18, 2025
e7d4455
fixup! GateIO: Split usage of MessageID and MessageSequence
gbjk Aug 18, 2025
8ad1c20
fixup! GateIO: Split usage of MessageID and MessageSequence
gbjk Aug 19, 2025
d58e5fe
fixup! Bybit: Switch to UUIDv7 for ws message ID
gbjk Aug 19, 2025
d76ec99
fixup! HitBTC: Switch to UUIDv7 for ws message ID
gbjk Aug 19, 2025
4590c69
fixup! Okx: Replace conn.RequestIDGenerator with MesssageID
gbjk Aug 22, 2025
529f8ae
Docs: Update guidance for message signatures
gbjk Aug 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error {
Unsubscriber: e.SpotUnsubscribe,
GenerateSubscriptions: e.GenerateDefaultSubscriptionsSpot,
Connector: e.WsConnectSpot,
BespokeGenerateMessageID: e.GenerateWebsocketMessageID,
}); err != nil {
return err
}
Expand All @@ -125,7 +124,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error {
Unsubscriber: e.FuturesUnsubscribe,
GenerateSubscriptions: func() (subscription.List, error) { return e.GenerateFuturesDefaultSubscriptions(currency.USDT) },
Connector: e.WsFuturesConnect,
BespokeGenerateMessageID: e.GenerateWebsocketMessageID,
}); err != nil {
return err
}
Expand Down
6 changes: 6 additions & 0 deletions docs/ADD_NEW_EXCHANGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,12 @@ Alternatively you can use `request.WithVerbose(context.Background())` as the `co
Ensure each endpoint is implemented and has an associated test to improve test coverage and increase confidence
#### Message IDs
If the exchange uses unique string message IDs, then your API functions can call e.MessageID() to get a UUID V7 message ID.
However if the exchange does not support that format then you should override MessageID with an appropriate implementation.
For Example: Consider `common.Counter` if universal uniqueness is not a requirement and you need an integer id.
#### Authenticated functions
Authenticated request function is created based on the way the exchange documentation specifies. For example, see the [Binance Spot API - Endpoint Security Types](https://developers.binance.com/docs/binance-spot-api-docs/rest-api/endpoint-security-type).
Expand Down
2 changes: 0 additions & 2 deletions exchange/websocket/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error {
Unsubscriber: e.SpotUnsubscribe,
GenerateSubscriptions: e.GenerateDefaultSubscriptionsSpot,
Connector: e.WsConnectSpot,
BespokeGenerateMessageID: e.GenerateWebsocketMessageID,
}); err != nil {
return err
}
Expand All @@ -143,7 +142,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error {
Unsubscriber: e.FuturesUnsubscribe,
GenerateSubscriptions: func() (subscription.List, error) { return e.GenerateFuturesDefaultSubscriptions(currency.USDT) },
Connector: e.WsFuturesConnect,
BespokeGenerateMessageID: e.GenerateWebsocketMessageID,
}); err != nil {
return err
}
Expand Down
42 changes: 2 additions & 40 deletions exchange/websocket/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import (
"compress/flate"
"compress/gzip"
"context"
"crypto/rand"
"errors"
"fmt"
"io"
"math/big"
"net"
"net/http"
"net/url"
Expand Down Expand Up @@ -37,10 +35,6 @@ type Connection interface {
Dial(context.Context, *gws.Dialer, http.Header) error
ReadMessage() Response
SetupPingHandler(request.EndpointLimit, PingHandler)
// GenerateMessageID generates a message ID for the individual connection. If a bespoke function is set
// (by using SetupNewConnection) it will use that, otherwise it will use the defaultGenerateMessageID function
// defined in websocket_connection.go.
GenerateMessageID(highPrecision bool) int64
// SendMessageReturnResponse will send a WS message to the connection and wait for response
SendMessageReturnResponse(ctx context.Context, epl request.EndpointLimit, signature, request any) ([]byte, error)
// SendMessageReturnResponses will send a WS message to the connection and wait for N responses
Expand Down Expand Up @@ -87,12 +81,8 @@ type ConnectionSetup struct {
// Handler defines the function that will be called when a message is
// received from the exchange's websocket server. This function should
// handle the incoming message and pass it to the appropriate data handler.
Handler func(ctx context.Context, conn Connection, incoming []byte) error
// RequestIDGenerator is a function that returns a unique message ID.
// This is useful for when an exchange connection requires a unique or
// structured message ID for each message sent.
RequestIDGenerator func() int64
Authenticate func(ctx context.Context, conn Connection) error
Handler func(ctx context.Context, conn Connection, incoming []byte) error
Authenticate func(ctx context.Context, conn Connection) error
// MessageFilter defines the criteria used to match messages to a specific connection.
// The filter enables precise routing and handling of messages for distinct connection contexts.
MessageFilter any
Expand Down Expand Up @@ -128,7 +118,6 @@ type connection struct {
ResponseMaxLimit time.Duration
Traffic chan struct{}
readMessageErrors chan error
requestIDGenerator func() int64
}

// Dial sets proxy urls and then connects to the websocket
Expand Down Expand Up @@ -335,33 +324,6 @@ func (c *connection) parseBinaryResponse(resp []byte) ([]byte, error) {
return standardMessage, reader.Close()
}

// GenerateMessageID generates a message ID for the individual connection.
// If a bespoke function is set (by using SetupNewConnection) it will use that,
// otherwise it will use the defaultGenerateMessageID function.
func (c *connection) GenerateMessageID(highPrec bool) int64 {
if c.requestIDGenerator != nil {
return c.requestIDGenerator()
}
return c.defaultGenerateMessageID(highPrec)
}

// defaultGenerateMessageID generates the default message ID
func (c *connection) defaultGenerateMessageID(highPrec bool) int64 {
var minValue int64 = 1e8
var maxValue int64 = 2e8
if highPrec {
maxValue = 2e12
minValue = 1e12
}
// utilization of hard coded positive numbers and default crypto/rand
// io.reader will panic on error instead of returning
randomNumber, err := rand.Int(rand.Reader, big.NewInt(maxValue-minValue+1))
if err != nil {
panic(err)
}
return randomNumber.Int64() + minValue
}

// Shutdown shuts down and closes specific connection
func (c *connection) Shutdown() error {
if c == nil || c.Connection == nil {
Expand Down
3 changes: 1 addition & 2 deletions exchange/websocket/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (m *Manager) SetupNewConnection(c *ConnectionSetup) error {
return err
}

if c.ResponseCheckTimeout == 0 && c.ResponseMaxLimit == 0 && c.RateLimit == nil && c.URL == "" && c.ConnectionLevelReporter == nil && c.RequestIDGenerator == nil {
if c.ResponseCheckTimeout == 0 && c.ResponseMaxLimit == 0 && c.RateLimit == nil && c.URL == "" && c.ConnectionLevelReporter == nil {
return fmt.Errorf("%w: %w", errConnSetup, errExchangeConfigEmpty)
}

Expand Down Expand Up @@ -401,7 +401,6 @@ func (m *Manager) getConnectionFromSetup(c *ConnectionSetup) *connection {
Match: match,
RateLimit: c.RateLimit,
Reporter: c.ConnectionLevelReporter,
requestIDGenerator: c.RequestIDGenerator,
RateLimitDefinitions: m.rateLimitDefinitions,
}
}
Expand Down
35 changes: 2 additions & 33 deletions exchange/websocket/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ func TestSendMessageReturnResponse(t *testing.T) {
Subscription: testRequestData{
Name: "ticker",
},
RequestID: wc.GenerateMessageID(false),
RequestID: 12345,
}

_, err = wc.SendMessageReturnResponse(t.Context(), request.Unset, req.RequestID, req)
Expand Down Expand Up @@ -758,37 +758,6 @@ func TestCanUseAuthenticatedWebsocketForWrapper(t *testing.T) {
assert.True(t, ws.CanUseAuthenticatedWebsocketForWrapper(), "CanUseAuthenticatedWebsocketForWrapper should return true")
}

func TestGenerateMessageID(t *testing.T) {
t.Parallel()
wc := connection{}
const spins = 1000
ids := make([]int64, spins)
for i := range spins {
id := wc.GenerateMessageID(true)
assert.NotContains(t, ids, id, "GenerateMessageID should not generate the same ID twice")
ids[i] = id
}

wc.requestIDGenerator = func() int64 { return 42 }
assert.EqualValues(t, 42, wc.GenerateMessageID(true), "GenerateMessageID should use bespokeGenerateMessageID")
}

// 7002502 166.7 ns/op 48 B/op 3 allocs/op
func BenchmarkGenerateMessageID_High(b *testing.B) {
wc := connection{}
for b.Loop() {
_ = wc.GenerateMessageID(true)
}
}

// 6536250 186.1 ns/op 48 B/op 3 allocs/op
func BenchmarkGenerateMessageID_Low(b *testing.B) {
wc := connection{}
for b.Loop() {
_ = wc.GenerateMessageID(false)
}
}

func TestCheckWebsocketURL(t *testing.T) {
err := checkWebsocketURL("")
assert.ErrorIs(t, err, errInvalidWebsocketURL, "checkWebsocketURL should error correctly on empty string")
Expand Down Expand Up @@ -1135,7 +1104,7 @@ func TestLatency(t *testing.T) {
Event: "subscribe",
Pairs: []string{currency.NewPairWithDelimiter("XBT", "USD", "/").String()},
Subscription: testRequestData{Name: "ticker"},
RequestID: wc.GenerateMessageID(false),
RequestID: 12346,
}

_, err = wc.SendMessageReturnResponse(t.Context(), request.Unset, req.RequestID, req)
Expand Down
4 changes: 2 additions & 2 deletions exchanges/binance/binance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2003,7 +2003,7 @@ func TestSubscribe(t *testing.T) {
var req WsPayload
require.NoError(tb, json.Unmarshal(msg, &req), "Unmarshal must not error")
require.ElementsMatch(tb, req.Params, exp, "Params must have correct channels")
return w.WriteMessage(gws.TextMessage, fmt.Appendf(nil, `{"result":null,"id":%d}`, req.ID))
return w.WriteMessage(gws.TextMessage, fmt.Appendf(nil, `{"result":null,"id":"%s"}`, req.ID))
}
e = testexch.MockWsInstance[Exchange](t, mockws.CurryWsMockUpgrader(t, mock))
} else {
Expand All @@ -2025,7 +2025,7 @@ func TestSubscribeBadResp(t *testing.T) {
var req WsPayload
err := json.Unmarshal(msg, &req)
require.NoError(tb, err, "Unmarshal must not error")
return w.WriteMessage(gws.TextMessage, fmt.Appendf(nil, `{"result":{"error":"carrots"},"id":%d}`, req.ID))
return w.WriteMessage(gws.TextMessage, fmt.Appendf(nil, `{"result":{"error":"carrots"},"id":"%s"}`, req.ID))
}
b := testexch.MockWsInstance[Exchange](t, mockws.CurryWsMockUpgrader(t, mock))
err := b.Subscribe(channels)
Expand Down
2 changes: 1 addition & 1 deletion exchanges/binance/binance_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ type WsListStatusData struct {
type WsPayload struct {
Method string `json:"method"`
Params []string `json:"params"`
ID int64 `json:"id"`
ID string `json:"id"`
}

// CrossMarginInterestData stores cross margin data for borrowing
Expand Down
4 changes: 2 additions & 2 deletions exchanges/binance/binance_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (e *Exchange) wsReadData() {
}

func (e *Exchange) wsHandleData(respRaw []byte) error {
if id, err := jsonparser.GetInt(respRaw, "id"); err == nil {
if id, err := jsonparser.GetString(respRaw, "id"); err == nil {
if e.Websocket.Match.IncomingWithData(id, respRaw) {
return nil
}
Expand Down Expand Up @@ -582,7 +582,7 @@ func (e *Exchange) manageSubs(ctx context.Context, op string, subs subscription.
}

req := WsPayload{
ID: e.Websocket.Conn.GenerateMessageID(false),
ID: e.MessageID(),
Method: op,
Params: subs.QualifiedChannels(),
}
Expand Down
4 changes: 2 additions & 2 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1716,7 +1716,7 @@ func (e *Exchange) subscribeToChan(ctx context.Context, subs subscription.List)

// subId is a single round-trip identifier that provides linking sub requests to chanIDs
// Although docs only mention subId for wsBookChannel, it works for all chans
subID := strconv.FormatInt(e.Websocket.Conn.GenerateMessageID(false), 10)
subID := e.MessageID()
req["subId"] = subID

// Add a temporary Key so we can find this Sub when we get the resp without delay or context switch
Expand Down Expand Up @@ -1829,7 +1829,7 @@ func (e *Exchange) WsSendAuth(ctx context.Context) error {

// WsNewOrder authenticated new order request
func (e *Exchange) WsNewOrder(ctx context.Context, data *WsNewOrderRequest) (string, error) {
data.CustomID = e.Websocket.AuthConn.GenerateMessageID(false)
data.CustomID = e.MessageSequence()
req := makeRequestInterface(wsOrderNew, data)
resp, err := e.Websocket.AuthConn.SendMessageReturnResponse(ctx, request.Unset, data.CustomID, req)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions exchanges/bybit/bybit.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ import (
type Exchange struct {
exchange.Base

messageIDSeq common.Counter
account accountTypeHolder
account accountTypeHolder
}

const (
Expand Down
1 change: 0 additions & 1 deletion exchanges/bybit/bybit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3026,7 +3026,6 @@ type FixtureConnection struct {
websocket.Connection
}

func (d *FixtureConnection) GenerateMessageID(bool) int64 { return 1337 }
func (d *FixtureConnection) SetupPingHandler(request.EndpointLimit, websocket.PingHandler) {}
func (d *FixtureConnection) Dial(context.Context, *gws.Dialer, http.Header) error { return d.dialError }

Expand Down
16 changes: 8 additions & 8 deletions exchanges/bybit/bybit_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (e *Exchange) WebsocketAuthenticateConnection(ctx context.Context, conn web
return err
}
req := Authenticate{
RequestID: strconv.FormatInt(conn.GenerateMessageID(false), 10),
RequestID: e.MessageID(),
Operation: "auth",
Args: []any{creds.Key, intNonce, hex.EncodeToString(hmac)},
}
Expand All @@ -128,7 +128,7 @@ func (e *Exchange) WebsocketAuthenticateConnection(ctx context.Context, conn web
return nil
}

func (e *Exchange) handleSubscriptions(conn websocket.Connection, operation string, subs subscription.List) (args []SubscriptionArgument, err error) {
func (e *Exchange) handleSubscriptions(_ websocket.Connection, operation string, subs subscription.List) (args []SubscriptionArgument, err error) {
subs, err = subs.ExpandTemplates(e)
if err != nil {
return
Expand All @@ -139,7 +139,7 @@ func (e *Exchange) handleSubscriptions(conn websocket.Connection, operation stri
args = append(args, SubscriptionArgument{
auth: b[0].Authenticated,
Operation: operation,
RequestID: strconv.FormatInt(conn.GenerateMessageID(false), 10),
RequestID: e.MessageID(),
Arguments: b.QualifiedChannels(),
associatedSubs: b,
})
Expand Down Expand Up @@ -664,7 +664,7 @@ func hasPotentialDelimiter(a asset.Item) bool {

// TODO: Remove this function when template expansion is across all assets
func (e *Exchange) submitDirectSubscription(ctx context.Context, conn websocket.Connection, a asset.Item, operation string, channelsToSubscribe subscription.List) error {
payloads, err := e.directSubscriptionPayload(conn, a, operation, channelsToSubscribe)
payloads, err := e.directSubscriptionPayload(a, operation, channelsToSubscribe)
if err != nil {
return err
}
Expand Down Expand Up @@ -702,17 +702,17 @@ func (e *Exchange) submitDirectSubscription(ctx context.Context, conn websocket.
}

// TODO: Remove this function when template expansion is across all assets
func (e *Exchange) directSubscriptionPayload(conn websocket.Connection, assetType asset.Item, operation string, channelsToSubscribe subscription.List) ([]SubscriptionArgument, error) {
func (e *Exchange) directSubscriptionPayload(assetType asset.Item, operation string, channelsToSubscribe subscription.List) ([]SubscriptionArgument, error) {
var args []SubscriptionArgument
arg := SubscriptionArgument{
Operation: operation,
RequestID: strconv.FormatInt(conn.GenerateMessageID(false), 10),
RequestID: e.MessageID(),
Arguments: []string{},
}
authArg := SubscriptionArgument{
auth: true,
Operation: operation,
RequestID: strconv.FormatInt(conn.GenerateMessageID(false), 10),
RequestID: e.MessageID(),
Arguments: []string{},
}

Expand Down Expand Up @@ -757,7 +757,7 @@ func (e *Exchange) directSubscriptionPayload(conn websocket.Connection, assetTyp
args = append(args, arg)
arg = SubscriptionArgument{
Operation: operation,
RequestID: strconv.FormatInt(conn.GenerateMessageID(false), 10),
RequestID: e.MessageID(),
Arguments: []string{},
}
}
Expand Down
10 changes: 2 additions & 8 deletions exchanges/bybit/bybit_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error {
Handler: func(_ context.Context, conn websocket.Connection, resp []byte) error {
return e.wsHandleData(conn, asset.Spot, resp)
},
RequestIDGenerator: e.messageIDSeq.IncrementAndGet,
}); err != nil {
return err
}
Expand All @@ -276,7 +275,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error {
Handler: func(_ context.Context, conn websocket.Connection, resp []byte) error {
return e.wsHandleData(conn, asset.Options, resp)
},
RequestIDGenerator: e.messageIDSeq.IncrementAndGet,
}); err != nil {
return err
}
Expand Down Expand Up @@ -305,8 +303,7 @@ func (e *Exchange) Setup(exch *config.Exchange) error {
Handler: func(_ context.Context, conn websocket.Connection, resp []byte) error {
return e.wsHandleData(conn, asset.USDTMarginedFutures, resp)
},
RequestIDGenerator: e.messageIDSeq.IncrementAndGet,
MessageFilter: asset.USDTMarginedFutures, // Unused but it allows us to differentiate between the two linear futures types.
MessageFilter: asset.USDTMarginedFutures, // Unused but it allows us to differentiate between the two linear futures types.
}); err != nil {
return err
}
Expand Down Expand Up @@ -335,8 +332,7 @@ func (e *Exchange) Setup(exch *config.Exchange) error {
Handler: func(_ context.Context, conn websocket.Connection, resp []byte) error {
return e.wsHandleData(conn, asset.USDCMarginedFutures, resp)
},
RequestIDGenerator: e.messageIDSeq.IncrementAndGet,
MessageFilter: asset.USDCMarginedFutures, // Unused but it allows us to differentiate between the two linear futures types.
MessageFilter: asset.USDCMarginedFutures, // Unused but it allows us to differentiate between the two linear futures types.
}); err != nil {
return err
}
Expand All @@ -359,7 +355,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error {
Handler: func(_ context.Context, conn websocket.Connection, resp []byte) error {
return e.wsHandleData(conn, asset.CoinMarginedFutures, resp)
},
RequestIDGenerator: e.messageIDSeq.IncrementAndGet,
}); err != nil {
return err
}
Expand All @@ -381,7 +376,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error {
Subscriber: e.authSubscribe,
Unsubscriber: e.authUnsubscribe,
Handler: e.wsHandleAuthenticatedData,
RequestIDGenerator: e.messageIDSeq.IncrementAndGet,
Authenticate: e.WebsocketAuthenticateConnection,
})
}
Expand Down
Loading
Loading