summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/gm.erl19
-rw-r--r--src/rabbit_amqqueue_process.erl8
-rw-r--r--src/rabbit_mirror_queue_slave.erl7
3 files changed, 27 insertions, 7 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 74e19ee6fd..41aa01f04d 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -1175,11 +1175,20 @@ record_new_member_in_group(NewMember, Left, GroupName, TxnFun) ->
try
Group = #gm_group { members = Members, version = Ver } =
check_membership(Left, read_group(GroupName)),
- {Prefix, [Left | Suffix]} =
- lists:splitwith(fun (M) -> M =/= Left end, Members),
- write_group(Group #gm_group {
- members = Prefix ++ [Left, NewMember | Suffix],
- version = Ver + 1 })
+ case lists:member(NewMember, Members) of
+ true ->
+ %% This avois duplicates during partial partitions,
+ %% as inconsistent views might happen during them
+ rabbit_log:warning("(~p) GM avoiding duplicate of ~p",
+ [self(), NewMember]),
+ Group;
+ false ->
+ {Prefix, [Left | Suffix]} =
+ lists:splitwith(fun (M) -> M =/= Left end, Members),
+ write_group(Group #gm_group {
+ members = Prefix ++ [Left, NewMember | Suffix],
+ version = Ver + 1 })
+ end
catch
lost_membership ->
%% The transaction must not be abruptly crashed, but
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3111afca2e..bfa868c651 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1300,7 +1300,13 @@ handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) ->
%% This also has the side effect of waking us up so we emit a
%% stats event - so event consumers see the changed policy.
{ok, Q} = rabbit_amqqueue:lookup(Name),
- noreply(process_args_policy(State#q{q = Q})).
+ noreply(process_args_policy(State#q{q = Q}));
+
+handle_cast({sync_start, _, _}, State = #q{q = #amqqueue{name = Name}}) ->
+ %% Only a slave should receive this, it means we are a duplicated master
+ rabbit_mirror_queue_misc:log_warning(
+ Name, "Stopping after receiving sync_start from another master", []),
+ stop(State).
handle_info({maybe_expire, Vsn}, State = #q{args_policy_version = Vsn}) ->
case is_unused(State) of
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 4770018f9e..6017e5a028 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -314,7 +314,12 @@ handle_cast({set_ram_duration_target, Duration},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
- noreply(State #state { backing_queue_state = BQS1 }).
+ noreply(State #state { backing_queue_state = BQS1 });
+
+handle_cast(policy_changed, State) ->
+ %% During partial partitions, we might end up receiving messages expected by a master
+ %% Ignore them
+ noreply(State).
handle_info(update_ram_duration, State = #state{backing_queue = BQ,
backing_queue_state = BQS}) ->