diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2011-01-14 15:41:52 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2011-01-14 15:41:52 +0000 |
| commit | 9ff954b2154b39bbe9adc2730733fd0fe3562d02 (patch) | |
| tree | 61926259b0cab72fee8a499021e21d8af32e7667 /src | |
| parent | 168c3076c55283681419dd7ec728634c9ad4bf0b (diff) | |
| parent | bda3866ea8029aba7a05d604f1c93164f4297e00 (diff) | |
| download | rabbitmq-server-git-9ff954b2154b39bbe9adc2730733fd0fe3562d02.tar.gz | |
Merge bug23675
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 25 |
1 files changed, 17 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e2c3694b69..1e9096862b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -554,6 +554,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, _ -> add_tx_participants(DeliveredQPids, State2) end}; +handle_method(#'basic.nack'{delivery_tag = DeliveryTag, + multiple = Multiple, + requeue = Requeue}, + _, State) -> + reject(DeliveryTag, Requeue, Multiple, State); + handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State = #ch{transaction_id = TxnKey, @@ -739,14 +745,8 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, - _, State = #ch{unacked_message_q = UAMQ}) -> - {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false), - ok = fold_per_queue( - fun (QPid, MsgIds, ok) -> - rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) - end, ok, Acked), - ok = notify_limiter(State#ch.limiter_pid, Acked), - {noreply, State#ch{unacked_message_q = Remaining}}; + _, State) -> + reject(DeliveryTag, Requeue, false, State); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, type = TypeNameBin, @@ -1064,6 +1064,15 @@ basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey}, Content). +reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) -> + {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), + ok = fold_per_queue( + fun (QPid, MsgIds, ok) -> + rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) + end, ok, Acked), + ok = notify_limiter(State#ch.limiter_pid, Acked), + {noreply, State#ch{unacked_message_q = Remaining}}. + ack_record(DeliveryTag, ConsumerTag, _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) -> {DeliveryTag, ConsumerTag, {QPid, MsgId}}. |
