Skip to content

Commit 39e8103

Browse files
committed
feat(core): reimplement stale THP channel interruption
It has been removed by #5546. [no changelog]
1 parent 165d121 commit 39e8103

File tree

6 files changed

+64
-9
lines changed

6 files changed

+64
-9
lines changed

core/src/trezor/wire/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,7 @@ async def handle_session(iface: WireInterface) -> None:
104104
if __debug__:
105105
_THP_CHANNELS.append(ctx._channels)
106106
try:
107-
channel = await ctx.get_next_message()
108-
while await received_message_handler.handle_received_message(channel):
107+
while await received_message_handler.handle_received_message(ctx):
109108
pass
110109
finally:
111110
# Wait for all active workflows to finish.

core/src/trezor/wire/thp/channel.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import ustruct
2+
import utime
23
from micropython import const
34
from typing import TYPE_CHECKING
45

@@ -58,6 +59,8 @@
5859
_MAX_RETRANSMISSION_COUNT = const(50)
5960
_MIN_RETRANSMISSION_COUNT = const(2)
6061

62+
_STALE_CHANNEL_TIMEOUT_MS = const(500)
63+
6164

6265
class Reassembler:
6366
def __init__(self, cid: int, read_buf: ThpBuffer) -> None:
@@ -129,6 +132,11 @@ def verify_checksum(buffer: memoryview) -> memoryview | None:
129132
return None
130133

131134

135+
# Raised when a new channel is being allocated, in order to close an existing (unresponsive) workflow.
136+
class StaleChannelInterrupt(Exception):
137+
pass
138+
139+
132140
class Channel:
133141
"""
134142
THP protocol encrypted communication channel.
@@ -270,9 +278,23 @@ async def _get_reassembled_message(
270278
self, timeout_ms: int | None = None
271279
) -> memoryview:
272280
"""Doesn't block if a message has been already reassembled."""
281+
start_ms = utime.ticks_ms()
273282
while self.reassembler.message is None:
274283
# receive and reassemble a new message from this channel
275284
channel = await self.ctx.get_next_message(timeout_ms=timeout_ms)
285+
if channel is None:
286+
# interrupted by a new channel allocation request
287+
elapsed = utime.ticks_diff(utime.ticks_ms(), start_ms)
288+
if elapsed < _STALE_CHANNEL_TIMEOUT_MS:
289+
continue
290+
if __debug__:
291+
self._log(
292+
f"Stale channel interrupted after {elapsed} ms",
293+
logger=log.warning,
294+
)
295+
# Will close stale workflow and restart the event loop, in order to allow new channels to be handled
296+
raise StaleChannelInterrupt
297+
276298
if channel is self:
277299
break
278300

core/src/trezor/wire/thp/interface_context.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,12 @@ def __init__(self, iface: WireInterface) -> None:
4747
self._write = loop.wait(iface.iface_num() | io.POLL_WRITE)
4848
self._channels: dict[int, Channel] = {}
4949

50-
async def get_next_message(self, timeout_ms: int | None = None) -> Channel:
50+
async def get_next_message(self, timeout_ms: int | None = None) -> Channel | None:
5151
"""
5252
Reassemble a valid THP payload and return its channel.
5353
5454
Also handle THP channel allocation.
55+
If buffer allocation fails, return `None` to allow stale channel interruption.
5556
"""
5657
from .. import THP_BUFFERS_PROVIDER
5758

@@ -80,9 +81,10 @@ async def get_next_message(self, timeout_ms: int | None = None) -> Channel:
8081

8182
if (channel := self._channels.get(cid)) is None:
8283
if (buffers := THP_BUFFERS_PROVIDER.take()) is None:
83-
# concurrent payload reassembly is not supported
84+
# send TRANSPORT_BUSY to the new channel, which should retry later
8485
await self.write_error(cid, ThpErrorType.TRANSPORT_BUSY)
85-
continue
86+
return None
87+
8688
channel = self._channels[cid] = Channel(cache, self, buffers)
8789

8890
if channel.reassemble(packet):

core/src/trezor/wire/thp/received_message_handler.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from trezor.messages import ThpHandshakeCompletionReqNoisePayload
3636

3737
from .channel import Channel
38+
from .interface_context import ThpContext
3839

3940
if __debug__:
4041
from trezor import log
@@ -45,12 +46,18 @@
4546
_TREZOR_STATE_PAIRED_AUTOCONNECT = b"\x02"
4647

4748

48-
async def handle_received_message(channel: Channel) -> bool:
49+
async def handle_received_message(ctx: ThpContext) -> bool:
4950
"""
5051
Handle a message received from the channel.
5152
5253
Returns False if we can restart the event loop.
5354
"""
55+
channel = await ctx.get_next_message()
56+
if channel is None:
57+
# Channel buffers' allocation failed (due to another active THP interface).
58+
# Continue handling THP messages until the event loop is restarted.
59+
return True
60+
5461
try:
5562
state = channel.get_channel_state()
5663
if state is ChannelState.ENCRYPTED_TRANSPORT:

python/src/trezorlib/transport/thp/protocol_v2.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,8 +278,8 @@ def _read_handshake_completion_response(self) -> int:
278278
self._send_ack_bit(bit=1)
279279
return int.from_bytes(trezor_state, "big")
280280

281-
def _read_ack(self):
282-
header, payload = self._read_until_valid_crc_check()
281+
def _read_ack(self, timeout: float | None = None):
282+
header, payload = self._read_until_valid_crc_check(timeout=timeout)
283283
if not header.is_ack() or len(payload) > 0:
284284
LOG.error("Received message is not a valid ACK")
285285

tests/device_tests/thp/test_multiple_hosts.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
import time
2+
13
import pytest
24

3-
from trezorlib import exceptions
5+
from trezorlib import exceptions, transport
46
from trezorlib.client import ProtocolV2Channel
57
from trezorlib.debuglink import TrezorClientDebugLink as Client
68

@@ -42,3 +44,26 @@ def test_concurrent_handshakes(client: Client) -> None:
4244

4345
# Now the second handshake can be done
4446
protocol_2._do_handshake()
47+
48+
49+
def test_channel_interrupt(client: Client) -> None:
50+
protocol_1 = _new_channel(client)
51+
protocol_2 = _new_channel(client)
52+
53+
# The first host starts handshake
54+
protocol_1._send_handshake_init_request()
55+
protocol_1._read_ack()
56+
protocol_1._read_handshake_init_response()
57+
58+
# The second host can interrupt the first host's handshake after a while
59+
time.sleep(1)
60+
with pytest.raises(exceptions.TransportBusy):
61+
protocol_2._do_handshake()
62+
63+
# The first host has been interrupted, retrying handshake should succeed now
64+
protocol_2._do_handshake()
65+
66+
# The first handshake was interrupted, so no response will be sent
67+
protocol_1._send_handshake_completion_request()
68+
with pytest.raises(transport.Timeout):
69+
protocol_1._read_ack(timeout=1)

0 commit comments

Comments
 (0)