Skip to content

Commit 4ca7b62

Browse files
committed
Cancel runs when the manifest generation times out, and show a message when a run is cancelled
1 parent df991d0 commit 4ca7b62

File tree

7 files changed

+64
-71
lines changed

7 files changed

+64
-71
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ rand_chacha = "0.3.1"
6565

6666
blake3 = "=1.4.0"
6767

68-
tracing = { version = "0.1.37", features = ["release_max_level_info"] }
68+
tracing = { version = "0.1.37", features = ["release_max_level_debug"] }
6969
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "json"] }
7070
tracing-appender = "0.2.2"
7171

_typos.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[default.extend-words]
2+
acception = "acception"

crates/abq_cli/src/workers.rs

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -178,26 +178,28 @@ async fn do_shutdown(
178178

179179
let (suite_result, errors) = finalized_reporters.finish(&completed_summary);
180180

181-
for error in errors {
182-
eprintln!("{error}");
183-
}
181+
if let WorkersExitStatus::Completed { .. } = status {
182+
for error in errors {
183+
eprintln!("{error}");
184+
}
184185

185-
print!("\n\n");
186-
suite_result
187-
.write_short_summary_lines(&mut stdout, ShortSummaryGrouping::Runner)
188-
.unwrap();
189-
println!("\n");
190-
if execution_mode == ExecutionMode::WriteNormal {
191-
println!("Run the following command to replay these tests locally:");
192-
println!("\n");
193-
println!(
194-
"\tabq test --run-id {} --worker {} --num {} -- <your-test-command>",
195-
run_id,
196-
worker_tag.index(),
197-
num_runners,
198-
);
186+
print!("\n\n");
187+
suite_result
188+
.write_short_summary_lines(&mut stdout, ShortSummaryGrouping::Runner)
189+
.unwrap();
199190
println!("\n");
200-
println!("Specify your Access Token with the RWX_ACCESS_TOKEN env variable, passing --access-token, or running `abq login`.");
191+
if execution_mode == ExecutionMode::WriteNormal {
192+
println!("Run the following command to replay these tests locally:");
193+
println!("\n");
194+
println!(
195+
"\tabq test --run-id {} --worker {} --num {} -- <your-test-command>",
196+
run_id,
197+
worker_tag.index(),
198+
num_runners,
199+
);
200+
println!("\n");
201+
println!("Specify your Access Token with the RWX_ACCESS_TOKEN env variable, passing --access-token, or running `abq login`.");
202+
}
201203
}
202204

203205
// If the workers didn't fault, exit with whatever status the test suite run is at; otherwise,

crates/abq_queue/src/queue.rs

Lines changed: 7 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -136,14 +136,12 @@ enum RunState {
136136
#[derive(Debug)]
137137
enum ManifestPersistence {
138138
Persisted(ManifestPersistedCell),
139-
ManifestNeverReceived,
140139
EmptyManifest,
141140
}
142141

143142
#[derive(Debug)]
144143
enum ResultsPersistence {
145144
Persisted(ResultsPersistedCell),
146-
ManifestNeverReceived,
147145
}
148146

149147
const MAX_BATCH_SIZE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(100) };
@@ -283,8 +281,6 @@ enum WriteResultsError {
283281
RunNotFound,
284282
#[error("attempting to write results before manifest received")]
285283
WaitingForManifest,
286-
#[error("attempting to write results when manifest failed to be generated")]
287-
ManifestNeverReceived,
288284
#[error("attempting to write results for cancelled run")]
289285
RunCancelled,
290286
}
@@ -295,8 +291,6 @@ enum ReadResultsError {
295291
RunNotFound,
296292
#[error("results cannot be read before manifest is received")]
297293
WaitingForManifest,
298-
#[error("a manifest failed to be generated")]
299-
ManifestNeverReceived,
300294
#[error("the run was cancelled before all test results were received")]
301295
RunCancelled,
302296
}
@@ -504,9 +498,7 @@ impl AllRuns {
504498
AssignedRunStatus::AlreadyDone { exit_code }
505499
}
506500
}
507-
RunState::Cancelled { .. } => AssignedRunStatus::AlreadyDone {
508-
exit_code: ExitCode::CANCELLED,
509-
},
501+
RunState::Cancelled { reason } => AssignedRunStatus::Cancelled { reason: *reason },
510502
}
511503
}
512504

