6
6
using AutoMapper ;
7
7
using LeadCMS . Data ;
8
8
using LeadCMS . DataAnnotations ;
9
- using LeadCMS . Entities ;
10
9
using Microsoft . EntityFrameworkCore ;
11
10
using Npgsql ;
12
11
@@ -23,11 +22,12 @@ public class PostgresNotificationService : IHostedService, IDisposable
23
22
private readonly IConfiguration configuration ;
24
23
private readonly IMapper mapper ;
25
24
private readonly HashSet < Type > supportedTypes ;
26
-
25
+
27
26
private NpgsqlConnection ? notificationConnection ;
28
27
private Task ? listeningTask ;
29
28
private Timer ? pollingTimer ;
30
29
private CancellationTokenSource ? cancellationTokenSource ;
30
+ private CancellationTokenSource ? listeningCancellationTokenSource ;
31
31
private bool isListening = false ;
32
32
private int lastPolledChangeLogId = 0 ;
33
33
@@ -49,10 +49,10 @@ public PostgresNotificationService(
49
49
public Task StartAsync ( CancellationToken cancellationToken )
50
50
{
51
51
cancellationTokenSource = new CancellationTokenSource ( ) ;
52
-
52
+
53
53
// Start monitoring client connections
54
54
_ = Task . Run ( MonitorClientConnections , cancellationToken ) ;
55
-
55
+
56
56
logger . LogInformation ( "PostgresNotificationService started" ) ;
57
57
return Task . CompletedTask ;
58
58
}
@@ -77,6 +77,7 @@ public void Dispose()
77
77
finally
78
78
{
79
79
cancellationTokenSource ? . Dispose ( ) ;
80
+ listeningCancellationTokenSource ? . Dispose ( ) ;
80
81
notificationConnection ? . Dispose ( ) ;
81
82
pollingTimer ? . Dispose ( ) ;
82
83
}
@@ -92,7 +93,7 @@ private async Task MonitorClientConnections()
92
93
try
93
94
{
94
95
var clientCount = clientManager . ConnectedClientCount ;
95
-
96
+
96
97
if ( clientCount > 0 && ! isListening )
97
98
{
98
99
// First client connected - start listening
@@ -136,7 +137,7 @@ private async Task StartListening()
136
137
// This determines what changes we need to poll for
137
138
using var scope = serviceProvider . CreateScope ( ) ;
138
139
var dbContext = scope . ServiceProvider . GetRequiredService < PgDbContext > ( ) ;
139
-
140
+
140
141
var minClientLastId = clientManager . GetMinimumLastChangeLogId ( ) ;
141
142
if ( minClientLastId . HasValue )
142
143
{
@@ -168,9 +169,9 @@ private async Task StartListening()
168
169
}
169
170
170
171
// Setup PostgreSQL NOTIFY listener
171
- var connectionString = configuration . GetConnectionString ( "DefaultConnection" ) ??
172
+ var connectionString = configuration . GetConnectionString ( "DefaultConnection" ) ??
172
173
BuildConnectionString ( ) ;
173
-
174
+
174
175
notificationConnection = new NpgsqlConnection ( connectionString ) ;
175
176
await notificationConnection . OpenAsync ( cancellationTokenSource ! . Token ) ;
176
177
@@ -188,8 +189,11 @@ private async Task StartListening()
188
189
189
190
logger . LogInformation ( "Successfully executed LISTEN entity_changes and draft_changes commands" ) ;
190
191
191
- // Start background listening task
192
- listeningTask = Task . Run ( ListenForNotifications , cancellationTokenSource . Token ) ;
192
+ // Create separate cancellation token for listening task
193
+ listeningCancellationTokenSource = new CancellationTokenSource ( ) ;
194
+
195
+ // Start background listening task with separate cancellation token
196
+ listeningTask = Task . Run ( ( ) => ListenForNotifications ( listeningCancellationTokenSource . Token ) , cancellationTokenSource . Token ) ;
193
197
194
198
// Start polling timer as Plan B (every 5 seconds)
195
199
pollingTimer = new Timer ( async _ => await PollForChanges ( ) , null , TimeSpan . FromSeconds ( 5 ) , TimeSpan . FromSeconds ( 5 ) ) ;
@@ -222,18 +226,41 @@ private async Task StopListening()
222
226
pollingTimer ? . Dispose ( ) ;
223
227
pollingTimer = null ;
224
228
225
- // Stop listening task
229
+ // Cancel the listening task (cancels WaitAsync)
230
+ if ( listeningCancellationTokenSource != null && ! listeningCancellationTokenSource . IsCancellationRequested )
231
+ {
232
+ listeningCancellationTokenSource . Cancel ( ) ;
233
+ }
234
+
235
+ // Wait for listening task to finish
236
+ if ( listeningTask != null )
237
+ {
238
+ try
239
+ {
240
+ await listeningTask ;
241
+ }
242
+ catch
243
+ {
244
+ /* ignore */
245
+ }
246
+ finally
247
+ {
248
+ listeningTask = null ;
249
+ }
250
+ }
251
+
252
+ // Now it is safe to execute commands and close the connection
226
253
if ( notificationConnection != null )
227
254
{
228
255
try
229
256
{
230
257
// Unlisten from both channels
231
258
using var cmd1 = new NpgsqlCommand ( "UNLISTEN entity_changes" , notificationConnection ) ;
232
259
await cmd1 . ExecuteNonQueryAsync ( ) ;
233
-
260
+
234
261
using var cmd2 = new NpgsqlCommand ( "UNLISTEN draft_changes" , notificationConnection ) ;
235
262
await cmd2 . ExecuteNonQueryAsync ( ) ;
236
-
263
+
237
264
logger . LogInformation ( "Successfully executed UNLISTEN commands for both entity_changes and draft_changes" ) ;
238
265
}
239
266
catch ( Exception ex )
@@ -257,11 +284,9 @@ private async Task StopListening()
257
284
}
258
285
}
259
286
260
- if ( listeningTask != null )
261
- {
262
- await listeningTask ;
263
- listeningTask = null ;
264
- }
287
+ // Dispose of the listening cancellation token source
288
+ listeningCancellationTokenSource ? . Dispose ( ) ;
289
+ listeningCancellationTokenSource = null ;
265
290
266
291
logger . LogInformation ( "Stopped PostgreSQL LISTEN and polling" ) ;
267
292
}
@@ -274,22 +299,22 @@ private async Task StopListening()
274
299
/// <summary>
275
300
/// Background task that waits for PostgreSQL notifications.
276
301
/// </summary>
277
- private async Task ListenForNotifications ( )
302
+ private async Task ListenForNotifications ( CancellationToken cancellationToken )
278
303
{
279
304
try
280
305
{
281
- while ( isListening && ! cancellationTokenSource ! . Token . IsCancellationRequested )
306
+ while ( isListening && ! cancellationToken . IsCancellationRequested )
282
307
{
283
308
if ( notificationConnection != null && notificationConnection . State == System . Data . ConnectionState . Open )
284
309
{
285
- await notificationConnection . WaitAsync ( cancellationTokenSource . Token ) ;
310
+ await notificationConnection . WaitAsync ( cancellationToken ) ;
286
311
}
287
312
else
288
313
{
289
314
// Connection is closed or null, wait a bit before retrying
290
315
logger . LogWarning ( "PostgreSQL notification connection is not available, waiting before retry" ) ;
291
- await Task . Delay ( 5000 , cancellationTokenSource . Token ) ;
292
-
316
+ await Task . Delay ( 5000 , cancellationToken ) ;
317
+
293
318
// Try to restart listening if connection is lost
294
319
if ( isListening )
295
320
{
@@ -315,7 +340,7 @@ private async Task ListenForNotifications()
315
340
{
316
341
try
317
342
{
318
- await Task . Delay ( 5000 , cancellationTokenSource . Token ) ;
343
+ await Task . Delay ( 5000 , cancellationToken ) ;
319
344
await StopListening ( ) ;
320
345
await StartListening ( ) ;
321
346
}
@@ -408,11 +433,11 @@ await clientManager.SendDraftNotificationAsync(
408
433
draft . Data ) ;
409
434
410
435
var thisDraftAt = draft . UpdatedAt ?? draft . CreatedAt ;
411
-
436
+
412
437
if ( maxSent == null || thisDraftAt > maxSent )
413
438
{
414
439
maxSent = thisDraftAt ;
415
- }
440
+ }
416
441
}
417
442
418
443
if ( maxSent != null )
@@ -514,7 +539,7 @@ await clientManager.SendNotificationAsync(
514
539
if ( maxSent == null || change . Id > maxSent )
515
540
{
516
541
maxSent = change . Id ;
517
- }
542
+ }
518
543
}
519
544
}
520
545
@@ -542,7 +567,7 @@ private async Task<Dictionary<int, object>> GetEntityData(PgDbContext dbContext,
542
567
// Find the entity type
543
568
var assembly = typeof ( PgDbContext ) . Assembly ; // Use the correct assembly
544
569
var type = assembly . GetTypes ( ) . FirstOrDefault ( t => t . Name == entityType ) ;
545
-
570
+
546
571
if ( type == null )
547
572
{
548
573
logger . LogWarning ( "Entity type {EntityType} not found" , entityType ) ;
@@ -602,7 +627,7 @@ private async Task<Dictionary<int, object>> GetEntityData(PgDbContext dbContext,
602
627
foreach ( var entity in entities )
603
628
{
604
629
var idValue = ( int ) entity . GetType ( ) . GetProperty ( "Id" ) ! . GetValue ( entity ) ! ;
605
-
630
+
606
631
try
607
632
{
608
633
if ( detailsDtoType != null )
0 commit comments