diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-01-12 15:49:37 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-01-12 15:49:37 +0000 |
| commit | 3d23167b9c793d76469af0eef52347f4262ebb39 (patch) | |
| tree | ea7060066bb81b0a7e24036b073b8f17cee1e87a /src | |
| parent | 31459cd1ea66eab4555fc469e72247e8538a4373 (diff) | |
| download | rabbitmq-server-git-3d23167b9c793d76469af0eef52347f4262ebb39.tar.gz | |
Have queues monitor publishing channels so we can remove them from the flow data.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_flow.erl | 10 |
2 files changed, 41 insertions, 3 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b15334dfe3..10b27e05df 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -327,6 +327,19 @@ lookup_ch(ChPid) -> C -> C end. +lookup_ch_publisher(ChPid) -> + case get({ch_publisher, ChPid}) of + undefined -> not_found; + C -> C + end. + +ch_record_publisher(ChPid) -> + Key = {ch_publisher, ChPid}, + case get(Key) of + undefined -> put(Key, erlang:monitor(process, ChPid)); + _ -> ok + end. + ch_record(ChPid) -> Key = {ch, ChPid}, case get(Key) of @@ -365,6 +378,12 @@ erase_ch_record(#cr{ch_pid = ChPid, erase({ch, ChPid}), ok. +erase_ch_record_publisher(ChPid) -> + MRef = get({ch_publisher, ChPid}), + erlang:demonitor(MRef), + erase({ch_publisher, ChPid}), + ok. + update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) -> ok = rabbit_limiter:register(Limiter, self()), update_ch_record(C#cr{consumer_count = 1}); @@ -598,6 +617,7 @@ should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> + handle_ch_publisher_down(DownPid), case lookup_ch(DownPid) of not_found -> {ok, State}; @@ -620,6 +640,15 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> end end. +handle_ch_publisher_down(DownPid) -> + case lookup_ch_publisher(DownPid) of + not_found -> + ok; + _ -> + erase_ch_record_publisher(DownPid), + rabbit_flow:sender_down(DownPid) + end. + check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) -> in_use; check_exclusive_access(none, false, _State) -> @@ -1021,9 +1050,10 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({deliver, Delivery}, State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender}}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - rabbit_flow:ack(Delivery#delivery.sender), + ch_record_publisher(Sender), + rabbit_flow:ack(Sender), noreply(deliver_or_enqueue(Delivery, State)); handle_cast({ack, AckTags, ChPid}, State) -> diff --git a/src/rabbit_flow.erl b/src/rabbit_flow.erl index 3c2c0eded4..5e327ff807 100644 --- a/src/rabbit_flow.erl +++ b/src/rabbit_flow.erl @@ -28,7 +28,8 @@ -define(MAX_CREDIT, 200). -define(MORE_CREDIT_AT, 150). --export([ack/1, handle_bump_msg/1, blocked/0, send/1, receiver_down/1]). +-export([ack/1, handle_bump_msg/1, blocked/0, send/1]). +-export([sender_down/1, receiver_down/1]). %%---------------------------------------------------------------------------- @@ -40,6 +41,7 @@ -spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok'). -spec(blocked/0 :: () -> boolean()). -spec(send/1 :: (pid()) -> 'ok'). +-spec(sender_down/1 :: (pid()) -> 'ok'). -spec(receiver_down/1 :: (pid()) -> 'ok'). -endif. @@ -83,6 +85,12 @@ send(From) -> end, put({credit_from, From}, Credit). +sender_down(To) -> + %% In theory we could remove it from credit_deferred here, but it + %% doesn't really matter; at some point later we will drain + %% credit_deferred and thus send messages into the void... + erase({credit_to, To}). + receiver_down(From) -> unblock(From), erase({credit_from, From}). |
