@@ -15,7 +15,7 @@ namespace RabbitMQ.Client.Core.DependencyInjection
15
15
/// <summary>
16
16
/// Implementation of the custom RabbitMQ queue service.
17
17
/// </summary>
18
- internal class QueueService : IQueueService
18
+ internal class QueueService : IQueueService , IDisposable
19
19
{
20
20
/// <summary>
21
21
/// RabbitMQ connection.
@@ -56,7 +56,9 @@ public QueueService(
56
56
IOptions < RabbitMqClientOptions > options )
57
57
{
58
58
if ( options is null )
59
+ {
59
60
throw new ArgumentException ( $ "Argument { nameof ( options ) } is null.", nameof ( options ) ) ;
61
+ }
60
62
61
63
_exchanges = exchanges ;
62
64
@@ -103,17 +105,22 @@ public void Dispose()
103
105
_connection . CallbackException -= HandleConnectionCallbackException ;
104
106
_connection . ConnectionRecoveryError -= HandleConnectionRecoveryError ;
105
107
}
108
+
106
109
if ( _channel != null )
107
110
{
108
111
_channel . CallbackException -= HandleChannelCallbackException ;
109
112
_channel . BasicRecoverOk -= HandleChannelBasicRecoverOk ;
110
113
}
111
114
112
115
if ( _channel ? . IsOpen == true )
116
+ {
113
117
_channel . Close ( ( int ) HttpStatusCode . OK , "Channel closed" ) ;
118
+ }
114
119
115
120
if ( _connection ? . IsOpen == true )
121
+ {
116
122
_connection . Close ( ) ;
123
+ }
117
124
}
118
125
119
126
/// <summary>
@@ -122,7 +129,9 @@ public void Dispose()
122
129
public void StartConsuming ( )
123
130
{
124
131
if ( _consumingStarted )
132
+ {
125
133
return ;
134
+ }
126
135
127
136
_consumer . Received += _receivedMessage ;
128
137
_consumingStarted = true ;
@@ -246,7 +255,9 @@ IBasicProperties CreateJsonProperties()
246
255
void HandleConnectionCallbackException ( object sender , CallbackExceptionEventArgs @event )
247
256
{
248
257
if ( @event is null )
258
+ {
249
259
return ;
260
+ }
250
261
251
262
_logger . LogError ( new EventId ( ) , @event . Exception , @event . Exception . Message , @event ) ;
252
263
throw @event . Exception ;
@@ -255,7 +266,9 @@ void HandleConnectionCallbackException(object sender, CallbackExceptionEventArgs
255
266
void HandleConnectionRecoveryError ( object sender , ConnectionRecoveryErrorEventArgs @event )
256
267
{
257
268
if ( @event is null )
269
+ {
258
270
return ;
271
+ }
259
272
260
273
_logger . LogError ( new EventId ( ) , @event . Exception , @event . Exception . Message , @event ) ;
261
274
throw @event . Exception ;
@@ -264,15 +277,19 @@ void HandleConnectionRecoveryError(object sender, ConnectionRecoveryErrorEventAr
264
277
void HandleChannelBasicRecoverOk ( object sender , EventArgs @event )
265
278
{
266
279
if ( @event is null )
280
+ {
267
281
return ;
282
+ }
268
283
269
284
_logger . LogInformation ( "Connection has been reestablished." ) ;
270
285
}
271
286
272
287
void HandleChannelCallbackException ( object sender , CallbackExceptionEventArgs @event )
273
288
{
274
289
if ( @event is null )
290
+ {
275
291
return ;
292
+ }
276
293
277
294
_logger . LogError ( new EventId ( ) , @event . Exception , @event . Exception . Message , @event ) ;
278
295
}
@@ -311,7 +328,7 @@ IDictionary<string, IList<T>> TransformMessageHandlersCollection<T>(IEnumerable<
311
328
}
312
329
else
313
330
{
314
- dictionary . Add ( routingKey , new List < T > ( ) { handler } ) ;
331
+ dictionary . Add ( routingKey , new List < T > { handler } ) ;
315
332
}
316
333
}
317
334
}
@@ -375,7 +392,9 @@ void StartClient()
375
392
Channel . BasicAck ( @event . DeliveryTag , false ) ;
376
393
377
394
if ( @event . BasicProperties . Headers is null )
395
+ {
378
396
@event . BasicProperties . Headers = new Dictionary < string , object > ( ) ;
397
+ }
379
398
380
399
var exchange = _exchanges . FirstOrDefault ( x => x . Name == @event . Exchange ) ;
381
400
if ( exchange is null )
@@ -393,7 +412,9 @@ void StartClient()
393
412
_logger . LogInformation ( "The failed message has been requeued." ) ;
394
413
}
395
414
else
415
+ {
396
416
_logger . LogInformation ( "The failed message would not be requeued." ) ;
417
+ }
397
418
}
398
419
} ;
399
420
@@ -462,10 +483,12 @@ void StartQueue(RabbitMqQueueOptions queue, string exchangeName)
462
483
{
463
484
// If there are not any routing keys then make a bind with a queue name.
464
485
foreach ( var route in queue . RoutingKeys )
486
+ {
465
487
_channel . QueueBind (
466
488
queue : queue . Name ,
467
489
exchange : exchangeName ,
468
490
routingKey : route ) ;
491
+ }
469
492
}
470
493
else
471
494
{
@@ -479,20 +502,28 @@ void StartQueue(RabbitMqQueueOptions queue, string exchangeName)
479
502
void ValidateArguments ( string exchangeName , string routingKey )
480
503
{
481
504
if ( string . IsNullOrEmpty ( exchangeName ) )
505
+ {
482
506
throw new ArgumentException ( $ "Argument { nameof ( exchangeName ) } is null or empty.", nameof ( exchangeName ) ) ;
507
+ }
483
508
if ( string . IsNullOrEmpty ( routingKey ) )
509
+ {
484
510
throw new ArgumentException ( $ "Argument { nameof ( routingKey ) } is null or empty.", nameof ( routingKey ) ) ;
511
+ }
485
512
486
513
var deadLetterExchanges = _exchanges . Select ( x => x . Options . DeadLetterExchange ) . Distinct ( ) ;
487
514
if ( ! _exchanges . Any ( x => x . Name == exchangeName ) && ! deadLetterExchanges . Any ( x => x == exchangeName ) )
515
+ {
488
516
throw new ArgumentException ( $ "Exchange { nameof ( exchangeName ) } has not been deaclared yet.", nameof ( exchangeName ) ) ;
517
+ }
489
518
}
490
519
491
520
string GetDeadLetterExchange ( string exchangeName )
492
521
{
493
522
var exchange = _exchanges . FirstOrDefault ( x => x . Name == exchangeName ) ;
494
523
if ( string . IsNullOrEmpty ( exchange . Options . DeadLetterExchange ) )
524
+ {
495
525
throw new ArgumentException ( $ "Exchange { nameof ( exchangeName ) } has not been configured with a dead letter exchange.", nameof ( exchangeName ) ) ;
526
+ }
496
527
497
528
return exchange . Options . DeadLetterExchange ;
498
529
}
0 commit comments