| author | Matthew Sackman <matthew@lshift.net> | 2009-04-10 15:42:40 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-10 15:42:40 +0100 |
| commit | 41376d3da57588fc5c3536e80edc048bbfaf6434 (patch) | |
| tree | 8d4d16a68268c8fddfef7c0f60291e7a52f33af7 /src | |
| parent | 056b4b013476b0a6d447c81ec5fb598048241ac9 (diff) | |
| download | rabbitmq-server-git-41376d3da57588fc5c3536e80edc048bbfaf6434.tar.gz | |
All but the compaction/GC done now. Not tested at all.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 224 |
1 file changed, 171 insertions(+), 53 deletions(-)
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index b1aaa8dbf4..5dc2095334 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -31,7 +31,14 @@
 -module(rabbit_disk_queue).
 
--compile(export_all). %% CHANGE ME
+-behaviour(gen_server).
+
+-export([start_link/2]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-export([publish/3, deliver/2, ack/2, tx_publish/2, tx_commit/2, tx_cancel/1]).
 
 -include_lib("stdlib/include/qlc.hrl").
 -include("rabbit.hrl").
@@ -45,6 +52,8 @@
 -define(FILE_EXTENSION, ".rdq").
 -define(FILE_EXTENSION_TMP, ".rdt").
 
+-define(SERVER, ?MODULE).
+
 -record(dqstate, {msg_location,
                   file_summary,
                   file_detail,
@@ -56,7 +65,32 @@
                   read_file_handles_limit
                  }).
 
-init(FileSizeLimit, ReadFileHandlesLimit) ->
+%% ---- PUBLIC API ----
+
+start_link(FileSizeLimit, ReadFileHandlesLimit) ->
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [FileSizeLimit, ReadFileHandlesLimit], []).
+
+publish(Q, MsgId, Msg) when is_binary(Msg) ->
+    gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}).
+
+deliver(Q, MsgId) ->
+    gen_server:call(?SERVER, {deliver, Q, MsgId}).
+
+ack(Q, MsgIds) when is_list(MsgIds) ->
+    gen_server:cast(?SERVER, {ack, Q, MsgIds}).
+
+tx_publish(MsgId, Msg) when is_binary(Msg) ->
+    gen_server:cast(?SERVER, {tx_publish, MsgId, Msg}).
+
+tx_commit(Q, MsgIds) when is_list(MsgIds) ->
+    gen_server:call(?SERVER, {tx_commit, Q, MsgIds}).
+
+tx_cancel(MsgIds) when is_list(MsgIds) ->
+    gen_server:cast(?SERVER, {tx_cancel, MsgIds}).
+
+%% ---- GEN-SERVER INTERNAL API ----
+
+init([FileSizeLimit, ReadFileHandlesLimit]) ->
     process_flag(trap_exit, true),
     Dir = base_directory(),
     ok = filelib:ensure_dir(Dir),
@@ -74,12 +108,54 @@ init(FileSizeLimit, ReadFileHandlesLimit) ->
     {ok, FileHdl} = file:open(form_filename(CurrentName), [append, raw, binary]),
     {ok, State1 # dqstate { current_file_handle = FileHdl }}.
 
+handle_call({deliver, Q, MsgId}, _From, State) ->
+    {ok, {MsgBody, BodySize, Delivered}, State1} = internal_deliver(Q, MsgId, State),
+    {reply, {MsgBody, BodySize, Delivered}, State1};
+handle_call({tx_commit, Q, MsgIds}, _From, State) ->
+    {ok, State1} = internal_tx_commit(Q, MsgIds, State),
+    {reply, ok, State1}.
+
+handle_cast({publish, Q, MsgId, MsgBody}, State) ->
+    {ok, State1} = internal_publish(Q, MsgId, MsgBody, State),
+    {noreply, State1};
+handle_cast({ack, Q, MsgIds}, State) ->
+    {ok, State1} = lists:foldl(fun (MsgId, {ok, State2}) ->
+                                       internal_ack(Q, MsgId, State2)
+                               end, {ok, State}, MsgIds),
+    {noreply, State1};
+handle_cast({tx_publish, MsgId, MsgBody}, State) ->
+    {ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
+    {noreply, State1};
+handle_cast({tx_cancel, MsgIds}, State) ->
+    {ok, State1} = internal_tx_cancel(MsgIds, State),
+    {noreply, State1}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, #dqstate { current_file_handle = FileHdl,
+                              read_file_handles = {ReadHdls, _ReadHdlsAge}
+                            }) ->
+    ok = file:sync(FileHdl),
+    ok = file:close(FileHdl),
+    dict:map(fun (_File, {Hdl, _Then}) ->
+                     ok = file:close(Hdl)
+             end, ReadHdls).
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% ---- UTILITY FUNCTIONS ----
+
 form_filename(Name) ->
     filename:join(base_directory(), Name).
 
 base_directory() ->
     filename:join(mnesia:system_info(directory), "/rabbit_disk_queue/").
 
+file_packing_adjustment_bytes() ->
+    1 + (2* (?INTEGER_SIZE_BYTES)).
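The new exports turn the module into a conventional gen_server: casts for the fire-and-forget operations (publish, ack, tx_publish, tx_cancel) and calls for the ones that must return data or confirm durability (deliver, tx_commit). A minimal usage sketch, assuming mnesia is running and the rabbit_disk_queue table exists; the queue name, message ids, and limits below are invented for illustration:

```erlang
-module(dq_demo).
-export([run/0]).

run() ->
    %% 1MB file segments, at most 10 cached read handles (both arbitrary)
    {ok, _Pid} = rabbit_disk_queue:start_link(1024 * 1024, 10),
    %% untransacted path: publish is a cast, deliver a call
    ok = rabbit_disk_queue:publish(my_queue, msg_1, <<"payload one">>),
    {Body, BodySize, Delivered} = rabbit_disk_queue:deliver(my_queue, msg_1),
    ok = rabbit_disk_queue:ack(my_queue, [msg_1]),
    %% transacted path: stage the bytes first, then bind them to a queue
    ok = rabbit_disk_queue:tx_publish(msg_2, <<"payload two">>),
    ok = rabbit_disk_queue:tx_commit(my_queue, [msg_2]),
    {Body, BodySize, Delivered}.
```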
+
 %% ---- INTERNAL RAW FUNCTIONS ----
 
 internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
@@ -88,9 +164,9 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
                                               read_file_handles_limit = ReadFileHandlesLimit,
                                               read_file_handles = {ReadHdls, ReadHdlsAge}
                                             }) ->
-    [{MsgId, _RefCount, File, Offset, TotalSize}] = ets:lookup(MsgLocation, MsgId),
-    if CurName =:= File ->
-            ok = file:sync(CurHdl)
+    [{MsgId, _RefCount, File, Offset, _TotalSize}] = ets:lookup(MsgLocation, MsgId),
+    if CurName =:= File -> ok = file:sync(CurHdl);
+       true -> ok
     end,
     % so this next bit implements an LRU for file handles. But it's a bit insane, and smells
     % of premature optimisation. So I might remove it and dump it overboard
@@ -104,7 +180,7 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
                     {Hdl, dict:store(File, {Hdl, Now}, ReadHdls),
                      gb_trees:enter(Now, File, ReadHdlsAge)};
                 _False ->
                     {_Then, OldFile, ReadHdlsAge2} = gb_trees:take_smallest(ReadHdlsAge),
-                    OldHdl = dict:find(OldFile, ReadHdls),
+                    {ok, {OldHdl, _}} = dict:find(OldFile, ReadHdls),
                     ok = file:close(OldHdl),
                     ReadHdls2 = dict:erase(OldFile, ReadHdls),
                     {Hdl, dict:store(File, {Hdl, Now}, ReadHdls2),
                      gb_trees:enter(Now, File, ReadHdlsAge2)}
@@ -114,9 +190,11 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
                     {Hdl, dict:store(File, {Hdl, Now}, ReadHdls),
                      gb_trees:enter(Now, File, gb_trees:delete(Then, ReadHdlsAge))}
             end,
     % read the message
-    {ok, {MsgBody, BodySize, TotalSize}} = read_message_at_offset(FileHdl, Offset),
-    ok = mnesia:write(rabbit_disk_queue, {Q, MsgId}, write),
-    {ok, {MsgBody, BodySize, TotalSize}, State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}.
+    {ok, {MsgBody, BodySize, _TotalSize}} = read_message_at_offset(FileHdl, Offset),
+    [{dq_msg_loc, {Q, MsgId}, Delivered}] = mnesia:read(rabbit_disk_queue, {Q, MsgId}, read),
+    ok = mnesia:write(rabbit_disk_queue, {dq_msg_loc, {Q, MsgId}, true}, write),
+    {ok, {MsgBody, BodySize, Delivered},
+     State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}.
 
 internal_ack(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
                                           file_summary = FileSummary,
@@ -130,48 +208,87 @@ internal_ack(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
                     true = ets:delete_object(FileDetail, {File, Offset, TotalSize}),
                     {ok, {ValidTotalSize, ContiguousTop, Left, Right}} = dict:find(File, FileSummary),
                     ContiguousTop1 = lists:min([ContiguousTop, Offset]),
-                    FileSummary2 = dict:store(File, {ValidTotalSize - TotalSize - 1 - (2* (?INTEGER_SIZE_BYTES)),
+                    FileSummary2 = dict:store(File, {ValidTotalSize - TotalSize - file_packing_adjustment_bytes(),
                                                      ContiguousTop1, Left, Right}, FileSummary),
                     ok = mnesia:delete({rabbit_disk_queue, {Q, MsgId}}),
                     FileSummary2;
-                true ->
+                1 < RefCount ->
                     ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
                     FileSummary
             end,
     State1 = compact(File, State # dqstate { file_summary = FileSummary1 } ),
     {ok, State1}.
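The handle cache the comment above calls "a bit insane" is a standard LRU built from two functional structures: a dict mapping file name to {Handle, LastUsed} and a gb_trees index mapping LastUsed back to the file name, so the oldest handle is reachable with gb_trees:take_smallest/1. A self-contained sketch of the same pattern (module and function names invented; the patch keeps this state inside #dqstate rather than threading it explicitly, and uses erlang:now/0 where this sketch uses a unique monotonic integer):

```erlang
-module(hdl_lru).
-export([new/1, get/3]).

%% Cache state: {FileName -> {Hdl, LastUsed}, LastUsed -> FileName, Limit}.
new(Limit) -> {dict:new(), gb_trees:empty(), Limit}.

%% Open is a fun(File) -> {ok, Hdl}, e.g. fun(F) -> file:open(F, [read, raw, binary]) end.
get(File, Open, {Hdls, Age, Limit}) ->
    Now = erlang:unique_integer([monotonic]),   %% unique, increasing age key
    case dict:find(File, Hdls) of
        {ok, {Hdl, Then}} ->
            %% hit: re-key this file under a fresh timestamp
            {Hdl, {dict:store(File, {Hdl, Now}, Hdls),
                   gb_trees:enter(Now, File, gb_trees:delete(Then, Age)), Limit}};
        error ->
            {ok, Hdl} = Open(File),
            case dict:size(Hdls) < Limit of
                true ->
                    {Hdl, {dict:store(File, {Hdl, Now}, Hdls),
                           gb_trees:enter(Now, File, Age), Limit}};
                false ->
                    %% full: evict whichever handle has the smallest timestamp
                    {_Oldest, OldFile, Age1} = gb_trees:take_smallest(Age),
                    {ok, {OldHdl, _}} = dict:find(OldFile, Hdls),
                    ok = file:close(OldHdl),
                    {Hdl, {dict:store(File, {Hdl, Now}, dict:erase(OldFile, Hdls)),
                           gb_trees:enter(Now, File, Age1), Limit}}
            end
    end.
```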
-internal_publish(Q, MsgId, MsgBody, State = #dqstate { msg_location = MsgLocation,
+internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocation,
                                                        current_file_handle = CurHdl,
                                                        current_file_name = CurName,
                                                        file_summary = FileSummary,
                                                        file_detail = FileDetail
-                                                      }
-                   ) when is_binary(MsgBody) ->
-    {ok, State1} =
-        case ets:lookup(MsgLocation, MsgId) of
-            [] ->
-                % New message, lots to do
-                {ok, Offset} = file:position(CurHdl, cur),
-                {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),
-                true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, Offset, TotalSize}),
-                true = ets:insert_new(FileDetail, {CurName, Offset, TotalSize}),
-                {ok, {ValidTotalSize, ContiguousTop, Left, undefined}} = dict:find(CurName, FileSummary),
-                ValidTotalSize1 = ValidTotalSize + TotalSize + 1 + (2* (?INTEGER_SIZE_BYTES)),
-                ContiguousTop1 = if Offset =:= ContiguousTop ->
-                                        ValidTotalSize; % can't be any holes in this file
-                                    true -> ContiguousTop
-                                 end,
-                FileSummary2 = dict:store(CurName, {ValidTotalSize1, ContiguousTop1, Left, undefined}, FileSummary),
-                maybe_roll_to_new_file(Offset + TotalSize + 1 + (2* (?INTEGER_SIZE_BYTES)),
-                                       State # dqstate { file_summary = FileSummary2 });
-            [{MsgId, RefCount, File, Offset, TotalSize}] ->
-                % We already know about it, just update counter
-                ets:insert(MsgLocation, {MsgId, RefCount + 1, File, Offset, TotalSize}),
-                {ok, State}
-        end,
-    ok = mnesia:write(rabbit_disk_queue, {Q, MsgId}, write),
-    {ok, State1}.
+                                                      }) ->
+    case ets:lookup(MsgLocation, MsgId) of
+        [] ->
+            % New message, lots to do
+            {ok, Offset} = file:position(CurHdl, cur),
+            {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),
+            true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, Offset, TotalSize}),
+            true = ets:insert_new(FileDetail, {CurName, Offset, TotalSize}),
+            {ok, {ValidTotalSize, ContiguousTop, Left, undefined}} = dict:find(CurName, FileSummary),
+            ValidTotalSize1 = ValidTotalSize + TotalSize + file_packing_adjustment_bytes(),
+            ContiguousTop1 = if Offset =:= ContiguousTop ->
+                                    ValidTotalSize; % can't be any holes in this file
+                                true -> ContiguousTop
+                             end,
+            FileSummary2 = dict:store(CurName, {ValidTotalSize1, ContiguousTop1, Left, undefined}, FileSummary),
+            maybe_roll_to_new_file(Offset + TotalSize + file_packing_adjustment_bytes(),
+                                   State # dqstate { file_summary = FileSummary2 });
+        [{MsgId, RefCount, File, Offset, TotalSize}] ->
+            % We already know about it, just update counter
+            ets:insert(MsgLocation, {MsgId, RefCount + 1, File, Offset, TotalSize}),
+            {ok, State}
+    end.
+
+internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation,
+                                                 current_file_handle = CurHdl,
+                                                 current_file_name = CurName
+                                               }) ->
+    {atomic, Sync}
+        = mnesia:transaction(
+            fun() -> lists:foldl(fun (MsgId, Acc) ->
+                                         [{MsgId, _RefCount, File, _Offset, _TotalSize}] =
+                                             ets:lookup(MsgLocation, MsgId),
+                                         ok = mnesia:write(rabbit_disk_queue, {dq_msg_loc, {Q, MsgId}, false}, write),
+                                         Acc or (CurName =:= File)
+                                 end, false, MsgIds)
+            end),
+    if Sync -> ok = file:sync(CurHdl);
+       true -> ok
+    end,
+    {ok, State}.
+
+internal_publish(Q, MsgId, MsgBody, State) ->
+    {ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
+    internal_tx_commit(Q, [MsgId], State1).
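Every size calculation in these functions adds file_packing_adjustment_bytes() to a message's TotalSize. That constant follows from the on-disk entry layout which read_next_file_entry (further down) parses: two length integers, then the msg-id binary and the body (together TotalSize bytes), then one status byte written last. append_message itself is not part of this diff, so the following packing sketch is an inference from the reader side; the 8-byte integer width and the ?WRITE_OK marker value are assumptions, as both macros are defined outside this diff:

```erlang
-module(dq_pack).
-export([pack_entry/2]).

-define(INTEGER_SIZE_BYTES, 8).                         %% assumed width
-define(INTEGER_SIZE_BITS,  (8 * ?INTEGER_SIZE_BYTES)).
-define(WRITE_OK,           255).                       %% assumed marker value
-define(WRITE_OK_SIZE_BITS, 8).

%% Inferred entry layout; not code from the patch.
pack_entry(MsgId, MsgBody) when is_binary(MsgBody) ->
    MsgIdBin = term_to_binary(MsgId),
    MsgIdBinSize = size(MsgIdBin),
    TotalSize = MsgIdBinSize + size(MsgBody),
    %% footprint on disk = TotalSize + 2 * ?INTEGER_SIZE_BYTES + 1
    %%                   = TotalSize + file_packing_adjustment_bytes()
    <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS,
      MsgIdBin/binary, MsgBody/binary, ?WRITE_OK:?WRITE_OK_SIZE_BITS>>.
```

Writing the status byte last is what lets the scanner further down distinguish a completed append from a torn one.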
+
+internal_tx_cancel(MsgIds, State = #dqstate { msg_location = MsgLocation,
+                                              file_summary = FileSummary,
+                                              file_detail = FileDetail }) ->
+    FileSummary1 =
+        lists:foldl(fun (MsgId, FileSummary2) ->
+                            [{MsgId, RefCount, File, Offset, TotalSize}]
+                                = ets:lookup(MsgLocation, MsgId),
+                            if 1 =:= RefCount ->
+                                    true = ets:delete(MsgLocation, MsgId),
+                                    true = ets:delete_object(FileDetail, {File, Offset, TotalSize}),
+                                    {ok, {ValidTotalSize, ContiguousTop, Left, Right}} = dict:find(File, FileSummary2),
+                                    ContiguousTop1 = lists:min([ContiguousTop, Offset]),
+                                    dict:store(File, {ValidTotalSize - TotalSize - file_packing_adjustment_bytes(),
+                                                      ContiguousTop1, Left, Right}, FileSummary2);
+                               1 < RefCount ->
+                                    ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
+                                    FileSummary2
+                            end
+                    end, FileSummary, MsgIds),
+    {ok, State #dqstate { file_summary = FileSummary1 }}.
 
 %% ---- ROLLING OVER THE APPEND FILE ----
 
@@ -188,7 +305,7 @@ maybe_roll_to_new_file(Offset, State = #dqstate { file_size_limit = FileSizeLimi
         NextNum = CurNum + 1,
         NextName = integer_to_list(NextNum) ++ (?FILE_EXTENSION),
         [] = ets:lookup(FileDetail, NextName),
-        {ok, NextHdl} = file:open(form_filename(NextNum), [write, raw, binary]),
+        {ok, NextHdl} = file:open(form_filename(NextName), [write, raw, binary]),
         {ok, {ValidTotalSize, ContiguousTop, Left, undefined}} = dict:find(CurName, FileSummary),
         FileSummary1 = dict:store(CurName, {ValidTotalSize, ContiguousTop, Left, NextName}, FileSummary),
         {ok, State # dqstate { current_file_name = NextName,
@@ -212,7 +329,7 @@ load_from_disk(State) ->
     {Files, TmpFiles} = get_disk_queue_files(),
     ok = recover_crashed_compactions(Files, TmpFiles),
     % There should be no more tmp files now, so go ahead and load the whole lot
-    {ok, State1 = #dqstate{ msg_location = MsgLocation }} = load_messages(undefined, Files, State),
+    (State1 = #dqstate{ msg_location = MsgLocation }) = load_messages(undefined, Files, State),
     % Finally, check there is nothing in mnesia which we haven't loaded
     true = lists:all(fun ({_Q, MsgId}) -> 1 =:= length(ets:lookup(MsgLocation, MsgId)) end,
                      mnesia:all_keys(rabbit_disk_queue)),
@@ -232,12 +349,12 @@ load_messages(Left, [File|Files],
     {ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
     {ValidMessagesRev, ValidTotalSize} = lists:foldl(
         fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
-                case length(mnesia:match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'})) of
+                case length(mnesia:match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}, read)) of
                     0 -> {VMAcc, VTSAcc};
                     RefCount ->
                         true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}),
                         true = ets:insert_new(FileDetail, {File, Offset, TotalSize}),
-                        {[{MsgId, TotalSize, Offset}|VMAcc], VTSAcc + TotalSize + 1 + (2* (?INTEGER_SIZE_BYTES))}
+                        {[{MsgId, TotalSize, Offset}|VMAcc], VTSAcc + TotalSize + file_packing_adjustment_bytes()}
                 end
         end, {[], 0}, Messages),
     % foldl reverses lists and find_contiguous_block_prefix needs elems in the same order
@@ -262,7 +379,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
     {ok, UncorruptedMessagesTmp} = scan_file_for_valid_messages(form_filename(TmpFile)),
     % all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out
     lists:foreach(fun ({MsgId, _TotalSize, _FileOffset}) ->
-                          0 < length(mnesia:match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}))
+                          true = 0 < length(mnesia:match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}, read))
                  end,
                  UncorruptedMessagesTmp),
     {ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
     %% 1) It's possible that everything in the tmp file is also in the main file
@@ -295,7 +412,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
             % we're in case 4 above.
             % check that everything in the main file is a valid message in mnesia
             lists:foreach(fun (MsgId) ->
-                                  0 < length(mnesia:match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}))
+                                  true = 0 < length(mnesia:match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}, read))
                           end, MsgIds),
             % The main file should be contiguous
             {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages),
@@ -309,7 +426,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
             % extending truncate.
             % Remember the head of the list will be the highest entry in the file
             [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
-            TmpSize = TmpTopOffset + TmpTopTotalSize + 1 + (2* (?INTEGER_SIZE_BYTES)),
+            TmpSize = TmpTopOffset + TmpTopTotalSize + file_packing_adjustment_bytes(),
             ExpectedAbsPos = Top + TmpSize,
             {ok, ExpectedAbsPos} = file:position(MainHdl, {cur, TmpSize}),
             ok = file:truncate(MainHdl),
             % and now extend the main file as big as necessary in a single move
@@ -336,7 +453,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
 find_contiguous_block_prefix([]) -> {0, []};
 find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail]) ->
     case find_contiguous_block_prefix(Tail, Offset, [MsgId]) of
-        {ok, Acc} -> {Offset + TotalSize + 1 + (2* (?INTEGER_SIZE_BYTES)), lists:reverse(Acc)};
+        {ok, Acc} -> {Offset + TotalSize + file_packing_adjustment_bytes(), lists:reverse(Acc)};
         Res -> Res
     end.
 find_contiguous_block_prefix([], 0, Acc) ->
@@ -344,7 +461,7 @@ find_contiguous_block_prefix([], 0, Acc) ->
 find_contiguous_block_prefix([], _N, _Acc) ->
     {0, []};
 find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail], ExpectedOffset, Acc)
-    when ExpectedOffset =:= Offset + TotalSize + 1 + (2* (?INTEGER_SIZE_BYTES)) ->
+    when ExpectedOffset =:= Offset + TotalSize + 1 + (2* (?INTEGER_SIZE_BYTES)) -> %% Can't use file_packing_adjustment_bytes() in a guard
     find_contiguous_block_prefix(Tail, Offset, [MsgId|Acc]);
 find_contiguous_block_prefix(List, _ExpectedOffset, _Acc) ->
     find_contiguous_block_prefix(List).
@@ -400,7 +517,7 @@ read_message_at_offset(FileHdl, Offset) ->
 scan_file_for_valid_messages(File) ->
     {ok, Hdl} = file:open(File, [raw, binary, read]),
     Valid = scan_file_for_valid_messages(Hdl, 0, []),
-    file:close(Hdl), % if something really bad's happened, the close could fail, but ignore
+    _ = file:close(Hdl), % if something really bad's happened, the close could fail, but ignore
     Valid.
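find_contiguous_block_prefix walks the newest-first entry list and returns {Top, MsgIds}, where Top is the first byte after the gap-free run starting at offset 0, i.e. the truncation point recovery can trust. A worked example, again assuming 8-byte integers so each entry occupies TotalSize + 17 bytes; the function is module-internal, so this sketch would live inside rabbit_disk_queue itself (e.g. behind a test export):

```erlang
%% Two back-to-back 20-byte messages: msg_a at offset 0 ends at byte 37,
%% msg_b starts exactly there and ends at byte 74.
contiguous_example() ->
    Adjustment = 1 + 2 * 8,   %% file_packing_adjustment_bytes() with 8-byte integers
    A = {msg_a, 20, 0},
    B = {msg_b, 20, 20 + Adjustment},
    %% the scan accumulator is newest-first, hence [B, A]
    {74, [msg_b, msg_a]} = find_contiguous_block_prefix([B, A]),
    ok.
```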
 scan_file_for_valid_messages(FileHdl, Offset, Acc) ->
@@ -415,12 +532,13 @@ scan_file_for_valid_messages(FileHdl, Offset, Acc) ->
 
 read_next_file_entry(FileHdl, Offset) ->
-    case file:read(FileHdl, 2 * (?INTEGER_SIZE_BYTES)) of
+    TwoIntegers = 2 * (?INTEGER_SIZE_BYTES),
+    case file:read(FileHdl, TwoIntegers) of
         {ok, <<TotalSize:(?INTEGER_SIZE_BITS), MsgIdBinSize:(?INTEGER_SIZE_BITS)>>} ->
             case {TotalSize =:= 0, MsgIdBinSize =:= 0} of
                 {true, _} -> {ok, eof}; %% Nothing we can do other than stop
                 {false, true} -> %% current message corrupted, try skipping past it
-                    ExpectedAbsPos = Offset + (2* (?INTEGER_SIZE_BYTES)) + TotalSize + 1,
+                    ExpectedAbsPos = Offset + file_packing_adjustment_bytes() + TotalSize,
                     case file:position(FileHdl, {cur, TotalSize + 1}) of
                         {ok, ExpectedAbsPos} -> {ok, {corrupted, ExpectedAbsPos}};
                         {ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up
@@ -429,15 +547,15 @@ read_next_file_entry(FileHdl, Offset) ->
                 {false, false} -> %% all good, let's continue
                     case file:read(FileHdl, MsgIdBinSize) of
                         {ok, <<MsgId:MsgIdBinSize/binary>>} ->
-                            ExpectedAbsPos = Offset + (2 * (?INTEGER_SIZE_BYTES)) + TotalSize,
+                            ExpectedAbsPos = Offset + TwoIntegers + TotalSize,
                             case file:position(FileHdl, {cur, TotalSize - MsgIdBinSize}) of
                                 {ok, ExpectedAbsPos} ->
                                     case file:read(FileHdl, 1) of
                                         {ok, <<(?WRITE_OK):(?WRITE_OK_SIZE_BITS)>>} ->
                                             {ok, {ok, binary_to_term(MsgId), TotalSize,
-                                                  Offset + (2* (?INTEGER_SIZE_BYTES)) + TotalSize + 1}};
+                                                  Offset + file_packing_adjustment_bytes() + TotalSize}};
                                         {ok, _SomeOtherData} ->
-                                            {ok, {corrupted, Offset + (2* (?INTEGER_SIZE_BYTES)) + TotalSize + 1}};
+                                            {ok, {corrupted, Offset + file_packing_adjustment_bytes() + TotalSize}};
                                         KO -> KO
                                     end;
                                 {ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up
