summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-04-08 14:23:57 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-04-08 14:23:57 +0100
commitd363b6cf58759d8457799d1917e1bc97b85f5a0f (patch)
treeacbec88bd372563bacdacf45b92c4967b73da4e3
parent7da3859257cf6860fb742fc21e581bbd504ff6be (diff)
parent04bf045f42e118e3b19b448730be6b3124b3349a (diff)
downloadrabbitmq-server-git-d363b6cf58759d8457799d1917e1bc97b85f5a0f.tar.gz
Merging bug24038 into bug23554 and hook slave up to the new apis. Overall, it doesn't save that many lines, but it allows the slave to not duplicate -defines which were also in amqqueue_process, which is well worth the effort
-rw-r--r--src/rabbit_amqqueue_process.erl54
-rw-r--r--src/rabbit_amqqueue_process_utils.erl99
-rw-r--r--src/rabbit_mirror_queue_slave.erl57
3 files changed, 137 insertions, 73 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a8b19b72de..3bcdf70694 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -21,8 +21,6 @@
-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
--define(SYNC_INTERVAL, 25). %% milliseconds
--define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(BASE_MESSAGE_PROPERTIES,
#message_properties{expiry = undefined, needs_confirming = false}).
@@ -262,37 +260,27 @@ backing_queue_module(#amqqueue{arguments = Args}) ->
_Nodes -> rabbit_mirror_queue_master
end.
-ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
- {ok, TRef} = timer:apply_after(
- ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
- State#q{sync_timer_ref = TRef};
ensure_sync_timer(State) ->
- State.
+ rabbit_amqqueue_process_utils:ensure_sync_timer(
+ fun sync_timer_getter/1, fun sync_timer_setter/2, State).
+
+stop_sync_timer(State) ->
+ rabbit_amqqueue_process_utils:stop_sync_timer(
+ fun sync_timer_getter/1, fun sync_timer_setter/2, State).
+
+sync_timer_getter(State) -> State#q.sync_timer_ref.
+sync_timer_setter(Timer, State) -> State#q{sync_timer_ref = Timer}.
-stop_sync_timer(State = #q{sync_timer_ref = undefined}) ->
- State;
-stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
- State#q{sync_timer_ref = undefined}.
-
-ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
- {ok, TRef} = timer:apply_after(
- ?RAM_DURATION_UPDATE_INTERVAL,
- rabbit_amqqueue, update_ram_duration,
- [self()]),
- State#q{rate_timer_ref = TRef};
-ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
- State#q{rate_timer_ref = undefined};
ensure_rate_timer(State) ->
- State.
+ rabbit_amqqueue_process_utils:ensure_rate_timer(
+ fun rate_timer_getter/1, fun rate_timer_setter/2, State).
-stop_rate_timer(State = #q{rate_timer_ref = undefined}) ->
- State;
-stop_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
- State#q{rate_timer_ref = undefined};
-stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
- State#q{rate_timer_ref = undefined}.
+stop_rate_timer(State) ->
+ rabbit_amqqueue_process_utils:stop_rate_timer(
+ fun rate_timer_getter/1, fun rate_timer_setter/2, State).
+
+rate_timer_getter(State) -> State#q.rate_timer_ref.
+rate_timer_setter(Timer, State) -> State#q{rate_timer_ref = Timer}.
stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) ->
State;
@@ -1234,15 +1222,11 @@ handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
handle_pre_hibernate(State = #q{backing_queue = BQ,
backing_queue_state = BQS,
stats_timer = StatsTimer}) ->
- {RamDuration, BQS1} = BQ:ram_duration(BQS),
- DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
- BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- BQS3 = BQ:handle_pre_hibernate(BQS2),
+ BQS1 = rabbit_amqqueue_process_utils:backing_queue_pre_hibernate(BQ, BQS),
rabbit_event:if_enabled(StatsTimer,
fun () ->
emit_stats(State, [{idle_since, now()}])
end),
State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
- backing_queue_state = BQS3},
+ backing_queue_state = BQS1},
{hibernate, stop_rate_timer(State1)}.
diff --git a/src/rabbit_amqqueue_process_utils.erl b/src/rabbit_amqqueue_process_utils.erl
new file mode 100644
index 0000000000..feb2a79ca2
--- /dev/null
+++ b/src/rabbit_amqqueue_process_utils.erl
@@ -0,0 +1,99 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 201-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_amqqueue_process_utils).
+
+-define(SYNC_INTERVAL, 25). %% milliseconds
+-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+
+-export([backing_queue_pre_hibernate/2,
+ ensure_sync_timer/3, stop_sync_timer/3,
+ ensure_rate_timer/3, stop_rate_timer/3]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(bq_mod() :: atom()).
+-type(bq_state() :: any()). %% A good example of dialyzer's shortcomings
+
+-type(queue_state() :: any()). %% Another such example.
+-type(getter(A) :: fun ((queue_state()) -> A)).
+-type(setter(A) :: fun ((A, queue_state()) -> queue_state())).
+
+-type(tref() :: term()). %% Sigh. According to timer docs.
+
+-spec(backing_queue_pre_hibernate/2 :: (bq_mod(), bq_state()) -> bq_state()).
+
+-spec(ensure_sync_timer/3 :: (getter('undefined'|tref()),
+ setter('undefined'|tref()),
+ queue_state()) -> queue_state()).
+-spec(stop_sync_timer/3 :: (getter('undefined'|tref()),
+ setter('undefined'|tref()),
+ queue_state()) -> queue_state()).
+
+-spec(ensure_rate_timer/3 :: (getter('undefined'|'just_measured'|tref()),
+ setter('undefined'|'just_measured'|tref()),
+ queue_state()) -> queue_state()).
+-spec(stop_rate_timer/3 :: (getter('undefined'|'just_measured'|tref()),
+ setter('undefined'|'just_measured'|tref()),
+ queue_state()) -> queue_state()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+backing_queue_pre_hibernate(BQ, BQS) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ BQ:handle_pre_hibernate(BQS2).
+
+ensure_sync_timer(Getter, Setter, State) ->
+ case Getter(State) of
+ undefined -> {ok, TRef} = timer:apply_after(
+ ?SYNC_INTERVAL, rabbit_amqqueue,
+ sync_timeout, [self()]),
+ Setter(TRef, State);
+ _TRef -> State
+ end.
+
+stop_sync_timer(Getter, Setter, State) ->
+ case Getter(State) of
+ undefined -> State;
+ TRef -> {ok, cancel} = timer:cancel(TRef),
+ Setter(undefined, State)
+ end.
+
+ensure_rate_timer(Getter, Setter, State) ->
+ case Getter(State) of
+ undefined -> {ok, TRef} =
+ timer:apply_after(
+ ?RAM_DURATION_UPDATE_INTERVAL, rabbit_amqqueue,
+ update_ram_duration, [self()]),
+ Setter(TRef, State);
+ just_measured -> Setter(undefined, State);
+ _TRef -> State
+ end.
+
+stop_rate_timer(Getter, Setter, State) ->
+ case Getter(State) of
+ undefined -> State;
+ just_measured -> Setter(undefined, State);
+ TRef -> {ok, cancel} = timer:cancel(TRef),
+ Setter(undefined, State)
+ end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 8ca82fa1fe..70b5c43da6 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -58,9 +58,6 @@
msg_id_status
}).
--define(SYNC_INTERVAL, 25). %% milliseconds
--define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-
start_link(Q) ->
gen_server2:start_link(?MODULE, [Q], []).
@@ -232,13 +229,8 @@ code_change(_OldVsn, State, _Extra) ->
handle_pre_hibernate(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
- %% mainly copied from amqqueue_process
- {RamDuration, BQS1} = BQ:ram_duration(BQS),
- DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
- BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- BQS3 = BQ:handle_pre_hibernate(BQS2),
- {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}.
+ BQS1 = rabbit_amqqueue_process_utils:backing_queue_pre_hibernate(BQ, BQS),
+ {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS1 })}.
prioritise_call(Msg, _From, _State) ->
case Msg of
@@ -480,41 +472,30 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
false -> {stop_sync_timer(State1), hibernate}
end.
-%% copied+pasted from amqqueue_process
backing_queue_idle_timeout(State = #state { backing_queue = BQ }) ->
run_backing_queue(BQ, fun (M, BQS) -> M:idle_timeout(BQS) end, State).
-ensure_sync_timer(State = #state { sync_timer_ref = undefined }) ->
- {ok, TRef} = timer:apply_after(
- ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
- State #state { sync_timer_ref = TRef };
ensure_sync_timer(State) ->
- State.
+ rabbit_amqqueue_process_utils:ensure_sync_timer(
+ fun sync_timer_getter/1, fun sync_timer_setter/2, State).
+
+stop_sync_timer(State) ->
+ rabbit_amqqueue_process_utils:stop_sync_timer(
+ fun sync_timer_getter/1, fun sync_timer_setter/2, State).
+
+sync_timer_getter(State) -> State#state.sync_timer_ref.
+sync_timer_setter(Timer, State) -> State#state{sync_timer_ref = Timer}.
-stop_sync_timer(State = #state { sync_timer_ref = undefined }) ->
- State;
-stop_sync_timer(State = #state { sync_timer_ref = TRef }) ->
- {ok, cancel} = timer:cancel(TRef),
- State #state { sync_timer_ref = undefined }.
-
-ensure_rate_timer(State = #state { rate_timer_ref = undefined }) ->
- {ok, TRef} = timer:apply_after(
- ?RAM_DURATION_UPDATE_INTERVAL,
- rabbit_amqqueue, update_ram_duration,
- [self()]),
- State #state { rate_timer_ref = TRef };
-ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
- State #state { rate_timer_ref = undefined };
ensure_rate_timer(State) ->
- State.
+ rabbit_amqqueue_process_utils:ensure_rate_timer(
+ fun rate_timer_getter/1, fun rate_timer_setter/2, State).
+
+stop_rate_timer(State) ->
+ rabbit_amqqueue_process_utils:stop_rate_timer(
+ fun rate_timer_getter/1, fun rate_timer_setter/2, State).
-stop_rate_timer(State = #state { rate_timer_ref = undefined }) ->
- State;
-stop_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
- State #state { rate_timer_ref = undefined };
-stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
- {ok, cancel} = timer:cancel(TRef),
- State #state { rate_timer_ref = undefined }.
+rate_timer_getter(State) -> State#state.rate_timer_ref.
+rate_timer_setter(Timer, State) -> State#state{rate_timer_ref = Timer}.
maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },