summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-02-17 17:20:01 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-02-17 17:20:01 +0000
commit47791dccd45c0182cdd2e28e4339e5c4e896d2cf (patch)
tree8a752d7b41408d9714cfe13d10a31078cba2deeb /src
parente1abf94150145a0654275584e0e59cf3bb185654 (diff)
downloadrabbitmq-server-git-47791dccd45c0182cdd2e28e4339e5c4e896d2cf.tar.gz
Tidy up on DOWN and emit basic.cancel
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl46
1 files changed, 32 insertions, 14 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 346ec371f9..1da8c95945 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -281,20 +281,15 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
handle_info(timeout, State) ->
noreply(State);
-handle_info({'DOWN', _MRef, process, QPid, Reason},
- State = #ch{unconfirmed = UC}) ->
- %% TODO: this does a complete scan and partial rebuild of the
- %% tree, which is quite efficient. To do better we'd need to
- %% maintain a secondary mapping, from QPids to MsgSeqNos.
- {MXs, UC1} = remove_queue_unconfirmed(
- gb_trees:next(gb_trees:iterator(UC)), QPid,
- {[], UC}, State),
- erase_queue_stats(QPid),
- State1 = case Reason of
- normal -> record_confirms(MXs, State#ch{unconfirmed = UC1});
- _ -> send_nacks(MXs, State#ch{unconfirmed = UC1})
- end,
- noreply(queue_blocked(QPid, State1)).
+handle_info({'DOWN', MRef, process, QPid, Reason},
+ State = #ch{consumer_monitors = ConsumerMonitors}) ->
+ noreply(
+ case dict:find(MRef, ConsumerMonitors) of
+ error ->
+ handle_non_consumer_down(QPid, Reason, State);
+ {ok, ConsumerTag} ->
+ handle_consumer_down(MRef, ConsumerTag, State)
+ end).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -1061,6 +1056,29 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
+handle_non_consumer_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
+ %% TODO: this does a complete scan and partial rebuild of the
+ %% tree, which is quite efficient. To do better we'd need to
+ %% maintain a secondary mapping, from QPids to MsgSeqNos.
+ {MXs, UC1} = remove_queue_unconfirmed(
+ gb_trees:next(gb_trees:iterator(UC)), QPid,
+ {[], UC}, State),
+ erase_queue_stats(QPid),
+ State1 = case Reason of
+ normal -> record_confirms(MXs, State#ch{unconfirmed = UC1});
+ _ -> send_nacks(MXs, State#ch{unconfirmed = UC1})
+ end,
+ queue_blocked(QPid, State1).
+
+handle_consumer_down(MRef, ConsumerTag,
+ State = #ch{consumer_monitors = ConsumerMonitors,
+ writer_pid = WriterPid}) ->
+ ConsumerMonitors1 = dict:erase(MRef, ConsumerMonitors),
+ Cancel = #'basic.cancel'{consumer_tag = ConsumerTag,
+ nowait = true},
+ ok = rabbit_writer:send_command(WriterPid, Cancel),
+ State#ch{consumer_monitors = ConsumerMonitors1}.
+
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,