summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl152
-rw-r--r--src/rabbit_misc.erl8
-rw-r--r--src/rabbit_mixed_queue.erl5
3 files changed, 98 insertions, 67 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 868eab4a5e..4eef884f72 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -53,7 +53,8 @@
-include("rabbit.hrl").
-define(WRITE_OK_SIZE_BITS, 8).
--define(WRITE_OK, 255).
+-define(WRITE_OK_TRANSIENT, 255).
+-define(WRITE_OK_PERSISTENT, 254).
-define(INTEGER_SIZE_BYTES, 8).
-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
@@ -101,8 +102,8 @@
%% The components:
%%
-%% MsgLocation: this is a dets table which contains:
-%% {MsgId, RefCount, File, Offset, TotalSize}
+%% MsgLocation: this is a (d)ets table which contains:
+%% {MsgId, RefCount, File, Offset, TotalSize, IsPersistent}
%% FileSummary: this is an ets table which contains:
%% {File, ValidTotalSize, ContiguousTop, Left, Right}
%% Sequences: this is an ets table which contains:
@@ -393,7 +394,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
?FILE_EXTENSION_DETS)},
{min_no_slots, 1024*1024},
%% man says this should be <= 32M. But it works...
- {max_no_slots, 1024*1024*1024},
+ {max_no_slots, 30*1024*1024},
{type, set}
]),
@@ -509,8 +510,8 @@ handle_cast({ack, Q, MsgSeqIds}, State) ->
handle_cast({auto_ack_next_message, Q}, State) ->
{ok, State1} = internal_auto_ack(Q, State),
noreply(State1);
-handle_cast({tx_publish, Message = #basic_message { guid = MsgId }}, State) ->
- {ok, State1} = internal_tx_publish(MsgId, Message, State),
+handle_cast({tx_publish, Message}, State) ->
+ {ok, State1} = internal_tx_publish(Message, State),
noreply(State1);
handle_cast({tx_cancel, MsgIds}, State) ->
{ok, State1} = internal_tx_cancel(MsgIds, State),
@@ -636,8 +637,8 @@ memory_use(#dqstate { operation_mode = disk_only,
(WordSize * (ets:info(FileSummary, memory) +
ets:info(Cache, memory) +
ets:info(Sequences, memory))) +
- round(MnesiaSizeEstimate) +
- round(MsgLocationSizeEstimate).
+ rabbit_misc:ceil(MnesiaSizeEstimate) +
+ rabbit_misc:ceil(MsgLocationSizeEstimate).
to_disk_only_mode(State = #dqstate { operation_mode = disk_only }) ->
State;
@@ -872,7 +873,8 @@ insert_into_cache(Message = #basic_message { guid = MsgId },
true -> 0;
false -> 1
end,
- true = ets:insert_new(Cache, {MsgId, Message, MsgSize, Count}),
+ true =
+ ets:insert_new(Cache, {MsgId, Message, MsgSize, Count}),
ok.
%% ---- INTERNAL RAW FUNCTIONS ----
@@ -890,8 +892,9 @@ internal_deliver(Q, ReadMsg, FakeDeliver,
{Q, ReadSeqId+1, WriteSeqId}),
{ok,
case Result of
- {MsgId, Delivered, {MsgId, ReadSeqId}} ->
- {MsgId, Delivered, {MsgId, ReadSeqId}, Remaining};
+ {MsgId, IsPersistent, Delivered, {MsgId, ReadSeqId}} ->
+ {MsgId, IsPersistent, Delivered, {MsgId, ReadSeqId},
+ Remaining};
{Message, BodySize, Delivered, {MsgId, ReadSeqId}} ->
{Message, BodySize, Delivered, {MsgId, ReadSeqId},
Remaining}
@@ -927,7 +930,7 @@ internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) -
[Obj =
#dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] =
mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}),
- [{MsgId, RefCount, File, Offset, TotalSize}] =
+ [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] =
dets_ets_lookup(State, MsgId),
ok =
if FakeDeliver orelse Delivered -> ok;
@@ -940,12 +943,13 @@ internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) -
case fetch_and_increment_cache(MsgId, State) of
not_found ->
{FileHdl, State1} = get_read_handle(File, Offset, State),
- {ok, {MsgBody, BodySize}} =
+ {ok, {MsgBody, IsPersistent, BodySize}} =
read_message_at_offset(FileHdl, Offset, TotalSize),
- Message = bin_to_msg(MsgBody),
+ #basic_message { is_persistent=IsPersistent, guid=MsgId } =
+ Message = bin_to_msg(MsgBody),
ok = if RefCount > 1 orelse ForceInCache ->
- insert_into_cache(Message, BodySize,
- ForceInCache, State1);
+ insert_into_cache
+ (Message, BodySize, ForceInCache, State1);
true -> ok
%% it's not in the cache and we only
%% have 1 queue with the message. So
@@ -959,13 +963,14 @@ internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) -
State}
end;
false ->
- {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, State}
+ {ok, {MsgId, IsPersistent, Delivered, {MsgId, ReadSeqId}}, State}
end.
internal_auto_ack(Q, State) ->
case internal_deliver(Q, false, true, State) of
{ok, empty, State1} -> {ok, State1};
- {ok, {_MsgId, _Delivered, MsgSeqId, _Remaining}, State1} ->
+ {ok, {_MsgId, _IsPersistent, _Delivered, MsgSeqId, _Remaining},
+ State1} ->
remove_messages(Q, [MsgSeqId], true, State1)
end.
@@ -985,7 +990,7 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
Files =
lists:foldl(
fun ({MsgId, SeqId}, Files1) ->
- [{MsgId, RefCount, File, Offset, TotalSize}] =
+ [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] =
dets_ets_lookup(State, MsgId),
Files2 =
case RefCount of
@@ -1007,8 +1012,8 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
_ when 1 < RefCount ->
ok = decrement_cache(MsgId, State),
ok = dets_ets_insert(
- State, {MsgId, RefCount - 1,
- File, Offset, TotalSize}),
+ State, {MsgId, RefCount - 1, File, Offset,
+ TotalSize, IsPersistent}),
Files1
end,
ok = case MnesiaDelete of
@@ -1023,7 +1028,8 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
State1 = compact(Files, State),
{ok, State1}.
-internal_tx_publish(MsgId, Message,
+internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent,
+ guid = MsgId },
State = #dqstate { current_file_handle = CurHdl,
current_file_name = CurName,
current_offset = CurOffset,
@@ -1032,10 +1038,11 @@ internal_tx_publish(MsgId, Message,
case dets_ets_lookup(State, MsgId) of
[] ->
%% New message, lots to do
- {ok, TotalSize} =
- append_message(CurHdl, MsgId, msg_to_bin(Message)),
- true = dets_ets_insert_new(State, {MsgId, 1, CurName,
- CurOffset, TotalSize}),
+ {ok, TotalSize} = append_message(CurHdl, MsgId, msg_to_bin(Message),
+ IsPersistent),
+ true = dets_ets_insert_new
+ (State, {MsgId, 1, CurName,
+ CurOffset, TotalSize, IsPersistent}),
[{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] =
ets:lookup(FileSummary, CurName),
ValidTotalSize1 = ValidTotalSize + TotalSize +
@@ -1051,10 +1058,10 @@ internal_tx_publish(MsgId, Message,
maybe_roll_to_new_file(
NextOffset, State #dqstate {current_offset = NextOffset,
current_dirty = true});
- [{MsgId, RefCount, File, Offset, TotalSize}] ->
+ [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] ->
%% We already know about it, just update counter
ok = dets_ets_insert(State, {MsgId, RefCount + 1, File,
- Offset, TotalSize}),
+ Offset, TotalSize, IsPersistent}),
{ok, State}
end.
@@ -1080,7 +1087,8 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
lists:foldl(
fun (MsgId, {InCurFileAcc, SeqId}) ->
[{MsgId, _RefCount, File, Offset,
- _TotalSize}] = dets_ets_lookup(State, MsgId),
+ _TotalSize, _IsPersistent}] =
+ dets_ets_lookup(State, MsgId),
ok = mnesia:write(
rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id =
@@ -1109,7 +1117,7 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
internal_publish(Q, Message = #basic_message { guid = MsgId },
IsDelivered, State) ->
{ok, State1 = #dqstate { sequences = Sequences }} =
- internal_tx_publish(MsgId, Message, State),
+ internal_tx_publish(Message, State),
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
ok = mnesia:dirty_write(rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId},
@@ -1363,7 +1371,7 @@ sort_msg_locations_by_offset(Asc, List) ->
true -> fun erlang:'<'/2;
false -> fun erlang:'>'/2
end,
- lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) ->
+ lists:sort(fun ({_, _, _, OffA, _, _}, {_, _, _, OffB, _, _}) ->
Comp(OffA, OffB)
end, List).
@@ -1402,7 +1410,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
read_ahead, delayed_write]),
Worklist =
lists:dropwhile(
- fun ({_, _, _, Offset, _})
+ fun ({_, _, _, Offset, _, _})
when Offset /= DestinationContiguousTop ->
%% it cannot be that Offset ==
%% DestinationContiguousTop because if it
@@ -1416,7 +1424,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
end, sort_msg_locations_by_offset(
true, dets_ets_match_object(State,
{'_', '_', Destination,
- '_', '_'}))),
+ '_', '_', '_'}))),
ok = copy_messages(
Worklist, DestinationContiguousTop, DestinationValid,
DestinationHdl, TmpHdl, Destination, State),
@@ -1438,7 +1446,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
sort_msg_locations_by_offset(
true, dets_ets_match_object(State,
{'_', '_', Source,
- '_', '_'})),
+ '_', '_', '_'})),
ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
SourceHdl, DestinationHdl, Destination, State),
%% tidy up
@@ -1452,14 +1460,15 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
Destination, State) ->
{FinalOffset, BlockStart1, BlockEnd1} =
lists:foldl(
- fun ({MsgId, RefCount, _Source, Offset, TotalSize},
+ fun ({MsgId, RefCount, _Source, Offset, TotalSize, IsPersistent},
{CurOffset, BlockStart, BlockEnd}) ->
%% CurOffset is in the DestinationFile.
%% Offset, BlockStart and BlockEnd are in the SourceFile
Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
%% update MsgLocationDets to reflect change of file and offset
- ok = dets_ets_insert(State, {MsgId, RefCount, Destination,
- CurOffset, TotalSize}),
+ ok = dets_ets_insert
+ (State, {MsgId, RefCount, Destination,
+ CurOffset, TotalSize, IsPersistent}),
NextOffset = CurOffset + Size,
if BlockStart =:= undefined ->
%% base case, called only for the first list elem
@@ -1643,11 +1652,13 @@ load_messages(undefined, [],
State;
load_messages(Left, [], State) ->
Num = list_to_integer(filename:rootname(Left)),
- Offset = case dets_ets_match_object(State, {'_', '_', Left, '_', '_'}) of
- [] -> 0;
- L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] =
- sort_msg_locations_by_offset(false, L),
- MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT
+ Offset =
+ case dets_ets_match_object(State, {'_', '_', Left, '_', '_', '_'}) of
+ [] -> 0;
+ L ->
+ [ {_MsgId, _RefCount, Left, MaxOffset, TotalSize, _IsPersistent}
+ | _ ] = sort_msg_locations_by_offset(false, L),
+ MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT
end,
State #dqstate { current_file_num = Num, current_file_name = Left,
current_offset = Offset };
@@ -1656,7 +1667,7 @@ load_messages(Left, [File|Files],
%% [{MsgId, TotalSize, FileOffset}]
{ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
{ValidMessagesRev, ValidTotalSize} = lists:foldl(
- fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
+ fun ({MsgId, IsPersistent, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
case erlang:length(mnesia:dirty_index_match_object
(rabbit_disk_queue,
#dq_msg_loc { msg_id = MsgId,
@@ -1666,9 +1677,9 @@ load_messages(Left, [File|Files],
msg_id)) of
0 -> {VMAcc, VTSAcc};
RefCount ->
- true =
- dets_ets_insert_new(State, {MsgId, RefCount, File,
- Offset, TotalSize}),
+ true = dets_ets_insert_new
+ (State, {MsgId, RefCount, File,
+ Offset, TotalSize, IsPersistent}),
{[{MsgId, TotalSize, Offset}|VMAcc],
VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT
}
@@ -1706,20 +1717,22 @@ verify_messages_in_mnesia(MsgIds) ->
msg_id))
end, MsgIds).
+grab_msg_id({MsgId, _IsPersistent, _TotalSize, _FileOffset}) ->
+ MsgId.
+
recover_crashed_compactions1(Files, TmpFile) ->
- GrabMsgId = fun ({MsgId, _TotalSize, _FileOffset}) -> MsgId end,
NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION,
true = lists:member(NonTmpRelatedFile, Files),
%% [{MsgId, TotalSize, FileOffset}]
{ok, UncorruptedMessagesTmp} =
scan_file_for_valid_messages(form_filename(TmpFile)),
- MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp),
+ MsgIdsTmp = lists:map(fun grab_msg_id/1, UncorruptedMessagesTmp),
%% all of these messages should appear in the mnesia table,
%% otherwise they wouldn't have been copied out
verify_messages_in_mnesia(MsgIdsTmp),
{ok, UncorruptedMessages} =
scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
- MsgIds = lists:map(GrabMsgId, UncorruptedMessages),
+ MsgIds = lists:map(fun grab_msg_id/1, UncorruptedMessages),
%% 1) It's possible that everything in the tmp file is also in the
%% main file such that the main file is (prefix ++
%% tmpfile). This means that compaction failed immediately
@@ -1788,7 +1801,7 @@ recover_crashed_compactions1(Files, TmpFile) ->
{ok, MainMessages} =
scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
- MsgIdsMain = lists:map(GrabMsgId, MainMessages),
+ MsgIdsMain = lists:map(fun grab_msg_id/1, MainMessages),
%% check that everything in MsgIds is in MsgIdsMain
true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end,
MsgIds),
@@ -1833,16 +1846,20 @@ get_disk_queue_files() ->
%% ---- RAW READING AND WRITING OF FILES ----
-append_message(FileHdl, MsgId, MsgBody) when is_binary(MsgBody) ->
+append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) ->
BodySize = size(MsgBody),
MsgIdBin = term_to_binary(MsgId),
MsgIdBinSize = size(MsgIdBin),
TotalSize = BodySize + MsgIdBinSize,
+ StopByte = case IsPersistent of
+ true -> ?WRITE_OK_PERSISTENT;
+ false -> ?WRITE_OK_TRANSIENT
+ end,
case file:write(FileHdl, <<TotalSize:?INTEGER_SIZE_BITS,
MsgIdBinSize:?INTEGER_SIZE_BITS,
MsgIdBin:MsgIdBinSize/binary,
MsgBody:BodySize/binary,
- ?WRITE_OK:?WRITE_OK_SIZE_BITS>>) of
+ StopByte:?WRITE_OK_SIZE_BITS>>) of
ok -> {ok, TotalSize};
KO -> KO
end.
@@ -1856,9 +1873,14 @@ read_message_at_offset(FileHdl, Offset, TotalSize) ->
MsgIdBinSize:?INTEGER_SIZE_BITS,
Rest:TotalSizeWriteOkBytes/binary>>} ->
BodySize = TotalSize - MsgIdBinSize,
- <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
- ?WRITE_OK:?WRITE_OK_SIZE_BITS>> = Rest,
- {ok, {MsgBody, BodySize}};
+ case Rest of
+ <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
+ ?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>> ->
+ {ok, {MsgBody, false, BodySize}};
+ <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
+ ?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>> ->
+ {ok, {MsgBody, true, BodySize}}
+ end;
KO -> KO
end;
KO -> KO
@@ -1876,15 +1898,15 @@ scan_file_for_valid_messages(FileHdl, Offset, Acc) ->
{ok, eof} -> {ok, Acc};
{ok, {corrupted, NextOffset}} ->
scan_file_for_valid_messages(FileHdl, NextOffset, Acc);
- {ok, {ok, MsgId, TotalSize, NextOffset}} ->
- scan_file_for_valid_messages(FileHdl, NextOffset,
- [{MsgId, TotalSize, Offset}|Acc]);
+ {ok, {ok, MsgId, IsPersistent, TotalSize, NextOffset}} ->
+ scan_file_for_valid_messages(
+ FileHdl, NextOffset,
+ [{MsgId, IsPersistent, TotalSize, Offset} | Acc]);
_KO ->
%% bad message, but we may still have recovered some valid messages
{ok, Acc}
end.
-
read_next_file_entry(FileHdl, Offset) ->
TwoIntegers = 2 * ?INTEGER_SIZE_BYTES,
case file:read(FileHdl, TwoIntegers) of
@@ -1915,9 +1937,15 @@ read_next_file_entry(FileHdl, Offset) ->
?FILE_PACKING_ADJUSTMENT,
case file:read(FileHdl, 1) of
{ok,
- <<?WRITE_OK:?WRITE_OK_SIZE_BITS>>} ->
- {ok, {ok, binary_to_term(MsgId),
- TotalSize, NextOffset}};
+ <<?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>>} ->
+ {ok,
+ {ok, binary_to_term(MsgId),
+ false, TotalSize, NextOffset}};
+ {ok,
+ <<?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>>} ->
+ {ok,
+ {ok, binary_to_term(MsgId),
+ true, TotalSize, NextOffset}};
{ok, _SomeOtherData} ->
{ok, {corrupted, NextOffset}};
KO -> KO
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 176ddddb03..fc30834e3a 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -446,10 +446,12 @@ unfold(Fun, Acc, Init) ->
false -> {Acc, Init}
end.
-ceil(N) when N - trunc(N) > 0 ->
- 1 + trunc(N);
ceil(N) ->
- N.
+ T = trunc(N),
+ case N - T of
+ 0 -> N;
+ _ -> 1 + T
+ end.
keygets(Keys, KeyList) ->
lists:reverse(
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index d864d9b2f6..425d776377 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -311,7 +311,8 @@ publish_delivered(Msg =
%% must call phantom_deliver otherwise the msg remains at
%% the head of the queue. This is synchronous, but
%% unavoidable as we need the AckTag
- {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(PubQ),
+ {MsgId, IsPersistent, false, AckTag, 0} =
+ rabbit_disk_queue:phantom_deliver(PubQ),
{ok, AckTag, State1};
false ->
%% in this case, we don't actually care about the ack, so
@@ -340,7 +341,7 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
AckTag1 =
case IsDurable andalso IsPersistent of
true ->
- {MsgId, IsDelivered1, AckTag2, _PersistRem}
+ {MsgId, IsPersistent, IsDelivered1, AckTag2, _PRem}
= rabbit_disk_queue:phantom_deliver(Q),
AckTag2;
false ->