Skip to content

Commit 611a31e

Browse files
committed
refactor: refine InMemoryStream
1 parent 27c8cbf commit 611a31e

File tree

3 files changed

+120
-399
lines changed

3 files changed

+120
-399
lines changed

src/WeihanLi.Common/Helpers/InMemoryStream.cs

Lines changed: 102 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -2,95 +2,138 @@
22
// Licensed under the Apache license.
33

44
using System.Collections.Concurrent;
5-
using System.Runtime.CompilerServices;
6-
using WeihanLi.Common.Abstractions;
5+
using WeihanLi.Common.Models;
6+
using WeihanLi.Extensions;
77

88
namespace WeihanLi.Common.Helpers;
99

10-
public interface IStream
10+
public class StreamMessage<T>
1111
{
12-
Task AddMessageAsync(string streamName, StreamMessage message);
13-
Task AcknowledgeMessageAsync(string streamName, string messageId);
14-
Task<int> CountAsync(string streamName, string? start = null, string? end = null);
15-
Task TrimAsync(string streamName, int maxSize);
16-
Task TrimAsync(string streamName, string maxId);
17-
IAsyncEnumerable<StreamMessage> ReadMessagesAsync(string streamName, int count, string? start = null, string? end = null, CancellationToken cancellationToken = default);
12+
public required T Id { get; init; }
13+
public DateTimeOffset Timestamp { get; set; }
14+
public Dictionary<string, string> Fields { get => field; set => field = Guard.NotNull(value); } = [];
15+
public Dictionary<string, object> Properties { get => field; set => field = Guard.NotNull(value); } = [];
1816
}
1917

20-
public class InMemoryStream : IStream
18+
public class StreamInfo<T>
2119
{
22-
private readonly ConcurrentDictionary<string, ConcurrentQueue<StreamMessage>> _streams = new();
20+
public required T MinId { get; set; }
21+
public DateTimeOffset MinTimestamp { get; set; }
22+
public required T MaxId { get; set; }
23+
public DateTimeOffset MaxTimestamp { get; set; }
24+
public int Count { get; set; }
25+
}
2326

24-
public Task AddMessageAsync(string streamName, StreamMessage message)
25-
{
26-
var stream = _streams.GetOrAdd(streamName, _ => new ConcurrentQueue<StreamMessage>());
27-
stream.Enqueue(message);
28-
return Task.CompletedTask;
29-
}
27+
public class StreamGroupInfo<T>
28+
{
29+
public required string GroupName { get; set; }
30+
public required T Offset { get; set; }
31+
}
32+
33+
public interface IStream<T>
34+
{
35+
string StreamName { get; }
36+
37+
Task AddAsync(T id, Dictionary<string, string> fields, DateTimeOffset? timestamp = null, Dictionary<string, object>? properties = null, CancellationToken cancellationToken = default);
38+
IAsyncEnumerable<StreamMessage<T>> FetchAsync(T lastId, int count, Ordering order = default, CancellationToken cancellationToken = default);
3039

31-
public Task AcknowledgeMessageAsync(string streamName, string messageId) => Task.CompletedTask;
40+
Task<int> CountAsync(T? min = default, T? max = default, RangeInclusion inclusion = default, CancellationToken cancellationToken = default);
3241

33-
public Task<int> CountAsync(string streamName, string? start = null, string? end = null)
42+
Task<StreamInfo<T>> InfoAsync(CancellationToken cancellationToken = default);
43+
Task<IReadOnlyCollection<StreamGroupInfo<T>>> GroupsAsync(CancellationToken cancellationToken = default);
44+
Task<StreamGroupInfo<T>?> GroupInfoAsync(string groupName, CancellationToken cancellationToken = default);
45+
Task AddGroupAsync(string groupName, T offset, CancellationToken cancellationToken = default);
46+
Task<bool> RemoveGroupAsync(string groupName, CancellationToken cancellationToken = default);
47+
Task AckAsync(string groupName, T id, CancellationToken cancellationToken = default);
48+
}
49+
50+
public sealed class InMemoryStream<T>(string name, IComparer<T>? comparer = null) : IStream<T>
51+
{
52+
private readonly PriorityQueue<StreamMessage<T>, T> _messages = new(comparer);
53+
private readonly ConcurrentDictionary<string, StreamGroupInfo<T>> _groups = new();
54+
55+
public string StreamName => name;
56+
57+
public Task AckAsync(string groupName, T id, CancellationToken cancellationToken = default)
3458
{
35-
if (_streams.TryGetValue(streamName, out var stream))
59+
if (!_groups.TryGetValue(groupName, out var groupInfo))
3660
{
37-
return Task.FromResult(stream.Count(message =>
38-
(start == null || string.Compare(message.Id, start) >= 0) &&
39-
(end == null || string.Compare(message.Id, end) <= 0)));
61+
throw new InvalidOperationException($"Group [{groupName}] not exists");
4062
}
4163

42-
return Task.FromResult(0);
64+
groupInfo.Offset = id;
65+
return Task.CompletedTask;
4366
}
4467

45-
public Task TrimAsync(string streamName, int maxSize)
68+
public Task AddAsync(T id, Dictionary<string, string> fields, DateTimeOffset? timestamp = null, Dictionary<string, object>? properties = null, CancellationToken cancellationToken = default)
4669
{
47-
if (_streams.TryGetValue(streamName, out var stream))
70+
var message = new StreamMessage<T>
4871
{
49-
while (stream.Count > maxSize && stream.TryDequeue(out _))
50-
{
51-
// Remove messages until the stream size is within the limit
52-
}
53-
}
72+
Id = id,
73+
Fields = fields,
74+
Timestamp = timestamp ?? DateTimeOffset.Now,
75+
Properties = properties ?? new()
76+
};
77+
_messages.Enqueue(message, id);
78+
5479
return Task.CompletedTask;
5580
}
5681

