summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2016-06-28 02:20:56 +0300
committerGitHub <noreply@github.com>2016-06-28 02:20:56 +0300
commitdd25bf9445eccdf148dfbd9fdfe59180b1f6b1d4 (patch)
tree4aa40738d4d72ddf7b89ccf5e623ff0a74ab926e /src
parentebd33bab54474ea13653a2861314e93151aad4bc (diff)
parentb6aaf53513ae5b8d909a714f53389fc5bd3445c8 (diff)
downloadrabbitmq-server-git-dd25bf9445eccdf148dfbd9fdfe59180b1f6b1d4.tar.gz
Merge pull request #857 from rabbitmq/rabbitmq-server-802rabbitmq_v3_6_3_rc1
Infinity loop in synchronisation of priority queues
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_slave.erl2
-rw-r--r--src/rabbit_priority_queue.erl7
2 files changed, 6 insertions, 3 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 9edb99c4d7..c04c82f45e 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -120,7 +120,7 @@ handle_go(Q = #amqqueue{name = QName}) ->
Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
Q1 = Q #amqqueue { pid = QPid },
- ok = rabbit_queue_index:erase(QName), %% For crash recovery
+ _ = BQ:delete_crashed(Q), %% For crash recovery
BQS = bq_init(BQ, Q1, new),
State = #state { q = Q1,
gm = GM,
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index ae8a38daf0..b7a3afd129 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -43,7 +43,7 @@
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
zip_msgs_and_acks/4]).
--record(state, {bq, bqss}).
+-record(state, {bq, bqss, max_priority}).
-record(passthrough, {bq, bqs}).
%% See 'note on suffixes' below
@@ -157,7 +157,8 @@ init(Q, Recover, AsyncCallback) ->
[{P, Init(P, Term)} || {P, Term} <- PsTerms]
end,
#state{bq = BQ,
- bqss = BQSs}
+ bqss = BQSs,
+ max_priority = hd(Ps)}
end.
%% [0] collapse_recovery has the effect of making a list of recovery
%% terms in priority order, even for non priority queues. It's easier
@@ -419,6 +420,8 @@ info(Item, #passthrough{bq = BQ, bqs = BQS}) ->
invoke(Mod, {P, Fun}, State = #state{bq = BQ}) ->
pick1(fun (_P, BQSN) -> BQ:invoke(Mod, Fun, BQSN) end, P, State);
+invoke(Mod, Fun, State = #state{bq = BQ, max_priority = P}) ->
+ pick1(fun (_P, BQSN) -> BQ:invoke(Mod, Fun, BQSN) end, P, State);
invoke(Mod, Fun, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(invoke(Mod, Fun, BQS)).