diff options
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 37 |
1 files changed, 17 insertions, 20 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 20166bfa9c..14e83d85d3 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -274,7 +274,8 @@ begin_stream(#stream_client{name = QName, readers = Readers0} = State0, Actions = [], %% TODO: we need to monitor the local pid in case the stream is %% restarted - {ok, State#stream_client{readers = Readers0#{Tag => Str0}}, Actions} + {ok, State#stream_client{local_pid = LocalPid, + readers = Readers0#{Tag => Str0}}, Actions} end. cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0, @@ -290,11 +291,11 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0, credit(CTag, Credit, Drain, #stream_client{readers = Readers0, name = Name, - leader = Leader} = State) -> + local_pid = LocalPid} = State) -> {Readers1, Msgs} = case Readers0 of #{CTag := #stream{credit = Credit0} = Str0} -> Str1 = Str0#stream{credit = Credit0 + Credit}, - {Str, Msgs0} = stream_entries(Name, Leader, Str1), + {Str, Msgs0} = stream_entries(Name, LocalPid, Str1), {Readers0#{CTag => Str}, Msgs0}; _ -> {Readers0, []} @@ -379,19 +380,15 @@ handle_event({osiris_written, From, _WriterId, Corrs}, {ok, State#stream_client{correlation = Correlation, slow = Slow}, [{settled, From, MsgIds}]}; handle_event({osiris_offset, _From, _Offs}, - State = #stream_client{leader = Leader, + State = #stream_client{local_pid = LocalPid, readers = Readers0, name = Name}) -> %% offset isn't actually needed as we use the atomic to read the %% current committed {Readers, TagMsgs} = maps:fold( fun (Tag, Str0, {Acc, TM}) -> - {Str, Msgs} = stream_entries(Name, Leader, Str0), - %% HACK for now, better to just return but - %% tricky with acks credits - %% that also evaluate the stream - % gen_server:cast(self(), {stream_delivery, Tag, Msgs}), - {Acc#{Tag => Str}, [{Tag, Leader, Msgs} | TM]} + {Str, Msgs} = stream_entries(Name, LocalPid, Str0), + {Acc#{Tag => Str}, [{Tag, LocalPid, Msgs} | TM]} end, {#{}, []}, Readers0), Ack = true, Deliveries = [{deliver, Tag, Ack, OffsetMsg} @@ -414,13 +411,13 @@ recover(_VHost, Queues) -> end, {[], []}, Queues). settle(complete, CTag, MsgIds, #stream_client{readers = Readers0, - name = Name, - leader = Leader} = State) -> + local_pid = LocalPid, + name = Name} = State) -> Credit = length(MsgIds), {Readers, Msgs} = case Readers0 of #{CTag := #stream{credit = Credit0} = Str0} -> Str1 = Str0#stream{credit = Credit0 + Credit}, - {Str, Msgs0} = stream_entries(Name, Leader, Str1), + {Str, Msgs0} = stream_entries(Name, LocalPid, Str1), {Readers0#{CTag => Str}, Msgs0}; _ -> {Readers0, []} @@ -848,10 +845,10 @@ check_queue_exists_in_local_node(Q) -> maybe_send_reply(_ChPid, undefined) -> ok; maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). -stream_entries(Name, Id, Str) -> - stream_entries(Name, Id, Str, []). +stream_entries(Name, LocalPid, Str) -> + stream_entries(Name, LocalPid, Str, []). -stream_entries(Name, LeaderPid, +stream_entries(Name, LocalPid, #stream{name = QName, credit = Credit, start_offset = StartOffs, @@ -863,7 +860,7 @@ stream_entries(Name, LeaderPid, NextOffset = osiris_log:next_offset(Seg), case NextOffset > LOffs of true -> - osiris:register_offset_listener(LeaderPid, NextOffset), + osiris:register_offset_listener(LocalPid, NextOffset), {Str0#stream{log = Seg, listening_offset = NextOffset}, MsgIn}; false -> @@ -877,7 +874,7 @@ stream_entries(Name, LeaderPid, Msg0 = binary_to_msg(QName, B), Msg = rabbit_basic:add_header(<<"x-stream-offset">>, long, O, Msg0), - {Name, LeaderPid, O, false, Msg} + {Name, LocalPid, O, false, Msg} end || {O, B} <- Records, O >= StartOffs], @@ -892,10 +889,10 @@ stream_entries(Name, LeaderPid, false -> %% if there are fewer Msgs than Entries0 it means there were non-events %% in the log and we should recurse and try again - stream_entries(Name, LeaderPid, Str, MsgIn ++ Msgs) + stream_entries(Name, LocalPid, Str, MsgIn ++ Msgs) end end; -stream_entries(_Name, _Id, Str, Msgs) -> +stream_entries(_Name, _LocalPid, Str, Msgs) -> {Str, Msgs}. binary_to_msg(#resource{virtual_host = VHost, |