summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-17 13:03:17 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-17 13:03:17 +0100
commitb1f86bacbee9d1d3a61dd8f03d7ea3b47484ed52 (patch)
treef297825a1f66d3c6c885dfb0e846c9ae1705d5ac
parentc196178600ced8eec833902be30309d076f98e4a (diff)
downloadrabbitmq-server-git-b1f86bacbee9d1d3a61dd8f03d7ea3b47484ed52.tar.gz
The use of the in-memory run length queue in disk_only queue is considered a show stopper, and rightly so. I personally don't like the idea of adding additional tokens to the disk queue to indicate queue switch because it can substantially increase the number of OS calls and writes and reads from disk and, e.g., getting queue length right and memory size right is made a fair bit more complex. So abandon the two queues idea.
Instead, store the persistent flag in the stop byte on disk. Then on startup, the persistent flag turns up in the MsgLocations ets table. This is all done and all tests pass. The next stage is that on start up, go through each queue and just wipe out non-persistent messages. This should be pretty fast. Then call the shuffle_up function as is currently being done. This will eliminate the gaps in sequences. This really should be enough. Then the mixed_queue can go back to just talking about a single queue.
-rw-r--r--src/rabbit_disk_queue.erl152
-rw-r--r--src/rabbit_misc.erl8
-rw-r--r--src/rabbit_mixed_queue.erl5
3 files changed, 98 insertions, 67 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 868eab4a5e..4eef884f72 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -53,7 +53,8 @@
-include("rabbit.hrl").
-define(WRITE_OK_SIZE_BITS, 8).
--define(WRITE_OK, 255).
+-define(WRITE_OK_TRANSIENT, 255).
+-define(WRITE_OK_PERSISTENT, 254).
-define(INTEGER_SIZE_BYTES, 8).
-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
@@ -101,8 +102,8 @@
%% The components:
%%
-%% MsgLocation: this is a dets table which contains:
-%% {MsgId, RefCount, File, Offset, TotalSize}
+%% MsgLocation: this is a (d)ets table which contains:
+%% {MsgId, RefCount, File, Offset, TotalSize, IsPersistent}
%% FileSummary: this is an ets table which contains:
%% {File, ValidTotalSize, ContiguousTop, Left, Right}
%% Sequences: this is an ets table which contains:
@@ -393,7 +394,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
?FILE_EXTENSION_DETS)},
{min_no_slots, 1024*1024},
%% man says this should be <= 32M. But it works...
- {max_no_slots, 1024*1024*1024},
+ {max_no_slots, 30*1024*1024},
{type, set}
]),
@@ -509,8 +510,8 @@ handle_cast({ack, Q, MsgSeqIds}, State) ->
handle_cast({auto_ack_next_message, Q}, State) ->
{ok, State1} = internal_auto_ack(Q, State),
noreply(State1);
-handle_cast({tx_publish, Message = #basic_message { guid = MsgId }}, State) ->
- {ok, State1} = internal_tx_publish(MsgId, Message, State),
+handle_cast({tx_publish, Message}, State) ->
+ {ok, State1} = internal_tx_publish(Message, State),
noreply(State1);
handle_cast({tx_cancel, MsgIds}, State) ->
{ok, State1} = internal_tx_cancel(MsgIds, State),
@@ -636,8 +637,8 @@ memory_use(#dqstate { operation_mode = disk_only,
(WordSize * (ets:info(FileSummary, memory) +
ets:info(Cache, memory) +
ets:info(Sequences, memory))) +
- round(MnesiaSizeEstimate) +
- round(MsgLocationSizeEstimate).
+ rabbit_misc:ceil(MnesiaSizeEstimate) +
+ rabbit_misc:ceil(MsgLocationSizeEstimate).
to_disk_only_mode(State = #dqstate { operation_mode = disk_only }) ->
State;
@@ -872,7 +873,8 @@ insert_into_cache(Message = #basic_message { guid = MsgId },
true -> 0;
false -> 1
end,
- true = ets:insert_new(Cache, {MsgId, Message, MsgSize, Count}),
+ true =
+ ets:insert_new(Cache, {MsgId, Message, MsgSize, Count}),
ok.
%% ---- INTERNAL RAW FUNCTIONS ----
@@ -890,8 +892,9 @@ internal_deliver(Q, ReadMsg, FakeDeliver,
{Q, ReadSeqId+1, WriteSeqId}),
{ok,
case Result of
- {MsgId, Delivered, {MsgId, ReadSeqId}} ->
- {MsgId, Delivered, {MsgId, ReadSeqId}, Remaining};
+ {MsgId, IsPersistent, Delivered, {MsgId, ReadSeqId}} ->
+ {MsgId, IsPersistent, Delivered, {MsgId, ReadSeqId},
+ Remaining};
{Message, BodySize, Delivered, {MsgId, ReadSeqId}} ->
{Message, BodySize, Delivered, {MsgId, ReadSeqId},
Remaining}
@@ -927,7 +930,7 @@ internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) -
[Obj =
#dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] =
mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}),
- [{MsgId, RefCount, File, Offset, TotalSize}] =
+ [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] =
dets_ets_lookup(State, MsgId),
ok =
if FakeDeliver orelse Delivered -> ok;
@@ -940,12 +943,13 @@ internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) -
case fetch_and_increment_cache(MsgId, State) of
not_found ->
{FileHdl, State1} = get_read_handle(File, Offset, State),
- {ok, {MsgBody, BodySize}} =
+ {ok, {MsgBody, IsPersistent, BodySize}} =
read_message_at_offset(FileHdl, Offset, TotalSize),
- Message = bin_to_msg(MsgBody),
+ #basic_message { is_persistent=IsPersistent, guid=MsgId } =
+ Message = bin_to_msg(MsgBody),
ok = if RefCount > 1 orelse ForceInCache ->
- insert_into_cache(Message, BodySize,
- ForceInCache, State1);
+ insert_into_cache
+ (Message, BodySize, ForceInCache, State1);
true -> ok
%% it's not in the cache and we only
%% have 1 queue with the message. So
@@ -959,13 +963,14 @@ internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) -
State}
end;
false ->
- {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, State}
+ {ok, {MsgId, IsPersistent, Delivered, {MsgId, ReadSeqId}}, State}
end.
internal_auto_ack(Q, State) ->
case internal_deliver(Q, false, true, State) of
{ok, empty, State1} -> {ok, State1};
- {ok, {_MsgId, _Delivered, MsgSeqId, _Remaining}, State1} ->
+ {ok, {_MsgId, _IsPersistent, _Delivered, MsgSeqId, _Remaining},
+ State1} ->
remove_messages(Q, [MsgSeqId], true, State1)
end.
@@ -985,7 +990,7 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
Files =
lists:foldl(
fun ({MsgId, SeqId}, Files1) ->
- [{MsgId, RefCount, File, Offset, TotalSize}] =
+ [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] =
dets_ets_lookup(State, MsgId),
Files2 =
case RefCount of
@@ -1007,8 +1012,8 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
_ when 1 < RefCount ->
ok = decrement_cache(MsgId, State),
ok = dets_ets_insert(
- State, {MsgId, RefCount - 1,
- File, Offset, TotalSize}),
+ State, {MsgId, RefCount - 1, File, Offset,
+ TotalSize, IsPersistent}),
Files1
end,
ok = case MnesiaDelete of
@@ -1023,7 +1028,8 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
State1 = compact(Files, State),
{ok, State1}.
-internal_tx_publish(MsgId, Message,
+internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent,
+ guid = MsgId },
State = #dqstate { current_file_handle = CurHdl,
current_file_name = CurName,
current_offset = CurOffset,
@@ -1032,10 +1038,11 @@ internal_tx_publish(MsgId, Message,
case dets_ets_lookup(State, MsgId) of
[] ->
%% New message, lots to do
- {ok, TotalSize} =
- append_message(CurHdl, MsgId, msg_to_bin(Message)),
- true = dets_ets_insert_new(State, {MsgId, 1, CurName,
- CurOffset, TotalSize}),
+ {ok, TotalSize} = append_message(CurHdl, MsgId, msg_to_bin(Message),
+ IsPersistent),
+ true = dets_ets_insert_new
+ (State, {MsgId, 1, CurName,
+ CurOffset, TotalSize, IsPersistent}),
[{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] =
ets:lookup(FileSummary, CurName),
ValidTotalSize1 = ValidTotalSize + TotalSize +
@@ -1051,10 +1058,10 @@ internal_tx_publish(MsgId, Message,
maybe_roll_to_new_file(
NextOffset, State #dqstate {current_offset = NextOffset,
current_dirty = true});
- [{MsgId, RefCount, File, Offset, TotalSize}] ->
+ [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] ->
%% We already know about it, just update counter
ok = dets_ets_insert(State, {MsgId, RefCount + 1, File,
- Offset, TotalSize}),
+ Offset, TotalSize, IsPersistent}),
{ok, State}
end.
@@ -1080,7 +1087,8 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
lists:foldl(
fun (MsgId, {InCurFileAcc, SeqId}) ->
[{MsgId, _RefCount, File, Offset,
- _TotalSize}] = dets_ets_lookup(State, MsgId),
+ _TotalSize, _IsPersistent}] =
+ dets_ets_lookup(State, MsgId),
ok = mnesia:write(
rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id =
@@ -1109,7 +1117,7 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
internal_publish(Q, Message = #basic_message { guid = MsgId },
IsDelivered, State) ->
{ok, State1 = #dqstate { sequences = Sequences }} =
- internal_tx_publish(MsgId, Message, State),
+ internal_tx_publish(Message, State),
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
ok = mnesia:dirty_write(rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId},
@@ -1363,7 +1371,7 @@ sort_msg_locations_by_offset(Asc, List) ->
true -> fun erlang:'<'/2;
false -> fun erlang:'>'/2
end,
- lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) ->
+ lists:sort(fun ({_, _, _, OffA, _, _}, {_, _, _, OffB, _, _}) ->
Comp(OffA, OffB)
end, List).
@@ -1402,7 +1410,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
read_ahead, delayed_write]),
Worklist =
lists:dropwhile(
- fun ({_, _, _, Offset, _})
+ fun ({_, _, _, Offset, _, _})
when Offset /= DestinationContiguousTop ->
%% it cannot be that Offset ==
%% DestinationContiguousTop because if it
@@ -1416,7 +1424,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
end, sort_msg_locations_by_offset(
true, dets_ets_match_object(State,
{'_', '_', Destination,
- '_', '_'}))),
+ '_', '_', '_'}))),
ok = copy_messages(
Worklist, DestinationContiguousTop, DestinationValid,
DestinationHdl, TmpHdl, Destination, State),
@@ -1438,7 +1446,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
sort_msg_locations_by_offset(
true, dets_ets_match_object(State,
{'_', '_', Source,
- '_', '_'})),
+ '_', '_', '_'})),
ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
SourceHdl, DestinationHdl, Destination, State),
%% tidy up
@@ -1452,14 +1460,15 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
Destination, State) ->
{FinalOffset, BlockStart1, BlockEnd1} =
lists:foldl(
- fun ({MsgId, RefCount, _Source, Offset, TotalSize},
+ fun ({MsgId, RefCount, _Source, Offset, TotalSize, IsPersistent},
{CurOffset, BlockStart, BlockEnd}) ->
%% CurOffset is in the DestinationFile.
%% Offset, BlockStart and BlockEnd are in the SourceFile
Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
%% update MsgLocationDets to reflect change of file and offset
- ok = dets_ets_insert(State, {MsgId, RefCount, Destination,
- CurOffset, TotalSize}),
+ ok = dets_ets_insert
+ (State, {MsgId, RefCount, Destination,
+ CurOffset, TotalSize, IsPersistent}),
NextOffset = CurOffset + Size,
if BlockStart =:= undefined ->
%% base case, called only for the first list elem
@@ -1643,11 +1652,13 @@ load_messages(undefined, [],
State;
load_messages(Left, [], State) ->
Num = list_to_integer(filename:rootname(Left)),
- Offset = case dets_ets_match_object(State, {'_', '_', Left, '_', '_'}) of
- [] -> 0;
- L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] =
- sort_msg_locations_by_offset(false, L),
- MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT
+ Offset =
+ case dets_ets_match_object(State, {'_', '_', Left, '_', '_', '_'}) of
+ [] -> 0;
+ L ->
+ [ {_MsgId, _RefCount, Left, MaxOffset, TotalSize, _IsPersistent}
+ | _ ] = sort_msg_locations_by_offset(false, L),
+ MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT
end,
State #dqstate { current_file_num = Num, current_file_name = Left,
current_offset = Offset };
@@ -1656,7 +1667,7 @@ load_messages(Left, [File|Files],
%% [{MsgId, TotalSize, FileOffset}]
{ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
{ValidMessagesRev, ValidTotalSize} = lists:foldl(
- fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
+ fun ({MsgId, IsPersistent, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
case erlang:length(mnesia:dirty_index_match_object
(rabbit_disk_queue,
#dq_msg_loc { msg_id = MsgId,
@@ -1666,9 +1677,9 @@ load_messages(Left, [File|Files],
msg_id)) of
0 -> {VMAcc, VTSAcc};
RefCount ->
- true =
- dets_ets_insert_new(State, {MsgId, RefCount, File,
- Offset, TotalSize}),
+ true = dets_ets_insert_new
+ (State, {MsgId, RefCount, File,
+ Offset, TotalSize, IsPersistent}),
{[{MsgId, TotalSize, Offset}|VMAcc],
VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT
}
@@ -1706,20 +1717,22 @@ verify_messages_in_mnesia(MsgIds) ->
msg_id))
end, MsgIds).
+grab_msg_id({MsgId, _IsPersistent, _TotalSize, _FileOffset}) ->
+ MsgId.
+
recover_crashed_compactions1(Files, TmpFile) ->
- GrabMsgId = fun ({MsgId, _TotalSize, _FileOffset}) -> MsgId end,
NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION,
true = lists:member(NonTmpRelatedFile, Files),
%% [{MsgId, TotalSize, FileOffset}]
{ok, UncorruptedMessagesTmp} =
scan_file_for_valid_messages(form_filename(TmpFile)),
- MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp),
+ MsgIdsTmp = lists:map(fun grab_msg_id/1, UncorruptedMessagesTmp),
%% all of these messages should appear in the mnesia table,
%% otherwise they wouldn't have been copied out
verify_messages_in_mnesia(MsgIdsTmp),
{ok, UncorruptedMessages} =
scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
- MsgIds = lists:map(GrabMsgId, UncorruptedMessages),
+ MsgIds = lists:map(fun grab_msg_id/1, UncorruptedMessages),
%% 1) It's possible that everything in the tmp file is also in the
%% main file such that the main file is (prefix ++
%% tmpfile). This means that compaction failed immediately
@@ -1788,7 +1801,7 @@ recover_crashed_compactions1(Files, TmpFile) ->
{ok, MainMessages} =
scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
- MsgIdsMain = lists:map(GrabMsgId, MainMessages),
+ MsgIdsMain = lists:map(fun grab_msg_id/1, MainMessages),
%% check that everything in MsgIds is in MsgIdsMain
true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end,
MsgIds),
@@ -1833,16 +1846,20 @@ get_disk_queue_files() ->
%% ---- RAW READING AND WRITING OF FILES ----
-append_message(FileHdl, MsgId, MsgBody) when is_binary(MsgBody) ->
+append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) ->
BodySize = size(MsgBody),
MsgIdBin = term_to_binary(MsgId),
MsgIdBinSize = size(MsgIdBin),
TotalSize = BodySize + MsgIdBinSize,
+ StopByte = case IsPersistent of
+ true -> ?WRITE_OK_PERSISTENT;
+ false -> ?WRITE_OK_TRANSIENT
+ end,
case file:write(FileHdl, <<TotalSize:?INTEGER_SIZE_BITS,
MsgIdBinSize:?INTEGER_SIZE_BITS,
MsgIdBin:MsgIdBinSize/binary,
MsgBody:BodySize/binary,
- ?WRITE_OK:?WRITE_OK_SIZE_BITS>>) of
+ StopByte:?WRITE_OK_SIZE_BITS>>) of
ok -> {ok, TotalSize};
KO -> KO
end.
@@ -1856,9 +1873,14 @@ read_message_at_offset(FileHdl, Offset, TotalSize) ->
MsgIdBinSize:?INTEGER_SIZE_BITS,
Rest:TotalSizeWriteOkBytes/binary>>} ->
BodySize = TotalSize - MsgIdBinSize,
- <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
- ?WRITE_OK:?WRITE_OK_SIZE_BITS>> = Rest,
- {ok, {MsgBody, BodySize}};
+ case Rest of
+ <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
+ ?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>> ->
+ {ok, {MsgBody, false, BodySize}};
+ <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
+ ?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>> ->
+ {ok, {MsgBody, true, BodySize}}
+ end;
KO -> KO
end;
KO -> KO
@@ -1876,15 +1898,15 @@ scan_file_for_valid_messages(FileHdl, Offset, Acc) ->
{ok, eof} -> {ok, Acc};
{ok, {corrupted, NextOffset}} ->
scan_file_for_valid_messages(FileHdl, NextOffset, Acc);
- {ok, {ok, MsgId, TotalSize, NextOffset}} ->
- scan_file_for_valid_messages(FileHdl, NextOffset,
- [{MsgId, TotalSize, Offset}|Acc]);
+ {ok, {ok, MsgId, IsPersistent, TotalSize, NextOffset}} ->
+ scan_file_for_valid_messages(
+ FileHdl, NextOffset,
+ [{MsgId, IsPersistent, TotalSize, Offset} | Acc]);
_KO ->
%% bad message, but we may still have recovered some valid messages
{ok, Acc}
end.
-
read_next_file_entry(FileHdl, Offset) ->
TwoIntegers = 2 * ?INTEGER_SIZE_BYTES,
case file:read(FileHdl, TwoIntegers) of
@@ -1915,9 +1937,15 @@ read_next_file_entry(FileHdl, Offset) ->
?FILE_PACKING_ADJUSTMENT,
case file:read(FileHdl, 1) of
{ok,
- <<?WRITE_OK:?WRITE_OK_SIZE_BITS>>} ->
- {ok, {ok, binary_to_term(MsgId),
- TotalSize, NextOffset}};
+ <<?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>>} ->
+ {ok,
+ {ok, binary_to_term(MsgId),
+ false, TotalSize, NextOffset}};
+ {ok,
+ <<?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>>} ->
+ {ok,
+ {ok, binary_to_term(MsgId),
+ true, TotalSize, NextOffset}};
{ok, _SomeOtherData} ->
{ok, {corrupted, NextOffset}};
KO -> KO
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 176ddddb03..fc30834e3a 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -446,10 +446,12 @@ unfold(Fun, Acc, Init) ->
false -> {Acc, Init}
end.
-ceil(N) when N - trunc(N) > 0 ->
- 1 + trunc(N);
ceil(N) ->
- N.
+ T = trunc(N),
+ case N - T of
+ 0 -> N;
+ _ -> 1 + T
+ end.
keygets(Keys, KeyList) ->
lists:reverse(
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index d864d9b2f6..425d776377 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -311,7 +311,8 @@ publish_delivered(Msg =
%% must call phantom_deliver otherwise the msg remains at
%% the head of the queue. This is synchronous, but
%% unavoidable as we need the AckTag
- {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(PubQ),
+ {MsgId, IsPersistent, false, AckTag, 0} =
+ rabbit_disk_queue:phantom_deliver(PubQ),
{ok, AckTag, State1};
false ->
%% in this case, we don't actually care about the ack, so
@@ -340,7 +341,7 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
AckTag1 =
case IsDurable andalso IsPersistent of
true ->
- {MsgId, IsDelivered1, AckTag2, _PersistRem}
+ {MsgId, IsPersistent, IsDelivered1, AckTag2, _PRem}
= rabbit_disk_queue:phantom_deliver(Q),
AckTag2;
false ->