Skip to content

Commit ebd6b25

Browse files
committed
Rewrite cargo logs processing using an MPSC channel
1 parent bb8d49e commit ebd6b25

File tree

2 files changed

+63
-33
lines changed

2 files changed

+63
-33
lines changed

crates/tracel-xtask/src/commands/test.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,14 @@ pub fn handle_command(args: TestCmdArgs) -> anyhow::Result<()> {
3838
}
3939

4040
fn push_optional_args(cmd_args: &mut Vec<String>, args: &TestCmdArgs) {
41+
// cargo options
4142
if let Some(jobs) = &args.jobs {
4243
cmd_args.extend(vec!["--jobs".to_string(), jobs.to_string()]);
4344
};
45+
// test harness options
46+
cmd_args.extend(vec!["--".to_string(), "--color=always".to_string()]);
4447
if let Some(threads) = &args.threads {
4548
cmd_args.extend(vec![
46-
"--".to_string(),
4749
"--test-threads".to_string(),
4850
threads.to_string(),
4951
]);

crates/tracel-xtask/src/utils/process.rs

Lines changed: 60 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::{
22
collections::HashMap,
33
io::{BufRead, BufReader},
44
path::Path,
5-
process::{Command, Stdio},
5+
process::{Command, Stdio}, sync::mpsc, thread,
66
};
77

88
use anyhow::anyhow;
@@ -61,6 +61,7 @@ pub fn run_process_for_workspace<'a>(
6161
.iter()
6262
.for_each(|ex| args.extend(["--exclude", ex]));
6363
group_info!("Command line: cargo {}", args.join(" "));
64+
// process
6465
let mut child = Command::new(name)
6566
.args(&args)
6667
.stdout(Stdio::piped())
@@ -75,53 +76,80 @@ pub fn run_process_for_workspace<'a>(
7576
))
7677
})?;
7778

78-
let mut ignore_error = false;
79-
let mut close_group = false;
79+
// hanlde stdout and stderr in dedicated threads using a MPSC channel for synchronization
80+
let (tx, rx) = mpsc::channel();
81+
// stdout processing thread
8082
if let Some(stdout) = child.stdout.take() {
81-
let reader = BufReader::new(stdout);
82-
reader.lines().for_each(|line| {
83-
if let Ok(line) = line {
84-
println!("{}", line);
83+
let tx = tx.clone();
84+
thread::spawn(move || {
85+
let reader = BufReader::new(stdout);
86+
for line in reader.lines() {
87+
if let Ok(line) = line {
88+
tx.send((line, false)).unwrap();
89+
}
8590
}
8691
});
8792
}
93+
// stderr processing thread
8894
if let Some(stderr) = child.stderr.take() {
89-
let reader = BufReader::new(stderr);
90-
reader.lines().for_each(|line| {
91-
let mut skip_line = false;
92-
if let Ok(line) = line {
93-
if let Some(rx) = &group_rx {
94-
let cleaned_line = standardize_slashes(&remove_ansi_codes(&line));
95-
if let Some(caps) = rx.captures(&cleaned_line) {
96-
let crate_name = &caps[1];
97-
if close_group {
98-
endgroup!();
99-
}
100-
close_group = true;
101-
group!("{}: {}", group_name.unwrap_or("Group"), crate_name);
102-
}
95+
let tx = tx.clone();
96+
thread::spawn(move || {
97+
let reader = BufReader::new(stderr);
98+
for line in reader.lines() {
99+
if let Ok(line) = line {
100+
tx.send((line, true)).unwrap();
103101
}
104-
if let Some(log) = ignore_log {
105-
if line.contains(log) {
106-
if let Some(msg) = ignore_msg {
107-
warn!("{}", msg);
108-
}
109-
ignore_error = true;
110-
skip_line = true;
111-
}
102+
}
103+
});
104+
}
105+
// Drop the sender once all the logs have been processed to close the channel
106+
drop(tx);
107+
108+
// Process the stdout to inject log groups
109+
let mut ignore_error = false;
110+
let mut close_group = false;
111+
for (line, is_stderr) in rx.iter() {
112+
let mut skip_line = false;
113+
114+
if let Some(rx) = &group_rx {
115+
let cleaned_line = standardize_slashes(&remove_ansi_codes(&line));
116+
if let Some(caps) = rx.captures(&cleaned_line) {
117+
let crate_name = &caps[1];
118+
if close_group {
119+
endgroup!();
112120
}
113-
if !skip_line {
114-
eprintln!("{}", line);
121+
close_group = true;
122+
group!("{}: {}", group_name.unwrap_or("Group"), crate_name);
123+
}
124+
}
125+
126+
if let Some(log) = ignore_log {
127+
if line.contains(log) {
128+
if let Some(msg) = ignore_msg {
129+
warn!("{}", msg);
115130
}
131+
ignore_error = true;
132+
skip_line = true;
116133
}
117-
});
134+
}
135+
136+
if !skip_line {
137+
if is_stderr {
138+
eprintln!("{}", line);
139+
} else {
140+
println!("{}", line);
141+
}
142+
}
118143
}
144+
119145
if close_group {
120146
endgroup!();
121147
}
148+
122149
let status = child
123150
.wait()
124151
.expect("Should be able to wait for the process to finish.");
152+
125153
if status.success() || ignore_error {
126154
anyhow::Ok(())
127155
} else {

0 commit comments

Comments
 (0)