diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-12 12:13:46 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-12 12:13:46 +0100 |
| commit | d56092143a04fffce12dbd6b93bfcd0fdc4d3cfd (patch) | |
| tree | 304d16f0a71e14f1c76101f673b3cf40a46edf06 | |
| parent | 9d4b466cd633593bb0938122d71544c024750f5d (diff) | |
| download | rabbitmq-server-git-d56092143a04fffce12dbd6b93bfcd0fdc4d3cfd.tar.gz | |
refactored out common functionality between ack and cancel
| -rw-r--r-- | Makefile | 4 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 87 |
2 files changed, 37 insertions, 54 deletions
@@ -1,7 +1,7 @@ RABBITMQ_NODENAME=rabbit RABBITMQ_SERVER_START_ARGS= -RABBITMQ_MNESIA_DIR=/tmp/rabbitmq-$(RABBITMQ_NODENAME)-mnesia -RABBITMQ_LOG_BASE=/tmp +RABBITMQ_MNESIA_DIR=/data/tmp/rabbitmq-$(RABBITMQ_NODENAME)-mnesia +RABBITMQ_LOG_BASE=/data/tmp SOURCE_DIR=src EBIN_DIR=ebin diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index ebf0561b8a..90959b4020 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -199,33 +199,39 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation, {ok, {MsgBody, BodySize, Delivered}, State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}. -internal_ack(Q, MsgIds, State = #dqstate { msg_location = MsgLocation, - file_summary = FileSummary, - file_detail = FileDetail - }) -> - Files - = lists:foldl(fun (MsgId, Files2) -> - [{MsgId, RefCount, File, Offset, TotalSize}] = ets:lookup(MsgLocation, MsgId), - % is this the last time we need the message, in which case tidy up - if 1 =:= RefCount -> - true = ets:delete(MsgLocation, MsgId), - [{File, FileSum = #dqfile { valid_data = ValidTotalSize, - contiguous_prefix = ContiguousTop }}] - = ets:lookup(FileSummary, File), - true = ets:delete(FileDetail, {File, Offset}), - ContiguousTop1 = lists:min([ContiguousTop, Offset]), - true = ets:insert(FileSummary, - {File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)), - contiguous_prefix = ContiguousTop1}}), - [Obj] = mnesia:dirty_match_object(rabbit_disk_queue, - #dq_msg_loc {msg_id = MsgId, queue = Q, is_delivered = '_'}), - ok = mnesia:dirty_delete_object(rabbit_disk_queue, Obj), - sets:add_element(File, Files2); - 1 < RefCount -> - true = ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}), - Files2 - end - end, sets:new(), MsgIds), +internal_ack(Q, MsgIds, State) -> + remove_messages(Q, MsgIds, true, State). + +%% Q is only needed if MnesiaDelete = true +remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation, + file_summary = FileSummary, + file_detail = FileDetail + }) -> + Files = lists:foldl(fun (MsgId, Files2) -> + [{MsgId, RefCount, File, Offset, TotalSize}] + = ets:lookup(MsgLocation, MsgId), + if 1 =:= RefCount -> + true = ets:delete(MsgLocation, MsgId), + [{File, FileSum = #dqfile { valid_data = ValidTotalSize, + contiguous_prefix = ContiguousTop }}] + = ets:lookup(FileSummary, File), + true = ets:delete(FileDetail, {File, Offset}), + ContiguousTop1 = lists:min([ContiguousTop, Offset]), + true = ets:insert(FileSummary, {File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)), + contiguous_prefix = ContiguousTop1}}), + if MnesiaDelete -> + [Obj] = mnesia:dirty_match_object(rabbit_disk_queue, + #dq_msg_loc {msg_id = MsgId, queue = Q, is_delivered = '_'}), + ok = mnesia:dirty_delete_object(rabbit_disk_queue, Obj); + true -> + ok + end, + sets:add_element(File, Files2); + 1 < RefCount -> + ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}), + Files2 + end + end, sets:new(), MsgIds), State2 = compact(Files, State), {ok, State2}. @@ -286,31 +292,8 @@ internal_publish(Q, MsgId, MsgBody, State) -> ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue = Q, is_delivered = false}), {ok, State1}. -internal_tx_cancel(MsgIds, State = #dqstate { msg_location = MsgLocation, - file_summary = FileSummary, - file_detail = FileDetail - }) -> - Files = - lists:foldl(fun (MsgId, Files2) -> - [{MsgId, RefCount, File, Offset, TotalSize}] - = ets:lookup(MsgLocation, MsgId), - if 1 =:= RefCount -> - true = ets:delete(MsgLocation, MsgId), - [{File, FileSum = #dqfile { valid_data = ValidTotalSize, - contiguous_prefix = ContiguousTop }}] - = ets:lookup(FileSummary, File), - true = ets:delete(FileDetail, {File, Offset}), - ContiguousTop1 = lists:min([ContiguousTop, Offset]), - true = ets:insert(FileSummary, {File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)), - contiguous_prefix = ContiguousTop1}}), - sets:add_element(File, Files2); - 1 < RefCount -> - ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}), - Files2 - end - end, sets:new(), MsgIds), - State2 = compact(Files, State), - {ok, State2}. +internal_tx_cancel(MsgIds, State) -> + remove_messages(undefined, MsgIds, false, State). %% ---- ROLLING OVER THE APPEND FILE ---- |
