Skip to content

Commit 7233595

Browse files
committed
Rewrite cargo logs processing using an MPSC channel
1 parent 620adc4 commit 7233595

File tree

2 files changed

+69
-44
lines changed

2 files changed

+69
-44
lines changed

crates/tracel-xtask/src/commands/test.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,14 @@ pub fn handle_command(args: TestCmdArgs) -> anyhow::Result<()> {
3838
}
3939

4040
fn push_optional_args(cmd_args: &mut Vec<String>, args: &TestCmdArgs) {
41+
// cargo options
4142
if let Some(jobs) = &args.jobs {
4243
cmd_args.extend(vec!["--jobs".to_string(), jobs.to_string()]);
4344
};
45+
// test harness options
46+
cmd_args.extend(vec!["--".to_string(), "--color=always".to_string()]);
4447
if let Some(threads) = &args.threads {
45-
cmd_args.extend(vec![
46-
"--".to_string(),
47-
"--test-threads".to_string(),
48-
threads.to_string(),
49-
]);
48+
cmd_args.extend(vec!["--test-threads".to_string(), threads.to_string()]);
5049
};
5150
}
5251

crates/tracel-xtask/src/utils/process.rs

Lines changed: 65 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ use std::{
33
io::{BufRead, BufReader},
44
path::Path,
55
process::{Command, Stdio},
6+
sync::mpsc,
7+
thread,
68
};
79

8-
use anyhow::anyhow;
10+
use anyhow;
911
use rand::Rng;
1012
use regex::Regex;
1113

@@ -30,15 +32,15 @@ pub fn run_process(
3032
command.envs(&envs);
3133
}
3234
let status = command.args(args).status().map_err(|e| {
33-
anyhow!(
35+
anyhow::anyhow!(
3436
"Failed to execute {} {}: {}",
3537
name,
3638
args.first().unwrap(),
3739
e
3840
)
3941
})?;
4042
if !status.success() {
41-
return Err(anyhow!("{}", error_msg));
43+
return Err(anyhow::anyhow!("{}", error_msg));
4244
}
4345
anyhow::Ok(())
4446
}
@@ -61,71 +63,95 @@ pub fn run_process_for_workspace<'a>(
6163
.iter()
6264
.for_each(|ex| args.extend(["--exclude", ex]));
6365
group_info!("Command line: cargo {}", args.join(" "));
66+
// process
6467
let mut child = Command::new(name)
6568
.args(&args)
6669
.stdout(Stdio::piped())
6770
.stderr(Stdio::piped())
6871
.spawn()
6972
.map_err(|e| {
70-
anyhow!(format!(
73+
anyhow::anyhow!(format!(
7174
"Failed to start {} {}: {}",
7275
name,
7376
args.first().unwrap(),
7477
e
7578
))
7679
})?;
7780

78-
let mut ignore_error = false;
79-
let mut close_group = false;
81+
// handle stdout and stderr in dedicated threads using a MPSC channel for synchronization
82+
let (tx, rx) = mpsc::channel();
83+
// stdout processing thread
8084
if let Some(stdout) = child.stdout.take() {
81-
let reader = BufReader::new(stdout);
82-
reader.lines().for_each(|line| {
83-
if let Ok(line) = line {
84-
println!("{}", line);
85+
let tx = tx.clone();
86+
thread::spawn(move || {
87+
let reader = BufReader::new(stdout);
88+
for line in reader.lines().map_while(Result::ok) {
89+
tx.send((line, false)).unwrap();
8590
}
8691
});
8792
}
93+
// stderr processing thread
8894
if let Some(stderr) = child.stderr.take() {
89-
let reader = BufReader::new(stderr);
90-
reader.lines().for_each(|line| {
91-
let mut skip_line = false;
92-
if let Ok(line) = line {
93-
if let Some(rx) = &group_rx {
94-
let cleaned_line = standardize_slashes(&remove_ansi_codes(&line));
95-
if let Some(caps) = rx.captures(&cleaned_line) {
96-
let crate_name = &caps[1];
97-
if close_group {
98-
endgroup!();
99-
}
100-
close_group = true;
101-
group!("{}: {}", group_name.unwrap_or("Group"), crate_name);
102-
}
103-
}
104-
if let Some(log) = ignore_log {
105-
if line.contains(log) {
106-
if let Some(msg) = ignore_msg {
107-
warn!("{}", msg);
108-
}
109-
ignore_error = true;
110-
skip_line = true;
111-
}
95+
let tx = tx.clone();
96+
thread::spawn(move || {
97+
let reader = BufReader::new(stderr);
98+
for line in reader.lines().map_while(Result::ok) {
99+
tx.send((line, true)).unwrap();
100+
}
101+
});
102+
}
103+
// Drop the sender once all the logs have been processed to close the channel
104+
drop(tx);
105+
106+
// Process the stdout to inject log groups
107+
let mut ignore_error = false;
108+
let mut close_group = false;
109+
for (line, is_stderr) in rx.iter() {
110+
let mut skip_line = false;
111+
112+
if let Some(rx) = &group_rx {
113+
let cleaned_line = standardize_slashes(&remove_ansi_codes(&line));
114+
if let Some(caps) = rx.captures(&cleaned_line) {
115+
let crate_name = &caps[1];
116+
if close_group {
117+
endgroup!();
112118
}
113-
if !skip_line {
114-
eprintln!("{}", line);
119+
close_group = true;
120+
group!("{}: {}", group_name.unwrap_or("Group"), crate_name);
121+
}
122+
}
123+
124+
if let Some(log) = ignore_log {
125+
if line.contains(log) {
126+
if let Some(msg) = ignore_msg {
127+
warn!("{}", msg);
115128
}
129+
ignore_error = true;
130+
skip_line = true;
116131
}
117-
});
132+
}
133+
134+
if !skip_line {
135+
if is_stderr {
136+
eprintln!("{}", line);
137+
} else {
138+
println!("{}", line);
139+
}
140+
}
118141
}
142+
119143
if close_group {
120144
endgroup!();
121145
}
146+
122147
let status = child
123148
.wait()
124149
.expect("Should be able to wait for the process to finish.");
150+
125151
if status.success() || ignore_error {
126152
anyhow::Ok(())
127153
} else {
128-
Err(anyhow!("{}", error_msg))
154+
Err(anyhow::anyhow!("{}", error_msg))
129155
}
130156
}
131157

@@ -152,7 +178,7 @@ pub fn run_process_for_package(
152178
.stdout(Stdio::inherit())
153179
.stderr(Stdio::inherit())
154180
.output()
155-
.map_err(|e| anyhow!("Failed to execute process for '{}': {}", name, e))?;
181+
.map_err(|e| anyhow::anyhow!("Failed to execute process for '{}': {}", name, e))?;
156182

157183
if output.status.success() {
158184
return anyhow::Ok(());
@@ -166,7 +192,7 @@ pub fn run_process_for_package(
166192
return anyhow::Ok(());
167193
}
168194
}
169-
Err(anyhow!("{}", error_msg))
195+
Err(anyhow::anyhow!("{}", error_msg))
170196
}
171197

172198
/// Return a random port between 3000 and 9999

0 commit comments

Comments
 (0)