Skip to content

Commit c545c76

Browse files
committed
Merge branch 'main' of https://github.com/swimos/swim-rust into fluvio
2 parents 6829355 + b06cb54 commit c545c76

File tree

5 files changed

+126
-14
lines changed

5 files changed

+126
-14
lines changed

example_apps/transit/src/ui.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ impl UiConfig {
3939
pub fn ui_server_router(port: u16) -> Router {
4040
Router::new()
4141
.route("/index.html", get(index_html))
42-
.route("/dist/main/swimos-transit.js", get(transit_js))
43-
.route("/dist/main/swimos-transit.js.map", get(transit_js_map))
42+
.route("/dist/main/swim-transit.js", get(transit_js))
43+
.route("/dist/main/swim-transit.js.map", get(transit_js_map))
4444
.with_state(UiConfig { port })
4545
}
4646

@@ -49,7 +49,7 @@ const INDEX: &str = "ui/index.html";
4949
async fn index_html(State(UiConfig { port }): State<UiConfig>) -> impl IntoResponse {
5050
if let Ok(file) = File::open(INDEX).await {
5151
let lines = LinesStream::new(BufReader::new(file).lines()).map_ok(move |mut line| {
52-
if line == "const portParam = baseUri.port();" {
52+
if line.contains("const portParam = baseUri.port();") {
5353
format!("const portParam = {};\n", port)
5454
} else {
5555
line.push('\n');
@@ -78,15 +78,15 @@ async fn index_html(State(UiConfig { port }): State<UiConfig>) -> impl IntoRespo
7878

7979
async fn transit_js() -> impl IntoResponse {
8080
let headers = [(header::CONTENT_TYPE, JS.clone())];
81-
(headers, load_file("dist/main/swimos-transit.js").await)
81+
(headers, load_file("dist/main/swim-transit.js").await)
8282
}
8383

8484
static HTML: HeaderValue = HeaderValue::from_static("text/html; charset=utf-8");
8585
static JS: HeaderValue = HeaderValue::from_static("application/json");
8686

8787
async fn transit_js_map() -> impl IntoResponse {
8888
let headers = [(header::CONTENT_TYPE, JS.clone())];
89-
(headers, load_file("dist/main/swimos-transit.js.map").await)
89+
(headers, load_file("dist/main/swim-transit.js.map").await)
9090
}
9191

9292
async fn load_file(path: &str) -> impl IntoResponse {

server/swimos_server_app/src/server/http/mod.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use ratchet::{
3838
WebSocketStream,
3939
};
4040
use ratchet_core::server::UpgradeRequestParts;
41+
use std::env::{var, VarError};
4142
use std::{
4243
marker::PhantomData,
4344
net::SocketAddr,
@@ -61,6 +62,7 @@ use tokio::{
6162
sync::mpsc,
6263
time::{sleep, Sleep},
6364
};
65+
use tracing::{debug, warn};
6466

6567
mod resolver;
6668
#[cfg(test)]
@@ -375,6 +377,37 @@ where
375377
}
376378
}
377379

380+
/// Returns whether to validate the upgrade's requested subprotocol. Users may disable this check
381+
/// by setting WS_NO_SUBPROTOCOL_CHECK=true if they are running a legacy version of Swim which would be
382+
/// incompatible with this version. If the environment variable has not been set or it is invalid
383+
/// then the subprotocol will still be verified and a log entry will be emitted.
384+
///
385+
/// Defaults to `true`.
386+
fn validate_subprotocol() -> bool {
387+
match var("WS_NO_SUBPROTOCOL_CHECK") {
388+
Ok(s) => match s.as_str() {
389+
"true" => {
390+
debug!("Skipping subprotocol check due to WS_NO_SUBPROTOCOL_CHECK=true");
391+
false
392+
}
393+
"false" => true,
394+
s => {
395+
warn!("WS_NO_SUBPROTOCOL_CHECK set to an invalid value. Ignoring. Should be 'true' or 'false': {}", s);
396+
true
397+
}
398+
},
399+
Err(VarError::NotPresent) => true,
400+
Err(VarError::NotUnicode(value)) => {
401+
let value = value.to_string_lossy();
402+
warn!(
403+
"WS_NO_SUBPROTOCOL_CHECK set to non-Unicode value. Ignoring: {}",
404+
value
405+
);
406+
true
407+
}
408+
}
409+
}
410+
378411
/// Perform the websocket negotiation and assign the upgrade future to the target parameter.
379412
fn perform_upgrade<Ext, Sock, B>(
380413
config: WebSocketConfig,
@@ -392,7 +425,7 @@ where
392425
{
393426
match result {
394427
Ok(parts) => {
395-
if parts.subprotocol.is_none() {
428+
if parts.subprotocol.is_none() && validate_subprotocol() {
396429
// We can only speak warp0 so fail the upgrade.
397430
let response = swimos_http::fail_upgrade("Failed to negotiate warp0 subprotocol");
398431
(response, None)

swimos_utilities/swimos_byte_channel/src/coop/mod.rs

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::{
1919
task::{Context, Poll},
2020
};
2121

22-
use futures::Future;
22+
use futures::{ready, Future};
2323
use pin_project::pin_project;
2424

2525
#[cfg(test)]
@@ -71,7 +71,7 @@ fn set_budget(n: usize) {
7171
})
7272
}
7373

74-
/// Wraps a futures and ensures that the byte channel budget is reset each time it is polled.
74+
/// Wraps a futures and ensures that the task budget is reset each time it is polled.
7575
#[pin_project]
7676
#[derive(Debug, Clone, Copy)]
7777
pub struct RunWithBudget<F> {
@@ -95,22 +95,27 @@ impl<F> RunWithBudget<F> {
9595
}
9696
}
9797

98-
/// Extension trait to allow futures to be run with a byte channel budget.
98+
/// Extension trait to allow futures to be run with a task budget.
9999
pub trait BudgetedFutureExt: Sized + Future {
100-
/// Run this future with the default byte channel budget.
100+
/// Run this future with the default task budget.
101101
fn budgeted(self) -> RunWithBudget<Self> {
102102
RunWithBudget::new(self)
103103
}
104104

105-
/// Run this future with the specified byte channel budget.
105+
/// Run this future with the specified task budget.
106106
fn with_budget(self, budget: NonZeroUsize) -> RunWithBudget<Self> {
107107
RunWithBudget::with_budget(budget, self)
108108
}
109109

110-
/// Run this future wit a specified byte channel budget or the default if not is specified.
110+
/// Run this future wit a specified task budget or the default if not is specified.
111111
fn with_budget_or_default(self, budget: Option<NonZeroUsize>) -> RunWithBudget<Self> {
112112
RunWithBudget::with_budget(budget.unwrap_or(DEFAULT_START_BUDGET), self)
113113
}
114+
115+
/// Run this future, consuming budget if it does work.
116+
fn consuming(self) -> BudgetConsumer<Self> {
117+
BudgetConsumer::new(self)
118+
}
114119
}
115120

116121
impl<F: Future> BudgetedFutureExt for F {}
@@ -133,3 +138,26 @@ impl<F: Future> Future for RunWithBudget<F> {
133138
projected.fut.poll(cx)
134139
}
135140
}
141+
142+
/// Wraps a future to consume the task budget when it performs work.
143+
#[pin_project]
144+
pub struct BudgetConsumer<F> {
145+
#[pin]
146+
fut: F,
147+
}
148+
149+
impl<F> BudgetConsumer<F> {
150+
pub fn new(fut: F) -> Self {
151+
BudgetConsumer { fut }
152+
}
153+
}
154+
155+
impl<F: Future> Future for BudgetConsumer<F> {
156+
type Output = F::Output;
157+
158+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
159+
ready!(consume_budget(cx));
160+
let projected = self.project();
161+
track_progress(projected.fut.poll(cx))
162+
}
163+
}

swimos_utilities/swimos_byte_channel/src/coop/tests.rs

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,13 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use futures::task::{waker, ArcWake};
15+
use futures::{
16+
future::{pending, ready},
17+
task::{waker, ArcWake},
18+
};
1619
use std::{
1720
cell::Cell,
21+
future::Future,
1822
num::NonZeroUsize,
1923
sync::{atomic::AtomicBool, Arc},
2024
task::{Context, Poll},
@@ -25,6 +29,8 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
2529

2630
use crate::{byte_channel, RunWithBudget};
2731

32+
use super::BudgetedFutureExt;
33+
2834
struct TestWaker(AtomicBool);
2935

3036
impl TestWaker {
@@ -165,3 +171,48 @@ async fn with_budget_sets_budget() {
165171

166172
fut.await;
167173
}
174+
175+
#[test]
176+
fn consume_budget_consumes() {
177+
let w = Arc::new(TestWaker::default());
178+
let waker = waker(w.clone());
179+
let mut cx = Context::from_waker(&waker);
180+
super::set_budget(2);
181+
182+
let fut = pin!(ready(0).consuming());
183+
184+
assert_eq!(fut.poll(&mut cx), Poll::Ready(0));
185+
assert_eq!(super::TASK_BUDGET.with(Cell::get), Some(1));
186+
187+
assert!(!w.triggered());
188+
}
189+
190+
#[test]
191+
fn consume_budget_pending_no_consume() {
192+
let w = Arc::new(TestWaker::default());
193+
let waker = waker(w.clone());
194+
let mut cx = Context::from_waker(&waker);
195+
super::set_budget(2);
196+
197+
let fut = pin!(pending::<i32>().consuming());
198+
199+
assert_eq!(fut.poll(&mut cx), Poll::Pending);
200+
assert_eq!(super::TASK_BUDGET.with(Cell::get), Some(2));
201+
assert!(!w.triggered());
202+
}
203+
204+
#[test]
205+
fn consume_budget_yields_on_exhaustion() {
206+
let w = Arc::new(TestWaker::default());
207+
let waker = waker(w.clone());
208+
let mut cx = Context::from_waker(&waker);
209+
super::set_budget(1);
210+
211+
let mut fut = pin!(ready(0).consuming());
212+
213+
assert_eq!(fut.as_mut().poll(&mut cx), Poll::Pending);
214+
assert_eq!(super::TASK_BUDGET.with(Cell::get), None);
215+
assert!(w.triggered());
216+
217+
assert_eq!(fut.poll(&mut cx), Poll::Ready(0));
218+
}

swimos_utilities/swimos_byte_channel/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,4 @@ mod coop;
3838
pub use channel::{are_connected, byte_channel, ByteReader, ByteWriter};
3939

4040
#[cfg(feature = "coop")]
41-
pub use coop::{BudgetedFutureExt, RunWithBudget};
41+
pub use coop::{BudgetConsumer, BudgetedFutureExt, RunWithBudget};

0 commit comments

Comments
 (0)