summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-06-25 12:47:43 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-06-25 12:47:43 +0100
commit85586512a898bd0a2757a586d274e26137d185a8 (patch)
treeb8949a88a732efa53916ff3a258c9ae8c7740691 /src
parent1b5c900842fa6845d343fa7856197ae893ac714a (diff)
parentc3a6ed3d284c59b2f313e31b5f7d8a4d9c6dc1ef (diff)
downloadrabbitmq-server-git-85586512a898bd0a2757a586d274e26137d185a8.tar.gz
Merging bug21673 into bug22896
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl16
1 files changed, 9 insertions, 7 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 06a9f9e764..ef63ff8ee1 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -510,7 +510,9 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
end.
ack(AckTags, State) ->
- a(ack(remove, fun (_AckEntry, State1) -> State1 end, AckTags, State)).
+ a(ack(fun rabbit_msg_store:remove/2,
+ fun (_AckEntry, State1) -> State1 end,
+ AckTags, State)).
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent },
State = #vqstate { durable = IsDurable,
@@ -562,7 +564,7 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
requeue(AckTags, State) ->
a(reduce_memory_use(
- ack(release,
+ ack(fun rabbit_msg_store:release/2,
fun (#msg_status { msg = Msg }, State1) ->
{_SeqId, State2} = publish(Msg, true, false, State1),
State2;
@@ -573,7 +575,8 @@ requeue(AckTags, State) ->
State2 = State1 #vqstate { msg_store_clients = MSCState1 },
{_SeqId, State3} = publish(Msg, true, true, State2),
State3
- end, AckTags, State))).
+ end,
+ AckTags, State))).
len(#vqstate { len = Len }) -> Len.
@@ -852,10 +855,9 @@ beta_fold(Fun, Init, Q) ->
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-ack(_ReleaseOrRemove, _Fun, [], State) ->
+ack(_MsgStoreFun, _Fun, [], State) ->
State;
-ack(ReleaseOrRemove, Fun, AckTags, State) when ReleaseOrRemove =:= remove orelse
- ReleaseOrRemove =:= release ->
+ack(MsgStoreFun, Fun, AckTags, State) ->
{{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
persistent_count = PCount }} =
lists:foldl(
@@ -873,7 +875,7 @@ ack(ReleaseOrRemove, Fun, AckTags, State) when ReleaseOrRemove =:= remove orelse
end, {{[], dict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = dict:fold(fun (MsgStore, Guids, ok) ->
- rabbit_msg_store:ReleaseOrRemove(MsgStore, Guids)
+ MsgStoreFun(MsgStore, Guids)
end, ok, GuidsByStore),
PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
error -> 0;