summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl11
1 files changed, 9 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1244d640e4..eeebae3fdf 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -695,7 +695,11 @@ unblock(State, C = #cr{limiter = Limiter}) ->
UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ}),
AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ),
- run_message_queue(State#q{active_consumers = AC1})
+ State1 = State#q{active_consumers = AC1},
+ [notify_decorators(
+ consumer_unblocked, [{consumer_tag, CTag}], State1) ||
+ {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked],
+ run_message_queue(State1)
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
@@ -1201,7 +1205,10 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
AC1 = priority_queue:in({ChPid, Consumer},
consumer_priority({ChPid, Consumer}),
State1#q.active_consumers),
- reply(ok, run_message_queue(State1#q{active_consumers = AC1}))
+ State2 = State1#q{active_consumers = AC1},
+ notify_decorators(
+ basic_consume, [{consumer_tag, ConsumerTag}], State2),
+ reply(ok, run_message_queue(State2))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,