diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-09-02 17:34:26 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-09-02 17:34:26 +0100 |
| commit | c5bfb74c942e004fbb33137b4ebc1deebbd7396d (patch) | |
| tree | 2aab168e95da0a86030cef8e8d2269ec8cc1dcf8 | |
| parent | 198349ed4d2f06f98ba0bda1e05e183548b3e52b (diff) | |
| download | rabbitmq-server-git-c5bfb74c942e004fbb33137b4ebc1deebbd7396d.tar.gz | |
Made the disk queue start up in the same mode it was last running in.
This is slightly grim because I have to store some values in the mnesia table which then have to survive all the start up logic, so there are a couple of annoying 1-line changes elsewhere. However, it does indeed work.
There was also one bool() -> boolean() fix in the memory_manager.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 63 | ||||
| -rw-r--r-- | src/rabbit_memory_manager.erl | 2 |
2 files changed, 45 insertions, 20 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index ad7c8df13e..6238088413 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -70,12 +70,14 @@ -define(READ_MODE, [read, read_ahead]). -define(WRITE_MODE, [write, delayed_write]). --define(SHUTDOWN_MESSAGE_KEY, shutdown_token). +-define(SHUTDOWN_MESSAGE_KEY, {internal_token, shutdown}). -define(SHUTDOWN_MESSAGE, #dq_msg_loc { queue_and_seq_id = ?SHUTDOWN_MESSAGE_KEY, msg_id = infinity_and_beyond, is_delivered = never - }). + }). + +-define(BPR_KEY, {internal_token, bytes_per_record}). -define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in millisecs -define(SYNC_INTERVAL, 5). %% milliseconds @@ -386,13 +388,22 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> ok = filelib:ensure_dir(form_filename("nothing")), Node = node(), - ok = case mnesia:change_table_copy_type(rabbit_disk_queue, Node, - disc_copies) of - {atomic, ok} -> ok; - {aborted, {already_exists, rabbit_disk_queue, Node, - disc_copies}} -> ok; - E -> E - end, + {Mode, MnesiaBPR, EtsBPR} = + case lists:member(Node, mnesia:table_info(rabbit_disk_queue, + disc_copies)) of + true -> + %% memory manager assumes we start oppressed. As we're + %% not, make sure it knows about it, by reporting zero + %% memory usage, which ensures it'll tell us to become + %% liberated + rabbit_memory_manager:report_memory( + self(), 0, false), + {ram_disk, undefined, undefined}; + false -> + [#dq_msg_loc { msg_id = {MnesiaBPR1, EtsBPR1}}] = + mnesia:dirty_read(rabbit_disk_queue, ?BPR_KEY), + {disk_only, MnesiaBPR1, EtsBPR1} + end, ok = detect_shutdown_state_and_adjust_delivered_flags(), @@ -418,7 +429,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> State = #dqstate { msg_location_dets = MsgLocationDets, msg_location_ets = MsgLocationEts, - operation_mode = ram_disk, + operation_mode = Mode, file_summary = ets:new(?FILE_SUMMARY_ETS_NAME, [set, private]), sequences = ets:new(?SEQUENCE_ETS_NAME, @@ -437,8 +448,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> [set, private]), memory_report_timer_ref = undefined, wordsize = erlang:system_info(wordsize), - mnesia_bytes_per_record = undefined, - ets_bytes_per_record = undefined + mnesia_bytes_per_record = MnesiaBPR, + ets_bytes_per_record = EtsBPR }, {ok, State1 = #dqstate { current_file_name = CurrentName, current_offset = Offset } } = @@ -617,7 +628,7 @@ start_memory_timer(State) -> report_memory(Hibernating, State) -> Bytes = memory_use(State), rabbit_memory_manager:report_memory(self(), trunc(2.5 * Bytes), - Hibernating). + Hibernating). memory_use(#dqstate { operation_mode = ram_disk, file_summary = FileSummary, @@ -658,12 +669,21 @@ to_disk_only_mode(State = #dqstate { operation_mode = ram_disk, EtsSize = lists:max([1, ets:info(MsgLocationEts, size)]), {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_only_copies), + MnesiaBPR = MnesiaMemBytes / MnesiaSize, + EtsBPR = EtsMemBytes / EtsSize, + ok = rabbit_misc:execute_mnesia_transaction( + fun() -> + mnesia:write(rabbit_disk_queue, + #dq_msg_loc { queue_and_seq_id = ?BPR_KEY, + msg_id = {MnesiaBPR, EtsBPR}, + is_delivered = never }, write) + end), ok = dets:from_ets(MsgLocationDets, MsgLocationEts), true = ets:delete_all_objects(MsgLocationEts), garbage_collect(), State #dqstate { operation_mode = disk_only, - mnesia_bytes_per_record = MnesiaMemBytes / MnesiaSize, - ets_bytes_per_record = EtsMemBytes / EtsSize }. + mnesia_bytes_per_record = MnesiaBPR, + ets_bytes_per_record = EtsBPR }. to_ram_disk_mode(State = #dqstate { operation_mode = ram_disk }) -> State; @@ -673,6 +693,7 @@ to_ram_disk_mode(State = #dqstate { operation_mode = disk_only, rabbit_log:info("Converting disk queue to ram disk mode~n", []), {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_copies), + ok = mnesia:dirty_delete(rabbit_disk_queue, ?BPR_KEY), true = ets:from_dets(MsgLocationEts, MsgLocationDets), ok = dets:delete_all_objects(MsgLocationDets), garbage_collect(), @@ -1546,12 +1567,12 @@ mark_message_delivered(Key = '$end_of_table', _N) -> mark_message_delivered(Key, N) -> [Obj] = mnesia:read(rabbit_disk_queue, Key, write), M = case Obj #dq_msg_loc.is_delivered of - true -> N; false -> ok = mnesia:write(rabbit_disk_queue, Obj #dq_msg_loc { is_delivered = true }, write), - N - 1 + N - 1; + _ -> N %% needs to match 'never' as well as 'true' end, mark_message_delivered(mnesia:next(rabbit_disk_queue, Key), M). @@ -1594,7 +1615,9 @@ load_from_disk(State) -> {ok, State2}. prune_mnesia_flush_batch(DeleteAcc) -> - lists:foldl(fun (Key, ok) -> + lists:foldl(fun ({internal_token, _}, ok) -> + ok; + (Key, ok) -> mnesia:dirty_delete(rabbit_disk_queue, Key) end, ok, DeleteAcc). @@ -1648,7 +1671,9 @@ extract_sequence_numbers(Sequences) -> fun() -> ok = mnesia:read_lock_table(rabbit_disk_queue), mnesia:foldl( - fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) -> + fun (#dq_msg_loc { queue_and_seq_id = {internal_token, _} }, + true) -> true; + (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) -> NextWrite = SeqId + 1, case ets:lookup(Sequences, Q) of [] -> ets:insert_new(Sequences, diff --git a/src/rabbit_memory_manager.erl b/src/rabbit_memory_manager.erl index b9d7bf7be2..3b637b3a70 100644 --- a/src/rabbit_memory_manager.erl +++ b/src/rabbit_memory_manager.erl @@ -53,7 +53,7 @@ -spec(start_link/0 :: () -> ({'ok', pid()} | 'ignore' | {'error', any()})). -spec(register/5 :: (pid(), boolean(), atom(), atom(), list()) -> 'ok'). --spec(report_memory/3 :: (pid(), non_neg_integer(), bool()) -> 'ok'). +-spec(report_memory/3 :: (pid(), non_neg_integer(), boolean()) -> 'ok'). -spec(info/0 :: () -> [{atom(), any()}]). -spec(conserve_memory/2 :: (pid(), bool()) -> 'ok'). |
