diff options
| -rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 4 |
7 files changed, 16 insertions, 15 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 63f4493b72..7c83bb5213 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -34,7 +34,7 @@ ('empty'|{basic_message(), boolean(), ack(), non_neg_integer()})). -spec(start/1 :: ([queue_name()]) -> 'ok'). --spec(init/2 :: (queue_name(), boolean()) -> state()). +-spec(init/3 :: (queue_name(), boolean(), boolean()) -> state()). -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {non_neg_integer(), state()}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 668f4ae256..ee769d5508 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -146,7 +146,7 @@ find_durable_queues() -> recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(Q) || Q <- DurableQueues], %% Issue inits to *all* the queues so that they all init at the same time - [ok = gen_server2:cast(Q#amqqueue.pid, init_backing_queue) || Q <- Qs], + [ok = gen_server2:cast(Q#amqqueue.pid, {init, true}) || Q <- Qs], [ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs], rabbit_misc:execute_mnesia_transaction( fun () -> [ok = store_queue(Q) || Q <- Qs] end), @@ -158,7 +158,7 @@ declare(QueueName, Durable, AutoDelete, Args) -> auto_delete = AutoDelete, arguments = Args, pid = none}), - ok = gen_server2:cast(Q#amqqueue.pid, init_backing_queue), + ok = gen_server2:cast(Q#amqqueue.pid, {init, false}), ok = gen_server2:call(Q#amqqueue.pid, sync, infinity), internal_declare(Q, true). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8557cb947f..3b5bd82371 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -722,14 +722,14 @@ handle_call({claim_queue, ReaderPid}, _From, handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). -handle_cast(init_backing_queue, +handle_cast({init, Recover}, State = #q{q = #amqqueue{name = QName, durable = IsDurable}, backing_queue = BQ, backing_queue_state = undefined}) -> ok = rabbit_memory_monitor:register( self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), - noreply(State#q{backing_queue_state = BQ:init(QName, IsDurable)}); + noreply(State#q{backing_queue_state = BQ:init(QName, IsDurable, Recover)}); -handle_cast(init_backing_queue, State) -> +handle_cast({init, _Recover}, State) -> noreply(State); handle_cast({deliver, Txn, Message, ChPid}, State) -> diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 7090d9cc59..f21c290f05 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -42,9 +42,10 @@ behaviour_info(callbacks) -> %% shared resources. {start, 1}, - %% Called with queue name and a boolean to indicate whether or - %% not the queue is durable. - {init, 2}, + %% Called with queue name, a boolean to indicate whether or + %% not the queue is durable, and a boolean to indicate whether + %% the queue contents should be attempted to be recovered. + {init, 3}, %% Called on queue shutdown when queue isn't being deleted {terminate, 1}, diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 7765069fb9..e5811c343f 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -31,7 +31,7 @@ -module(rabbit_invariable_queue). --export([init/2, terminate/1, delete_and_terminate/1, purge/1, publish/2, +-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2, publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, sync_callback/1, @@ -61,8 +61,8 @@ start(DurableQueues) -> ok = rabbit_sup:start_child(rabbit_persister, [DurableQueues]). -init(QName, IsDurable) -> - Q = queue:from_list(case IsDurable of +init(QName, IsDurable, Recover) -> + Q = queue:from_list(case IsDurable andalso Recover of true -> rabbit_persister:queue_content(QName); false -> [] end), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1ab4f22455..b99634002b 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1384,7 +1384,7 @@ assert_prop(List, Prop, Value) -> fresh_variable_queue() -> stop_msg_store(), ok = empty_test_queue(), - VQ = rabbit_variable_queue:init(test_queue(), true), + VQ = rabbit_variable_queue:init(test_queue(), true, false), S0 = rabbit_variable_queue:status(VQ), assert_prop(S0, len, 0), assert_prop(S0, q1, 0), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index b33df24fc7..35d2b19145 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,7 +31,7 @@ -module(rabbit_variable_queue). --export([init/2, terminate/1, publish/2, publish_delivered/3, +-export([init/3, terminate/1, publish/2, publish_delivered/3, set_ram_duration_target/2, ram_duration/1, fetch/2, ack/2, len/1, is_empty/1, purge/1, delete_and_terminate/1, requeue/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, sync_callback/1, @@ -266,7 +266,7 @@ start(DurableQueues) -> [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), PersistRefs, PersistStartFunState]). -init(QueueName, IsDurable) -> +init(QueueName, IsDurable, _Recover) -> PersistentStore = case IsDurable of true -> ?PERSISTENT_MSG_STORE; false -> ?TRANSIENT_MSG_STORE |
