Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit 6433931

Browse files
Merge pull request #54 from AntonyVorontsov/feature/advanced-batch-message-handling
Added timer for batch message handlers.
2 parents 0de3417 + 49b1672 commit 6433931

File tree

12 files changed

+257
-26
lines changed

12 files changed

+257
-26
lines changed

docs/advanced-usage.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# Advanced usage
22

3-
`IQueueService` is an interface that implements two other interfaces - `IConsumingService` and `IProducingService`. By default a RabbitMQ Client is registered as `IQueueService` without a logical separation at producing and consuming code. Thus, you can inject only a `IQueueService` instance, and `IConsumingService` or `IProducingService` won't be available. This is not a real deal until you want to control the way a RabbitMQ Client connects to the server.
3+
`IQueueService` is an interface that implements two other interfaces - `IConsumingService` and `IProducingService`. By default, a RabbitMQ Client is registered as `IQueueService` without a logical separation at producing and consuming code. Thus, you can inject only a `IQueueService` instance, and `IConsumingService` or `IProducingService` won't be available. This is not a real deal until you want to control the way a RabbitMQ Client connects to the server.
44

5-
An instance of `IQueueService` opens two connections to the RabbitMQ server, one is for message production and the other one is for message consumption. Normally a RabbitMQ Client is added in a singleton mode, so both connections stay opened while application is running. It is also noticeable that a RabbitMQ Client uses the same credentials for both connections. If you add `IQueueService` in a transient mode (via the `AddRabbitMqClientTransient` extension method) both connections will be opened each time `IQueueService` is being injected somewhere else. This behavior does not fit everybody, so you can change it a little.
5+
An instance of `IQueueService` opens two connections to the RabbitMQ server, one is for message production, and the other one is for message consumption. Normally a RabbitMQ Client is added in a singleton mode, so both connections stay opened while application is running. It is also noticeable that a RabbitMQ Client uses the same credentials for both connections. If you add `IQueueService` in the transient mode (via the `AddRabbitMqClientTransient` extension method) both connections will be opened each time `IQueueService` is being injected somewhere else. This behavior does not fit everybody, so you can change it a little.
66

77
You are allowed to register a RabbitMQ Client as an implementation of two interfaces that have been mentioned before - `IConsumingService` and `IProducingService`. Each interface defines its own connection and its own collection of methods, obviously, for message production and message consumption. You can also use different credentials for different connections, and there is an option `ClientProvidedName` which allows you to create a "named" connection (which will be easier to find in the RabbitMQ management UI). There is also a possibility of registering `IConsumingService` and `IProducingService` in different lifetime modes, in case you want your consumption connection to be persist (singleton `IConsumingService`) and open a connection each time you want to send a message (a transient `IProducingService`). This situation will be covered in code examples below.
88

docs/message-consumption.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -316,8 +316,8 @@ The message handling process organized as follows:
316316

317317
There are also a feature that you can use in case of necessity of handling messages in batches.
318318
First of all you have to create a class that inherits a `BatchMessageHandler` class.
319-
You have to set up values for `QueueName` and `PrefetchCount` properties. These values are responsible for the queue that will be read by the message handler, and the size of batches of messages.
320-
You have to be aware that batch message handlers do not declare queues, so if it does not exists an exception will be thrown. Either declare manually or using RabbitMqClient configuration features.
319+
You have to set up values for `QueueName` and `PrefetchCount` properties. These values are responsible for the queue that will be read by the message handler, and the size of batches of messages. You can also set a `MessageHandlingPeriod` property value and the method `HandleMessage` will be executed repeatedly so messages in unfilled batches could be processed too, but keep in mind that this property is optional.
320+
Be aware that batch message handlers *do not declare queues*, so if it does not exist an exception will be thrown. Either declare manually or using RabbitMqClient configuration features.
321321

