Skip to content

Commit 3916356

Browse files
authored
Do not check readiness for call (ntex-rs#178)
1 parent 001ae5f commit 3916356

File tree

3 files changed

+17
-23
lines changed

3 files changed

+17
-23
lines changed

CHANGES.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changes
22

3+
## [4.1.0] - 2024-10-10
4+
5+
* Do not check readiness for call
6+
7+
* Handle service readiness errors during shutdown
8+
39
## [4.0.0] - 2024-10-05
410

511
* Middlewares support for mqtt server

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-mqtt"
3-
version = "4.0.0"
3+
version = "4.1.0"
44
authors = ["ntex contributors <team@ntex.rs>"]
55
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
66
documentation = "https://docs.rs/ntex-mqtt"

src/io.rs

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
use std::task::{ready, Context, Poll};
33
use std::{cell::RefCell, collections::VecDeque, future::Future, pin::Pin, rc::Rc};
44

5-
use ntex_bytes::Pool;
65
use ntex_codec::{Decoder, Encoder};
76
use ntex_io::{
87
Decoded, DispatchItem, DispatcherConfig, IoBoxed, IoRef, IoStatusUpdate, RecvError,
@@ -22,8 +21,7 @@ pin_project_lite::pin_project! {
2221
U: Decoder,
2322
U: 'static,
2423
{
25-
inner: DispatcherInner<S, U>,
26-
pool: Pool,
24+
inner: DispatcherInner<S, U>
2725
}
2826
}
2927

@@ -122,11 +120,9 @@ where
122120
base: 0,
123121
queue: VecDeque::new(),
124122
}));
125-
let pool = io.memory_pool().pool();
126123
let keepalive_timeout = config.keepalive_timeout();
127124

128125
Dispatcher {
129-
pool,
130126
inner: DispatcherInner {
131127
io,
132128
codec,
@@ -249,14 +245,6 @@ where
249245
}
250246
}
251247

252-
// handle memory pool pressure
253-
if this.pool.poll_ready(cx).is_pending() {
254-
inner.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT);
255-
inner.io.stop_timer();
256-
inner.io.pause();
257-
return Poll::Pending;
258-
}
259-
260248
loop {
261249
match inner.st {
262250
IoDispatcherState::Processing => {
@@ -312,9 +300,7 @@ where
312300
IoDispatcherState::Backpressure => {
313301
match ready!(inner.poll_service(cx)) {
314302
PollService::Ready => (),
315-
PollService::Item(item) => {
316-
inner.call_service(cx, item);
317-
}
303+
PollService::Item(item) => inner.call_service(cx, item),
318304
PollService::Continue => continue,
319305
};
320306

@@ -334,7 +320,11 @@ where
334320

335321
// service may relay on poll_ready for response results
336322
if !inner.flags.contains(Flags::READY_ERR) {
337-
let _ = inner.service.poll_ready(cx);
323+
if let Poll::Ready(res) = inner.service.poll_ready(cx) {
324+
if res.is_err() {
325+
inner.flags.insert(Flags::READY_ERR);
326+
}
327+
}
338328
}
339329

340330
if inner.state.borrow().queue.is_empty() {
@@ -394,7 +384,7 @@ where
394384
{
395385
fn call_service(&mut self, cx: &mut Context<'_>, item: DispatchItem<U>) {
396386
let mut state = self.state.borrow_mut();
397-
let mut fut = self.service.call(item);
387+
let mut fut = self.service.call_nowait(item);
398388

399389
// optimize first call
400390
if self.response.is_none() {
@@ -429,8 +419,7 @@ where
429419
let codec = self.codec.clone();
430420
let state = self.state.clone();
431421

432-
#[allow(clippy::let_underscore_future)]
433-
let _ = ntex_util::spawn(async move {
422+
ntex_util::spawn(async move {
434423
let item = fut.await;
435424
state.borrow_mut().handle_result(item, response_idx, &st, &codec, true);
436425
});
@@ -502,8 +491,8 @@ where
502491
Poll::Ready(Err(err)) => {
503492
log::trace!("{}: Service readiness check failed, stopping", self.io.tag());
504493
self.st = IoDispatcherState::Stop;
505-
self.state.borrow_mut().error = Some(IoDispatcherError::Service(err));
506494
self.flags.insert(Flags::READY_ERR);
495+
self.state.borrow_mut().error = Some(IoDispatcherError::Service(err));
507496
Poll::Ready(PollService::Continue)
508497
}
509498
}
@@ -629,7 +618,6 @@ mod tests {
629618

630619
(
631620
Dispatcher {
632-
pool: io.memory_pool().pool(),
633621
inner: DispatcherInner {
634622
codec,
635623
state,

0 commit comments

Comments
 (0)