summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-12 12:30:38 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-12 12:30:38 +0100
commit383558344a9103b4a250f729cc4380534bc07da8 (patch)
treeec7aa79637747257e41b0f8dc4c8d067224c557d /src
parentd56092143a04fffce12dbd6b93bfcd0fdc4d3cfd (diff)
downloadrabbitmq-server-git-383558344a9103b4a250f729cc4380534bc07da8.tar.gz
Switched the mnesia table to an ordered_set.
Startup performance seems to match that of bag, without bag's issues on fanout.
Diffstat (limited to 'src')
-rw-r--r--  src/rabbit_disk_queue.erl | 26
-rw-r--r--  src/rabbit_mnesia.erl     |  2
2 files changed, 14 insertions(+), 14 deletions(-)
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 90959b4020..db719ef3b3 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -190,11 +190,10 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
end,
% read the message
{ok, {MsgBody, BodySize, _TotalSize}} = read_message_at_offset(FileHdl, Offset),
- [Obj = #dq_msg_loc {msg_id = MsgId, queue = Q, is_delivered = Delivered}]
- = mnesia:dirty_index_match_object(rabbit_disk_queue, #dq_msg_loc {msg_id = MsgId, queue = Q, is_delivered = '_'}, 1),
+ [Obj = #dq_msg_loc {is_delivered = Delivered}]
+ = mnesia:dirty_read(rabbit_disk_queue, {MsgId, Q}),
if Delivered -> ok;
- true -> ok = mnesia:dirty_delete_object(rabbit_disk_queue, Obj),
- ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true})
+ true -> ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true})
end,
{ok, {MsgBody, BodySize, Delivered},
State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}.
@@ -220,9 +219,7 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL
true = ets:insert(FileSummary, {File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
contiguous_prefix = ContiguousTop1}}),
if MnesiaDelete ->
- [Obj] = mnesia:dirty_match_object(rabbit_disk_queue,
- #dq_msg_loc {msg_id = MsgId, queue = Q, is_delivered = '_'}),
- ok = mnesia:dirty_delete_object(rabbit_disk_queue, Obj);
+ ok = mnesia:dirty_delete(rabbit_disk_queue, {MsgId, Q});
true ->
ok
end,
@@ -278,7 +275,7 @@ internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation,
[{MsgId, _RefCount, File, _Offset, _TotalSize}] =
ets:lookup(MsgLocation, MsgId),
ok = mnesia:write(rabbit_disk_queue,
- #dq_msg_loc { msg_id = MsgId, queue = Q, is_delivered = false}, write),
+ #dq_msg_loc { msg_id_and_queue = {MsgId, Q}, is_delivered = false}, write),
Acc or (CurName =:= File)
end, false, MsgIds)
end),
@@ -289,7 +286,7 @@ internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation,
internal_publish(Q, MsgId, MsgBody, State) ->
{ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
- ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue = Q, is_delivered = false}),
+ ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { msg_id_and_queue = {MsgId, Q}, is_delivered = false}),
{ok, State1}.
internal_tx_cancel(MsgIds, State) ->
@@ -337,7 +334,7 @@ load_from_disk(State) ->
% There should be no more tmp files now, so go ahead and load the whole lot
(State1 = #dqstate{ msg_location = MsgLocation }) = load_messages(undefined, Files, State),
% Finally, check there is nothing in mnesia which we haven't loaded
- true = lists:foldl(fun (MsgId, true) -> 1 =:= length(ets:lookup(MsgLocation, MsgId)) end,
+ true = lists:foldl(fun ({MsgId, _Q}, true) -> 1 =:= length(ets:lookup(MsgLocation, MsgId)) end,
true, mnesia:dirty_all_keys(rabbit_disk_queue)),
{ok, State1}.
@@ -365,7 +362,8 @@ load_messages(Left, [File|Files],
{ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
{ValidMessagesRev, ValidTotalSize} = lists:foldl(
fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
- case length(mnesia:dirty_read(rabbit_disk_queue, MsgId)) of
+ case length(mnesia:dirty_match_object(rabbit_disk_queue,
+ #dq_msg_loc { msg_id_and_queue = {MsgId, '_'}, is_delivered = '_'})) of
0 -> {VMAcc, VTSAcc};
RefCount ->
true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}),
@@ -405,7 +403,8 @@ recover_crashed_compactions1(Files, TmpFile) ->
MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp),
% all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out
lists:foreach(fun (MsgId) ->
- true = 0 < length(mnesia:dirty_read(rabbit_disk_queue, MsgId))
+ true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue,
+ #dq_msg_loc { msg_id_and_queue = {MsgId, '_'}, is_delivered = '_'}))
end, MsgIdsTmp),
{ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
MsgIds = lists:map(GrabMsgId, UncorruptedMessages),
@@ -436,7 +435,8 @@ recover_crashed_compactions1(Files, TmpFile) ->
% we're in case 4 above.
% check that everything in the main file is a valid message in mnesia
lists:foreach(fun (MsgId) ->
- true = 0 < length(mnesia:dirty_read(rabbit_disk_queue, MsgId))
+ true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue,
+ #dq_msg_loc { msg_id_and_queue = {MsgId, '_'}, is_delivered = '_'}))
end, MsgIds),
% The main file should be contiguous
{Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages),
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index be58581ae8..858b024acf 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -146,7 +146,7 @@ table_definitions() ->
{attributes, record_info(fields, amqqueue)}]},
{rabbit_disk_queue,
[{record_name, dq_msg_loc},
- {type, bag},
+ {type, ordered_set},
{attributes, record_info(fields, dq_msg_loc)},
{disc_copies, [node()]}]}
].