summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-01-14 15:41:52 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-01-14 15:41:52 +0000
commit9ff954b2154b39bbe9adc2730733fd0fe3562d02 (patch)
tree61926259b0cab72fee8a499021e21d8af32e7667 /src
parent168c3076c55283681419dd7ec728634c9ad4bf0b (diff)
parentbda3866ea8029aba7a05d604f1c93164f4297e00 (diff)
downloadrabbitmq-server-git-9ff954b2154b39bbe9adc2730733fd0fe3562d02.tar.gz
Merge bug23675
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl25
1 files changed, 17 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e2c3694b69..1e9096862b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -554,6 +554,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
_ -> add_tx_participants(DeliveredQPids, State2)
end};
+handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = Multiple,
+ requeue = Requeue},
+ _, State) ->
+ reject(DeliveryTag, Requeue, Multiple, State);
+
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
_, State = #ch{transaction_id = TxnKey,
@@ -739,14 +745,8 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
handle_method(#'basic.reject'{delivery_tag = DeliveryTag,
requeue = Requeue},
- _, State = #ch{unacked_message_q = UAMQ}) ->
- {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false),
- ok = fold_per_queue(
- fun (QPid, MsgIds, ok) ->
- rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
- end, ok, Acked),
- ok = notify_limiter(State#ch.limiter_pid, Acked),
- {noreply, State#ch{unacked_message_q = Remaining}};
+ _, State) ->
+ reject(DeliveryTag, Requeue, false, State);
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
@@ -1064,6 +1064,15 @@ basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey},
Content).
+reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) ->
+ {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
+ ok = fold_per_queue(
+ fun (QPid, MsgIds, ok) ->
+ rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
+ end, ok, Acked),
+ ok = notify_limiter(State#ch.limiter_pid, Acked),
+ {noreply, State#ch{unacked_message_q = Remaining}}.
+
ack_record(DeliveryTag, ConsumerTag,
_MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) ->
{DeliveryTag, ConsumerTag, {QPid, MsgId}}.