summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-09-03 14:33:37 +0100
committerMatthew Sackman <matthew@lshift.net>2009-09-03 14:33:37 +0100
commite32ffbe4f0e0de4aa8071e5f074117b963684f29 (patch)
tree271da70012926341c3c8bf6300043e281968f684
parented467dd9d88c219f74fa1736a94623c680f92785 (diff)
downloadrabbitmq-server-git-e32ffbe4f0e0de4aa8071e5f074117b963684f29.tar.gz
Switched to using a file to hold the disk_only data. Also found a bug where vaporise was wiping out the disk_only data (both as a file, and when it was in mnesia). The result was that if the dq was in disk_only mode before being vaporised, it would refuse to start up again. Thus vaporise now pushes the queue back to ram_disk mode if necessary, after wiping out the contents of the mnesia table. Finally, all tests pass again.
-rw-r--r--src/rabbit_disk_queue.erl49
1 files changed, 27 insertions, 22 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 961d1fe639..2d13a337a7 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -65,6 +65,7 @@
-define(CACHE_MAX_SIZE, 10485760).
-define(MAX_READ_FILE_HANDLES, 256).
-define(FILE_SIZE_LIMIT, (256*1024*1024)).
+-define(DISK_ONLY_MODE_FILE, "disk_only_stats.dat").
-define(BINARY_MODE, [raw, binary]).
-define(READ_MODE, [read, read_ahead]).
@@ -77,8 +78,6 @@
is_delivered = never
}).
--define(BPR_KEY, {internal_token, bytes_per_record}).
-
-define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in millisecs
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(HIBERNATE_AFTER_MIN, 1000).
@@ -403,9 +402,14 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
self(), 0, false),
{ram_disk, undefined, undefined};
false ->
- [#dq_msg_loc { msg_id = {MnesiaBPR1, EtsBPR1}}] =
- mnesia:dirty_read(rabbit_disk_queue, ?BPR_KEY),
- {disk_only, MnesiaBPR1, EtsBPR1}
+ Path = form_filename(?DISK_ONLY_MODE_FILE),
+ case rabbit_misc:read_term_file(Path) of
+ {ok, [{MnesiaBPR1, EtsBPR1}]} ->
+ {disk_only, MnesiaBPR1, EtsBPR1};
+ {error, Reason} ->
+ throw({error, {cannot_read_disk_only_mode_file, Path,
+ Reason}})
+ end
end,
ok = detect_shutdown_state_and_adjust_delivered_flags(),
@@ -494,9 +498,14 @@ handle_call({foldl, Fun, Init, Q}, _From, State) ->
reply(Result, State1);
handle_call(stop, _From, State) ->
{stop, normal, ok, State}; %% gen_server now calls terminate
-handle_call(stop_vaporise, _From, State) ->
+handle_call(stop_vaporise, _From, State = #dqstate { operation_mode = Mode }) ->
State1 = shutdown(State),
{atomic, ok} = mnesia:clear_table(rabbit_disk_queue),
+ {atomic, ok} = case Mode of
+ ram_disk -> {atomic, ok};
+ disk_only -> mnesia:change_table_copy_type(
+ rabbit_disk_queue, node(), disc_copies)
+ end,
lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))),
{stop, normal, ok, State1}; %% gen_server now calls terminate
handle_call(to_disk_only_mode, _From, State) ->
@@ -675,13 +684,12 @@ to_disk_only_mode(State = #dqstate { operation_mode = ram_disk,
disc_only_copies),
MnesiaBPR = MnesiaMemBytes / MnesiaSize,
EtsBPR = EtsMemBytes / EtsSize,
- ok = rabbit_misc:execute_mnesia_transaction(
- fun() ->
- mnesia:write(rabbit_disk_queue,
- #dq_msg_loc { queue_and_seq_id = ?BPR_KEY,
- msg_id = {MnesiaBPR, EtsBPR},
- is_delivered = never }, write)
- end),
+ Path = form_filename(?DISK_ONLY_MODE_FILE),
+ case rabbit_misc:write_term_file(Path, [{MnesiaBPR, EtsBPR}]) of
+ ok -> ok;
+ {error, Reason} ->
+ throw({error, {cannot_create_disk_only_mode_file, Path, Reason}})
+ end,
ok = dets:from_ets(MsgLocationDets, MsgLocationEts),
true = ets:delete_all_objects(MsgLocationEts),
garbage_collect(),
@@ -697,6 +705,7 @@ to_ram_disk_mode(State = #dqstate { operation_mode = disk_only,
rabbit_log:info("Converting disk queue to ram disk mode~n", []),
{atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(),
disc_copies),
+ ok = file:delete(form_filename(?DISK_ONLY_MODE_FILE)),
true = ets:from_dets(MsgLocationEts, MsgLocationDets),
ok = dets:delete_all_objects(MsgLocationDets),
garbage_collect(),
@@ -1596,12 +1605,12 @@ mark_message_delivered(Key = '$end_of_table', _N) ->
mark_message_delivered(Key, N) ->
[Obj] = mnesia:read(rabbit_disk_queue, Key, write),
M = case Obj #dq_msg_loc.is_delivered of
- false ->
+ true -> N;
+ false ->
ok = mnesia:write(rabbit_disk_queue,
Obj #dq_msg_loc { is_delivered = true },
write),
- N - 1;
- _ -> N %% needs to match 'never' as well as 'true'
+ N - 1
end,
mark_message_delivered(mnesia:next(rabbit_disk_queue, Key), M).
@@ -1644,9 +1653,7 @@ load_from_disk(State) ->
{ok, State2}.
prune_mnesia_flush_batch(DeleteAcc) ->
- lists:foldl(fun ({internal_token, _}, ok) ->
- ok;
- (Key, ok) ->
+ lists:foldl(fun (Key, ok) ->
mnesia:dirty_delete(rabbit_disk_queue, Key)
end, ok, DeleteAcc).
@@ -1700,9 +1707,7 @@ extract_sequence_numbers(Sequences) ->
fun() ->
ok = mnesia:read_lock_table(rabbit_disk_queue),
mnesia:foldl(
- fun (#dq_msg_loc { queue_and_seq_id = {internal_token, _} },
- true) -> true;
- (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) ->
+ fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) ->
NextWrite = SeqId + 1,
case ets:lookup(Sequences, Q) of
[] -> ets:insert_new(Sequences,