Skip to content

Commit e9522e2

Browse files
committed
tcp: optimize memory for observatio
1 parent 15f806c commit e9522e2

File tree

5 files changed

+42
-29
lines changed

5 files changed

+42
-29
lines changed

message/options.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,3 +550,18 @@ func (options Options) ResetOptionsTo(buf []byte, in Options) (Options, int, err
550550
}
551551
return opts, used, nil
552552
}
553+
554+
// Clone create duplicates of options.
555+
func (options Options) Clone() (Options, error) {
556+
opts := make(Options, 0, len(options))
557+
buf := make([]byte, 64)
558+
opts, used, err := opts.ResetOptionsTo(buf, options)
559+
if err == ErrTooSmall {
560+
buf = append(buf, make([]byte, used-len(buf))...)
561+
opts, used, err = opts.ResetOptionsTo(buf, options)
562+
}
563+
if err != nil {
564+
return nil, err
565+
}
566+
return opts, nil
567+
}

tcp/clientconn.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -106,14 +106,14 @@ func bwReleaseMessage(m blockwise.Message) {
106106
pool.ReleaseMessage(m.(*pool.Message))
107107
}
108108

109-
func bwCreateHandlerFunc(observatioRequests *kitSync.Map) func(token message.Token) (blockwise.Message, bool) {
109+
func bwCreateHandlerFunc(observationRequests *kitSync.Map) func(token message.Token) (blockwise.Message, bool) {
110110
return func(token message.Token) (blockwise.Message, bool) {
111-
msg, ok := observatioRequests.LoadWithFunc(token.String(), func(v interface{}) interface{} {
112-
r := v.(*pool.Message)
113-
d := pool.AcquireMessage(r.Context())
114-
d.ResetOptionsTo(r.Options())
115-
d.SetCode(r.Code())
116-
d.SetToken(r.Token())
111+
msg, ok := observationRequests.LoadWithFunc(token.String(), func(v interface{}) interface{} {
112+
r := v.(message.Message)
113+
d := pool.AcquireMessage(r.Context)
114+
d.ResetOptionsTo(r.Options)
115+
d.SetCode(r.Code)
116+
d.SetToken(r.Token)
117117
return d
118118
})
119119
if !ok {
@@ -134,7 +134,7 @@ func Client(conn net.Conn, opts ...DialOption) *ClientConn {
134134
cfg.errors = func(error) {}
135135
}
136136

137-
observatioRequests := kitSync.NewMap()
137+
observationRequests := kitSync.NewMap()
138138
var blockWise *blockwise.BlockWise
139139
if cfg.blockwiseEnable {
140140
blockWise = blockwise.NewBlockWise(
@@ -143,7 +143,7 @@ func Client(conn net.Conn, opts ...DialOption) *ClientConn {
143143
cfg.blockwiseTransferTimeout,
144144
cfg.errors,
145145
false,
146-
bwCreateHandlerFunc(observatioRequests),
146+
bwCreateHandlerFunc(observationRequests),
147147
)
148148
}
149149

@@ -162,7 +162,7 @@ func Client(conn net.Conn, opts ...DialOption) *ClientConn {
162162
cfg.disableTCPSignalMessageCSM,
163163
cfg.closeSocket,
164164
)
165-
cc := NewClientConn(session, observationTokenHandler, observatioRequests)
165+
cc := NewClientConn(session, observationTokenHandler, observationRequests)
166166

167167
go func() {
168168
err := cc.Run()

tcp/clientobserve.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,7 @@ func NewObservationHandler(obsertionTokenHandler *HandlerContainer, next Handler
3838

3939
func (o *Observation) cleanUp() {
4040
o.cc.observationTokenHandler.Pop(o.token)
41-
registeredRequest, ok := o.cc.observationRequests.PullOut(o.token.String())
42-
if ok {
43-
pool.ReleaseMessage(registeredRequest.(*pool.Message))
44-
}
41+
o.cc.observationRequests.PullOut(o.token.String())
4542
}
4643

4744
// Cancel remove observation from server. For recreate observation use Observe.
@@ -89,6 +86,7 @@ func (cc *ClientConn) Observe(ctx context.Context, path string, observeFunc func
8986
if err != nil {
9087
return nil, fmt.Errorf("cannot create observe request: %w", err)
9188
}
89+
defer pool.ReleaseMessage(req)
9290
token := req.Token()
9391
req.SetObserve(0)
9492
o := &Observation{
@@ -99,7 +97,19 @@ func (cc *ClientConn) Observe(ctx context.Context, path string, observeFunc func
9997
}
10098
respCodeChan := make(chan codes.Code, 1)
10199
waitForReponse := uint32(1)
102-
cc.observationRequests.Store(token.String(), req)
100+
101+
options, err := req.Options().Clone()
102+
if err != nil {
103+
return nil, fmt.Errorf("cannot clone options: %w", err)
104+
}
105+
106+
obs := message.Message{
107+
Context: req.Context(),
108+
Token: req.Token(),
109+
Code: req.Code(),
110+
Options: options,
111+
}
112+
cc.observationRequests.Store(token.String(), obs)
103113
err = o.cc.observationTokenHandler.Insert(token.String(), func(w *ResponseWriter, r *pool.Message) {
104114
code := r.Code()
105115
if atomic.CompareAndSwapUint32(&waitForReponse, 1, 0) {

tcp/message/pool/message.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -138,13 +138,7 @@ func ConvertFrom(m *message.Message) (*Message, error) {
138138

139139
// ConvertTo converts pool message to common message.
140140
func ConvertTo(m *Message) (*message.Message, error) {
141-
opts := make(message.Options, 0, len(m.Options()))
142-
buf := make([]byte, 64)
143-
opts, used, err := opts.ResetOptionsTo(buf, m.Options())
144-
if err == message.ErrTooSmall {
145-
buf = append(buf, make([]byte, used-len(buf))...)
146-
opts, used, err = opts.ResetOptionsTo(buf, m.Options())
147-
}
141+
opts, err := m.Options().Clone()
148142
if err != nil {
149143
return nil, err
150144
}

udp/message/pool/message.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -178,13 +178,7 @@ func ConvertFrom(m *message.Message) (*Message, error) {
178178

179179
// ConvertTo converts pool message to common message.
180180
func ConvertTo(m *Message) (*message.Message, error) {
181-
opts := make(message.Options, 0, len(m.Options()))
182-
buf := make([]byte, 64)
183-
opts, used, err := opts.ResetOptionsTo(buf, m.Options())
184-
if err == message.ErrTooSmall {
185-
buf = append(buf, make([]byte, used-len(buf))...)
186-
opts, used, err = opts.ResetOptionsTo(buf, m.Options())
187-
}
181+
opts, err := m.Options().Clone()
188182
if err != nil {
189183
return nil, err
190184
}

0 commit comments

Comments
 (0)