Skip to content

Commit 8ec14d0

Browse files
authored
Merge pull request #344 from xssnick/rldp-transfer-fix
Rldp transfer fix
2 parents 3d1c845 + c988065 commit 8ec14d0

File tree

4 files changed

+57
-45
lines changed

4 files changed

+57
-45
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
[![Based on TON][ton-svg]][ton]
66
[![Telegram Channel][tgc-svg]][tg-channel]
7-
![Coverage](https://img.shields.io/badge/Coverage-69.5%25-yellow)
7+
![Coverage](https://img.shields.io/badge/Coverage-69.6%25-yellow)
88

99
Golang library for interacting with TON blockchain.
1010

adnl/rldp/client.go

Lines changed: 45 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,14 @@ import (
77
"encoding/hex"
88
"errors"
99
"fmt"
10+
"github.com/xssnick/raptorq"
11+
"github.com/xssnick/tonutils-go/adnl"
12+
"github.com/xssnick/tonutils-go/tl"
1013
"math/bits"
1114
"reflect"
1215
"sync"
1316
"sync/atomic"
1417
"time"
15-
"unsafe"
16-
17-
"github.com/xssnick/raptorq"
18-
"github.com/xssnick/tonutils-go/adnl"
19-
"github.com/xssnick/tonutils-go/tl"
2018
)
2119

2220
type ADNL interface {
@@ -56,7 +54,7 @@ type activeTransfer struct {
5654
data []byte
5755
timeoutAt int64
5856

59-
currentPart unsafe.Pointer //*activeTransferPart
57+
currentPart atomic.Pointer[activeTransferPart]
6058
rldp *RLDP
6159

6260
mx sync.Mutex
@@ -97,6 +95,8 @@ type RLDP struct {
9795
lastNetworkProcessAt int64
9896
lastNetworkPacketsRecv int64
9997
lastNetworkBatchesNum int64
98+
99+
lastReport time.Time
100100
}
101101

102102
type decoderStreamPart struct {
@@ -249,7 +249,6 @@ func (r *RLDP) handleMessage(msg *adnl.MessageCustom) error {
249249
} else {
250250
r.recvStreams[id] = stream
251251
}
252-
delete(r.expectedTransfers, id)
253252
r.mx.Unlock()
254253
}
255254

@@ -302,6 +301,11 @@ func (r *RLDP) handleMessage(msg *adnl.MessageCustom) error {
302301
}
303302

304303
if stream.currentPart.decoder == nil {
304+
fec, ok := part.FecType.(FECRaptorQ)
305+
if !ok {
306+
return fmt.Errorf("not supported fec type in part: %d", part.Part)
307+
}
308+
305309
if uint64(fec.DataSize) > stream.totalSize || fec.DataSize > uint32(MaxFECDataSize) ||
306310
fec.SymbolSize == 0 || fec.SymbolsCount == 0 {
307311
return fmt.Errorf("invalid fec")
@@ -312,6 +316,7 @@ func (r *RLDP) handleMessage(msg *adnl.MessageCustom) error {
312316
return fmt.Errorf("failed to init raptorq decoder: %w", err)
313317
}
314318
stream.currentPart.decoder = dec
319+
Logger("[ID]", hex.EncodeToString(part.TransferID), "[RLDP] created decoder for part:", part.Part, "data size:", fec.DataSize, "symbol size:", fec.SymbolSize, "symbols:", fec.SymbolsCount)
315320
}
316321

317322
canTryDecode, err := stream.currentPart.decoder.AddSymbol(part.Seqno, part.Data)
@@ -346,11 +351,7 @@ func (r *RLDP) handleMessage(msg *adnl.MessageCustom) error {
346351
if isV2 {
347352
complete = CompleteV2(complete.(Complete))
348353
}
349-
350-
err = r.adnl.SendCustomMessage(context.Background(), complete)
351-
if err != nil {
352-
return fmt.Errorf("failed to send rldp complete message: %w", err)
353-
}
354+
_ = r.adnl.SendCustomMessage(context.Background(), complete)
354355

355356
if uint64(len(stream.buf)) >= stream.totalSize {
356357
stream.finishedAt = &tmd
@@ -396,6 +397,7 @@ func (r *RLDP) handleMessage(msg *adnl.MessageCustom) error {
396397
req := r.activeRequests[qid]
397398
if req != nil {
398399
delete(r.activeRequests, qid)
400+
delete(r.expectedTransfers, id)
399401
}
400402
r.mx.Unlock()
401403

@@ -461,10 +463,8 @@ func (r *RLDP) handleMessage(msg *adnl.MessageCustom) error {
461463
}
462464
}
463465
case Complete: // receiver has fully received transfer part, send new part or close our stream if done
464-
id := string(m.TransferID)
465-
466466
r.mx.RLock()
467-
t := r.activeTransfers[id]
467+
t := r.activeTransfers[string(m.TransferID)]
468468
r.mx.RUnlock()
469469

470470
if t != nil {
@@ -486,7 +486,7 @@ func (r *RLDP) handleMessage(msg *adnl.MessageCustom) error {
486486

487487
if !more {
488488
r.mx.Lock()
489-
delete(r.activeTransfers, id)
489+
delete(r.activeTransfers, string(t.id))
490490
r.mx.Unlock()
491491

492492
break
@@ -574,7 +574,6 @@ func (r *RLDP) handleMessage(msg *adnl.MessageCustom) error {
574574
}
575575

576576
func (r *RLDP) recoverySender() {
577-
packets := make([]tl.Serializable, 0, 1024)
578577
transfersToProcess := make([]*activeTransferPart, 0, 128)
579578
timedOut := make([]*activeTransfer, 0, 32)
580579
timedOutReq := make([]string, 0, 32)
@@ -588,11 +587,19 @@ func (r *RLDP) recoverySender() {
588587
case <-closerCtx.Done():
589588
return
590589
case <-ticker.C:
591-
packets = packets[:0]
592-
transfersToProcess = transfersToProcess[:0]
593-
timedOut = timedOut[:0]
594-
timedOutReq = timedOutReq[:0]
595-
timedOutExp = timedOutExp[:0]
590+
if r.lastReport.Before(time.Now().Add(-10 * time.Second)) {
591+
r.lastReport = time.Now()
592+
r.mx.RLock()
593+
Logger("[RLDP] recovery sender stats",
594+
"peer", hex.EncodeToString(r.adnl.GetID()),
595+
"active transfers", len(r.activeTransfers),
596+
"active requests", len(r.activeRequests),
597+
"expected transfers", len(r.expectedTransfers),
598+
"transfers to process", len(transfersToProcess),
599+
"timed out", len(timedOut),
600+
)
601+
r.mx.RUnlock()
602+
}
596603

597604
ms := time.Now().UnixMilli()
598605

@@ -694,6 +701,11 @@ func (r *RLDP) recoverySender() {
694701
}
695702
r.mx.Unlock()
696703
}
704+
705+
transfersToProcess = transfersToProcess[:0]
706+
timedOut = timedOut[:0]
707+
timedOutReq = timedOutReq[:0]
708+
timedOutExp = timedOutExp[:0]
697709
case <-r.activateRecoverySender:
698710
ticker.Reset(1 * time.Millisecond)
699711
}
@@ -703,7 +715,7 @@ func (r *RLDP) recoverySender() {
703715
func (r *RLDP) startTransfer(ctx context.Context, transferId, data []byte, recoverTimeoutAt int64) error {
704716
at := &activeTransfer{
705717
id: transferId,
706-
timeoutAt: recoverTimeoutAt * int64(time.Millisecond),
718+
timeoutAt: recoverTimeoutAt * 1000, // ms
707719
data: data,
708720
rldp: r,
709721
}
@@ -734,7 +746,7 @@ func (r *RLDP) startTransfer(ctx context.Context, transferId, data []byte, recov
734746
}
735747

736748
func (t *activeTransfer) getCurrentPart() *activeTransferPart {
737-
return (*activeTransferPart)(atomic.LoadPointer(&t.currentPart))
749+
return t.currentPart.Load()
738750
}
739751

740752
func (t *activeTransfer) prepareNextPart() (bool, error) {
@@ -775,7 +787,7 @@ func (t *activeTransfer) prepareNextPart() (bool, error) {
775787
transfer: t,
776788
}
777789

778-
atomic.StorePointer(&t.currentPart, unsafe.Pointer(&part))
790+
t.currentPart.Store(&part)
779791
return true, nil
780792
}
781793

@@ -925,19 +937,17 @@ func (r *RLDP) SendAnswer(ctx context.Context, maxAnswerSize uint64, timeoutAt u
925937
return fmt.Errorf("too big answer for that client, client wants no more than %d bytes", maxAnswerSize)
926938
}
927939

928-
var transferId []byte
940+
timeout, ok := ctx.Deadline()
941+
if !ok {
942+
timeout = time.Now().Add(15 * time.Second)
943+
}
944+
tm := timeout.Unix()
929945

930-
if toTransferId != nil {
931-
// if we have transfer to respond, invert it and use id
932-
transferId = reverseTransferId(toTransferId)
933-
} else {
934-
transferId = make([]byte, 32)
935-
if _, err = rand.Read(transferId); err != nil {
936-
return err
937-
}
946+
if int64(timeoutAt) < tm {
947+
tm = int64(timeoutAt)
938948
}
939949

940-
if err = r.startTransfer(ctx, transferId, data, int64(timeoutAt)); err != nil {
950+
if err = r.startTransfer(ctx, reverseTransferId(toTransferId), data, tm); err != nil {
941951
return fmt.Errorf("failed to send partitioned answer: %w", err)
942952
}
943953

adnl/rldp/client_test.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"crypto/ed25519"
77
"crypto/rand"
8+
"crypto/sha256"
89
"encoding/hex"
910
"errors"
1011
"github.com/xssnick/raptorq"
@@ -17,7 +18,6 @@ import (
1718
"strings"
1819
"testing"
1920
"time"
20-
"unsafe"
2121
)
2222

2323
func init() {
@@ -38,8 +38,8 @@ func (m MockADNL) GetCloserCtx() context.Context {
3838
}
3939

4040
func (m MockADNL) GetID() []byte {
41-
//TODO implement me
42-
panic("implement me")
41+
v := sha256.Sum256([]byte("1.1.1.1:1234"))
42+
return v[:]
4343
}
4444

4545
func (m MockADNL) RemoteAddr() string {
@@ -303,13 +303,15 @@ func TestRLDP_handleMessage(t *testing.T) {
303303
}
304304
cli := NewClient(tAdnl)
305305

306-
cli.activeTransfers[string(tId)] = &activeTransfer{
307-
id: nil,
306+
td := &activeTransfer{
307+
id: tId,
308308
data: make([]byte, 10),
309-
currentPart: unsafe.Pointer(&activeTransferPart{
310-
index: 0,
311-
}),
312309
}
310+
td.currentPart.Store(&activeTransferPart{
311+
index: 0,
312+
})
313+
314+
cli.activeTransfers[string(tId)] = td
313315

314316
err := cli.handleMessage(msgComplete)
315317
if err != nil {

ton/integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func TestAPIClient_GetBlockData(t *testing.T) {
124124
func TestAPIClient_GetOldBlockData(t *testing.T) {
125125
client := liteclient.NewConnectionPool()
126126

127-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
127+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
128128
defer cancel()
129129

130130
err := client.AddConnection(ctx, "135.181.177.59:53312", "aF91CuUHuuOv9rm2W5+O/4h38M3sRm40DtSdRxQhmtQ=")

0 commit comments

Comments
 (0)