diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 45 |
1 files changed, 24 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 68032d7641..5841bf6287 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -53,7 +53,7 @@ publish_seqno, unconfirmed_mq, unconfirmed_qm, - blocked_op, + blocked_ops, queue_monitors, dlx, dlx_routing_key @@ -140,7 +140,7 @@ init(Q) -> publish_seqno = 1, unconfirmed_mq = gb_trees:empty(), unconfirmed_qm = gb_trees:empty(), - blocked_op = undefined, + blocked_ops = [], queue_monitors = dict:new(), msg_id_to_channel = gb_trees:empty()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, @@ -166,7 +166,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, publish_seqno = 1, unconfirmed_mq = gb_trees:empty(), unconfirmed_qm = gb_trees:empty(), - blocked_op = undefined, + blocked_ops = [], queue_monitors = dict:new(), msg_id_to_channel = MTC}, State1 = requeue_and_run(AckTags, process_args( @@ -740,13 +740,19 @@ dead_letter_deleted_queue(_From, State = #q{dlx = undefined, backing_queue = BQ}) -> {stop, normal, {ok, BQ:len(BQS)}, State}; dead_letter_deleted_queue(From, State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> + backing_queue = BQ, + blocked_ops = Ops}) -> case BQ:len(BQS) of 0 -> dead_letter_deleted_queue(From, State#q{dlx = undefined}); _ -> BQS1 = BQ:dropwhile(fun (_) -> true end, mk_dead_letter_fun(queue_deleted, State), BQS), - noreply(State#q{blocked_op = {delete, {From, BQ:len(BQS)}}, + Ops1 = + case lists:any(fun({Rsn, _}) -> Rsn =:= delete end, Ops) of + true -> Ops; %% don't queue more than one delete + false -> [{delete, {From, BQ:len(BQS)}} | Ops] + end, + noreply(State#q{blocked_ops = Ops1, backing_queue_state = BQS1}) end. @@ -846,21 +852,17 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ, cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3, backing_queue_state = BQS3}). -cleanup_after_confirm(State = #q{blocked_op = Op, +cleanup_after_confirm(State = #q{blocked_ops = Ops, unconfirmed_mq = UMQ}) -> - State1 = State#q{blocked_op = undefined}, - case {gb_trees:is_empty(UMQ), Op} of - {true, {purge, {From, Count}}} -> - gen_server2:reply(From, {ok, Count}), - noreply(State1); - {true, {delete, {From, Count}}} -> - case From of - undefined -> ok; - _ -> gen_server2:reply(From, {ok, Count}) - end, - {stop, normal, State1}; - _ -> - noreply(State) + case gb_trees:is_empty(UMQ) andalso Ops =/= [] of + true -> [gen_server2:reply(From, {ok, Count}) || + {_, {From, Count}} <- Ops, From =/= undefined], + State1 = State#q{blocked_ops = []}, + case lists:any(fun({Rsn, _}) -> Rsn =:= delete end, Ops) of + true -> {stop, normal, State1}; + false -> noreply(State) + end; + false -> noreply(State) end. already_been_here(_Delivery, #q{dlx = undefined}) -> @@ -1243,7 +1245,8 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, reply({ok, Count}, State#q{backing_queue_state = BQS1}); handle_call(purge, From, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS, + blocked_ops = Ops}) -> BQS1 = BQ:dropwhile(fun (_) -> true end, mk_dead_letter_fun(queue_purged, State), BQS), @@ -1251,7 +1254,7 @@ handle_call(purge, From, State = #q{backing_queue = BQ, 0 -> reply({ok, 0}, State#q{backing_queue_state = BQS1}); _ -> noreply( State#q{backing_queue_state = BQS1, - blocked_op = {purge, {From, BQ:len(BQS)}}}) + blocked_ops = [{purge, {From, BQ:len(BQS)}} | Ops]}) end; handle_call({requeue, AckTags, ChPid}, From, State) -> |
