diff options
| author | Matthias Radestock <matthias@lshift.net> | 2009-09-03 15:03:31 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2009-09-03 15:03:31 +0100 |
| commit | 82bad5e71fafb91fba327cc1699df763b07ce24e (patch) | |
| tree | 2c34f48c32a50d0a8866cf5f207cb4cc840680e3 | |
| parent | e32ffbe4f0e0de4aa8071e5f074117b963684f29 (diff) | |
| download | rabbitmq-server-git-82bad5e71fafb91fba327cc1699df763b07ce24e.tar.gz | |
rename #message_store_entry to #msg_location
to match what we call the containing table
| -rw-r--r-- | src/rabbit_disk_queue.erl | 76 |
1 files changed, 38 insertions, 38 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 2d13a337a7..c5e79df9ed 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -109,7 +109,7 @@ ets_bytes_per_record %% bytes per record in msg_location_ets }). --record(message_store_entry, +-record(msg_location, {msg_id, ref_count, file, offset, total_size, is_persistent}). -record(file_summary_entry, @@ -919,7 +919,7 @@ internal_fetch_attributes(Q, MarkDelivered, Advance, State) -> case queue_head(Q, MarkDelivered, Advance, State) of E = {ok, empty, _} -> E; {ok, AckTag, IsDelivered, - #message_store_entry { msg_id = MsgId, is_persistent = IsPersistent }, + #msg_location { msg_id = MsgId, is_persistent = IsPersistent }, Remaining, State1} -> {ok, {MsgId, IsPersistent, IsDelivered, AckTag, Remaining}, State1} end. @@ -942,9 +942,9 @@ maybe_advance(pop_queue, Sequences, Q, ReadSeqId, WriteSeqId) -> true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}), ok. -read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount, - file = File, offset = Offset, - total_size = TotalSize }, State) -> +read_stored_message(#msg_location { msg_id = MsgId, ref_count = RefCount, + file = File, offset = Offset, + total_size = TotalSize }, State) -> case fetch_and_increment_cache(MsgId, State) of not_found -> {{ok, {MsgId, MsgBody, _IsPersistent, _BodySize}}, State1} = @@ -979,7 +979,7 @@ update_message_attributes(Q, SeqId, MarkDelivered, State) -> [Obj = #dq_msg_loc {is_delivered = IsDelivered, msg_id = MsgId}] = mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}), - [StoreEntry = #message_store_entry { msg_id = MsgId }] = + [StoreEntry = #msg_location { msg_id = MsgId }] = dets_ets_lookup(State, MsgId), ok = case {IsDelivered, MarkDelivered} of {true, _} -> ok; @@ -1029,8 +1029,8 @@ remove_message(MsgId, Files, current_file_name = CurName }) -> [StoreEntry = - #message_store_entry { msg_id = MsgId, ref_count = RefCount, file = File, - offset = Offset, total_size = TotalSize }] = + #msg_location { msg_id = MsgId, ref_count = RefCount, file = File, + offset = Offset, total_size = TotalSize }] = dets_ets_lookup(State, MsgId), case RefCount of 1 -> @@ -1049,8 +1049,8 @@ remove_message(MsgId, Files, end; _ when 1 < RefCount -> ok = decrement_cache(MsgId, State), - ok = dets_ets_insert(State, StoreEntry #message_store_entry - { ref_count = RefCount - 1 }), + ok = dets_ets_insert(State, StoreEntry #msg_location { + ref_count = RefCount - 1 }), Files end. @@ -1068,8 +1068,8 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent, CurHdl, MsgId, msg_to_bin(Message), IsPersistent), true = dets_ets_insert_new( - State, #message_store_entry - { msg_id = MsgId, ref_count = 1, file = CurName, + State, #msg_location { + msg_id = MsgId, ref_count = 1, file = CurName, offset = CurOffset, total_size = TotalSize, is_persistent = IsPersistent }), [FSEntry = #file_summary_entry { valid_total_size = ValidTotalSize, @@ -1090,10 +1090,10 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent, NextOffset, State #dqstate {current_offset = NextOffset, current_dirty = true}); [StoreEntry = - #message_store_entry { msg_id = MsgId, ref_count = RefCount }] -> + #msg_location { msg_id = MsgId, ref_count = RefCount }] -> %% We already know about it, just update counter - ok = dets_ets_insert(State, StoreEntry #message_store_entry - { ref_count = RefCount + 1 }), + ok = dets_ets_insert(State, StoreEntry #msg_location { + ref_count = RefCount + 1 }), {ok, State} end. @@ -1105,8 +1105,8 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, }) -> NeedsSync = IsDirty andalso lists:any(fun ({MsgId, _IsDelivered}) -> - [#message_store_entry { msg_id = MsgId, file = File, - offset = Offset }] = + [#msg_location { msg_id = MsgId, file = File, + offset = Offset }] = dets_ets_lookup(State, MsgId), File =:= CurFile andalso Offset >= SyncOffset end, PubMsgIds), @@ -1389,8 +1389,8 @@ sort_msg_locations_by_offset(Dir, List) -> asc -> fun erlang:'<'/2; desc -> fun erlang:'>'/2 end, - lists:sort(fun (#message_store_entry { offset = OffA }, - #message_store_entry { offset = OffB }) -> + lists:sort(fun (#msg_location { offset = OffA }, + #msg_location { offset = OffB }) -> Comp(OffA, OffB) end, List). @@ -1431,7 +1431,7 @@ combine_files(#file_summary_entry { file = Source, {ok, TmpHdl} = open_file(Tmp, ?READ_MODE ++ ?WRITE_MODE), Worklist = lists:dropwhile( - fun (#message_store_entry { offset = Offset }) + fun (#msg_location { offset = Offset }) when Offset /= DestinationContiguousTop -> %% it cannot be that Offset == %% DestinationContiguousTop because if it @@ -1444,8 +1444,8 @@ combine_files(#file_summary_entry { file = Source, %% enforce it anyway end, sort_msg_locations_by_offset( asc, dets_ets_match_object( - State1, #message_store_entry - { file = Destination, _ = '_' }))), + State1, #msg_location { + file = Destination, _ = '_' }))), ok = copy_messages( Worklist, DestinationContiguousTop, DestinationValid, DestinationHdl, TmpHdl, Destination, State1), @@ -1465,8 +1465,8 @@ combine_files(#file_summary_entry { file = Source, end, SourceWorkList = sort_msg_locations_by_offset( - asc, dets_ets_match_object(State1, #message_store_entry - { file = Source, _ = '_' })), + asc, dets_ets_match_object(State1, #msg_location { + file = Source, _ = '_' })), ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, SourceHdl, DestinationHdl, Destination, State1), %% tidy up @@ -1479,15 +1479,15 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, Destination, State) -> {FinalOffset, BlockStart1, BlockEnd1} = lists:foldl( - fun (StoreEntry = #message_store_entry { offset = Offset, - total_size = TotalSize }, + fun (StoreEntry = #msg_location { offset = Offset, + total_size = TotalSize }, {CurOffset, BlockStart, BlockEnd}) -> %% CurOffset is in the DestinationFile. %% Offset, BlockStart and BlockEnd are in the SourceFile %% update MsgLocationDets to reflect change of file and offset - ok = dets_ets_insert(State, StoreEntry #message_store_entry - { file = Destination, - offset = CurOffset }), + ok = dets_ets_insert(State, StoreEntry #msg_location { + file = Destination, + offset = CurOffset }), NextOffset = CurOffset + TotalSize, if BlockStart =:= undefined -> %% base case, called only for the first list elem @@ -1670,10 +1670,10 @@ prune_mnesia(State, Key, Files, DeleteAcc, Len) -> [] -> %% msg hasn't been found on disk, delete it {[{Q, SeqId} | DeleteAcc], Files, Len + 1}; - [#message_store_entry { msg_id = MsgId, is_persistent = true }] -> + [#msg_location { msg_id = MsgId, is_persistent = true }] -> %% msg is persistent, keep it {DeleteAcc, Files, Len}; - [#message_store_entry { msg_id = MsgId, is_persistent = false}] -> + [#msg_location { msg_id = MsgId, is_persistent = false}] -> %% msg is not persistent, delete it Files2 = remove_message(MsgId, Files, State), {[{Q, SeqId} | DeleteAcc], Files2, Len + 1} @@ -1770,13 +1770,13 @@ shuffle_up(Q, BaseSeqId, SeqId, Gap) -> load_messages(Left, [], State) -> Num = list_to_integer(filename:rootname(Left)), Offset = - case dets_ets_match_object(State, #message_store_entry - { file = Left, _ = '_' }) of + case dets_ets_match_object(State, #msg_location { + file = Left, _ = '_' }) of [] -> 0; L -> - [ #message_store_entry {file = Left, - offset = MaxOffset, - total_size = TotalSize} | _ ] = + [ #msg_location { file = Left, + offset = MaxOffset, + total_size = TotalSize} | _ ] = sort_msg_locations_by_offset(desc, L), MaxOffset + TotalSize end, @@ -1794,8 +1794,8 @@ load_messages(Left, [File|Files], 0 -> {VMAcc, VTSAcc}; RefCount -> true = dets_ets_insert_new( - State, #message_store_entry - { msg_id = MsgId, ref_count = RefCount, + State, #msg_location { + msg_id = MsgId, ref_count = RefCount, file = File, offset = Offset, total_size = TotalSize, is_persistent = IsPersistent }), |
