summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-10-07 14:31:09 +0100
committerKarl Nilsson <kjnilsson@gmail.com>2021-10-07 14:31:09 +0100
commitdb3944cfc4eb80f0ee261f41b4fdaf331399f186 (patch)
treef6489a9fcd83b75f7bc92c7ed075e226932d52b3
parent49a47586b075c19e5af521a3f1d333ed8fab4654 (diff)
downloadrabbitmq-server-git-stream-queue-consumer-fix.tar.gz
Stream queue: use local pid for offset listenersstream-queue-consumer-fix
When a consumer reaches the end of a stream it need to register an offset listener with the local stream member so that it can be notified when new stream messages are committed. The stream queue implementation for some reason registered offset listeners with the leader, not the local member.
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl37
1 files changed, 17 insertions, 20 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index 20166bfa9c..14e83d85d3 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -274,7 +274,8 @@ begin_stream(#stream_client{name = QName, readers = Readers0} = State0,
Actions = [],
%% TODO: we need to monitor the local pid in case the stream is
%% restarted
- {ok, State#stream_client{readers = Readers0#{Tag => Str0}}, Actions}
+ {ok, State#stream_client{local_pid = LocalPid,
+ readers = Readers0#{Tag => Str0}}, Actions}
end.
cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
@@ -290,11 +291,11 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
credit(CTag, Credit, Drain, #stream_client{readers = Readers0,
name = Name,
- leader = Leader} = State) ->
+ local_pid = LocalPid} = State) ->
{Readers1, Msgs} = case Readers0 of
#{CTag := #stream{credit = Credit0} = Str0} ->
Str1 = Str0#stream{credit = Credit0 + Credit},
- {Str, Msgs0} = stream_entries(Name, Leader, Str1),
+ {Str, Msgs0} = stream_entries(Name, LocalPid, Str1),
{Readers0#{CTag => Str}, Msgs0};
_ ->
{Readers0, []}
@@ -379,19 +380,15 @@ handle_event({osiris_written, From, _WriterId, Corrs},
{ok, State#stream_client{correlation = Correlation,
slow = Slow}, [{settled, From, MsgIds}]};
handle_event({osiris_offset, _From, _Offs},
- State = #stream_client{leader = Leader,
+ State = #stream_client{local_pid = LocalPid,
readers = Readers0,
name = Name}) ->
%% offset isn't actually needed as we use the atomic to read the
%% current committed
{Readers, TagMsgs} = maps:fold(
fun (Tag, Str0, {Acc, TM}) ->
- {Str, Msgs} = stream_entries(Name, Leader, Str0),
- %% HACK for now, better to just return but
- %% tricky with acks credits
- %% that also evaluate the stream
- % gen_server:cast(self(), {stream_delivery, Tag, Msgs}),
- {Acc#{Tag => Str}, [{Tag, Leader, Msgs} | TM]}
+ {Str, Msgs} = stream_entries(Name, LocalPid, Str0),
+ {Acc#{Tag => Str}, [{Tag, LocalPid, Msgs} | TM]}
end, {#{}, []}, Readers0),
Ack = true,
Deliveries = [{deliver, Tag, Ack, OffsetMsg}
@@ -414,13 +411,13 @@ recover(_VHost, Queues) ->
end, {[], []}, Queues).
settle(complete, CTag, MsgIds, #stream_client{readers = Readers0,
- name = Name,
- leader = Leader} = State) ->
+ local_pid = LocalPid,
+ name = Name} = State) ->
Credit = length(MsgIds),
{Readers, Msgs} = case Readers0 of
#{CTag := #stream{credit = Credit0} = Str0} ->
Str1 = Str0#stream{credit = Credit0 + Credit},
- {Str, Msgs0} = stream_entries(Name, Leader, Str1),
+ {Str, Msgs0} = stream_entries(Name, LocalPid, Str1),
{Readers0#{CTag => Str}, Msgs0};
_ ->
{Readers0, []}
@@ -848,10 +845,10 @@ check_queue_exists_in_local_node(Q) ->
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
-stream_entries(Name, Id, Str) ->
- stream_entries(Name, Id, Str, []).
+stream_entries(Name, LocalPid, Str) ->
+ stream_entries(Name, LocalPid, Str, []).
-stream_entries(Name, LeaderPid,
+stream_entries(Name, LocalPid,
#stream{name = QName,
credit = Credit,
start_offset = StartOffs,
@@ -863,7 +860,7 @@ stream_entries(Name, LeaderPid,
NextOffset = osiris_log:next_offset(Seg),
case NextOffset > LOffs of
true ->
- osiris:register_offset_listener(LeaderPid, NextOffset),
+ osiris:register_offset_listener(LocalPid, NextOffset),
{Str0#stream{log = Seg,
listening_offset = NextOffset}, MsgIn};
false ->
@@ -877,7 +874,7 @@ stream_entries(Name, LeaderPid,
Msg0 = binary_to_msg(QName, B),
Msg = rabbit_basic:add_header(<<"x-stream-offset">>,
long, O, Msg0),
- {Name, LeaderPid, O, false, Msg}
+ {Name, LocalPid, O, false, Msg}
end || {O, B} <- Records,
O >= StartOffs],
@@ -892,10 +889,10 @@ stream_entries(Name, LeaderPid,
false ->
%% if there are fewer Msgs than Entries0 it means there were non-events
%% in the log and we should recurse and try again
- stream_entries(Name, LeaderPid, Str, MsgIn ++ Msgs)
+ stream_entries(Name, LocalPid, Str, MsgIn ++ Msgs)
end
end;
-stream_entries(_Name, _Id, Str, Msgs) ->
+stream_entries(_Name, _LocalPid, Str, Msgs) ->
{Str, Msgs}.
binary_to_msg(#resource{virtual_host = VHost,