Skip to content

Commit 7098e70

Browse files
Blockwise: Use hash instead of token for cache
Credit to @zworks-okada for initial work regarding rx transfers. Expanded to include tx. Closes #512
1 parent 2a95bd6 commit 7098e70

File tree

11 files changed

+103
-51
lines changed

11 files changed

+103
-51
lines changed

dtls/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,8 @@ func Client(conn *dtls.Conn, opts ...udp.Option) *udpClient.Conn {
9090
v,
9191
cfg.BlockwiseTransferTimeout,
9292
cfg.Errors,
93-
func(token message.Token) (*pool.Message, bool) {
94-
return v.GetObservationRequest(token)
93+
func(hash uint64) (*pool.Message, bool) {
94+
return v.GetObservationRequest(hash)
9595
},
9696
)
9797
}

dtls/server/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,8 @@ func (s *Server) createConn(connection *coapNet.Conn, inactivityMonitor udpClien
198198
v,
199199
s.cfg.BlockwiseTransferTimeout,
200200
s.cfg.Errors,
201-
func(token message.Token) (*pool.Message, bool) {
202-
return v.GetObservationRequest(token)
201+
func(hash uint64) (*pool.Message, bool) {
202+
return v.GetObservationRequest(hash)
203203
},
204204
)
205205
}

message/option.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type OptionID uint16
4747
| 35 | x | x | - | | Proxy-Uri | string | 1-1034 | (none) |
4848
| 39 | x | x | - | | Proxy-Scheme | string | 1-255 | (none) |
4949
| 60 | | | x | | Size1 | uint | 0-4 | (none) |
50+
| 292 | | | | x | Request-Tag | opaque | 0-8 | (none) |
5051
+-----+----+---+---+---+----------------+--------+--------+---------+
5152
C=Critical, U=Unsafe, N=NoCacheKey, R=Repeatable
5253
*/
@@ -73,6 +74,7 @@ const (
7374
ProxyScheme OptionID = 39
7475
Size1 OptionID = 60
7576
NoResponse OptionID = 258
77+
RequestTag OptionID = 292
7678
)
7779

