summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-01-12 15:49:37 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-01-12 15:49:37 +0000
commit3d23167b9c793d76469af0eef52347f4262ebb39 (patch)
treeea7060066bb81b0a7e24036b073b8f17cee1e87a /src
parent31459cd1ea66eab4555fc469e72247e8538a4373 (diff)
downloadrabbitmq-server-git-3d23167b9c793d76469af0eef52347f4262ebb39.tar.gz
Have queues monitor publishing channels so we can remove them from the flow data.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl34
-rw-r--r--src/rabbit_flow.erl10
2 files changed, 41 insertions, 3 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b15334dfe3..10b27e05df 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -327,6 +327,19 @@ lookup_ch(ChPid) ->
C -> C
end.
+lookup_ch_publisher(ChPid) ->
+ case get({ch_publisher, ChPid}) of
+ undefined -> not_found;
+ C -> C
+ end.
+
+ch_record_publisher(ChPid) ->
+ Key = {ch_publisher, ChPid},
+ case get(Key) of
+ undefined -> put(Key, erlang:monitor(process, ChPid));
+ _ -> ok
+ end.
+
ch_record(ChPid) ->
Key = {ch, ChPid},
case get(Key) of
@@ -365,6 +378,12 @@ erase_ch_record(#cr{ch_pid = ChPid,
erase({ch, ChPid}),
ok.
+erase_ch_record_publisher(ChPid) ->
+ MRef = get({ch_publisher, ChPid}),
+ erlang:demonitor(MRef),
+ erase({ch_publisher, ChPid}),
+ ok.
+
update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) ->
ok = rabbit_limiter:register(Limiter, self()),
update_ch_record(C#cr{consumer_count = 1});
@@ -598,6 +617,7 @@ should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
+ handle_ch_publisher_down(DownPid),
case lookup_ch(DownPid) of
not_found ->
{ok, State};
@@ -620,6 +640,15 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
end
end.
+handle_ch_publisher_down(DownPid) ->
+ case lookup_ch_publisher(DownPid) of
+ not_found ->
+ ok;
+ _ ->
+ erase_ch_record_publisher(DownPid),
+ rabbit_flow:sender_down(DownPid)
+ end.
+
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
check_exclusive_access(none, false, _State) ->
@@ -1021,9 +1050,10 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({deliver, Delivery}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender}}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- rabbit_flow:ack(Delivery#delivery.sender),
+ ch_record_publisher(Sender),
+ rabbit_flow:ack(Sender),
noreply(deliver_or_enqueue(Delivery, State));
handle_cast({ack, AckTags, ChPid}, State) ->
diff --git a/src/rabbit_flow.erl b/src/rabbit_flow.erl
index 3c2c0eded4..5e327ff807 100644
--- a/src/rabbit_flow.erl
+++ b/src/rabbit_flow.erl
@@ -28,7 +28,8 @@
-define(MAX_CREDIT, 200).
-define(MORE_CREDIT_AT, 150).
--export([ack/1, handle_bump_msg/1, blocked/0, send/1, receiver_down/1]).
+-export([ack/1, handle_bump_msg/1, blocked/0, send/1]).
+-export([sender_down/1, receiver_down/1]).
%%----------------------------------------------------------------------------
@@ -40,6 +41,7 @@
-spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok').
-spec(blocked/0 :: () -> boolean()).
-spec(send/1 :: (pid()) -> 'ok').
+-spec(sender_down/1 :: (pid()) -> 'ok').
-spec(receiver_down/1 :: (pid()) -> 'ok').
-endif.
@@ -83,6 +85,12 @@ send(From) ->
end,
put({credit_from, From}, Credit).
+sender_down(To) ->
+ %% In theory we could remove it from credit_deferred here, but it
+ %% doesn't really matter; at some point later we will drain
+ %% credit_deferred and thus send messages into the void...
+ erase({credit_to, To}).
+
receiver_down(From) ->
unblock(From),
erase({credit_from, From}).