author     Matthew Sackman <matthew@lshift.net>    2009-04-10 15:42:40 +0100
committer  Matthew Sackman <matthew@lshift.net>    2009-04-10 15:42:40 +0100
commit     41376d3da57588fc5c3536e80edc048bbfaf6434 (patch)
tree       8d4d16a68268c8fddfef7c0f60291e7a52f33af7 /src
parent     056b4b013476b0a6d447c81ec5fb598048241ac9 (diff)
download   rabbitmq-server-git-41376d3da57588fc5c3536e80edc048bbfaf6434.tar.gz
All but the compaction/GC done now. Not tested at all.
Diffstat (limited to 'src')
-rw-r--r--  src/rabbit_disk_queue.erl | 224
1 file changed, 171 insertions(+), 53 deletions(-)
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index b1aaa8dbf4..5dc2095334 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -31,7 +31,14 @@
-module(rabbit_disk_queue).
--compile(export_all). %% CHANGE ME
+-behaviour(gen_server).
+
+-export([start_link/2]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([publish/3, deliver/2, ack/2, tx_publish/2, tx_commit/2, tx_cancel/1]).
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
@@ -45,6 +52,8 @@
-define(FILE_EXTENSION, ".rdq").
-define(FILE_EXTENSION_TMP, ".rdt").
+-define(SERVER, ?MODULE).
+
-record(dqstate, {msg_location,
file_summary,
file_detail,
@@ -56,7 +65,32 @@
read_file_handles_limit
}).
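The state fields are used throughout this patch with the following shapes (a sketch inferred from usage below, not from a spec):

    msg_location      : ets table of {MsgId, RefCount, File, Offset, TotalSize}
    file_summary      : dict of File => {ValidTotalSize, ContiguousTop, Left, Right}
    file_detail       : ets table of {File, Offset, TotalSize}
    read_file_handles : {dict of File => {Hdl, LastUsed}, gb_tree of LastUsed => File}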
-init(FileSizeLimit, ReadFileHandlesLimit) ->
+%% ---- PUBLIC API ----
+
+start_link(FileSizeLimit, ReadFileHandlesLimit) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [FileSizeLimit, ReadFileHandlesLimit], []).
+
+publish(Q, MsgId, Msg) when is_binary(Msg) ->
+ gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}).
+
+deliver(Q, MsgId) ->
+ gen_server:call(?SERVER, {deliver, Q, MsgId}).
+
+ack(Q, MsgIds) when is_list(MsgIds) ->
+ gen_server:cast(?SERVER, {ack, Q, MsgIds}).
+
+tx_publish(MsgId, Msg) when is_binary(Msg) ->
+ gen_server:cast(?SERVER, {tx_publish, MsgId, Msg}).
+
+tx_commit(Q, MsgIds) when is_list(MsgIds) ->
+ gen_server:call(?SERVER, {tx_commit, Q, MsgIds}).
+
+tx_cancel(MsgIds) when is_list(MsgIds) ->
+ gen_server:cast(?SERVER, {tx_cancel, MsgIds}).
+
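A minimal sketch of how a client might drive this API (the queue name, message ids and payloads are hypothetical; deliver/2 and tx_commit/2 are the only synchronous calls, the other operations are casts):

    {ok, _Pid} = rabbit_disk_queue:start_link(1024 * 1024, 100),
    ok = rabbit_disk_queue:publish(my_queue, msg_a, <<"payload a">>),
    {Body, BodySize, Delivered} = rabbit_disk_queue:deliver(my_queue, msg_a),
    ok = rabbit_disk_queue:ack(my_queue, [msg_a]),
    %% transactional flow: stage the message, then commit (or cancel) it
    ok = rabbit_disk_queue:tx_publish(msg_b, <<"payload b">>),
    ok = rabbit_disk_queue:tx_commit(my_queue, [msg_b]).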
+%% ---- GEN-SERVER INTERNAL API ----
+
+init([FileSizeLimit, ReadFileHandlesLimit]) ->
process_flag(trap_exit, true),
Dir = base_directory(),
ok = filelib:ensure_dir(Dir),
@@ -74,12 +108,54 @@ init(FileSizeLimit, ReadFileHandlesLimit) ->
{ok, FileHdl} = file:open(form_filename(CurrentName), [append, raw, binary]),
{ok, State1 # dqstate { current_file_handle = FileHdl }}.
+handle_call({deliver, Q, MsgId}, _From, State) ->
+ {ok, {MsgBody, BodySize, Delivered}, State1} = internal_deliver(Q, MsgId, State),
+ {reply, {MsgBody, BodySize, Delivered}, State1};
+handle_call({tx_commit, Q, MsgIds}, _From, State) ->
+ {ok, State1} = internal_tx_commit(Q, MsgIds, State),
+ {reply, ok, State1}.
+
+handle_cast({publish, Q, MsgId, MsgBody}, State) ->
+ {ok, State1} = internal_publish(Q, MsgId, MsgBody, State),
+ {noreply, State1};
+handle_cast({ack, Q, MsgIds}, State) ->
+ {ok, State1} = lists:foldl(fun (MsgId, {ok, State2}) ->
+ internal_ack(Q, MsgId, State2)
+                                end, {ok, State}, MsgIds),
+ {noreply, State1};
+handle_cast({tx_publish, MsgId, MsgBody}, State) ->
+ {ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
+ {noreply, State1};
+handle_cast({tx_cancel, MsgIds}, State) ->
+ {ok, State1} = internal_tx_cancel(MsgIds, State),
+ {noreply, State1}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, #dqstate { current_file_handle = FileHdl,
+ read_file_handles = {ReadHdls, _ReadHdlsAge}
+ }) ->
+ ok = file:sync(FileHdl),
+ ok = file:close(FileHdl),
+    dict:map(fun (_File, {Hdl, _LastUsed}) ->
+                      ok = file:close(Hdl)
+              end, ReadHdls).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% ---- UTILITY FUNCTIONS ----
+
form_filename(Name) ->
filename:join(base_directory(), Name).
base_directory() ->
filename:join(mnesia:system_info(directory), "/rabbit_disk_queue/").
+file_packing_adjustment_bytes() ->
+ 1 + (2* (?INTEGER_SIZE_BYTES)).
+
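This helper names the per-message framing overhead that was previously spelt out longhand at each call site: two length integers before the payload plus one status byte after it. Assuming, for illustration, that ?INTEGER_SIZE_BYTES is 8, each message costs 1 + 2 * 8 = 17 bytes on disk beyond its body.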
%% ---- INTERNAL RAW FUNCTIONS ----
internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
@@ -88,9 +164,9 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
read_file_handles_limit = ReadFileHandlesLimit,
read_file_handles = {ReadHdls, ReadHdlsAge}
}) ->
- [{MsgId, _RefCount, File, Offset, TotalSize}] = ets:lookup(MsgLocation, MsgId),
- if CurName =:= File ->
- ok = file:sync(CurHdl)
+ [{MsgId, _RefCount, File, Offset, _TotalSize}] = ets:lookup(MsgLocation, MsgId),
+ if CurName =:= File -> ok = file:sync(CurHdl);
+ true -> ok
end,
% so this next bit implements an LRU for file handles. But it's a bit insane, and smells
% of premature optimisation. So I might remove it and dump it overboard
@@ -104,7 +180,7 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
{Hdl, dict:store(File, {Hdl, Now}, ReadHdls), gb_trees:enter(Now, File, ReadHdlsAge)};
_False ->
{_Then, OldFile, ReadHdlsAge2} = gb_trees:take_smallest(ReadHdlsAge),
- OldHdl = dict:find(OldFile, ReadHdls),
+                          {ok, {OldHdl, _LastUsed}} = dict:find(OldFile, ReadHdls),
ok = file:close(OldHdl),
ReadHdls2 = dict:erase(OldFile, ReadHdls),
{Hdl, dict:store(File, {Hdl, Now}, ReadHdls2), gb_trees:enter(Now, File, ReadHdlsAge2)}
@@ -114,9 +190,11 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
{Hdl, dict:store(File, {Hdl, Now}, ReadHdls), gb_trees:enter(Now, File, gb_trees:delete(Then, ReadHdlsAge))}
end,
% read the message
- {ok, {MsgBody, BodySize, TotalSize}} = read_message_at_offset(FileHdl, Offset),
- ok = mnesia:write(rabbit_disk_queue, {Q, MsgId}, write),
- {ok, {MsgBody, BodySize, TotalSize}, State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}.
+ {ok, {MsgBody, BodySize, _TotalSize}} = read_message_at_offset(FileHdl, Offset),
+    [{dq_msg_loc, {Q, MsgId}, Delivered}] = mnesia:read(rabbit_disk_queue, {Q, MsgId}, read),
+    ok = mnesia:write(rabbit_disk_queue, {dq_msg_loc, {Q, MsgId}, true}, write),
+ {ok, {MsgBody, BodySize, Delivered},
+ State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}.
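The handle cache above keeps two views of the same set of open read handles: a dict from file name to {Hdl, LastUsed} for lookup, and a gb_trees keyed by LastUsed so the least-recently-used handle can be found cheaply. The eviction step, pulled out on its own (an illustrative restatement of the code above, with a hypothetical helper name):

    evict_oldest(ReadHdls, ReadHdlsAge) ->
        {_Then, OldFile, ReadHdlsAge1} = gb_trees:take_smallest(ReadHdlsAge),
        {ok, {OldHdl, _LastUsed}} = dict:find(OldFile, ReadHdls),
        ok = file:close(OldHdl),
        {dict:erase(OldFile, ReadHdls), ReadHdlsAge1}.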
internal_ack(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
file_summary = FileSummary,
@@ -130,48 +208,87 @@ internal_ack(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
true = ets:delete_object(FileDetail, {File, Offset, TotalSize}),
{ok, {ValidTotalSize, ContiguousTop, Left, Right}} = dict:find(File, FileSummary),
ContiguousTop1 = lists:min([ContiguousTop, Offset]),
- FileSummary2 = dict:store(File, {ValidTotalSize - TotalSize - 1 - (2* (?INTEGER_SIZE_BYTES)),
+ FileSummary2 = dict:store(File, {ValidTotalSize - TotalSize - file_packing_adjustment_bytes(),
ContiguousTop1, Left, Right}, FileSummary),
ok = mnesia:delete({rabbit_disk_queue, {Q, MsgId}}),
FileSummary2;
- true ->
+ 1 < RefCount ->
ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
FileSummary
end,
State1 = compact(File, State # dqstate { file_summary = FileSummary1 } ),
{ok, State1}.
-internal_publish(Q, MsgId, MsgBody, State = #dqstate { msg_location = MsgLocation,
+internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocation,
current_file_handle = CurHdl,
current_file_name = CurName,
file_summary = FileSummary,
file_detail = FileDetail
- }
- ) when is_binary(MsgBody) ->
- {ok, State1} =
- case ets:lookup(MsgLocation, MsgId) of
- [] ->
- % New message, lots to do
- {ok, Offset} = file:position(CurHdl, cur),
- {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),
- true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, Offset, TotalSize}),
- true = ets:insert_new(FileDetail, {CurName, Offset, TotalSize}),
- {ok, {ValidTotalSize, ContiguousTop, Left, undefined}} = dict:find(CurName, FileSummary),
- ValidTotalSize1 = ValidTotalSize + TotalSize + 1 + (2* (?INTEGER_SIZE_BYTES)),
- ContiguousTop1 = if Offset =:= ContiguousTop ->
- ValidTotalSize; % can't be any holes in this file
- true -> ContiguousTop
- end,
- FileSummary2 = dict:store(CurName, {ValidTotalSize1, ContiguousTop1, Left, undefined}, FileSummary),
- maybe_roll_to_new_file(Offset + TotalSize + 1 + (2* (?INTEGER_SIZE_BYTES)),
- State # dqstate { file_summary = FileSummary2 });
- [{MsgId, RefCount, File, Offset, TotalSize}] ->
- % We already know about it, just update counter
- ets:insert(MsgLocation, {MsgId, RefCount + 1, File, Offset, TotalSize}),
- {ok, State}
- end,
- ok = mnesia:write(rabbit_disk_queue, {Q, MsgId}, write),
- {ok, State1}.
+ }) ->
+ case ets:lookup(MsgLocation, MsgId) of
+ [] ->
+ % New message, lots to do
+ {ok, Offset} = file:position(CurHdl, cur),
+ {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),
+ true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, Offset, TotalSize}),
+ true = ets:insert_new(FileDetail, {CurName, Offset, TotalSize}),
+ {ok, {ValidTotalSize, ContiguousTop, Left, undefined}} = dict:find(CurName, FileSummary),
+ ValidTotalSize1 = ValidTotalSize + TotalSize + file_packing_adjustment_bytes(),
+ ContiguousTop1 = if Offset =:= ContiguousTop ->
+ ValidTotalSize; % can't be any holes in this file
+ true -> ContiguousTop
+ end,
+ FileSummary2 = dict:store(CurName, {ValidTotalSize1, ContiguousTop1, Left, undefined}, FileSummary),
+ maybe_roll_to_new_file(Offset + TotalSize + file_packing_adjustment_bytes(),
+ State # dqstate { file_summary = FileSummary2 });
+ [{MsgId, RefCount, File, Offset, TotalSize}] ->
+ % We already know about it, just update counter
+ ets:insert(MsgLocation, {MsgId, RefCount + 1, File, Offset, TotalSize}),
+ {ok, State}
+ end.
+
+internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation,
+ current_file_handle = CurHdl,
+ current_file_name = CurName
+ }) ->
+ {atomic, Sync}
+ = mnesia:transaction(
+ fun() -> lists:foldl(fun (MsgId, Acc) ->
+ [{MsgId, _RefCount, File, _Offset, _TotalSize}] =
+ ets:lookup(MsgLocation, MsgId),
+                                ok = mnesia:write(rabbit_disk_queue, {dq_msg_loc, {Q, MsgId}, false}, write),
+ Acc or (CurName =:= File)
+ end, false, MsgIds)
+ end),
+ if Sync -> ok = file:sync(CurHdl);
+ true -> ok
+ end,
+ {ok, State}.
+
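The fold inside the transaction above does two jobs at once: it records each committed message as undelivered in mnesia, and it computes Sync, i.e. whether any committed message lives in the file currently open for appending, the only file that can still hold unflushed writes. The second job, separated out for clarity (an illustrative restatement, not the patch's code):

    Sync = lists:any(fun (MsgId) ->
                             [{MsgId, _, File, _, _}] = ets:lookup(MsgLocation, MsgId),
                             File =:= CurName
                     end, MsgIds)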
+internal_publish(Q, MsgId, MsgBody, State) ->
+ {ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
+ internal_tx_commit(Q, [MsgId], State1).
+
+internal_tx_cancel(MsgIds, State = #dqstate { msg_location = MsgLocation,
+ file_summary = FileSummary,
+ file_detail = FileDetail }) ->
+ FileSummary1 =
+ lists:foldl(fun (MsgId, FileSummary2) ->
+ [{MsgId, RefCount, File, Offset, TotalSize}]
+ = ets:lookup(MsgLocation, MsgId),
+ if 1 =:= RefCount ->
+ true = ets:delete(MsgLocation, MsgId),
+ true = ets:delete_object(FileDetail, {File, Offset, TotalSize}),
+ {ok, {ValidTotalSize, ContiguousTop, Left, Right}} = dict:find(File, FileSummary2),
+ ContiguousTop1 = lists:min([ContiguousTop, Offset]),
+ dict:store(File, {ValidTotalSize - TotalSize - file_packing_adjustment_bytes(),
+ ContiguousTop1, Left, Right}, FileSummary2);
+ 1 < RefCount ->
+ ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
+ FileSummary2
+ end
+ end, FileSummary, MsgIds),
+ {ok, State #dqstate { file_summary = FileSummary1 }}.
%% ---- ROLLING OVER THE APPEND FILE ----
@@ -188,7 +305,7 @@ maybe_roll_to_new_file(Offset, State = #dqstate { file_size_limit = FileSizeLimi
NextNum = CurNum + 1,
NextName = integer_to_list(NextNum) ++ (?FILE_EXTENSION),
[] = ets:lookup(FileDetail, NextName),
- {ok, NextHdl} = file:open(form_filename(NextNum), [write, raw, binary]),
+ {ok, NextHdl} = file:open(form_filename(NextName), [write, raw, binary]),
{ok, {ValidTotalSize, ContiguousTop, Left, undefined}} = dict:find(CurName, FileSummary),
FileSummary1 = dict:store(CurName, {ValidTotalSize, ContiguousTop, Left, NextName}, FileSummary),
{ok, State # dqstate { current_file_name = NextName,
@@ -212,7 +329,7 @@ load_from_disk(State) ->
{Files, TmpFiles} = get_disk_queue_files(),
ok = recover_crashed_compactions(Files, TmpFiles),
% There should be no more tmp files now, so go ahead and load the whole lot
- {ok, State1 = #dqstate{ msg_location = MsgLocation }} = load_messages(undefined, Files, State),
+    State1 = #dqstate{ msg_location = MsgLocation } = load_messages(undefined, Files, State),
% Finally, check there is nothing in mnesia which we haven't loaded
true = lists:all(fun ({_Q, MsgId}) -> 1 =:= length(ets:lookup(MsgLocation, MsgId)) end,
mnesia:all_keys(rabbit_disk_queue)),
@@ -232,12 +349,12 @@ load_messages(Left, [File|Files],
{ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
{ValidMessagesRev, ValidTotalSize} = lists:foldl(
fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
- case length(mnesia:match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'})) of
+ case length(mnesia:match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}, read)) of
0 -> {VMAcc, VTSAcc};
RefCount ->
true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}),
true = ets:insert_new(FileDetail, {File, Offset, TotalSize}),
- {[{MsgId, TotalSize, Offset}|VMAcc], VTSAcc + TotalSize + 1 + (2* (?INTEGER_SIZE_BYTES))}
+ {[{MsgId, TotalSize, Offset}|VMAcc], VTSAcc + TotalSize + file_packing_adjustment_bytes()}
end
end, {[], 0}, Messages),
% foldl reverses lists and find_contiguous_block_prefix needs elems in the same order
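RefCount falls out of mnesia for free here: match_object finds every {'_', MsgId} row regardless of queue, so a message still referenced by three queues yields a list of length 3, which is exactly its reference count, while length 0 means no queue wants it and the disk copy is garbage. For instance (hypothetical queue and message names):

    %% given rows {dq_msg_loc, {q1, msg_a}, _}, {dq_msg_loc, {q2, msg_a}, _},
    %% {dq_msg_loc, {q3, msg_a}, _} in the table:
    3 = length(mnesia:match_object(rabbit_disk_queue, {dq_msg_loc, {'_', msg_a}, '_'}, read))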
@@ -262,7 +379,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
{ok, UncorruptedMessagesTmp} = scan_file_for_valid_messages(form_filename(TmpFile)),
% all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out
lists:foreach(fun ({MsgId, _TotalSize, _FileOffset}) ->
- 0 < length(mnesia:match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}))
+                          true = 0 < length(mnesia:match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}, read))
end, UncorruptedMessagesTmp),
{ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
%% 1) It's possible that everything in the tmp file is also in the main file
@@ -295,7 +412,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
% we're in case 4 above.
% check that everything in the main file is a valid message in mnesia
lists:foreach(fun (MsgId) ->
- 0 < length(mnesia:match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}))
+                          true = 0 < length(mnesia:match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}, read))
end, MsgIds),
% The main file should be contiguous
{Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages),
@@ -309,7 +426,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
% extending truncate.
% Remember the head of the list will be the highest entry in the file
[{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
- TmpSize = TmpTopOffset + TmpTopTotalSize + 1 + (2* (?INTEGER_SIZE_BYTES)),
+ TmpSize = TmpTopOffset + TmpTopTotalSize + file_packing_adjustment_bytes(),
ExpectedAbsPos = Top + TmpSize,
{ok, ExpectedAbsPos} = file:position(MainHdl, {cur, TmpSize}),
ok = file:truncate(MainHdl), % and now extend the main file as big as necessary in a single move
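The "extending truncate" relies on file:truncate/1 cutting the file at the current position, which grows the file when that position lies past the old end. In isolation (a sketch with a hypothetical file name; whether the newly added bytes are zero-filled is platform-dependent):

    {ok, Hdl} = file:open("main.rdq", [read, write, raw, binary]),
    {ok, _OldEof} = file:position(Hdl, eof),
    {ok, NewSize} = file:position(Hdl, {cur, ExtraBytes}),  %% now past old EOF
    ok = file:truncate(Hdl).                                %% file grows to NewSize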
@@ -336,7 +453,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
find_contiguous_block_prefix([]) -> {0, []};
find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail]) ->
case find_contiguous_block_prefix(Tail, Offset, [MsgId]) of
- {ok, Acc} -> {Offset + TotalSize + 1 + (2* (?INTEGER_SIZE_BYTES)), lists:reverse(Acc)};
+ {ok, Acc} -> {Offset + TotalSize + file_packing_adjustment_bytes(), lists:reverse(Acc)};
Res -> Res
end.
find_contiguous_block_prefix([], 0, Acc) ->
@@ -344,7 +461,7 @@ find_contiguous_block_prefix([], 0, Acc) ->
find_contiguous_block_prefix([], _N, _Acc) ->
{0, []};
find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail], ExpectedOffset, Acc)
- when ExpectedOffset =:= Offset + TotalSize + 1 + (2* (?INTEGER_SIZE_BYTES)) ->
+      when ExpectedOffset =:= Offset + TotalSize + 1 + (2* (?INTEGER_SIZE_BYTES)) -> %% guards may only call BIFs, so file_packing_adjustment_bytes/0 cannot be used here
find_contiguous_block_prefix(Tail, Offset, [MsgId|Acc]);
find_contiguous_block_prefix(List, _ExpectedOffset, _Acc) ->
find_contiguous_block_prefix(List).
@@ -400,7 +517,7 @@ read_message_at_offset(FileHdl, Offset) ->
scan_file_for_valid_messages(File) ->
{ok, Hdl} = file:open(File, [raw, binary, read]),
Valid = scan_file_for_valid_messages(Hdl, 0, []),
- file:close(Hdl), % if something really bad's happened, the close could fail, but ignore
+ _ = file:close(Hdl), % if something really bad's happened, the close could fail, but ignore
Valid.
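Reading read_next_file_entry below back-to-front, each message appears to be framed on disk as follows (inferred from the reader; append_message/3, the writer, is outside the lines shown):

    <<TotalSize:?INTEGER_SIZE_BITS,        %% = MsgIdBinSize + payload size
      MsgIdBinSize:?INTEGER_SIZE_BITS,
      MsgIdBin:MsgIdBinSize/binary,        %% term_to_binary of the message id
      Payload/binary,
      StatusByte:?WRITE_OK_SIZE_BITS>>     %% ?WRITE_OK once fully written

which is where the 1 + 2 * ?INTEGER_SIZE_BYTES packing adjustment comes from.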
scan_file_for_valid_messages(FileHdl, Offset, Acc) ->
@@ -415,12 +532,13 @@ scan_file_for_valid_messages(FileHdl, Offset, Acc) ->
read_next_file_entry(FileHdl, Offset) ->
- case file:read(FileHdl, 2 * (?INTEGER_SIZE_BYTES)) of
+ TwoIntegers = 2 * (?INTEGER_SIZE_BYTES),
+ case file:read(FileHdl, TwoIntegers) of
{ok, <<TotalSize:(?INTEGER_SIZE_BITS), MsgIdBinSize:(?INTEGER_SIZE_BITS)>>} ->
case {TotalSize =:= 0, MsgIdBinSize =:= 0} of
{true, _} -> {ok, eof}; %% Nothing we can do other than stop
{false, true} -> %% current message corrupted, try skipping past it
- ExpectedAbsPos = Offset + (2* (?INTEGER_SIZE_BYTES)) + TotalSize + 1,
+ ExpectedAbsPos = Offset + file_packing_adjustment_bytes() + TotalSize,
case file:position(FileHdl, {cur, TotalSize + 1}) of
{ok, ExpectedAbsPos} -> {ok, {corrupted, ExpectedAbsPos}};
{ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up
@@ -429,15 +547,15 @@ read_next_file_entry(FileHdl, Offset) ->
{false, false} -> %% all good, let's continue
case file:read(FileHdl, MsgIdBinSize) of
{ok, <<MsgId:MsgIdBinSize/binary>>} ->
- ExpectedAbsPos = Offset + (2 * (?INTEGER_SIZE_BYTES)) + TotalSize,
+ ExpectedAbsPos = Offset + TwoIntegers + TotalSize,
case file:position(FileHdl, {cur, TotalSize - MsgIdBinSize}) of
{ok, ExpectedAbsPos} ->
case file:read(FileHdl, 1) of
{ok, <<(?WRITE_OK):(?WRITE_OK_SIZE_BITS)>>} ->
{ok, {ok, binary_to_term(MsgId), TotalSize,
- Offset + (2* (?INTEGER_SIZE_BYTES)) + TotalSize + 1}};
+ Offset + file_packing_adjustment_bytes() + TotalSize}};
{ok, _SomeOtherData} ->
- {ok, {corrupted, Offset + (2* (?INTEGER_SIZE_BYTES)) + TotalSize + 1}};
+ {ok, {corrupted, Offset + file_packing_adjustment_bytes() + TotalSize}};
KO -> KO
end;
{ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up