summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_backing_queue.erl5
-rw-r--r--src/rabbit_backing_queue_qc.erl4
-rw-r--r--src/rabbit_tests.erl3
-rw-r--r--src/rabbit_variable_queue.erl8
4 files changed, 11 insertions, 9 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 9e99ca5e8c..c65561f545 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -157,8 +157,9 @@
%% Fold over all the messages in a queue and return the accumulated
%% results, leaving the queue undisturbed.
--callback fold(fun((rabbit_types:basic_message(), A) -> A), A, state())
- -> {A, state()}.
+-callback fold(fun((rabbit_types:basic_message(),
+ rabbit_types:message_properties(), A) -> A),
+ A, state()) -> {A, state()}.
%% How long is my queue?
-callback len(state()) -> non_neg_integer().
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 0380885941..f258e15e0a 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -331,8 +331,8 @@ postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) ->
postcondition(S, {call, ?BQMOD, fold, _Args}, {Res, _BQ}) ->
#state{messages = Messages} = S,
- lists:foldl(fun ({_SeqId, {_MsgProps, Msg}}, Acc) ->
- foldfun(Msg, Acc)
+ lists:foldl(fun ({_SeqId, {MsgProps, Msg}}, Acc) ->
+ foldfun(Msg, MsgProps, Acc)
end, foldacc(), gb_trees:to_list(Messages)) =:= Res;
postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 81180ebe15..cbc876871f 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2319,7 +2319,8 @@ test_variable_queue_fold(VQ0) ->
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(
true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
- {Acc, VQ3} = rabbit_variable_queue:fold(fun (M, A) -> [M | A] end, [], VQ2),
+ {Acc, VQ3} = rabbit_variable_queue:fold(
+ fun (M, _, A) -> [M | A] end, [], VQ2),
true = [term_to_binary(N) || N <- lists:seq(Count, 1, -1)] ==
[list_to_binary(lists:reverse(P)) ||
#basic_message{ content = #content{ payload_fragments_rev = P}} <-
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 4f2668d9b4..8ad4bf9d7b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -685,9 +685,9 @@ fold(Fun, Acc, #vqstate { q1 = Q1,
q3 = Q3,
q4 = Q4 } = State) ->
QFun = fun(MsgStatus, {Acc0, State0}) ->
- {#msg_status { msg = Msg }, State1 } =
+ {#msg_status { msg = Msg, msg_props = MsgProps }, State1 } =
read_msg(MsgStatus, false, State0),
- {Fun(Msg, Acc0), State1}
+ {Fun(Msg, MsgProps, Acc0), State1}
end,
{Acc1, State1} = ?QUEUE:foldl(QFun, {Acc, State}, Q4),
{Acc2, State2} = ?QUEUE:foldl(QFun, {Acc1, State1}, Q3),
@@ -1449,11 +1449,11 @@ delta_fold(Fun, Acc, DeltaSeqId, DeltaSeqIdEnd,
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
IndexState),
{Acc1, MSCState1} =
- lists:foldl(fun ({MsgId, _SeqId, _MsgProps, IsPersistent,
+ lists:foldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent,
_IsDelivered}, {Acc0, MSCState0}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
msg_store_read(MSCState0, IsPersistent, MsgId),
- {Fun(Msg, Acc0), MSCState1}
+ {Fun(Msg, MsgProps, Acc0), MSCState1}
end, {Acc, MSCState}, List),
delta_fold(Fun, Acc1, DeltaSeqId1, DeltaSeqIdEnd,
State #vqstate { index_state = IndexState1,