summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-02-28 17:11:58 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-02-28 17:11:58 +0000
commitfd8bffd2df83e0d58ba36a323466a417d38f7c4d (patch)
treec2684f87506ac6281e63d6449d6a444f2d44e334 /src
parent9566c5bfbb46338ff414f9bd8c56925ff8711b87 (diff)
downloadrabbitmq-server-git-fd8bffd2df83e0d58ba36a323466a417d38f7c4d.tar.gz
Large amounts of debitrotting due to changes to confirms api and such like. Sadly mirrored confirms aren't working again yet... not really sure why
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_master.erl4
-rw-r--r--src/rabbit_mirror_queue_slave.erl115
2 files changed, 92 insertions, 27 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 11831a2998..e2f9b0208d 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -232,8 +232,8 @@ ram_duration(State = #state { backing_queue = BQ, backing_queue_state = BQS}) ->
needs_idle_timeout(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
BQ:needs_idle_timeout(BQS).
-idle_timeout(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
- BQ:idle_timeout(BQS).
+idle_timeout(State = #state { backing_queue = BQ, backing_queue_state = BQS}) ->
+ State #state { backing_queue_state = BQ:idle_timeout(BQS) }.
handle_pre_hibernate(State = #state { backing_queue = BQ,
backing_queue_state = BQS}) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 4f9d2066be..396e3c3592 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -67,7 +67,8 @@
-export([start_link/1, set_maximum_since_use/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3, handle_pre_hibernate/1]).
+ code_change/3, handle_pre_hibernate/1, prioritise_call/3,
+ prioritise_cast/2]).
-export([joined/2, members_changed/3, handle_msg/3]).
@@ -82,6 +83,7 @@
master_node,
backing_queue,
backing_queue_state,
+ sync_timer_ref,
rate_timer_ref,
sender_queues, %% :: Pid -> MsgQ
@@ -91,6 +93,7 @@
guid_to_channel %% for confirms
}).
+-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
start_link(Q) ->
@@ -137,6 +140,7 @@ init([#amqqueue { name = QueueName } = Q]) ->
backing_queue = BQ,
backing_queue_state = BQS,
rate_timer_ref = undefined,
+ sync_timer_ref = undefined,
sender_queues = dict:new(),
guid_ack = dict:new(),
@@ -212,7 +216,14 @@ handle_cast(update_ram_duration,
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
noreply(State #state { rate_timer_ref = just_measured,
- backing_queue_state = BQS2 }).
+ backing_queue_state = BQS2 });
+
+handle_cast(sync_timeout, State) ->
+ noreply(backing_queue_idle_timeout(
+ State #state { sync_timer_ref = undefined })).
+
+handle_info(timeout, State) ->
+ noreply(backing_queue_idle_timeout(State));
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -245,12 +256,30 @@ code_change(_OldVsn, State, _Extra) ->
handle_pre_hibernate(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
%% mainly copied from amqqueue_process
- BQS1 = BQ:handle_pre_hibernate(BQS),
- %% no activity for a while == 0 egress and ingress rates
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), infinity),
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS2 })}.
+ BQS3 = BQ:handle_pre_hibernate(BQS2),
+ {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}.
+
+prioritise_call(Msg, _From, _State) ->
+ case Msg of
+ {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6;
+ {gm_deaths, _Deaths} -> 5;
+ _ -> 0
+ end.
+
+prioritise_cast(Msg, _State) ->
+ case Msg of
+ update_ram_duration -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6;
+ sync_timeout -> 6;
+ {gm, _Msg} -> 5;
+ _ -> 0
+ end.
%% ---------------------------------------------------------------------------
%% GM
@@ -285,12 +314,9 @@ handle_msg([SPid], _From, Msg) ->
maybe_run_queue_via_backing_queue(
Mod, Fun, State = #state { backing_queue = BQ,
- backing_queue_state = BQS,
- guid_to_channel = GTC }) ->
+ backing_queue_state = BQS }) ->
{Guids, BQS1} = BQ:invoke(Mod, Fun, BQS),
- GTC1 = lists:foldl(fun maybe_confirm_message/2, GTC, Guids),
- State #state { backing_queue_state = BQS1,
- guid_to_channel = GTC1 }.
+ confirm_messages(Guids, State #state { backing_queue_state = BQS1 }).
record_confirm_or_confirm(#delivery { msg_seq_no = undefined }, _Q, GTC) ->
GTC;
@@ -305,13 +331,27 @@ record_confirm_or_confirm(#delivery { sender = ChPid, msg_seq_no = MsgSeqNo },
ok = rabbit_channel:confirm(ChPid, MsgSeqNo),
GTC.
-maybe_confirm_message(Guid, GTC) ->
- case dict:find(Guid, GTC) of
- {ok, {ChPid, MsgSeqNo}} when MsgSeqNo =/= undefined ->
- ok = rabbit_channel:confirm(ChPid, MsgSeqNo),
- dict:erase(Guid, GTC);
- error ->
- GTC
+confirm_messages(Guids, State = #state { guid_to_channel = GTC }) ->
+ {CMs, GTC1} =
+ lists:foldl(
+ fun(Guid, {CMs, GTC0}) ->
+ case dict:find(Guid, GTC0) of
+ {ok, {ChPid, MsgSeqNo}} ->
+ {gb_trees_cons(ChPid, MsgSeqNo, CMs),
+ dict:erase(Guid, GTC0)};
+ _ ->
+ {CMs, GTC0}
+ end
+ end, {gb_trees:empty(), GTC}, Guids),
+ gb_trees:map(fun(ChPid, MsgSeqNos) ->
+ rabbit_channel:confirm(ChPid, MsgSeqNos)
+ end, CMs),
+ State #state { guid_to_channel = GTC1 }.
+
+gb_trees_cons(Key, Value, Tree) ->
+ case gb_trees:lookup(Key, Tree) of
+ {value, Values} -> gb_trees:update(Key, [Value | Values], Tree);
+ none -> gb_trees:insert(Key, [Value], Tree)
end.
handle_process_result({ok, State}) -> noreply(State);
@@ -348,15 +388,39 @@ promote_me(From, #state { q = Q,
{become, rabbit_amqqueue_process, QueueState, hibernate}.
noreply(State) ->
- {noreply, next_state(State), hibernate}.
+ {NewState, Timeout} = next_state(State),
+ {noreply, NewState, Timeout}.
reply(Reply, State) ->
- {reply, Reply, next_state(State), hibernate}.
+ {NewState, Timeout} = next_state(State),
+ {reply, Reply, NewState, Timeout}.
next_state(State) ->
- ensure_rate_timer(State).
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ ensure_rate_timer(State),
+ case BQ:needs_idle_timeout(BQS) of
+ true -> {ensure_sync_timer(State1), 0};
+ false -> {stop_sync_timer(State1), hibernate}
+ end.
%% copied+pasted from amqqueue_process
+backing_queue_idle_timeout(State = #state { backing_queue = BQ }) ->
+ maybe_run_queue_via_backing_queue(
+ BQ, fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State).
+
+ensure_sync_timer(State = #state { sync_timer_ref = undefined }) ->
+ {ok, TRef} = timer:apply_after(
+ ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
+ State #state { sync_timer_ref = TRef };
+ensure_sync_timer(State) ->
+ State.
+
+stop_sync_timer(State = #state { sync_timer_ref = undefined }) ->
+ State;
+stop_sync_timer(State = #state { sync_timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #state { sync_timer_ref = undefined }.
+
ensure_rate_timer(State = #state { rate_timer_ref = undefined }) ->
{ok, TRef} = timer:apply_after(
?RAM_DURATION_UPDATE_INTERVAL,
@@ -438,10 +502,11 @@ process_instruction(
{true, AckRequired} ->
{AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
ChPid, BQS),
- {GA1, GTC3} = case AckRequired of
- true -> {dict:store(Guid, AckTag, GA), GTC1};
- false -> {GA, maybe_confirm_message(Guid, GTC1)}
- end,
+ {GA1, GTC3} =
+ case AckRequired of
+ true -> {dict:store(Guid, AckTag, GA), GTC1};
+ false -> {GA, confirm_messages([Guid], GTC1)}
+ end,
State1 #state { backing_queue_state = BQS1,
guid_ack = GA1,
guid_to_channel = GTC3 }