summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl7
-rw-r--r--src/rabbit_amqqueue_process.erl6
2 files changed, 12 insertions, 1 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5f045b2764..b86cdd0428 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -32,7 +32,8 @@
-module(rabbit_amqqueue).
-export([start/0, declare/4, delete/3, purge/1]).
--export([internal_declare/2, internal_delete/1]).
+-export([internal_declare/2, internal_delete/1,
+ set_maximum_since_use/2]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
stat/1, stat_all/0, deliver/2, requeue/3, ack/4]).
@@ -108,6 +109,7 @@
-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
+-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -350,6 +352,9 @@ internal_delete(QueueName) ->
ok
end.
+set_maximum_since_use(QPid, Age) ->
+ gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}).
+
on_node_down(Node) ->
[Hook() ||
Hook <- rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 82e3e05ef3..d745a69c85 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -739,6 +739,8 @@ handle_call({claim_queue, ReaderPid}, _From,
end.
handle_cast({init, Recover}, State = #q{message_buffer = undefined}) ->
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [self()]),
Messages = case Recover of
true -> rabbit_persister:queue_content(qname(State));
false -> []
@@ -815,6 +817,10 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
handle_cast({flush, ChPid}, State) ->
ok = rabbit_channel:flushed(ChPid, self()),
+ noreply(State);
+
+handle_cast({set_maximum_since_use, Age}, State) ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},