summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit_backing_queue_spec.hrl2
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_backing_queue.erl7
-rw-r--r--src/rabbit_invariable_queue.erl6
-rw-r--r--src/rabbit_tests.erl2
-rw-r--r--src/rabbit_variable_queue.erl4
7 files changed, 16 insertions, 15 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 63f4493b72..7c83bb5213 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -34,7 +34,7 @@
('empty'|{basic_message(), boolean(), ack(), non_neg_integer()})).
-spec(start/1 :: ([queue_name()]) -> 'ok').
--spec(init/2 :: (queue_name(), boolean()) -> state()).
+-spec(init/3 :: (queue_name(), boolean(), boolean()) -> state()).
-spec(terminate/1 :: (state()) -> state()).
-spec(delete_and_terminate/1 :: (state()) -> state()).
-spec(purge/1 :: (state()) -> {non_neg_integer(), state()}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 668f4ae256..ee769d5508 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -146,7 +146,7 @@ find_durable_queues() ->
recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
%% Issue inits to *all* the queues so that they all init at the same time
- [ok = gen_server2:cast(Q#amqqueue.pid, init_backing_queue) || Q <- Qs],
+ [ok = gen_server2:cast(Q#amqqueue.pid, {init, true}) || Q <- Qs],
[ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs],
rabbit_misc:execute_mnesia_transaction(
fun () -> [ok = store_queue(Q) || Q <- Qs] end),
@@ -158,7 +158,7 @@ declare(QueueName, Durable, AutoDelete, Args) ->
auto_delete = AutoDelete,
arguments = Args,
pid = none}),
- ok = gen_server2:cast(Q#amqqueue.pid, init_backing_queue),
+ ok = gen_server2:cast(Q#amqqueue.pid, {init, false}),
ok = gen_server2:call(Q#amqqueue.pid, sync, infinity),
internal_declare(Q, true).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8557cb947f..3b5bd82371 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -722,14 +722,14 @@ handle_call({claim_queue, ReaderPid}, _From,
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
-handle_cast(init_backing_queue,
+handle_cast({init, Recover},
State = #q{q = #amqqueue{name = QName, durable = IsDurable},
backing_queue = BQ, backing_queue_state = undefined}) ->
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}),
- noreply(State#q{backing_queue_state = BQ:init(QName, IsDurable)});
+ noreply(State#q{backing_queue_state = BQ:init(QName, IsDurable, Recover)});
-handle_cast(init_backing_queue, State) ->
+handle_cast({init, _Recover}, State) ->
noreply(State);
handle_cast({deliver, Txn, Message, ChPid}, State) ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 7090d9cc59..f21c290f05 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -42,9 +42,10 @@ behaviour_info(callbacks) ->
%% shared resources.
{start, 1},
- %% Called with queue name and a boolean to indicate whether or
- %% not the queue is durable.
- {init, 2},
+ %% Called with queue name, a boolean to indicate whether or
+ %% not the queue is durable, and a boolean to indicate whether
+ %% the queue contents should be attempted to be recovered.
+ {init, 3},
%% Called on queue shutdown when queue isn't being deleted
{terminate, 1},
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 7765069fb9..e5811c343f 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -31,7 +31,7 @@
-module(rabbit_invariable_queue).
--export([init/2, terminate/1, delete_and_terminate/1, purge/1, publish/2,
+-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2,
publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3,
tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1, sync_callback/1,
@@ -61,8 +61,8 @@
start(DurableQueues) ->
ok = rabbit_sup:start_child(rabbit_persister, [DurableQueues]).
-init(QName, IsDurable) ->
- Q = queue:from_list(case IsDurable of
+init(QName, IsDurable, Recover) ->
+ Q = queue:from_list(case IsDurable andalso Recover of
true -> rabbit_persister:queue_content(QName);
false -> []
end),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1ab4f22455..b99634002b 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1384,7 +1384,7 @@ assert_prop(List, Prop, Value) ->
fresh_variable_queue() ->
stop_msg_store(),
ok = empty_test_queue(),
- VQ = rabbit_variable_queue:init(test_queue(), true),
+ VQ = rabbit_variable_queue:init(test_queue(), true, false),
S0 = rabbit_variable_queue:status(VQ),
assert_prop(S0, len, 0),
assert_prop(S0, q1, 0),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b33df24fc7..35d2b19145 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,7 +31,7 @@
-module(rabbit_variable_queue).
--export([init/2, terminate/1, publish/2, publish_delivered/3,
+-export([init/3, terminate/1, publish/2, publish_delivered/3,
set_ram_duration_target/2, ram_duration/1, fetch/2, ack/2, len/1,
is_empty/1, purge/1, delete_and_terminate/1, requeue/2, tx_publish/3,
tx_ack/3, tx_rollback/2, tx_commit/3, sync_callback/1,
@@ -266,7 +266,7 @@ start(DurableQueues) ->
[?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
PersistRefs, PersistStartFunState]).
-init(QueueName, IsDurable) ->
+init(QueueName, IsDurable, _Recover) ->
PersistentStore = case IsDurable of
true -> ?PERSISTENT_MSG_STORE;
false -> ?TRANSIENT_MSG_STORE