summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-11-23 16:28:26 +0000
committerEmile Joubert <emile@rabbitmq.com>2012-11-23 16:28:26 +0000
commit18773747748160b411fd4cd0cbd8abcf274fc40d (patch)
tree43b6cfe78f99717ab3c0dbe3d65d264ed7c8f758 /src
parent583e60d984c8eabb9de3d9e5eda9d2f1841fcca2 (diff)
downloadrabbitmq-server-git-18773747748160b411fd4cd0cbd8abcf274fc40d.tar.gz
Add message properties to backing queue fold
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_backing_queue.erl5
-rw-r--r--src/rabbit_backing_queue_qc.erl4
-rw-r--r--src/rabbit_tests.erl4
-rw-r--r--src/rabbit_variable_queue.erl8
4 files changed, 12 insertions, 9 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 2fc10bb2b3..24dd36e13d 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -161,8 +161,9 @@
%% Fold over all the messages in a queue and return the accumulated
%% results, leaving the queue undisturbed.
--callback fold(fun((rabbit_types:basic_message(), A) -> A), A, state())
- -> {A, state()}.
+-callback fold(fun(({rabbit_types:basic_message(),
+ rabbit_types:message_properties()}, A) -> A),
+ A, state()) -> {A, state()}.
%% How long is my queue?
-callback len(state()) -> non_neg_integer().
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 00e17d02b2..a6d9b59a8b 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -331,8 +331,8 @@ postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) ->
postcondition(S, {call, ?BQMOD, fold, _Args}, {Res, _BQ}) ->
#state{messages = Messages} = S,
- lists:foldl(fun ({_SeqId, {_MsgProps, Msg}}, Acc) ->
- foldfun(Msg, Acc)
+ lists:foldl(fun ({_SeqId, {MsgProps, Msg}}, Acc) ->
+ foldfun({Msg, MsgProps}, Acc)
end, foldacc(), gb_trees:to_list(Messages)) =:= Res;
postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index e9d923ac34..dffca79dc4 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2319,7 +2319,9 @@ test_variable_queue_fold(VQ0) ->
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(
true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
- {Acc, VQ3} = rabbit_variable_queue:fold(fun (M, A) -> [M | A] end, [], VQ2),
+ {Acc, VQ3} = rabbit_variable_queue:fold(fun ({M, _}, A) ->
+ [M | A]
+ end, [], VQ2),
true = [term_to_binary(N) || N <- lists:seq(Count, 1, -1)] ==
[list_to_binary(lists:reverse(P)) ||
#basic_message{ content = #content{ payload_fragments_rev = P}} <-
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 6aab6bf03c..644ba182e8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -685,9 +685,9 @@ fold(Fun, Acc, #vqstate { q1 = Q1,
q3 = Q3,
q4 = Q4 } = State) ->
QFun = fun(MsgStatus, {Acc0, State0}) ->
- {#msg_status { msg = Msg }, State1 } =
+ {#msg_status { msg = Msg, msg_props = MsgProps }, State1 } =
read_msg(MsgStatus, false, State0),
- {Fun(Msg, Acc0), State1}
+ {Fun({Msg, MsgProps}, Acc0), State1}
end,
{Acc1, State1} = ?QUEUE:foldl(QFun, {Acc, State}, Q4),
{Acc2, State2} = ?QUEUE:foldl(QFun, {Acc1, State1}, Q3),
@@ -1453,11 +1453,11 @@ delta_fold(Fun, Acc, DeltaSeqId, DeltaSeqIdEnd,
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
IndexState),
{Acc1, MSCState1} =
- lists:foldl(fun ({MsgId, _SeqId, _MsgProps, IsPersistent,
+ lists:foldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent,
_IsDelivered}, {Acc0, MSCState0}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
msg_store_read(MSCState0, IsPersistent, MsgId),
- {Fun(Msg, Acc0), MSCState1}
+ {Fun({Msg, MsgProps}, Acc0), MSCState1}
end, {Acc, MSCState}, List),
delta_fold(Fun, Acc1, DeltaSeqId1, DeltaSeqIdEnd,
State #vqstate { index_state = IndexState1,