summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-05-13 14:44:29 +0100
committerMatthew Sackman <matthew@lshift.net>2009-05-13 14:44:29 +0100
commit885575462f77893da94faccdaa8c490758d5bde2 (patch)
tree5d3b1fe452b8477d3fbefb66d446642b1121e51f
parent25995387be653239513a9e00ae2abe885b167093 (diff)
downloadrabbitmq-server-git-885575462f77893da94faccdaa8c490758d5bde2.tar.gz
Made tx_commit take a list of seq_ids which are things to be ack'd. This means that some external thing should keep track of exactly what is in a transaction (this is already the case for publishes, it just needs to be extended for acks), and then present them for the commit. Also, fixed a stupid bug in the stress_gc test which was previously acking everything at once (albeit in a weird order as desired) which meant all files got emptied before the gc ran, not quite what was desired.
-rw-r--r--src/rabbit_disk_queue.erl86
-rw-r--r--src/rabbit_tests.erl19
2 files changed, 61 insertions, 44 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 803f358b9c..c08258c846 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -38,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/2, tx_cancel/1]).
+-export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1]).
-export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]).
@@ -152,7 +152,12 @@
%% MsgLocation is deliberately a dets table, and the mnesia table is
%% set to be a disk_only_table in order to ensure that we are not RAM
-%% constrained.
+%% constrained. However, for performance reasons, it is possible to
+%% call to_ram_disk_mode/0 which will alter the mnesia table to
+%% disc_copies and convert MsgLocation to an ets table. This results
+%% in a massive performance improvement, at the expense of greater RAM
+%% usage. The idea is that when memory gets tight, we switch to
+%% disk_only mode but otherwise try to run in ram_disk mode.
%% So, with this design, messages move to the left. Eventually, they
%% should end up in a contiguous block on the left and are then never
@@ -176,21 +181,21 @@
%% the data the size of which is tracked by the ContiguousTop
%% variable. Judicious use of a mirror is required).
%%
-%% --------- --------- ---------
+%% +-------+ +-------- ---------
%% | X | | G | | G |
-%% --------- --------- ---------
+%% +-------+ +-------- ---------
%% | D | | X | | F |
-%% --------- --------- ---------
+%% +-------+ +-------- ---------
%% | X | | X | | E |
-%% --------- --------- ---------
+%% +-------+ +-------- ---------
%% | C | | F | ===> | D |
-%% --------- --------- ---------
+%% +-------+ +-------- ---------
%% | X | | X | | C |
-%% --------- --------- ---------
+%% +-------+ +-------- ---------
%% | B | | X | | B |
-%% --------- --------- ---------
+%% +-------+ +-------- ---------
%% | A | | E | | A |
-%% --------- --------- ---------
+%% +-------+ +-------- ---------
%% left right left
%%
%% From this reasoning, we do have a bound on the number of times the
@@ -224,7 +229,7 @@
bool(), {msg_id(), seq_id()}}}).
-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
-spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok').
--spec(tx_commit/2 :: (queue_name(), [msg_id()]) -> 'ok').
+-spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok').
-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
@@ -251,8 +256,8 @@ ack(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
tx_publish(MsgId, Msg) when is_binary(Msg) ->
gen_server:cast(?SERVER, {tx_publish, MsgId, Msg}).
-tx_commit(Q, MsgIds) when is_list(MsgIds) ->
- gen_server:call(?SERVER, {tx_commit, Q, MsgIds}, infinity).
+tx_commit(Q, PubMsgIds, AckSeqIds) when is_list(PubMsgIds) andalso is_list(AckSeqIds) ->
+ gen_server:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity).
tx_cancel(MsgIds) when is_list(MsgIds) ->
gen_server:cast(?SERVER, {tx_cancel, MsgIds}).
@@ -329,8 +334,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
handle_call({deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, State),
{reply, Result, State1};
-handle_call({tx_commit, Q, MsgIds}, _From, State) ->
- {ok, State1} = internal_tx_commit(Q, MsgIds, State),
+handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) ->
+ {ok, State1} = internal_tx_commit(Q, PubMsgIds, AckSeqIds, State),
{reply, ok, State1};
handle_call(stop, _From, State) ->
{stop, normal, ok, State}; %% gen_server now calls terminate
@@ -506,11 +511,12 @@ getReadHandle(File, State =
ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1),
{FileHdl, State #dqstate {read_file_handles = {ReadHdls3, ReadHdlsAge3}}}.
-internal_ack(Q, MsgIds, State) ->
- remove_messages(Q, MsgIds, true, State).
+internal_ack(Q, MsgSeqIds, State) ->
+ remove_messages(Q, MsgSeqIds, true, State).
-%% Q is only needed if MnesiaDelete = true
+%% Q is only needed if MnesiaDelete /= false
%% called from tx_cancel with MnesiaDelete = false
+%% called from internal_tx_commit with MnesiaDelete = txn
%% called from ack with MnesiaDelete = true
remove_messages(Q, MsgSeqIds, MnesiaDelete,
State = #dqstate { file_summary = FileSummary,
@@ -539,10 +545,12 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
File, Offset, TotalSize}),
Files2
end,
- if MnesiaDelete ->
- ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId});
- true -> ok
- end,
+ ok = if MnesiaDelete ->
+ mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId});
+ MnesiaDelete =:= txn ->
+ mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write);
+ true -> ok
+ end,
Files3
end, sets:new(), MsgSeqIds),
State2 = compact(Files, State),
@@ -581,7 +589,7 @@ internal_tx_publish(MsgId, MsgBody,
{ok, State}
end.
-internal_tx_commit(Q, MsgIds,
+internal_tx_commit(Q, PubMsgIds, AckSeqIds,
State = #dqstate { current_file_handle = CurHdl,
current_file_name = CurName,
sequences = Sequences
@@ -594,18 +602,26 @@ internal_tx_commit(Q, MsgIds,
{atomic, {Sync, WriteSeqId}} =
mnesia:transaction(
fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foldl(
- fun (MsgId, {Acc, NextWriteSeqId}) ->
- [{MsgId, _RefCount, File, _Offset, _TotalSize}] =
- dets_ets_lookup(State, MsgId),
- ok = mnesia:write(rabbit_disk_queue,
- #dq_msg_loc { queue_and_seq_id =
- {Q, NextWriteSeqId},
- msg_id = MsgId,
- is_delivered = false},
- write),
- {Acc or (CurName =:= File), NextWriteSeqId + 1}
- end, {false, InitWriteSeqId}, MsgIds)
+ %% must deal with publishes first, if we didn't
+ %% then we could end up acking a message before
+ %% it's been published, which is clearly
+ %% nonsense. I.e. in commit, do not do things in an
+ %% order which _could_not_ have happened.
+ {Sync2, WriteSeqId3} =
+ lists:foldl(
+ fun (MsgId, {Acc, NextWriteSeqId}) ->
+ [{MsgId, _RefCount, File, _Offset, _TotalSize}] =
+ dets_ets_lookup(State, MsgId),
+ ok = mnesia:write(rabbit_disk_queue,
+ #dq_msg_loc { queue_and_seq_id =
+ {Q, NextWriteSeqId},
+ msg_id = MsgId,
+ is_delivered = false},
+ write),
+ {Acc or (CurName =:= File), NextWriteSeqId + 1}
+ end, {false, InitWriteSeqId}, PubMsgIds),
+ remove_messages(Q, AckSeqIds, txn, State),
+ {Sync2, WriteSeqId3}
end),
true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId}),
if Sync -> ok = file:sync(CurHdl);
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index fcd3d5f668..2640439e9f 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -698,6 +698,7 @@ test_disk_queue() ->
rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
Startup = rdq_virgin(),
rdq_start(),
+ rabbit_disk_queue:to_ram_disk_mode(),
QCount = length(Qs),
Msg = <<0:(8*MsgSizeBytes)>>,
List = lists:seq(1, MsgCount),
@@ -705,7 +706,7 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
timer:tc(?MODULE, rdq_time_commands,
[[fun() -> [rabbit_disk_queue:tx_publish(N, Msg)
|| N <- List, _ <- Qs] end,
- fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List)
+ fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List, [])
|| Q <- Qs] end
]]),
{Deliver, ok} =
@@ -714,8 +715,7 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
[begin {N, Msg, MsgSizeBytes, false, SeqId} =
rabbit_disk_queue:deliver(Q), SeqId end
|| N <- List],
- rabbit_disk_queue:ack(Q, SeqIds),
- ok = rabbit_disk_queue:tx_commit(Q, [])
+ ok = rabbit_disk_queue:tx_commit(Q, [], SeqIds)
end || Q <- Qs]
end]]),
io:format(" ~15.10B| ~14.10B| ~14.10B| ~14.1f| ~14.1f| ~14.6f| ~14.10f| ~14.1f| ~14.6f| ~14.10f~n",
@@ -735,7 +735,7 @@ rdq_stress_gc(MsgCount) ->
Msg = <<0:(8*MsgSizeBytes)>>, % 256KB
List = lists:seq(1, MsgCount),
[rabbit_disk_queue:tx_publish(N, Msg) || N <- List],
- rabbit_disk_queue:tx_commit(q, List),
+ rabbit_disk_queue:tx_commit(q, List, []),
StartChunk = round(MsgCount / 20), % 5%
AckList =
lists:reverse(
@@ -759,10 +759,11 @@ rdq_stress_gc(MsgCount) ->
rabbit_disk_queue:deliver(q),
dict:store(MsgId, SeqId, Acc)
end, dict:new(), List),
- rabbit_disk_queue:ack(q, [begin {ok, SeqId} = dict:find(MsgId, MsgIdToSeqDict),
- SeqId end
- || MsgId <- AckList]),
- rabbit_disk_queue:tx_commit(q, []),
+%% we really do want to ack each of these individually
+ [begin {ok, SeqId} = dict:find(MsgId, MsgIdToSeqDict),
+ rabbit_disk_queue:ack(q, [SeqId]) end
+ || MsgId <- AckList],
+ rabbit_disk_queue:tx_commit(q, [], []),
rdq_stop(),
passed.
@@ -778,7 +779,7 @@ rdq_time_insane_startup() ->
%% within 1GB and thus in a single file
io:format("Publishing ~p empty messages...~n",[Count]),
[rabbit_disk_queue:tx_publish(N, Msg) || N <- List],
- rabbit_disk_queue:tx_commit(q, List),
+ rabbit_disk_queue:tx_commit(q, List, []),
io:format("...done. Timing restart...~n", []),
rdq_stop(),
Micros = rdq_virgin(),