summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl29
2 files changed, 16 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9ebec3992f..4a3a8a5b1f 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -269,7 +269,7 @@ redeliver(QPid, Messages) ->
gen_server2:cast(QPid, {redeliver, Messages}).
requeue(QPid, MsgIds, ChPid) ->
- gen_server2:cast(QPid, {requeue, MsgIds, ChPid}).
+ gen_server2:call(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 29ebc87348..8b1d79c564 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -704,7 +704,21 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) ->
ok = purge_message_buffer(qname(State), MessageBuffer),
reply({ok, queue:len(MessageBuffer)},
- State#q{message_buffer = queue:new()}).
+ State#q{message_buffer = queue:new()});
+
+handle_call({requeue, MsgIds, ChPid}, _From, State) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
+ [ChPid]),
+ reply(ok, State);
+ C = #cr{unacked_messages = UAM} ->
+ {Messages, NewUAM} = collect_messages(MsgIds, UAM),
+ store_ch_record(C#cr{unacked_messages = NewUAM}),
+ reply(ok, deliver_or_enqueue_n(
+ [{Message, true} || Message <- Messages], State))
+ end.
+
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
@@ -735,19 +749,6 @@ handle_cast({rollback, Txn}, State) ->
handle_cast({redeliver, Messages}, State) ->
noreply(deliver_or_enqueue_n(Messages, State));
-handle_cast({requeue, MsgIds, ChPid}, State) ->
- case lookup_ch(ChPid) of
- not_found ->
- rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
- [ChPid]),
- noreply(State);
- C = #cr{unacked_messages = UAM} ->
- {Messages, NewUAM} = collect_messages(MsgIds, UAM),
- store_ch_record(C#cr{unacked_messages = NewUAM}),
- noreply(deliver_or_enqueue_n(
- [{Message, true} || Message <- Messages], State))
- end;
-
handle_cast({unblock, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,