diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 |
2 files changed, 17 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b6e92e0698..c14a28fe11 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -170,8 +170,9 @@ recover_durable_queues(DurableQueues) -> end end) of true -> - ok = gen_server2:cast(Q#amqqueue.pid, - init_variable_queue), + ok = gen_server2:call(Q#amqqueue.pid, + init_variable_queue, + infinity), [Q|Acc]; false -> exit(Q#amqqueue.pid, shutdown), Acc @@ -200,6 +201,9 @@ internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) -> true -> add_default_binding(Q); false -> ok end, + ok = gen_server2:call( + Q#amqqueue.pid, + init_variable_queue, infinity), Q; [_] -> not_found %% existing Q on stopped node end; @@ -209,8 +213,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) -> end) of not_found -> exit(Q#amqqueue.pid, shutdown), rabbit_misc:not_found(QueueName); - Q -> ok = gen_server2:cast(Q#amqqueue.pid, init_variable_queue), - Q; + Q -> Q; ExistingQ -> exit(Q#amqqueue.pid, shutdown), ExistingQ end. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e6c8d238f8..1394f9db75 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -621,6 +621,16 @@ i(Item, _) -> %--------------------------------------------------------------------------- +handle_call(init_variable_queue, From, State = + #q{variable_queue_state = undefined, + q = #amqqueue{name = QName}}) -> + gen_server2:reply(From, ok), + noreply( + State #q { variable_queue_state = rabbit_variable_queue:init(QName) }); + +handle_call(init_variable_queue, _From, State) -> + reply(ok, State); + handle_call(sync, _From, State) -> reply(ok, State); @@ -829,11 +839,6 @@ handle_call({claim_queue, ReaderPid}, _From, reply(locked, State) end. -handle_cast(init_variable_queue, #q{variable_queue_state = undefined, - q = #amqqueue{name = QName}} = State) -> - noreply( - State #q { variable_queue_state = rabbit_variable_queue:init(QName) }); - handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), |
