summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_master.erl2
-rw-r--r--src/rabbit_mirror_queue_sync.erl24
2 files changed, 19 insertions, 7 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index a25b664bb0..a3050c570f 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -165,6 +165,8 @@ sync_mirrors(HandleInfo, EmitStats,
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
case rabbit_mirror_queue_sync:master_go(
Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) of
+ {cancelled, BQS1} -> Log(" synchronisation cancelled ", []),
+ {ok, S(BQS1)};
{shutdown, R, BQS1} -> {stop, R, S(BQS1)};
{sync_died, R, BQS1} -> Log("~p", [R]),
{ok, S(BQS1)};
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index fd64fd61b3..c230353cdf 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -148,12 +148,22 @@ master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent) ->
{'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
end.
-master_done({Syncer, Ref, _Log, _HandleInfo, _EmitStats, Parent}, BQS) ->
+master_done({Syncer, Ref, _Log, _HandleInfo, _EmitStats, Parent} = I, BQS) ->
receive
- {next, Ref} -> stop_syncer(Syncer, {done, Ref}),
- {ok, BQS};
- {'EXIT', Parent, Reason} -> {shutdown, Reason, BQS};
- {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}
+ {'$gen_call', From,
+ cancel_sync_mirrors} ->
+ stop_syncer(Syncer, {cancel, Ref}),
+ gen_server2:reply(From, ok),
+ {cancelled, BQS};
+ {cancelled, Ref} ->
+ {cancelled, BQS};
+ {next, Ref} ->
+ stop_syncer(Syncer, {done, Ref}),
+ {ok, BQS};
+ {'EXIT', Parent, Reason} ->
+ {shutdown, Reason, BQS};
+ {'EXIT', Syncer, Reason} ->
+ {sync_died, Reason, BQS}
end.
stop_syncer(Syncer, Msg) ->
@@ -230,7 +240,7 @@ syncer_check_resources(Ref, MPid, SPids) ->
syncer_loop(Ref, MPid, SPids);
true ->
case wait_for_resources(Ref, SPids) of
- cancel -> ok;
+ cancel -> MPid ! {cancelled, Ref};
SPids1 -> MPid ! {next, Ref},
syncer_loop(Ref, MPid, SPids1)
end
@@ -240,7 +250,7 @@ syncer_loop(Ref, MPid, SPids) ->
receive
{conserve_resources, memory, true} ->
case wait_for_resources(Ref, SPids) of
- cancel -> ok;
+ cancel -> MPid ! {cancelled, Ref};
SPids1 -> syncer_loop(Ref, MPid, SPids1)
end;
{conserve_resources, _, _} ->