summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-12-09 16:41:20 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2010-12-09 16:41:20 +0000
commit6a1af9f1b5330b606f690a848fda821f26720f55 (patch)
treebd378ff472df324c0c6c67178209ee2478156922 /src
parentd1ed0eca8be6d5edb366ffd3bfe3c9507236e4fe (diff)
downloadrabbitmq-server-git-6a1af9f1b5330b606f690a848fda821f26720f55.tar.gz
Permit queue processes to be initialised with existing state. This raises the need for the ability to change the callbacks in rabbit_memory_monitor and fhc
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl24
1 files changed, 24 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index cecc85d0a5..027a82e7db 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -48,6 +48,8 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2, prioritise_info/2]).
+-export([init_with_backing_queue_state/3]).
+
-import(queue).
-import(erlang).
-import(lists).
@@ -133,6 +135,28 @@ init(Q) ->
guid_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+init_with_backing_queue_state(Q, BQ, BQS) ->
+ ?LOGDEBUG("Queue starting - ~p~n", [Q]),
+ process_flag(trap_exit, true),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}),
+ process_args(#q{q = Q#amqqueue{pid = self()},
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new(),
+ expires = undefined,
+ sync_timer_ref = undefined,
+ rate_timer_ref = undefined,
+ expiry_timer_ref = undefined,
+ ttl = undefined,
+ stats_timer = rabbit_event:init_stats_timer(),
+ guid_to_channel = dict:new()}).
+
terminate(shutdown, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->