Skip to content

Commit 49d5e00

Browse files
committed
Fix lock contention
- Make InitNetwork() conditional. - Remove event thread, and move the logic into the main thread. There is no advantage in having two threads, because they both use the same mutex. This also prevents internal lock contention. - Increased throughput and lower latency due to two changes mentioned above. - Remove use of ClientQueueEnqueue(*Object, *Client) outside of the server's mutex. This was only called in case AllocateMemory failed, so basically never. This was mainly a problem on linux systems, as the mutexes seem to work a bit different than on windows.
1 parent 1af0ab7 commit 49d5e00

File tree

3 files changed

+33
-92
lines changed

3 files changed

+33
-92
lines changed

Includes/WebSocket_Server.pbi

Lines changed: 30 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
;
33
; The MIT License (MIT)
44
;
5-
; Copyright (c) 2015-2022 David Vogel
5+
; Copyright (c) 2015-2025 David Vogel
66
;
77
; Permission is hereby granted, free of charge, to any person obtaining a copy
88
; of this software and associated documentation files (the "Software"), to deal
@@ -140,6 +140,13 @@
140140
;
141141
; - V1.006 (05.12.2022)
142142
; - Add Get_HTTP_Header function and make HTTP_Header structure public
143+
;
144+
; - V1.007 (04.04.2025)
145+
; - Make InitNetwork() conditional.
146+
; - Remove event thread, and move the logic into the main thread. There is no advantage in having two threads, because they both use the same mutex. This also prevents internal lock contention.
147+
; - Add delay of 0s to help with external lock contention, this is useful when a user calls server API while the server is busy with sending data.
148+
; - Increased throughput and lower latency due to two changes mentioned above.
149+
; - Remove use of ClientQueueEnqueue(*Object, *Client) outside of the server's mutex. This was only called in case AllocateMemory failed, so basically never.
143150

144151
; ##################################################### Check Compiler options ######################################
145152

@@ -251,7 +258,9 @@ Module WebSocket_Server
251258
; #### Only use this for debugging purposes.
252259
;XIncludeFile "AllocationDumper.pbi"
253260

261+
CompilerIf #PB_Compiler_Version < 600
254262
InitNetwork()
263+
CompilerEndIf
255264
UseSHA1Fingerprint()
256265

257266
; ##################################################### Constants ###################################################
@@ -332,7 +341,6 @@ Module WebSocket_Server
332341

333342
List Client.Client()
334343
List *ClientQueue.Client() ; A queue of clients that need to be processed in Event_Callback().
335-
ClientQueueSemaphore.i ; Semaphore for the client queue.
336344

337345
*Event_Thread_Callback.Event_Callback
338346

@@ -371,7 +379,7 @@ Module WebSocket_Server
371379
ProcedureReturn #False
372380
EndProcedure
373381

