summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl45
1 files changed, 24 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 68032d7641..5841bf6287 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -53,7 +53,7 @@
publish_seqno,
unconfirmed_mq,
unconfirmed_qm,
- blocked_op,
+ blocked_ops,
queue_monitors,
dlx,
dlx_routing_key
@@ -140,7 +140,7 @@ init(Q) ->
publish_seqno = 1,
unconfirmed_mq = gb_trees:empty(),
unconfirmed_qm = gb_trees:empty(),
- blocked_op = undefined,
+ blocked_ops = [],
queue_monitors = dict:new(),
msg_id_to_channel = gb_trees:empty()},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
@@ -166,7 +166,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
publish_seqno = 1,
unconfirmed_mq = gb_trees:empty(),
unconfirmed_qm = gb_trees:empty(),
- blocked_op = undefined,
+ blocked_ops = [],
queue_monitors = dict:new(),
msg_id_to_channel = MTC},
State1 = requeue_and_run(AckTags, process_args(
@@ -740,13 +740,19 @@ dead_letter_deleted_queue(_From, State = #q{dlx = undefined,
backing_queue = BQ}) ->
{stop, normal, {ok, BQ:len(BQS)}, State};
dead_letter_deleted_queue(From, State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+ backing_queue = BQ,
+ blocked_ops = Ops}) ->
case BQ:len(BQS) of
0 -> dead_letter_deleted_queue(From, State#q{dlx = undefined});
_ -> BQS1 = BQ:dropwhile(fun (_) -> true end,
mk_dead_letter_fun(queue_deleted, State),
BQS),
- noreply(State#q{blocked_op = {delete, {From, BQ:len(BQS)}},
+ Ops1 =
+ case lists:any(fun({Rsn, _}) -> Rsn =:= delete end, Ops) of
+ true -> Ops; %% don't queue more than one delete
+ false -> [{delete, {From, BQ:len(BQS)}} | Ops]
+ end,
+ noreply(State#q{blocked_ops = Ops1,
backing_queue_state = BQS1})
end.
@@ -846,21 +852,17 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ,
cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3,
backing_queue_state = BQS3}).
-cleanup_after_confirm(State = #q{blocked_op = Op,
+cleanup_after_confirm(State = #q{blocked_ops = Ops,
unconfirmed_mq = UMQ}) ->
- State1 = State#q{blocked_op = undefined},
- case {gb_trees:is_empty(UMQ), Op} of
- {true, {purge, {From, Count}}} ->
- gen_server2:reply(From, {ok, Count}),
- noreply(State1);
- {true, {delete, {From, Count}}} ->
- case From of
- undefined -> ok;
- _ -> gen_server2:reply(From, {ok, Count})
- end,
- {stop, normal, State1};
- _ ->
- noreply(State)
+ case gb_trees:is_empty(UMQ) andalso Ops =/= [] of
+ true -> [gen_server2:reply(From, {ok, Count}) ||
+ {_, {From, Count}} <- Ops, From =/= undefined],
+ State1 = State#q{blocked_ops = []},
+ case lists:any(fun({Rsn, _}) -> Rsn =:= delete end, Ops) of
+ true -> {stop, normal, State1};
+ false -> noreply(State)
+ end;
+ false -> noreply(State)
end.
already_been_here(_Delivery, #q{dlx = undefined}) ->
@@ -1243,7 +1245,8 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
reply({ok, Count}, State#q{backing_queue_state = BQS1});
handle_call(purge, From, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS,
+ blocked_ops = Ops}) ->
BQS1 = BQ:dropwhile(fun (_) -> true end,
mk_dead_letter_fun(queue_purged, State),
BQS),
@@ -1251,7 +1254,7 @@ handle_call(purge, From, State = #q{backing_queue = BQ,
0 -> reply({ok, 0}, State#q{backing_queue_state = BQS1});
_ -> noreply(
State#q{backing_queue_state = BQS1,
- blocked_op = {purge, {From, BQ:len(BQS)}}})
+ blocked_ops = [{purge, {From, BQ:len(BQS)}} | Ops]})
end;
handle_call({requeue, AckTags, ChPid}, From, State) ->