Skip to content

Commit 46a6f1b

Browse files
committed
Rewrite cargo logs processing using an MPSC channel
1 parent bb8d49e commit 46a6f1b

File tree

2 files changed

+62
-37
lines changed

2 files changed

+62
-37
lines changed

crates/tracel-xtask/src/commands/test.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,14 @@ pub fn handle_command(args: TestCmdArgs) -> anyhow::Result<()> {
3838
}
3939

4040
fn push_optional_args(cmd_args: &mut Vec<String>, args: &TestCmdArgs) {
41+
// cargo options
4142
if let Some(jobs) = &args.jobs {
4243
cmd_args.extend(vec!["--jobs".to_string(), jobs.to_string()]);
4344
};
45+
// test harness options
46+
cmd_args.extend(vec!["--".to_string(), "--color=always".to_string()]);
4447
if let Some(threads) = &args.threads {
45-
cmd_args.extend(vec![
46-
"--".to_string(),
47-
"--test-threads".to_string(),
48-
threads.to_string(),
49-
]);
48+
cmd_args.extend(vec!["--test-threads".to_string(), threads.to_string()]);
5049
};
5150
}
5251

crates/tracel-xtask/src/utils/process.rs

Lines changed: 58 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use std::{
33
io::{BufRead, BufReader},
44
path::Path,
55
process::{Command, Stdio},
6+
sync::mpsc,
7+
thread,
68
};
79

810
use anyhow::anyhow;
@@ -61,6 +63,7 @@ pub fn run_process_for_workspace<'a>(
6163
.iter()
6264
.for_each(|ex| args.extend(["--exclude", ex]));
6365
group_info!("Command line: cargo {}", args.join(" "));
66+
// process
6467
let mut child = Command::new(name)
6568
.args(&args)
6669
.stdout(Stdio::piped())
@@ -75,53 +78,76 @@ pub fn run_process_for_workspace<'a>(
7578
))
7679
})?;
7780

78-
let mut ignore_error = false;
79-
let mut close_group = false;
81+
// handle stdout and stderr in dedicated threads using a MPSC channel for synchronization
82+
let (tx, rx) = mpsc::channel();
83+
// stdout processing thread
8084
if let Some(stdout) = child.stdout.take() {
81-
let reader = BufReader::new(stdout);
82-
reader.lines().for_each(|line| {
83-
if let Ok(line) = line {
84-
println!("{}", line);
85+
let tx = tx.clone();
86+
thread::spawn(move || {
87+
let reader = BufReader::new(stdout);
88+
for line in reader.lines().flatten() {
89+
tx.send((line, false)).unwrap();
8590
}
8691
});
8792
}
93+
// stderr processing thread
8894
if let Some(stderr) = child.stderr.take() {
89-
let reader = BufReader::new(stderr);
90-
reader.lines().for_each(|line| {
91-
let mut skip_line = false;
92-
if let Ok(line) = line {
93-
if let Some(rx) = &group_rx {
94-
let cleaned_line = standardize_slashes(&remove_ansi_codes(&line));
95-
if let Some(caps) = rx.captures(&cleaned_line) {
96-
let crate_name = &caps[1];
97-
if close_group {
98-
endgroup!();
99-
}
100-
close_group = true;
101-
group!("{}: {}", group_name.unwrap_or("Group"), crate_name);
102-
}
103-
}
104-
if let Some(log) = ignore_log {
105-
if line.contains(log) {
106-
if let Some(msg) = ignore_msg {
107-
warn!("{}", msg);
108-
}
109-
ignore_error = true;
110-
skip_line = true;
111-
}
95+
let tx = tx.clone();
96+
thread::spawn(move || {
97+
let reader = BufReader::new(stderr);
98+
for line in reader.lines().flatten() {
99+
tx.send((line, true)).unwrap();
100+
}
101+
});
102+
}
103+
// Drop the sender once all the logs have been processed to close the channel
104+
drop(tx);
105+
106+
// Process the stdout to inject log groups
107+
let mut ignore_error = false;
108+
let mut close_group = false;
109+
for (line, is_stderr) in rx.iter() {
110+
let mut skip_line = false;
111+
112+
if let Some(rx) = &group_rx {
113+
let cleaned_line = standardize_slashes(&remove_ansi_codes(&line));
114+
if let Some(caps) = rx.captures(&cleaned_line) {
115+
let crate_name = &caps[1];
116+
if close_group {
117+
endgroup!();
112118
}
113-
if !skip_line {
114-
eprintln!("{}", line);
119+
close_group = true;
120+
group!("{}: {}", group_name.unwrap_or("Group"), crate_name);
121+
}
122+
}
123+
124+
if let Some(log) = ignore_log {
125+
if line.contains(log) {
126+
if let Some(msg) = ignore_msg {
127+
warn!("{}", msg);
115128
}
129+
ignore_error = true;
130+
skip_line = true;
116131
}
117-
});
132+
}
133+
134+
if !skip_line {
135+
if is_stderr {
136+
eprintln!("{}", line);
137+
} else {
138+
println!("{}", line);
139+
}
140+
}
118141
}
142+
119143
if close_group {
120144
endgroup!();
121145
}
146+
122147
let status = child
123148
.wait()
124149
.expect("Should be able to wait for the process to finish.");
150+
125151
if status.success() || ignore_error {
126152
anyhow::Ok(())
127153
} else {

0 commit comments

Comments
 (0)