Skip to content

Commit a69e6af

Browse files
authored
Merge pull request #304 from xssnick/dev-v1-11
ADNL improvements & RLDP FEC parse fix
2 parents 619c2aa + 100d170 commit a69e6af

File tree

6 files changed

+75
-74
lines changed

6 files changed

+75
-74
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
[![Based on TON][ton-svg]][ton]
66
[![Telegram Channel][tgc-svg]][tg-channel]
7-
![Coverage](https://img.shields.io/badge/Coverage-72.8%25-brightgreen)
7+
![Coverage](https://img.shields.io/badge/Coverage-72.6%25-brightgreen)
88

99
Golang library for interacting with TON blockchain.
1010

adnl/adnl.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,7 @@ type ADNL struct {
5858
closerCtx context.Context
5959
closeFn context.CancelFunc
6060

61-
channelPtr unsafe.Pointer
62-
channelExchangeCompleted int32
61+
channelPtr unsafe.Pointer
6362

6463
msgParts map[string]*partitionedMessage
6564

@@ -274,7 +273,6 @@ func (a *ADNL) processMessage(message any) error {
274273
}
275274

276275
atomic.StorePointer(&a.channelPtr, unsafe.Pointer(newChan))
277-
atomic.StoreInt32(&a.channelExchangeCompleted, 0)
278276
case MessageConfirmChannel:
279277
a.mx.Lock()
280278
defer a.mx.Unlock()
@@ -567,8 +565,6 @@ func (a *ADNL) buildRequest(req tl.Serializable) (buf []byte, err error) {
567565
return buf, nil
568566
}
569567

570-
atomic.StoreInt32(&a.channelExchangeCompleted, 1)
571-
572568
// channel is active
573569
buf, err = ch.createPacket(seqno, req)
574570
if err != nil {
@@ -591,7 +587,6 @@ func (a *ADNL) buildRequest(req tl.Serializable) (buf []byte, err error) {
591587
initDate: int32(time.Now().Unix()),
592588
}
593589
atomic.StorePointer(&a.channelPtr, unsafe.Pointer(ch))
594-
atomic.StoreInt32(&a.channelExchangeCompleted, 0)
595590
a.mx.Unlock()
596591
}
597592

@@ -670,6 +665,18 @@ func (a *ADNL) GetID() []byte {
670665
return id
671666
}
672667

668+
func (a *ADNL) GetPubKey() ed25519.PublicKey {
669+
return a.peerKey
670+
}
671+
672+
func (a *ADNL) Reinit() {
673+
atomic.StorePointer(&a.channelPtr, nil)
674+
atomic.StoreInt32(&a.reinitTime, int32(time.Now().Unix()))
675+
atomic.StoreInt32(&a.dstReinit, 0)
676+
atomic.StoreInt64(&a.seqno, 0)
677+
atomic.StoreInt64(&a.confirmSeqno, 0)
678+
}
679+
673680
func (a *ADNL) createPacket(seqno int64, isResp bool, msgs ...any) ([]byte, error) {
674681
if a.peerKey == nil {
675682
return nil, fmt.Errorf("unknown peer")

adnl/conn.go

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package adnl
22

33
import (
4+
"context"
5+
"errors"
46
"fmt"
5-
"golang.org/x/net/ipv4"
7+
"log"
68
"net"
79
"sync"
810
"time"
@@ -52,43 +54,46 @@ func (c *clientConn) Close() error {
5254
return nil
5355
}
5456

55-
type batchConn struct {
56-
localAddr net.Addr
57-
conn *ipv4.PacketConn
58-
59-
buf []ipv4.Message
60-
mx sync.Mutex
61-
}
62-
6357
type syncPacket struct {
6458
addr net.Addr
6559
buf []byte
6660
}
6761

6862
type SyncConn struct {
69-
conn net.PacketConn
70-
chWrite chan syncPacket
71-
chRead chan syncPacket
63+
conn net.PacketConn
64+
chWrite chan syncPacket
65+
chRead chan syncPacket
66+
closerCtx context.Context
67+
closer context.CancelFunc
7268
}
7369

7470
func NewSyncConn(conn net.PacketConn, packetsBufSz int) *SyncConn {
71+
ctx, cancel := context.WithCancel(context.Background())
7572
sc := &SyncConn{
76-
conn: conn,
77-
chWrite: make(chan syncPacket, packetsBufSz),
73+
conn: conn,
74+
chWrite: make(chan syncPacket, packetsBufSz),
75+
closer: cancel,
76+
closerCtx: ctx,
7877
}
7978
go sc.writer()
8079
return sc
8180
}
8281

8382
func (s *SyncConn) writer() {
83+
defer s.Close()
84+
8485
for {
8586
select {
8687
case p := <-s.chWrite:
87-
_, err := s.conn.WriteTo(p.buf, p.addr)
88-
if err != nil {
89-
_ = s.conn.Close()
90-
return
88+
if _, err := s.conn.WriteTo(p.buf, p.addr); err != nil {
89+
if errors.Is(err, net.ErrClosed) {
90+
return
91+
}
92+
// should not happen, but if will we want to see
93+
log.Println("[CONN] Write error:", err.Error())
9194
}
95+
case <-s.closerCtx.Done():
96+
return
9297
}
9398
}
9499
}
@@ -98,11 +103,16 @@ func (s *SyncConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
98103
}
99104

100105
func (s *SyncConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
101-
s.chWrite <- syncPacket{addr, p}
102-
return len(p), nil
106+
select {
107+
case <-s.closerCtx.Done():
108+
return 0, fmt.Errorf("connection was closed")
109+
case s.chWrite <- syncPacket{addr, p}:
110+
return len(p), nil
111+
}
103112
}
104113

105114
func (s *SyncConn) Close() error {
115+
s.closer()
106116
return s.conn.Close()
107117
}
108118

adnl/dht/client_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,13 @@ type MockADNL struct {
5353
close func()
5454
}
5555

56+
func (m MockADNL) GetPubKey() ed25519.PublicKey {
57+
return ed25519.PublicKey{}
58+
}
59+
60+
func (m MockADNL) Reinit() {
61+
}
62+
5663
func (m MockADNL) GetCloserCtx() context.Context {
5764
return context.Background()
5865
}

adnl/gateway.go

Lines changed: 23 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ type Peer interface {
2929
GetCloserCtx() context.Context
3030
RemoteAddr() string
3131
GetID() []byte
32+
GetPubKey() ed25519.PublicKey
33+
Reinit()
3234
Close()
3335
}
3436

@@ -125,42 +127,11 @@ func NewGatewayWithListener(key ed25519.PrivateKey, listener func(addr string) (
125127
}
126128

127129
var DefaultListener = func(addr string) (net.PacketConn, error) {
128-
/*ra, err := net.ResolveUDPAddr("udp", *dstAddr)
129-
if err != nil {
130-
log.Fatal(err)
131-
}
132-
conn, err := net.DialUDP("udp", nil, ra)
133-
if err != nil {
134-
log.Fatal(err)
135-
}
136-
pconn := ipv6.NewPacketConn(conn)
137-
138-
pconn := ipv6.NewPacketConn(g.conn)
139-
140-
wb := make([]ipv6.Message, *batchSize)
141-
for i := 0; i < *batchSize; i++ {
142-
wb[i].Addr = ra
143-
wb[i].Buffers = [][]byte{make([]byte, *packetSize)}
144-
}
145-
146-
pconn.SendMsgs()*/
147-
/*lc := net.ListenConfig{
148-
Control: func(network, address string, c syscall.RawConn) error {
149-
var opErr error
150-
err := c.Control(func(fd uintptr) {
151-
opErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
152-
})
153-
if err != nil {
154-
return err
155-
}
156-
return opErr
157-
},
158-
}*/
159130
lp, err := net.ListenPacket("udp", addr)
160131
if err != nil {
161132
return nil, err
162133
}
163-
return NewSyncConn(lp, 1*1024*1024), nil
134+
return NewSyncConn(lp, 512*1024), nil
164135
}
165136

166137
func (g *Gateway) GetAddressList() address.List {
@@ -430,20 +401,6 @@ func (g *Gateway) listen(rootId []byte) {
430401
}
431402
}
432403

433-
// TODO: processors pool
434-
/*func() {
435-
defer func() {
436-
if r := recover(); r != nil {
437-
Logger("critical error while processing packet at server:", r)
438-
}
439-
}()
440-
441-
if err := proc.processor(buf); err != nil {
442-
Logger("failed to process packet at server:", err)
443-
return
444-
}
445-
}()*/
446-
447404
if err := proc.processor(buf); err != nil {
448405
Logger("failed to process packet at server:", err)
449406
}
@@ -458,6 +415,18 @@ func (p *peerConn) checkUpdateAddr(addr net.Addr) {
458415
}
459416
}
460417

418+
func (g *Gateway) GetActivePeers() []Peer {
419+
g.mx.RLock()
420+
defer g.mx.RUnlock()
421+
422+
peers := make([]Peer, 0, len(g.peers))
423+
for _, p := range g.peers {
424+
peers = append(peers, p)
425+
}
426+
427+
return peers
428+
}
429+
461430
func (g *Gateway) registerClient(addr net.Addr, key ed25519.PublicKey, id string) (*peerConn, error) {
462431
g.mx.Lock()
463432
defer g.mx.Unlock()
@@ -586,6 +555,14 @@ func (p *peerConn) GetID() []byte {
586555
return p.client.GetID()
587556
}
588557

558+
func (p *peerConn) Reinit() {
559+
p.client.Reinit()
560+
}
561+
562+
func (p *peerConn) GetPubKey() ed25519.PublicKey {
563+
return p.client.GetPubKey()
564+
}
565+
589566
func (p *peerConn) SetCustomMessageHandler(handler func(msg *MessageCustom) error) {
590567
p.client.SetCustomMessageHandler(handler)
591568
}

adnl/rldp/rldp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func (m *MessagePart) Parse(data []byte) ([]byte, error) {
7979
copy(transfer, data)
8080

8181
var fec FECRaptorQ
82-
data, err := tl.Parse(fec, data[32:], true)
82+
data, err := tl.Parse(&fec, data[32:], true)
8383
if err != nil {
8484
return nil, err
8585
}

0 commit comments

Comments
 (0)