Skip to content

Commit ec27cba

Browse files
committed
observation: cleanup channel after send response
1 parent e9522e2 commit ec27cba

File tree

2 files changed

+89
-61
lines changed

2 files changed

+89
-61
lines changed

tcp/clientobserve.go

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,26 +13,56 @@ import (
1313
"github.com/plgd-dev/go-coap/v2/tcp/message/pool"
1414
)
1515

16+
func NewObservationHandler(obsertionTokenHandler *HandlerContainer, next HandlerFunc) HandlerFunc {
17+
return func(w *ResponseWriter, r *pool.Message) {
18+
v, err := obsertionTokenHandler.Get(r.Token())
19+
if err != nil {
20+
next(w, r)
21+
return
22+
}
23+
v(w, r)
24+
}
25+
}
26+
1627
//Observation represents subscription to resource on the server
1728
type Observation struct {
18-
token message.Token
19-
path string
29+
token message.Token
30+
path string
31+
cc *ClientConn
32+
observeFunc func(req *pool.Message)
33+
respCodeChan chan codes.Code
34+
2035
obsSequence uint32
2136
etag []byte
22-
cc *ClientConn
2337
lastEvent time.Time
38+
mutex sync.Mutex
2439

25-
mutex sync.Mutex
40+
waitForReponse uint32
2641
}
2742

