Skip to content

Commit e0696d3

Browse files
committed
Return stream frame header binary in dispatch chunk callback
This saves a system call by sending the frame header and the chunk header at the same time. References rabbitmq/osiris#192
1 parent 268a16c commit e0696d3

File tree

2 files changed

+10
-18
lines changed

2 files changed

+10
-18
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3550,12 +3550,9 @@ subscription_exists(StreamSubscriptions, SubscriptionId) ->
35503550
lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds).
35513551

35523552
send_file_callback(?VERSION_1,
3553-
Transport,
35543553
_Log,
35553554
#consumer{configuration =
3556-
#consumer_configuration{socket = S,
3557-
subscription_id =
3558-
SubscriptionId,
3555+
#consumer_configuration{subscription_id = SubId,
35593556
counters = Counters}},
35603557
Counter) ->
35613558
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
@@ -3566,19 +3563,16 @@ send_file_callback(?VERSION_1,
35663563
?REQUEST:1,
35673564
?COMMAND_DELIVER:15,
35683565
?VERSION_1:16,
3569-
SubscriptionId:8/unsigned>>,
3570-
Transport:send(S, FrameBeginning),
3566+
SubId:8/unsigned>>,
35713567
atomics:add(Counter, 1, Size),
35723568
increase_messages_consumed(Counters, NumEntries),
3573-
set_consumer_offset(Counters, FirstOffsetInChunk)
3569+
set_consumer_offset(Counters, FirstOffsetInChunk),
3570+
FrameBeginning
35743571
end;
35753572
send_file_callback(?VERSION_2,
3576-
Transport,
35773573
Log,
35783574
#consumer{configuration =
3579-
#consumer_configuration{socket = S,
3580-
subscription_id =
3581-
SubscriptionId,
3575+
#consumer_configuration{subscription_id = SubId,
35823576
counters = Counters}},
35833577
Counter) ->
35843578
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
@@ -3590,12 +3584,12 @@ send_file_callback(?VERSION_2,
35903584
?REQUEST:1,
35913585
?COMMAND_DELIVER:15,
35923586
?VERSION_2:16,
3593-
SubscriptionId:8/unsigned,
3587+
SubId:8/unsigned,
35943588
CommittedChunkId:64>>,
3595-
Transport:send(S, FrameBeginning),
35963589
atomics:add(Counter, 1, Size),
35973590
increase_messages_consumed(Counters, NumEntries),
3598-
set_consumer_offset(Counters, FirstOffsetInChunk)
3591+
set_consumer_offset(Counters, FirstOffsetInChunk),
3592+
FrameBeginning
35993593
end.
36003594

36013595
send_chunks(DeliverVersion,
@@ -3665,9 +3659,7 @@ send_chunks(DeliverVersion,
36653659
Retry,
36663660
Counter) ->
36673661
case osiris_log:send_file(Socket, Log,
3668-
send_file_callback(DeliverVersion,
3669-
Transport,
3670-
Log,
3662+
send_file_callback(DeliverVersion, Log,
36713663
Consumer,
36723664
Counter))
36733665
of

rabbitmq-components.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ dep_jose = hex 1.11.10
4949
dep_khepri = hex 0.17.2
5050
dep_khepri_mnesia_migration = hex 0.8.0
5151
dep_meck = hex 1.0.0
52-
dep_osiris = git https://github.com/rabbitmq/osiris v1.9.0
52+
dep_osiris = git https://github.com/rabbitmq/osiris send-file-improvements
5353
dep_prometheus = hex 5.1.1
5454
dep_ra = hex 2.17.0
5555
dep_ranch = hex 2.2.0

0 commit comments

Comments
 (0)