diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-12 12:30:38 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-12 12:30:38 +0100 |
| commit | 383558344a9103b4a250f729cc4380534bc07da8 (patch) | |
| tree | ec7aa79637747257e41b0f8dc4c8d067224c557d /src | |
| parent | d56092143a04fffce12dbd6b93bfcd0fdc4d3cfd (diff) | |
| download | rabbitmq-server-git-383558344a9103b4a250f729cc4380534bc07da8.tar.gz | |
switched mnesia to an ordered_set.
Seems to match performance with bag for startup and without the issues on fanout.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 2 |
2 files changed, 14 insertions, 14 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 90959b4020..db719ef3b3 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -190,11 +190,10 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation, end, % read the message {ok, {MsgBody, BodySize, _TotalSize}} = read_message_at_offset(FileHdl, Offset), - [Obj = #dq_msg_loc {msg_id = MsgId, queue = Q, is_delivered = Delivered}] - = mnesia:dirty_index_match_object(rabbit_disk_queue, #dq_msg_loc {msg_id = MsgId, queue = Q, is_delivered = '_'}, 1), + [Obj = #dq_msg_loc {is_delivered = Delivered}] + = mnesia:dirty_read(rabbit_disk_queue, {MsgId, Q}), if Delivered -> ok; - true -> ok = mnesia:dirty_delete_object(rabbit_disk_queue, Obj), - ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true}) + true -> ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true}) end, {ok, {MsgBody, BodySize, Delivered}, State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}. @@ -220,9 +219,7 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL true = ets:insert(FileSummary, {File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)), contiguous_prefix = ContiguousTop1}}), if MnesiaDelete -> - [Obj] = mnesia:dirty_match_object(rabbit_disk_queue, - #dq_msg_loc {msg_id = MsgId, queue = Q, is_delivered = '_'}), - ok = mnesia:dirty_delete_object(rabbit_disk_queue, Obj); + ok = mnesia:dirty_delete(rabbit_disk_queue, {MsgId, Q}); true -> ok end, @@ -278,7 +275,7 @@ internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation, [{MsgId, _RefCount, File, _Offset, _TotalSize}] = ets:lookup(MsgLocation, MsgId), ok = mnesia:write(rabbit_disk_queue, - #dq_msg_loc { msg_id = MsgId, queue = Q, is_delivered = false}, write), + #dq_msg_loc { msg_id_and_queue = {MsgId, Q}, is_delivered = false}, write), Acc or (CurName =:= File) end, false, MsgIds) end), @@ -289,7 +286,7 @@ internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation, internal_publish(Q, MsgId, MsgBody, State) -> {ok, State1} = internal_tx_publish(MsgId, MsgBody, State), - ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue = Q, is_delivered = false}), + ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { msg_id_and_queue = {MsgId, Q}, is_delivered = false}), {ok, State1}. internal_tx_cancel(MsgIds, State) -> @@ -337,7 +334,7 @@ load_from_disk(State) -> % There should be no more tmp files now, so go ahead and load the whole lot (State1 = #dqstate{ msg_location = MsgLocation }) = load_messages(undefined, Files, State), % Finally, check there is nothing in mnesia which we haven't loaded - true = lists:foldl(fun (MsgId, true) -> 1 =:= length(ets:lookup(MsgLocation, MsgId)) end, + true = lists:foldl(fun ({MsgId, _Q}, true) -> 1 =:= length(ets:lookup(MsgLocation, MsgId)) end, true, mnesia:dirty_all_keys(rabbit_disk_queue)), {ok, State1}. @@ -365,7 +362,8 @@ load_messages(Left, [File|Files], {ok, Messages} = scan_file_for_valid_messages(form_filename(File)), {ValidMessagesRev, ValidTotalSize} = lists:foldl( fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> - case length(mnesia:dirty_read(rabbit_disk_queue, MsgId)) of + case length(mnesia:dirty_match_object(rabbit_disk_queue, + #dq_msg_loc { msg_id_and_queue = {MsgId, '_'}, is_delivered = '_'})) of 0 -> {VMAcc, VTSAcc}; RefCount -> true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}), @@ -405,7 +403,8 @@ recover_crashed_compactions1(Files, TmpFile) -> MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp), % all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out lists:foreach(fun (MsgId) -> - true = 0 < length(mnesia:dirty_read(rabbit_disk_queue, MsgId)) + true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, + #dq_msg_loc { msg_id_and_queue = {MsgId, '_'}, is_delivered = '_'})) end, MsgIdsTmp), {ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), MsgIds = lists:map(GrabMsgId, UncorruptedMessages), @@ -436,7 +435,8 @@ recover_crashed_compactions1(Files, TmpFile) -> % we're in case 4 above. % check that everything in the main file is a valid message in mnesia lists:foreach(fun (MsgId) -> - true = 0 < length(mnesia:dirty_read(rabbit_disk_queue, MsgId)) + true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, + #dq_msg_loc { msg_id_and_queue = {MsgId, '_'}, is_delivered = '_'})) end, MsgIds), % The main file should be contiguous {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages), diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index be58581ae8..858b024acf 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -146,7 +146,7 @@ table_definitions() -> {attributes, record_info(fields, amqqueue)}]}, {rabbit_disk_queue, [{record_name, dq_msg_loc}, - {type, bag}, + {type, ordered_set}, {attributes, record_info(fields, dq_msg_loc)}, {disc_copies, [node()]}]} ]. |
