summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-10 17:21:23 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-10 17:21:23 +0100
commit3b5a7e084e11ca53b8c25ee36c2ffb071aab9ac7 (patch)
tree47bfb9c519e2f5eea195807114cddb190b5e053b /src
parenta1c140505e6bd32d57fabc37597981bc34f11e53 (diff)
downloadrabbitmq-server-git-3b5a7e084e11ca53b8c25ee36c2ffb071aab9ac7.tar.gz
many bugs fixed. Still no compaction. However, performance is horrible, because the test which established ets:insert_new did what we wanted was actually flawed, and we can't do that, and ets:insert is not fast enough in a bag with many similar keys. What I want is to be able to say "yes, just insert this, I guarantee that whilst there are other equal keys in this bag, you don't need to go and try to find matching values". Hmph
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl47
1 files changed, 25 insertions, 22 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index dbd81215f6..d50006836b 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -74,7 +74,7 @@ publish(Q, MsgId, Msg) when is_binary(Msg) ->
gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}).
deliver(Q, MsgId) ->
- gen_server:call(?SERVER, {deliver, Q, MsgId}).
+ gen_server:call(?SERVER, {deliver, Q, MsgId}, infinity).
ack(Q, MsgIds) when is_list(MsgIds) ->
gen_server:cast(?SERVER, {ack, Q, MsgIds}).
@@ -83,7 +83,7 @@ tx_publish(MsgId, Msg) when is_binary(Msg) ->
gen_server:cast(?SERVER, {tx_publish, MsgId, Msg}).
tx_commit(Q, MsgIds) when is_list(MsgIds) ->
- gen_server:call(?SERVER, {tx_commit, Q, MsgIds}).
+ gen_server:call(?SERVER, {tx_commit, Q, MsgIds}, infinity).
tx_cancel(MsgIds) when is_list(MsgIds) ->
gen_server:cast(?SERVER, {tx_cancel, MsgIds}).
@@ -92,11 +92,12 @@ tx_cancel(MsgIds) when is_list(MsgIds) ->
init([FileSizeLimit, ReadFileHandlesLimit]) ->
process_flag(trap_exit, true),
+ InitName = "0" ++ (?FILE_EXTENSION),
State = #dqstate { msg_location = ets:new((?MSG_LOC_ETS_NAME), [set, private]),
- file_summary = dict:new(),
+ file_summary = dict:store(InitName, {0, 0, undefined, undefined}, dict:new()),
file_detail = ets:new((?FILE_DETAIL_ETS_NAME), [bag, private]),
current_file_num = 0,
- current_file_name = "0" ++ (?FILE_EXTENSION),
+ current_file_name = InitName,
current_file_handle = undefined,
file_size_limit = FileSizeLimit,
read_file_handles = {dict:new(), gb_trees:empty()},
@@ -121,7 +122,7 @@ handle_cast({publish, Q, MsgId, MsgBody}, State) ->
handle_cast({ack, Q, MsgIds}, State) ->
{ok, State1} = lists:foldl(fun (MsgId, {ok, State2}) ->
internal_ack(Q, MsgId, State2)
- end, State, MsgIds),
+ end, {ok, State}, MsgIds),
{noreply, State1};
handle_cast({tx_publish, MsgId, MsgBody}, State) ->
{ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
@@ -136,11 +137,12 @@ handle_info(_Info, State) ->
terminate(_Reason, #dqstate { current_file_handle = FileHdl,
read_file_handles = {ReadHdls, _ReadHdlsAge}
}) ->
+ io:format("DYING~n", []),
ok = file:sync(FileHdl),
ok = file:close(FileHdl),
- dict:map(fun (_File, Hdl) ->
- ok = file:close(Hdl)
- end, ReadHdls).
+ dict:fold(fun (_File, Hdl, _Acc) ->
+ file:close(Hdl)
+ end, ok, ReadHdls).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -191,8 +193,8 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
end,
% read the message
{ok, {MsgBody, BodySize, _TotalSize}} = read_message_at_offset(FileHdl, Offset),
- [{{Q, MsgId}, Delivered}] = mnesia:read(rabbit_disk_queue, {Q, MsgId}, read),
- ok = mnesia:write(rabbit_disk_queue, {{Q, MsgId}, true}, write),
+ [#dq_msg_loc {queue_and_msg_id = {Q, MsgId}, is_delivered = Delivered}] = mnesia:dirty_read(rabbit_disk_queue, {Q, MsgId}),
+ ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc {queue_and_msg_id = {Q, MsgId}, is_delivered = true}),
{ok, {MsgBody, BodySize, Delivered},
State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}.
@@ -210,7 +212,7 @@ internal_ack(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
ContiguousTop1 = lists:min([ContiguousTop, Offset]),
FileSummary2 = dict:store(File, {ValidTotalSize - TotalSize - file_packing_adjustment_bytes(),
ContiguousTop1, Left, Right}, FileSummary),
- ok = mnesia:delete({rabbit_disk_queue, {Q, MsgId}}),
+ ok = mnesia:dirty_delete({rabbit_disk_queue, {Q, MsgId}}),
FileSummary2;
1 < RefCount ->
ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
@@ -229,9 +231,10 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio
[] ->
% New message, lots to do
{ok, Offset} = file:position(CurHdl, cur),
+ io:format("Reported file position: ~p~n", [Offset]),
{ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),
true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, Offset, TotalSize}),
- true = ets:insert_new(FileDetail, {CurName, Offset, TotalSize}),
+ true = ets:insert(FileDetail, {CurName, Offset, TotalSize}),
{ok, {ValidTotalSize, ContiguousTop, Left, undefined}} = dict:find(CurName, FileSummary),
ValidTotalSize1 = ValidTotalSize + TotalSize + file_packing_adjustment_bytes(),
ContiguousTop1 = if Offset =:= ContiguousTop ->
@@ -243,7 +246,7 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio
State # dqstate { file_summary = FileSummary2 });
[{MsgId, RefCount, File, Offset, TotalSize}] ->
% We already know about it, just update counter
- ets:insert(MsgLocation, {MsgId, RefCount + 1, File, Offset, TotalSize}),
+ true = ets:insert(MsgLocation, {MsgId, RefCount + 1, File, Offset, TotalSize}),
{ok, State}
end.
@@ -256,7 +259,7 @@ internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation,
fun() -> lists:foldl(fun (MsgId, Acc) ->
[{MsgId, _RefCount, File, _Offset, _TotalSize}] =
ets:lookup(MsgLocation, MsgId),
- ok = mnesia:write(rabbit_disk_queue, {{Q, MsgId}, false}, write),
+ ok = mnesia:write(rabbit_disk_queue, #dq_msg_loc { queue_and_msg_id = {Q, MsgId}, is_delivered = false}, write),
Acc or (CurName =:= File)
end, false, MsgIds)
end),
@@ -353,7 +356,7 @@ load_messages(Left, [File|Files],
0 -> {VMAcc, VTSAcc};
RefCount ->
true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}),
- true = ets:insert_new(FileDetail, {File, Offset, TotalSize}),
+ true = ets:insert(FileDetail, {File, Offset, TotalSize}),
{[{MsgId, TotalSize, Offset}|VMAcc], VTSAcc + TotalSize + file_packing_adjustment_bytes()}
end
end, {[], 0}, Messages),
@@ -373,15 +376,18 @@ load_messages(Left, [File|Files],
recover_crashed_compactions(_Files, []) ->
ok;
recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
+ GrabMsgId = fun ({MsgId, _TotalSize, _FileOffset}) -> MsgId end,
NonTmpRelatedFile = filename:rootname(TmpFile) ++ (?FILE_EXTENSION),
true = lists:member(NonTmpRelatedFile, Files),
% [{MsgId, TotalSize, FileOffset}]
{ok, UncorruptedMessagesTmp} = scan_file_for_valid_messages(form_filename(TmpFile)),
+ MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp),
% all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out
- lists:foreach(fun ({MsgId, _TotalSize, _FileOffset}) ->
- 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}))
- end, UncorruptedMessagesTmp),
+ lists:foreach(fun (MsgId) ->
+ true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}))
+ end, MsgIdsTmp),
{ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
+ MsgIds = lists:map(GrabMsgId, UncorruptedMessages),
%% 1) It's possible that everything in the tmp file is also in the main file
%% such that the main file is (prefix ++ tmpfile). This means that compaction
%% failed immediately prior to the final step of deleting the tmp file.
@@ -401,9 +407,6 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
%% As the main file has already been truncated, it should consist only of valid messages
%% Plan: Truncate the main file back to before any of the files in the tmp file and copy
%% them over again
- GrabMsgId = fun ({MsgId, _TotalSize, _FileOffset}) -> MsgId end,
- MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp),
- MsgIds = lists:map(GrabMsgId, UncorruptedMessages),
case lists:all(fun (MsgId) -> lists:member(MsgId, MsgIds) end, MsgIdsTmp) of
true -> % we're in case 1, 2 or 3 above. Just delete the tmp file
% note this also catches the case when the tmp file is empty
@@ -412,7 +415,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
% we're in case 4 above.
% check that everything in the main file is a valid message in mnesia
lists:foreach(fun (MsgId) ->
- 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}))
+ true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}))
end, MsgIds),
% The main file should be contiguous
{Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages),