summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2022-01-19 13:13:58 +0000
committerKarl Nilsson <kjnilsson@gmail.com>2022-01-19 13:13:58 +0000
commitde7a3d32ce98e510b6f359907add841b5f323e70 (patch)
treeed39a84a8863a2787be8b9a480b8870b16ce34ca
parent4e40ed6a862a22a04d3f3cb39961e6e386920bc8 (diff)
downloadrabbitmq-server-git-qq-v2-no-inmem.tar.gz
fix rabbit_fifo property testsqq-v2-no-inmem
-rw-r--r--deps/rabbit/src/rabbit_fifo_dlx.erl83
-rw-r--r--deps/rabbit/test/rabbit_fifo_prop_SUITE.erl34
2 files changed, 61 insertions, 56 deletions
diff --git a/deps/rabbit/src/rabbit_fifo_dlx.erl b/deps/rabbit/src/rabbit_fifo_dlx.erl
index 81ab629401..c376142bbf 100644
--- a/deps/rabbit/src/rabbit_fifo_dlx.erl
+++ b/deps/rabbit/src/rabbit_fifo_dlx.erl
@@ -87,21 +87,17 @@ stat(#?MODULE{consumer = Con,
apply(_, {dlx, #settle{msg_ids = MsgIds}}, at_least_once,
#?MODULE{consumer = #dlx_consumer{checked_out = Checked0}} = State0) ->
Acked = maps:with(MsgIds, Checked0),
- State = maps:fold(fun(MsgId, {_Rsn, Msg},
- #?MODULE{consumer = #dlx_consumer{checked_out = Checked} = C,
- msg_bytes_checkout = BytesCheckout,
- ra_indexes = Indexes0} = S) ->
- Indexes = case Msg of
- ?INDEX_MSG(I, _)
- when is_integer(I) ->
- rabbit_fifo_index:delete(I, Indexes0);
- _ ->
- Indexes0
- end,
- S#?MODULE{consumer = C#dlx_consumer{checked_out = maps:remove(MsgId, Checked)},
- msg_bytes_checkout = BytesCheckout - size_in_bytes(Msg),
- ra_indexes = Indexes}
- end, State0, Acked),
+ State = maps:fold(
+ fun(MsgId, {_Rsn, ?INDEX_MSG(Idx, ?DISK_MSG(_)) = Msg},
+ #?MODULE{consumer = #dlx_consumer{checked_out = Checked} = C,
+ msg_bytes_checkout = BytesCheckout,
+ ra_indexes = Indexes0} = S) ->
+ Indexes = rabbit_fifo_index:delete(Idx, Indexes0),
+ S#?MODULE{consumer = C#dlx_consumer{checked_out =
+ maps:remove(MsgId, Checked)},
+ msg_bytes_checkout = BytesCheckout - size_in_bytes(Msg),
+ ra_indexes = Indexes}
+ end, State0, Acked),
{State, []};
apply(_, {dlx, #checkout{consumer = Pid,
prefetch = Prefetch}},
@@ -138,7 +134,8 @@ apply(_, Cmd, DLH, State) ->
rabbit_log:debug("Ignoring command ~p for dead_letter_handler ~p", Cmd, DLH),
{State, []}.
--spec discard([msg()], rabbit_dead_letter:reason(), dead_letter_handler(), state()) ->
+-spec discard([indexed_msg()], rabbit_dead_letter:reason(),
+ dead_letter_handler(), state()) ->
{state(), ra_machine:effects()}.
discard(_, _, undefined, State) ->
{State, []};
@@ -167,18 +164,13 @@ discard(Msgs, Reason, {at_most_once, {Mod, Fun, Args}}, State) ->
discard(Msgs, Reason, at_least_once, State0)
when Reason =/= maxlen ->
%%TODO delete delivery_count header to save space? It's not needed anymore.
- State = lists:foldl(fun (Msg, #?MODULE{discards = D0,
- msg_bytes = B0,
- ra_indexes = I0} = S0) ->
+ State = lists:foldl(fun (?INDEX_MSG(Idx, _) = Msg,
+ #?MODULE{discards = D0,
+ msg_bytes = B0,
+ ra_indexes = I0} = S0) ->
D = lqueue:in({Reason, Msg}, D0),
B = B0 + size_in_bytes(Msg),
- I = case Msg of
- ?INDEX_MSG(Idx, _)
- when is_integer(Idx) ->
- rabbit_fifo_index:append(Idx, I0);
- _ ->
- I0
- end,
+ I = rabbit_fifo_index:append(Idx, I0),
S0#?MODULE{discards = D,
msg_bytes = B,
ra_indexes = I}
@@ -248,8 +240,7 @@ add_bytes_checkout(Size, #?MODULE{msg_bytes = Bytes,
State#?MODULE{msg_bytes = Bytes - Size,
msg_bytes_checkout = BytesCheckout + Size}.
-size_in_bytes(Msg) ->
- Header = rabbit_fifo:get_msg_header(Msg),
+size_in_bytes(?INDEX_MSG(_Idx, ?DISK_MSG(Header))) ->
rabbit_fifo:get_header(size, Header).
%% returns at most one delivery effect because there is only one consumer
@@ -402,26 +393,26 @@ purge(#?MODULE{consumer = Consumer0} = State) ->
-spec dehydrate(state()) ->
state().
-dehydrate(#?MODULE{discards = Discards,
- consumer = Con} = State) ->
- State#?MODULE{discards = dehydrate_messages(Discards),
- consumer = dehydrate_consumer(Con),
+dehydrate(#?MODULE{discards = _Discards,
+ consumer = _Con} = State) ->
+ State#?MODULE{%%discards = dehydrate_messages(Discards),
+ %%consumer = dehydrate_consumer(Con),
ra_indexes = rabbit_fifo_index:empty()}.
-dehydrate_messages(Discards) ->
- L0 = lqueue:to_list(Discards),
- L1 = lists:map(fun({_Reason, Msg}) ->
- {?NIL, rabbit_fifo:dehydrate_message(Msg)}
- end, L0),
- lqueue:from_list(L1).
-
-dehydrate_consumer(#dlx_consumer{checked_out = Checked0} = Con) ->
- Checked = maps:map(fun (_, {_, Msg}) ->
- {?NIL, rabbit_fifo:dehydrate_message(Msg)}
- end, Checked0),
- Con#dlx_consumer{checked_out = Checked};
-dehydrate_consumer(undefined) ->
- undefined.
+% dehydrate_messages(Discards) ->
+% L0 = lqueue:to_list(Discards),
+% L1 = lists:map(fun({_Reason, Msg}) ->
+% {?NIL, rabbit_fifo:dehydrate_message(Msg)}
+% end, L0),
+% lqueue:from_list(L1).
+
+% dehydrate_consumer(#dlx_consumer{checked_out = Checked0} = Con) ->
+% Checked = maps:map(fun (_, {_, Msg}) ->
+% {?NIL, rabbit_fifo:dehydrate_message(Msg)}
+% end, Checked0),
+% Con#dlx_consumer{checked_out = Checked};
+% dehydrate_consumer(undefined) ->
+% undefined.
-spec normalize(state()) ->
state().
diff --git a/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl b/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl
index 3b3c6284f6..5f3a9bca26 100644
--- a/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl
+++ b/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl
@@ -1696,7 +1696,8 @@ checkout_gen(Pid) ->
enq_body_fun = {0, fun ra_lib:id/1},
config :: map(),
log = [] :: list(),
- down = #{} :: #{pid() => noproc | noconnection}
+ down = #{} :: #{pid() => noproc | noconnection},
+ enq_cmds = #{} :: #{ra:index() => rabbit_fifo:enqueue()}
}).
expand(Ops, Config) ->
@@ -1843,12 +1844,20 @@ handle_op({checkout_dlx, Prefetch}, #t{config = #{dead_letter_handler := at_leas
do_apply(Cmd, #t{effects = Effs,
index = Index, state = S0,
down = Down,
+ enq_cmds = EnqCmds0,
log = Log} = T) ->
case Cmd of
- {enqueue, Pid, _, _} when is_map_key(Pid, Down) ->
+ {enqueue, Pid, _, _Msg} when is_map_key(Pid, Down) ->
%% down
T;
_ ->
+ EnqCmds = case Cmd of
+ {enqueue, _Pid, _, _Msg} ->
+ EnqCmds0#{Index => Cmd};
+ _ ->
+ EnqCmds0
+ end,
+
{St, Effects} = case rabbit_fifo:apply(meta(Index), Cmd, S0) of
{S, _, E} when is_list(E) ->
{S, E};
@@ -1860,23 +1869,28 @@ do_apply(Cmd, #t{effects = Effs,
T#t{state = St,
index = Index + 1,
- effects = enq_effs(Effects, Effs),
+ enq_cmds = EnqCmds,
+ effects = enq_effs(Effects, Effs, EnqCmds),
log = [Cmd | Log]}
end.
-enq_effs([], Q) -> Q;
-enq_effs([{send_msg, P, {delivery, CTag, Msgs}, _Opts} | Rem], Q) ->
+enq_effs([], Q, _) -> Q;
+enq_effs([{send_msg, P, {delivery, CTag, Msgs}, _Opts} | Rem], Q, Cmds) ->
MsgIds = [I || {I, _} <- Msgs],
%% always make settle commands by default
%% they can be changed depending on the input event later
Cmd = rabbit_fifo:make_settle({CTag, P}, MsgIds),
- enq_effs(Rem, queue:in(Cmd, Q));
-enq_effs([{send_msg, _, {dlx_delivery, Msgs}, _Opts} | Rem], Q) ->
+ enq_effs(Rem, queue:in(Cmd, Q), Cmds);
+enq_effs([{log, RaIdxs, Fun, _} | Rem], Q, Cmds) ->
+ M = [maps:get(I, Cmds) || I <- RaIdxs],
+ Effs = Fun(M),
+ enq_effs(Effs ++ Rem, Q, Cmds);
+enq_effs([{send_msg, _, {dlx_delivery, Msgs}, _Opts} | Rem], Q, Cmds) ->
MsgIds = [I || {I, _} <- Msgs],
Cmd = rabbit_fifo_dlx:make_settle(MsgIds),
- enq_effs(Rem, queue:in(Cmd, Q));
-enq_effs([_ | Rem], Q) ->
- enq_effs(Rem, Q).
+ enq_effs(Rem, queue:in(Cmd, Q), Cmds);
+enq_effs([_ | Rem], Q, Cmds) ->
+ enq_effs(Rem, Q, Cmds).
%% Utility