summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-24 16:53:35 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-24 16:53:35 +0100
commitc3155a188e0e845294340210b2ad6f5e1af4a69d (patch)
tree9cb3706c9042d05b99d4f2cc941f0cc90c739ad4 /src
parentdc7c6605ab76a2bd32261881f8eac1276df392cb (diff)
downloadrabbitmq-server-git-c3155a188e0e845294340210b2ad6f5e1af4a69d.tar.gz
handle_cache => file_handle_cache.
Also switched to using the #message_store_entry record throughout.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl111
-rw-r--r--src/rabbit_file_handle_cache.erl (renamed from src/rabbit_handle_cache.erl)2
2 files changed, 60 insertions, 53 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 33a1aaa830..aee91f5db0 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -397,12 +397,13 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
{min_no_slots, 1024*1024},
%% man says this should be <= 32M. But it works...
{max_no_slots, 30*1024*1024},
- {type, set}
+ {type, set},
+ {keypos, 2}
]),
%% it would be better to have this as private, but dets:from_ets/2
%% seems to blow up if it is set private
- MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]),
+ MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected, {keypos, 2}]),
InitName = "0" ++ ?FILE_EXTENSION,
State =
@@ -419,7 +420,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
current_offset = 0,
current_dirty = false,
file_size_limit = FileSizeLimit,
- read_file_hc_cache = rabbit_handle_cache:init(
+ read_file_hc_cache = rabbit_file_handle_cache:init(
ReadFileHandlesLimit,
[read, raw, binary, read_ahead]),
on_sync_txns = [],
@@ -588,7 +589,7 @@ shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
_ -> sync_current_file_handle(State),
file:close(FileHdl)
end,
- HC1 = rabbit_handle_cache:close_all(HC),
+ HC1 = rabbit_file_handle_cache:close_all(HC),
State1 #dqstate { current_file_handle = undefined,
current_dirty = false,
read_file_hc_cache = HC1,
@@ -763,7 +764,7 @@ with_read_handle_at(File, Offset, Fun, State =
end,
FilePath = form_filename(File),
{Result, HC1} =
- rabbit_handle_cache:with_file_handle_at(FilePath, Offset, Fun, HC),
+ rabbit_file_handle_cache:with_file_handle_at(FilePath, Offset, Fun, HC),
{Result, State1 #dqstate { read_file_hc_cache = HC1 }}.
sequence_lookup(Sequences, Q) ->
@@ -913,7 +914,7 @@ update_message_attributes(Q, SeqId, MarkDelivered, State) ->
[Obj =
#dq_msg_loc {is_delivered = IsDelivered, msg_id = MsgId}] =
mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}),
- [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] =
+ [StoreEntry = #message_store_entry { msg_id = MsgId }] =
dets_ets_lookup(State, MsgId),
ok = case {IsDelivered, MarkDelivered} of
{true, _} -> ok;
@@ -922,10 +923,7 @@ update_message_attributes(Q, SeqId, MarkDelivered, State) ->
mnesia:dirty_write(rabbit_disk_queue,
Obj #dq_msg_loc {is_delivered = true})
end,
- {{MsgId, SeqId}, IsDelivered,
- #message_store_entry { msg_id = MsgId, ref_count = RefCount, file = File,
- offset = Offset, total_size = TotalSize,
- is_persistent = IsPersistent }}.
+ {{MsgId, SeqId}, IsDelivered, StoreEntry}.
internal_foldl(Q, Fun, Init, State) ->
State1 = #dqstate { sequences = Sequences } =
@@ -974,7 +972,9 @@ remove_message(MsgId, Files,
State = #dqstate { file_summary = FileSummary,
current_file_name = CurName
}) ->
- [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] =
+ [StoreEntry =
+ #message_store_entry { msg_id = MsgId, ref_count = RefCount, file = File,
+ offset = Offset, total_size = TotalSize }] =
dets_ets_lookup(State, MsgId),
case RefCount of
1 ->
@@ -993,8 +993,8 @@ remove_message(MsgId, Files,
end;
_ when 1 < RefCount ->
ok = decrement_cache(MsgId, State),
- ok = dets_ets_insert(State, {MsgId, RefCount - 1, File, Offset,
- TotalSize, IsPersistent}),
+ ok = dets_ets_insert(State, StoreEntry #message_store_entry
+ { ref_count = RefCount - 1 }),
Files
end.
@@ -1011,8 +1011,10 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent,
{ok, TotalSize} = append_message(CurHdl, MsgId, msg_to_bin(Message),
IsPersistent),
true = dets_ets_insert_new
- (State, {MsgId, 1, CurName,
- CurOffset, TotalSize, IsPersistent}),
+ (State, #message_store_entry
+ { msg_id = MsgId, ref_count = 1, file = CurName,
+ offset = CurOffset, total_size = TotalSize,
+ is_persistent = IsPersistent }),
[{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] =
ets:lookup(FileSummary, CurName),
ValidTotalSize1 = ValidTotalSize + TotalSize +
@@ -1028,10 +1030,11 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent,
maybe_roll_to_new_file(
NextOffset, State #dqstate {current_offset = NextOffset,
current_dirty = true});
- [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] ->
+ [StoreEntry =
+ #message_store_entry { msg_id = MsgId, ref_count = RefCount }] ->
%% We already know about it, just update counter
- ok = dets_ets_insert(State, {MsgId, RefCount + 1, File,
- Offset, TotalSize, IsPersistent}),
+ ok = dets_ets_insert(State, StoreEntry #message_store_entry
+ { ref_count = RefCount + 1 }),
{ok, State}
end.
@@ -1043,8 +1046,8 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
}) ->
NeedsSync = IsDirty andalso
lists:any(fun ({MsgId, _IsDelivered}) ->
- [{MsgId, _RefCount, File, Offset,
- _TotalSize, _IsPersistent}] =
+ [#message_store_entry { msg_id = MsgId, file = File,
+ offset = Offset }] =
dets_ets_lookup(State, MsgId),
File =:= CurFile andalso Offset >= SyncOffset
end, PubMsgIds),
@@ -1205,8 +1208,7 @@ internal_delete_queue(Q, State) ->
Objs = mnesia:dirty_match_object(
rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id = {Q, '_'},
- msg_id = '_',
- is_delivered = '_'
+ _ = '_'
}),
MsgSeqIds =
lists:map(
@@ -1330,12 +1332,13 @@ adjust_meta_and_combine(
true -> {false, State}
end.
-sort_msg_locations_by_offset(Asc, List) ->
- Comp = case Asc of
- true -> fun erlang:'<'/2;
- false -> fun erlang:'>'/2
+sort_msg_locations_by_offset(Dir, List) ->
+ Comp = case Dir of
+ asc -> fun erlang:'<'/2;
+ desc -> fun erlang:'>'/2
end,
- lists:sort(fun ({_, _, _, OffA, _, _}, {_, _, _, OffB, _, _}) ->
+ lists:sort(fun (#message_store_entry { offset = OffA },
+ #message_store_entry { offset = OffB }) ->
Comp(OffA, OffB)
end, List).
@@ -1374,7 +1377,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
read_ahead, delayed_write]),
Worklist =
lists:dropwhile(
- fun ({_, _, _, Offset, _, _})
+ fun (#message_store_entry { offset = Offset })
when Offset /= DestinationContiguousTop ->
%% it cannot be that Offset ==
%% DestinationContiguousTop because if it
@@ -1386,9 +1389,9 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
%% as we require, however, we need to
%% enforce it anyway
end, sort_msg_locations_by_offset(
- true, dets_ets_match_object(State,
- {'_', '_', Destination,
- '_', '_', '_'}))),
+ asc, dets_ets_match_object(
+ State, #message_store_entry
+ { file = Destination, _ = '_' }))),
ok = copy_messages(
Worklist, DestinationContiguousTop, DestinationValid,
DestinationHdl, TmpHdl, Destination, State),
@@ -1408,9 +1411,8 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
end,
SourceWorkList =
sort_msg_locations_by_offset(
- true, dets_ets_match_object(State,
- {'_', '_', Source,
- '_', '_', '_'})),
+ asc, dets_ets_match_object(State, #message_store_entry
+ { file = Source, _ = '_' })),
ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
SourceHdl, DestinationHdl, Destination, State),
%% tidy up
@@ -1424,15 +1426,16 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
Destination, State) ->
{FinalOffset, BlockStart1, BlockEnd1} =
lists:foldl(
- fun ({MsgId, RefCount, _Source, Offset, TotalSize, IsPersistent},
+ fun (StoreEntry = #message_store_entry { offset = Offset,
+ total_size = TotalSize },
{CurOffset, BlockStart, BlockEnd}) ->
%% CurOffset is in the DestinationFile.
%% Offset, BlockStart and BlockEnd are in the SourceFile
Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
%% update MsgLocationDets to reflect change of file and offset
- ok = dets_ets_insert
- (State, {MsgId, RefCount, Destination,
- CurOffset, TotalSize, IsPersistent}),
+ ok = dets_ets_insert (State, StoreEntry #message_store_entry
+ { file = Destination,
+ offset = CurOffset }),
NextOffset = CurOffset + Size,
if BlockStart =:= undefined ->
%% base case, called only for the first list elem
@@ -1459,7 +1462,7 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
ok.
close_file(File, State = #dqstate { read_file_hc_cache = HC }) ->
- HC1 = rabbit_handle_cache:close_file(form_filename(File), HC),
+ HC1 = rabbit_file_handle_cache:close_file(form_filename(File), HC),
State #dqstate { read_file_hc_cache = HC1 }.
delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) ->
@@ -1545,10 +1548,10 @@ prune_mnesia(State, Key, Files, DeleteAcc, Len) ->
[] ->
%% msg hasn't been found on disk, delete it
{[{Q, SeqId} | DeleteAcc], Files, Len + 1};
- [{MsgId, _RefCount, _File, _Offset, _TotalSize, true}] ->
+ [#message_store_entry { msg_id = MsgId, is_persistent = true }] ->
%% msg is persistent, keep it
{DeleteAcc, Files, Len};
- [{MsgId, _RefCount, _File, _Offset, _TotalSize, false}] ->
+ [#message_store_entry { msg_id = MsgId, is_persistent = false}] ->
%% msg is not persistent, delete it
Files2 = remove_message(MsgId, Files, State),
{[{Q, SeqId} | DeleteAcc], Files2, Len + 1}
@@ -1645,11 +1648,14 @@ shuffle_up(Q, BaseSeqId, SeqId, Gap) ->
load_messages(Left, [], State) ->
Num = list_to_integer(filename:rootname(Left)),
Offset =
- case dets_ets_match_object(State, {'_', '_', Left, '_', '_', '_'}) of
+ case dets_ets_match_object(State, #message_store_entry
+ { file = Left, _ = '_' }) of
[] -> 0;
L ->
- [ {_MsgId, _RefCount, Left, MaxOffset, TotalSize, _IsPersistent}
- | _ ] = sort_msg_locations_by_offset(false, L),
+ [ #message_store_entry {file = Left,
+ offset = MaxOffset,
+ total_size = TotalSize} | _ ] =
+ sort_msg_locations_by_offset(desc, L),
MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT
end,
State #dqstate { current_file_num = Num, current_file_name = Left,
@@ -1663,15 +1669,17 @@ load_messages(Left, [File|Files],
case length(mnesia:dirty_index_match_object
(rabbit_disk_queue,
#dq_msg_loc { msg_id = MsgId,
- queue_and_seq_id = '_',
- is_delivered = '_'
- },
+ _ = '_'
+ },
msg_id)) of
0 -> {VMAcc, VTSAcc};
RefCount ->
true = dets_ets_insert_new
- (State, {MsgId, RefCount, File,
- Offset, TotalSize, IsPersistent}),
+ (State, #message_store_entry
+ { msg_id = MsgId, ref_count = RefCount,
+ file = File, offset = Offset,
+ total_size = TotalSize,
+ is_persistent = IsPersistent }),
{[Obj | VMAcc],
VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT
}
@@ -1702,9 +1710,8 @@ verify_messages_in_mnesia(MsgIds) ->
true = 0 < length(mnesia:dirty_index_match_object
(rabbit_disk_queue,
#dq_msg_loc { msg_id = MsgId,
- queue_and_seq_id = '_',
- is_delivered = '_'
- },
+ _ = '_'
+ },
msg_id))
end, MsgIds).
diff --git a/src/rabbit_handle_cache.erl b/src/rabbit_file_handle_cache.erl
index fcd79269ef..83acffd03e 100644
--- a/src/rabbit_handle_cache.erl
+++ b/src/rabbit_file_handle_cache.erl
@@ -29,7 +29,7 @@
%% Contributor(s): ______________________________________.
%%
--module(rabbit_handle_cache).
+-module(rabbit_file_handle_cache).
-export([init/2, close_all/1, close_file/2, with_file_handle_at/4]).