summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl11
-rw-r--r--src/rabbit_amqqueue_process.erl15
2 files changed, 17 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b6e92e0698..c14a28fe11 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -170,8 +170,9 @@ recover_durable_queues(DurableQueues) ->
end
end) of
true ->
- ok = gen_server2:cast(Q#amqqueue.pid,
- init_variable_queue),
+ ok = gen_server2:call(Q#amqqueue.pid,
+ init_variable_queue,
+ infinity),
[Q|Acc];
false -> exit(Q#amqqueue.pid, shutdown),
Acc
@@ -200,6 +201,9 @@ internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
true -> add_default_binding(Q);
false -> ok
end,
+ ok = gen_server2:call(
+ Q#amqqueue.pid,
+ init_variable_queue, infinity),
Q;
[_] -> not_found %% existing Q on stopped node
end;
@@ -209,8 +213,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
end) of
not_found -> exit(Q#amqqueue.pid, shutdown),
rabbit_misc:not_found(QueueName);
- Q -> ok = gen_server2:cast(Q#amqqueue.pid, init_variable_queue),
- Q;
+ Q -> Q;
ExistingQ -> exit(Q#amqqueue.pid, shutdown),
ExistingQ
end.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e6c8d238f8..1394f9db75 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -621,6 +621,16 @@ i(Item, _) ->
%---------------------------------------------------------------------------
+handle_call(init_variable_queue, From, State =
+ #q{variable_queue_state = undefined,
+ q = #amqqueue{name = QName}}) ->
+ gen_server2:reply(From, ok),
+ noreply(
+ State #q { variable_queue_state = rabbit_variable_queue:init(QName) });
+
+handle_call(init_variable_queue, _From, State) ->
+ reply(ok, State);
+
handle_call(sync, _From, State) ->
reply(ok, State);
@@ -829,11 +839,6 @@ handle_call({claim_queue, ReaderPid}, _From,
reply(locked, State)
end.
-handle_cast(init_variable_queue, #q{variable_queue_state = undefined,
- q = #amqqueue{name = QName}} = State) ->
- noreply(
- State #q { variable_queue_state = rabbit_variable_queue:init(QName) });
-
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),