summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl24
-rw-r--r--src/rabbit_reader.erl8
2 files changed, 15 insertions, 17 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index abda1c1f46..28f3673d07 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -299,7 +299,7 @@ handle_info({'DOWN', MRef, process, QPid, Reason},
noreply(
case dict:find(MRef, ConsumerMonitors) of
error ->
- handle_non_consumer_down(QPid, Reason, State);
+ handle_queue_down(QPid, Reason, State);
{ok, ConsumerTag} ->
handle_consumer_down(MRef, ConsumerTag, State)
end).
@@ -717,7 +717,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
{Q, undefined},
ConsumerMapping)},
{noreply,
- maybe_monitor_consumer(NoWait, ActualConsumerTag, State1)};
+ case NoWait of
+ true -> monitor_consumer(ActualConsumerTag, State1);
+ false -> State1
+ end};
{{error, exclusive_consume_unavailable}, _Q} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
@@ -1085,29 +1088,24 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
-maybe_monitor_consumer(true, ConsumerTag, State) ->
- monitor_consumer(ConsumerTag, State);
-maybe_monitor_consumer(false, _ConsumerTag, State) ->
- State.
-
monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping,
consumer_monitors = ConsumerMonitors,
capabilities = Capabilities}) ->
- case {dict:find(ConsumerTag, ConsumerMapping),
- rabbit_misc:table_lookup(
- Capabilities, <<"consumer_death_notification">>)} of
- {{ok, {#amqqueue{pid = QPid} = Q, undefined}}, {bool, true}} ->
+ {#amqqueue{pid = QPid} = Q, undefined} = dict:fetch(ConsumerTag,
+ ConsumerMapping),
+ case rabbit_misc:table_lookup(
+ Capabilities, <<"consumer_cancel_notify">>) of
+ {bool, true} ->
MRef = erlang:monitor(process, QPid),
State#ch{consumer_mapping =
dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping),
consumer_monitors =
dict:store(MRef, ConsumerTag, ConsumerMonitors)};
_ ->
- %% either already received the cancel or incapable client
State
end.
-handle_non_consumer_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
+handle_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, MsgSet} -> gb_sets:to_list(MsgSet);
none -> []
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index c5d6ecc4af..aa7d277536 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -163,10 +163,10 @@ server_properties(Protocol) ->
NormalizedConfigServerProps).
server_capabilities(rabbit_framing_amqp_0_9_1) ->
- [{<<"publisher_confirms">>, bool, true},
- {<<"exchange_exchange_bindings">>, bool, true},
- {<<"basic.nack">>, bool, true},
- {<<"consumer_death_notification">>, bool, true}];
+ [{<<"publisher_confirms">>, bool, true},
+ {<<"exchange_exchange_bindings">>, bool, true},
+ {<<"basic.nack">>, bool, true},
+ {<<"consumer_cancel_notify">>, bool, true}];
server_capabilities(_) ->
[].