author     Karl Nilsson <kjnilsson@gmail.com>      2021-10-07 14:31:09 +0100
committer  Karl Nilsson <kjnilsson@gmail.com>      2021-10-07 14:31:09 +0100
commit     db3944cfc4eb80f0ee261f41b4fdaf331399f186
tree       f6489a9fcd83b75f7bc92c7ed075e226932d52b3
parent     49a47586b075c19e5af521a3f1d333ed8fab4654
Stream queue: use local pid for offset listeners (branch: stream-queue-consumer-fix)
When a consumer reaches the end of a stream it needs to register an
offset listener with the local stream member so that it can be notified
when new stream messages are committed. The stream queue implementation,
however, registered offset listeners with the leader rather than with the
local member.
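To illustrate the intended behaviour, here is a minimal Erlang sketch. The module name, function name and the reduced #stream{} record are illustrative stand-ins, not the actual rabbit_stream_queue internals; the osiris calls are the ones used in the diff below. Once the reader has caught up with its local log, the offset listener is registered with the pid of the local stream member, which later emits the {osiris_offset, From, Offset} event that triggers further deliveries.

    -module(offset_listener_sketch).
    -export([maybe_register_offset_listener/2]).

    %% Reduced, illustrative stand-in for the per-consumer reader state.
    -record(stream, {log, listening_offset = 0}).

    %% Register an offset listener with the pid of the *local* stream member
    %% (LocalPid), not the leader. osiris later sends an
    %% {osiris_offset, From, Offset} event once entries at or beyond
    %% NextOffset are committed, which the queue client turns into deliveries.
    maybe_register_offset_listener(LocalPid,
                                   #stream{log = Seg,
                                           listening_offset = LOffs} = Str) ->
        NextOffset = osiris_log:next_offset(Seg),
        case NextOffset > LOffs of
            true ->
                osiris:register_offset_listener(LocalPid, NextOffset),
                Str#stream{listening_offset = NextOffset};
            false ->
                %% a listener covering this offset is already in place
                Str
        end.

Registering with the local member rather than the leader keeps the notification path on the node the reader actually reads from, which is what the change below implements.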
-rw-r--r--   deps/rabbit/src/rabbit_stream_queue.erl | 37 +++++++++++++++++--------------------
1 file changed, 17 insertions(+), 20 deletions(-)
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index 20166bfa9c..14e83d85d3 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -274,7 +274,8 @@ begin_stream(#stream_client{name = QName, readers = Readers0} = State0,
             Actions = [],
             %% TODO: we need to monitor the local pid in case the stream is
             %% restarted
-            {ok, State#stream_client{readers = Readers0#{Tag => Str0}}, Actions}
+            {ok, State#stream_client{local_pid = LocalPid,
+                                     readers = Readers0#{Tag => Str0}}, Actions}
     end.
 
 cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
@@ -290,11 +291,11 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
 
 credit(CTag, Credit, Drain, #stream_client{readers = Readers0,
                                            name = Name,
-                                           leader = Leader} = State) ->
+                                           local_pid = LocalPid} = State) ->
     {Readers1, Msgs} = case Readers0 of
                            #{CTag := #stream{credit = Credit0} = Str0} ->
                                Str1 = Str0#stream{credit = Credit0 + Credit},
-                               {Str, Msgs0} = stream_entries(Name, Leader, Str1),
+                               {Str, Msgs0} = stream_entries(Name, LocalPid, Str1),
                                {Readers0#{CTag => Str}, Msgs0};
                            _ ->
                                {Readers0, []}
@@ -379,19 +380,15 @@ handle_event({osiris_written, From, _WriterId, Corrs},
     {ok, State#stream_client{correlation = Correlation,
                              slow = Slow}, [{settled, From, MsgIds}]};
 handle_event({osiris_offset, _From, _Offs},
-             State = #stream_client{leader = Leader,
+             State = #stream_client{local_pid = LocalPid,
                                     readers = Readers0,
                                     name = Name}) ->
     %% offset isn't actually needed as we use the atomic to read the
     %% current committed
     {Readers, TagMsgs} = maps:fold(
                            fun (Tag, Str0, {Acc, TM}) ->
-                                   {Str, Msgs} = stream_entries(Name, Leader, Str0),
-                                   %% HACK for now, better to just return but
-                                   %% tricky with acks credits
-                                   %% that also evaluate the stream
-                                   % gen_server:cast(self(), {stream_delivery, Tag, Msgs}),
-                                   {Acc#{Tag => Str}, [{Tag, Leader, Msgs} | TM]}
+                                   {Str, Msgs} = stream_entries(Name, LocalPid, Str0),
+                                   {Acc#{Tag => Str}, [{Tag, LocalPid, Msgs} | TM]}
                            end, {#{}, []}, Readers0),
     Ack = true,
     Deliveries = [{deliver, Tag, Ack, OffsetMsg}
@@ -414,13 +411,13 @@ recover(_VHost, Queues) ->
     end, {[], []}, Queues).
 
 settle(complete, CTag, MsgIds, #stream_client{readers = Readers0,
-                                              name = Name,
-                                              leader = Leader} = State) ->
+                                              local_pid = LocalPid,
+                                              name = Name} = State) ->
     Credit = length(MsgIds),
     {Readers, Msgs} = case Readers0 of
                           #{CTag := #stream{credit = Credit0} = Str0} ->
                               Str1 = Str0#stream{credit = Credit0 + Credit},
-                              {Str, Msgs0} = stream_entries(Name, Leader, Str1),
+                              {Str, Msgs0} = stream_entries(Name, LocalPid, Str1),
                               {Readers0#{CTag => Str}, Msgs0};
                           _ ->
                               {Readers0, []}
@@ -848,10 +845,10 @@ check_queue_exists_in_local_node(Q) ->
 maybe_send_reply(_ChPid, undefined) -> ok;
 maybe_send_reply(ChPid, Msg) ->
     ok = rabbit_channel:send_command(ChPid, Msg).
-stream_entries(Name, Id, Str) ->
-    stream_entries(Name, Id, Str, []).
+stream_entries(Name, LocalPid, Str) ->
+    stream_entries(Name, LocalPid, Str, []).
 
-stream_entries(Name, LeaderPid,
+stream_entries(Name, LocalPid,
                #stream{name = QName,
                        credit = Credit,
                        start_offset = StartOffs,
@@ -863,7 +860,7 @@ stream_entries(Name, LeaderPid,
             NextOffset = osiris_log:next_offset(Seg),
             case NextOffset > LOffs of
                 true ->
-                    osiris:register_offset_listener(LeaderPid, NextOffset),
+                    osiris:register_offset_listener(LocalPid, NextOffset),
                     {Str0#stream{log = Seg,
                                  listening_offset = NextOffset}, MsgIn};
                 false ->
@@ -877,7 +874,7 @@ stream_entries(Name, LeaderPid,
                              Msg0 = binary_to_msg(QName, B),
                              Msg = rabbit_basic:add_header(<<"x-stream-offset">>,
                                                            long, O, Msg0),
-                             {Name, LeaderPid, O, false, Msg}
+                             {Name, LocalPid, O, false, Msg}
                          end || {O, B} <- Records,
                                 O >= StartOffs],
 
@@ -892,10 +889,10 @@ stream_entries(Name, LeaderPid,
                 false ->
                     %% if there are fewer Msgs than Entries0 it means there were non-events
                     %% in the log and we should recurse and try again
-                    stream_entries(Name, LeaderPid, Str, MsgIn ++ Msgs)
+                    stream_entries(Name, LocalPid, Str, MsgIn ++ Msgs)
             end
     end;
-stream_entries(_Name, _Id, Str, Msgs) ->
+stream_entries(_Name, _LocalPid, Str, Msgs) ->
     {Str, Msgs}.
 
 binary_to_msg(#resource{virtual_host = VHost,