summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-09-07 07:43:59 +0100
committerMatthias Radestock <matthias@lshift.net>2009-09-07 07:43:59 +0100
commit28f974ef79de8a52d50a8441300c2b09082c14dd (patch)
treeb03a94805efa3a8f8573c3c14f2b8f6da5bc0027
parent47fd8d8869ec5710e38d00905f5528909b11c75e (diff)
downloadrabbitmq-server-git-28f974ef79de8a52d50a8441300c2b09082c14dd.tar.gz
generalise persistent flag to message attributes in rabbit_msg_store
This is just a renaming exercise, but it turns rabbit_msg_store into a general purpose message store.
-rw-r--r--src/rabbit_disk_queue.erl4
-rw-r--r--src/rabbit_msg_store.erl36
2 files changed, 19 insertions, 21 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 6beccf3a87..b786f03659 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -544,7 +544,7 @@ internal_fetch_attributes(Q, MarkDelivered, Advance,
case next(Q, MarkDelivered, Advance, State) of
empty -> empty;
{MsgId, IsDelivered, AckTag, Remaining} ->
- IsPersistent = rabbit_msg_store:is_persistent(MsgId, Store),
+ IsPersistent = rabbit_msg_store:attrs(MsgId, Store),
{MsgId, IsPersistent, IsDelivered, AckTag, Remaining}
end.
@@ -902,7 +902,7 @@ prune_mnesia(Store, Key, DeleteAcc, RemoveAcc, Len) ->
[#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }] =
mnesia:dirty_read(rabbit_disk_queue, Key),
{DeleteAcc1, RemoveAcc1, Len1} =
- case rabbit_msg_store:is_persistent(MsgId, Store) of
+ case rabbit_msg_store:attrs(MsgId, Store) of
not_found ->
%% msg hasn't been found on disk, delete it
{[{Q, SeqId} | DeleteAcc], RemoveAcc, Len + 1};
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 427a669549..a1e9e17a1a 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -31,7 +31,7 @@
-module(rabbit_msg_store).
--export([init/6, write/4, read/2, is_persistent/2, remove/2, release/2,
+-export([init/6, write/4, read/2, attrs/2, remove/2, release/2,
needs_sync/2, sync/1, cleanup/1, cache_info/1, memory/1,
ets_bpr/1, to_disk_only_mode/1, to_ram_disk_mode/1]).
@@ -57,7 +57,7 @@
}).
-record(msg_location,
- {msg_id, ref_count, file, offset, total_size, is_persistent}).
+ {msg_id, ref_count, file, offset, total_size, attrs}).
-record(file_summary,
{file, valid_total_size, contiguous_top, left, right}).
@@ -84,6 +84,7 @@
-type(ets_table() :: any()).
-type(msg_id() :: any()).
-type(msg() :: any()).
+-type(msg_attrs() :: any()).
-type(file_path() :: any()).
-type(io_device() :: any()).
@@ -109,9 +110,9 @@
non_neg_integer(), non_neg_integer(),
fun ((msg_id()) -> non_neg_integer()), non_neg_integer()) ->
msstate()).
--spec(write/4 :: (msg_id(), msg(), boolean(), msstate()) -> msstate()).
+-spec(write/4 :: (msg_id(), msg(), msg_attrs(), msstate()) -> msstate()).
-spec(read/2 :: (msg_id(), msstate()) -> {msg(), msstate()} | 'not_found').
--spec(is_persistent/2 :: (msg_id(), msstate()) -> boolean() | 'not_found').
+-spec(attrs/2 :: (msg_id(), msstate()) -> msg_attrs() | 'not_found').
-spec(remove/2 :: ([msg_id()], msstate()) -> msstate()).
-spec(release/2 :: ([msg_id()], msstate()) -> msstate()).
-spec(needs_sync/2 :: ([msg_id()], msstate()) -> boolean()).
@@ -130,7 +131,7 @@
%% The components:
%%
%% MsgLocation: this is a (d)ets table which contains:
-%% {MsgId, RefCount, File, Offset, TotalSize, IsPersistent}
+%% {MsgId, RefCount, File, Offset, TotalSize, Attrs}
%% FileSummary: this is an ets table which contains:
%% {File, ValidTotalSize, ContiguousTop, Left, Right}
%%
@@ -317,7 +318,7 @@ init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, RefCountFun,
State1 #msstate { current_file_handle = FileHdl }.
-write(MsgId, Msg, IsPersistent,
+write(MsgId, Msg, Attrs,
State = #msstate { current_file_handle = CurHdl,
current_file_name = CurName,
current_offset = CurOffset,
@@ -325,13 +326,12 @@ write(MsgId, Msg, IsPersistent,
case dets_ets_lookup(State, MsgId) of
[] ->
%% New message, lots to do
- {ok, TotalSize} = rabbit_msg_file:append(
- CurHdl, MsgId, Msg, IsPersistent),
+ {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg, Attrs),
true = dets_ets_insert_new(
State, #msg_location {
msg_id = MsgId, ref_count = 1, file = CurName,
offset = CurOffset, total_size = TotalSize,
- is_persistent = IsPersistent }),
+ attrs = Attrs }),
[FSEntry = #file_summary { valid_total_size = ValidTotalSize,
contiguous_top = ContiguousTop,
right = undefined }] =
@@ -368,7 +368,7 @@ read(MsgId, State) ->
total_size = TotalSize }] ->
case fetch_and_increment_cache(MsgId, State) of
not_found ->
- {{ok, {MsgId, Msg, _IsPersistent}}, State1} =
+ {{ok, {MsgId, Msg, _Attrs}}, State1} =
with_read_handle_at(
File, Offset,
fun(Hdl) ->
@@ -399,13 +399,11 @@ read(MsgId, State) ->
end
end.
-is_persistent(MsgId, State) ->
+attrs(MsgId, State) ->
Objs = dets_ets_lookup(State, MsgId),
case Objs of
- [] ->
- not_found;
- [#msg_location { msg_id = MsgId, is_persistent = IsPersistent }] ->
- IsPersistent
+ [] -> not_found;
+ [#msg_location { msg_id = MsgId, attrs = Attrs }] -> Attrs
end.
remove(MsgIds, State = #msstate { current_file_name = CurName }) ->
@@ -790,7 +788,7 @@ verify_messages_referenced(RefCountFun, MsgIds) ->
scan_file_for_valid_messages_msg_ids(Dir, File) ->
{ok, Messages} = scan_file_for_valid_messages(Dir, File),
{ok, Messages,
- [MsgId || {MsgId, _IsPersistent, _TotalSize, _FileOffset} <- Messages]}.
+ [MsgId || {MsgId, _Attrs, _TotalSize, _FileOffset} <- Messages]}.
scan_file_for_valid_messages(Dir, File) ->
case open_file(Dir, File, ?READ_MODE) of
@@ -813,7 +811,7 @@ find_contiguous_block_prefix(List) ->
find_contiguous_block_prefix([], ExpectedOffset, MsgIds) ->
{ExpectedOffset, MsgIds};
-find_contiguous_block_prefix([{MsgId, _IsPersistent, TotalSize, ExpectedOffset}
+find_contiguous_block_prefix([{MsgId, _Attrs, TotalSize, ExpectedOffset}
| Tail], ExpectedOffset, MsgIds) ->
ExpectedOffset1 = ExpectedOffset + TotalSize,
find_contiguous_block_prefix(Tail, ExpectedOffset1, [MsgId | MsgIds]);
@@ -845,7 +843,7 @@ load_messages(RefCountFun, Left, [File|Files],
State = #msstate { dir = Dir, file_summary = FileSummary }) ->
{ok, Messages} = scan_file_for_valid_messages(Dir, File),
{ValidMessages, ValidTotalSize} = lists:foldl(
- fun (Obj = {MsgId, IsPersistent, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
+ fun (Obj = {MsgId, Attrs, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
case RefCountFun(MsgId) of
0 -> {VMAcc, VTSAcc};
RefCount ->
@@ -854,7 +852,7 @@ load_messages(RefCountFun, Left, [File|Files],
msg_id = MsgId, ref_count = RefCount,
file = File, offset = Offset,
total_size = TotalSize,
- is_persistent = IsPersistent }),
+ attrs = Attrs }),
{[Obj | VMAcc], VTSAcc + TotalSize}
end
end, {[], 0}, Messages),