diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 8 |
2 files changed, 15 insertions, 17 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index abda1c1f46..28f3673d07 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -299,7 +299,7 @@ handle_info({'DOWN', MRef, process, QPid, Reason}, noreply( case dict:find(MRef, ConsumerMonitors) of error -> - handle_non_consumer_down(QPid, Reason, State); + handle_queue_down(QPid, Reason, State); {ok, ConsumerTag} -> handle_consumer_down(MRef, ConsumerTag, State) end). @@ -717,7 +717,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin, {Q, undefined}, ConsumerMapping)}, {noreply, - maybe_monitor_consumer(NoWait, ActualConsumerTag, State1)}; + case NoWait of + true -> monitor_consumer(ActualConsumerTag, State1); + false -> State1 + end}; {{error, exclusive_consume_unavailable}, _Q} -> rabbit_misc:protocol_error( access_refused, "~s in exclusive use", @@ -1085,29 +1088,24 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- -maybe_monitor_consumer(true, ConsumerTag, State) -> - monitor_consumer(ConsumerTag, State); -maybe_monitor_consumer(false, _ConsumerTag, State) -> - State. - monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, consumer_monitors = ConsumerMonitors, capabilities = Capabilities}) -> - case {dict:find(ConsumerTag, ConsumerMapping), - rabbit_misc:table_lookup( - Capabilities, <<"consumer_death_notification">>)} of - {{ok, {#amqqueue{pid = QPid} = Q, undefined}}, {bool, true}} -> + {#amqqueue{pid = QPid} = Q, undefined} = dict:fetch(ConsumerTag, + ConsumerMapping), + case rabbit_misc:table_lookup( + Capabilities, <<"consumer_cancel_notify">>) of + {bool, true} -> MRef = erlang:monitor(process, QPid), State#ch{consumer_mapping = dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping), consumer_monitors = dict:store(MRef, ConsumerTag, ConsumerMonitors)}; _ -> - %% either already received the cancel or incapable client State end. -handle_non_consumer_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> +handle_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> MsgSeqNos = case gb_trees:lookup(QPid, UQM) of {value, MsgSet} -> gb_sets:to_list(MsgSet); none -> [] diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index c5d6ecc4af..aa7d277536 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -163,10 +163,10 @@ server_properties(Protocol) -> NormalizedConfigServerProps). server_capabilities(rabbit_framing_amqp_0_9_1) -> - [{<<"publisher_confirms">>, bool, true}, - {<<"exchange_exchange_bindings">>, bool, true}, - {<<"basic.nack">>, bool, true}, - {<<"consumer_death_notification">>, bool, true}]; + [{<<"publisher_confirms">>, bool, true}, + {<<"exchange_exchange_bindings">>, bool, true}, + {<<"basic.nack">>, bool, true}, + {<<"consumer_cancel_notify">>, bool, true}]; server_capabilities(_) -> []. |
