summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-11-26 12:02:53 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-11-26 12:02:53 +0000
commit49a8392111d5ce1e1b87e4d831a4f61f114ed9a5 (patch)
tree51c1da64107ea9102cb145ec0770fc12e384c390 /src
parent29cdb59f55490ee190a393b9fc03dd29fe08f142 (diff)
downloadrabbitmq-server-git-49a8392111d5ce1e1b87e4d831a4f61f114ed9a5.tar.gz
Add spec, fix API, refuse to run if there are pending acks, return status, don't throw away state.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl32
-rw-r--r--src/rabbit_mirror_queue_master.erl12
-rw-r--r--src/rabbit_mirror_queue_slave.erl1
4 files changed, 28 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4d95778947..ad81ba032c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -173,6 +173,8 @@
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-spec(start_mirroring/1 :: (pid()) -> 'ok').
-spec(stop_mirroring/1 :: (pid()) -> 'ok').
+-spec(sync_mirrors/1 :: (rabbit_types:amqqueue()) ->
+ 'ok' | error('queue_has_pending_acks') | error('queue_not_mirrored')).
-endif.
@@ -591,11 +593,7 @@ set_maximum_since_use(QPid, Age) ->
start_mirroring(QPid) -> ok = delegate_call(QPid, start_mirroring).
stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring).
-sync_mirrors(Name) ->
- case lookup(Name) of
- {ok, #amqqueue{pid = QPid}} -> delegate_cast(QPid, sync_mirrors);
- _ -> ok
- end.
+sync_mirrors(#amqqueue{pid = QPid}) -> delegate_call(QPid, sync_mirrors).
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 07f4c3b190..19872d8d0c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1171,6 +1171,24 @@ handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ,
reply(ok, State#q{backing_queue = BQ1,
backing_queue_state = BQS1});
+handle_call(sync_mirrors, From,
+ State = #q{q = #amqqueue{name = Name},
+ backing_queue = rabbit_mirror_queue_master = BQ,
+ backing_queue_state = BQS}) ->
+ case BQ:depth(BQS) - BQ:len(BQS) of
+ 0 ->
+ {ok, #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids}} =
+ rabbit_amqqueue:lookup(Name),
+ gen_server2:reply(From, ok),
+ noreply(rabbit_mirror_queue_master:sync_mirrors(
+ SPids -- SSPids, Name, BQS));
+ _ ->
+ reply({error, queue_has_pending_acks}, State)
+ end;
+
+handle_call(sync_mirrors, _From, State) ->
+ reply({error, queue_not_mirrored}, State);
+
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
@@ -1300,20 +1318,6 @@ handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
end;
-handle_cast(sync_mirrors,
- State = #q{q = #amqqueue{name = Name},
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- case BQ of
- rabbit_mirror_queue_master ->
- {ok, #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids}} =
- rabbit_amqqueue:lookup(Name),
- rabbit_mirror_queue_master:sync_mirrors(SPids -- SSPids, Name, BQS);
- _ ->
- ok
- end,
- noreply(State);
-
handle_cast(wake_up, State) ->
noreply(State).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 1664260423..ea9e04fedf 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -127,12 +127,12 @@ stop_mirroring(State = #state { coordinator = CPid,
stop_all_slaves(shutdown, State),
{BQ, BQS}.
-sync_mirrors([], Name, _State) ->
+sync_mirrors([], Name, State) ->
rabbit_log:info("Synchronising ~s: nothing to do~n",
[rabbit_misc:rs(Name)]),
- ok;
-sync_mirrors(SPids, Name, #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
+ State;
+sync_mirrors(SPids, Name, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
rabbit_log:info("Synchronising ~s with slaves ~p~n",
[rabbit_misc:rs(Name), SPids]),
Ref = make_ref(),
@@ -153,7 +153,7 @@ sync_mirrors(SPids, Name, #state { backing_queue = BQ,
end],
SPid1 =/= dead],
[erlang:demonitor(MRef) || {_, MRef} <- SPidsMRefs],
- {Total, _BQS} =
+ {Total, BQS1} =
BQ:fold(fun ({Msg, MsgProps}, I) ->
wait_for_credit(),
[begin
@@ -171,7 +171,7 @@ sync_mirrors(SPids, Name, #state { backing_queue = BQ,
[SPid ! {sync_complete, Ref} || SPid <- SPids1],
rabbit_log:info("Synchronising ~s: ~p messages; complete~n",
[rabbit_misc:rs(Name), Total]),
- ok.
+ State#state{backing_queue_state = BQS1}.
wait_for_credit() ->
case credit_flow:blocked() of
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index d408c56eda..f4130e0250 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -857,6 +857,7 @@ sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ,
sync_loop(Ref, MRef, MPid, State);
{sync_complete, Ref} ->
erlang:demonitor(MRef),
+ %% We can only sync when there are no pending acks
set_delta(0, State);
{sync_message, Ref, Msg, Props0} ->
credit_flow:ack(MPid, ?CREDIT_DISC_BOUND),