374-
Procedure ClientQueueEnqueue(*Object.Object, *Client.Client, signal=#True)
382+
Procedure ClientQueueEnqueue(*Object.Object, *Client.Client)
375383
If *Client\Enqueued
376384
ProcedureReturn #True
377385
EndIf
@@ -380,13 +388,6 @@ Module WebSocket_Server
380388
If AddElement(*Object\ClientQueue())
381389
*Client\Enqueued = #True
382390
*Object\ClientQueue() = *Client
383-
384-
If *Object\ClientQueueSemaphore And signal
385-
; #### Set semaphore to 1, but don't increase count above 1.
386-
TrySemaphore(*Object\ClientQueueSemaphore)
387-
SignalSemaphore(*Object\ClientQueueSemaphore)
388-
EndIf
389-
390391
ProcedureReturn #True
391392
EndIf
392393

@@ -407,10 +408,6 @@ Module WebSocket_Server
407408
EndProcedure
408409

409410
Procedure ClientQueueRemove(*Object.Object, *Client.Client)
410-
If Not *Client\Enqueued
411-
ProcedureReturn #True
412-
EndIf
413-
414411
ForEach *Object\ClientQueue()
415412
If *Object\ClientQueue() = *Client
416413
DeleteElement(*Object\ClientQueue())
@@ -422,11 +419,6 @@ Module WebSocket_Server
422419
ProcedureReturn #False
423420
EndProcedure
424421

425-
Procedure ClientQueueWait(*Object.Object)
426-
; #### Wait for signal.
427-
WaitSemaphore(*Object\ClientQueueSemaphore)
428-
EndProcedure
429-
430422
Procedure Client_Free(*Client.Client)
431423
; #### Free all RX_Frames()
432424
While FirstElement(*Client\RX_Frame())
@@ -851,11 +843,11 @@ Module WebSocket_Server
851843
Repeat
852844
; #### Network Events
853845
Counter = 0
846+
847+
LockMutex(*Object\Mutex)
854848
Repeat
855-
LockMutex(*Object\Mutex)
856849
Select NetworkServerEvent(*Object\Server_ID)
857850
Case #PB_NetworkEvent_None
858-
UnlockMutex(*Object\Mutex)
859851
Break
860852

861853
Case #PB_NetworkEvent_Connect
@@ -882,19 +874,15 @@ Module WebSocket_Server
882874
Counter + 1
883875

884876
EndSelect
885-
UnlockMutex(*Object\Mutex)
886-
887-
If ListSize(*Object\ClientQueue()) > 100
888-
Delay(1)
889-
EndIf
890-
891877
Until Counter > 10
878+
UnlockMutex(*Object\Mutex)
892879

893-
; #### Busy when there was at least one network event
880+
; #### Busy when there was at least one network event.
894881
Busy = Bool(Counter > 0)
895882

896-
;While Event_Callback(*Object, *Object\Event_Thread_Callback)
897-
;Wend
883+
; #### Handle the enqueued clients and issue callbacks.
884+
While Event_Callback(*Object, *Object\Event_Thread_Callback)
885+
Wend
898886

899887
LockMutex(*Object\Mutex)
900888
;Debug "Queue: " + ListSize(*Object\ClientQueue()) + " Clients: " + ListSize(*Object\Client())
@@ -909,6 +897,8 @@ Module WebSocket_Server
909897

910898
; #### Handle timeouts: Check if a client timed out before the handshake was successful.
911899
If *Client\ConnectTimeout And *Client\ConnectTimeout <= ms
900+
*Client\ConnectTimeout = 0
901+
*Client\Event_Disconnect_Manually = #True
912902
ClientQueueEnqueue(*Object, *Client)
913903
EndIf
914904

@@ -922,41 +912,22 @@ Module WebSocket_Server
922912
; #### Delay only if there is nothing to do
923913
If Not Busy
924914
Delay(1)
915+
Else
916+
Delay(0) ; This is better than no delay, otherwise other threads don't have a chance to unlock the mutex.
925917
EndIf
926918

927919
Until *Object\Free
928920

929921
CloseNetworkServer(*Object\Server_ID) : *Object\Server_ID = #Null
930922

931-
; No need to care about the event thread, as it is shut down before cleanup happens here
932923
ForEach *Object\Client()
933-
ClientQueueRemove(*Object, *Object\Client())
934924
Client_Free(*Object\Client())
935925
Next
936926

937-
If *Object\ClientQueueSemaphore
938-
FreeSemaphore(*Object\ClientQueueSemaphore) : *Object\ClientQueueSemaphore = #Null
939-
EndIf
940-
941927
FreeMutex(*Object\Mutex) : *Object\Mutex = #Null
942928
FreeStructure(*Object)
943929
EndProcedure
944930

945-
Procedure Thread_Events(*Object.Object)
946-
Repeat
947-
; #### Wait for client queue entries.
948-
ClientQueueWait(*Object)
949-
950-
;Debug "New events to process"
951-
952-
; #### Process all events and callbacks. It's important that all events are processed.
953-
While Event_Callback(*Object, *Object\Event_Thread_Callback) And Not *Object\Free_Event
954-
;Debug "Processed one event"
955-
Wend
956-
;Debug "Processed all events"
957-
Until *Object\Free_Event
958-
EndProcedure
959-
960931
Procedure Frame_Send_Mutexless(*Object.Object, *Client.Client, FIN.a, RSV.a, Opcode.a, *Payload, Payload_Size.q)
961932
Protected *Pointer.Ascii
962933
Protected *Eight_Bytes.Eight_Bytes
@@ -1072,7 +1043,6 @@ Module WebSocket_Server
10721043
EndIf
10731044
*Temp = AllocateMemory(Temp_Size)
10741045
If Not *Temp
1075-
*Client\Event_Disconnect_Manually = #True : ClientQueueEnqueue(*Object, *Client)
10761046
ProcedureReturn #False
10771047
EndIf
10781048

@@ -1100,6 +1070,10 @@ Module WebSocket_Server
11001070
ProcedureReturn #False
11011071
EndIf
11021072

1073+
If *Object\Free
1074+
ProcedureReturn #False
1075+
EndIf
1076+
11031077
LockMutex(*Object\Mutex)
11041078

11051079
*Client = ClientQueueDequeue(*Object)
@@ -1115,18 +1089,12 @@ Module WebSocket_Server
11151089
*Client\Event_Connect = #False
11161090
*Client\ConnectTimeout = 0
11171091
*Client\External_Reference = #True
1118-
ClientQueueEnqueue(*Object, *Client)
11191092
UnlockMutex(*Object\Mutex)
11201093
*Callback(*Object, *Client, #Event_Connect)
11211094
LockMutex(*Object\Mutex)
11221095
Continue
11231096
EndIf
11241097

1125-
; #### Connect and handshake timeout. The client will be enqueued for this in Thread().
1126-
If *Client\ConnectTimeout And *Client\ConnectTimeout <= ElapsedMilliseconds()
1127-
*Client\Event_Disconnect_Manually = #True
1128-
EndIf
1129-
11301098
; #### Event: Client disconnected (TCP connection got terminated) (Only return this event if there are no incoming frames left to be read by the application)
11311099
If *Client\ID = #Null And ListSize(*Client\RX_Frame()) = 0
11321100
If *Client\External_Reference
@@ -1181,7 +1149,7 @@ Module WebSocket_Server
11811149
DeleteElement(*Client\RX_Frame())
11821150

11831151
; #### Enqueue again. Either because there are still frames to be read by the user, or because there are no frames anymore and the client can disconnect.
1184-
ClientQueueEnqueue(*Object, *Client)
1152+
;ClientQueueEnqueue(*Object, *Client)
11851153

11861154
; #### Check if any extension bit is set. This lib doesn't support any extensions.
11871155
If Event_Frame\RSV <> 0
@@ -1400,40 +1368,21 @@ Module WebSocket_Server
14001368
ProcedureReturn #Null
14011369
EndIf
14021370

1403-
If *Event_Thread_Callback
1404-
*Object\ClientQueueSemaphore = CreateSemaphore()
1405-
If Not *Object\ClientQueueSemaphore
1406-
FreeMutex(*Object\Mutex) : *Object\Mutex = #Null
1407-
FreeStructure(*Object)
1408-
ProcedureReturn #Null
1409-
EndIf
1410-
EndIf
1411-
14121371
*Object\Server_ID = CreateNetworkServer(#PB_Any, Port, #PB_Network_TCP)
14131372
If Not *Object\Server_ID
14141373
FreeMutex(*Object\Mutex) : *Object\Mutex = #Null
1415-
If *Object\ClientQueueSemaphore : FreeSemaphore(*Object\ClientQueueSemaphore) : *Object\ClientQueueSemaphore = #Null : EndIf
14161374
FreeStructure(*Object)
14171375
ProcedureReturn #Null
14181376
EndIf
14191377

14201378
*Object\Network_Thread_ID = CreateThread(@Thread(), *Object)
14211379
If Not *Object\Network_Thread_ID
14221380
FreeMutex(*Object\Mutex) : *Object\Mutex = #Null
1423-
If *Object\ClientQueueSemaphore : FreeSemaphore(*Object\ClientQueueSemaphore) : *Object\ClientQueueSemaphore = #Null : EndIf
14241381
CloseNetworkServer(*Object\Server_ID) : *Object\Server_ID = #Null
14251382
FreeStructure(*Object)
14261383
ProcedureReturn #Null
14271384
EndIf
14281385

1429-
If *Event_Thread_Callback
1430-
*Object\Event_Thread_ID = CreateThread(@Thread_Events(), *Object)
1431-
If Not *Object\Event_Thread_ID
1432-
*Object\Free = #True
1433-
ProcedureReturn #Null
1434-
EndIf
1435-
EndIf
1436-
14371386
ProcedureReturn *Object
14381387
EndProcedure
14391388

@@ -1445,11 +1394,6 @@ Module WebSocket_Server
14451394
; #### Fetch thread ID here, because the *Object is invalid some time after *Object\Free is set true
14461395
Protected Network_Thread_ID.i = *Object\Network_Thread_ID
14471396

1448-
If *Object\Event_Thread_ID
1449-
*Object\Free_Event = #True
1450-
SignalSemaphore(*Object\ClientQueueSemaphore) ; Misuse the semaphore to get the event thread to quit.
1451-
WaitThread(*Object\Event_Thread_ID)
1452-
EndIf
14531397
*Object\Free = #True
14541398
If Network_Thread_ID
14551399
WaitThread(Network_Thread_ID)
@@ -1459,10 +1403,9 @@ Module WebSocket_Server
14591403
EndProcedure
14601404

14611405
EndModule
1462-
1463-
; IDE Options = PureBasic 6.00 LTS (Windows - x64)
1464-
; CursorPosition = 142
1465-
; FirstLine = 120
1406+
; IDE Options = PureBasic 6.20 (Windows - x64)
1407+
; CursorPosition = 130
1408+
; FirstLine = 112
14661409
; Folding = ----
14671410
; EnableThread
14681411
; EnableXP

LICENSE

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
The MIT License (MIT)
22

3-
Copyright (c) 2015-2022 David Vogel
3+
Copyright (c) 2015-2025 David Vogel
44

55
Permission is hereby granted, free of charge, to any person obtaining a copy
66
of this software and associated documentation files (the "Software"), to deal

StressTest/StressTest.pb

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,11 @@ Repeat
5353
Delay(10)
5454
ForEver
5555

56-
; IDE Options = PureBasic 5.72 (Windows - x64)
57-
; CursorPosition = 49
58-
; FirstLine = 2
56+
; IDE Options = PureBasic 6.20 (Windows - x64)
57+
; CursorPosition = 7
5958
; Folding = -
6059
; EnableThread
6160
; EnableXP
6261
; Executable = stress-test-server.exe
63-
; Debugger = Standalone
6462
; EnablePurifier = 32,32,4096,2
6563
; EnableUnicode

0 commit comments

Comments
 (0)