summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl155
-rw-r--r--src/rabbit_mixed_queue.erl2
2 files changed, 79 insertions, 78 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 362d1e42e9..f6a1c8ca80 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -43,7 +43,7 @@
tx_publish/1, tx_commit/3, tx_cancel/1,
requeue/2, purge/1, delete_queue/1,
delete_non_durable_queues/1, auto_ack_next_message/1,
- requeue_next_n/2, length/1, foldl/3, prefetch/1
+ requeue_next_n/2, len/1, foldl/3, prefetch/1
]).
-export([filesync/0, cache_info/0]).
@@ -274,7 +274,7 @@
-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
-spec(delete_queue/1 :: (queue_name()) -> 'ok').
-spec(delete_non_durable_queues/1 :: (set()) -> 'ok').
--spec(length/1 :: (queue_name()) -> non_neg_integer()).
+-spec(len/1 :: (queue_name()) -> non_neg_integer()).
-spec(foldl/3 :: (fun ((message(), ack_tag(), boolean(), A) -> A),
A, queue_name()) -> A).
-spec(stop/0 :: () -> 'ok').
@@ -337,8 +337,8 @@ delete_non_durable_queues(DurableQueues) ->
gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues},
infinity).
-length(Q) ->
- gen_server2:call(?SERVER, {length, Q}, infinity).
+len(Q) ->
+ gen_server2:call(?SERVER, {len, Q}, infinity).
foldl(Fun, Init, Acc) ->
gen_server2:call(?SERVER, {foldl, Fun, Init, Acc}, infinity).
@@ -474,7 +474,7 @@ handle_call({purge, Q}, _From, State) ->
reply(Count, State1);
handle_call(filesync, _From, State) ->
reply(ok, sync_current_file_handle(State));
-handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
+handle_call({len, Q}, _From, State = #dqstate { sequences = Sequences }) ->
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
reply(WriteSeqId - ReadSeqId, State);
handle_call({foldl, Fun, Init, Q}, _From, State) ->
@@ -817,13 +817,13 @@ fetch_and_increment_cache(MsgId, #dqstate { message_cache = Cache }) ->
case ets:lookup(Cache, MsgId) of
[] ->
not_found;
- [{MsgId, Message, MsgSize, _RefCount}] ->
- NewRefCount = ets:update_counter(Cache, MsgId, {4, 1}),
- {Message, MsgSize, NewRefCount}
+ [{MsgId, Message, _RefCount}] ->
+ NewRefCount = ets:update_counter(Cache, MsgId, {3, 1}),
+ {Message, NewRefCount}
end.
decrement_cache(MsgId, #dqstate { message_cache = Cache }) ->
- true = try case ets:update_counter(Cache, MsgId, {4, -1}) of
+ true = try case ets:update_counter(Cache, MsgId, {3, -1}) of
N when N =< 0 -> true = ets:delete(Cache, MsgId);
_N -> true
end
@@ -835,15 +835,15 @@ decrement_cache(MsgId, #dqstate { message_cache = Cache }) ->
end,
ok.
-insert_into_cache(Message = #basic_message { guid = MsgId }, MsgSize,
- State = #dqstate { message_cache = Cache }) ->
- case cache_is_full(State) of
+insert_into_cache(Message = #basic_message { guid = MsgId },
+ #dqstate { message_cache = Cache }) ->
+ case cache_is_full(Cache) of
true -> ok;
- false -> true = ets:insert_new(Cache, {MsgId, Message, MsgSize, 1}),
+ false -> true = ets:insert_new(Cache, {MsgId, Message, 1}),
ok
end.
-cache_is_full(#dqstate { message_cache = Cache }) ->
+cache_is_full(Cache) ->
ets:info(Cache, memory) > ?CACHE_MAX_SIZE.
%% ---- INTERNAL RAW FUNCTIONS ----
@@ -888,7 +888,7 @@ read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount,
total_size = TotalSize }, State) ->
case fetch_and_increment_cache(MsgId, State) of
not_found ->
- {{ok, {MsgBody, _IsPersistent, EncodedBodySize}}, State1} =
+ {{ok, {MsgBody, _IsPersistent, _BodySize}}, State1} =
with_read_handle_at(
File, Offset,
fun(Hdl) ->
@@ -898,14 +898,14 @@ read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount,
end, State),
Message = #basic_message {} = bin_to_msg(MsgBody),
ok = if RefCount > 1 ->
- insert_into_cache(Message, EncodedBodySize, State1);
+ insert_into_cache(Message, State1);
true -> ok
%% it's not in the cache and we only have
%% 1 queue with the message. So don't
%% bother putting it in the cache.
end,
{Message, State1};
- {Message, _EncodedBodySize, _RefCount} ->
+ {Message, _RefCount} ->
{Message, State}
end.
@@ -1100,8 +1100,7 @@ internal_publish(Q, Message = #basic_message { guid = MsgId },
internal_tx_cancel(MsgIds, State) ->
%% we don't need seq ids because we're not touching mnesia,
%% because seqids were never assigned
- MsgSeqIds = lists:zip(MsgIds, lists:duplicate(erlang:length(MsgIds),
- undefined)),
+ MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)),
remove_messages(undefined, MsgSeqIds, false, State).
internal_requeue(_Q, [], State) ->
@@ -1272,6 +1271,12 @@ compact(FilesSet, State) ->
end, [], Files),
lists:foldl(fun combine_file/2, State, lists:reverse(RemainingFiles)).
+%% At this stage, we simply know that the file has had msgs removed
+%% from it. However, we don't know if we need to merge it left (which
+%% is what we would prefer), or merge it right. If we merge left, then
+%% this file is the source, and the left file is the destination. If
+%% we merge right then this file is the destination and the right file
+%% is the source.
combine_file(File, State = #dqstate { file_summary = FileSummary,
current_file_name = CurName
}) ->
@@ -1508,15 +1513,19 @@ load_from_disk(State) ->
ok = recover_crashed_compactions(Files, TmpFiles),
%% There should be no more tmp files now, so go ahead and load the
%% whole lot
- State1 = load_messages(undefined, Files, State),
+ Files1 = case Files of
+ [] -> [State #dqstate.current_file_name];
+ _ -> Files
+ end,
+ State1 = load_messages(undefined, Files1, State),
%% Finally, check there is nothing in mnesia which we haven't
%% loaded
Key = mnesia:dirty_first(rabbit_disk_queue),
{ok, AlteredFiles} = prune_mnesia(State1, Key, sets:new(), [], 0),
State2 = compact(AlteredFiles, State1),
- State3 = extract_sequence_numbers(State2),
+ ok = extract_sequence_numbers(State2 #dqstate.sequences),
ok = del_index(),
- {ok, State3}.
+ {ok, State2}.
prune_mnesia_flush_batch(DeleteAcc) ->
lists:foldl(fun (Key, ok) ->
@@ -1561,7 +1570,7 @@ prune_mnesia(State, Key, Files, DeleteAcc, Len) ->
end,
prune_mnesia(State, Key1, Files1, DeleteAcc2, Len2).
-extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
+extract_sequence_numbers(Sequences) ->
true =
rabbit_misc:execute_mnesia_transaction(
%% the ets manipulation within this transaction is
@@ -1588,10 +1597,9 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
end
end, true, rabbit_disk_queue)
end),
- ok = remove_gaps_in_sequences(State),
- State.
+ ok = remove_gaps_in_sequences(Sequences).
-remove_gaps_in_sequences(#dqstate { sequences = Sequences }) ->
+remove_gaps_in_sequences(Sequences) ->
%% read the comments at internal_requeue.
%% Because we are at startup, we know that no sequence ids have
@@ -1634,11 +1642,6 @@ shuffle_up(Q, BaseSeqId, SeqId, Gap) ->
end,
shuffle_up(Q, BaseSeqId, SeqId - 1, Gap + GapInc).
-load_messages(undefined, [],
- State = #dqstate { file_summary = FileSummary,
- current_file_name = CurName }) ->
- true = ets:insert_new(FileSummary, {CurName, 0, 0, undefined, undefined}),
- State;
load_messages(Left, [], State) ->
Num = list_to_integer(filename:rootname(Left)),
Offset =
@@ -1655,15 +1658,15 @@ load_messages(Left, [File|Files],
State = #dqstate { file_summary = FileSummary }) ->
%% [{MsgId, TotalSize, FileOffset}]
{ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
- {ValidMessagesRev, ValidTotalSize} = lists:foldl(
+ {ValidMessages, ValidTotalSize} = lists:foldl(
fun (Obj = {MsgId, IsPersistent, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
- case erlang:length(mnesia:dirty_index_match_object
- (rabbit_disk_queue,
- #dq_msg_loc { msg_id = MsgId,
- queue_and_seq_id = '_',
- is_delivered = '_'
- },
- msg_id)) of
+ case length(mnesia:dirty_index_match_object
+ (rabbit_disk_queue,
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
+ is_delivered = '_'
+ },
+ msg_id)) of
0 -> {VMAcc, VTSAcc};
RefCount ->
true = dets_ets_insert_new
@@ -1674,10 +1677,9 @@ load_messages(Left, [File|Files],
}
end
end, {[], 0}, Messages),
- %% foldl reverses lists and find_contiguous_block_prefix needs
- %% elems in the same order as from scan_file_for_valid_messages
- {ContiguousTop, _} = find_contiguous_block_prefix(
- lists:reverse(ValidMessagesRev)),
+ %% foldl reverses lists, find_contiguous_block_prefix needs
+ %% msgs eldest first, so, ValidMessages is the right way round
+ {ContiguousTop, _} = find_contiguous_block_prefix(ValidMessages),
Right = case Files of
[] -> undefined;
[F|_] -> F
@@ -1697,13 +1699,13 @@ recover_crashed_compactions(Files, TmpFiles) ->
verify_messages_in_mnesia(MsgIds) ->
lists:foreach(
fun (MsgId) ->
- true = 0 < erlang:length(mnesia:dirty_index_match_object
- (rabbit_disk_queue,
- #dq_msg_loc { msg_id = MsgId,
- queue_and_seq_id = '_',
- is_delivered = '_'
- },
- msg_id))
+ true = 0 < length(mnesia:dirty_index_match_object
+ (rabbit_disk_queue,
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
+ is_delivered = '_'
+ },
+ msg_id))
end, MsgIds).
grab_msg_id({MsgId, _IsPersistent, _TotalSize, _FileOffset}) ->
@@ -1758,7 +1760,8 @@ recover_crashed_compactions1(Files, TmpFile) ->
%% main file is a valid message in mnesia
verify_messages_in_mnesia(MsgIds),
%% The main file should be contiguous
- {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages),
+ {Top, MsgIds} = find_contiguous_block_prefix(
+ lists:reverse(UncorruptedMessages)),
%% we should have that none of the messages in the prefix
%% are in the tmp file
true = lists:all(fun (MsgId) ->
@@ -1800,28 +1803,22 @@ recover_crashed_compactions1(Files, TmpFile) ->
end,
ok.
-%% this assumes that the messages are ordered such that the highest
-%% address is at the head of the list. This matches what
-%% scan_file_for_valid_messages produces
+%% takes the list in *ascending* order (i.e. oldest message
+%% first). This is the opposite of whach scan_file_for_valid_messages
+%% produces. The list of msgs that is produced is youngest first
find_contiguous_block_prefix([]) -> {0, []};
-find_contiguous_block_prefix([ {MsgId, _IsPersistent, TotalSize, Offset}
- | Tail]) ->
- case find_contiguous_block_prefix(Tail, Offset, [MsgId]) of
- {ok, Acc} -> {Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
- lists:reverse(Acc)};
- Res -> Res
- end.
-find_contiguous_block_prefix([], 0, Acc) ->
- {ok, Acc};
-find_contiguous_block_prefix([], _N, _Acc) ->
- {0, []};
-find_contiguous_block_prefix([{MsgId, _IsPersistent, TotalSize, Offset} | Tail],
- ExpectedOffset, Acc)
- when ExpectedOffset =:= Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT ->
- find_contiguous_block_prefix(Tail, Offset, [MsgId|Acc]);
-find_contiguous_block_prefix(List, _ExpectedOffset, _Acc) ->
- find_contiguous_block_prefix(List).
-
+find_contiguous_block_prefix(List) ->
+ find_contiguous_block_prefix(List, 0, []).
+
+find_contiguous_block_prefix([], ExpectedOffset, MsgIds) ->
+ {ExpectedOffset, MsgIds};
+find_contiguous_block_prefix([{MsgId, _IsPersistent, TotalSize, ExpectedOffset}
+ | Tail], ExpectedOffset, MsgIds) ->
+ ExpectedOffset1 = ExpectedOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ find_contiguous_block_prefix(Tail, ExpectedOffset1, [MsgId | MsgIds]);
+find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
+ {ExpectedOffset, MsgIds}.
+
file_name_sort(A, B) ->
ANum = list_to_integer(filename:rootname(A)),
BNum = list_to_integer(filename:rootname(B)),
@@ -1873,11 +1870,15 @@ read_message_from_disk(FileHdl, TotalSize) ->
end.
scan_file_for_valid_messages(File) ->
- {ok, Hdl} = file:open(File, [raw, binary, read]),
- Valid = scan_file_for_valid_messages(Hdl, 0, []),
- %% if something really bad's happened, the close could fail, but ignore
- file:close(Hdl),
- Valid.
+ case file:open(File, [raw, binary, read]) of
+ {ok, Hdl} ->
+ Valid = scan_file_for_valid_messages(Hdl, 0, []),
+ %% if something really bad's happened, the close could fail, but ignore
+ file:close(Hdl),
+ Valid;
+ {error, enoent} -> {ok, []};
+ {error, Reason} -> throw({error, {unable_to_scan_file, File, Reason}})
+ end.
scan_file_for_valid_messages(FileHdl, Offset, Acc) ->
case read_next_file_entry(FileHdl, Offset) of
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 4d916cb365..771a920f87 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -101,7 +101,7 @@
-endif.
init(Queue, IsDurable) ->
- Len = rabbit_disk_queue:length(Queue),
+ Len = rabbit_disk_queue:len(Queue),
MsgBuf = inc_queue_length(Queue, queue:new(), Len),
Size = rabbit_disk_queue:foldl(
fun (Msg = #basic_message { is_persistent = true },