From e0696d324a7645f8280c1ece20556e8d352aa612 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Fri, 4 Jul 2025 08:18:34 +0000 Subject: [PATCH] Return stream frame header binary in dispatch chunk callback This saves a system call by sending the frame header and the chunk header at the same time. References rabbitmq/osiris#192 --- .../src/rabbit_stream_reader.erl | 26 +++++++------------ rabbitmq-components.mk | 2 +- 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 3217409b3bf..287246cb771 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -3550,12 +3550,9 @@ subscription_exists(StreamSubscriptions, SubscriptionId) -> lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds). send_file_callback(?VERSION_1, - Transport, _Log, #consumer{configuration = - #consumer_configuration{socket = S, - subscription_id = - SubscriptionId, + #consumer_configuration{subscription_id = SubId, counters = Counters}}, Counter) -> fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries}, @@ -3566,19 +3563,16 @@ send_file_callback(?VERSION_1, ?REQUEST:1, ?COMMAND_DELIVER:15, ?VERSION_1:16, - SubscriptionId:8/unsigned>>, - Transport:send(S, FrameBeginning), + SubId:8/unsigned>>, atomics:add(Counter, 1, Size), increase_messages_consumed(Counters, NumEntries), - set_consumer_offset(Counters, FirstOffsetInChunk) + set_consumer_offset(Counters, FirstOffsetInChunk), + FrameBeginning end; send_file_callback(?VERSION_2, - Transport, Log, #consumer{configuration = - #consumer_configuration{socket = S, - subscription_id = - SubscriptionId, + #consumer_configuration{subscription_id = SubId, counters = Counters}}, Counter) -> fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries}, @@ -3590,12 +3584,12 @@ send_file_callback(?VERSION_2, ?REQUEST:1, ?COMMAND_DELIVER:15, ?VERSION_2:16, - SubscriptionId:8/unsigned, + SubId:8/unsigned, CommittedChunkId:64>>, - Transport:send(S, FrameBeginning), atomics:add(Counter, 1, Size), increase_messages_consumed(Counters, NumEntries), - set_consumer_offset(Counters, FirstOffsetInChunk) + set_consumer_offset(Counters, FirstOffsetInChunk), + FrameBeginning end. send_chunks(DeliverVersion, @@ -3665,9 +3659,7 @@ send_chunks(DeliverVersion, Retry, Counter) -> case osiris_log:send_file(Socket, Log, - send_file_callback(DeliverVersion, - Transport, - Log, + send_file_callback(DeliverVersion, Log, Consumer, Counter)) of diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 0b2fa759729..626e68ff68d 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -49,7 +49,7 @@ dep_jose = hex 1.11.10 dep_khepri = hex 0.17.2 dep_khepri_mnesia_migration = hex 0.8.0 dep_meck = hex 1.0.0 -dep_osiris = git https://github.com/rabbitmq/osiris v1.9.0 +dep_osiris = git https://github.com/rabbitmq/osiris send-file-improvements dep_prometheus = hex 5.1.1 dep_ra = hex 2.17.0 dep_ranch = hex 2.2.0