summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-12 12:13:46 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-12 12:13:46 +0100
commitd56092143a04fffce12dbd6b93bfcd0fdc4d3cfd (patch)
tree304d16f0a71e14f1c76101f673b3cf40a46edf06
parent9d4b466cd633593bb0938122d71544c024750f5d (diff)
downloadrabbitmq-server-git-d56092143a04fffce12dbd6b93bfcd0fdc4d3cfd.tar.gz
refactored out common functionality between ack and cancel
-rw-r--r--Makefile4
-rw-r--r--src/rabbit_disk_queue.erl87
2 files changed, 37 insertions, 54 deletions
diff --git a/Makefile b/Makefile
index b7464244c2..8744f637f9 100644
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,7 @@
RABBITMQ_NODENAME=rabbit
RABBITMQ_SERVER_START_ARGS=
-RABBITMQ_MNESIA_DIR=/tmp/rabbitmq-$(RABBITMQ_NODENAME)-mnesia
-RABBITMQ_LOG_BASE=/tmp
+RABBITMQ_MNESIA_DIR=/data/tmp/rabbitmq-$(RABBITMQ_NODENAME)-mnesia
+RABBITMQ_LOG_BASE=/data/tmp
SOURCE_DIR=src
EBIN_DIR=ebin
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index ebf0561b8a..90959b4020 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -199,33 +199,39 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
{ok, {MsgBody, BodySize, Delivered},
State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}.
-internal_ack(Q, MsgIds, State = #dqstate { msg_location = MsgLocation,
- file_summary = FileSummary,
- file_detail = FileDetail
- }) ->
- Files
- = lists:foldl(fun (MsgId, Files2) ->
- [{MsgId, RefCount, File, Offset, TotalSize}] = ets:lookup(MsgLocation, MsgId),
- % is this the last time we need the message, in which case tidy up
- if 1 =:= RefCount ->
- true = ets:delete(MsgLocation, MsgId),
- [{File, FileSum = #dqfile { valid_data = ValidTotalSize,
- contiguous_prefix = ContiguousTop }}]
- = ets:lookup(FileSummary, File),
- true = ets:delete(FileDetail, {File, Offset}),
- ContiguousTop1 = lists:min([ContiguousTop, Offset]),
- true = ets:insert(FileSummary,
- {File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
- contiguous_prefix = ContiguousTop1}}),
- [Obj] = mnesia:dirty_match_object(rabbit_disk_queue,
- #dq_msg_loc {msg_id = MsgId, queue = Q, is_delivered = '_'}),
- ok = mnesia:dirty_delete_object(rabbit_disk_queue, Obj),
- sets:add_element(File, Files2);
- 1 < RefCount ->
- true = ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
- Files2
- end
- end, sets:new(), MsgIds),
+internal_ack(Q, MsgIds, State) ->
+ remove_messages(Q, MsgIds, true, State).
+
+%% Q is only needed if MnesiaDelete = true
+remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation,
+ file_summary = FileSummary,
+ file_detail = FileDetail
+ }) ->
+ Files = lists:foldl(fun (MsgId, Files2) ->
+ [{MsgId, RefCount, File, Offset, TotalSize}]
+ = ets:lookup(MsgLocation, MsgId),
+ if 1 =:= RefCount ->
+ true = ets:delete(MsgLocation, MsgId),
+ [{File, FileSum = #dqfile { valid_data = ValidTotalSize,
+ contiguous_prefix = ContiguousTop }}]
+ = ets:lookup(FileSummary, File),
+ true = ets:delete(FileDetail, {File, Offset}),
+ ContiguousTop1 = lists:min([ContiguousTop, Offset]),
+ true = ets:insert(FileSummary, {File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
+ contiguous_prefix = ContiguousTop1}}),
+ if MnesiaDelete ->
+ [Obj] = mnesia:dirty_match_object(rabbit_disk_queue,
+ #dq_msg_loc {msg_id = MsgId, queue = Q, is_delivered = '_'}),
+ ok = mnesia:dirty_delete_object(rabbit_disk_queue, Obj);
+ true ->
+ ok
+ end,
+ sets:add_element(File, Files2);
+ 1 < RefCount ->
+ ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
+ Files2
+ end
+ end, sets:new(), MsgIds),
State2 = compact(Files, State),
{ok, State2}.
@@ -286,31 +292,8 @@ internal_publish(Q, MsgId, MsgBody, State) ->
ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue = Q, is_delivered = false}),
{ok, State1}.
-internal_tx_cancel(MsgIds, State = #dqstate { msg_location = MsgLocation,
- file_summary = FileSummary,
- file_detail = FileDetail
- }) ->
- Files =
- lists:foldl(fun (MsgId, Files2) ->
- [{MsgId, RefCount, File, Offset, TotalSize}]
- = ets:lookup(MsgLocation, MsgId),
- if 1 =:= RefCount ->
- true = ets:delete(MsgLocation, MsgId),
- [{File, FileSum = #dqfile { valid_data = ValidTotalSize,
- contiguous_prefix = ContiguousTop }}]
- = ets:lookup(FileSummary, File),
- true = ets:delete(FileDetail, {File, Offset}),
- ContiguousTop1 = lists:min([ContiguousTop, Offset]),
- true = ets:insert(FileSummary, {File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
- contiguous_prefix = ContiguousTop1}}),
- sets:add_element(File, Files2);
- 1 < RefCount ->
- ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
- Files2
- end
- end, sets:new(), MsgIds),
- State2 = compact(Files, State),
- {ok, State2}.
+internal_tx_cancel(MsgIds, State) ->
+ remove_messages(undefined, MsgIds, false, State).
%% ---- ROLLING OVER THE APPEND FILE ----