summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-04-27 05:40:03 +0100
committerMatthias Radestock <matthias@lshift.net>2010-04-27 05:40:03 +0100
commit9adca0465222f0b90b129daf9a505a24b30c394b (patch)
tree883c349552dd8db918ab5284011f9350fbb42d07 /src
parent40a5ea96100bc33ca209471e4ee331869fc36feb (diff)
downloadrabbitmq-server-git-9adca0465222f0b90b129daf9a505a24b30c394b.tar.gz
more selective backing queue initialisation
Tell BQ:init whether queue contents should be recovered. In the case of invariable_queue this allows us to suppress any interaction with the persister on queue declaration, which is beneficial since the persister can be a bottleneck. There may also be scope to utilise this knowledge in variable_queue at some point.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_backing_queue.erl7
-rw-r--r--src/rabbit_invariable_queue.erl6
-rw-r--r--src/rabbit_tests.erl2
-rw-r--r--src/rabbit_variable_queue.erl4
6 files changed, 15 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 668f4ae256..ee769d5508 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -146,7 +146,7 @@ find_durable_queues() ->
recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
%% Issue inits to *all* the queues so that they all init at the same time
- [ok = gen_server2:cast(Q#amqqueue.pid, init_backing_queue) || Q <- Qs],
+ [ok = gen_server2:cast(Q#amqqueue.pid, {init, true}) || Q <- Qs],
[ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs],
rabbit_misc:execute_mnesia_transaction(
fun () -> [ok = store_queue(Q) || Q <- Qs] end),
@@ -158,7 +158,7 @@ declare(QueueName, Durable, AutoDelete, Args) ->
auto_delete = AutoDelete,
arguments = Args,
pid = none}),
- ok = gen_server2:cast(Q#amqqueue.pid, init_backing_queue),
+ ok = gen_server2:cast(Q#amqqueue.pid, {init, false}),
ok = gen_server2:call(Q#amqqueue.pid, sync, infinity),
internal_declare(Q, true).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8557cb947f..3b5bd82371 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -722,14 +722,14 @@ handle_call({claim_queue, ReaderPid}, _From,
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
-handle_cast(init_backing_queue,
+handle_cast({init, Recover},
State = #q{q = #amqqueue{name = QName, durable = IsDurable},
backing_queue = BQ, backing_queue_state = undefined}) ->
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}),
- noreply(State#q{backing_queue_state = BQ:init(QName, IsDurable)});
+ noreply(State#q{backing_queue_state = BQ:init(QName, IsDurable, Recover)});
-handle_cast(init_backing_queue, State) ->
+handle_cast({init, _Recover}, State) ->
noreply(State);
handle_cast({deliver, Txn, Message, ChPid}, State) ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 7090d9cc59..f21c290f05 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -42,9 +42,10 @@ behaviour_info(callbacks) ->
%% shared resources.
{start, 1},
- %% Called with queue name and a boolean to indicate whether or
- %% not the queue is durable.
- {init, 2},
+ %% Called with queue name, a boolean to indicate whether or
+ %% not the queue is durable, and a boolean to indicate whether
+ %% the queue contents should be attempted to be recovered.
+ {init, 3},
%% Called on queue shutdown when queue isn't being deleted
{terminate, 1},
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 7765069fb9..e5811c343f 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -31,7 +31,7 @@
-module(rabbit_invariable_queue).
--export([init/2, terminate/1, delete_and_terminate/1, purge/1, publish/2,
+-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2,
publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3,
tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1, sync_callback/1,
@@ -61,8 +61,8 @@
start(DurableQueues) ->
ok = rabbit_sup:start_child(rabbit_persister, [DurableQueues]).
-init(QName, IsDurable) ->
- Q = queue:from_list(case IsDurable of
+init(QName, IsDurable, Recover) ->
+ Q = queue:from_list(case IsDurable andalso Recover of
true -> rabbit_persister:queue_content(QName);
false -> []
end),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1ab4f22455..b99634002b 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1384,7 +1384,7 @@ assert_prop(List, Prop, Value) ->
fresh_variable_queue() ->
stop_msg_store(),
ok = empty_test_queue(),
- VQ = rabbit_variable_queue:init(test_queue(), true),
+ VQ = rabbit_variable_queue:init(test_queue(), true, false),
S0 = rabbit_variable_queue:status(VQ),
assert_prop(S0, len, 0),
assert_prop(S0, q1, 0),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b33df24fc7..35d2b19145 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,7 +31,7 @@
-module(rabbit_variable_queue).
--export([init/2, terminate/1, publish/2, publish_delivered/3,
+-export([init/3, terminate/1, publish/2, publish_delivered/3,
set_ram_duration_target/2, ram_duration/1, fetch/2, ack/2, len/1,
is_empty/1, purge/1, delete_and_terminate/1, requeue/2, tx_publish/3,
tx_ack/3, tx_rollback/2, tx_commit/3, sync_callback/1,
@@ -266,7 +266,7 @@ start(DurableQueues) ->
[?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
PersistRefs, PersistStartFunState]).
-init(QueueName, IsDurable) ->
+init(QueueName, IsDurable, _Recover) ->
PersistentStore = case IsDurable of
true -> ?PERSISTENT_MSG_STORE;
false -> ?TRANSIENT_MSG_STORE