Skip to content

Commit 831edb9

Browse files
committed
Update NATS and Redis storage implementations to include reset functionality and modify token handling
- Changed TTL in NATS storage from 1 hour to 24 hours. - Updated token handling in NATS storage to use 'tokens' instead of 'maxTokens'. - Added reset method in both NATS and Redis storage to set the value to maxTokens. - Enhanced rate limiter to call reset on distributed storage when necessary.
1 parent 76dc0e0 commit 831edb9

File tree

3 files changed

+57
-6
lines changed

3 files changed

+57
-6
lines changed

src/native/nats_storage.hpp

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ class NatsStorage : public DistributedStorage {
108108

109109
kvConf.bucket = bucket.c_str();
110110
kvConf.history = 1;
111-
kvConf.ttl = 3600000000000; // 1 hour TTL in nanoseconds (3600 * 1e9)
111+
kvConf.ttl = 86400000000000; // 24 hours TTL in nanoseconds (86400 * 1e9)
112112

113113
s = g_natsLoader.js_CreateKeyValue(&kv, js, &kvConf);
114114
if (s != NATS_OK) {
@@ -133,7 +133,7 @@ class NatsStorage : public DistributedStorage {
133133
if (nc) g_natsLoader.natsConnection_Destroy(nc);
134134
}
135135

136-
bool tryAcquire(const std::string& key, int64_t maxTokens) override {
136+
bool tryAcquire(const std::string& key, int64_t tokens) override {
137137
if (!kv) return false; // Safety check
138138

139139
// NATS JetStream KV Store does not allow colons in key names
@@ -148,7 +148,7 @@ class NatsStorage : public DistributedStorage {
148148

149149
if (s == NATS_NOT_FOUND) {
150150
// Key doesn't exist, initialize it with maxTokens, then decrement by 1
151-
std::string value = std::to_string(maxTokens);
151+
std::string value = std::to_string(tokens);
152152
uint64_t rev;
153153
s = g_natsLoader.kvStore_CreateString(&rev, kv, fullKey.c_str(), value.c_str());
154154

@@ -157,8 +157,8 @@ class NatsStorage : public DistributedStorage {
157157
}
158158

159159
// Now decrement by 1 (acquiring 1 token)
160-
if (maxTokens > 0) {
161-
std::string newValue = std::to_string(maxTokens - 1);
160+
if (tokens > 0) {
161+
std::string newValue = std::to_string(tokens - 1);
162162
uint64_t newRev;
163163
s = g_natsLoader.kvStore_UpdateString(&newRev, kv, fullKey.c_str(), newValue.c_str(), rev);
164164
return s == NATS_OK;
@@ -245,4 +245,18 @@ class NatsStorage : public DistributedStorage {
245245
uint64_t newRev;
246246
g_natsLoader.kvStore_UpdateString(&newRev, kv, fullKey.c_str(), newValue.c_str(), revision);
247247
}
248+
249+
void reset(const std::string& key, int64_t maxTokens) override {
250+
if (!kv) return; // Safety check
251+
252+
// NATS JetStream KV Store does not allow colons in key names
253+
std::string sanitizedKey = prefix + key;
254+
std::replace(sanitizedKey.begin(), sanitizedKey.end(), ':', '_');
255+
const std::string fullKey = sanitizedKey;
256+
257+
// Set the value to maxTokens regardless of current value
258+
std::string value = std::to_string(maxTokens);
259+
uint64_t rev;
260+
g_natsLoader.kvStore_Put(&rev, kv, fullKey.c_str(), value.c_str(), value.length());
261+
}
248262
};

src/native/ratelimiter.hpp

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class DistributedStorage {
3434
virtual ~DistributedStorage() = default;
3535
virtual bool tryAcquire(const std::string& key, int64_t tokens) = 0;
3636
virtual void release(const std::string& key, int64_t tokens) = 0;
37+
virtual void reset(const std::string& key, int64_t maxTokens) = 0;
3738
};
3839

3940
class RateLimiter {
@@ -257,6 +258,17 @@ class RateLimiter {
257258
std::memory_order_acq_rel, std::memory_order_acquire)) {
258259
entry.dynamicMaxTokens.store(dynamicLimit, std::memory_order_release);
259260
entry.tokens.store(newTokens, std::memory_order_release);
261+
262+
// Sync sliding window refill with distributed storage
263+
if (distributedStorage && !entry.distributedKey.empty() && tokensToAdd > 0) {
264+
try {
265+
// Release tokens back to distributed storage (effectively adding them)
266+
distributedStorage->release(entry.distributedKey, tokensToAdd);
267+
} catch (...) {
268+
// Ignore errors - distributed storage might be temporarily unavailable
269+
}
270+
}
271+
260272
return;
261273
}
262274
} else {
@@ -265,6 +277,16 @@ class RateLimiter {
265277
std::memory_order_acq_rel, std::memory_order_acquire)) {
266278
entry.dynamicMaxTokens.store(dynamicLimit, std::memory_order_release);
267279
entry.tokens.store(dynamicLimit, std::memory_order_release);
280+
281+
// Reset distributed storage for fixed window
282+
if (distributedStorage && !entry.distributedKey.empty()) {
283+
try {
284+
distributedStorage->reset(entry.distributedKey, dynamicLimit);
285+
} catch (...) {
286+
// Ignore errors - distributed storage might be temporarily unavailable
287+
}
288+
}
289+
268290
return;
269291
}
270292
}
@@ -512,7 +534,7 @@ class RateLimiter {
512534
// If we have distributed storage and a distributed key is set, check it first
513535
if (distributedStorage && !entry->distributedKey.empty()) {
514536
try {
515-
if (!distributedStorage->tryAcquire(entry->distributedKey, entry->baseMaxTokens)) {
537+
if (!distributedStorage->tryAcquire(entry->distributedKey, entry->dynamicMaxTokens.load(std::memory_order_acquire))) {
516538
metrics.blockedRequests.fetch_add(1, std::memory_order_relaxed);
517539
return false;
518540
}

src/native/redis_storage.hpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,19 @@ class RedisStorage : public DistributedStorage {
8080

8181
g_redisLoader.freeReplyObject(reply);
8282
}
83+
84+
void reset(const std::string& key, int64_t maxTokens) override {
85+
std::string fullKey = prefix + key;
86+
87+
// SET key maxTokens
88+
redisReply* reply = (redisReply*)g_redisLoader.redisCommand(redis,
89+
"SET %s %lld",
90+
fullKey.c_str(), maxTokens);
91+
92+
if (!reply) {
93+
throw std::runtime_error("Redis command failed");
94+
}
95+
96+
g_redisLoader.freeReplyObject(reply);
97+
}
8398
};

0 commit comments

Comments
 (0)