@@ -217,7 +217,8 @@ def is_channel_to_replace(self) -> bool:
217
217
218
218
async def recv_payload (
219
219
self ,
220
- expected_ctrl_byte : Callable [[int ], bool ] | None ,
220
+ expected_ctrl_byte : Callable [[int ], bool ],
221
+ return_ack : bool = False ,
221
222
timeout_ms : int | None = None ,
222
223
) -> memoryview :
223
224
"""
@@ -226,7 +227,7 @@ async def recv_payload(
226
227
227
228
Raise if the received control byte is an unexpected one.
228
229
229
- If `expected_ctrl_byte ` is `None`, returns after the first received ACK.
230
+ If `return_ack ` is set, return after receiving an expected ACK.
230
231
"""
231
232
232
233
while True :
@@ -236,17 +237,32 @@ async def recv_payload(
236
237
237
238
# Synchronization process
238
239
ctrl_byte = msg [0 ]
240
+ if not (control_byte .is_ack (ctrl_byte ) or control_byte .is_data (ctrl_byte )):
241
+ if __debug__ :
242
+ self ._log (
243
+ "Unrelated control byte - ignoring " ,
244
+ utils .hexlify_if_bytes (msg ),
245
+ logger = log .warning ,
246
+ )
247
+ continue
248
+
239
249
payload = msg [PacketHeader .INIT_LENGTH : - CHECKSUM_LENGTH ]
240
250
seq_bit = control_byte .get_seq_bit (ctrl_byte )
251
+ ack_bit = control_byte .get_ack_bit (ctrl_byte )
241
252
242
- # 1: Handle ACKs
243
- if control_byte .is_ack (ctrl_byte ):
244
- handle_ack (self , control_byte .get_ack_bit (ctrl_byte ))
245
- if expected_ctrl_byte is None :
246
- return payload
247
- continue
253
+ # 0: Handle ACKs
254
+ if received_expected_ack (self , ack_bit ):
255
+ if control_byte .is_ack (ctrl_byte ):
256
+ # ACK message is handled
257
+ self .reassembler .reset ()
248
258
249
- if expected_ctrl_byte is None or not expected_ctrl_byte (ctrl_byte ):
259
+ if return_ack :
260
+ # keep the reassembled payload (for future handling)
261
+ return payload [:0 ]
262
+
263
+ # 1: Check expected control byte
264
+ self .reassembler .reset ()
265
+ if not expected_ctrl_byte (ctrl_byte ) or return_ack :
250
266
if __debug__ :
251
267
self ._log (
252
268
"Unexpected control byte - ignoring " ,
@@ -400,8 +416,12 @@ async def write_encrypted_payload(self, ctrl_byte: int, payload: bytes) -> None:
400
416
# starting from 100ms till ~3.42s
401
417
timeout_ms = round (10200 - 1010000 / (100 + i ))
402
418
try :
403
- # wait and return after receiving an ACK, or raise in case of an unexpected message.
404
- await self .recv_payload (expected_ctrl_byte = None , timeout_ms = timeout_ms )
419
+ # wait and return after receiving an expected ACK.
420
+ await self .recv_payload (
421
+ expected_ctrl_byte = control_byte .is_encrypted_transport ,
422
+ return_ack = True ,
423
+ timeout_ms = timeout_ms ,
424
+ )
405
425
except Timeout :
406
426
if __debug__ :
407
427
log .warning (__name__ , "Retransmit after %d ms" , timeout_ms )
@@ -462,9 +482,9 @@ def send_ack(channel: Channel, ack_bit: int) -> Awaitable[None]:
462
482
return channel .ctx .write_payload (header , b"" )
463
483
464
484
465
- def handle_ack (ctx : Channel , ack_bit : int ) -> None :
485
+ def received_expected_ack (ctx : Channel , ack_bit : int ) -> bool :
466
486
if not ABP .is_ack_valid (ctx .channel_cache , ack_bit ):
467
- return
487
+ return False
468
488
# ACK is expected and it has correct sync bit
469
489
if __debug__ :
470
490
log .debug (
@@ -473,3 +493,4 @@ def handle_ack(ctx: Channel, ack_bit: int) -> None:
473
493
iface = ctx .iface ,
474
494
)
475
495
ABP .set_sending_allowed (ctx .channel_cache , True )
496
+ return True
0 commit comments