322322
```c#
323323
public class CustomBatchMessageHandler : BatchMessageHandler
@@ -337,6 +337,8 @@ public class CustomBatchMessageHandler : BatchMessageHandler
337337

338338
public override string QueueName { get; set; } = "another.queue.name";
339339

340+
public override TimeSpan? MessageHandlingPeriod { get; set; } = TimeSpan.FromMilliseconds(500);
341+
340342
public override Task HandleMessages(IEnumerable<string> messages, CancellationToken cancellationToken)
341343
{
342344
_logger.LogInformation("Handling a batch of messages.");

examples/Examples.BatchMessageHandler/AnotherCustomBatchMessageHandler.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using System.Collections.Generic;
23
using System.Threading;
34
using System.Threading.Tasks;
@@ -25,6 +26,9 @@ public AnotherCustomBatchMessageHandler(
2526
// You have to be aware that BaseBatchMessageHandler does not declare the specified queue. So if it does not exists an exception will be thrown.
2627
public override string QueueName { get; set; } = "another.queue.name";
2728

29+
// This thing will fire message handling if there are not enough messages, but timeout is already off.
30+
public override TimeSpan? MessageHandlingPeriod { get; set; } = TimeSpan.FromMilliseconds(500);
31+
2832
public override Task HandleMessages(IEnumerable<string> messages, CancellationToken cancellationToken)
2933
{
3034
_logger.LogInformation("Handling a batch of messages.");

readme.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,9 @@ public class CustomBatchMessageHandler : BatchMessageHandler
259259

260260
public override ushort PrefetchCount { get; set; } = 50;
261261

262-
public override string QueueName { get; set; } = "another.queue.name";
262+
public override string QueueName { get; set; } = "queue.name";
263+
264+
public override TimeSpan? MessageHandlingPeriod { get; set; } = TimeSpan.FromMilliseconds(500);
263265

264266
public override Task HandleMessages(IEnumerable<string> messages, CancellationToken cancellationToken)
265267
{
@@ -279,7 +281,7 @@ services.AddBatchMessageHandler<CustomBatchMessageHandler>(Configuration.GetSect
279281
```
280282

281283
The message handler will create a separate connection and use it for reading messages.
282-
When the message collection is full to the size of `PrefetchCount` they are passed to the `HandleMessage` method. For more information, see the [message-consuming](./docs/message-consumption.md) documentation file.
284+
When the message collection is full to the size of `PrefetchCount` they are passed to the `HandleMessage` method. You can also set a `MessageHandlingPeriod` property value and the method `HandleMessage` will be executed repeatedly so messages in unfilled batches could be processed too. For more information, see the [message-consuming](./docs/message-consumption.md) documentation file.
283285

284286
## Advanced usage and nuances
285287

src/RabbitMQ.Client.Core.DependencyInjection/BatchMessageHandlers/BaseBatchMessageHandler.cs

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,19 @@ public abstract class BaseBatchMessageHandler : IHostedService, IDisposable
4343
/// Prefetch count value (batch size).
4444
/// </summary>
4545
public abstract ushort PrefetchCount { get; set; }
46+
47+
/// <summary>
48+
/// The TimeSpan period through which messages will be processing.
49+
/// </summary>
50+
public virtual TimeSpan? MessageHandlingPeriod { get; set; }
4651

4752
readonly IRabbitMqConnectionFactory _rabbitMqConnectionFactory;
4853
readonly RabbitMqClientOptions _clientOptions;
4954
readonly ILogger<BaseBatchMessageHandler> _logger;
5055

56+
readonly ConcurrentBag<BasicDeliverEventArgs> _messages = new ConcurrentBag<BasicDeliverEventArgs>();
57+
Timer _timer;
58+
readonly object _lock = new object();
5159
bool _disposed = false;
5260

5361
protected BaseBatchMessageHandler(
@@ -73,27 +81,61 @@ public Task StartAsync(CancellationToken cancellationToken)
7381
Connection = _rabbitMqConnectionFactory.CreateRabbitMqConnection(_clientOptions);
7482
Channel = Connection.CreateModel();
7583
Channel.BasicQos(PrefetchSize, PrefetchCount, false);
84+
85+
if (MessageHandlingPeriod != null)
86+
{
87+
_timer = new Timer(async _ => await ProcessBatchOfMessages(cancellationToken).ConfigureAwait(false), null, MessageHandlingPeriod.Value, MessageHandlingPeriod.Value);
88+
}
7689

77-
var messages = new ConcurrentBag<BasicDeliverEventArgs>();
7890
var consumer = _rabbitMqConnectionFactory.CreateConsumer(Channel);
7991
consumer.Received += async (sender, eventArgs) =>
8092
{
81-
messages.Add(eventArgs);
82-
if (messages.Count < PrefetchCount)
93+
lock (_lock)
8394
{
84-
return;
95+
_messages.Add(eventArgs);
96+
if (_messages.Count < PrefetchCount)
97+
{
98+
return;
99+
}
85100
}
86101

87-
var byteMessages = messages.Select(x => x.Body).ToList();
88-
await HandleMessages(byteMessages, cancellationToken).ConfigureAwait(false);
89-
var latestDeliveryTag = messages.Max(x => x.DeliveryTag);
90-
messages.Clear();
91-
Channel.BasicAck(latestDeliveryTag, true);
102+
await ProcessBatchOfMessages(cancellationToken).ConfigureAwait(false);
92103
};
104+
93105
Channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
94106
return Task.CompletedTask;
95107
}
96108

