diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2022-01-19 13:13:58 +0000 |
---|---|---|
committer | Karl Nilsson <kjnilsson@gmail.com> | 2022-01-19 13:13:58 +0000 |
commit | de7a3d32ce98e510b6f359907add841b5f323e70 (patch) | |
tree | ed39a84a8863a2787be8b9a480b8870b16ce34ca | |
parent | 4e40ed6a862a22a04d3f3cb39961e6e386920bc8 (diff) | |
download | rabbitmq-server-git-qq-v2-no-inmem.tar.gz |
fix rabbit_fifo property testsqq-v2-no-inmem
-rw-r--r-- | deps/rabbit/src/rabbit_fifo_dlx.erl | 83 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_fifo_prop_SUITE.erl | 34 |
2 files changed, 61 insertions, 56 deletions
diff --git a/deps/rabbit/src/rabbit_fifo_dlx.erl b/deps/rabbit/src/rabbit_fifo_dlx.erl index 81ab629401..c376142bbf 100644 --- a/deps/rabbit/src/rabbit_fifo_dlx.erl +++ b/deps/rabbit/src/rabbit_fifo_dlx.erl @@ -87,21 +87,17 @@ stat(#?MODULE{consumer = Con, apply(_, {dlx, #settle{msg_ids = MsgIds}}, at_least_once, #?MODULE{consumer = #dlx_consumer{checked_out = Checked0}} = State0) -> Acked = maps:with(MsgIds, Checked0), - State = maps:fold(fun(MsgId, {_Rsn, Msg}, - #?MODULE{consumer = #dlx_consumer{checked_out = Checked} = C, - msg_bytes_checkout = BytesCheckout, - ra_indexes = Indexes0} = S) -> - Indexes = case Msg of - ?INDEX_MSG(I, _) - when is_integer(I) -> - rabbit_fifo_index:delete(I, Indexes0); - _ -> - Indexes0 - end, - S#?MODULE{consumer = C#dlx_consumer{checked_out = maps:remove(MsgId, Checked)}, - msg_bytes_checkout = BytesCheckout - size_in_bytes(Msg), - ra_indexes = Indexes} - end, State0, Acked), + State = maps:fold( + fun(MsgId, {_Rsn, ?INDEX_MSG(Idx, ?DISK_MSG(_)) = Msg}, + #?MODULE{consumer = #dlx_consumer{checked_out = Checked} = C, + msg_bytes_checkout = BytesCheckout, + ra_indexes = Indexes0} = S) -> + Indexes = rabbit_fifo_index:delete(Idx, Indexes0), + S#?MODULE{consumer = C#dlx_consumer{checked_out = + maps:remove(MsgId, Checked)}, + msg_bytes_checkout = BytesCheckout - size_in_bytes(Msg), + ra_indexes = Indexes} + end, State0, Acked), {State, []}; apply(_, {dlx, #checkout{consumer = Pid, prefetch = Prefetch}}, @@ -138,7 +134,8 @@ apply(_, Cmd, DLH, State) -> rabbit_log:debug("Ignoring command ~p for dead_letter_handler ~p", Cmd, DLH), {State, []}. --spec discard([msg()], rabbit_dead_letter:reason(), dead_letter_handler(), state()) -> +-spec discard([indexed_msg()], rabbit_dead_letter:reason(), + dead_letter_handler(), state()) -> {state(), ra_machine:effects()}. discard(_, _, undefined, State) -> {State, []}; @@ -167,18 +164,13 @@ discard(Msgs, Reason, {at_most_once, {Mod, Fun, Args}}, State) -> discard(Msgs, Reason, at_least_once, State0) when Reason =/= maxlen -> %%TODO delete delivery_count header to save space? It's not needed anymore. - State = lists:foldl(fun (Msg, #?MODULE{discards = D0, - msg_bytes = B0, - ra_indexes = I0} = S0) -> + State = lists:foldl(fun (?INDEX_MSG(Idx, _) = Msg, + #?MODULE{discards = D0, + msg_bytes = B0, + ra_indexes = I0} = S0) -> D = lqueue:in({Reason, Msg}, D0), B = B0 + size_in_bytes(Msg), - I = case Msg of - ?INDEX_MSG(Idx, _) - when is_integer(Idx) -> - rabbit_fifo_index:append(Idx, I0); - _ -> - I0 - end, + I = rabbit_fifo_index:append(Idx, I0), S0#?MODULE{discards = D, msg_bytes = B, ra_indexes = I} @@ -248,8 +240,7 @@ add_bytes_checkout(Size, #?MODULE{msg_bytes = Bytes, State#?MODULE{msg_bytes = Bytes - Size, msg_bytes_checkout = BytesCheckout + Size}. -size_in_bytes(Msg) -> - Header = rabbit_fifo:get_msg_header(Msg), +size_in_bytes(?INDEX_MSG(_Idx, ?DISK_MSG(Header))) -> rabbit_fifo:get_header(size, Header). %% returns at most one delivery effect because there is only one consumer @@ -402,26 +393,26 @@ purge(#?MODULE{consumer = Consumer0} = State) -> -spec dehydrate(state()) -> state(). -dehydrate(#?MODULE{discards = Discards, - consumer = Con} = State) -> - State#?MODULE{discards = dehydrate_messages(Discards), - consumer = dehydrate_consumer(Con), +dehydrate(#?MODULE{discards = _Discards, + consumer = _Con} = State) -> + State#?MODULE{%%discards = dehydrate_messages(Discards), + %%consumer = dehydrate_consumer(Con), ra_indexes = rabbit_fifo_index:empty()}. -dehydrate_messages(Discards) -> - L0 = lqueue:to_list(Discards), - L1 = lists:map(fun({_Reason, Msg}) -> - {?NIL, rabbit_fifo:dehydrate_message(Msg)} - end, L0), - lqueue:from_list(L1). - -dehydrate_consumer(#dlx_consumer{checked_out = Checked0} = Con) -> - Checked = maps:map(fun (_, {_, Msg}) -> - {?NIL, rabbit_fifo:dehydrate_message(Msg)} - end, Checked0), - Con#dlx_consumer{checked_out = Checked}; -dehydrate_consumer(undefined) -> - undefined. +% dehydrate_messages(Discards) -> +% L0 = lqueue:to_list(Discards), +% L1 = lists:map(fun({_Reason, Msg}) -> +% {?NIL, rabbit_fifo:dehydrate_message(Msg)} +% end, L0), +% lqueue:from_list(L1). + +% dehydrate_consumer(#dlx_consumer{checked_out = Checked0} = Con) -> +% Checked = maps:map(fun (_, {_, Msg}) -> +% {?NIL, rabbit_fifo:dehydrate_message(Msg)} +% end, Checked0), +% Con#dlx_consumer{checked_out = Checked}; +% dehydrate_consumer(undefined) -> +% undefined. -spec normalize(state()) -> state(). diff --git a/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl b/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl index 3b3c6284f6..5f3a9bca26 100644 --- a/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl @@ -1696,7 +1696,8 @@ checkout_gen(Pid) -> enq_body_fun = {0, fun ra_lib:id/1}, config :: map(), log = [] :: list(), - down = #{} :: #{pid() => noproc | noconnection} + down = #{} :: #{pid() => noproc | noconnection}, + enq_cmds = #{} :: #{ra:index() => rabbit_fifo:enqueue()} }). expand(Ops, Config) -> @@ -1843,12 +1844,20 @@ handle_op({checkout_dlx, Prefetch}, #t{config = #{dead_letter_handler := at_leas do_apply(Cmd, #t{effects = Effs, index = Index, state = S0, down = Down, + enq_cmds = EnqCmds0, log = Log} = T) -> case Cmd of - {enqueue, Pid, _, _} when is_map_key(Pid, Down) -> + {enqueue, Pid, _, _Msg} when is_map_key(Pid, Down) -> %% down T; _ -> + EnqCmds = case Cmd of + {enqueue, _Pid, _, _Msg} -> + EnqCmds0#{Index => Cmd}; + _ -> + EnqCmds0 + end, + {St, Effects} = case rabbit_fifo:apply(meta(Index), Cmd, S0) of {S, _, E} when is_list(E) -> {S, E}; @@ -1860,23 +1869,28 @@ do_apply(Cmd, #t{effects = Effs, T#t{state = St, index = Index + 1, - effects = enq_effs(Effects, Effs), + enq_cmds = EnqCmds, + effects = enq_effs(Effects, Effs, EnqCmds), log = [Cmd | Log]} end. -enq_effs([], Q) -> Q; -enq_effs([{send_msg, P, {delivery, CTag, Msgs}, _Opts} | Rem], Q) -> +enq_effs([], Q, _) -> Q; +enq_effs([{send_msg, P, {delivery, CTag, Msgs}, _Opts} | Rem], Q, Cmds) -> MsgIds = [I || {I, _} <- Msgs], %% always make settle commands by default %% they can be changed depending on the input event later Cmd = rabbit_fifo:make_settle({CTag, P}, MsgIds), - enq_effs(Rem, queue:in(Cmd, Q)); -enq_effs([{send_msg, _, {dlx_delivery, Msgs}, _Opts} | Rem], Q) -> + enq_effs(Rem, queue:in(Cmd, Q), Cmds); +enq_effs([{log, RaIdxs, Fun, _} | Rem], Q, Cmds) -> + M = [maps:get(I, Cmds) || I <- RaIdxs], + Effs = Fun(M), + enq_effs(Effs ++ Rem, Q, Cmds); +enq_effs([{send_msg, _, {dlx_delivery, Msgs}, _Opts} | Rem], Q, Cmds) -> MsgIds = [I || {I, _} <- Msgs], Cmd = rabbit_fifo_dlx:make_settle(MsgIds), - enq_effs(Rem, queue:in(Cmd, Q)); -enq_effs([_ | Rem], Q) -> - enq_effs(Rem, Q). + enq_effs(Rem, queue:in(Cmd, Q), Cmds); +enq_effs([_ | Rem], Q, Cmds) -> + enq_effs(Rem, Q, Cmds). %% Utility |