diff options
| author | Matthias Radestock <matthias@lshift.net> | 2009-09-22 15:58:06 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2009-09-22 15:58:06 +0100 |
| commit | a3e0f937bf85c5979e5883216e4be4a441e503ed (patch) | |
| tree | afb4488830e148624905bc7ca479c51cdad7566f | |
| parent | b573cde7ceee9b1eba416a11aa3087aafcacd1d0 (diff) | |
| download | rabbitmq-server-git-a3e0f937bf85c5979e5883216e4be4a441e503ed.tar.gz | |
refactoring: get rid of MnesiaDelete flag to remove_messages
| -rw-r--r-- | src/rabbit_disk_queue.erl | 37 |
1 files changed, 14 insertions, 23 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 02c20e300c..af5d808acb 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -425,22 +425,16 @@ internal_foldl(Q, WriteSeqId, Fun, State = #dqstate { store = Store }, Acc1, ReadSeqId + 1). internal_ack(Q, MsgSeqIds, State) -> - remove_messages(Q, MsgSeqIds, true, State). + remove_messages(Q, MsgSeqIds, State). -%% Q is only needed if MnesiaDelete /= false -remove_messages(Q, MsgSeqIds, MnesiaDelete, - State = #dqstate { store = Store } ) -> +remove_messages(Q, MsgSeqIds, State = #dqstate { store = Store } ) -> MsgIds = lists:foldl( fun ({MsgId, SeqId}, MsgIdAcc) -> - ok = case MnesiaDelete of - true -> mnesia:dirty_delete(rabbit_disk_queue, - {Q, SeqId}); - _ -> ok - end, + ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}), [MsgId | MsgIdAcc] end, [], MsgSeqIds), Store1 = rabbit_msg_store:remove(MsgIds, Store), - {ok, State #dqstate { store = Store1}}. + {ok, State #dqstate { store = Store1 }}. internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent, guid = MsgId, @@ -481,7 +475,7 @@ internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From}, end, {ok, InitWriteSeqId}, PubMsgIds), WriteSeqId1 end), - {ok, State1} = remove_messages(Q, AckSeqIds, true, State), + {ok, State1} = remove_messages(Q, AckSeqIds, State), true = case PubMsgIds of [] -> true; _ -> ets:insert(Sequences, {Q, InitReadSeqId, WriteSeqId}) @@ -501,11 +495,9 @@ internal_publish(Q, Message = #basic_message { guid = MsgId }, true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId + 1}), {ok, {MsgId, WriteSeqId}, State1}. -internal_tx_rollback(MsgIds, State) -> - %% we don't need seq ids because we're not touching mnesia, - %% because seqids were never assigned - MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)), - remove_messages(undefined, MsgSeqIds, false, State). +internal_tx_rollback(MsgIds, State = #dqstate { store = Store }) -> + Store1 = rabbit_msg_store:remove(MsgIds, Store), + {ok, State #dqstate { store = Store1 }}. internal_requeue(_Q, [], State) -> {ok, State}; @@ -599,7 +591,7 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) -> {true, {MsgId, SeqId}, SeqId + 1} end, ReadSeqId), true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId}), - {ok, State1} = remove_messages(Q, MsgSeqIds, true, State), + {ok, State1} = remove_messages(Q, MsgSeqIds, State), {ok, WriteSeqId - ReadSeqId, State1} end. @@ -612,12 +604,11 @@ internal_delete_queue(Q, State) -> Objs = mnesia:dirty_match_object( rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, '_'}, _ = '_' }), - MsgSeqIds = - lists:map( - fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, - msg_id = MsgId }) -> - {MsgId, SeqId} end, Objs), - remove_messages(Q, MsgSeqIds, true, State2). + MsgSeqIds = lists:map(fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, + msg_id = MsgId }) -> + {MsgId, SeqId} + end, Objs), + remove_messages(Q, MsgSeqIds, State2). internal_delete_non_durable_queues( DurableQueues, State = #dqstate { sequences = Sequences }) -> |
