diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2012-02-14 19:30:41 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-02-14 19:30:41 +0000 |
| commit | 0e51a4c04597b9b4ced8dc6c91e056c29b360970 (patch) | |
| tree | 33b76d93a55040414888f5218b9118609b4432ed /src | |
| parent | 12f50e7fd4cf1e8f9f545dc297ba92c8b7b42aab (diff) | |
| download | rabbitmq-server-git-0e51a4c04597b9b4ced8dc6c91e056c29b360970.tar.gz | |
always issue credits, even when we drop a message
...due to a dead-letter cycle
and make the code a whole lot simpler
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 48 |
1 files changed, 18 insertions, 30 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 298025048d..68032d7641 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -863,6 +863,8 @@ cleanup_after_confirm(State = #q{blocked_op = Op, noreply(State) end. +already_been_here(_Delivery, #q{dlx = undefined}) -> + false; already_been_here(#delivery{message = #basic_message{content = Content}}, State) -> #content{properties = #'P_basic'{headers = Headers}} = @@ -1262,37 +1264,23 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); handle_cast({deliver, Delivery = #delivery{sender = Sender, - msg_seq_no = MsgSeqNo}, - Flow}, State = #q{dlx = DLX}) -> + msg_seq_no = MsgSeqNo}, Flow}, + State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - ShouldDeliver = - case DLX of - undefined -> - true; - _ -> - case already_been_here(Delivery, State) of - false -> true; - Qs -> log_cycle_once(Qs), - rabbit_misc:confirm_to_sender(Sender, - [MsgSeqNo]), - false - end - end, - case ShouldDeliver of - false -> noreply(State); - true -> case Flow of - flow -> - Key = {ch_publisher, Sender}, - case get(Key) of - undefined -> put(Key, erlang:monitor(process, - Sender)); - _ -> ok - end, - credit_flow:ack(Sender); - noflow -> - ok - end, - noreply(deliver_or_enqueue(Delivery, State)) + case Flow of + flow -> Key = {ch_publisher, Sender}, + case get(Key) of + undefined -> put(Key, erlang:monitor(process, Sender)); + _ -> ok + end, + credit_flow:ack(Sender); + noflow -> ok + end, + case already_been_here(Delivery, State) of + false -> noreply(deliver_or_enqueue(Delivery, State)); + Qs -> log_cycle_once(Qs), + rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]), + noreply(State) end; handle_cast({ack, AckTags, ChPid}, State) -> |
