summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2016-10-26 18:07:16 +0300
committerMichael Klishin <michael@clojurewerkz.org>2016-10-26 18:07:16 +0300
commit7cb4835a7f3bee6c42d3469a14b7acd0bbe9ea56 (patch)
treee9d8e55ddabc9e18d108f250a469d7b8b1f17823 /src
parented1ef0aa2b545f64cbde5ef018c6386f90a9bc19 (diff)
parente0866b7c2ecb50c13bd137eb3d5600816f9ad96d (diff)
downloadrabbitmq-server-git-7cb4835a7f3bee6c42d3469a14b7acd0bbe9ea56.tar.gz
Merge branch 'stable' into rabbitmq-server-960
Diffstat (limited to 'src')
-rw-r--r--src/gm.erl19
-rw-r--r--src/rabbit_amqqueue_process.erl8
-rw-r--r--src/rabbit_mirror_queue_slave.erl7
3 files changed, 27 insertions, 7 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 74e19ee6fd..41aa01f04d 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -1175,11 +1175,20 @@ record_new_member_in_group(NewMember, Left, GroupName, TxnFun) ->
try
Group = #gm_group { members = Members, version = Ver } =
check_membership(Left, read_group(GroupName)),
- {Prefix, [Left | Suffix]} =
- lists:splitwith(fun (M) -> M =/= Left end, Members),
- write_group(Group #gm_group {
- members = Prefix ++ [Left, NewMember | Suffix],
- version = Ver + 1 })
+ case lists:member(NewMember, Members) of
+ true ->
+ %% This avois duplicates during partial partitions,
+ %% as inconsistent views might happen during them
+ rabbit_log:warning("(~p) GM avoiding duplicate of ~p",
+ [self(), NewMember]),
+ Group;
+ false ->
+ {Prefix, [Left | Suffix]} =
+ lists:splitwith(fun (M) -> M =/= Left end, Members),
+ write_group(Group #gm_group {
+ members = Prefix ++ [Left, NewMember | Suffix],
+ version = Ver + 1 })
+ end
catch
lost_membership ->
%% The transaction must not be abruptly crashed, but
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3111afca2e..bfa868c651 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1300,7 +1300,13 @@ handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) ->
%% This also has the side effect of waking us up so we emit a
%% stats event - so event consumers see the changed policy.
{ok, Q} = rabbit_amqqueue:lookup(Name),
- noreply(process_args_policy(State#q{q = Q})).
+ noreply(process_args_policy(State#q{q = Q}));
+
+handle_cast({sync_start, _, _}, State = #q{q = #amqqueue{name = Name}}) ->
+ %% Only a slave should receive this, it means we are a duplicated master
+ rabbit_mirror_queue_misc:log_warning(
+ Name, "Stopping after receiving sync_start from another master", []),
+ stop(State).
handle_info({maybe_expire, Vsn}, State = #q{args_policy_version = Vsn}) ->
case is_unused(State) of
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 4770018f9e..6017e5a028 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -314,7 +314,12 @@ handle_cast({set_ram_duration_target, Duration},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
- noreply(State #state { backing_queue_state = BQS1 }).
+ noreply(State #state { backing_queue_state = BQS1 });
+
+handle_cast(policy_changed, State) ->
+ %% During partial partitions, we might end up receiving messages expected by a master
+ %% Ignore them
+ noreply(State).
handle_info(update_ram_duration, State = #state{backing_queue = BQ,
backing_queue_state = BQS}) ->