7880
var optionIDToString = map[OptionID]string{
@@ -96,6 +98,7 @@ var optionIDToString = map[OptionID]string{
9698
ProxyScheme: "ProxyScheme",
9799
Size1: "Size1",
98100
NoResponse: "NoResponse",
101+
RequestTag: "RequestTag",
99102
}
100103

101104
func (o OptionID) String() string {
@@ -153,6 +156,7 @@ var CoapOptionDefs = map[OptionID]OptionDef{
153156
ProxyScheme: {ValueFormat: ValueString, MinLen: 1, MaxLen: 255},
154157
Size1: {ValueFormat: ValueUint, MinLen: 0, MaxLen: 4},
155158
NoResponse: {ValueFormat: ValueUint, MinLen: 0, MaxLen: 1},
159+
RequestTag: {ValueFormat: ValueOpaque, MinLen: 0, MaxLen: 8},
156160
}
157161

158162
// MediaType specifies the content format of a message.

net/blockwise/blockwise.go

Lines changed: 78 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import (
55
"context"
66
"errors"
77
"fmt"
8+
"hash/crc64"
89
"io"
10+
"net"
911
"time"
1012

1113
"github.com/dsnet/golib/memfile"
@@ -131,14 +133,17 @@ type Client interface {
131133
AcquireMessage(ctx context.Context) *pool.Message
132134
// return back the message to the pool for next use
133135
ReleaseMessage(m *pool.Message)
136+
137+
// The remote address for determining the endpoint pair
138+
RemoteAddr() net.Addr
134139
}
135140

136141
type BlockWise[C Client] struct {
137142
cc C
138143
receivingMessagesCache *cache.Cache[uint64, *messageGuard]
139144
sendingMessagesCache *cache.Cache[uint64, *pool.Message]
140145
errors func(error)
141-
getSentRequestFromOutside func(token message.Token) (*pool.Message, bool)
146+
getSentRequestFromOutside func(hash uint64) (*pool.Message, bool)
142147
expiration time.Duration
143148
}
144149

@@ -160,10 +165,10 @@ func New[C Client](
160165
cc C,
161166
expiration time.Duration,
162167
errors func(error),
163-
getSentRequestFromOutside func(token message.Token) (*pool.Message, bool),
168+
getSentRequestFromOutside func(hash uint64) (*pool.Message, bool),
164169
) *BlockWise[C] {
165170
if getSentRequestFromOutside == nil {
166-
getSentRequestFromOutside = func(message.Token) (*pool.Message, bool) { return nil, false }
171+
getSentRequestFromOutside = func(uint64) (*pool.Message, bool) { return nil, false }
167172
}
168173
return &BlockWise[C]{
169174
cc: cc,
@@ -214,11 +219,12 @@ func (b *BlockWise[C]) Do(r *pool.Message, maxSzx SZX, maxMessageSize uint32, do
214219
if !ok {
215220
expire = time.Now().Add(b.expiration)
216221
}
217-
_, loaded := b.sendingMessagesCache.LoadOrStore(r.Token().Hash(), cache.NewElement(r, expire, nil))
222+
matchableHash := generateMatchableHash(r.Options(), b.cc.RemoteAddr(), r.Code())
223+
_, loaded := b.sendingMessagesCache.LoadOrStore(matchableHash, cache.NewElement(r, expire, nil))
218224
if loaded {
219225
return nil, errors.New("invalid token")
220226
}
221-
defer b.sendingMessagesCache.Delete(r.Token().Hash())
227+
defer b.sendingMessagesCache.Delete(matchableHash)
222228
if r.Body() == nil {
223229
return do(r)
224230
}
@@ -282,9 +288,9 @@ func (b *BlockWise[C]) WriteMessage(request *pool.Message, maxSZX SZX, maxMessag
282288
if err != nil {
283289
return fmt.Errorf("cannot encode start sending message block option(%v,%v,%v): %w", maxSZX, 0, true, err)
284290
}
285-
291+
matchableHash := generateMatchableHash(request.Options(), b.cc.RemoteAddr(), request.Code())
286292
w := newWriteRequestResponse(b.cc, request)
287-
err = b.startSendingMessage(w, maxSZX, maxMessageSize, startSendingMessageBlock)
293+
err = b.startSendingMessage(w, maxSZX, maxMessageSize, startSendingMessageBlock, matchableHash)
288294
if err != nil {
289295
return fmt.Errorf("cannot start writing request: %w", err)
290296
}
@@ -333,8 +339,8 @@ func wantsToBeReceived(r *pool.Message) bool {
333339
return true
334340
}
335341

336-
func (b *BlockWise[C]) getSendingMessageCode(token uint64) (codes.Code, bool) {
337-
v := b.sendingMessagesCache.Load(token)
342+
func (b *BlockWise[C]) getSendingMessageCode(hash uint64) (codes.Code, bool) {
343+
v := b.sendingMessagesCache.Load(hash)
338344
if v == nil {
339345
return codes.Empty, false
340346
}
@@ -348,19 +354,20 @@ func (b *BlockWise[C]) Handle(w *responsewriter.ResponseWriter[C], r *pool.Messa
348354
}
349355
token := r.Token()
350356

357+
matchableHash := generateMatchableHash(r.Options(), w.Conn().RemoteAddr(), r.Code())
358+
351359
if len(token) == 0 {
352-
err := b.handleReceivedMessage(w, r, maxSZX, maxMessageSize, next)
360+
err := b.handleReceivedMessage(w, r, maxSZX, maxMessageSize, next, matchableHash)
353361
if err != nil {
354362
b.sendEntityIncomplete(w, token)
355363
b.errors(fmt.Errorf("handleReceivedMessage(%v): %w", r, err))
356364
}
357365
return
358366
}
359-
tokenStr := token.Hash()
360367

361-
sendingMessageCode, sendingMessageExist := b.getSendingMessageCode(tokenStr)
368+
sendingMessageCode, sendingMessageExist := b.getSendingMessageCode(matchableHash)
362369
if !sendingMessageExist || wantsToBeReceived(r) {
363-
err := b.handleReceivedMessage(w, r, maxSZX, maxMessageSize, next)
370+
err := b.handleReceivedMessage(w, r, maxSZX, maxMessageSize, next, matchableHash)
364371
if err != nil {
365372
b.sendEntityIncomplete(w, token)
366373
b.errors(fmt.Errorf("handleReceivedMessage(%v): %w", r, err))
@@ -369,17 +376,17 @@ func (b *BlockWise[C]) Handle(w *responsewriter.ResponseWriter[C], r *pool.Messa
369376
}
370377
more, err := b.continueSendingMessage(w, r, maxSZX, maxMessageSize, sendingMessageCode)
371378
if err != nil {
372-
b.sendingMessagesCache.Delete(tokenStr)
379+
b.sendingMessagesCache.Delete(matchableHash)
373380
b.errors(fmt.Errorf("continueSendingMessage(%v): %w", r, err))
374381
return
375382
}
376383
// For codes GET,POST,PUT,DELETE, we want them to wait for pairing response and then delete them when the full response comes in or when timeout occurs.
377384
if !more && sendingMessageCode > codes.DELETE {
378-
b.sendingMessagesCache.Delete(tokenStr)
385+
b.sendingMessagesCache.Delete(matchableHash)
379386
}
380387
}
381388

382-
func (b *BlockWise[C]) handleReceivedMessage(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSZX SZX, maxMessageSize uint32, next func(w *responsewriter.ResponseWriter[C], r *pool.Message)) error {
389+
func (b *BlockWise[C]) handleReceivedMessage(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSZX SZX, maxMessageSize uint32, next func(w *responsewriter.ResponseWriter[C], r *pool.Message), rx_hash uint64) error {
383390
startSendingMessageBlock, err := EncodeBlockOption(maxSZX, 0, true)
384391
if err != nil {
385392
return fmt.Errorf("cannot encode start sending message block option(%v,%v,%v): %w", maxSZX, 0, true, err)
@@ -411,7 +418,7 @@ func (b *BlockWise[C]) handleReceivedMessage(w *responsewriter.ResponseWriter[C]
411418
return errP
412419
}
413420
}
414-
return b.startSendingMessage(w, maxSZX, maxMessageSize, startSendingMessageBlock)
421+
return b.startSendingMessage(w, maxSZX, maxMessageSize, startSendingMessageBlock, rx_hash)
415422
}
416423

417424
func (b *BlockWise[C]) createSendingMessage(sendingMessage *pool.Message, maxSZX SZX, maxMessageSize uint32, block uint32) (sendMessage *pool.Message, more bool, err error) {
@@ -504,7 +511,8 @@ func (b *BlockWise[C]) continueSendingMessage(w *responsewriter.ResponseWriter[C
504511
}
505512
var sendMessage *pool.Message
506513
var more bool
507-
b.sendingMessagesCache.LoadWithFunc(r.Token().Hash(), func(value *cache.Element[*pool.Message]) *cache.Element[*pool.Message] {
514+
matchableHash := generateMatchableHash(r.Options(), w.Conn().RemoteAddr(), r.Code())
515+
b.sendingMessagesCache.LoadWithFunc(matchableHash, func(value *cache.Element[*pool.Message]) *cache.Element[*pool.Message] {
508516
sendMessage, more, err = b.createSendingMessage(value.Data(), maxSZX, maxMessageSize, block)
509517
if err != nil {
510518
err = fmt.Errorf("cannot create sending message: %w", err)
@@ -529,7 +537,7 @@ func isObserveResponse(msg *pool.Message) bool {
529537
return msg.Code() >= codes.Created
530538
}
531539

532-
func (b *BlockWise[C]) startSendingMessage(w *responsewriter.ResponseWriter[C], maxSZX SZX, maxMessageSize uint32, block uint32) error {
540+
func (b *BlockWise[C]) startSendingMessage(w *responsewriter.ResponseWriter[C], maxSZX SZX, maxMessageSize uint32, block uint32, rx_hash uint64) error {
533541
payloadSize, err := w.Message().BodySize()
534542
if err != nil {
535543
return payloadSizeError(err)
@@ -552,16 +560,16 @@ func (b *BlockWise[C]) startSendingMessage(w *responsewriter.ResponseWriter[C],
552560
if !ok {
553561
expire = time.Now().Add(b.expiration)
554562
}
555-
el, loaded := b.sendingMessagesCache.LoadOrStore(sendingMessage.Token().Hash(), cache.NewElement(originalSendingMessage, expire, nil))
563+
el, loaded := b.sendingMessagesCache.LoadOrStore(rx_hash, cache.NewElement(originalSendingMessage, expire, nil))
556564
if loaded {
557565
defer b.cc.ReleaseMessage(originalSendingMessage)
558566
return fmt.Errorf("cannot add message (%v) to sending message cache: message(%v) with token(%v) already exist", originalSendingMessage, el.Data(), sendingMessage.Token())
559567
}
560568
return nil
561569
}
562570

563-
func (b *BlockWise[C]) getSentRequest(token message.Token) *pool.Message {
564-
data, ok := b.sendingMessagesCache.LoadWithFunc(token.Hash(), func(value *cache.Element[*pool.Message]) *cache.Element[*pool.Message] {
571+
func (b *BlockWise[C]) getSentRequest(hash uint64) *pool.Message {
572+
data, ok := b.sendingMessagesCache.LoadWithFunc(hash, func(value *cache.Element[*pool.Message]) *cache.Element[*pool.Message] {
565573
if value == nil {
566574
return nil
567575
}
@@ -576,7 +584,7 @@ func (b *BlockWise[C]) getSentRequest(token message.Token) *pool.Message {
576584
if ok {
577585
return data.Data()
578586
}
579-
globalRequest, ok := b.getSentRequestFromOutside(token)
587+
globalRequest, ok := b.getSentRequestFromOutside(hash)
580588
if ok {
581589
return globalRequest
582590
}
@@ -595,7 +603,8 @@ func (b *BlockWise[C]) handleObserveResponse(sentRequest *pool.Message) (message
595603
validUntil := time.Now().Add(b.expiration) // context of observation can be expired.
596604
bwSentRequest := b.cloneMessage(sentRequest)
597605
bwSentRequest.SetToken(token)
598-
_, loaded := b.sendingMessagesCache.LoadOrStore(token.Hash(), cache.NewElement(bwSentRequest, validUntil, nil))
606+
matchableHash := generateMatchableHash(sentRequest.Options(), b.cc.RemoteAddr(), sentRequest.Code())
607+
_, loaded := b.sendingMessagesCache.LoadOrStore(matchableHash, cache.NewElement(bwSentRequest, validUntil, nil))
599608
if loaded {
600609
return nil, time.Time{}, errors.New("cannot process message: message with token already exist")
601610
}
@@ -674,7 +683,7 @@ func copyToPayloadFromOffset(r *pool.Message, payloadFile *memfile.File, offset
674683
return payloadSize, nil
675684
}
676685

677-
func (b *BlockWise[C]) getCachedReceivedMessage(mg *messageGuard, r *pool.Message, tokenStr uint64, validUntil time.Time) (*pool.Message, func(), error) {
686+
func (b *BlockWise[C]) getCachedReceivedMessage(mg *messageGuard, r *pool.Message, tokenStr uint64, matchableHash uint64, validUntil time.Time) (*pool.Message, func(), error) {
678687
cannotLockError := func(err error) error {
679688
return fmt.Errorf("processReceivedMessage: cannot lock message: %w", err)
680689
}
@@ -708,11 +717,11 @@ func (b *BlockWise[C]) getCachedReceivedMessage(mg *messageGuard, r *pool.Messag
708717
return nil, nil, cannotLockError(errA)
709718
}
710719
appendToClose(mg)
711-
element, loaded := b.receivingMessagesCache.LoadOrStore(tokenStr, cache.NewElement(mg, validUntil, func(d *messageGuard) {
720+
element, loaded := b.receivingMessagesCache.LoadOrStore(matchableHash, cache.NewElement(mg, validUntil, func(d *messageGuard) {
712721
if d == nil {
713722
return
714723
}
715-
b.sendingMessagesCache.Delete(tokenStr)
724+
b.sendingMessagesCache.Delete(matchableHash)
716725
}))
717726
// request was already stored in cache, silently
718727
if loaded {
@@ -732,6 +741,41 @@ func (b *BlockWise[C]) getCachedReceivedMessage(mg *messageGuard, r *pool.Messag
732741
return mg.Message, closeFn, nil
733742
}
734743

744+
/*
745+
RFC9175 1.1:
746+
Two request messages are said to be "matchable" if they occur between
747+
the same endpoint pair, have the same code, and have the same set of
748+
options, with the exception that elective NoCacheKey options and
749+
options involved in block-wise transfer (Block1, Block2, and Request-
750+
Tag) need not be the same. Two blockwise request operations are said
751+
to be matchable if their request messages are matchable.
752+
753+
This function concatenates the IDs and values of relevant options, the string representation of the remote address,
754+
and the code of the message to generate a hash that can be used to match requests.
755+
*/
756+
func generateMatchableHash(options message.Options, remoteAddr net.Addr, code codes.Code) uint64 {
757+
options_str := ""
758+
759+
input := make([]byte, 0, 512)
760+
761+
for _, opt := range options {
762+
options_str += opt.ID.String() + ","
763+
switch opt.ID {
764+
// Skip Blockwise Options and NoCacheKey Options
765+
case message.Block1, message.Block2, message.Size1, message.Size2, message.RequestTag:
766+
continue
767+
}
768+
input = append(input, byte(opt.ID))
769+
input = append(input, opt.Value...)
770+
}
771+
772+
input = append(input, []byte(remoteAddr.Network())...)
773+
input = append(input, []byte(remoteAddr.String())...)
774+
input = append(input, byte(code))
775+
776+
return crc64.Checksum(input, crc64.MakeTable(crc64.ISO))
777+
}
778+
735779
//nolint:gocyclo,gocognit
736780
func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSzx SZX, next func(w *responsewriter.ResponseWriter[C], r *pool.Message), blockType message.OptionID, sizeType message.OptionID) error {
737781
token := r.Token()
@@ -755,7 +799,8 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C
755799
if err != nil {
756800
return fmt.Errorf("cannot decode block option: %w", err)
757801
}
758-
sentRequest := b.getSentRequest(token)
802+
matchableHash := generateMatchableHash(r.Options(), w.Conn().RemoteAddr(), r.Code())
803+
sentRequest := b.getSentRequest(matchableHash)
759804
if sentRequest != nil {
760805
defer b.cc.ReleaseMessage(sentRequest)
761806
}
@@ -772,7 +817,7 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C
772817

773818
tokenStr := token.Hash()
774819
var cachedReceivedMessageGuard *messageGuard
775-
if e := b.receivingMessagesCache.Load(tokenStr); e != nil {
820+
if e := b.receivingMessagesCache.Load(matchableHash); e != nil {
776821
cachedReceivedMessageGuard = e.Data()
777822
}
778823
if cachedReceivedMessageGuard == nil {
@@ -783,15 +828,15 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C
783828
return nil
784829
}
785830
}
786-
cachedReceivedMessage, closeCachedReceivedMessage, err := b.getCachedReceivedMessage(cachedReceivedMessageGuard, r, tokenStr, validUntil)
831+
cachedReceivedMessage, closeCachedReceivedMessage, err := b.getCachedReceivedMessage(cachedReceivedMessageGuard, r, tokenStr, matchableHash, validUntil)
787832
if err != nil {
788833
return err
789834
}
790835
defer closeCachedReceivedMessage()
791836

792837
defer func(err *error) {
793838
if *err != nil {
794-
b.receivingMessagesCache.Delete(tokenStr)
839+
b.receivingMessagesCache.Delete(matchableHash)
795840
}
796841
}(&err)
797842
payloadFile, payloadSize, err := b.getPayloadFromCachedReceivedMessage(r, cachedReceivedMessage)
@@ -805,12 +850,12 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C
805850
return fmt.Errorf("cannot copy data to payload: %w", err)
806851
}
807852
if !more {
808-
b.receivingMessagesCache.Delete(tokenStr)
853+
b.receivingMessagesCache.Delete(matchableHash)
809854
cachedReceivedMessage.Remove(blockType)
810855
cachedReceivedMessage.Remove(sizeType)
811856
cachedReceivedMessage.SetType(r.Type())
812857
if !bytes.Equal(cachedReceivedMessage.Token(), token) {
813-
b.sendingMessagesCache.Delete(tokenStr)
858+
b.sendingMessagesCache.Delete(matchableHash)
814859
}
815860
_, errS := cachedReceivedMessage.Body().Seek(0, io.SeekStart)
816861
if errS != nil {

net/blockwise/blockwise_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"io"
7+
"net"
78
"testing"
89
"time"
910

@@ -54,6 +55,10 @@ type testClient struct {
5455
p *pool.Pool
5556
}
5657

58+
func (c *testClient) RemoteAddr() net.Addr {
59+
return &net.IPAddr{IP: net.IPv4(127, 0, 0, 1)}
60+
}
61+
5762
func newTestClient() *testClient {
5863
return &testClient{
5964
p: pool.New(100, 1024),

net/client/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ func (c *Client[C]) Observe(ctx context.Context, path string, observeFunc func(r
105105
return c.DoObserve(req, observeFunc)
106106
}
107107

108-
func (c *Client[C]) GetObservationRequest(token message.Token) (*pool.Message, bool) {
109-
return c.observationHandler.GetObservationRequest(token)
108+
func (c *Client[C]) GetObservationRequest(hash uint64) (*pool.Message, bool) {
109+
return c.observationHandler.GetObservationRequest(hash)
110110
}
111111

112112
// NewPostRequest creates post request.

0 commit comments

Comments
 (0)