57-
public Task TrimAsync(string streamName, string maxId)
82+
public Task AddGroupAsync(string groupName, T offset, CancellationToken cancellationToken = default)
5883
{
59-
if (_streams.TryGetValue(streamName, out var stream))
84+
if (_groups.ContainsKey(groupName))
6085
{
61-
var messages = stream.ToArray();
62-
stream.Clear();
63-
foreach (var message in messages)
64-
{
65-
if (string.Compare(message.Id, maxId) > 0)
66-
{
67-
stream.Enqueue(message);
68-
}
69-
}
86+
throw new InvalidOperationException($"Group [{groupName}] not exists");
7087
}
88+
89+
_groups[groupName] = new StreamGroupInfo<T>()
90+
{
91+
GroupName = groupName,
92+
Offset = offset
93+
};
7194
return Task.CompletedTask;
7295
}
7396

74-
public async IAsyncEnumerable<StreamMessage> ReadMessagesAsync(string streamName, int count, string? start = null, string? end = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
97+
public Task<int> CountAsync(T? min = default, T? max = default, RangeInclusion inclusion = default, CancellationToken cancellationToken = default)
7598
{
76-
while (!cancellationToken.IsCancellationRequested)
99+
// TODO: support min/max filter
100+
return _messages.Count.WrapTask();
101+
}
102+
103+
public IAsyncEnumerable<StreamMessage<T>> FetchAsync(T lastId, int count, Ordering order = Ordering.Ascending, CancellationToken cancellationToken = default)
104+
{
105+
throw new NotImplementedException();
106+
}
107+
108+
public Task<StreamGroupInfo<T>?> GroupInfoAsync(string groupName, CancellationToken cancellationToken = default)
109+
{
110+
if (_groups.TryGetValue(groupName, out var groupInfo))
77111
{
78-
if (_streams.TryGetValue(streamName, out var stream))
79-
{
80-
while (stream.TryDequeue(out var message))
81-
{
82-
yield return message;
83-
}
84-
}
85-
await Task.Delay(100, cancellationToken);
112+
return Task.FromResult<StreamGroupInfo<T>?>(groupInfo);
86113
}
114+
115+
return Task.FromResult<StreamGroupInfo<T>?>(null);
87116
}
88-
}
89117

90-
public class StreamMessage : IProperties
91-
{
92-
public required string Id { get; set; }
93-
public string Data { get; set; }
94-
public DateTimeOffset Timestamp { get; set; }
95-
public IDictionary<string, object?> Properties { get; set; } = new Dictionary<string, object?>();
118+
public Task<IReadOnlyCollection<StreamGroupInfo<T>>> GroupsAsync(CancellationToken cancellationToken = default)
119+
{
120+
return _groups.Values.AsReadOnly().WrapTask();
121+
}
122+
123+
public Task<StreamInfo<T>> InfoAsync(CancellationToken cancellationToken = default)
124+
{
125+
var streamInfo = new StreamInfo<T>
126+
{
127+
// TODO: update min/max from messages
128+
MinId = default,
129+
MaxId = default,
130+
Count = _messages.Count
131+
};
132+
return streamInfo.WrapTask();
133+
}
134+
135+
public Task<bool> RemoveGroupAsync(string groupName, CancellationToken cancellationToken = default)
136+
{
137+
return _groups.TryRemove(groupName, out _).WrapTask();
138+
}
96139
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright (c) Weihan Li. All rights reserved.
2+
// Licensed under the Apache license.
3+
4+
namespace WeihanLi.Common.Models;
5+
6+
public enum Ordering
7+
{
8+
Ascending = 0,
9+
Descending = 1
10+
}
11+
12+
[Flags]
13+
public enum RangeInclusion
14+
{
15+
None = 0,
16+
IncludeLowerBound = 1,
17+
IncludeUpperBound = 2
18+
}

0 commit comments

Comments
 (0)