summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-04-13 12:09:54 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-04-13 12:09:54 +0100
commit2f9dbde73fc57c2e35308fefd6fff56c73a46fb6 (patch)
tree5dde546c7b67408c936e42d1b57a2c248085e3ef
parent418af6d6efdcc791a8f5a308badcaab0808f3c3f (diff)
downloadrabbitmq-server-git-2f9dbde73fc57c2e35308fefd6fff56c73a46fb6.tar.gz
Undo merge from bug24038 which has now become INVALID.
-rw-r--r--src/rabbit_amqqueue_process.erl46
-rw-r--r--src/rabbit_amqqueue_process_utils.erl99
-rw-r--r--src/rabbit_mirror_queue_slave.erl47
3 files changed, 59 insertions, 133 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3bcdf70694..53bdd3b296 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -21,6 +21,8 @@
-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
+-define(SYNC_INTERVAL, 25). %% milliseconds
+-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(BASE_MESSAGE_PROPERTIES,
#message_properties{expiry = undefined, needs_confirming = false}).
@@ -260,27 +262,37 @@ backing_queue_module(#amqqueue{arguments = Args}) ->
_Nodes -> rabbit_mirror_queue_master
end.
+ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
+ {ok, TRef} = timer:apply_after(
+ ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
+ State#q{sync_timer_ref = TRef};
ensure_sync_timer(State) ->
- rabbit_amqqueue_process_utils:ensure_sync_timer(
- fun sync_timer_getter/1, fun sync_timer_setter/2, State).
-
-stop_sync_timer(State) ->
- rabbit_amqqueue_process_utils:stop_sync_timer(
- fun sync_timer_getter/1, fun sync_timer_setter/2, State).
-
-sync_timer_getter(State) -> State#q.sync_timer_ref.
-sync_timer_setter(Timer, State) -> State#q{sync_timer_ref = Timer}.
+ State.
+stop_sync_timer(State = #q{sync_timer_ref = undefined}) ->
+ State;
+stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State#q{sync_timer_ref = undefined}.
+
+ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
+ {ok, TRef} = timer:apply_after(
+ ?RAM_DURATION_UPDATE_INTERVAL,
+ rabbit_amqqueue, update_ram_duration,
+ [self()]),
+ State#q{rate_timer_ref = TRef};
+ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
+ State#q{rate_timer_ref = undefined};
ensure_rate_timer(State) ->
- rabbit_amqqueue_process_utils:ensure_rate_timer(
- fun rate_timer_getter/1, fun rate_timer_setter/2, State).
-
-stop_rate_timer(State) ->
- rabbit_amqqueue_process_utils:stop_rate_timer(
- fun rate_timer_getter/1, fun rate_timer_setter/2, State).
+ State.
-rate_timer_getter(State) -> State#q.rate_timer_ref.
-rate_timer_setter(Timer, State) -> State#q{rate_timer_ref = Timer}.
+stop_rate_timer(State = #q{rate_timer_ref = undefined}) ->
+ State;
+stop_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
+ State#q{rate_timer_ref = undefined};
+stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State#q{rate_timer_ref = undefined}.
stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) ->
State;
diff --git a/src/rabbit_amqqueue_process_utils.erl b/src/rabbit_amqqueue_process_utils.erl
deleted file mode 100644
index feb2a79ca2..0000000000
--- a/src/rabbit_amqqueue_process_utils.erl
+++ /dev/null
@@ -1,99 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 201-2011 VMware, Inc. All rights reserved.
-%%
-
--module(rabbit_amqqueue_process_utils).
-
--define(SYNC_INTERVAL, 25). %% milliseconds
--define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-
--export([backing_queue_pre_hibernate/2,
- ensure_sync_timer/3, stop_sync_timer/3,
- ensure_rate_timer/3, stop_rate_timer/3]).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(bq_mod() :: atom()).
--type(bq_state() :: any()). %% A good example of dialyzer's shortcomings
-
--type(queue_state() :: any()). %% Another such example.
--type(getter(A) :: fun ((queue_state()) -> A)).
--type(setter(A) :: fun ((A, queue_state()) -> queue_state())).
-
--type(tref() :: term()). %% Sigh. According to timer docs.
-
--spec(backing_queue_pre_hibernate/2 :: (bq_mod(), bq_state()) -> bq_state()).
-
--spec(ensure_sync_timer/3 :: (getter('undefined'|tref()),
- setter('undefined'|tref()),
- queue_state()) -> queue_state()).
--spec(stop_sync_timer/3 :: (getter('undefined'|tref()),
- setter('undefined'|tref()),
- queue_state()) -> queue_state()).
-
--spec(ensure_rate_timer/3 :: (getter('undefined'|'just_measured'|tref()),
- setter('undefined'|'just_measured'|tref()),
- queue_state()) -> queue_state()).
--spec(stop_rate_timer/3 :: (getter('undefined'|'just_measured'|tref()),
- setter('undefined'|'just_measured'|tref()),
- queue_state()) -> queue_state()).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-backing_queue_pre_hibernate(BQ, BQS) ->
- {RamDuration, BQS1} = BQ:ram_duration(BQS),
- DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
- BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- BQ:handle_pre_hibernate(BQS2).
-
-ensure_sync_timer(Getter, Setter, State) ->
- case Getter(State) of
- undefined -> {ok, TRef} = timer:apply_after(
- ?SYNC_INTERVAL, rabbit_amqqueue,
- sync_timeout, [self()]),
- Setter(TRef, State);
- _TRef -> State
- end.
-
-stop_sync_timer(Getter, Setter, State) ->
- case Getter(State) of
- undefined -> State;
- TRef -> {ok, cancel} = timer:cancel(TRef),
- Setter(undefined, State)
- end.
-
-ensure_rate_timer(Getter, Setter, State) ->
- case Getter(State) of
- undefined -> {ok, TRef} =
- timer:apply_after(
- ?RAM_DURATION_UPDATE_INTERVAL, rabbit_amqqueue,
- update_ram_duration, [self()]),
- Setter(TRef, State);
- just_measured -> Setter(undefined, State);
- _TRef -> State
- end.
-
-stop_rate_timer(Getter, Setter, State) ->
- case Getter(State) of
- undefined -> State;
- just_measured -> Setter(undefined, State);
- TRef -> {ok, cancel} = timer:cancel(TRef),
- Setter(undefined, State)
- end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 89b8971cf1..e3cfe54dcc 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -47,6 +47,9 @@
-include("rabbit.hrl").
-include("gm_specs.hrl").
+-define(SYNC_INTERVAL, 25). %% milliseconds
+-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+
-record(state, { q,
gm,
master_node,
@@ -478,27 +481,37 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
backing_queue_idle_timeout(State = #state { backing_queue = BQ }) ->
run_backing_queue(BQ, fun (M, BQS) -> M:idle_timeout(BQS) end, State).
+ensure_sync_timer(State = #state { sync_timer_ref = undefined }) ->
+ {ok, TRef} = timer:apply_after(
+ ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
+ State #state { sync_timer_ref = TRef };
ensure_sync_timer(State) ->
- rabbit_amqqueue_process_utils:ensure_sync_timer(
- fun sync_timer_getter/1, fun sync_timer_setter/2, State).
-
-stop_sync_timer(State) ->
- rabbit_amqqueue_process_utils:stop_sync_timer(
- fun sync_timer_getter/1, fun sync_timer_setter/2, State).
-
-sync_timer_getter(State) -> State#state.sync_timer_ref.
-sync_timer_setter(Timer, State) -> State#state{sync_timer_ref = Timer}.
+ State.
+stop_sync_timer(State = #state { sync_timer_ref = undefined }) ->
+ State;
+stop_sync_timer(State = #state { sync_timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #state { sync_timer_ref = undefined }.
+
+ensure_rate_timer(State = #state { rate_timer_ref = undefined }) ->
+ {ok, TRef} = timer:apply_after(
+ ?RAM_DURATION_UPDATE_INTERVAL,
+ rabbit_amqqueue, update_ram_duration,
+ [self()]),
+ State #state { rate_timer_ref = TRef };
+ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
+ State #state { rate_timer_ref = undefined };
ensure_rate_timer(State) ->
- rabbit_amqqueue_process_utils:ensure_rate_timer(
- fun rate_timer_getter/1, fun rate_timer_setter/2, State).
-
-stop_rate_timer(State) ->
- rabbit_amqqueue_process_utils:stop_rate_timer(
- fun rate_timer_getter/1, fun rate_timer_setter/2, State).
+ State.
-rate_timer_getter(State) -> State#state.rate_timer_ref.
-rate_timer_setter(Timer, State) -> State#state{rate_timer_ref = Timer}.
+stop_rate_timer(State = #state { rate_timer_ref = undefined }) ->
+ State;
+stop_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
+ State #state { rate_timer_ref = undefined };
+stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #state { rate_timer_ref = undefined }.
maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },