diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-05-13 14:44:29 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-05-13 14:44:29 +0100 |
| commit | 885575462f77893da94faccdaa8c490758d5bde2 (patch) | |
| tree | 5d3b1fe452b8477d3fbefb66d446642b1121e51f | |
| parent | 25995387be653239513a9e00ae2abe885b167093 (diff) | |
| download | rabbitmq-server-git-885575462f77893da94faccdaa8c490758d5bde2.tar.gz | |
Made tx_commit take a list of seq_ids that are to be ack'd as part of the commit. This means that some external component must keep track of exactly what is in a transaction (this is already the case for publishes; it just needs to be extended to acks) and then present those items at commit time. Also fixed a stupid bug in the stress_gc test, which was previously acking everything at once (albeit in a weird order, as desired); this meant all files were emptied before the gc ran, which is not quite what was desired.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 86 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 19 |
2 files changed, 61 insertions, 44 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 803f358b9c..c08258c846 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -38,7 +38,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/2, tx_cancel/1]). +-export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1]). -export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]). @@ -152,7 +152,12 @@ %% MsgLocation is deliberately a dets table, and the mnesia table is %% set to be a disk_only_table in order to ensure that we are not RAM -%% constrained. +%% constrained. However, for performance reasons, it is possible to +%% call to_ram_disk_mode/0 which will alter the mnesia table to +%% disc_copies and convert MsgLocation to an ets table. This results +%% in a massive performance improvement, at the expense of greater RAM +%% usage. The idea is that when memory gets tight, we switch to +%% disk_only mode but otherwise try to run in ram_disk mode. %% So, with this design, messages move to the left. Eventually, they %% should end up in a contiguous block on the left and are then never @@ -176,21 +181,21 @@ %% the data the size of which is tracked by the ContiguousTop %% variable. Judicious use of a mirror is required). 
%% -%% --------- --------- --------- +%% +-------+ +-------- --------- %% | X | | G | | G | -%% --------- --------- --------- +%% +-------+ +-------- --------- %% | D | | X | | F | -%% --------- --------- --------- +%% +-------+ +-------- --------- %% | X | | X | | E | -%% --------- --------- --------- +%% +-------+ +-------- --------- %% | C | | F | ===> | D | -%% --------- --------- --------- +%% +-------+ +-------- --------- %% | X | | X | | C | -%% --------- --------- --------- +%% +-------+ +-------- --------- %% | B | | X | | B | -%% --------- --------- --------- +%% +-------+ +-------- --------- %% | A | | E | | A | -%% --------- --------- --------- +%% +-------+ +-------- --------- %% left right left %% %% From this reasoning, we do have a bound on the number of times the @@ -224,7 +229,7 @@ bool(), {msg_id(), seq_id()}}}). -spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok'). --spec(tx_commit/2 :: (queue_name(), [msg_id()]) -> 'ok'). +-spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok'). -spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). @@ -251,8 +256,8 @@ ack(Q, MsgSeqIds) when is_list(MsgSeqIds) -> tx_publish(MsgId, Msg) when is_binary(Msg) -> gen_server:cast(?SERVER, {tx_publish, MsgId, Msg}). -tx_commit(Q, MsgIds) when is_list(MsgIds) -> - gen_server:call(?SERVER, {tx_commit, Q, MsgIds}, infinity). +tx_commit(Q, PubMsgIds, AckSeqIds) when is_list(PubMsgIds) andalso is_list(AckSeqIds) -> + gen_server:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity). tx_cancel(MsgIds) when is_list(MsgIds) -> gen_server:cast(?SERVER, {tx_cancel, MsgIds}). 
@@ -329,8 +334,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> handle_call({deliver, Q}, _From, State) -> {ok, Result, State1} = internal_deliver(Q, State), {reply, Result, State1}; -handle_call({tx_commit, Q, MsgIds}, _From, State) -> - {ok, State1} = internal_tx_commit(Q, MsgIds, State), +handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) -> + {ok, State1} = internal_tx_commit(Q, PubMsgIds, AckSeqIds, State), {reply, ok, State1}; handle_call(stop, _From, State) -> {stop, normal, ok, State}; %% gen_server now calls terminate @@ -506,11 +511,12 @@ getReadHandle(File, State = ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1), {FileHdl, State #dqstate {read_file_handles = {ReadHdls3, ReadHdlsAge3}}}. -internal_ack(Q, MsgIds, State) -> - remove_messages(Q, MsgIds, true, State). +internal_ack(Q, MsgSeqIds, State) -> + remove_messages(Q, MsgSeqIds, true, State). -%% Q is only needed if MnesiaDelete = true +%% Q is only needed if MnesiaDelete /= false %% called from tx_cancel with MnesiaDelete = false +%% called from internal_tx_cancel with MnesiaDelete = txn %% called from ack with MnesiaDelete = true remove_messages(Q, MsgSeqIds, MnesiaDelete, State = #dqstate { file_summary = FileSummary, @@ -539,10 +545,12 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete, File, Offset, TotalSize}), Files2 end, - if MnesiaDelete -> - ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}); - true -> ok - end, + ok = if MnesiaDelete -> + mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}); + MnesiaDelete =:= txn -> + mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write); + true -> ok + end, Files3 end, sets:new(), MsgSeqIds), State2 = compact(Files, State), @@ -581,7 +589,7 @@ internal_tx_publish(MsgId, MsgBody, {ok, State} end. 
-internal_tx_commit(Q, MsgIds, +internal_tx_commit(Q, PubMsgIds, AckSeqIds, State = #dqstate { current_file_handle = CurHdl, current_file_name = CurName, sequences = Sequences @@ -594,18 +602,26 @@ internal_tx_commit(Q, MsgIds, {atomic, {Sync, WriteSeqId}} = mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), - lists:foldl( - fun (MsgId, {Acc, NextWriteSeqId}) -> - [{MsgId, _RefCount, File, _Offset, _TotalSize}] = - dets_ets_lookup(State, MsgId), - ok = mnesia:write(rabbit_disk_queue, - #dq_msg_loc { queue_and_seq_id = - {Q, NextWriteSeqId}, - msg_id = MsgId, - is_delivered = false}, - write), - {Acc or (CurName =:= File), NextWriteSeqId + 1} - end, {false, InitWriteSeqId}, MsgIds) + %% must deal with publishes first, if we didn't + %% then we could end up acking a message before + %% it's been published, which is clearly + %% nonsense. I.e. in commit, do not do things in an + %% order which _could_not_ have happened. + {Sync2, WriteSeqId3} = + lists:foldl( + fun (MsgId, {Acc, NextWriteSeqId}) -> + [{MsgId, _RefCount, File, _Offset, _TotalSize}] = + dets_ets_lookup(State, MsgId), + ok = mnesia:write(rabbit_disk_queue, + #dq_msg_loc { queue_and_seq_id = + {Q, NextWriteSeqId}, + msg_id = MsgId, + is_delivered = false}, + write), + {Acc or (CurName =:= File), NextWriteSeqId + 1} + end, {false, InitWriteSeqId}, PubMsgIds), + remove_messages(Q, AckSeqIds, txn, State), + {Sync2, WriteSeqId3} end), true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId}), if Sync -> ok = file:sync(CurHdl); diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index fcd3d5f668..2640439e9f 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -698,6 +698,7 @@ test_disk_queue() -> rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> Startup = rdq_virgin(), rdq_start(), + rabbit_disk_queue:to_ram_disk_mode(), QCount = length(Qs), Msg = <<0:(8*MsgSizeBytes)>>, List = lists:seq(1, MsgCount), @@ -705,7 +706,7 @@ 
rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> timer:tc(?MODULE, rdq_time_commands, [[fun() -> [rabbit_disk_queue:tx_publish(N, Msg) || N <- List, _ <- Qs] end, - fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List) + fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List, []) || Q <- Qs] end ]]), {Deliver, ok} = @@ -714,8 +715,7 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> [begin {N, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(Q), SeqId end || N <- List], - rabbit_disk_queue:ack(Q, SeqIds), - ok = rabbit_disk_queue:tx_commit(Q, []) + ok = rabbit_disk_queue:tx_commit(Q, [], SeqIds) end || Q <- Qs] end]]), io:format(" ~15.10B| ~14.10B| ~14.10B| ~14.1f| ~14.1f| ~14.6f| ~14.10f| ~14.1f| ~14.6f| ~14.10f~n", @@ -735,7 +735,7 @@ rdq_stress_gc(MsgCount) -> Msg = <<0:(8*MsgSizeBytes)>>, % 256KB List = lists:seq(1, MsgCount), [rabbit_disk_queue:tx_publish(N, Msg) || N <- List], - rabbit_disk_queue:tx_commit(q, List), + rabbit_disk_queue:tx_commit(q, List, []), StartChunk = round(MsgCount / 20), % 5% AckList = lists:reverse( @@ -759,10 +759,11 @@ rdq_stress_gc(MsgCount) -> rabbit_disk_queue:deliver(q), dict:store(MsgId, SeqId, Acc) end, dict:new(), List), - rabbit_disk_queue:ack(q, [begin {ok, SeqId} = dict:find(MsgId, MsgIdToSeqDict), - SeqId end - || MsgId <- AckList]), - rabbit_disk_queue:tx_commit(q, []), + %% we really do want to ack each of this individually + [begin {ok, SeqId} = dict:find(MsgId, MsgIdToSeqDict), + rabbit_disk_queue:ack(q, [SeqId]) end + || MsgId <- AckList], + rabbit_disk_queue:tx_commit(q, [], []), rdq_stop(), passed. @@ -778,7 +779,7 @@ rdq_time_insane_startup() -> %% within 1GB and thus in a single file io:format("Publishing ~p empty messages...~n",[Count]), [rabbit_disk_queue:tx_publish(N, Msg) || N <- List], - rabbit_disk_queue:tx_commit(q, List), + rabbit_disk_queue:tx_commit(q, List, []), io:format("...done. Timing restart...~n", []), rdq_stop(), Micros = rdq_virgin(), |
