diff options
| author | Michael Bridgen <mikeb@lshift.net> | 2009-11-17 18:31:43 +0000 |
|---|---|---|
| committer | Michael Bridgen <mikeb@lshift.net> | 2009-11-17 18:31:43 +0000 |
| commit | 9d8f56e2a79242a057f0a1e4a03eb3453bf956a2 (patch) | |
| tree | d35f579d00bbd51c0772b3aae09a36caf844d6fe | |
| parent | 6e59f6bb13d5d457ffdf095bc73808c76164fe51 (diff) | |
| download | rabbitmq-server-git-9d8f56e2a79242a057f0a1e4a03eb3453bf956a2.tar.gz | |
Bug 21986: Use gen_server2:call instead of gen_server2:cast to make
this synchronous. This means recover_async is now also synchronous,
but it's deprecated, and anyway we do the same thing with no_wait
elsewhere. This way we have just the one code path.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 29 |
2 files changed, 16 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9ebec3992f..4a3a8a5b1f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -269,7 +269,7 @@ redeliver(QPid, Messages) -> gen_server2:cast(QPid, {redeliver, Messages}). requeue(QPid, MsgIds, ChPid) -> - gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). + gen_server2:call(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 29ebc87348..8b1d79c564 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -704,7 +704,21 @@ handle_call({delete, IfUnused, IfEmpty}, _From, handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) -> ok = purge_message_buffer(qname(State), MessageBuffer), reply({ok, queue:len(MessageBuffer)}, - State#q{message_buffer = queue:new()}). + State#q{message_buffer = queue:new()}); + +handle_call({requeue, MsgIds, ChPid}, _From, State) -> + case lookup_ch(ChPid) of + not_found -> + rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", + [ChPid]), + reply(ok, State); + C = #cr{unacked_messages = UAM} -> + {Messages, NewUAM} = collect_messages(MsgIds, UAM), + store_ch_record(C#cr{unacked_messages = NewUAM}), + reply(ok, deliver_or_enqueue_n( + [{Message, true} || Message <- Messages], State)) + end. + handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. @@ -735,19 +749,6 @@ handle_cast({rollback, Txn}, State) -> handle_cast({redeliver, Messages}, State) -> noreply(deliver_or_enqueue_n(Messages, State)); -handle_cast({requeue, MsgIds, ChPid}, State) -> - case lookup_ch(ChPid) of - not_found -> - rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", - [ChPid]), - noreply(State); - C = #cr{unacked_messages = UAM} -> - {Messages, NewUAM} = collect_messages(MsgIds, UAM), - store_ch_record(C#cr{unacked_messages = NewUAM}), - noreply(deliver_or_enqueue_n( - [{Message, true} || Message <- Messages], State)) - end; - handle_cast({unblock, ChPid}, State) -> noreply( possibly_unblock(State, ChPid, |
