summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-05-24 11:37:15 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-05-24 11:37:15 +0100
commit3a6b32ec637d3bd5cb0d4a22dd43c69103b16750 (patch)
treea405512c6ca16f81c4aa6a2a4ee857b247f04b5a /src
parent2c8f1e389a7f771d4826d4ec845160571e2177b0 (diff)
downloadrabbitmq-server-git-3a6b32ec637d3bd5cb0d4a22dd43c69103b16750.tar.gz
Make sure we update the federation state when a consumer becomes inactive or is cancelled.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl20
1 files changed, 15 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2dc85d6373..71dbb9debe 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -440,12 +440,14 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
C = lookup_ch(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
+ notify_federation(State),
{false, State};
false -> case rabbit_limiter:can_send(C#cr.limiter,
Consumer#consumer.ack_required,
Consumer#consumer.tag) of
{suspend, Limiter} ->
block_consumer(C#cr{limiter = Limiter}, E),
+ notify_federation(State),
{false, State};
{continue, Limiter} ->
AC1 = queue:in(E, State#q.active_consumers),
@@ -523,15 +525,22 @@ discard(#delivery{sender = SenderPid,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
State1#q{backing_queue_state = BQS1}.
-run_message_queue(State = #q{q = Q}) ->
- {IsEmpty1, State1} = deliver_msgs_to_consumers(
+run_message_queue(State) ->
+ {_IsEmpty1, State1} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
is_empty(State), State),
- case IsEmpty1 andalso active_unfederated(State1#q.active_consumers) of
+ notify_federation(State1),
+ State1.
+
+notify_federation(#q{q = Q,
+ active_consumers = ActiveConsumers,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ IsEmpty = BQ:is_empty(BQS),
+ case IsEmpty andalso active_unfederated(ActiveConsumers) of
true -> rabbit_federation_queue:run(Q);
false -> rabbit_federation_queue:stop(Q)
- end,
- State1.
+ end.
active_unfederated(Cs) ->
case queue:out(Cs) of
@@ -1194,6 +1203,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
active_consumers = remove_consumer(
ChPid, ConsumerTag,
State#q.active_consumers)},
+ notify_federation(State1),
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(ok, State1)