summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-11-28 14:45:28 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-11-28 14:45:28 +0000
commit013608177ac3cddcecb3d204239ceb70b6bae4c7 (patch)
tree7ba4b7b8f5cef4812a1f05e57be35bae6863e305 /src
parentc554f590c987ed10178b5c8188e8f2ab40547206 (diff)
downloadrabbitmq-server-git-013608177ac3cddcecb3d204239ceb70b6bae4c7.tar.gz
don't hang if there are no DLQs bound to the DLX
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl37
1 files changed, 24 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 981501f1a1..3d1464bc2d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -743,9 +743,12 @@ dead_letter_deleted_queue(From, State = #q{backing_queue_state = BQS,
backing_queue_state = BQS1})
end.
-dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo,
- unconfirmed = UC,
- dlx = DLX}) ->
+dead_letter_msg(Msg, AckTag, Reason,
+ State = #q{publish_seqno = MsgSeqNo,
+ unconfirmed = UC,
+ dlx = DLX,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
rabbit_exchange:lookup_or_die(DLX),
{ok, _, QPids} =
@@ -754,9 +757,15 @@ dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo,
false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
MsgSeqNo)),
State1 = lists:foldl(fun monitor_queue/2, State, QPids),
- UC1 = gb_trees:insert(MsgSeqNo, {gb_sets:from_list(QPids), AckTag}, UC),
- State1#q{publish_seqno = MsgSeqNo + 1,
- unconfirmed = UC1}.
+ State2 = State1#q{publish_seqno = MsgSeqNo + 1},
+ case QPids of
+ [] -> {_, BQS1} = BQ:ack([AckTag], undefined, BQS),
+ cleanup_after_confirm(State2#q{backing_queue_state = BQS1});
+ _ -> noreply(State2#q{
+ unconfirmed = gb_trees:insert(
+ MsgSeqNo, {gb_sets:from_list(QPids),
+ AckTag}, UC)})
+ end.
monitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
case dict:is_key(QPid, QMons) of
@@ -783,8 +792,7 @@ handle_queue_down(QPid, State = #q{queue_monitors = QMons,
handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed = UC,
backing_queue = BQ,
- backing_queue_state = BQS,
- blocked_op = Op}) ->
+ backing_queue_state = BQS}) ->
{BQS3, UC3} =
lists:foldl(
fun (MsgSeqNo, {BQS1, UC1}) ->
@@ -798,10 +806,13 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed = UC,
{QPids1, AckTag}, UC1)}
end
end, {BQS, UC}, MsgSeqNos),
- State1 = State#q{unconfirmed = UC3,
- backing_queue_state = BQS3,
- blocked_op = undefined},
- case {gb_trees:is_empty(UC3), Op} of
+ cleanup_after_confirm(State#q{unconfirmed = UC3,
+ backing_queue_state = BQS3}).
+
+cleanup_after_confirm(State = #q{blocked_op = Op,
+ unconfirmed = UC}) ->
+ State1 = State#q{blocked_op = undefined},
+ case {gb_trees:is_empty(UC), Op} of
{true, {purge, {From, Count}}} ->
gen_server2:reply(From, {ok, Count}),
noreply(State1);
@@ -1269,7 +1280,7 @@ handle_cast({confirm, MsgSeqNos, QPid}, State) ->
handle_confirm(MsgSeqNos, QPid, State);
handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
- noreply(dead_letter_msg(Msg, AckTag, Reason, State)).
+ dead_letter_msg(Msg, AckTag, Reason, State).
handle_info(maybe_expire, State) ->
case is_unused(State) of