Skip to content

Commit 6082510

Browse files
committed
[patch] remove mutex and use channels for all concurrent access scenarios
1 parent 169c545 commit 6082510

File tree

1 file changed

+86
-42
lines changed

1 file changed

+86
-42
lines changed

extensions/sse/client.go

Lines changed: 86 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package sse
33
import (
44
"context"
55
"net/http"
6-
"sync"
76
)
87

98
type ClientManager interface {
@@ -28,10 +27,55 @@ type Client struct {
2827
Ctx context.Context
2928
}
3029

30+
type eventType int
31+
32+
const (
33+
eTypeNewClient eventType = iota
34+
eTypeClientList
35+
eTypeRemoveClient
36+
eTypeActiveClientCount
37+
eTypeClient
38+
)
39+
40+
type event struct {
41+
Type eventType
42+
ClientID string
43+
Client *Client
44+
Response chan *eventResponse
45+
}
46+
type eventResponse struct {
47+
Clients []*Client
48+
RemainingClients int
49+
Client *Client
50+
}
3151
type Clients struct {
3252
clients map[string]*Client
33-
locker sync.Mutex
3453
MsgBuffer int
54+
events chan<- event
55+
}
56+
57+
func (cs *Clients) listener(events <-chan event) {
58+
for ev := range events {
59+
switch ev.Type {
60+
case eTypeNewClient:
61+
cs.clients[ev.Client.ID] = ev.Client
62+
case eTypeClientList:
63+
copied := make([]*Client, 0, len(cs.clients))
64+
for clientID := range cs.clients {
65+
copied = append(copied, cs.clients[clientID])
66+
}
67+
ev.Response <- &eventResponse{
68+
Clients: copied,
69+
}
70+
case eTypeRemoveClient:
71+
delete(cs.clients, ev.ClientID)
72+
ev.Response <- nil
73+
case eTypeClient:
74+
ev.Response <- &eventResponse{
75+
Client: cs.clients[ev.ClientID],
76+
}
77+
}
78+
}
3579
}
3680

3781
func (cs *Clients) New(ctx context.Context, w http.ResponseWriter, clientID string) (*Client, int) {
@@ -42,75 +86,75 @@ func (cs *Clients) New(ctx context.Context, w http.ResponseWriter, clientID stri
4286
ResponseWriter: w,
4387
Ctx: ctx,
4488
}
45-
46-
cs.locker.Lock()
47-
defer cs.locker.Unlock()
48-
49-
cs.clients[clientID] = cli
89+
cs.events <- event{
90+
Type: eTypeNewClient,
91+
Client: cli,
92+
}
5093

5194
return cli, len(cs.clients)
5295
}
5396

5497
func (cs *Clients) Range(f func(cli *Client)) {
55-
cs.locker.Lock()
56-
copied := make([]*Client, 0, len(cs.clients))
57-
for clientID := range cs.clients {
58-
copied = append(copied, cs.clients[clientID])
98+
rch := make(chan *eventResponse)
99+
cs.events <- event{
100+
Type: eTypeClientList,
101+
Response: rch,
59102
}
60-
cs.locker.Unlock()
61103

62-
for i := range copied {
63-
f(copied[i])
104+
response := <-rch
105+
for i := range response.Clients {
106+
f(response.Clients[i])
64107
}
65108
}
66109

67110
func (cs *Clients) Remove(clientID string) int {
68-
cs.locker.Lock()
69-
defer cs.locker.Unlock()
111+
rch := make(chan *eventResponse)
112+
cs.events <- event{
113+
Type: eTypeRemoveClient,
114+
ClientID: clientID,
115+
Response: rch,
116+
}
70117

71-
delete(cs.clients, clientID)
72-
count := len(cs.clients)
118+
<-rch
73119

74-
return count
120+
return len(cs.clients)
75121
}
76122

77123
func (cs *Clients) Active() int {
78-
cs.locker.Lock()
79-
defer cs.locker.Unlock()
80-
81-
count := len(cs.clients)
82-
return count
124+
return len(cs.clients)
83125

84126
}
85127

86128
// MessageChannels returns a slice of message channels of all clients
87129
// which you can then use to send message concurrently
88130
func (cs *Clients) Clients() []*Client {
89-
idx := 0
90-
cs.locker.Lock()
91-
defer cs.locker.Unlock()
92-
93-
list := make([]*Client, len(cs.clients))
94-
for clientID := range cs.clients {
95-
cli := cs.clients[clientID]
96-
list[idx] = cli
97-
idx++
131+
rch := make(chan *eventResponse)
132+
cs.events <- event{
133+
Type: eTypeClientList,
134+
Response: rch,
98135
}
99136

100-
return list
137+
response := <-rch
138+
return response.Clients
101139
}
102140

103141
func (cs *Clients) Client(clientID string) *Client {
104-
cs.locker.Lock()
105-
defer cs.locker.Unlock()
106-
cli := cs.clients[clientID]
107-
108-
return cli
142+
rch := make(chan *eventResponse)
143+
cs.events <- event{
144+
Type: eTypeClientList,
145+
Response: rch,
146+
}
147+
cli := <-rch
148+
return cli.Client
109149
}
110150

111151
func NewClientManager() ClientManager {
112-
return &Clients{
113-
clients: make(map[string]*Client),
114-
locker: sync.Mutex{},
152+
events := make(chan event, 10)
153+
cli := &Clients{
154+
clients: make(map[string]*Client),
155+
events: events,
156+
MsgBuffer: 10,
115157
}
158+
go cli.listener(events)
159+
return cli
116160
}

0 commit comments

Comments
 (0)