summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-09-02 17:34:26 +0100
committerMatthew Sackman <matthew@lshift.net>2009-09-02 17:34:26 +0100
commitc5bfb74c942e004fbb33137b4ebc1deebbd7396d (patch)
tree2aab168e95da0a86030cef8e8d2269ec8cc1dcf8
parent198349ed4d2f06f98ba0bda1e05e183548b3e52b (diff)
downloadrabbitmq-server-git-c5bfb74c942e004fbb33137b4ebc1deebbd7396d.tar.gz
Made the disk queue start up in the same mode it was last running in.
This is slightly grim because I have to store some values in the mnesia table which then have to survive all the start up logic, so there are a couple of annoying 1-line changes elsewhere. However, it does indeed work. There was also one bool() -> boolean() fix in the memory_manager.
-rw-r--r--src/rabbit_disk_queue.erl63
-rw-r--r--src/rabbit_memory_manager.erl2
2 files changed, 45 insertions, 20 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index ad7c8df13e..6238088413 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -70,12 +70,14 @@
-define(READ_MODE, [read, read_ahead]).
-define(WRITE_MODE, [write, delayed_write]).
--define(SHUTDOWN_MESSAGE_KEY, shutdown_token).
+-define(SHUTDOWN_MESSAGE_KEY, {internal_token, shutdown}).
-define(SHUTDOWN_MESSAGE,
#dq_msg_loc { queue_and_seq_id = ?SHUTDOWN_MESSAGE_KEY,
msg_id = infinity_and_beyond,
is_delivered = never
- }).
+ }).
+
+-define(BPR_KEY, {internal_token, bytes_per_record}).
-define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in millisecs
-define(SYNC_INTERVAL, 5). %% milliseconds
@@ -386,13 +388,22 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
ok = filelib:ensure_dir(form_filename("nothing")),
Node = node(),
- ok = case mnesia:change_table_copy_type(rabbit_disk_queue, Node,
- disc_copies) of
- {atomic, ok} -> ok;
- {aborted, {already_exists, rabbit_disk_queue, Node,
- disc_copies}} -> ok;
- E -> E
- end,
+ {Mode, MnesiaBPR, EtsBPR} =
+ case lists:member(Node, mnesia:table_info(rabbit_disk_queue,
+ disc_copies)) of
+ true ->
+ %% memory manager assumes we start oppressed. As we're
+ %% not, make sure it knows about it, by reporting zero
+ %% memory usage, which ensures it'll tell us to become
+ %% liberated
+ rabbit_memory_manager:report_memory(
+ self(), 0, false),
+ {ram_disk, undefined, undefined};
+ false ->
+ [#dq_msg_loc { msg_id = {MnesiaBPR1, EtsBPR1}}] =
+ mnesia:dirty_read(rabbit_disk_queue, ?BPR_KEY),
+ {disk_only, MnesiaBPR1, EtsBPR1}
+ end,
ok = detect_shutdown_state_and_adjust_delivered_flags(),
@@ -418,7 +429,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
State =
#dqstate { msg_location_dets = MsgLocationDets,
msg_location_ets = MsgLocationEts,
- operation_mode = ram_disk,
+ operation_mode = Mode,
file_summary = ets:new(?FILE_SUMMARY_ETS_NAME,
[set, private]),
sequences = ets:new(?SEQUENCE_ETS_NAME,
@@ -437,8 +448,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
[set, private]),
memory_report_timer_ref = undefined,
wordsize = erlang:system_info(wordsize),
- mnesia_bytes_per_record = undefined,
- ets_bytes_per_record = undefined
+ mnesia_bytes_per_record = MnesiaBPR,
+ ets_bytes_per_record = EtsBPR
},
{ok, State1 = #dqstate { current_file_name = CurrentName,
current_offset = Offset } } =
@@ -617,7 +628,7 @@ start_memory_timer(State) ->
report_memory(Hibernating, State) ->
Bytes = memory_use(State),
rabbit_memory_manager:report_memory(self(), trunc(2.5 * Bytes),
- Hibernating).
+ Hibernating).
memory_use(#dqstate { operation_mode = ram_disk,
file_summary = FileSummary,
@@ -658,12 +669,21 @@ to_disk_only_mode(State = #dqstate { operation_mode = ram_disk,
EtsSize = lists:max([1, ets:info(MsgLocationEts, size)]),
{atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(),
disc_only_copies),
+ MnesiaBPR = MnesiaMemBytes / MnesiaSize,
+ EtsBPR = EtsMemBytes / EtsSize,
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ mnesia:write(rabbit_disk_queue,
+ #dq_msg_loc { queue_and_seq_id = ?BPR_KEY,
+ msg_id = {MnesiaBPR, EtsBPR},
+ is_delivered = never }, write)
+ end),
ok = dets:from_ets(MsgLocationDets, MsgLocationEts),
true = ets:delete_all_objects(MsgLocationEts),
garbage_collect(),
State #dqstate { operation_mode = disk_only,
- mnesia_bytes_per_record = MnesiaMemBytes / MnesiaSize,
- ets_bytes_per_record = EtsMemBytes / EtsSize }.
+ mnesia_bytes_per_record = MnesiaBPR,
+ ets_bytes_per_record = EtsBPR }.
to_ram_disk_mode(State = #dqstate { operation_mode = ram_disk }) ->
State;
@@ -673,6 +693,7 @@ to_ram_disk_mode(State = #dqstate { operation_mode = disk_only,
rabbit_log:info("Converting disk queue to ram disk mode~n", []),
{atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(),
disc_copies),
+ ok = mnesia:dirty_delete(rabbit_disk_queue, ?BPR_KEY),
true = ets:from_dets(MsgLocationEts, MsgLocationDets),
ok = dets:delete_all_objects(MsgLocationDets),
garbage_collect(),
@@ -1546,12 +1567,12 @@ mark_message_delivered(Key = '$end_of_table', _N) ->
mark_message_delivered(Key, N) ->
[Obj] = mnesia:read(rabbit_disk_queue, Key, write),
M = case Obj #dq_msg_loc.is_delivered of
- true -> N;
false ->
ok = mnesia:write(rabbit_disk_queue,
Obj #dq_msg_loc { is_delivered = true },
write),
- N - 1
+ N - 1;
+ _ -> N %% needs to match 'never' as well as 'true'
end,
mark_message_delivered(mnesia:next(rabbit_disk_queue, Key), M).
@@ -1594,7 +1615,9 @@ load_from_disk(State) ->
{ok, State2}.
prune_mnesia_flush_batch(DeleteAcc) ->
- lists:foldl(fun (Key, ok) ->
+ lists:foldl(fun ({internal_token, _}, ok) ->
+ ok;
+ (Key, ok) ->
mnesia:dirty_delete(rabbit_disk_queue, Key)
end, ok, DeleteAcc).
@@ -1648,7 +1671,9 @@ extract_sequence_numbers(Sequences) ->
fun() ->
ok = mnesia:read_lock_table(rabbit_disk_queue),
mnesia:foldl(
- fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) ->
+ fun (#dq_msg_loc { queue_and_seq_id = {internal_token, _} },
+ true) -> true;
+ (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) ->
NextWrite = SeqId + 1,
case ets:lookup(Sequences, Q) of
[] -> ets:insert_new(Sequences,
diff --git a/src/rabbit_memory_manager.erl b/src/rabbit_memory_manager.erl
index b9d7bf7be2..3b637b3a70 100644
--- a/src/rabbit_memory_manager.erl
+++ b/src/rabbit_memory_manager.erl
@@ -53,7 +53,7 @@
-spec(start_link/0 :: () ->
({'ok', pid()} | 'ignore' | {'error', any()})).
-spec(register/5 :: (pid(), boolean(), atom(), atom(), list()) -> 'ok').
--spec(report_memory/3 :: (pid(), non_neg_integer(), bool()) -> 'ok').
+-spec(report_memory/3 :: (pid(), non_neg_integer(), boolean()) -> 'ok').
-spec(info/0 :: () -> [{atom(), any()}]).
-spec(conserve_memory/2 :: (pid(), bool()) -> 'ok').