@@ -938,7 +930,6 @@ impl AllRuns {
938930
ResultsPersistence::Persisted(cell) => {
939931
Ok((cell.clone(), EligibleForRemoteDump::Yes))
940932
}
941-
ResultsPersistence::ManifestNeverReceived => Err(ManifestNeverReceived),
942933
},
943934
RunState::Cancelled { .. } => Err(RunCancelled),
944935
}
@@ -990,7 +981,6 @@ impl AllRuns {
990981
Ok(ReadResultsState::RunInProgress { active_runners })
991982
}
992983
}
993-
ResultsPersistence::ManifestNeverReceived => Err(ManifestNeverReceived),
994984
}
995985
}
996986
RunState::Cancelled { .. } => Err(RunCancelled),
@@ -1050,9 +1040,6 @@ impl AllRuns {
10501040
RetryManifestState::NotYetPersisted
10511041
}
10521042
}
1053-
ManifestPersistence::ManifestNeverReceived => {
1054-
RetryManifestState::Error(RetryManifestError::ManifestNeverReceived)
1055-
}
10561043
ManifestPersistence::EmptyManifest => {
10571044
// Ship the empty manifest over.
10581045
RetryManifestState::Manifest(NextWorkBundle {
@@ -1214,37 +1201,22 @@ impl AllRuns {
12141201

12151202
let mut run = runs.get(&run_id).expect("no run recorded").write();
12161203

1217-
let test_command_hash = match run.state {
1218-
RunState::WaitingForManifest {
1219-
test_command_hash, ..
1220-
} => {
1221-
// okay
1222-
test_command_hash
1204+
match run.state {
1205+
RunState::WaitingForManifest { .. } => {
1206+
run.state = RunState::Cancelled { reason: CancelReason::ManifestNeverReceived };
1207+
// NB: Always sub last for conversative estimation.
1208+
self.num_active.fetch_sub(1, atomic::ORDERING);
12231209
}
12241210
RunState::Cancelled { .. } => {
12251211
// No-op, since the run was already cancelled.
1226-
return;
12271212
}
12281213
RunState::HasWork { .. } | RunState::InitialManifestDone { .. } => {
12291214
illegal_state!(
12301215
"attempting to mark failed to receive manifest after manifest was received",
12311216
?run_id
12321217
);
1233-
return;
12341218
}
12351219
};
1236-
1237-
run.state = RunState::InitialManifestDone {
1238-
new_worker_exit_code: ExitCode::FAILURE,
1239-
init_metadata: Default::default(),
1240-
seen_workers: Default::default(),
1241-
manifest_persistence: ManifestPersistence::ManifestNeverReceived,
1242-
results_persistence: ResultsPersistence::ManifestNeverReceived,
1243-
test_command_hash: Some(test_command_hash),
1244-
};
1245-
1246-
// NB: Always sub last for conversative estimation.
1247-
self.num_active.fetch_sub(1, atomic::ORDERING);
12481220
}
12491221

12501222
/// Marks a run as complete because it had the trivial manifest.
@@ -2221,7 +2193,7 @@ impl QueueServer {
22212193
mut stream: Box<dyn net_async::ServerStream>,
22222194
) -> OpaqueResult<()> {
22232195
// If a worker failed to generate a manifest, or the manifest is empty,
2224-
// we're going to immediately end the test run.
2196+
// we're going to immediately cancel the test run.
22252197
//
22262198
// In the former case this indicates a failure in the underlying test runners,
22272199
// and in the latter case we have nothing to do.
@@ -4978,21 +4950,6 @@ mod persist_results {
49784950
Ok(ReadResultsState::RunInProgress { active_runners }) if active_runners == &[Tag::runner(2, 1)]
49794951
}
49804952

4981-
get_read_results_cell! {
4982-
get_read_results_cell_when_done_with_manifest_never_received,
4983-
{
4984-
RunState::InitialManifestDone {
4985-
new_worker_exit_code: ExitCode::SUCCESS,
4986-
init_metadata: Default::default(),
4987-
seen_workers: Default::default(),
4988-
results_persistence: ResultsPersistence::ManifestNeverReceived,
4989-
manifest_persistence: ManifestPersistence::EmptyManifest,
4990-
test_command_hash: Some(TestCommandHash::random()),
4991-
}
4992-
},
4993-
Err(ReadResultsError::ManifestNeverReceived)
4994-
}
4995-
49964953
get_read_results_cell! {
49974954
get_read_results_cell_when_cancelled,
49984955
{

crates/abq_utils/src/net_protocol.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,8 @@ pub mod queue {
618618
User,
619619
/// Timed out because no progress was made popping items off the manifest.
620620
ManifestHadNoProgress,
621+
/// Timed out because the manifest was never received
622+
ManifestNeverReceived,
621623
}
622624

623625
/// A request sent to the queue.

crates/abq_workers/src/assigned_run.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use abq_utils::net_protocol::{entity::Entity, queue::InvokeWork};
1+
use abq_utils::net_protocol::{entity::Entity, queue::{CancelReason, InvokeWork}};
22
use async_trait::async_trait;
33
use serde_derive::{Deserialize, Serialize};
44

@@ -27,6 +27,9 @@ pub enum AssignedRunStatus {
2727
AlreadyDone {
2828
exit_code: abq_utils::exit::ExitCode,
2929
},
30+
Cancelled {
31+
reason: CancelReason,
32+
},
3033
FatalError(String),
3134
}
3235

crates/abq_workers/src/negotiate.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use abq_utils::{
3232
entity::{Entity, WorkerTag},
3333
meta::DeprecationRecord,
3434
publicize_addr,
35-
queue::{InvokeWork, NegotiatorInfo},
35+
queue::{CancelReason, InvokeWork, NegotiatorInfo},
3636
workers::{RunId, RunnerKind},
3737
},
3838
results_handler::SharedResultsHandler,
@@ -81,6 +81,9 @@ enum MessageFromQueueNegotiator {
8181
RunAlreadyCompleted {
8282
exit_code: ExitCode,
8383
},
84+
RunCancelled {
85+
reason: CancelReason,
86+
},
8487
/// The context a worker set should execute a run with.
8588
ExecutionContext(ExecutionContext),
8689
RunUnknown,
@@ -141,6 +144,8 @@ pub struct WorkersNegotiator(Box<dyn net::ClientStream>, WorkerContext);
141144
pub enum NegotiatedWorkers {
142145
/// No more workers were created, because there is no more work to be done.
143146
Redundant { exit_code: ExitCode },
147+
/// No more workers were created because the run is cancelled
148+
Cancelled { error: String },
144149
/// A pool of workers were created.
145150
Pool(WorkerPool),
146151
}
@@ -155,13 +160,21 @@ impl NegotiatedWorkers {
155160
process_outputs: Default::default(),
156161
native_runner_info: None,
157162
},
163+
NegotiatedWorkers::Cancelled { error } => WorkersExit {
164+
status: WorkersExitStatus::Error { errors: vec![error.to_string()] },
165+
manifest_generation_output: None,
166+
final_stdio_outputs: Default::default(),
167+
process_outputs: Default::default(),
168+
native_runner_info: None,
169+
},
158170
NegotiatedWorkers::Pool(pool) => pool.shutdown().await,
159171
}
160172
}
161173

162174
pub async fn cancel(&mut self) {
163175
match self {
164176
NegotiatedWorkers::Redundant { .. } => {}
177+
NegotiatedWorkers::Cancelled { .. } => {}
165178
NegotiatedWorkers::Pool(pool) => pool.cancel().await,
166179
}
167180
}
@@ -170,13 +183,15 @@ impl NegotiatedWorkers {
170183
pub async fn wait(&mut self) {
171184
match self {
172185
NegotiatedWorkers::Redundant { .. } => {}
186+
NegotiatedWorkers::Cancelled { .. } => {}
173187
NegotiatedWorkers::Pool(pool) => pool.wait().await,
174188
}
175189
}
176190

177191
pub fn workers_alive(&self) -> bool {
178192
match self {
179193
NegotiatedWorkers::Redundant { .. } => false,
194+
NegotiatedWorkers::Cancelled { .. } => false,
180195
NegotiatedWorkers::Pool(pool) => pool.workers_alive(),
181196
}
182197
}
@@ -325,6 +340,14 @@ async fn wait_for_execution_context(
325340

326341
let worker_set_decision = match net_protocol::async_read(&mut conn).await? {
327342
MessageFromQueueNegotiator::ExecutionContext(ctx) => Ok(ctx),
343+
MessageFromQueueNegotiator::RunCancelled { reason } => {
344+
let error_suffix = match reason {
345+
CancelReason::User => "a worker received a cancellation signal while still working on tests.",
346+
CancelReason::ManifestHadNoProgress => "the run timed out before any tests were completed.",
347+
CancelReason::ManifestNeverReceived => "the run timed out before the test manifest was received."
348+
};
349+
Err(NegotiatedWorkers::Cancelled { error: format!("{}{}", "Error: This ABQ run was cancelled. When an ABQ run is cancelled, it can no longer be retried. You must start a run with a new run ID instead.\nThis run was cancelled because ", error_suffix) })
350+
}
328351
MessageFromQueueNegotiator::RunAlreadyCompleted { exit_code } => {
329352
Err(NegotiatedWorkers::Redundant { exit_code })
330353
}
@@ -588,6 +611,10 @@ impl QueueNegotiator {
588611
tracing::debug!(?run_id, "run already completed");
589612
MessageFromQueueNegotiator::RunAlreadyCompleted { exit_code }
590613
}
614+
Cancelled { reason } => {
615+
tracing::debug!(?run_id, "run cancelled");
616+
MessageFromQueueNegotiator::RunCancelled { reason }
617+
}
591618
RunUnknown => {
592619
tracing::debug!(?run_id, "run not yet known");
593620
MessageFromQueueNegotiator::RunUnknown

0 commit comments

Comments
 (0)