diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-04-13 12:09:54 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-04-13 12:09:54 +0100 |
| commit | 2f9dbde73fc57c2e35308fefd6fff56c73a46fb6 (patch) | |
| tree | 5dde546c7b67408c936e42d1b57a2c248085e3ef | |
| parent | 418af6d6efdcc791a8f5a308badcaab0808f3c3f (diff) | |
| download | rabbitmq-server-git-2f9dbde73fc57c2e35308fefd6fff56c73a46fb6.tar.gz | |
Undo merge from bug24038 which has now become INVALID.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process_utils.erl | 99 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 47 |
3 files changed, 59 insertions, 133 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3bcdf70694..53bdd3b296 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -21,6 +21,8 @@ -behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). +-define(SYNC_INTERVAL, 25). %% milliseconds +-define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(BASE_MESSAGE_PROPERTIES, #message_properties{expiry = undefined, needs_confirming = false}). @@ -260,27 +262,37 @@ backing_queue_module(#amqqueue{arguments = Args}) -> _Nodes -> rabbit_mirror_queue_master end. +ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> + {ok, TRef} = timer:apply_after( + ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), + State#q{sync_timer_ref = TRef}; ensure_sync_timer(State) -> - rabbit_amqqueue_process_utils:ensure_sync_timer( - fun sync_timer_getter/1, fun sync_timer_setter/2, State). - -stop_sync_timer(State) -> - rabbit_amqqueue_process_utils:stop_sync_timer( - fun sync_timer_getter/1, fun sync_timer_setter/2, State). - -sync_timer_getter(State) -> State#q.sync_timer_ref. -sync_timer_setter(Timer, State) -> State#q{sync_timer_ref = Timer}. + State. +stop_sync_timer(State = #q{sync_timer_ref = undefined}) -> + State; +stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + State#q{sync_timer_ref = undefined}. + +ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> + {ok, TRef} = timer:apply_after( + ?RAM_DURATION_UPDATE_INTERVAL, + rabbit_amqqueue, update_ram_duration, + [self()]), + State#q{rate_timer_ref = TRef}; +ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) -> + State#q{rate_timer_ref = undefined}; ensure_rate_timer(State) -> - rabbit_amqqueue_process_utils:ensure_rate_timer( - fun rate_timer_getter/1, fun rate_timer_setter/2, State). - -stop_rate_timer(State) -> - rabbit_amqqueue_process_utils:stop_rate_timer( - fun rate_timer_getter/1, fun rate_timer_setter/2, State). + State. -rate_timer_getter(State) -> State#q.rate_timer_ref. -rate_timer_setter(Timer, State) -> State#q{rate_timer_ref = Timer}. +stop_rate_timer(State = #q{rate_timer_ref = undefined}) -> + State; +stop_rate_timer(State = #q{rate_timer_ref = just_measured}) -> + State#q{rate_timer_ref = undefined}; +stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + State#q{rate_timer_ref = undefined}. stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) -> State; diff --git a/src/rabbit_amqqueue_process_utils.erl b/src/rabbit_amqqueue_process_utils.erl deleted file mode 100644 index feb2a79ca2..0000000000 --- a/src/rabbit_amqqueue_process_utils.erl +++ /dev/null @@ -1,99 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 201-2011 VMware, Inc. All rights reserved. -%% - --module(rabbit_amqqueue_process_utils). - --define(SYNC_INTERVAL, 25). %% milliseconds --define(RAM_DURATION_UPDATE_INTERVAL, 5000). - --export([backing_queue_pre_hibernate/2, - ensure_sync_timer/3, stop_sync_timer/3, - ensure_rate_timer/3, stop_rate_timer/3]). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(bq_mod() :: atom()). --type(bq_state() :: any()). %% A good example of dialyzer's shortcomings - --type(queue_state() :: any()). %% Another such example. --type(getter(A) :: fun ((queue_state()) -> A)). --type(setter(A) :: fun ((A, queue_state()) -> queue_state())). - --type(tref() :: term()). %% Sigh. According to timer docs. - --spec(backing_queue_pre_hibernate/2 :: (bq_mod(), bq_state()) -> bq_state()). - --spec(ensure_sync_timer/3 :: (getter('undefined'|tref()), - setter('undefined'|tref()), - queue_state()) -> queue_state()). --spec(stop_sync_timer/3 :: (getter('undefined'|tref()), - setter('undefined'|tref()), - queue_state()) -> queue_state()). - --spec(ensure_rate_timer/3 :: (getter('undefined'|'just_measured'|tref()), - setter('undefined'|'just_measured'|tref()), - queue_state()) -> queue_state()). --spec(stop_rate_timer/3 :: (getter('undefined'|'just_measured'|tref()), - setter('undefined'|'just_measured'|tref()), - queue_state()) -> queue_state()). - --endif. - -%%---------------------------------------------------------------------------- - -backing_queue_pre_hibernate(BQ, BQS) -> - {RamDuration, BQS1} = BQ:ram_duration(BQS), - DesiredDuration = - rabbit_memory_monitor:report_ram_duration(self(), RamDuration), - BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - BQ:handle_pre_hibernate(BQS2). - -ensure_sync_timer(Getter, Setter, State) -> - case Getter(State) of - undefined -> {ok, TRef} = timer:apply_after( - ?SYNC_INTERVAL, rabbit_amqqueue, - sync_timeout, [self()]), - Setter(TRef, State); - _TRef -> State - end. - -stop_sync_timer(Getter, Setter, State) -> - case Getter(State) of - undefined -> State; - TRef -> {ok, cancel} = timer:cancel(TRef), - Setter(undefined, State) - end. - -ensure_rate_timer(Getter, Setter, State) -> - case Getter(State) of - undefined -> {ok, TRef} = - timer:apply_after( - ?RAM_DURATION_UPDATE_INTERVAL, rabbit_amqqueue, - update_ram_duration, [self()]), - Setter(TRef, State); - just_measured -> Setter(undefined, State); - _TRef -> State - end. - -stop_rate_timer(Getter, Setter, State) -> - case Getter(State) of - undefined -> State; - just_measured -> Setter(undefined, State); - TRef -> {ok, cancel} = timer:cancel(TRef), - Setter(undefined, State) - end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 89b8971cf1..e3cfe54dcc 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -47,6 +47,9 @@ -include("rabbit.hrl"). -include("gm_specs.hrl"). +-define(SYNC_INTERVAL, 25). %% milliseconds +-define(RAM_DURATION_UPDATE_INTERVAL, 5000). + -record(state, { q, gm, master_node, @@ -478,27 +481,37 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) -> backing_queue_idle_timeout(State = #state { backing_queue = BQ }) -> run_backing_queue(BQ, fun (M, BQS) -> M:idle_timeout(BQS) end, State). +ensure_sync_timer(State = #state { sync_timer_ref = undefined }) -> + {ok, TRef} = timer:apply_after( + ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), + State #state { sync_timer_ref = TRef }; ensure_sync_timer(State) -> - rabbit_amqqueue_process_utils:ensure_sync_timer( - fun sync_timer_getter/1, fun sync_timer_setter/2, State). - -stop_sync_timer(State) -> - rabbit_amqqueue_process_utils:stop_sync_timer( - fun sync_timer_getter/1, fun sync_timer_setter/2, State). - -sync_timer_getter(State) -> State#state.sync_timer_ref. -sync_timer_setter(Timer, State) -> State#state{sync_timer_ref = Timer}. + State. +stop_sync_timer(State = #state { sync_timer_ref = undefined }) -> + State; +stop_sync_timer(State = #state { sync_timer_ref = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #state { sync_timer_ref = undefined }. + +ensure_rate_timer(State = #state { rate_timer_ref = undefined }) -> + {ok, TRef} = timer:apply_after( + ?RAM_DURATION_UPDATE_INTERVAL, + rabbit_amqqueue, update_ram_duration, + [self()]), + State #state { rate_timer_ref = TRef }; +ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) -> + State #state { rate_timer_ref = undefined }; ensure_rate_timer(State) -> - rabbit_amqqueue_process_utils:ensure_rate_timer( - fun rate_timer_getter/1, fun rate_timer_setter/2, State). - -stop_rate_timer(State) -> - rabbit_amqqueue_process_utils:stop_rate_timer( - fun rate_timer_getter/1, fun rate_timer_setter/2, State). + State. -rate_timer_getter(State) -> State#state.rate_timer_ref. -rate_timer_setter(Timer, State) -> State#state{rate_timer_ref = Timer}. +stop_rate_timer(State = #state { rate_timer_ref = undefined }) -> + State; +stop_rate_timer(State = #state { rate_timer_ref = just_measured }) -> + State #state { rate_timer_ref = undefined }; +stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #state { rate_timer_ref = undefined }. maybe_enqueue_message( Delivery = #delivery { message = #basic_message { id = MsgId }, |