109+
async Task ProcessBatchOfMessages(CancellationToken cancellationToken)
110+
{
111+
112+
var messages = GetMessages();
113+
if (!messages.Any())
114+
{
115+
return;
116+
}
117+
118+
var byteMessages = messages.Select(x => x.Body).ToList();
119+
await HandleMessages(byteMessages, cancellationToken).ConfigureAwait(false);
120+
var latestDeliveryTag = messages.Max(x => x.DeliveryTag);
121+
Channel.BasicAck(latestDeliveryTag, true);
122+
}
123+
124+
IList<BasicDeliverEventArgs> GetMessages()
125+
{
126+
lock (_lock)
127+
{
128+
if (!_messages.Any())
129+
{
130+
return new List<BasicDeliverEventArgs>();
131+
}
132+
133+
var messages = _messages.ToList();
134+
_messages.Clear();
135+
return messages;
136+
}
137+
}
138+
97139
void ValidateProperties()
98140
{
99141
if (string.IsNullOrEmpty(QueueName))
@@ -117,6 +159,7 @@ void ValidateProperties()
117159

118160
public Task StopAsync(CancellationToken cancellationToken)
119161
{
162+
_timer?.Change(Timeout.Infinite, 0);
120163
_logger.LogInformation($"Batch message handler {GetType()} has been stopped.");
121164
return Task.CompletedTask;
122165
}
@@ -130,6 +173,7 @@ protected virtual void Dispose(bool disposing)
130173

131174
if (disposing)
132175
{
176+
_timer?.Dispose();
133177
Connection?.Dispose();
134178
Channel?.Dispose();
135179
}

tests/RabbitMQ.Client.Core.DependencyInjection.Tests/RabbitMQ.Client.Core.DependencyInjection.Tests.csproj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@
1010
</ItemGroup>
1111

1212
<ItemGroup>
13-
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.6.1" />
14-
<PackageReference Include="Moq" Version="4.14.1" />
13+
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.7.1" />
14+
<PackageReference Include="Moq" Version="4.14.5" />
1515
<PackageReference Include="xunit" Version="2.4.1" />
16-
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1" />
16+
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3" />
1717
</ItemGroup>
1818

1919
</Project>

tests/RabbitMQ.Client.Core.DependencyInjection.Tests/Stubs/StubBaseBatchMessageHandler.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,16 @@ public StubBaseBatchMessageHandler(
2626
public override ushort PrefetchCount { get; set; }
2727

2828
public override string QueueName { get; set; }
29+
30+
public override TimeSpan? MessageHandlingPeriod { get; set; }
2931

3032
public override Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
3133
{
32-
_caller.EmptyCall();
3334
foreach (var message in messages)
3435
{
3536
_caller.Call(message);
3637
}
38+
_caller.EmptyCall();
3739
return Task.CompletedTask;
3840
}
3941
}

tests/RabbitMQ.Client.Core.DependencyInjection.Tests/Stubs/StubBatchMessageHandler.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using System.Collections.Generic;
23
using System.Threading;
34
using System.Threading.Tasks;
@@ -25,14 +26,16 @@ public StubBatchMessageHandler(
2526
public override ushort PrefetchCount { get; set; }
2627

2728
public override string QueueName { get; set; }
29+
30+
public override TimeSpan? MessageHandlingPeriod { get; set; }
2831

2932
public override Task HandleMessages(IEnumerable<string> messages, CancellationToken cancellationToken)
3033
{
31-
_caller.EmptyCall();
3234
foreach (var message in messages)
3335
{
3436
_caller.Call(message);
3537
}
38+
_caller.EmptyCall();
3639
return Task.CompletedTask;
3740
}
3841
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
5+
namespace RabbitMQ.Client.Core.DependencyInjection.Tests.Stubs
6+
{
7+
public class StubCallerDecorator : IStubCaller
8+
{
9+
private readonly IStubCaller Caller;
10+
11+
public StubCallerDecorator(IStubCaller caller)
12+
{
13+
Caller = caller;
14+
}
15+
16+
public EventWaitHandle WaitHandle { get; set; }
17+
18+
public void EmptyCall()
19+
{
20+
Caller.EmptyCall();
21+
WaitHandle.Set();
22+
}
23+
24+
public void Call(ReadOnlyMemory<byte> message)
25+
{
26+
Caller.Call(message);
27+
}
28+
29+
public void Call(string message)
30+
{
31+
Caller.Call(message);
32+
}
33+
34+
public Task CallAsync(string message)
35+
{
36+
return Caller.CallAsync(message);
37+
}
38+
}
39+
}

tests/RabbitMQ.Client.Core.DependencyInjection.Tests/UnitTests/BaseBatchMessageHandlerTests.cs

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ namespace RabbitMQ.Client.Core.DependencyInjection.Tests.UnitTests
1616
{
1717
public class BaseBatchMessageHandlerTests
1818
{
19+
readonly TimeSpan _globalTestsTimeout = TimeSpan.FromSeconds(60);
20+
1921
[Theory]
2022
[InlineData(1, 10)]
2123
[InlineData(5, 47)]
@@ -42,14 +44,14 @@ public async Task ShouldProperlyHandlerMessagesByBatches(ushort prefetchCount, i
4244

4345
var callerMock = new Mock<IStubCaller>();
4446

45-
using var messageHandler = CreateBatchMessageHandler(queueName, prefetchCount, connectionFactoryMock.Object, callerMock.Object);
47+
using var messageHandler = CreateBatchMessageHandler(queueName, prefetchCount, null, connectionFactoryMock.Object, callerMock.Object);
4648
await messageHandler.StartAsync(CancellationToken.None);
4749

4850
for (var i = 0; i < numberOfMessages; i++)
4951
{
5052
await consumer.HandleBasicDeliver(
5153
"1",
52-
(ulong)numberOfMessages,
54+
(ulong)i,
5355
false,
5456
"exchange",
5557
"routing,key",
@@ -65,10 +67,74 @@ await consumer.HandleBasicDeliver(
6567

6668
await messageHandler.StopAsync(CancellationToken.None);
6769
}
70+
71+
[Theory]
72+
[InlineData(1)]
73+
[InlineData(5)]
74+
[InlineData(10)]
75+
[InlineData(16)]
76+
[InlineData(40)]
77+
[InlineData(57)]
78+
public async Task ShouldProperlyHandlerMessagesByTimer(int numberOfMessages)
79+
{
80+
const string queueName = "queue.name";
81+
const ushort prefetchCount = 10;
82+
var handlingPeriod = TimeSpan.FromMilliseconds(100);
83+
84+
var channelMock = new Mock<IModel>();
85+
var connectionMock = new Mock<IConnection>();
86+
connectionMock.Setup(x => x.CreateModel())
87+
.Returns(channelMock.Object);
88+
89+
var connectionFactoryMock = new Mock<IRabbitMqConnectionFactory>();
90+
connectionFactoryMock.Setup(x => x.CreateRabbitMqConnection(It.IsAny<RabbitMqClientOptions>()))
91+
.Returns(connectionMock.Object);
92+
93+
var consumer = new AsyncEventingBasicConsumer(channelMock.Object);
94+
connectionFactoryMock.Setup(x => x.CreateConsumer(It.IsAny<IModel>()))
95+
.Returns(consumer);
96+
97+
using var waitHandle = new AutoResetEvent(false);
98+
var callerMock = new Mock<IStubCaller>();
99+
var caller = new StubCallerDecorator(callerMock.Object)
100+
{
101+
WaitHandle = waitHandle
102+
};
103+
104+
using var messageHandler = CreateBatchMessageHandler(queueName, prefetchCount, handlingPeriod, connectionFactoryMock.Object, caller);
105+
await messageHandler.StartAsync(CancellationToken.None);
106+
107+
const int smallBatchSize = prefetchCount - 1;
108+
var numberOfSmallBatches = (int)Math.Ceiling((double)numberOfMessages / smallBatchSize);
109+
for (var b = 0; b < numberOfSmallBatches; b++)
110+
{
111+
var lowerBound = b * smallBatchSize;
112+
var upperBound = (b + 1) * smallBatchSize > numberOfMessages ? numberOfMessages : (b + 1) * smallBatchSize;
113+
for (var i = lowerBound; i < upperBound; i++)
114+
{
115+
await consumer.HandleBasicDeliver(
116+
"1",
117+
(ulong)i,
118+
false,
119+
"exchange",
120+
"routing,key",
121+
null,
122+
new ReadOnlyMemory<byte>());
123+
}
124+
125+
waitHandle.WaitOne(_globalTestsTimeout);
126+
}
127+
128+
callerMock.Verify(x => x.EmptyCall(), Times.Exactly(numberOfSmallBatches));
129+
callerMock.Verify(x => x.Call(It.IsAny<ReadOnlyMemory<byte>>()), Times.Exactly(numberOfMessages));
130+
131+
await messageHandler.StopAsync(CancellationToken.None);
132+
}
68133

69134
static BaseBatchMessageHandler CreateBatchMessageHandler(
70135
string queueName,
71136
ushort prefetchCount,
137+
TimeSpan? handlingPeriod,
72138
IRabbitMqConnectionFactory connectionFactory,
73139
IStubCaller caller)
74140
{
@@ -85,7 +151,8 @@ static BaseBatchMessageHandler CreateBatchMessageHandler(
85151
loggerMock.Object)
86152
{
87153
QueueName = queueName,
88-
PrefetchCount = prefetchCount
154+
PrefetchCount = prefetchCount,
155+
MessageHandlingPeriod = handlingPeriod
89156
};
90157
}
91158
}

0 commit comments

Comments
 (0)