| author | Matthew Sackman <matthew@lshift.net> | 2009-08-24 15:22:19 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-24 15:22:19 +0100 |
| commit | 2f6f113da83e37bc55c075dcfdfb1a6edf339cd5 | |
| tree | 7c4c45da23e174911519fd7735fdd0b5521a3933 | |
| parent | 99b96b785868d61a4a4a07136ee5109515546d2c | |
| download | rabbitmq-server-git-2f6f113da83e37bc55c075dcfdfb1a6edf339cd5.tar.gz | |
All the below, done.
- encoded body size is being cached, but doesn't need to be.
- extract_sequence_numbers and remove_gaps_in_sequences should just
take #dqstate.sequences, not the whole #dqstate.
- rename length/1 to len/1 - that's what it's called in other APIs,
e.g. queue. It will also allow you to remove the erlang: prefix from
the calls to erlang:length (see the caller-side sketch after this list).
- load_messages/3 - instead of taking and returning a state, make it
take the file_summary only, and return {current_file_num,
current_file_name, current_offset}.
(The load_messages/3 change was not done, due to Good Reasons, but the first base case that was objected to has vanished.)
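For illustration, a minimal caller-side sketch of the length/1 -> len/1 rename. The len_example module and disk_lens/1 function are hypothetical; rabbit_disk_queue:len/1 is the renamed call (as used by rabbit_mixed_queue:init/2 in the diff), and the bare length/1 call is the ordinary list BIF, so no erlang: prefix is needed:

```erlang
%% Hypothetical caller, for illustration only; assumes the
%% rabbit_disk_queue gen_server is running.
-module(len_example).
-export([disk_lens/1]).

%% Returns {number of queues, total number of messages on disk}.
disk_lens(Qs) ->
    Lens = [rabbit_disk_queue:len(Q) || Q <- Qs], %% was rabbit_disk_queue:length/1
    {length(Lens), lists:sum(Lens)}.              %% plain length/1 BIF, no erlang: prefix
```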
| -rw-r--r-- | src/rabbit_disk_queue.erl | 155 |
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 2 |
2 files changed, 79 insertions, 78 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 362d1e42e9..f6a1c8ca80 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -43,7 +43,7 @@
          tx_publish/1, tx_commit/3, tx_cancel/1, requeue/2, purge/1,
          delete_queue/1, delete_non_durable_queues/1, auto_ack_next_message/1,
-         requeue_next_n/2, length/1, foldl/3, prefetch/1
+         requeue_next_n/2, len/1, foldl/3, prefetch/1
         ]).
 
 -export([filesync/0, cache_info/0]).
 
@@ -274,7 +274,7 @@
 -spec(purge/1 :: (queue_name()) -> non_neg_integer()).
 -spec(delete_queue/1 :: (queue_name()) -> 'ok').
 -spec(delete_non_durable_queues/1 :: (set()) -> 'ok').
--spec(length/1 :: (queue_name()) -> non_neg_integer()).
+-spec(len/1 :: (queue_name()) -> non_neg_integer()).
 -spec(foldl/3 :: (fun ((message(), ack_tag(), boolean(), A) -> A),
                   A, queue_name()) -> A).
 -spec(stop/0 :: () -> 'ok').
@@ -337,8 +337,8 @@ delete_non_durable_queues(DurableQueues) ->
     gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues},
                      infinity).
 
-length(Q) ->
-    gen_server2:call(?SERVER, {length, Q}, infinity).
+len(Q) ->
+    gen_server2:call(?SERVER, {len, Q}, infinity).
 
 foldl(Fun, Init, Acc) ->
     gen_server2:call(?SERVER, {foldl, Fun, Init, Acc}, infinity).
@@ -474,7 +474,7 @@ handle_call({purge, Q}, _From, State) ->
     reply(Count, State1);
 handle_call(filesync, _From, State) ->
     reply(ok, sync_current_file_handle(State));
-handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
+handle_call({len, Q}, _From, State = #dqstate { sequences = Sequences }) ->
     {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
     reply(WriteSeqId - ReadSeqId, State);
 handle_call({foldl, Fun, Init, Q}, _From, State) ->
@@ -817,13 +817,13 @@
 fetch_and_increment_cache(MsgId, #dqstate { message_cache = Cache }) ->
     case ets:lookup(Cache, MsgId) of
         [] -> not_found;
-        [{MsgId, Message, MsgSize, _RefCount}] ->
-            NewRefCount = ets:update_counter(Cache, MsgId, {4, 1}),
-            {Message, MsgSize, NewRefCount}
+        [{MsgId, Message, _RefCount}] ->
+            NewRefCount = ets:update_counter(Cache, MsgId, {3, 1}),
+            {Message, NewRefCount}
     end.
 
 decrement_cache(MsgId, #dqstate { message_cache = Cache }) ->
-    true = try case ets:update_counter(Cache, MsgId, {4, -1}) of
+    true = try case ets:update_counter(Cache, MsgId, {3, -1}) of
                    N when N =< 0 -> true = ets:delete(Cache, MsgId);
                    _N -> true
                end
@@ -835,15 +835,15 @@ decrement_cache(MsgId, #dqstate { message_cache = Cache }) ->
           end,
     ok.
 
-insert_into_cache(Message = #basic_message { guid = MsgId }, MsgSize,
-                  State = #dqstate { message_cache = Cache }) ->
-    case cache_is_full(State) of
+insert_into_cache(Message = #basic_message { guid = MsgId },
+                  #dqstate { message_cache = Cache }) ->
+    case cache_is_full(Cache) of
         true -> ok;
-        false -> true = ets:insert_new(Cache, {MsgId, Message, MsgSize, 1}),
+        false -> true = ets:insert_new(Cache, {MsgId, Message, 1}),
                  ok
     end.
 
-cache_is_full(#dqstate { message_cache = Cache }) ->
+cache_is_full(Cache) ->
     ets:info(Cache, memory) > ?CACHE_MAX_SIZE.
 
 %% ---- INTERNAL RAW FUNCTIONS ----
@@ -888,7 +888,7 @@ read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount,
                                            total_size = TotalSize }, State) ->
     case fetch_and_increment_cache(MsgId, State) of
         not_found ->
-            {{ok, {MsgBody, _IsPersistent, EncodedBodySize}}, State1} =
+            {{ok, {MsgBody, _IsPersistent, _BodySize}}, State1} =
                 with_read_handle_at(
                   File, Offset,
                   fun(Hdl) ->
@@ -898,14 +898,14 @@ read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount,
                   end, State),
             Message = #basic_message {} = bin_to_msg(MsgBody),
             ok = if RefCount > 1 ->
-                         insert_into_cache(Message, EncodedBodySize, State1);
+                         insert_into_cache(Message, State1);
                     true -> ok
                          %% it's not in the cache and we only have
                          %% 1 queue with the message. So don't
                          %% bother putting it in the cache.
                  end,
             {Message, State1};
-        {Message, _EncodedBodySize, _RefCount} ->
+        {Message, _RefCount} ->
             {Message, State}
     end.
 
@@ -1100,8 +1100,7 @@ internal_publish(Q, Message = #basic_message { guid = MsgId },
 internal_tx_cancel(MsgIds, State) ->
     %% we don't need seq ids because we're not touching mnesia,
     %% because seqids were never assigned
-    MsgSeqIds = lists:zip(MsgIds, lists:duplicate(erlang:length(MsgIds),
-                                                  undefined)),
+    MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)),
     remove_messages(undefined, MsgSeqIds, false, State).
 
 internal_requeue(_Q, [], State) ->
@@ -1272,6 +1271,12 @@ compact(FilesSet, State) ->
                         end, [], Files),
     lists:foldl(fun combine_file/2, State, lists:reverse(RemainingFiles)).
 
+%% At this stage, we simply know that the file has had msgs removed
+%% from it. However, we don't know if we need to merge it left (which
+%% is what we would prefer), or merge it right. If we merge left, then
+%% this file is the source, and the left file is the destination. If
+%% we merge right then this file is the destination and the right file
+%% is the source.
 combine_file(File, State = #dqstate { file_summary = FileSummary,
                                       current_file_name = CurName }) ->
@@ -1508,15 +1513,19 @@ load_from_disk(State) ->
     ok = recover_crashed_compactions(Files, TmpFiles),
     %% There should be no more tmp files now, so go ahead and load the
     %% whole lot
-    State1 = load_messages(undefined, Files, State),
+    Files1 = case Files of
+                 [] -> [State #dqstate.current_file_name];
+                 _ -> Files
+             end,
+    State1 = load_messages(undefined, Files1, State),
     %% Finally, check there is nothing in mnesia which we haven't
     %% loaded
     Key = mnesia:dirty_first(rabbit_disk_queue),
     {ok, AlteredFiles} = prune_mnesia(State1, Key, sets:new(), [], 0),
     State2 = compact(AlteredFiles, State1),
-    State3 = extract_sequence_numbers(State2),
+    ok = extract_sequence_numbers(State2 #dqstate.sequences),
     ok = del_index(),
-    {ok, State3}.
+    {ok, State2}.
 
 prune_mnesia_flush_batch(DeleteAcc) ->
     lists:foldl(fun (Key, ok) ->
@@ -1561,7 +1570,7 @@ prune_mnesia(State, Key, Files, DeleteAcc, Len) ->
         end,
     prune_mnesia(State, Key1, Files1, DeleteAcc2, Len2).
 
-extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
+extract_sequence_numbers(Sequences) ->
     true = rabbit_misc:execute_mnesia_transaction(
              %% the ets manipulation within this transaction is
@@ -1588,10 +1597,9 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
                        end
               end, true, rabbit_disk_queue)
      end),
-    ok = remove_gaps_in_sequences(State),
-    State.
+    ok = remove_gaps_in_sequences(Sequences).
 
-remove_gaps_in_sequences(#dqstate { sequences = Sequences }) ->
+remove_gaps_in_sequences(Sequences) ->
     %% read the comments at internal_requeue.
     %% Because we are at startup, we know that no sequence ids have
@@ -1634,11 +1642,6 @@ shuffle_up(Q, BaseSeqId, SeqId, Gap) ->
         end,
     shuffle_up(Q, BaseSeqId, SeqId - 1, Gap + GapInc).
 
-load_messages(undefined, [],
-              State = #dqstate { file_summary = FileSummary,
-                                 current_file_name = CurName }) ->
-    true = ets:insert_new(FileSummary, {CurName, 0, 0, undefined, undefined}),
-    State;
 load_messages(Left, [], State) ->
     Num = list_to_integer(filename:rootname(Left)),
     Offset =
@@ -1655,15 +1658,15 @@ load_messages(Left, [File|Files],
               State = #dqstate { file_summary = FileSummary }) ->
     %% [{MsgId, TotalSize, FileOffset}]
     {ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
-    {ValidMessagesRev, ValidTotalSize} = lists:foldl(
+    {ValidMessages, ValidTotalSize} = lists:foldl(
        fun (Obj = {MsgId, IsPersistent, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
-               case erlang:length(mnesia:dirty_index_match_object
-                                  (rabbit_disk_queue,
-                                   #dq_msg_loc { msg_id = MsgId,
-                                                 queue_and_seq_id = '_',
-                                                 is_delivered = '_'
-                                               },
-                                   msg_id)) of
+               case length(mnesia:dirty_index_match_object
+                           (rabbit_disk_queue,
+                            #dq_msg_loc { msg_id = MsgId,
+                                          queue_and_seq_id = '_',
+                                          is_delivered = '_'
+                                        },
+                            msg_id)) of
                    0 -> {VMAcc, VTSAcc};
                    RefCount ->
                        true = dets_ets_insert_new
@@ -1674,10 +1677,9 @@ load_messages(Left, [File|Files],
                        }
                end
        end, {[], 0}, Messages),
-    %% foldl reverses lists and find_contiguous_block_prefix needs
-    %% elems in the same order as from scan_file_for_valid_messages
-    {ContiguousTop, _} = find_contiguous_block_prefix(
-                           lists:reverse(ValidMessagesRev)),
+    %% foldl reverses lists, find_contiguous_block_prefix needs
+    %% msgs eldest first, so, ValidMessages is the right way round
+    {ContiguousTop, _} = find_contiguous_block_prefix(ValidMessages),
     Right = case Files of
                 [] -> undefined;
                 [F|_] -> F
@@ -1697,13 +1699,13 @@ recover_crashed_compactions(Files, TmpFiles) ->
 verify_messages_in_mnesia(MsgIds) ->
     lists:foreach(
       fun (MsgId) ->
-              true = 0 < erlang:length(mnesia:dirty_index_match_object
-                                       (rabbit_disk_queue,
-                                        #dq_msg_loc { msg_id = MsgId,
-                                                      queue_and_seq_id = '_',
-                                                      is_delivered = '_'
-                                                    },
-                                        msg_id))
+              true = 0 < length(mnesia:dirty_index_match_object
+                                (rabbit_disk_queue,
+                                 #dq_msg_loc { msg_id = MsgId,
+                                               queue_and_seq_id = '_',
+                                               is_delivered = '_'
+                                             },
+                                 msg_id))
       end, MsgIds).
 
 grab_msg_id({MsgId, _IsPersistent, _TotalSize, _FileOffset}) ->
@@ -1758,7 +1760,8 @@ recover_crashed_compactions1(Files, TmpFile) ->
             %% main file is a valid message in mnesia
             verify_messages_in_mnesia(MsgIds),
             %% The main file should be contiguous
-            {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages),
+            {Top, MsgIds} = find_contiguous_block_prefix(
+                              lists:reverse(UncorruptedMessages)),
             %% we should have that none of the messages in the prefix
             %% are in the tmp file
             true = lists:all(fun (MsgId) ->
@@ -1800,28 +1803,22 @@ recover_crashed_compactions1(Files, TmpFile) ->
             end,
     ok.
 
-%% this assumes that the messages are ordered such that the highest
-%% address is at the head of the list. This matches what
-%% scan_file_for_valid_messages produces
+%% takes the list in *ascending* order (i.e. oldest message
+%% first). This is the opposite of whach scan_file_for_valid_messages
+%% produces. The list of msgs that is produced is youngest first
 find_contiguous_block_prefix([]) -> {0, []};
-find_contiguous_block_prefix([ {MsgId, _IsPersistent, TotalSize, Offset}
-                               | Tail]) ->
-    case find_contiguous_block_prefix(Tail, Offset, [MsgId]) of
-        {ok, Acc} -> {Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
-                      lists:reverse(Acc)};
-        Res -> Res
-    end.
-find_contiguous_block_prefix([], 0, Acc) ->
-    {ok, Acc};
-find_contiguous_block_prefix([], _N, _Acc) ->
-    {0, []};
-find_contiguous_block_prefix([{MsgId, _IsPersistent, TotalSize, Offset} | Tail],
-                             ExpectedOffset, Acc)
-  when ExpectedOffset =:= Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT ->
-    find_contiguous_block_prefix(Tail, Offset, [MsgId|Acc]);
-find_contiguous_block_prefix(List, _ExpectedOffset, _Acc) ->
-    find_contiguous_block_prefix(List).
-
+find_contiguous_block_prefix(List) ->
+    find_contiguous_block_prefix(List, 0, []).
+
+find_contiguous_block_prefix([], ExpectedOffset, MsgIds) ->
+    {ExpectedOffset, MsgIds};
+find_contiguous_block_prefix([{MsgId, _IsPersistent, TotalSize, ExpectedOffset}
+                              | Tail], ExpectedOffset, MsgIds) ->
+    ExpectedOffset1 = ExpectedOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
+    find_contiguous_block_prefix(Tail, ExpectedOffset1, [MsgId | MsgIds]);
+find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
+    {ExpectedOffset, MsgIds}.
+
 file_name_sort(A, B) ->
     ANum = list_to_integer(filename:rootname(A)),
     BNum = list_to_integer(filename:rootname(B)),
@@ -1873,11 +1870,15 @@ read_message_from_disk(FileHdl, TotalSize) ->
     end.
 
 scan_file_for_valid_messages(File) ->
-    {ok, Hdl} = file:open(File, [raw, binary, read]),
-    Valid = scan_file_for_valid_messages(Hdl, 0, []),
-    %% if something really bad's happened, the close could fail, but ignore
-    file:close(Hdl),
-    Valid.
+    case file:open(File, [raw, binary, read]) of
+        {ok, Hdl} ->
+            Valid = scan_file_for_valid_messages(Hdl, 0, []),
+            %% if something really bad's happened, the close could fail, but ignore
+            file:close(Hdl),
+            Valid;
+        {error, enoent} -> {ok, []};
+        {error, Reason} -> throw({error, {unable_to_scan_file, File, Reason}})
+    end.
 
 scan_file_for_valid_messages(FileHdl, Offset, Acc) ->
     case read_next_file_entry(FileHdl, Offset) of
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 4d916cb365..771a920f87 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -101,7 +101,7 @@
 -endif.
 
 init(Queue, IsDurable) ->
-    Len = rabbit_disk_queue:length(Queue),
+    Len = rabbit_disk_queue:len(Queue),
     MsgBuf = inc_queue_length(Queue, queue:new(), Len),
     Size = rabbit_disk_queue:foldl(
              fun (Msg = #basic_message { is_persistent = true },
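As a postscript, a small self-contained sketch of the reworked find_contiguous_block_prefix/1 shown in the hunk above: it now expects the scan results oldest-first (hence the lists:reverse/1 added in recover_crashed_compactions1) and walks an expected offset forward, stopping at the first gap. The prefix_example module, the sample entries, and the ?FILE_PACKING_ADJUSTMENT value of 1 are made up for illustration.

```erlang
%% Illustration only; not part of the commit. The clauses mirror the new
%% code in the diff, with a made-up packing adjustment of 1 byte.
-module(prefix_example).
-export([demo/0]).

-define(FILE_PACKING_ADJUSTMENT, 1).

%% Entries are {MsgId, IsPersistent, TotalSize, Offset}, oldest first.
find_contiguous_block_prefix(List) ->
    find_contiguous_block_prefix(List, 0, []).

find_contiguous_block_prefix([], ExpectedOffset, MsgIds) ->
    {ExpectedOffset, MsgIds};
find_contiguous_block_prefix([{MsgId, _IsPersistent, TotalSize, ExpectedOffset}
                              | Tail], ExpectedOffset, MsgIds) ->
    ExpectedOffset1 = ExpectedOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
    find_contiguous_block_prefix(Tail, ExpectedOffset1, [MsgId | MsgIds]);
find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
    {ExpectedOffset, MsgIds}.

demo() ->
    %% msg_a (10 bytes at offset 0) and msg_b (5 bytes at offset 11) are
    %% contiguous; msg_c at offset 30 leaves a gap, so the contiguous
    %% prefix ends at offset 17 and contains msg_a and msg_b.
    {17, [msg_b, msg_a]} =
        find_contiguous_block_prefix([{msg_a, true, 10, 0},
                                      {msg_b, true, 5, 11},
                                      {msg_c, true, 4, 30}]),
    ok.
```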
