diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-10 17:21:23 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-10 17:21:23 +0100 |
| commit | 3b5a7e084e11ca53b8c25ee36c2ffb071aab9ac7 (patch) | |
| tree | 47bfb9c519e2f5eea195807114cddb190b5e053b /src | |
| parent | a1c140505e6bd32d57fabc37597981bc34f11e53 (diff) | |
| download | rabbitmq-server-git-3b5a7e084e11ca53b8c25ee36c2ffb071aab9ac7.tar.gz | |
many bugs fixed. Still no compaction. However, performance is horrible, because the test which established ets:insert_new did what we wanted was actually flawed, and we can't do that, and ets:insert is not fast enough in a bag with many similar keys. What I want is to be able to say "yes, just insert this, I guarantee that whilst there are other equal keys in this bag, you don't need to go and try to find matching values". Hmph
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 47 |
1 files changed, 25 insertions, 22 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index dbd81215f6..d50006836b 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -74,7 +74,7 @@ publish(Q, MsgId, Msg) when is_binary(Msg) -> gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}). deliver(Q, MsgId) -> - gen_server:call(?SERVER, {deliver, Q, MsgId}). + gen_server:call(?SERVER, {deliver, Q, MsgId}, infinity). ack(Q, MsgIds) when is_list(MsgIds) -> gen_server:cast(?SERVER, {ack, Q, MsgIds}). @@ -83,7 +83,7 @@ tx_publish(MsgId, Msg) when is_binary(Msg) -> gen_server:cast(?SERVER, {tx_publish, MsgId, Msg}). tx_commit(Q, MsgIds) when is_list(MsgIds) -> - gen_server:call(?SERVER, {tx_commit, Q, MsgIds}). + gen_server:call(?SERVER, {tx_commit, Q, MsgIds}, infinity). tx_cancel(MsgIds) when is_list(MsgIds) -> gen_server:cast(?SERVER, {tx_cancel, MsgIds}). @@ -92,11 +92,12 @@ tx_cancel(MsgIds) when is_list(MsgIds) -> init([FileSizeLimit, ReadFileHandlesLimit]) -> process_flag(trap_exit, true), + InitName = "0" ++ (?FILE_EXTENSION), State = #dqstate { msg_location = ets:new((?MSG_LOC_ETS_NAME), [set, private]), - file_summary = dict:new(), + file_summary = dict:store(InitName, {0, 0, undefined, undefined}, dict:new()), file_detail = ets:new((?FILE_DETAIL_ETS_NAME), [bag, private]), current_file_num = 0, - current_file_name = "0" ++ (?FILE_EXTENSION), + current_file_name = InitName, current_file_handle = undefined, file_size_limit = FileSizeLimit, read_file_handles = {dict:new(), gb_trees:empty()}, @@ -121,7 +122,7 @@ handle_cast({publish, Q, MsgId, MsgBody}, State) -> handle_cast({ack, Q, MsgIds}, State) -> {ok, State1} = lists:foldl(fun (MsgId, {ok, State2}) -> internal_ack(Q, MsgId, State2) - end, State, MsgIds), + end, {ok, State}, MsgIds), {noreply, State1}; handle_cast({tx_publish, MsgId, MsgBody}, State) -> {ok, State1} = internal_tx_publish(MsgId, MsgBody, State), @@ -136,11 +137,12 @@ handle_info(_Info, State) -> terminate(_Reason, #dqstate { current_file_handle = FileHdl, read_file_handles = {ReadHdls, _ReadHdlsAge} }) -> + io:format("DYING~n", []), ok = file:sync(FileHdl), ok = file:close(FileHdl), - dict:map(fun (_File, Hdl) -> - ok = file:close(Hdl) - end, ReadHdls). + dict:fold(fun (_File, Hdl, _Acc) -> + file:close(Hdl) + end, ok, ReadHdls). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -191,8 +193,8 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation, end, % read the message {ok, {MsgBody, BodySize, _TotalSize}} = read_message_at_offset(FileHdl, Offset), - [{{Q, MsgId}, Delivered}] = mnesia:read(rabbit_disk_queue, {Q, MsgId}, read), - ok = mnesia:write(rabbit_disk_queue, {{Q, MsgId}, true}, write), + [#dq_msg_loc {queue_and_msg_id = {Q, MsgId}, is_delivered = Delivered}] = mnesia:dirty_read(rabbit_disk_queue, {Q, MsgId}), + ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc {queue_and_msg_id = {Q, MsgId}, is_delivered = true}), {ok, {MsgBody, BodySize, Delivered}, State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}. @@ -210,7 +212,7 @@ internal_ack(Q, MsgId, State = #dqstate { msg_location = MsgLocation, ContiguousTop1 = lists:min([ContiguousTop, Offset]), FileSummary2 = dict:store(File, {ValidTotalSize - TotalSize - file_packing_adjustment_bytes(), ContiguousTop1, Left, Right}, FileSummary), - ok = mnesia:delete({rabbit_disk_queue, {Q, MsgId}}), + ok = mnesia:dirty_delete({rabbit_disk_queue, {Q, MsgId}}), FileSummary2; 1 < RefCount -> ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}), @@ -229,9 +231,10 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio [] -> % New message, lots to do {ok, Offset} = file:position(CurHdl, cur), + io:format("Reported file position: ~p~n", [Offset]), {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody), true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, Offset, TotalSize}), - true = ets:insert_new(FileDetail, {CurName, Offset, TotalSize}), + true = ets:insert(FileDetail, {CurName, Offset, TotalSize}), {ok, {ValidTotalSize, ContiguousTop, Left, undefined}} = dict:find(CurName, FileSummary), ValidTotalSize1 = ValidTotalSize + TotalSize + file_packing_adjustment_bytes(), ContiguousTop1 = if Offset =:= ContiguousTop -> @@ -243,7 +246,7 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio State # dqstate { file_summary = FileSummary2 }); [{MsgId, RefCount, File, Offset, TotalSize}] -> % We already know about it, just update counter - ets:insert(MsgLocation, {MsgId, RefCount + 1, File, Offset, TotalSize}), + true = ets:insert(MsgLocation, {MsgId, RefCount + 1, File, Offset, TotalSize}), {ok, State} end. @@ -256,7 +259,7 @@ internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation, fun() -> lists:foldl(fun (MsgId, Acc) -> [{MsgId, _RefCount, File, _Offset, _TotalSize}] = ets:lookup(MsgLocation, MsgId), - ok = mnesia:write(rabbit_disk_queue, {{Q, MsgId}, false}, write), + ok = mnesia:write(rabbit_disk_queue, #dq_msg_loc { queue_and_msg_id = {Q, MsgId}, is_delivered = false}, write), Acc or (CurName =:= File) end, false, MsgIds) end), @@ -353,7 +356,7 @@ load_messages(Left, [File|Files], 0 -> {VMAcc, VTSAcc}; RefCount -> true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}), - true = ets:insert_new(FileDetail, {File, Offset, TotalSize}), + true = ets:insert(FileDetail, {File, Offset, TotalSize}), {[{MsgId, TotalSize, Offset}|VMAcc], VTSAcc + TotalSize + file_packing_adjustment_bytes()} end end, {[], 0}, Messages), @@ -373,15 +376,18 @@ load_messages(Left, [File|Files], recover_crashed_compactions(_Files, []) -> ok; recover_crashed_compactions(Files, [TmpFile|TmpFiles]) -> + GrabMsgId = fun ({MsgId, _TotalSize, _FileOffset}) -> MsgId end, NonTmpRelatedFile = filename:rootname(TmpFile) ++ (?FILE_EXTENSION), true = lists:member(NonTmpRelatedFile, Files), % [{MsgId, TotalSize, FileOffset}] {ok, UncorruptedMessagesTmp} = scan_file_for_valid_messages(form_filename(TmpFile)), + MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp), % all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out - lists:foreach(fun ({MsgId, _TotalSize, _FileOffset}) -> - 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'})) - end, UncorruptedMessagesTmp), + lists:foreach(fun (MsgId) -> + true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'})) + end, MsgIdsTmp), {ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), + MsgIds = lists:map(GrabMsgId, UncorruptedMessages), %% 1) It's possible that everything in the tmp file is also in the main file %% such that the main file is (prefix ++ tmpfile). This means that compaction %% failed immediately prior to the final step of deleting the tmp file. @@ -401,9 +407,6 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) -> %% As the main file has already been truncated, it should consist only of valid messages %% Plan: Truncate the main file back to before any of the files in the tmp file and copy %% them over again - GrabMsgId = fun ({MsgId, _TotalSize, _FileOffset}) -> MsgId end, - MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp), - MsgIds = lists:map(GrabMsgId, UncorruptedMessages), case lists:all(fun (MsgId) -> lists:member(MsgId, MsgIds) end, MsgIdsTmp) of true -> % we're in case 1, 2 or 3 above. Just delete the tmp file % note this also catches the case when the tmp file is empty @@ -412,7 +415,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) -> % we're in case 4 above. % check that everything in the main file is a valid message in mnesia lists:foreach(fun (MsgId) -> - 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'})) + true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'})) end, MsgIds), % The main file should be contiguous {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages), |
