diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-24 16:53:35 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-24 16:53:35 +0100 |
| commit | c3155a188e0e845294340210b2ad6f5e1af4a69d (patch) | |
| tree | 9cb3706c9042d05b99d4f2cc941f0cc90c739ad4 /src | |
| parent | dc7c6605ab76a2bd32261881f8eac1276df392cb (diff) | |
| download | rabbitmq-server-git-c3155a188e0e845294340210b2ad6f5e1af4a69d.tar.gz | |
handle_cache => file_handle_cache.
Also switched to using the #message_store_entry record throughout.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 111 | ||||
| -rw-r--r-- | src/rabbit_file_handle_cache.erl (renamed from src/rabbit_handle_cache.erl) | 2 |
2 files changed, 60 insertions, 53 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 33a1aaa830..aee91f5db0 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -397,12 +397,13 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> {min_no_slots, 1024*1024}, %% man says this should be <= 32M. But it works... {max_no_slots, 30*1024*1024}, - {type, set} + {type, set}, + {keypos, 2} ]), %% it would be better to have this as private, but dets:from_ets/2 %% seems to blow up if it is set private - MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]), + MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected, {keypos, 2}]), InitName = "0" ++ ?FILE_EXTENSION, State = @@ -419,7 +420,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> current_offset = 0, current_dirty = false, file_size_limit = FileSizeLimit, - read_file_hc_cache = rabbit_handle_cache:init( + read_file_hc_cache = rabbit_file_handle_cache:init( ReadFileHandlesLimit, [read, raw, binary, read_ahead]), on_sync_txns = [], @@ -588,7 +589,7 @@ shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, _ -> sync_current_file_handle(State), file:close(FileHdl) end, - HC1 = rabbit_handle_cache:close_all(HC), + HC1 = rabbit_file_handle_cache:close_all(HC), State1 #dqstate { current_file_handle = undefined, current_dirty = false, read_file_hc_cache = HC1, @@ -763,7 +764,7 @@ with_read_handle_at(File, Offset, Fun, State = end, FilePath = form_filename(File), {Result, HC1} = - rabbit_handle_cache:with_file_handle_at(FilePath, Offset, Fun, HC), + rabbit_file_handle_cache:with_file_handle_at(FilePath, Offset, Fun, HC), {Result, State1 #dqstate { read_file_hc_cache = HC1 }}. sequence_lookup(Sequences, Q) -> @@ -913,7 +914,7 @@ update_message_attributes(Q, SeqId, MarkDelivered, State) -> [Obj = #dq_msg_loc {is_delivered = IsDelivered, msg_id = MsgId}] = mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}), - [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] = + [StoreEntry = #message_store_entry { msg_id = MsgId }] = dets_ets_lookup(State, MsgId), ok = case {IsDelivered, MarkDelivered} of {true, _} -> ok; @@ -922,10 +923,7 @@ update_message_attributes(Q, SeqId, MarkDelivered, State) -> mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true}) end, - {{MsgId, SeqId}, IsDelivered, - #message_store_entry { msg_id = MsgId, ref_count = RefCount, file = File, - offset = Offset, total_size = TotalSize, - is_persistent = IsPersistent }}. + {{MsgId, SeqId}, IsDelivered, StoreEntry}. internal_foldl(Q, Fun, Init, State) -> State1 = #dqstate { sequences = Sequences } = @@ -974,7 +972,9 @@ remove_message(MsgId, Files, State = #dqstate { file_summary = FileSummary, current_file_name = CurName }) -> - [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] = + [StoreEntry = + #message_store_entry { msg_id = MsgId, ref_count = RefCount, file = File, + offset = Offset, total_size = TotalSize }] = dets_ets_lookup(State, MsgId), case RefCount of 1 -> @@ -993,8 +993,8 @@ remove_message(MsgId, Files, end; _ when 1 < RefCount -> ok = decrement_cache(MsgId, State), - ok = dets_ets_insert(State, {MsgId, RefCount - 1, File, Offset, - TotalSize, IsPersistent}), + ok = dets_ets_insert(State, StoreEntry #message_store_entry + { ref_count = RefCount - 1 }), Files end. @@ -1011,8 +1011,10 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent, {ok, TotalSize} = append_message(CurHdl, MsgId, msg_to_bin(Message), IsPersistent), true = dets_ets_insert_new - (State, {MsgId, 1, CurName, - CurOffset, TotalSize, IsPersistent}), + (State, #message_store_entry + { msg_id = MsgId, ref_count = 1, file = CurName, + offset = CurOffset, total_size = TotalSize, + is_persistent = IsPersistent }), [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] = ets:lookup(FileSummary, CurName), ValidTotalSize1 = ValidTotalSize + TotalSize + @@ -1028,10 +1030,11 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent, maybe_roll_to_new_file( NextOffset, State #dqstate {current_offset = NextOffset, current_dirty = true}); - [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] -> + [StoreEntry = + #message_store_entry { msg_id = MsgId, ref_count = RefCount }] -> %% We already know about it, just update counter - ok = dets_ets_insert(State, {MsgId, RefCount + 1, File, - Offset, TotalSize, IsPersistent}), + ok = dets_ets_insert(State, StoreEntry #message_store_entry + { ref_count = RefCount + 1 }), {ok, State} end. @@ -1043,8 +1046,8 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, }) -> NeedsSync = IsDirty andalso lists:any(fun ({MsgId, _IsDelivered}) -> - [{MsgId, _RefCount, File, Offset, - _TotalSize, _IsPersistent}] = + [#message_store_entry { msg_id = MsgId, file = File, + offset = Offset }] = dets_ets_lookup(State, MsgId), File =:= CurFile andalso Offset >= SyncOffset end, PubMsgIds), @@ -1205,8 +1208,7 @@ internal_delete_queue(Q, State) -> Objs = mnesia:dirty_match_object( rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, '_'}, - msg_id = '_', - is_delivered = '_' + _ = '_' }), MsgSeqIds = lists:map( @@ -1330,12 +1332,13 @@ adjust_meta_and_combine( true -> {false, State} end. -sort_msg_locations_by_offset(Asc, List) -> - Comp = case Asc of - true -> fun erlang:'<'/2; - false -> fun erlang:'>'/2 +sort_msg_locations_by_offset(Dir, List) -> + Comp = case Dir of + asc -> fun erlang:'<'/2; + desc -> fun erlang:'>'/2 end, - lists:sort(fun ({_, _, _, OffA, _, _}, {_, _, _, OffB, _, _}) -> + lists:sort(fun (#message_store_entry { offset = OffA }, + #message_store_entry { offset = OffB }) -> Comp(OffA, OffB) end, List). @@ -1374,7 +1377,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop, read_ahead, delayed_write]), Worklist = lists:dropwhile( - fun ({_, _, _, Offset, _, _}) + fun (#message_store_entry { offset = Offset }) when Offset /= DestinationContiguousTop -> %% it cannot be that Offset == %% DestinationContiguousTop because if it @@ -1386,9 +1389,9 @@ combine_files({Source, SourceValid, _SourceContiguousTop, %% as we require, however, we need to %% enforce it anyway end, sort_msg_locations_by_offset( - true, dets_ets_match_object(State, - {'_', '_', Destination, - '_', '_', '_'}))), + asc, dets_ets_match_object( + State, #message_store_entry + { file = Destination, _ = '_' }))), ok = copy_messages( Worklist, DestinationContiguousTop, DestinationValid, DestinationHdl, TmpHdl, Destination, State), @@ -1408,9 +1411,8 @@ combine_files({Source, SourceValid, _SourceContiguousTop, end, SourceWorkList = sort_msg_locations_by_offset( - true, dets_ets_match_object(State, - {'_', '_', Source, - '_', '_', '_'})), + asc, dets_ets_match_object(State, #message_store_entry + { file = Source, _ = '_' })), ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, SourceHdl, DestinationHdl, Destination, State), %% tidy up @@ -1424,15 +1426,16 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, Destination, State) -> {FinalOffset, BlockStart1, BlockEnd1} = lists:foldl( - fun ({MsgId, RefCount, _Source, Offset, TotalSize, IsPersistent}, + fun (StoreEntry = #message_store_entry { offset = Offset, + total_size = TotalSize }, {CurOffset, BlockStart, BlockEnd}) -> %% CurOffset is in the DestinationFile. %% Offset, BlockStart and BlockEnd are in the SourceFile Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, %% update MsgLocationDets to reflect change of file and offset - ok = dets_ets_insert - (State, {MsgId, RefCount, Destination, - CurOffset, TotalSize, IsPersistent}), + ok = dets_ets_insert (State, StoreEntry #message_store_entry + { file = Destination, + offset = CurOffset }), NextOffset = CurOffset + Size, if BlockStart =:= undefined -> %% base case, called only for the first list elem @@ -1459,7 +1462,7 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, ok. close_file(File, State = #dqstate { read_file_hc_cache = HC }) -> - HC1 = rabbit_handle_cache:close_file(form_filename(File), HC), + HC1 = rabbit_file_handle_cache:close_file(form_filename(File), HC), State #dqstate { read_file_hc_cache = HC1 }. delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) -> @@ -1545,10 +1548,10 @@ prune_mnesia(State, Key, Files, DeleteAcc, Len) -> [] -> %% msg hasn't been found on disk, delete it {[{Q, SeqId} | DeleteAcc], Files, Len + 1}; - [{MsgId, _RefCount, _File, _Offset, _TotalSize, true}] -> + [#message_store_entry { msg_id = MsgId, is_persistent = true }] -> %% msg is persistent, keep it {DeleteAcc, Files, Len}; - [{MsgId, _RefCount, _File, _Offset, _TotalSize, false}] -> + [#message_store_entry { msg_id = MsgId, is_persistent = false}] -> %% msg is not persistent, delete it Files2 = remove_message(MsgId, Files, State), {[{Q, SeqId} | DeleteAcc], Files2, Len + 1} @@ -1645,11 +1648,14 @@ shuffle_up(Q, BaseSeqId, SeqId, Gap) -> load_messages(Left, [], State) -> Num = list_to_integer(filename:rootname(Left)), Offset = - case dets_ets_match_object(State, {'_', '_', Left, '_', '_', '_'}) of + case dets_ets_match_object(State, #message_store_entry + { file = Left, _ = '_' }) of [] -> 0; L -> - [ {_MsgId, _RefCount, Left, MaxOffset, TotalSize, _IsPersistent} - | _ ] = sort_msg_locations_by_offset(false, L), + [ #message_store_entry {file = Left, + offset = MaxOffset, + total_size = TotalSize} | _ ] = + sort_msg_locations_by_offset(desc, L), MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT end, State #dqstate { current_file_num = Num, current_file_name = Left, @@ -1663,15 +1669,17 @@ load_messages(Left, [File|Files], case length(mnesia:dirty_index_match_object (rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, - queue_and_seq_id = '_', - is_delivered = '_' - }, + _ = '_' + }, msg_id)) of 0 -> {VMAcc, VTSAcc}; RefCount -> true = dets_ets_insert_new - (State, {MsgId, RefCount, File, - Offset, TotalSize, IsPersistent}), + (State, #message_store_entry + { msg_id = MsgId, ref_count = RefCount, + file = File, offset = Offset, + total_size = TotalSize, + is_persistent = IsPersistent }), {[Obj | VMAcc], VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT } @@ -1702,9 +1710,8 @@ verify_messages_in_mnesia(MsgIds) -> true = 0 < length(mnesia:dirty_index_match_object (rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, - queue_and_seq_id = '_', - is_delivered = '_' - }, + _ = '_' + }, msg_id)) end, MsgIds). diff --git a/src/rabbit_handle_cache.erl b/src/rabbit_file_handle_cache.erl index fcd79269ef..83acffd03e 100644 --- a/src/rabbit_handle_cache.erl +++ b/src/rabbit_file_handle_cache.erl @@ -29,7 +29,7 @@ %% Contributor(s): ______________________________________. %% --module(rabbit_handle_cache). +-module(rabbit_file_handle_cache). -export([init/2, close_all/1, close_file/2, with_file_handle_at/4]). |
