Skip to content

Commit 64992a9

Browse files
authored
feat: add early shutdown when no requests received for X milliseconds (#582)
* feat: add early shutdown when no requests received for X seconds * chore: add an integration test * stamp: clippy
1 parent d18cc32 commit 64992a9

File tree

4 files changed

+82
-3
lines changed

4 files changed

+82
-3
lines changed

crates/base/src/worker/supervisor/strategy_per_worker.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::sync::atomic::AtomicUsize;
33
use std::sync::atomic::Ordering;
44
use std::sync::Arc;
55
use std::time::Duration;
6+
use std::time::SystemTime;
67

78
use base_rt::RuntimeState;
89
use deno_core::unsync::sync::AtomicFlag;
@@ -37,6 +38,8 @@ use super::V8HandleTerminationData;
3738

3839
#[derive(Debug, Default)]
3940
struct State {
41+
req_absent_duration: Option<Duration>,
42+
4043
is_worker_entered: bool,
4144
is_wall_clock_limit_disabled: bool,
4245
is_wall_clock_beforeunload_armed: bool,
@@ -49,6 +52,7 @@ struct State {
4952
wall_clock_alerts: usize,
5053

5154
req_ack_count: usize,
55+
last_req_ack: Option<SystemTime>,
5256
req_demand: Arc<AtomicUsize>,
5357

5458
runtime: Arc<RuntimeState>,
@@ -80,6 +84,7 @@ impl State {
8084

8185
fn req_acknowledged(&mut self) {
8286
self.req_ack_count += 1;
87+
self.last_req_ack = Some(SystemTime::now());
8388
self.update_runtime_state();
8489
}
8590

@@ -92,6 +97,14 @@ impl State {
9297
|| self.is_cpu_time_soft_limit_reached
9398
|| self.is_mem_half_reached
9499
|| self.wall_clock_alerts == 2
100+
|| matches!(
101+
self
102+
.last_req_ack
103+
.as_ref()
104+
.zip(self.req_absent_duration)
105+
.and_then(|(t, d)| t.checked_add(d)),
106+
Some(t) if t < SystemTime::now()
107+
)
95108
}
96109

97110
fn have_all_reqs_been_acknowledged(&self) -> bool {
@@ -143,6 +156,16 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
143156

144157
let mut complete_reason = None::<ShutdownReason>;
145158
let mut state = State {
159+
req_absent_duration: runtime_opts
160+
.context
161+
.as_ref()
162+
.and_then(|it| it.get("supervisor"))
163+
.and_then(|it| {
164+
it.get("requestAbsentTimeoutMs")
165+
.and_then(|it| it.as_u64())
166+
.map(Duration::from_millis)
167+
}),
168+
146169
is_wall_clock_limit_disabled: worker_timeout_ms == 0,
147170
is_cpu_time_limit_disabled: cpu_time_soft_limit_ms == 0
148171
&& cpu_time_hard_limit_ms == 0,

crates/base/test_cases/main/index.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
console.log("main function started");
22

3-
function parseIntFromHeadersOrDefault(req: Request, key: string, val: number) {
3+
function parseIntFromHeadersOrDefault(req: Request, key: string, val?: number) {
44
const headerValue = req.headers.get(key);
55
if (!headerValue) {
66
return val;
@@ -62,7 +62,14 @@ Deno.serve((req: Request) => {
6262
const envVars = Object.keys(envVarsObj).map((k) => [k, envVarsObj[k]]);
6363
const context = {
6464
sourceMap: req.headers.get("x-context-source-map") == "true",
65-
useReadSyncFileAPI: req.headers.get("x-use-read-sync-file-api") == "true",
65+
useReadSyncFileAPI:
66+
req.headers.get("x-context-use-read-sync-file-api") == "true",
67+
supervisor: {
68+
requestAbsentTimeoutMs: parseIntFromHeadersOrDefault(
69+
req,
70+
"x-context-supervisor-request-absent-timeout-ms",
71+
),
72+
},
6673
};
6774

6875
return await EdgeRuntime.userWorkers.create({

crates/base/tests/integration_tests.rs

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2522,7 +2522,10 @@ async fn test_issue_func_205() {
25222522
b.uri("/issue-func-205")
25232523
.header("x-cpu-time-soft-limit-ms", HeaderValue::from_static("500"))
25242524
.header("x-cpu-time-hard-limit-ms", HeaderValue::from_static("1000"))
2525-
.header("x-use-read-sync-file-api", HeaderValue::from_static("true"))
2525+
.header(
2526+
"x-context-use-read-sync-file-api",
2527+
HeaderValue::from_static("true"),
2528+
)
25262529
.body(Body::empty())
25272530
.context("can't make request")
25282531
})
@@ -3865,6 +3868,49 @@ async fn test_eszip_wasm_import() {
38653868
);
38663869
}
38673870

3871+
#[tokio::test]
3872+
#[serial]
3873+
async fn test_request_absent_timeout() {
3874+
let (tx, mut rx) = mpsc::unbounded_channel();
3875+
let tb = TestBedBuilder::new("./test_cases/main")
3876+
.with_per_worker_policy(None)
3877+
.with_worker_event_sender(Some(tx))
3878+
.build()
3879+
.await;
3880+
3881+
let resp = tb
3882+
.request(|b| {
3883+
b.uri("/sleep-5000ms")
3884+
.header("x-worker-timeout-ms", HeaderValue::from_static("3600000"))
3885+
.header(
3886+
"x-context-supervisor-request-absent-timeout-ms",
3887+
HeaderValue::from_static("1000"),
3888+
)
3889+
.body(Body::empty())
3890+
.context("can't make request")
3891+
})
3892+
.await
3893+
.unwrap();
3894+
3895+
assert_eq!(resp.status().as_u16(), StatusCode::OK);
3896+
3897+
sleep(Duration::from_secs(3)).await;
3898+
rx.close();
3899+
tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
3900+
3901+
while let Some(ev) = rx.recv().await {
3902+
let WorkerEvents::Shutdown(ev) = ev.event else {
3903+
continue;
3904+
};
3905+
if ev.reason != ShutdownReason::EarlyDrop {
3906+
break;
3907+
}
3908+
return;
3909+
}
3910+
3911+
unreachable!("test failed");
3912+
}
3913+
38683914
#[derive(Deserialize)]
38693915
struct ErrorResponsePayload {
38703916
msg: String,

types/global.d.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ interface UserWorkerCreateContext {
4949
shouldBootstrapMockFnThrowError?: boolean | null;
5050
suppressEszipMigrationWarning?: boolean | null;
5151
useReadSyncFileAPI?: boolean | null;
52+
supervisor?: {
53+
requestAbsentTimeoutMs?: number | null;
54+
};
5255
}
5356

5457
interface UserWorkerCreateOptions {

0 commit comments

Comments
 (0)