diff --git a/Cargo.lock b/Cargo.lock index adb9bce4..a66dd5c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -232,7 +232,7 @@ dependencies = [ "ansi_term", "atty", "bitflags", - "strsim", + "strsim 0.8.0", "textwrap", "unicode-width", "vec_map", @@ -355,6 +355,41 @@ dependencies = [ "syn", ] +[[package]] +name = "darling" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d706e75d87e35569db781a9b5e2416cff1236a47ed380831f959382ccd5f858" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0c960ae2da4de88a91b2d920c2a7233b400bc33cb28453a2987822d8392519b" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.9.3", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b5a2f4ac4969822c62224815d069952656cadc7084fdca9751e6d959189b72" +dependencies = [ + "darling_core", + "quote", + "syn", +] + [[package]] name = "dialoguer" version = "0.7.1" @@ -426,6 +461,27 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "enumset" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e97cdfe38fa12fc11118af2979cc4c4b243d475e4daf691dcf5687d90efd28c" +dependencies = [ + "enumset_derive", +] + +[[package]] +name = "enumset_derive" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3946e239d2862f82f1d105d2193f8026c2c5215cc68b3cebd18c37c4c1b5035" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "errno" version = "0.2.7" @@ -637,6 +693,12 @@ version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9" +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.2.0" @@ -1317,7 +1379,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d63676e2abafa709460982ddc02a3bb586b6d15a49b75c212e06edd3933acee" dependencies = [ - "vte", + "vte 0.3.3", ] [[package]] @@ -1326,6 +1388,12 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" +[[package]] +name = "strsim" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6446ced80d6c486436db5c078dde11a9f73d42b57fb273121e160b84f63d894c" + [[package]] name = "syn" version = "1.0.58" @@ -1449,6 +1517,7 @@ dependencies = [ "tokio-io", "tokio-test", "tungstenite", + "vt100", ] [[package]] @@ -1795,6 +1864,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8772a4ccbb4e89959023bc5b7cb8623a795caa7092d99f3aa9501b9484d4557d" +[[package]] +name = "utf8parse" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "936e4b492acfd135421d8dca4b1aa80a7bfc26e702ef3af710e0752684df5372" + [[package]] name = "uuid" version = "0.8.2" @@ -1819,13 +1894,46 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +[[package]] +name = "vt100" +version = "0.8.1" +source = "git+https://github.com/austinjones/vt100-rust.git#fa6c4d84a888280bcffdec4dac9cd783d3cb7fb9" +dependencies = [ + "enumset", + "itoa", + "log", + "unicode-width", + "vte 0.6.0", +] + [[package]] name = "vte" version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f42f536e22f7fcbb407639765c8fd78707a33109301f834a594758bedd6e8cf" dependencies = [ - "utf8parse", + "utf8parse 0.1.1", +] + +[[package]] +name = "vte" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f6cd7d593ad0e57643cb80c6bdddbe1ddb1bde88260364482f9f6dfede9e91e" +dependencies = [ + "arrayvec", + "utf8parse 0.2.0", + "vte_generate_state_changes", +] + +[[package]] +name = "vte_generate_state_changes" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d257817081c7dffcdbab24b9e62d2def62e2ff7d00b1c20062551e6cccc145ff" +dependencies = [ + "proc-macro2", + "quote", ] [[package]] diff --git a/common/tab-api/src/pty.rs b/common/tab-api/src/pty.rs index 61815486..1cef5b5b 100644 --- a/common/tab-api/src/pty.rs +++ b/common/tab-api/src/pty.rs @@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize}; pub enum PtyWebsocketResponse { Started(TabMetadata), Output(OutputChunk), + Resized((u16, u16)), Stopped, } diff --git a/tab-daemon/Cargo.toml b/tab-daemon/Cargo.toml index 24e8a49a..82e9d02f 100644 --- a/tab-daemon/Cargo.toml +++ b/tab-daemon/Cargo.toml @@ -16,6 +16,8 @@ tab-websocket = "0.5.0" lifeline = "0.6" postage = "0.3" +vt100 = { git = "https://github.com/austinjones/vt100-rust.git" } + dirs = "3.0" serde_yaml = "0.8" diff --git a/tab-daemon/src/bus/cli.rs b/tab-daemon/src/bus/cli.rs index 1aca7443..405e8dd5 100644 --- a/tab-daemon/src/bus/cli.rs +++ b/tab-daemon/src/bus/cli.rs @@ -305,52 +305,52 @@ mod forward_tests { Ok(()) } - #[tokio::test] - async fn scrollback() -> anyhow::Result<()> { - let cli_bus = CliBus::default(); - let listener_bus = ListenerBus::default(); - - let _carrier = cli_bus.carry_from(&listener_bus)?; - - let mut tx = listener_bus.tx::()?; - let mut rx = cli_bus.rx::()?; - - let mut buffer = ScrollbackBuffer::new(); - buffer.push(OutputChunk { - index: 0, - data: vec![0, 1], - }); - buffer.push(OutputChunk { - index: 2, - data: vec![1, 2], - }); - let scrollback = PtyScrollback::new(Arc::new(Mutex::new(buffer))); - let scrollback = TabScrollback { - id: TabId(0), - scrollback, - }; - tx.send(TabSend::Scrollback(scrollback)).await?; - - assert_completes!(async move { - let msg = rx.recv().await; - assert!(msg.is_some()); - if let CliSubscriptionRecv::Scrollback(scroll) = msg.unwrap() { - let mut iter = scroll.scrollback().await; - assert_eq!( - Some(OutputChunk { - index: 0, - data: vec![0, 1, 1, 2] - }), - iter.next() - ); - assert_eq!(None, iter.next()); - } else { - panic!("Expected CliRecv::Scrollback, found None"); - } - }); - - Ok(()) - } + // #[tokio::test] + // async fn scrollback() -> anyhow::Result<()> { + // let cli_bus = CliBus::default(); + // let listener_bus = ListenerBus::default(); + + // let _carrier = cli_bus.carry_from(&listener_bus)?; + + // let mut tx = listener_bus.tx::()?; + // let mut rx = cli_bus.rx::()?; + + // let mut buffer = ScrollbackBuffer::new(); + // buffer.push(OutputChunk { + // index: 0, + // data: vec![0, 1], + // }); + // buffer.push(OutputChunk { + // index: 2, + // data: vec![1, 2], + // }); + // let scrollback = PtyScrollback::new(Arc::new(Mutex::new(buffer))); + // let scrollback = TabScrollback { + // id: TabId(0), + // scrollback, + // }; + // tx.send(TabSend::Scrollback(scrollback)).await?; + + // assert_completes!(async move { + // let msg = rx.recv().await; + // assert!(msg.is_some()); + // if let CliSubscriptionRecv::Scrollback(scroll) = msg.unwrap() { + // let mut iter = scroll.scrollback().await; + // assert_eq!( + // Some(OutputChunk { + // index: 0, + // data: vec![0, 1, 1, 2] + // }), + // iter.next() + // ); + // assert_eq!(None, iter.next()); + // } else { + // panic!("Expected CliRecv::Scrollback, found None"); + // } + // }); + + // Ok(()) + // } #[tokio::test] async fn output() -> anyhow::Result<()> { diff --git a/tab-daemon/src/bus/pty.rs b/tab-daemon/src/bus/pty.rs index 5e0edd32..c234c391 100644 --- a/tab-daemon/src/bus/pty.rs +++ b/tab-daemon/src/bus/pty.rs @@ -175,6 +175,7 @@ impl CarryFrom for PtyBus { tx_tab_manager.send(TabManagerRecv::CloseTab(id)).await?; tx_tab.send(TabSend::Stopped(id)).await.ok(); } + PtySend::Resized(_) => {} } } diff --git a/tab-daemon/src/message/pty.rs b/tab-daemon/src/message/pty.rs index 8bb77db8..ab8505f9 100644 --- a/tab-daemon/src/message/pty.rs +++ b/tab-daemon/src/message/pty.rs @@ -37,6 +37,8 @@ pub enum PtySend { Started(TabMetadata), Output(OutputChunk), Scrollback(PtyScrollback), + /// The pty has been resized to the given number of (cols, rows) + Resized((u16, u16)), Stopped, } @@ -68,6 +70,13 @@ impl PartialEq for PtySend { return false; } } + PtySend::Resized(size) => { + if let PtySend::Resized(other) = other { + return size == other; + } else { + return false; + } + } } } } diff --git a/tab-daemon/src/message/tab.rs b/tab-daemon/src/message/tab.rs index 71c598e3..20047906 100644 --- a/tab-daemon/src/message/tab.rs +++ b/tab-daemon/src/message/tab.rs @@ -101,15 +101,6 @@ impl TabScrollback { scrollback: PtyScrollback::empty(), } } - - #[cfg(test)] - pub async fn push(&self, chunk: OutputChunk) { - self.scrollback.push(chunk).await; - } - - pub async fn scrollback(&self) -> impl Iterator { - self.scrollback.scrollback().await - } } /// A message sent from an established tab, to provide lifecycle notification events, diff --git a/tab-daemon/src/service/cli/subscription.rs b/tab-daemon/src/service/cli/subscription.rs index 257df2d5..d1d94958 100644 --- a/tab-daemon/src/service/cli/subscription.rs +++ b/tab-daemon/src/service/cli/subscription.rs @@ -47,14 +47,21 @@ impl Service for CliSubscriptionService { continue; } + let id = scrollback.id; + let scrollback = scrollback.scrollback; + if let SubscriptionState::AwaitingScrollback(id, buffer) = state { - let mut index = 0usize; + let mut index = scrollback.end(); info!("Received scrollback for tab {}", id); - for chunk in scrollback.scrollback().await { - index = Self::send_output(id, index, chunk, &mut tx).await?; - } + let chunk = OutputChunk { + index: 0, + data: scrollback.into_data(), + }; + + let response = CliSubscriptionSend::Output(id, chunk); + tx.send(response).await.context("tx_websocket closed")?; for chunk in buffer { index = Self::send_output(id, index, chunk, &mut tx).await?; @@ -175,12 +182,11 @@ mod tests { use crate::{ message::cli::CliSend, message::cli::CliSubscriptionRecv, message::cli::CliSubscriptionSend, message::tab::TabOutput, message::tab::TabScrollback, - prelude::*, service::pty::scrollback::ScrollbackBuffer, state::pty::PtyScrollback, + prelude::*, state::pty::PtyScrollback, }; use lifeline::{assert_completes, assert_times_out}; use postage::sink::Sink; use tab_api::{chunk::OutputChunk, client::RetaskTarget, tab::TabId}; - use tokio::sync::Mutex; use super::CliSubscriptionService; @@ -199,7 +205,7 @@ mod tests { ) -> anyhow::Result<()> { let scrollback = TabScrollback { id, - scrollback: PtyScrollback::new(Arc::new(Mutex::new(ScrollbackBuffer::new()))), + scrollback: PtyScrollback::empty(), }; tx.send(CliSubscriptionRecv::Scrollback(scrollback)).await?; @@ -224,67 +230,67 @@ mod tests { Ok(()) } - #[tokio::test] - async fn scrollback() -> anyhow::Result<()> { - let bus = CliBus::default(); - let _service = CliSubscriptionService::spawn(&bus)?; - - let mut tx = bus.tx::()?; - let mut rx = bus.rx::()?; - - tx_subscribe(&mut tx, TabId(0)).await?; - - let scrollback = TabScrollback::empty(TabId(0)); - scrollback - .push(OutputChunk { - index: 1, - data: vec![1, 2], - }) - .await; - - tx.send(CliSubscriptionRecv::Scrollback(scrollback)).await?; - - assert_completes!(async move { - let msg = rx.recv().await; - assert_eq!( - Some(CliSubscriptionSend::Output( - TabId(0), - OutputChunk { - index: 1, - data: vec![1, 2] - } - )), - msg - ); - }); - - Ok(()) - } - - #[tokio::test] - async fn scrollback_ignored_unsubscribed() -> anyhow::Result<()> { - let bus = CliBus::default(); - let _service = CliSubscriptionService::spawn(&bus)?; - - let mut tx = bus.tx::()?; - let mut rx = bus.rx::()?; - - let scrollback = TabScrollback::empty(TabId(0)); - scrollback - .push(OutputChunk { - index: 1, - data: vec![1, 2], - }) - .await; - - tx.send(CliSubscriptionRecv::Scrollback(scrollback)).await?; - - assert_times_out!(async { - rx.recv().await; - }); - - Ok(()) - } + // #[tokio::test] + // async fn scrollback() -> anyhow::Result<()> { + // let bus = CliBus::default(); + // let _service = CliSubscriptionService::spawn(&bus)?; + + // let mut tx = bus.tx::()?; + // let mut rx = bus.rx::()?; + + // tx_subscribe(&mut tx, TabId(0)).await?; + + // let scrollback = TabScrollback::empty(TabId(0)); + // scrollback + // .push(OutputChunk { + // index: 1, + // data: vec![1, 2], + // }) + // .await; + + // tx.send(CliSubscriptionRecv::Scrollback(scrollback)).await?; + + // assert_completes!(async move { + // let msg = rx.recv().await; + // assert_eq!( + // Some(CliSubscriptionSend::Output( + // TabId(0), + // OutputChunk { + // index: 1, + // data: vec![1, 2] + // } + // )), + // msg + // ); + // }); + + // Ok(()) + // } + + // #[tokio::test] + // async fn scrollback_ignored_unsubscribed() -> anyhow::Result<()> { + // let bus = CliBus::default(); + // let _service = CliSubscriptionService::spawn(&bus)?; + + // let mut tx = bus.tx::()?; + // let mut rx = bus.rx::()?; + + // let scrollback = TabScrollback::empty(TabId(0)); + // scrollback + // .push(OutputChunk { + // index: 1, + // data: vec![1, 2], + // }) + // .await; + + // tx.send(CliSubscriptionRecv::Scrollback(scrollback)).await?; + + // assert_times_out!(async { + // rx.recv().await; + // }); + + // Ok(()) + // } #[tokio::test] async fn output() -> anyhow::Result<()> { diff --git a/tab-daemon/src/service/pty.rs b/tab-daemon/src/service/pty.rs index 91e1ccd7..86b9460f 100644 --- a/tab-daemon/src/service/pty.rs +++ b/tab-daemon/src/service/pty.rs @@ -53,6 +53,9 @@ impl Service for PtyService { tx_shutdown.send(PtyShutdown {}).await?; break; } + PtyWebsocketResponse::Resized(size) => { + tx_daemon.send(PtySend::Resized(size)).await?; + } } } diff --git a/tab-daemon/src/service/pty/scrollback.rs b/tab-daemon/src/service/pty/scrollback.rs index f15d5c3a..43c85ef7 100644 --- a/tab-daemon/src/service/pty/scrollback.rs +++ b/tab-daemon/src/service/pty/scrollback.rs @@ -4,14 +4,10 @@ use crate::{ state::pty::PtyScrollback, }; -use std::{collections::VecDeque, sync::Arc}; +use std::sync::Arc; use tab_api::chunk::OutputChunk; use tokio::sync::Mutex; -// 128MB memory limit -static MAX_CAPACITY: usize = 134217728; -static MAX_CHUNK_LEN: usize = 4096; - /// Spawns with a pty connection, and maintains a scrollback buffer. Provides scrollback for tab-command clients pub struct PtyScrollbackService { _serve: Lifeline, @@ -33,7 +29,7 @@ impl Service for PtyScrollbackService { Self::try_task("serve", async move { while let Some(msg) = rx.recv().await { if let PtyRecv::Scrollback = msg { - let scrollback = serve_scrollback.handle(); + let scrollback = serve_scrollback.render().await; let response = PtySend::Scrollback(scrollback); tx.send(response).await?; } @@ -48,8 +44,17 @@ impl Service for PtyScrollbackService { Self::try_task("serve", async move { while let Some(msg) = rx.recv().await { - if let PtySend::Output(output) = msg { - buffer.push(output).await; + match msg { + PtySend::Started(metadata) => { + buffer.resize(metadata.dimensions).await; + } + PtySend::Output(output) => { + buffer.push(output).await; + } + PtySend::Resized(size) => { + buffer.resize(size).await; + } + _ => {} } } @@ -61,291 +66,115 @@ impl Service for PtyScrollbackService { } } -#[derive(Debug, Clone)] +#[derive(Clone)] struct ScrollbackManager { arc: Arc>, - filter: AnsiFilter, } impl ScrollbackManager { pub fn new() -> Self { Self { arc: Arc::new(Mutex::new(ScrollbackBuffer::new())), - filter: Self::ansi_filter(), } } - /// Several ANSI escape sequences that should not be replayed - pub fn ansi_filter() -> AnsiFilter { - AnsiFilter::new(vec![ - // replace ESC [ 6n, Device Status Report - // this sequence is echoed as keyboard characters, - // and the tab session may not be running the same application as it was before - "\x1b[6n".as_bytes().into_iter().copied().collect(), - // replace ESC ] ** ; ? \x07, Operating System Command - // similarly, this sequence results in the terminal emulator echoing characters - // reference: https://www.xfree86.org/current/ctlseqs.html - "\x1b]\x00\x00;?\x07" - .as_bytes() - .into_iter() - .copied() - .collect(), - // replace ESC [ ** c, Send Device Attributes (Primary DA) - // similarly, this sequence results in the terminal emulator echoing characters - // reference: https://www.xfree86.org/current/ctlseqs.html - "\x1b]\x00\x00c".as_bytes().into_iter().copied().collect(), - // replace ESC [ = 0 c, Send Device Attributes (Tertiary DA) - // similarly, this sequence results in the terminal emulator echoing characters - // reference: https://www.xfree86.org/current/ctlseqs.html - "\x1b]=0c".as_bytes().into_iter().copied().collect(), - // replace ESC [ > ** ; ** ; 0 c, Send Device Attributes (Secondary DA) - // similarly, this sequence results in the terminal emulator echoing characters - // reference: https://www.xfree86.org/current/ctlseqs.html - "\x1b]>\x00\x00;\x00\x00;0c" - .as_bytes() - .into_iter() - .copied() - .collect(), - ]) + // /// Several ANSI escape sequences that should not be replayed + // pub fn ansi_filter() -> AnsiFilter { + // AnsiFilter::new(vec![ + // // replace ESC [ 6n, Device Status Report + // // this sequence is echoed as keyboard characters, + // // and the tab session may not be running the same application as it was before + // "\x1b[6n".as_bytes().into_iter().copied().collect(), + // // replace ESC ] ** ; ? \x07, Operating System Command + // // similarly, this sequence results in the terminal emulator echoing characters + // // reference: https://www.xfree86.org/current/ctlseqs.html + // "\x1b]\x00\x00;?\x07" + // .as_bytes() + // .into_iter() + // .copied() + // .collect(), + // // replace ESC [ ** c, Send Device Attributes (Primary DA) + // // similarly, this sequence results in the terminal emulator echoing characters + // // reference: https://www.xfree86.org/current/ctlseqs.html + // "\x1b]\x00\x00c".as_bytes().into_iter().copied().collect(), + // // replace ESC [ = 0 c, Send Device Attributes (Tertiary DA) + // // similarly, this sequence results in the terminal emulator echoing characters + // // reference: https://www.xfree86.org/current/ctlseqs.html + // "\x1b]=0c".as_bytes().into_iter().copied().collect(), + // // replace ESC [ > ** ; ** ; 0 c, Send Device Attributes (Secondary DA) + // // similarly, this sequence results in the terminal emulator echoing characters + // // reference: https://www.xfree86.org/current/ctlseqs.html + // "\x1b]>\x00\x00;\x00\x00;0c" + // .as_bytes() + // .into_iter() + // .copied() + // .collect(), + // ]) + // } + + pub async fn resize(&self, size: (u16, u16)) { + let mut buffer = self.arc.lock().await; + + buffer.resize(size); } - pub fn handle(&self) -> PtyScrollback { - PtyScrollback::new(self.arc.clone()) + pub async fn render(&self) -> PtyScrollback { + let buffer = self.arc.lock().await; + + let index = buffer.index; + let data = buffer.render(); + + PtyScrollback::new(index, data) } - pub async fn push(&self, mut output: OutputChunk) { - // replace ANSI escape sequences that should not be repeated when scrollback is re-played. - self.filter.filter(&mut output.data); + pub async fn push(&self, output: OutputChunk) { + // // replace ANSI escape sequences that should not be repeated when scrollback is re-played. + // self.filter.filter(&mut output.data); let mut buffer = self.arc.lock().await; buffer.push(output); } } -#[derive(Debug, Clone, PartialEq, Eq)] pub struct ScrollbackBuffer { - size: usize, - queue: VecDeque, + index: usize, + parser: vt100::Parser, } impl ScrollbackBuffer { pub fn new() -> Self { ScrollbackBuffer { - size: 0, - queue: VecDeque::new(), + index: 0, + parser: vt100::Parser::new(1, 1, 1000), } } pub fn push(&mut self, mut chunk: OutputChunk) { - while self.size > MAX_CAPACITY { - if let Some(chunk) = self.queue.pop_front() { - let front_len = chunk.len(); - - // use saturating sub, just in case there was a calculation error and front_len is larger than size - let _ = self.size.saturating_sub(front_len); - } - } - - // If we get several small buffers, concat them. - // This saves a lot of overhead for chunk id / channel storage over the websocket. - // It does cause the client to 'miss' chunks, but that is part of the API contract. - if let Some(back) = self.queue.back_mut() { - if back.len() + chunk.len() < MAX_CHUNK_LEN { - self.size += chunk.len(); - - debug!( - "scrollback appending stdout chunk {}..{} to existing chunk {}..{}, size {}", - chunk.start(), - chunk.end(), - back.start(), - back.end(), - self.size, - ); - - back.data.append(&mut chunk.data); - return; - } - } - debug!( - "scrollback pushing new chunk {}..{}, size {}", + "scrollback pushing new chunk {}..{}", chunk.start(), - chunk.end(), - self.size + chunk.len() + chunk.end() ); - self.size += chunk.len(); - self.queue.push_back(chunk); - } - - pub fn clone_queue(&self) -> VecDeque { - self.queue.clone() - } -} -#[derive(Debug, Clone)] -struct AnsiFilter { - sequences: Vec>, -} - -impl Default for AnsiFilter { - fn default() -> Self { - todo!() - } -} - -impl AnsiFilter { - pub fn new(iter: T) -> Self - where - T: IntoIterator>, - { - let sequences: Vec> = iter.into_iter().collect(); - Self { sequences } - } - - #[cfg(test)] - pub fn from_sequence(vec: Vec) -> Self { - Self { - sequences: vec![vec], - } - } - - pub fn filter(&self, buf: &mut Vec) { - for seq in &self.sequences { - Self::filter_seq(seq.as_slice(), buf); - } - } - - fn filter_seq(sequence: &[u8], buf: &mut Vec) { - if sequence.len() == 0 { - return; - } - - let mut index = 0; - let mut seq_index = 0; - - while index <= buf.len() { - if seq_index >= sequence.len() { - debug!( - "a filtered ansi sequence was matched by the scrollback processor: {:?}", - sequence - ); - debug!( - "the folowing data will be removed: {:?}", - &buf[index - sequence.len()..index] - ); - - buf.drain(index - sequence.len()..index); - index -= sequence.len(); - seq_index = 0; - } - - if index < buf.len() - && (sequence[seq_index] == 0u8 || buf[index] == sequence[seq_index]) - { - seq_index += 1; - } else { - seq_index = 0; - } - - index += 1; - } - } -} - -/// General tests of the ANSI filter utility -#[cfg(test)] -mod tests { - use super::AnsiFilter; - - #[test] - fn test_replace() { - let filter = AnsiFilter::from_sequence(vec![2, 3]); - - let mut buf = vec![1, 2, 3, 4]; - filter.filter(&mut buf); - - assert_eq!(buf, vec![1, 4]) - } - - #[test] - fn test_replace_first() { - let mut buf = vec![1, 2, 3, 4]; - - let filter = AnsiFilter::from_sequence(vec![1, 2]); - filter.filter(&mut buf); - - assert_eq!(buf, vec![3, 4]) - } - - #[test] - fn test_replace_last() { - let mut buf = vec![1, 2, 3, 4]; - let filter = AnsiFilter::from_sequence(vec![4]); - filter.filter(&mut buf); - assert_eq!(buf, vec![1, 2, 3]) - } - - #[test] - fn test_wildcard() { - let filter = AnsiFilter::from_sequence(vec![2, 0]); - - let mut buf = vec![1, 2, 3, 4]; - filter.filter(&mut buf); - - assert_eq!(buf, vec![1, 4]) - } - - #[test] - fn test_separated_matches() { - let filter = AnsiFilter::from_sequence(vec![2, 4]); - - let mut buf = vec![1, 2, 3, 4, 5]; - filter.filter(&mut buf); - - assert_eq!(buf, vec![1, 2, 3, 4, 5]) + self.index = chunk.end(); + self.parser.process(chunk.data.as_slice()); } -} - -/// Specific sequences that tab must remove from scrollback buffers -#[cfg(test)] -mod sequence_tests { - use super::ScrollbackManager; - - #[test] - fn device_status_report() { - let filter = ScrollbackManager::ansi_filter(); - let mut sequence = "start-\x1b[6n-end" - .as_bytes() - .into_iter() - .copied() - .collect(); - filter.filter(&mut sequence); - - assert_eq!("start--end".as_bytes(), sequence); + pub fn resize(&mut self, (cols, rows): (u16, u16)) { + self.parser.set_size(rows, cols); } - #[test] - fn operating_system_command() { - let filter = ScrollbackManager::ansi_filter(); - - let mut sequence = "start-\x1b]10;?\x07-end" - .as_bytes() - .into_iter() - .copied() - .collect(); - filter.filter(&mut sequence); + pub fn render(&self) -> Vec { + let screen = self.parser.screen(); - assert_eq!("start--end".as_bytes(), sequence); + let mut data = Vec::new(); + data.append(&mut screen.title_formatted()); + data.append(&mut screen.input_mode_formatted()); + data.append(&mut screen.all_contents_formatted()); + data } - #[test] - fn bug_open_source() { - let filter = ScrollbackManager::ansi_filter(); - - let mut sequence = "open-source".as_bytes().into_iter().copied().collect(); - filter.filter(&mut sequence); - - assert_eq!("open-source".as_bytes(), sequence); + pub fn index(&self) -> usize { + self.index } } diff --git a/tab-daemon/src/state/pty.rs b/tab-daemon/src/state/pty.rs index 5df473f5..0353976f 100644 --- a/tab-daemon/src/state/pty.rs +++ b/tab-daemon/src/state/pty.rs @@ -48,28 +48,32 @@ impl PtyState { /// Can produce a cloned copy of the scrollback contents. #[derive(Debug, Clone)] pub struct PtyScrollback { - scrollback: Arc>, + end: usize, + data: Vec, } impl PtyScrollback { - pub fn new(scrollback: Arc>) -> Self { - Self { scrollback } + pub fn new(end: usize, data: Vec) -> Self { + Self { end, data } } #[cfg(test)] pub fn empty() -> Self { Self { - scrollback: Arc::new(Mutex::new(ScrollbackBuffer::new())), + end: 0, + data: vec![], } } - #[cfg(test)] - pub async fn push(&self, chunk: OutputChunk) { - self.scrollback.lock().await.push(chunk); + pub fn end(&self) -> usize { + self.end + } + + pub fn data(&self) -> &Vec { + &self.data } - pub async fn scrollback(&self) -> impl Iterator { - let scrollback = self.scrollback.lock().await.clone_queue(); - scrollback.into_iter() + pub fn into_data(self) -> Vec { + self.data } } diff --git a/tab-pty/src/message/pty.rs b/tab-pty/src/message/pty.rs index 3a8dd273..a0a70045 100644 --- a/tab-pty/src/message/pty.rs +++ b/tab-pty/src/message/pty.rs @@ -34,6 +34,7 @@ pub enum PtyRequest { #[derive(Debug, Clone)] pub enum PtyResponse { Output(OutputChunk), + Resized((u16, u16)), Terminated, } diff --git a/tab-pty/src/service/client.rs b/tab-pty/src/service/client.rs index e913cd37..675ab6f0 100644 --- a/tab-pty/src/service/client.rs +++ b/tab-pty/src/service/client.rs @@ -261,6 +261,9 @@ impl ClientSessionService { PtyResponse::Output(out) => { tx.send(PtyWebsocketResponse::Output(out)).await?; } + PtyResponse::Resized(size) => { + tx.send(PtyWebsocketResponse::Resized(size)).await?; + } PtyResponse::Terminated => { debug!("pty child process terminated"); diff --git a/tab-pty/src/service/pty.rs b/tab-pty/src/service/pty.rs index 95c28add..addc41a0 100644 --- a/tab-pty/src/service/pty.rs +++ b/tab-pty/src/service/pty.rs @@ -59,7 +59,7 @@ impl PtyService { options: PtyOptions, rx_request: impl Stream + Unpin + Send + 'static, mut rx_shutdown: impl Stream + Unpin, - tx_response: impl Sink + Clone + Unpin + Send + 'static, + mut tx_response: impl Sink + Clone + Unpin + Send + 'static, ) -> anyhow::Result<()> { let system = Self::create_pty(options).await?; let (tx_barrier, mut rx_barrier) = barrier::channel(); @@ -72,17 +72,16 @@ impl PtyService { let _input = Self::task( "input", - Self::write_input(system.master, system.write, rx_request), + Self::write_input(system.master, system.write, rx_request, tx_response.clone()), ); let child = system.child; - let mut tx_exit = tx_response.clone(); let _exit_code = Self::try_task("exit_code", async move { let exit_code = child.wait().await?; rx_barrier.recv().await; info!("Shell successfully terminated with exit code {}", exit_code); - tx_exit.send(PtyResponse::Terminated).await?; + tx_response.send(PtyResponse::Terminated).await?; Ok(()) }); @@ -152,6 +151,7 @@ impl PtyService { master: UnixPtyMaster, mut stdin: UnixPtyWrite, mut rx: impl Stream + Unpin, + mut tx: impl Sink + Unpin, ) -> anyhow::Result<()> { while let Some(request) = rx.recv().await { match request { @@ -164,6 +164,7 @@ impl PtyService { if let Err(e) = master.resize(size).await { error!("failed to resize pty: {:?}", e); } + tx.send(PtyResponse::Resized(dimensions)).await?; debug!("resized to dimensions: {:?}", &dimensions); }