28-
func NewObservationHandler(obsertionTokenHandler *HandlerContainer, next HandlerFunc) HandlerFunc {
29-
return func(w *ResponseWriter, r *pool.Message) {
30-
v, err := obsertionTokenHandler.Get(r.Token())
31-
if err != nil {
32-
next(w, r)
33-
return
43+
func newObservation(token message.Token, path string, cc *ClientConn, observeFunc func(req *pool.Message), respCodeChan chan codes.Code) *Observation {
44+
return &Observation{
45+
token: token,
46+
path: path,
47+
obsSequence: 0,
48+
cc: cc,
49+
waitForReponse: 1,
50+
respCodeChan: respCodeChan,
51+
observeFunc: observeFunc,
52+
}
53+
}
54+
55+
func (o *Observation) handler(w *ResponseWriter, r *pool.Message) {
56+
code := r.Code()
57+
if atomic.CompareAndSwapUint32(&o.waitForReponse, 1, 0) {
58+
select {
59+
case o.respCodeChan <- code:
60+
default:
3461
}
35-
v(w, r)
62+
o.respCodeChan = nil
63+
}
64+
if o.wantBeNotified(r) {
65+
o.observeFunc(r)
3666
}
3767
}
3868

@@ -89,14 +119,9 @@ func (cc *ClientConn) Observe(ctx context.Context, path string, observeFunc func
89119
defer pool.ReleaseMessage(req)
90120
token := req.Token()
91121
req.SetObserve(0)
92-
o := &Observation{
93-
token: token,
94-
path: path,
95-
obsSequence: 0,
96-
cc: cc,
97-
}
122+
98123
respCodeChan := make(chan codes.Code, 1)
99-
waitForReponse := uint32(1)
124+
o := newObservation(token, path, cc, observeFunc, respCodeChan)
100125

101126
options, err := req.Options().Clone()
102127
if err != nil {
@@ -110,18 +135,7 @@ func (cc *ClientConn) Observe(ctx context.Context, path string, observeFunc func
110135
Options: options,
111136
}
112137
cc.observationRequests.Store(token.String(), obs)
113-
err = o.cc.observationTokenHandler.Insert(token.String(), func(w *ResponseWriter, r *pool.Message) {
114-
code := r.Code()
115-
if atomic.CompareAndSwapUint32(&waitForReponse, 1, 0) {
116-
select {
117-
case respCodeChan <- code:
118-
default:
119-
}
120-
}
121-
if o.wantBeNotified(r) {
122-
observeFunc(r)
123-
}
124-
})
138+
err = o.cc.observationTokenHandler.Insert(token.String(), o.handler)
125139
defer func(err *error) {
126140
if *err != nil {
127141
o.cleanUp()

udp/client/clientobserve.go

Lines changed: 45 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,6 @@ import (
1313
"github.com/plgd-dev/go-coap/v2/udp/message/pool"
1414
)
1515

16-
//Observation represents subscription to resource on the server
17-
type Observation struct {
18-
token message.Token
19-
path string
20-
obsSequence uint32
21-
etag []byte
22-
cc *ClientConn
23-
lastEvent time.Time
24-
25-
mutex sync.Mutex
26-
}
27-
2816
func NewObservationHandler(obsertionTokenHandler *HandlerContainer, next HandlerFunc) HandlerFunc {
2917
return func(w *ResponseWriter, r *pool.Message) {
3018
v, err := obsertionTokenHandler.Get(r.Token())
@@ -41,6 +29,34 @@ func NewObservationHandler(obsertionTokenHandler *HandlerContainer, next Handler
4129
}
4230
}
4331

32+
//Observation represents subscription to resource on the server
33+
type Observation struct {
34+
token message.Token
35+
path string
36+
cc *ClientConn
37+
observeFunc func(req *pool.Message)
38+
respCodeChan chan codes.Code
39+
40+
obsSequence uint32
41+
etag []byte
42+
lastEvent time.Time
43+
mutex sync.Mutex
44+
45+
waitForReponse uint32
46+
}
47+
48+
func newObservation(token message.Token, path string, cc *ClientConn, observeFunc func(req *pool.Message), respCodeChan chan codes.Code) *Observation {
49+
return &Observation{
50+
token: token,
51+
path: path,
52+
obsSequence: 0,
53+
cc: cc,
54+
waitForReponse: 1,
55+
respCodeChan: respCodeChan,
56+
observeFunc: observeFunc,
57+
}
58+
}
59+
4460
func (o *Observation) cleanUp() {
4561
o.cc.observationTokenHandler.Pop(o.token)
4662
registeredRequest, ok := o.cc.observationRequests.PullOut(o.token.String())
@@ -49,6 +65,20 @@ func (o *Observation) cleanUp() {
4965
}
5066
}
5167

68+
func (o *Observation) handler(w *ResponseWriter, r *pool.Message) {
69+
code := r.Code()
70+
if atomic.CompareAndSwapUint32(&o.waitForReponse, 1, 0) {
71+
select {
72+
case o.respCodeChan <- code:
73+
default:
74+
}
75+
o.respCodeChan = nil
76+
}
77+
if o.wantBeNotified(r) {
78+
o.observeFunc(r)
79+
}
80+
}
81+
5282
// Cancel remove observation from server. For recreate observation use Observe.
5383
func (o *Observation) Cancel(ctx context.Context) error {
5484
o.cleanUp()
@@ -97,27 +127,11 @@ func (cc *ClientConn) Observe(ctx context.Context, path string, observeFunc func
97127
}
98128
token := req.Token()
99129
req.SetObserve(0)
100-
o := &Observation{
101-
token: token,
102-
path: path,
103-
obsSequence: 0,
104-
cc: cc,
105-
}
106130
respCodeChan := make(chan codes.Code, 1)
107-
waitForReponse := uint32(1)
131+
o := newObservation(token, path, cc, observeFunc, respCodeChan)
132+
108133
cc.observationRequests.Store(token.String(), req)
109-
err = o.cc.observationTokenHandler.Insert(token.String(), func(w *ResponseWriter, r *pool.Message) {
110-
code := r.Code()
111-
if atomic.CompareAndSwapUint32(&waitForReponse, 1, 0) {
112-
select {
113-
case respCodeChan <- code:
114-
default:
115-
}
116-
}
117-
if o.wantBeNotified(r) {
118-
observeFunc(r)
119-
}
120-
})
134+
err = o.cc.observationTokenHandler.Insert(token.String(), o.handler)
121135
defer func(err *error) {
122136
if *err != nil {
123137
o.cleanUp()

0 commit comments

Comments
 (0)