summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-02-14 19:30:41 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-02-14 19:30:41 +0000
commit0e51a4c04597b9b4ced8dc6c91e056c29b360970 (patch)
tree33b76d93a55040414888f5218b9118609b4432ed /src
parent12f50e7fd4cf1e8f9f545dc297ba92c8b7b42aab (diff)
downloadrabbitmq-server-git-0e51a4c04597b9b4ced8dc6c91e056c29b360970.tar.gz
always issue credits, even when we drop a message
...due to a dead-letter cycle and make the code a whole lot simpler
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl48
1 files changed, 18 insertions, 30 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 298025048d..68032d7641 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -863,6 +863,8 @@ cleanup_after_confirm(State = #q{blocked_op = Op,
noreply(State)
end.
+already_been_here(_Delivery, #q{dlx = undefined}) ->
+ false;
already_been_here(#delivery{message = #basic_message{content = Content}},
State) ->
#content{properties = #'P_basic'{headers = Headers}} =
@@ -1262,37 +1264,23 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
handle_cast({deliver, Delivery = #delivery{sender = Sender,
- msg_seq_no = MsgSeqNo},
- Flow}, State = #q{dlx = DLX}) ->
+ msg_seq_no = MsgSeqNo}, Flow},
+ State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- ShouldDeliver =
- case DLX of
- undefined ->
- true;
- _ ->
- case already_been_here(Delivery, State) of
- false -> true;
- Qs -> log_cycle_once(Qs),
- rabbit_misc:confirm_to_sender(Sender,
- [MsgSeqNo]),
- false
- end
- end,
- case ShouldDeliver of
- false -> noreply(State);
- true -> case Flow of
- flow ->
- Key = {ch_publisher, Sender},
- case get(Key) of
- undefined -> put(Key, erlang:monitor(process,
- Sender));
- _ -> ok
- end,
- credit_flow:ack(Sender);
- noflow ->
- ok
- end,
- noreply(deliver_or_enqueue(Delivery, State))
+ case Flow of
+ flow -> Key = {ch_publisher, Sender},
+ case get(Key) of
+ undefined -> put(Key, erlang:monitor(process, Sender));
+ _ -> ok
+ end,
+ credit_flow:ack(Sender);
+ noflow -> ok
+ end,
+ case already_been_here(Delivery, State) of
+ false -> noreply(deliver_or_enqueue(Delivery, State));
+ Qs -> log_cycle_once(Qs),
+ rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]),
+ noreply(State)
end;
handle_cast({ack, AckTags, ChPid}, State) ->