summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-11 23:14:59 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-11 23:14:59 +0100
commit9d4b466cd633593bb0938122d71544c024750f5d (patch)
treeeaad030f599262899f4dcddeaf9a7f34ca016015 /src
parentb5d282cc9ac3a734331f86f2a9b9511c0c5a3c06 (diff)
downloadrabbitmq-server-git-9d4b466cd633593bb0938122d71544c024750f5d.tar.gz
changed the mnesia table to be a bag of MsgId -> {Q, Delivered}
This makes startup MUCH faster, but delivery could be fractionally slower.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl25
-rw-r--r--src/rabbit_mnesia.erl1
2 files changed, 16 insertions, 10 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 765380b605..ebf0561b8a 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -190,9 +190,11 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
end,
% read the message
{ok, {MsgBody, BodySize, _TotalSize}} = read_message_at_offset(FileHdl, Offset),
- [#dq_msg_loc {queue_and_msg_id = {MsgId, Q}, is_delivered = Delivered}] = mnesia:dirty_read(rabbit_disk_queue, {MsgId, Q}),
+ [Obj = #dq_msg_loc {msg_id = MsgId, queue = Q, is_delivered = Delivered}]
+ = mnesia:dirty_index_match_object(rabbit_disk_queue, #dq_msg_loc {msg_id = MsgId, queue = Q, is_delivered = '_'}, 1),
if Delivered -> ok;
- true -> ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc {queue_and_msg_id = {MsgId, Q}, is_delivered = true})
+ true -> ok = mnesia:dirty_delete_object(rabbit_disk_queue, Obj),
+ ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true})
end,
{ok, {MsgBody, BodySize, Delivered},
State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}.
@@ -215,7 +217,9 @@ internal_ack(Q, MsgIds, State = #dqstate { msg_location = MsgLocation,
true = ets:insert(FileSummary,
{File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
contiguous_prefix = ContiguousTop1}}),
- ok = mnesia:dirty_delete({rabbit_disk_queue, {MsgId, Q}}),
+ [Obj] = mnesia:dirty_match_object(rabbit_disk_queue,
+ #dq_msg_loc {msg_id = MsgId, queue = Q, is_delivered = '_'}),
+ ok = mnesia:dirty_delete_object(rabbit_disk_queue, Obj),
sets:add_element(File, Files2);
1 < RefCount ->
true = ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
@@ -267,7 +271,8 @@ internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation,
lists:foldl(fun (MsgId, Acc) ->
[{MsgId, _RefCount, File, _Offset, _TotalSize}] =
ets:lookup(MsgLocation, MsgId),
- ok = mnesia:write(rabbit_disk_queue, #dq_msg_loc { queue_and_msg_id = {MsgId, Q}, is_delivered = false}, write),
+ ok = mnesia:write(rabbit_disk_queue,
+ #dq_msg_loc { msg_id = MsgId, queue = Q, is_delivered = false}, write),
Acc or (CurName =:= File)
end, false, MsgIds)
end),
@@ -278,7 +283,7 @@ internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation,
internal_publish(Q, MsgId, MsgBody, State) ->
{ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
- ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { queue_and_msg_id = {MsgId, Q}, is_delivered = false}),
+ ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue = Q, is_delivered = false}),
{ok, State1}.
internal_tx_cancel(MsgIds, State = #dqstate { msg_location = MsgLocation,
@@ -349,8 +354,8 @@ load_from_disk(State) ->
% There should be no more tmp files now, so go ahead and load the whole lot
(State1 = #dqstate{ msg_location = MsgLocation }) = load_messages(undefined, Files, State),
% Finally, check there is nothing in mnesia which we haven't loaded
- true = lists:foldl(fun ({MsgId, _Q}, true) -> 1 =:= length(ets:lookup(MsgLocation, MsgId)) end,
- true, mnesia:async_dirty(fun() -> mnesia:all_keys(rabbit_disk_queue) end)),
+ true = lists:foldl(fun (MsgId, true) -> 1 =:= length(ets:lookup(MsgLocation, MsgId)) end,
+ true, mnesia:dirty_all_keys(rabbit_disk_queue)),
{ok, State1}.
load_messages(undefined, [], State = #dqstate { file_summary = FileSummary, current_file_name = CurName }) ->
@@ -377,7 +382,7 @@ load_messages(Left, [File|Files],
{ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
{ValidMessagesRev, ValidTotalSize} = lists:foldl(
fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
- case length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {MsgId, '_'}, '_'})) of
+ case length(mnesia:dirty_read(rabbit_disk_queue, MsgId)) of
0 -> {VMAcc, VTSAcc};
RefCount ->
true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}),
@@ -417,7 +422,7 @@ recover_crashed_compactions1(Files, TmpFile) ->
MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp),
% all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out
lists:foreach(fun (MsgId) ->
- true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {MsgId, '_'}, '_'}))
+ true = 0 < length(mnesia:dirty_read(rabbit_disk_queue, MsgId))
end, MsgIdsTmp),
{ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
MsgIds = lists:map(GrabMsgId, UncorruptedMessages),
@@ -448,7 +453,7 @@ recover_crashed_compactions1(Files, TmpFile) ->
% we're in case 4 above.
% check that everything in the main file is a valid message in mnesia
lists:foreach(fun (MsgId) ->
- true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {MsgId, '_'}, '_'}))
+ true = 0 < length(mnesia:dirty_read(rabbit_disk_queue, MsgId))
end, MsgIds),
% The main file should be contiguous
{Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages),
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 7179b637d1..be58581ae8 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -146,6 +146,7 @@ table_definitions() ->
{attributes, record_info(fields, amqqueue)}]},
{rabbit_disk_queue,
[{record_name, dq_msg_loc},
+ {type, bag},
{attributes, record_info(fields, dq_msg_loc)},
{disc_copies, [node()]}]}
].