Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit d5bda99

Browse files
Merge pull request #55 from AntonyVorontsov/feature/full-messages
Made message handlers consume BasicDeliverEventArgs instead of raw string messages.
2 parents 9bdb6e8 + 7759acc commit d5bda99

38 files changed

+294
-440
lines changed

docs/advanced-usage.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ Let' say your application is a web API and you want to use both `IConsumingServi
1111
```c#
1212
public class Startup
1313
{
14-
private IConfiguration Configuration { get; }
14+
IConfiguration Configuration { get; }
1515

1616
public Startup(IConfiguration configuration)
1717
{

docs/changelog.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,17 @@
22

33
All notable changes to this library will be documented in this file.
44

5+
## [4.3.0] - 2020-10-03
6+
7+
### Added
8+
9+
- `BasicDeliverEventArgs` extensions for parsing messages.
10+
11+
### Changed
12+
13+
- **Breaking!** `IMessageHandler`, `IAsyncMessageHandler`, `INonCyclicMessageHandler` and `IAsyncNonCyclicMessageHandler` get messages in `Handle` methods as `BasicDeliverEventArgs` instead of string values.
14+
- **Breaking!** `BatchMessageHandler` has been removed, `BaseBatchMessageHandler` is now one and only base class for handling messages in batches. `HandleMessages` method of `BaseBatchMessageHandler` gets a collection of messages as `BasicDeliverEventArgs` instead of bytes.
15+
516
## [4.2.0] - 2020-10-01
617

718
### Added

docs/message-consumption.md

Lines changed: 39 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public class Program
9191

9292
public class Worker : BackgroundService
9393
{
94-
private readonly IQueueService _queueService;
94+
readonly IQueueService _queueService;
9595

9696
public Worker(IQueueService queueService)
9797
{
@@ -109,16 +109,16 @@ The second step is to define classes that will take responsibility of handling r
109109

110110
### Synchronous message handlers
111111

112-
`IMessageHandler` consists of one method `Handle` that gets a message in a string format. You can deserialize that message (if it is a json message) or handle its raw value.
112+
`IMessageHandler` consists of one method `Handle` that gets a message. You can deserialize that message with `BasicDeliverEventArgs` extensions (described below).
113113
Thus, a message handler will look like this.
114114

115115
```c#
116116
public class CustomMessageHandler : IMessageHandler
117117
{
118-
public void Handle(string message, string routingKey)
118+
public void Handle(BasicDeliverEventArgs eventArgs, string matchingRoute)
119119
{
120120
// Do whatever you want.
121-
var messageObject = JsonConvert.DeserializeObject<YourClass>(message);
121+
var messageObject = eventArgs.GetPayload<YourClass>();
122122
}
123123
}
124124
```
@@ -134,9 +134,9 @@ public class CustomMessageHandler : IMessageHandler
134134
_logger = logger;
135135
}
136136

137-
public void Handle(string message, string routingKey)
137+
public void Handle(BasicDeliverEventArgs eventArgs, string matchingRoute)
138138
{
139-
_logger.LogInformation($"I got a message {message} by routing key {routingKey}");
139+
_logger.LogInformation($"I got a message {eventArgs.GetMessage()} by routing key {matchingRoute}");
140140
}
141141
}
142142
```
@@ -153,10 +153,10 @@ public class CustomNonCyclicMessageHandler : INonCyclicMessageHandler
153153
_logger = logger;
154154
}
155155

