diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 29 |
2 files changed, 16 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9ebec3992f..4a3a8a5b1f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -269,7 +269,7 @@ redeliver(QPid, Messages) -> gen_server2:cast(QPid, {redeliver, Messages}). requeue(QPid, MsgIds, ChPid) -> - gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). + gen_server2:call(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 29ebc87348..8b1d79c564 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -704,7 +704,21 @@ handle_call({delete, IfUnused, IfEmpty}, _From, handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) -> ok = purge_message_buffer(qname(State), MessageBuffer), reply({ok, queue:len(MessageBuffer)}, - State#q{message_buffer = queue:new()}). + State#q{message_buffer = queue:new()}); + +handle_call({requeue, MsgIds, ChPid}, _From, State) -> + case lookup_ch(ChPid) of + not_found -> + rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", + [ChPid]), + reply(ok, State); + C = #cr{unacked_messages = UAM} -> + {Messages, NewUAM} = collect_messages(MsgIds, UAM), + store_ch_record(C#cr{unacked_messages = NewUAM}), + reply(ok, deliver_or_enqueue_n( + [{Message, true} || Message <- Messages], State)) + end. + handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. @@ -735,19 +749,6 @@ handle_cast({rollback, Txn}, State) -> handle_cast({redeliver, Messages}, State) -> noreply(deliver_or_enqueue_n(Messages, State)); -handle_cast({requeue, MsgIds, ChPid}, State) -> - case lookup_ch(ChPid) of - not_found -> - rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", - [ChPid]), - noreply(State); - C = #cr{unacked_messages = UAM} -> - {Messages, NewUAM} = collect_messages(MsgIds, UAM), - store_ch_record(C#cr{unacked_messages = NewUAM}), - noreply(deliver_or_enqueue_n( - [{Message, true} || Message <- Messages], State)) - end; - handle_cast({unblock, ChPid}, State) -> noreply( possibly_unblock(State, ChPid, |
