summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-04-30 17:31:42 +0100
committerMatthias Radestock <matthias@lshift.net>2010-04-30 17:31:42 +0100
commit8d60005b21990e744fab93c15c7b2d0baed4592f (patch)
tree3b93be0e728c2bf4d24a9a6166b3faf5605e048d
parentb4b40403d6d3a8431ec174e15bea84a1b0f615dc (diff)
downloadrabbitmq-server-git-8d60005b21990e744fab93c15c7b2d0baed4592f.tar.gz
hook queue processes up to file handle cache
This doesn't do anything here, since in their current form the queue processes do not use the fhc, but that will change in the future. This change was cherry-picked from the bug21673 branch.
-rw-r--r--src/rabbit_amqqueue.erl7
-rw-r--r--src/rabbit_amqqueue_process.erl6
2 files changed, 12 insertions, 1 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5f045b2764..b86cdd0428 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -32,7 +32,8 @@
-module(rabbit_amqqueue).
-export([start/0, declare/4, delete/3, purge/1]).
--export([internal_declare/2, internal_delete/1]).
+-export([internal_declare/2, internal_delete/1,
+ set_maximum_since_use/2]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
stat/1, stat_all/0, deliver/2, requeue/3, ack/4]).
@@ -108,6 +109,7 @@
-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
+-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -350,6 +352,9 @@ internal_delete(QueueName) ->
ok
end.
+set_maximum_since_use(QPid, Age) ->
+ gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}).
+
on_node_down(Node) ->
[Hook() ||
Hook <- rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 82e3e05ef3..d745a69c85 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -739,6 +739,8 @@ handle_call({claim_queue, ReaderPid}, _From,
end.
handle_cast({init, Recover}, State = #q{message_buffer = undefined}) ->
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [self()]),
Messages = case Recover of
true -> rabbit_persister:queue_content(qname(State));
false -> []
@@ -815,6 +817,10 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
handle_cast({flush, ChPid}, State) ->
ok = rabbit_channel:flushed(ChPid, self()),
+ noreply(State);
+
+handle_cast({set_maximum_since_use, Age}, State) ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},