diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 48 |
1 files changed, 22 insertions, 26 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ba3ef78191..b5922bb6f3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -454,20 +454,19 @@ discard(#delivery{sender = SenderPid, run_message_queue(State) -> run_message_queue([], State). -run_message_queue(Blocked, State) -> +run_message_queue(Blocked, State = #q{consumers = Consumers}) -> case is_empty(State) of - true -> notify_decorators(lists:append(Blocked), State); + true -> blocked(lists:append(Blocked), Consumers, State); false -> case rabbit_queue_consumers:deliver( fun(AckRequired, State1) -> fetch(AckRequired, State1) - end, qname(State), State, State#q.consumers) of - {delivered, MoreBlocked, State2, Consumers} -> + end, qname(State), State, Consumers) of + {delivered, MoreBlocked, State2, Consumers1} -> run_message_queue([MoreBlocked | Blocked], - State2#q{consumers = Consumers}); - {undelivered, MoreBlocked, Consumers} -> - notify_decorators( - lists:append([MoreBlocked | Blocked]), - State#q{consumers = Consumers}) + State2#q{consumers = Consumers1}); + {undelivered, MoreBlocked, Consumers1} -> + blocked(lists:append([MoreBlocked | Blocked]), + Consumers1, State) end end. @@ -475,31 +474,28 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case rabbit_queue_consumers:deliver( - fun (true, BQS) -> - true = BQ:is_empty(BQS), - {AckTag, BQS1} = BQ:publish_delivered( - Message, Props, SenderPid, BQS), - {{Message, Delivered, AckTag}, {ack, BQS1}}; + fun (true, BQS1) -> + true = BQ:is_empty(BQS1), + {AckTag, BQS2} = BQ:publish_delivered( + Message, Props, SenderPid, BQS1), + {{Message, Delivered, AckTag}, {ack, BQS2}}; (false, _BQS) -> {{Message, Delivered, undefined}, noack} end, qname(State), BQS, State#q.consumers) of - {delivered, Blocked, {ack, BQS2}, Consumers} -> - {delivered, notify_decorators( - Blocked, State#q{backing_queue_state = BQS2, - consumers = Consumers})}; + {delivered, Blocked, {ack, BQS3}, Consumers} -> + {delivered, blocked(Blocked, Consumers, + State#q{backing_queue_state = BQS3})}; {delivered, Blocked, noack, Consumers} -> - {delivered, notify_decorators( - Blocked, discard(Delivery, - State#q{consumers = Consumers}))}; + {delivered, discard(Delivery, blocked(Blocked, Consumers, State))}; {undelivered, Blocked, Consumers} -> - {undelivered, notify_decorators( - Blocked, State#q{consumers = Consumers})} + {undelivered, blocked(Blocked, Consumers, State)} end. -notify_decorators(Blocked, State) -> - [notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State) || +blocked(Blocked, Consumers, State) -> + State1 = State#q{consumers = Consumers}, + [notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State1) || {_ChPid, CTag} <- Blocked], - State. + State1. deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, Delivered, State = #q{backing_queue = BQ, |
