diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-04-08 14:23:57 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-04-08 14:23:57 +0100 |
| commit | d363b6cf58759d8457799d1917e1bc97b85f5a0f (patch) | |
| tree | acbec88bd372563bacdacf45b92c4967b73da4e3 | |
| parent | 7da3859257cf6860fb742fc21e581bbd504ff6be (diff) | |
| parent | 04bf045f42e118e3b19b448730be6b3124b3349a (diff) | |
| download | rabbitmq-server-git-d363b6cf58759d8457799d1917e1bc97b85f5a0f.tar.gz | |
Merging bug24038 into bug23554 and hook slave up to the new apis. Overall, it doesn't save that many lines, but it allows the slave to not duplicate -defines which were also in amqqueue_process, which is well worth the effort
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 54 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process_utils.erl | 99 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 57 |
3 files changed, 137 insertions, 73 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a8b19b72de..3bcdf70694 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -21,8 +21,6 @@ -behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). --define(SYNC_INTERVAL, 25). %% milliseconds --define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(BASE_MESSAGE_PROPERTIES, #message_properties{expiry = undefined, needs_confirming = false}). @@ -262,37 +260,27 @@ backing_queue_module(#amqqueue{arguments = Args}) -> _Nodes -> rabbit_mirror_queue_master end. -ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> - {ok, TRef} = timer:apply_after( - ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), - State#q{sync_timer_ref = TRef}; ensure_sync_timer(State) -> - State. + rabbit_amqqueue_process_utils:ensure_sync_timer( + fun sync_timer_getter/1, fun sync_timer_setter/2, State). + +stop_sync_timer(State) -> + rabbit_amqqueue_process_utils:stop_sync_timer( + fun sync_timer_getter/1, fun sync_timer_setter/2, State). + +sync_timer_getter(State) -> State#q.sync_timer_ref. +sync_timer_setter(Timer, State) -> State#q{sync_timer_ref = Timer}. -stop_sync_timer(State = #q{sync_timer_ref = undefined}) -> - State; -stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), - State#q{sync_timer_ref = undefined}. - -ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> - {ok, TRef} = timer:apply_after( - ?RAM_DURATION_UPDATE_INTERVAL, - rabbit_amqqueue, update_ram_duration, - [self()]), - State#q{rate_timer_ref = TRef}; -ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) -> - State#q{rate_timer_ref = undefined}; ensure_rate_timer(State) -> - State. + rabbit_amqqueue_process_utils:ensure_rate_timer( + fun rate_timer_getter/1, fun rate_timer_setter/2, State). -stop_rate_timer(State = #q{rate_timer_ref = undefined}) -> - State; -stop_rate_timer(State = #q{rate_timer_ref = just_measured}) -> - State#q{rate_timer_ref = undefined}; -stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), - State#q{rate_timer_ref = undefined}. +stop_rate_timer(State) -> + rabbit_amqqueue_process_utils:stop_rate_timer( + fun rate_timer_getter/1, fun rate_timer_setter/2, State). + +rate_timer_getter(State) -> State#q.rate_timer_ref. +rate_timer_setter(Timer, State) -> State#q{rate_timer_ref = Timer}. stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) -> State; @@ -1234,15 +1222,11 @@ handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> handle_pre_hibernate(State = #q{backing_queue = BQ, backing_queue_state = BQS, stats_timer = StatsTimer}) -> - {RamDuration, BQS1} = BQ:ram_duration(BQS), - DesiredDuration = - rabbit_memory_monitor:report_ram_duration(self(), RamDuration), - BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - BQS3 = BQ:handle_pre_hibernate(BQS2), + BQS1 = rabbit_amqqueue_process_utils:backing_queue_pre_hibernate(BQ, BQS), rabbit_event:if_enabled(StatsTimer, fun () -> emit_stats(State, [{idle_since, now()}]) end), State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer), - backing_queue_state = BQS3}, + backing_queue_state = BQS1}, {hibernate, stop_rate_timer(State1)}. diff --git a/src/rabbit_amqqueue_process_utils.erl b/src/rabbit_amqqueue_process_utils.erl new file mode 100644 index 0000000000..feb2a79ca2 --- /dev/null +++ b/src/rabbit_amqqueue_process_utils.erl @@ -0,0 +1,99 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 201-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_amqqueue_process_utils). + +-define(SYNC_INTERVAL, 25). %% milliseconds +-define(RAM_DURATION_UPDATE_INTERVAL, 5000). + +-export([backing_queue_pre_hibernate/2, + ensure_sync_timer/3, stop_sync_timer/3, + ensure_rate_timer/3, stop_rate_timer/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(bq_mod() :: atom()). +-type(bq_state() :: any()). %% A good example of dialyzer's shortcomings + +-type(queue_state() :: any()). %% Another such example. +-type(getter(A) :: fun ((queue_state()) -> A)). +-type(setter(A) :: fun ((A, queue_state()) -> queue_state())). + +-type(tref() :: term()). %% Sigh. According to timer docs. + +-spec(backing_queue_pre_hibernate/2 :: (bq_mod(), bq_state()) -> bq_state()). + +-spec(ensure_sync_timer/3 :: (getter('undefined'|tref()), + setter('undefined'|tref()), + queue_state()) -> queue_state()). +-spec(stop_sync_timer/3 :: (getter('undefined'|tref()), + setter('undefined'|tref()), + queue_state()) -> queue_state()). + +-spec(ensure_rate_timer/3 :: (getter('undefined'|'just_measured'|tref()), + setter('undefined'|'just_measured'|tref()), + queue_state()) -> queue_state()). +-spec(stop_rate_timer/3 :: (getter('undefined'|'just_measured'|tref()), + setter('undefined'|'just_measured'|tref()), + queue_state()) -> queue_state()). + +-endif. + +%%---------------------------------------------------------------------------- + +backing_queue_pre_hibernate(BQ, BQS) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + BQ:handle_pre_hibernate(BQS2). + +ensure_sync_timer(Getter, Setter, State) -> + case Getter(State) of + undefined -> {ok, TRef} = timer:apply_after( + ?SYNC_INTERVAL, rabbit_amqqueue, + sync_timeout, [self()]), + Setter(TRef, State); + _TRef -> State + end. + +stop_sync_timer(Getter, Setter, State) -> + case Getter(State) of + undefined -> State; + TRef -> {ok, cancel} = timer:cancel(TRef), + Setter(undefined, State) + end. + +ensure_rate_timer(Getter, Setter, State) -> + case Getter(State) of + undefined -> {ok, TRef} = + timer:apply_after( + ?RAM_DURATION_UPDATE_INTERVAL, rabbit_amqqueue, + update_ram_duration, [self()]), + Setter(TRef, State); + just_measured -> Setter(undefined, State); + _TRef -> State + end. + +stop_rate_timer(Getter, Setter, State) -> + case Getter(State) of + undefined -> State; + just_measured -> Setter(undefined, State); + TRef -> {ok, cancel} = timer:cancel(TRef), + Setter(undefined, State) + end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 8ca82fa1fe..70b5c43da6 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -58,9 +58,6 @@ msg_id_status }). --define(SYNC_INTERVAL, 25). %% milliseconds --define(RAM_DURATION_UPDATE_INTERVAL, 5000). - start_link(Q) -> gen_server2:start_link(?MODULE, [Q], []). @@ -232,13 +229,8 @@ code_change(_OldVsn, State, _Extra) -> handle_pre_hibernate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - %% mainly copied from amqqueue_process - {RamDuration, BQS1} = BQ:ram_duration(BQS), - DesiredDuration = - rabbit_memory_monitor:report_ram_duration(self(), RamDuration), - BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - BQS3 = BQ:handle_pre_hibernate(BQS2), - {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}. + BQS1 = rabbit_amqqueue_process_utils:backing_queue_pre_hibernate(BQ, BQS), + {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS1 })}. prioritise_call(Msg, _From, _State) -> case Msg of @@ -480,41 +472,30 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) -> false -> {stop_sync_timer(State1), hibernate} end. -%% copied+pasted from amqqueue_process backing_queue_idle_timeout(State = #state { backing_queue = BQ }) -> run_backing_queue(BQ, fun (M, BQS) -> M:idle_timeout(BQS) end, State). -ensure_sync_timer(State = #state { sync_timer_ref = undefined }) -> - {ok, TRef} = timer:apply_after( - ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), - State #state { sync_timer_ref = TRef }; ensure_sync_timer(State) -> - State. + rabbit_amqqueue_process_utils:ensure_sync_timer( + fun sync_timer_getter/1, fun sync_timer_setter/2, State). + +stop_sync_timer(State) -> + rabbit_amqqueue_process_utils:stop_sync_timer( + fun sync_timer_getter/1, fun sync_timer_setter/2, State). + +sync_timer_getter(State) -> State#state.sync_timer_ref. +sync_timer_setter(Timer, State) -> State#state{sync_timer_ref = Timer}. -stop_sync_timer(State = #state { sync_timer_ref = undefined }) -> - State; -stop_sync_timer(State = #state { sync_timer_ref = TRef }) -> - {ok, cancel} = timer:cancel(TRef), - State #state { sync_timer_ref = undefined }. - -ensure_rate_timer(State = #state { rate_timer_ref = undefined }) -> - {ok, TRef} = timer:apply_after( - ?RAM_DURATION_UPDATE_INTERVAL, - rabbit_amqqueue, update_ram_duration, - [self()]), - State #state { rate_timer_ref = TRef }; -ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) -> - State #state { rate_timer_ref = undefined }; ensure_rate_timer(State) -> - State. + rabbit_amqqueue_process_utils:ensure_rate_timer( + fun rate_timer_getter/1, fun rate_timer_setter/2, State). + +stop_rate_timer(State) -> + rabbit_amqqueue_process_utils:stop_rate_timer( + fun rate_timer_getter/1, fun rate_timer_setter/2, State). -stop_rate_timer(State = #state { rate_timer_ref = undefined }) -> - State; -stop_rate_timer(State = #state { rate_timer_ref = just_measured }) -> - State #state { rate_timer_ref = undefined }; -stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> - {ok, cancel} = timer:cancel(TRef), - State #state { rate_timer_ref = undefined }. +rate_timer_getter(State) -> State#state.rate_timer_ref. +rate_timer_setter(Timer, State) -> State#state{rate_timer_ref = Timer}. maybe_enqueue_message( Delivery = #delivery { message = #basic_message { id = MsgId }, |