156-
public void Handle(string message, string routingKey, IQueueService queueService)
156+
public void Handle(BasicDeliverEventArgs eventArgs, string matchingRoute, IQueueService queueService)
157157
{
158158
_logger.LogInformation("Got a message. I will send it back to another queue.");
159-
var response = new { Message = message };
159+
var response = new { Message = eventArgs.GetMessage() };
160160
queueService.Send(response, "exchange.name", "routing.key");
161161
}
162162
}
@@ -171,7 +171,7 @@ public class CustomNonCyclicMessageHandler : INonCyclicMessageHandler
171171
```c#
172172
public class CustomAsyncMessageHandler : IAsyncMessageHandler
173173
{
174-
public async Task Handle(string message, string routingKey)
174+
public async Task Handle(BasicDeliverEventArgs eventArgs, string matchingRoute)
175175
{
176176
// Do whatever you want asynchronously!
177177
}
@@ -191,10 +191,10 @@ public class CustomAsyncNonCyclicMessageHandler : IAsyncNonCyclicMessageHandler
191191
_logger = logger;
192192
}
193193

194-
public async Task Handle(string message, string routingKey, IQueueService queueService)
194+
public async Task Handle(BasicDeliverEventArgs eventArgs, string matchingRoute, IQueueService queueService)
195195
{
196196
_logger.LogInformation("You can do something async, e.g. send message back.");
197-
var response = new { Message = message };
197+
var response = new { Message = eventArgs.GetMessage() };
198198
await queueService.SendAsync(response, "exchange.name", "routing.key");
199199
}
200200
}
@@ -306,21 +306,21 @@ services.AddRabbitMqClient(clientConfiguration)
306306
The message handling process organized as follows:
307307

308308
- `IQueueMessage` receives a message and delegates it to `IMessageHandlingService`.
309-
- `IMessageHandlingService` gets a message (as a byte array) and decodes it to the UTF8 string. It also checks if there are any message handlers in a combined collection of `IMessageHandler`, `IAsyncMessageHandler`, `INonCyclicMessageHandler` and `IAsyncNonCyclicMessageHandler` instances and forwards a message to them.
309+
- `IMessageHandlingService` gets a message and checks if there are any message handlers in a combined collection of `IMessageHandler`, `IAsyncMessageHandler`, `INonCyclicMessageHandler` and `IAsyncNonCyclicMessageHandler` instances and forwards a message to them.
310310
- All subscribed message handlers (`IMessageHandler`, `IAsyncMessageHandler`, `INonCyclicMessageHandler`, `IAsyncNonCyclicMessageHandler`) process the given message in a given or a default order.
311311
- `IMessageHandlingService` acknowledges the message by its `DeliveryTag`.
312312
- If any exception occurs `IMessageHandlingService` acknowledges the message anyway and checks if the message has to be re-send. If exchange option `RequeueFailedMessages` is set `true` then `IMessageHandlingService` adds a header `"re-queue-attempts"` to the message and sends it again with delay in value of `RequeueTimeoutMilliseconds` (default is 200 milliseconds). The number of attempts is configurable and re-delivery will be made that many times as the value of `RequeueAttempts` property. Mechanism of sending delayed messages covered in the message production [documentation](message-production.md).
313313
- If any exception occurs within handling the message that has been already re-sent that message will not be re-send again (re-send happens only once).
314314

315315
### Batch message handlers
316316

317-
There are also a feature that you can use in case of necessity of handling messages in batches.
318-
First of all you have to create a class that inherits a `BatchMessageHandler` class.
317+
There is a feature that you can use in case of necessity of handling messages in batches.
318+
First of all you have to create a class that inherits `BaseBatchMessageHandler`.
319319
You have to set up values for `QueueName` and `PrefetchCount` properties. These values are responsible for the queue that will be read by the message handler, and the size of batches of messages. You can also set a `MessageHandlingPeriod` property value and the method `HandleMessage` will be executed repeatedly so messages in unfilled batches could be processed too, but keep in mind that this property is optional.
320320
Be aware that batch message handlers **do not declare queues**, so if it does not exist an exception will be thrown. Either declare manually or using RabbitMqClient configuration features.
321321

322322
```c#
323-
public class CustomBatchMessageHandler : BatchMessageHandler
323+
public class CustomBatchMessageHandler : BaseBatchMessageHandler
324324
{
325325
readonly ILogger<CustomBatchMessageHandler> _logger;
326326

@@ -339,61 +339,51 @@ public class CustomBatchMessageHandler : BatchMessageHandler
339339

340340
public override TimeSpan? MessageHandlingPeriod { get; set; } = TimeSpan.FromMilliseconds(500);
341341

342-
public override Task HandleMessages(IEnumerable<string> messages, CancellationToken cancellationToken)
342+
public override Task HandleMessages(IEnumerable<BasicDeliverEventArgs> messages, CancellationToken cancellationToken)
343343
{
344344
_logger.LogInformation("Handling a batch of messages.");
345345
foreach (var message in messages)
346346
{
347-
_logger.LogInformation(message);
347+
_logger.LogInformation(message.GetMessage());
348348
}
349349
return Task.CompletedTask;
350350
}
351351
}
352352
```
353353

354-
If you want to get raw messages as `ReadOnlyMemory<byte>` you can inherit base message handler class.
354+
After all you have to register that batch message handler via DI.
355355

356356
```c#
357-
public class CustomBatchMessageHandler : BaseBatchMessageHandler
358-
{
359-
readonly ILogger<CustomBatchMessageHandler> _logger;
357+
services.AddBatchMessageHandler<CustomBatchMessageHandler>(Configuration.GetSection("RabbitMq"));
358+
```
360359

361-
public CustomBatchMessageHandler(
362-
IRabbitMqConnectionFactory rabbitMqConnectionFactory,
363-
IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
364-
ILogger<CustomBatchMessageHandler> logger)
365-
: base(rabbitMqConnectionFactory, batchConsumerConnectionOptions, logger)
366-
{
367-
_logger = logger;
368-
}
360+
The message handler will create a separate connection and use it for reading messages.
361+
When the message collection is full to the size of `PrefetchCount` it will be passed to the `HandleMessage` method.
369362

370-
public override ushort PrefetchCount { get; set; } = 3;
363+
### Parsing extensions
371364

372-
public override string QueueName { get; set; } = "queue.name";
365+
There are some simple extensions for `BasicDeliverEventArgs` class that helps to parse messages. You have to use `RabbitMQ.Client.Core.DependencyInjection` namespace to enable those extensions.
366+
There is an example of using those extensions inside a `Handle` method of `IMessageHandler`.
373367

374-
public override Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
368+
```c#
369+
public class CustomMessageHandler : IMessageHandler
370+
{
371+
public void Handle(BasicDeliverEventArgs eventArgs, string matchingRoute)
375372
{
376-
_logger.LogInformation("Handling a batch of messages.");
377-
foreach (var message in messages)
378-
{
379-
var stringifiedMessage = Encoding.UTF8.GetString(message.ToArray());
380-
_logger.LogInformation(stringifiedMessage);
381-
}
382-
return Task.CompletedTask;
373+
// You can get string message.
374+
var stringifiedMessage = eventArgs.GetMessage();
375+
376+
// Or object payload.
377+
var payload = eventArgs.GetPayload<YourClass>();
378+
379+
// Or anonymous object by another example object.
380+
var anonymousObject = new { message = string.Empty, number = 0 };
381+
var anonymousPayload = eventArgs.GetAnonymousPayload(anonymousObject);
383382
}
384383
}
385384
```
386385

387-
After all you have to register that batch message handler via DI.
388-
```c#
389-
services.AddBatchMessageHandler<CustomBatchMessageHandler>(Configuration.GetSection("RabbitMq"));
390-
```
391-
392-
The message handler will create a separate connection and use it for reading messages.
393-
When the message collection is full to the size of `PrefetchCount` they are passed to the `HandleMessage` method.
394-
Both `BaseBatchMessageHandler` and `BatchMessageHandler` implement `IDisposable` interface, so you can use it for release of resources.
395-
396-
Use this method of getting messages only when you sure that the number of messages that pass through this queue is really huge. Otherwise, messages could stack in the temporary collection of messages waiting to get in full.
386+
You can also pass `JsonSerializerSettings` to `GetPayload` or `GetAnonymousPayload` methods as well as collection of `JsonConverter` in case you use custom serialization.
397387

398388
For message production features see the [Previous page](message-production.md)
399389

docs/message-production.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ It can be a custom service or a controller.
1010
[Route("api/[controller]")]
1111
public class HomeController : Controller
1212
{
13-
private readonly IQueueService _queueService;
13+
readonly IQueueService _queueService;
1414
public HomeController(IQueueService queueService)
1515
{
1616
_queueService = queueService;

examples/Examples.AdvancedConfiguration/MessageHandlers/CustomAsyncMessageHandler.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
using System.Threading.Tasks;
22
using RabbitMQ.Client.Core.DependencyInjection.MessageHandlers;
3+
using RabbitMQ.Client.Events;
34

45
namespace Examples.AdvancedConfiguration.MessageHandlers
56
{
67
public class CustomAsyncMessageHandler : IAsyncMessageHandler
78
{
8-
public async Task Handle(string message, string routingKey)
9+
public async Task Handle(BasicDeliverEventArgs eventArgs, string matchingRoute)
910
{
1011
// The message handler does not do anything.
1112
// It is just an example.

examples/Examples.AdvancedConfiguration/MessageHandlers/CustomAsyncNonCyclicMessageHandler.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
using System.Threading.Tasks;
22
using RabbitMQ.Client.Core.DependencyInjection.MessageHandlers;
33
using RabbitMQ.Client.Core.DependencyInjection.Services;
4+
using RabbitMQ.Client.Events;
45

56
namespace Examples.AdvancedConfiguration.MessageHandlers
67
{
78
public class CustomAsyncNonCyclicMessageHandler : IAsyncNonCyclicMessageHandler
89
{
9-
public async Task Handle(string message, string routingKey, IQueueService queueService)
10+
public async Task Handle(BasicDeliverEventArgs eventArgs, string matchingRoute, IQueueService queueService)
1011
{
1112
// The message handler does not do anything.
1213
// It is just an example.

examples/Examples.AdvancedConfiguration/MessageHandlers/CustomMessageHandler.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
using RabbitMQ.Client.Core.DependencyInjection.MessageHandlers;
2+
using RabbitMQ.Client.Events;
23

34
namespace Examples.AdvancedConfiguration.MessageHandlers
45
{
56
public class CustomMessageHandler : IMessageHandler
67
{
7-
public void Handle(string message, string routingKey)
8+
public void Handle(BasicDeliverEventArgs eventArgs, string matchingRoute)
89
{
910
// The message handler does not do anything.
1011
// It is just an example.

examples/Examples.AdvancedConfiguration/MessageHandlers/CustomNonCyclicMessageHandler.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
using RabbitMQ.Client.Core.DependencyInjection.MessageHandlers;
22
using RabbitMQ.Client.Core.DependencyInjection.Services;
3+
using RabbitMQ.Client.Events;
34

45
namespace Examples.AdvancedConfiguration.MessageHandlers
56
{
67
public class CustomNonCyclicMessageHandler : INonCyclicMessageHandler
78
{
8-
public void Handle(string message, string routingKey, IQueueService queueService)
9+
public void Handle(BasicDeliverEventArgs eventArgs, string matchingRoute, IQueueService queueService)
910
{
1011
// The message handler does not do anything.
1112
// It is just an example.

examples/Examples.AdvancedConfiguration/Startup.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ namespace Examples.AdvancedConfiguration
99
{
1010
public class Startup
1111
{
12-
private IConfiguration Configuration { get; }
12+
IConfiguration Configuration { get; }
1313

1414
public Startup(IConfiguration configuration)
1515
{

examples/Examples.BatchMessageHandler/AnotherCustomBatchMessageHandler.cs

Lines changed: 0 additions & 42 deletions
This file was deleted.

0 commit comments

Comments
 (0)