diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-11 23:14:59 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-11 23:14:59 +0100 |
| commit | 9d4b466cd633593bb0938122d71544c024750f5d (patch) | |
| tree | eaad030f599262899f4dcddeaf9a7f34ca016015 /src | |
| parent | b5d282cc9ac3a734331f86f2a9b9511c0c5a3c06 (diff) | |
| download | rabbitmq-server-git-9d4b466cd633593bb0938122d71544c024750f5d.tar.gz | |
changed the mnesia table to be a bag of MsgId -> {Q, Delivered}
This makes startup MUCH faster, but delivery could be fractionally slower.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 1 |
2 files changed, 16 insertions, 10 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 765380b605..ebf0561b8a 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -190,9 +190,11 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation, end, % read the message {ok, {MsgBody, BodySize, _TotalSize}} = read_message_at_offset(FileHdl, Offset), - [#dq_msg_loc {queue_and_msg_id = {MsgId, Q}, is_delivered = Delivered}] = mnesia:dirty_read(rabbit_disk_queue, {MsgId, Q}), + [Obj = #dq_msg_loc {msg_id = MsgId, queue = Q, is_delivered = Delivered}] + = mnesia:dirty_index_match_object(rabbit_disk_queue, #dq_msg_loc {msg_id = MsgId, queue = Q, is_delivered = '_'}, 1), if Delivered -> ok; - true -> ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc {queue_and_msg_id = {MsgId, Q}, is_delivered = true}) + true -> ok = mnesia:dirty_delete_object(rabbit_disk_queue, Obj), + ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true}) end, {ok, {MsgBody, BodySize, Delivered}, State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}. @@ -215,7 +217,9 @@ internal_ack(Q, MsgIds, State = #dqstate { msg_location = MsgLocation, true = ets:insert(FileSummary, {File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)), contiguous_prefix = ContiguousTop1}}), - ok = mnesia:dirty_delete({rabbit_disk_queue, {MsgId, Q}}), + [Obj] = mnesia:dirty_match_object(rabbit_disk_queue, + #dq_msg_loc {msg_id = MsgId, queue = Q, is_delivered = '_'}), + ok = mnesia:dirty_delete_object(rabbit_disk_queue, Obj), sets:add_element(File, Files2); 1 < RefCount -> true = ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}), @@ -267,7 +271,8 @@ internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation, lists:foldl(fun (MsgId, Acc) -> [{MsgId, _RefCount, File, _Offset, _TotalSize}] = ets:lookup(MsgLocation, MsgId), - ok = mnesia:write(rabbit_disk_queue, #dq_msg_loc { queue_and_msg_id = {MsgId, Q}, is_delivered = false}, write), + ok = mnesia:write(rabbit_disk_queue, + #dq_msg_loc { msg_id = MsgId, queue = Q, is_delivered = false}, write), Acc or (CurName =:= File) end, false, MsgIds) end), @@ -278,7 +283,7 @@ internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation, internal_publish(Q, MsgId, MsgBody, State) -> {ok, State1} = internal_tx_publish(MsgId, MsgBody, State), - ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { queue_and_msg_id = {MsgId, Q}, is_delivered = false}), + ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue = Q, is_delivered = false}), {ok, State1}. internal_tx_cancel(MsgIds, State = #dqstate { msg_location = MsgLocation, @@ -349,8 +354,8 @@ load_from_disk(State) -> % There should be no more tmp files now, so go ahead and load the whole lot (State1 = #dqstate{ msg_location = MsgLocation }) = load_messages(undefined, Files, State), % Finally, check there is nothing in mnesia which we haven't loaded - true = lists:foldl(fun ({MsgId, _Q}, true) -> 1 =:= length(ets:lookup(MsgLocation, MsgId)) end, - true, mnesia:async_dirty(fun() -> mnesia:all_keys(rabbit_disk_queue) end)), + true = lists:foldl(fun (MsgId, true) -> 1 =:= length(ets:lookup(MsgLocation, MsgId)) end, + true, mnesia:dirty_all_keys(rabbit_disk_queue)), {ok, State1}. load_messages(undefined, [], State = #dqstate { file_summary = FileSummary, current_file_name = CurName }) -> @@ -377,7 +382,7 @@ load_messages(Left, [File|Files], {ok, Messages} = scan_file_for_valid_messages(form_filename(File)), {ValidMessagesRev, ValidTotalSize} = lists:foldl( fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> - case length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {MsgId, '_'}, '_'})) of + case length(mnesia:dirty_read(rabbit_disk_queue, MsgId)) of 0 -> {VMAcc, VTSAcc}; RefCount -> true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}), @@ -417,7 +422,7 @@ recover_crashed_compactions1(Files, TmpFile) -> MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp), % all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out lists:foreach(fun (MsgId) -> - true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {MsgId, '_'}, '_'})) + true = 0 < length(mnesia:dirty_read(rabbit_disk_queue, MsgId)) end, MsgIdsTmp), {ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), MsgIds = lists:map(GrabMsgId, UncorruptedMessages), @@ -448,7 +453,7 @@ recover_crashed_compactions1(Files, TmpFile) -> % we're in case 4 above. % check that everything in the main file is a valid message in mnesia lists:foreach(fun (MsgId) -> - true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {MsgId, '_'}, '_'})) + true = 0 < length(mnesia:dirty_read(rabbit_disk_queue, MsgId)) end, MsgIds), % The main file should be contiguous {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages), diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 7179b637d1..be58581ae8 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -146,6 +146,7 @@ table_definitions() -> {attributes, record_info(fields, amqqueue)}]}, {rabbit_disk_queue, [{record_name, dq_msg_loc}, + {type, bag}, {attributes, record_info(fields, dq_msg_loc)}, {disc_copies, [node()]}]} ]. |
