summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-09-22 15:58:06 +0100
committerMatthias Radestock <matthias@lshift.net>2009-09-22 15:58:06 +0100
commita3e0f937bf85c5979e5883216e4be4a441e503ed (patch)
treeafb4488830e148624905bc7ca479c51cdad7566f
parentb573cde7ceee9b1eba416a11aa3087aafcacd1d0 (diff)
downloadrabbitmq-server-git-a3e0f937bf85c5979e5883216e4be4a441e503ed.tar.gz
refactoring: get rid of MnesiaDelete flag to remove_messages
-rw-r--r--src/rabbit_disk_queue.erl37
1 files changed, 14 insertions, 23 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 02c20e300c..af5d808acb 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -425,22 +425,16 @@ internal_foldl(Q, WriteSeqId, Fun, State = #dqstate { store = Store },
Acc1, ReadSeqId + 1).
internal_ack(Q, MsgSeqIds, State) ->
- remove_messages(Q, MsgSeqIds, true, State).
+ remove_messages(Q, MsgSeqIds, State).
-%% Q is only needed if MnesiaDelete /= false
-remove_messages(Q, MsgSeqIds, MnesiaDelete,
- State = #dqstate { store = Store } ) ->
+remove_messages(Q, MsgSeqIds, State = #dqstate { store = Store } ) ->
MsgIds = lists:foldl(
fun ({MsgId, SeqId}, MsgIdAcc) ->
- ok = case MnesiaDelete of
- true -> mnesia:dirty_delete(rabbit_disk_queue,
- {Q, SeqId});
- _ -> ok
- end,
+ ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}),
[MsgId | MsgIdAcc]
end, [], MsgSeqIds),
Store1 = rabbit_msg_store:remove(MsgIds, Store),
- {ok, State #dqstate { store = Store1}}.
+ {ok, State #dqstate { store = Store1 }}.
internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent,
guid = MsgId,
@@ -481,7 +475,7 @@ internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From},
end, {ok, InitWriteSeqId}, PubMsgIds),
WriteSeqId1
end),
- {ok, State1} = remove_messages(Q, AckSeqIds, true, State),
+ {ok, State1} = remove_messages(Q, AckSeqIds, State),
true = case PubMsgIds of
[] -> true;
_ -> ets:insert(Sequences, {Q, InitReadSeqId, WriteSeqId})
@@ -501,11 +495,9 @@ internal_publish(Q, Message = #basic_message { guid = MsgId },
true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId + 1}),
{ok, {MsgId, WriteSeqId}, State1}.
-internal_tx_rollback(MsgIds, State) ->
- %% we don't need seq ids because we're not touching mnesia,
- %% because seqids were never assigned
- MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)),
- remove_messages(undefined, MsgSeqIds, false, State).
+internal_tx_rollback(MsgIds, State = #dqstate { store = Store }) ->
+ Store1 = rabbit_msg_store:remove(MsgIds, Store),
+ {ok, State #dqstate { store = Store1 }}.
internal_requeue(_Q, [], State) ->
{ok, State};
@@ -599,7 +591,7 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
{true, {MsgId, SeqId}, SeqId + 1}
end, ReadSeqId),
true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId}),
- {ok, State1} = remove_messages(Q, MsgSeqIds, true, State),
+ {ok, State1} = remove_messages(Q, MsgSeqIds, State),
{ok, WriteSeqId - ReadSeqId, State1}
end.
@@ -612,12 +604,11 @@ internal_delete_queue(Q, State) ->
Objs = mnesia:dirty_match_object(
rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id = {Q, '_'}, _ = '_' }),
- MsgSeqIds =
- lists:map(
- fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId},
- msg_id = MsgId }) ->
- {MsgId, SeqId} end, Objs),
- remove_messages(Q, MsgSeqIds, true, State2).
+ MsgSeqIds = lists:map(fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId},
+ msg_id = MsgId }) ->
+ {MsgId, SeqId}
+ end, Objs),
+ remove_messages(Q, MsgSeqIds, State2).
internal_delete_non_durable_queues(
DurableQueues, State = #dqstate { sequences = Sequences }) ->