diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-09-03 14:33:37 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-09-03 14:33:37 +0100 |
| commit | e32ffbe4f0e0de4aa8071e5f074117b963684f29 (patch) | |
| tree | 271da70012926341c3c8bf6300043e281968f684 | |
| parent | ed467dd9d88c219f74fa1736a94623c680f92785 (diff) | |
| download | rabbitmq-server-git-e32ffbe4f0e0de4aa8071e5f074117b963684f29.tar.gz | |
Switched to using a file to hold the disk_only data. Also found a bug where vaporise was wiping out the disk_only data (both as a file, and when it was in mnesia). The result was that if the dq was in disk_only mode before being vaporised, it would refuse to start up again. Thus vaporise now pushes the queue back to ram_disk mode if necessary, after wiping out the contents of the mnesia table. Finally, all tests pass again.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 49 |
1 files changed, 27 insertions, 22 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 961d1fe639..2d13a337a7 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -65,6 +65,7 @@ -define(CACHE_MAX_SIZE, 10485760). -define(MAX_READ_FILE_HANDLES, 256). -define(FILE_SIZE_LIMIT, (256*1024*1024)). +-define(DISK_ONLY_MODE_FILE, "disk_only_stats.dat"). -define(BINARY_MODE, [raw, binary]). -define(READ_MODE, [read, read_ahead]). @@ -77,8 +78,6 @@ is_delivered = never }). --define(BPR_KEY, {internal_token, bytes_per_record}). - -define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in millisecs -define(SYNC_INTERVAL, 5). %% milliseconds -define(HIBERNATE_AFTER_MIN, 1000). @@ -403,9 +402,14 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> self(), 0, false), {ram_disk, undefined, undefined}; false -> - [#dq_msg_loc { msg_id = {MnesiaBPR1, EtsBPR1}}] = - mnesia:dirty_read(rabbit_disk_queue, ?BPR_KEY), - {disk_only, MnesiaBPR1, EtsBPR1} + Path = form_filename(?DISK_ONLY_MODE_FILE), + case rabbit_misc:read_term_file(Path) of + {ok, [{MnesiaBPR1, EtsBPR1}]} -> + {disk_only, MnesiaBPR1, EtsBPR1}; + {error, Reason} -> + throw({error, {cannot_read_disk_only_mode_file, Path, + Reason}}) + end end, ok = detect_shutdown_state_and_adjust_delivered_flags(), @@ -494,9 +498,14 @@ handle_call({foldl, Fun, Init, Q}, _From, State) -> reply(Result, State1); handle_call(stop, _From, State) -> {stop, normal, ok, State}; %% gen_server now calls terminate -handle_call(stop_vaporise, _From, State) -> +handle_call(stop_vaporise, _From, State = #dqstate { operation_mode = Mode }) -> State1 = shutdown(State), {atomic, ok} = mnesia:clear_table(rabbit_disk_queue), + {atomic, ok} = case Mode of + ram_disk -> {atomic, ok}; + disk_only -> mnesia:change_table_copy_type( + rabbit_disk_queue, node(), disc_copies) + end, lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))), {stop, normal, ok, State1}; %% gen_server now calls terminate handle_call(to_disk_only_mode, _From, State) -> @@ -675,13 +684,12 @@ to_disk_only_mode(State = #dqstate { operation_mode = ram_disk, disc_only_copies), MnesiaBPR = MnesiaMemBytes / MnesiaSize, EtsBPR = EtsMemBytes / EtsSize, - ok = rabbit_misc:execute_mnesia_transaction( - fun() -> - mnesia:write(rabbit_disk_queue, - #dq_msg_loc { queue_and_seq_id = ?BPR_KEY, - msg_id = {MnesiaBPR, EtsBPR}, - is_delivered = never }, write) - end), + Path = form_filename(?DISK_ONLY_MODE_FILE), + case rabbit_misc:write_term_file(Path, [{MnesiaBPR, EtsBPR}]) of + ok -> ok; + {error, Reason} -> + throw({error, {cannot_create_disk_only_mode_file, Path, Reason}}) + end, ok = dets:from_ets(MsgLocationDets, MsgLocationEts), true = ets:delete_all_objects(MsgLocationEts), garbage_collect(), @@ -697,6 +705,7 @@ to_ram_disk_mode(State = #dqstate { operation_mode = disk_only, rabbit_log:info("Converting disk queue to ram disk mode~n", []), {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_copies), + ok = file:delete(form_filename(?DISK_ONLY_MODE_FILE)), true = ets:from_dets(MsgLocationEts, MsgLocationDets), ok = dets:delete_all_objects(MsgLocationDets), garbage_collect(), @@ -1596,12 +1605,12 @@ mark_message_delivered(Key = '$end_of_table', _N) -> mark_message_delivered(Key, N) -> [Obj] = mnesia:read(rabbit_disk_queue, Key, write), M = case Obj #dq_msg_loc.is_delivered of - false -> + true -> N; + false -> ok = mnesia:write(rabbit_disk_queue, Obj #dq_msg_loc { is_delivered = true }, write), - N - 1; - _ -> N %% needs to match 'never' as well as 'true' + N - 1 end, mark_message_delivered(mnesia:next(rabbit_disk_queue, Key), M). @@ -1644,9 +1653,7 @@ load_from_disk(State) -> {ok, State2}. prune_mnesia_flush_batch(DeleteAcc) -> - lists:foldl(fun ({internal_token, _}, ok) -> - ok; - (Key, ok) -> + lists:foldl(fun (Key, ok) -> mnesia:dirty_delete(rabbit_disk_queue, Key) end, ok, DeleteAcc). @@ -1700,9 +1707,7 @@ extract_sequence_numbers(Sequences) -> fun() -> ok = mnesia:read_lock_table(rabbit_disk_queue), mnesia:foldl( - fun (#dq_msg_loc { queue_and_seq_id = {internal_token, _} }, - true) -> true; - (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) -> + fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) -> NextWrite = SeqId + 1, case ets:lookup(Sequences, Q) of [] -> ets:insert_new(Sequences, |
