@@ -121,7 +121,7 @@ func WtfConnect(sc SlaveConfig, pool *redis.Pool, jobPool *redis.Pool, rHub *red
121
121
logger .WithFields (logrus.Fields {
122
122
"data" : string (data ),
123
123
}).Info ("receive start" )
124
- h , err := CommanRouter (data , jobPool , s . SocketId () )
124
+ h , err := CommanRouter (data , jobPool )
125
125
if err != nil {
126
126
logger .WithFields (logrus.Fields {
127
127
"data" : string (data ),
@@ -140,7 +140,7 @@ func WtfConnect(sc SlaveConfig, pool *redis.Pool, jobPool *redis.Pool, rHub *red
140
140
"data" : string (data ),
141
141
"pdata" : string (d ),
142
142
}).Info ("receive to sub" )
143
- res , err := h (appKey , s .GetAuth (), d , true )
143
+ res , err := h (appKey , s .GetAuth (), d , s . SocketId (), true )
144
144
if err != nil {
145
145
logger .WithFields (logrus.Fields {
146
146
"data" : string (data ),
@@ -208,7 +208,7 @@ func WsConnect(sc SlaveConfig, pool *redis.Pool, jobPool *redis.Pool, rHub *redi
208
208
defer s .Close ()
209
209
210
210
s .Listen (func (data []byte ) (b []byte , err error ) {
211
- h , err := CommanRouter (data , jobPool , s . SocketId () )
211
+ h , err := CommanRouter (data , jobPool )
212
212
if err != nil {
213
213
logger .WithError (err ).Info ("router error" )
214
214
return
@@ -222,7 +222,7 @@ func WsConnect(sc SlaveConfig, pool *redis.Pool, jobPool *redis.Pool, rHub *redi
222
222
if err != nil {
223
223
debug = false
224
224
}
225
- res , err := h (appKey , s .GetAuth (), d , debug )
225
+ res , err := h (appKey , s .GetAuth (), d , s . SocketId (), debug )
226
226
if err != nil {
227
227
logger .WithError (err ).Info ("handler error" )
228
228
return
@@ -244,7 +244,7 @@ func WsConnect(sc SlaveConfig, pool *redis.Pool, jobPool *redis.Pool, rHub *redi
244
244
}
245
245
246
246
}
247
- func MultiSubscribeCommand (appkey string , auth redisocket.Auth , data []byte , debug bool ) (msg * commandResponse , err error ) {
247
+ func MultiSubscribeCommand (appkey string , auth redisocket.Auth , data []byte , socketId string , debug bool ) (msg * commandResponse , err error ) {
248
248
249
249
multiChannel := make ([]string , 0 )
250
250
_ , err = jsonparser .ArrayEach (data , func (v []byte , dataType jsonparser.ValueType , offset int , err error ) {
@@ -286,6 +286,7 @@ func MultiSubscribeCommand(appkey string, auth redisocket.Auth, data []byte, deb
286
286
if len (subChannels ) > 0 {
287
287
msg .multiData = subChannels
288
288
command .Event = MultiSubscribeReplySucceeded
289
+ command .SocketId = socketId
289
290
command .Data .Channel = subChannels
290
291
reply , err = json .Marshal (command )
291
292
if err != nil {
@@ -297,6 +298,7 @@ func MultiSubscribeCommand(appkey string, auth redisocket.Auth, data []byte, deb
297
298
//TODO 需重構 不讓他進入訂閱模式
298
299
msg .cmdType = ""
299
300
command .Event = MultiSubscribeReplyError
301
+ command .SocketId = socketId
300
302
command .Data .Channel = multiChannel
301
303
reply , err = json .Marshal (command )
302
304
if err != nil {
@@ -309,7 +311,7 @@ func MultiSubscribeCommand(appkey string, auth redisocket.Auth, data []byte, deb
309
311
return
310
312
}
311
313
312
- func SubscribeCommand (appkey string , auth redisocket.Auth , data []byte , debug bool ) (msg * commandResponse , err error ) {
314
+ func SubscribeCommand (appkey string , auth redisocket.Auth , data []byte , socketId string , debug bool ) (msg * commandResponse , err error ) {
313
315
314
316
channel , err := jsonparser .GetString (data , "channel" )
315
317
if err != nil {
@@ -339,6 +341,7 @@ func SubscribeCommand(appkey string, auth redisocket.Auth, data []byte, debug bo
339
341
var reply []byte
340
342
if exist {
341
343
msg .data = channel
344
+ command .SocketId = socketId
342
345
command .Event = SubscribeReplySucceeded
343
346
command .Data .Channel = channel
344
347
reply , err = json .Marshal (command )
@@ -350,6 +353,7 @@ func SubscribeCommand(appkey string, auth redisocket.Auth, data []byte, debug bo
350
353
351
354
//TODO 需重構 不讓他進入訂閱模式
352
355
msg .cmdType = ""
356
+ command .SocketId = socketId
353
357
command .Event = SubscribeReplyError
354
358
command .Data .Channel = channel
355
359
reply , err = json .Marshal (command )
@@ -362,14 +366,15 @@ func SubscribeCommand(appkey string, auth redisocket.Auth, data []byte, debug bo
362
366
363
367
return
364
368
}
365
- func QueryChannelCommand (appkey string , auth redisocket.Auth , data []byte , debug bool ) (msg * commandResponse , err error ) {
369
+ func QueryChannelCommand (appkey string , auth redisocket.Auth , data []byte , socketId string , debug bool ) (msg * commandResponse , err error ) {
366
370
msg = & commandResponse {
367
371
handler : DefaultSubHandler ,
368
372
cmdType : "QUERYCHANNEL" ,
369
373
}
370
374
371
375
command := & QueryChannelResponse {}
372
376
command .Event = QueryChannelReplySucceeded
377
+ command .SocketId = socketId
373
378
command .Data = struct {
374
379
Channels []string `json:"channels"`
375
380
}{
@@ -384,14 +389,15 @@ func QueryChannelCommand(appkey string, auth redisocket.Auth, data []byte, debug
384
389
return
385
390
}
386
391
387
- func PingPongCommand (appkey string , auth redisocket.Auth , data []byte , debug bool ) (msg * commandResponse , err error ) {
392
+ func PingPongCommand (appkey string , auth redisocket.Auth , data []byte , socketId string , debug bool ) (msg * commandResponse , err error ) {
388
393
msg = & commandResponse {
389
394
handler : DefaultSubHandler ,
390
395
cmdType : "PING" ,
391
396
}
392
397
393
398
command := & PongResponse {}
394
399
command .Event = QueryChannelReplySucceeded
400
+ command .SocketId = socketId
395
401
command .Data = data
396
402
command .Time = time .Now ().Unix ()
397
403
@@ -402,8 +408,8 @@ func PingPongCommand(appkey string, auth redisocket.Auth, data []byte, debug boo
402
408
msg .msg = reply
403
409
return
404
410
}
405
- func Remote (pool * redis.Pool , socketId string ) func (string , redisocket.Auth , []byte , bool ) (msg * commandResponse , err error ) {
406
- return func (appkey string , auth redisocket.Auth , data []byte , debug bool ) (msg * commandResponse , err error ) {
411
+ func Remote (pool * redis.Pool ) func (string , redisocket.Auth , []byte , string , bool ) (msg * commandResponse , err error ) {
412
+ return func (appkey string , auth redisocket.Auth , data []byte , socketId string , debug bool ) (msg * commandResponse , err error ) {
407
413
408
414
remote , err := jsonparser .GetString (data , "remote" )
409
415
if err != nil {
@@ -429,6 +435,7 @@ func Remote(pool *redis.Pool, socketId string) func(string, redisocket.Auth, []b
429
435
//沒有這個remote 返回錯誤訊息不斷線
430
436
if ! ok || ! b {
431
437
command .Event = RemoteReplyError
438
+ command .SocketId = socketId
432
439
reply , err = json .Marshal (command )
433
440
if err != nil {
434
441
return
@@ -451,6 +458,7 @@ func Remote(pool *redis.Pool, socketId string) func(string, redisocket.Auth, []b
451
458
return
452
459
}
453
460
command .Event = RemoteReplySucceeded
461
+ command .SocketId = socketId
454
462
reply , err = json .Marshal (command )
455
463
if err != nil {
456
464
return
@@ -463,7 +471,7 @@ func Remote(pool *redis.Pool, socketId string) func(string, redisocket.Auth, []b
463
471
}
464
472
465
473
}
466
- func UnSubscribeCommand (appkey string , auth redisocket.Auth , data []byte , debug bool ) (msg * commandResponse , err error ) {
474
+ func UnSubscribeCommand (appkey string , auth redisocket.Auth , data []byte , socketId string , debug bool ) (msg * commandResponse , err error ) {
467
475
channel , err := jsonparser .GetString (data , "channel" )
468
476
if err != nil {
469
477
return
@@ -493,6 +501,7 @@ func UnSubscribeCommand(appkey string, auth redisocket.Auth, data []byte, debug
493
501
if exist {
494
502
msg .data = channel
495
503
command .Event = UnSubscribeReplySucceeded
504
+ command .SocketId = socketId
496
505
command .Data .Channel = channel
497
506
reply , err = json .Marshal (command )
498
507
if err != nil {
@@ -505,6 +514,7 @@ func UnSubscribeCommand(appkey string, auth redisocket.Auth, data []byte, debug
505
514
//TODO 需重構 先不讓他進入訂閱模式
506
515
msg .cmdType = ""
507
516
command .Event = UnSubscribeReplyError
517
+ command .SocketId = socketId
508
518
command .Data .Channel = channel
509
519
reply , err = json .Marshal (command )
510
520
if err != nil {
@@ -515,15 +525,15 @@ func UnSubscribeCommand(appkey string, auth redisocket.Auth, data []byte, debug
515
525
return
516
526
}
517
527
518
- func CommanRouter (data []byte , pool * redis.Pool , socketId string ) (fn func (appkey string , auth redisocket.Auth , data []byte , debug bool ) (msg * commandResponse , err error ), err error ) {
528
+ func CommanRouter (data []byte , pool * redis.Pool ) (fn func (appkey string , auth redisocket.Auth , data []byte , socketId string , debug bool ) (msg * commandResponse , err error ), err error ) {
519
529
520
530
val , err := jsonparser .GetString (data , "event" )
521
531
if err != nil {
522
532
return
523
533
}
524
534
switch val {
525
535
case RemoteEvent :
526
- return Remote (pool , socketId ), nil
536
+ return Remote (pool ), nil
527
537
case QueryChannelEvent :
528
538
return QueryChannelCommand , nil
529
539
case SubscribeEvent :
0 commit comments