diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-02-14 22:44:09 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-02-14 22:44:09 +0000 |
| commit | a928c53af58efe4a860cc425e5a8dca3578c3845 (patch) | |
| tree | dd8d4baee52656e6bad1f6e4080a113f9f279406 | |
| parent | 50a345df8f8d165e443ad5c25f57b118f7c4b139 (diff) | |
| download | rabbitmq-server-git-a928c53af58efe4a860cc425e5a8dca3578c3845.tar.gz | |
don't hang on multiple queue.purges
Following Matthias' suggestion, we don't ok multiple deletes for the same
queue: the first delete gets ok'd, but the other ones will get an exception.
Because confirming multiple purges on the same queue is fine, but confirming
multiple deletes on one queue is weird (if not wrong).
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 45 |
1 files changed, 24 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 68032d7641..5841bf6287 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -53,7 +53,7 @@ publish_seqno, unconfirmed_mq, unconfirmed_qm, - blocked_op, + blocked_ops, queue_monitors, dlx, dlx_routing_key @@ -140,7 +140,7 @@ init(Q) -> publish_seqno = 1, unconfirmed_mq = gb_trees:empty(), unconfirmed_qm = gb_trees:empty(), - blocked_op = undefined, + blocked_ops = [], queue_monitors = dict:new(), msg_id_to_channel = gb_trees:empty()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, @@ -166,7 +166,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, publish_seqno = 1, unconfirmed_mq = gb_trees:empty(), unconfirmed_qm = gb_trees:empty(), - blocked_op = undefined, + blocked_ops = [], queue_monitors = dict:new(), msg_id_to_channel = MTC}, State1 = requeue_and_run(AckTags, process_args( @@ -740,13 +740,19 @@ dead_letter_deleted_queue(_From, State = #q{dlx = undefined, backing_queue = BQ}) -> {stop, normal, {ok, BQ:len(BQS)}, State}; dead_letter_deleted_queue(From, State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> + backing_queue = BQ, + blocked_ops = Ops}) -> case BQ:len(BQS) of 0 -> dead_letter_deleted_queue(From, State#q{dlx = undefined}); _ -> BQS1 = BQ:dropwhile(fun (_) -> true end, mk_dead_letter_fun(queue_deleted, State), BQS), - noreply(State#q{blocked_op = {delete, {From, BQ:len(BQS)}}, + Ops1 = + case lists:any(fun({Rsn, _}) -> Rsn =:= delete end, Ops) of + true -> Ops; %% don't queue more than one delete + false -> [{delete, {From, BQ:len(BQS)}} | Ops] + end, + noreply(State#q{blocked_ops = Ops1, backing_queue_state = BQS1}) end. @@ -846,21 +852,17 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ, cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3, backing_queue_state = BQS3}). -cleanup_after_confirm(State = #q{blocked_op = Op, +cleanup_after_confirm(State = #q{blocked_ops = Ops, unconfirmed_mq = UMQ}) -> - State1 = State#q{blocked_op = undefined}, - case {gb_trees:is_empty(UMQ), Op} of - {true, {purge, {From, Count}}} -> - gen_server2:reply(From, {ok, Count}), - noreply(State1); - {true, {delete, {From, Count}}} -> - case From of - undefined -> ok; - _ -> gen_server2:reply(From, {ok, Count}) - end, - {stop, normal, State1}; - _ -> - noreply(State) + case gb_trees:is_empty(UMQ) andalso Ops =/= [] of + true -> [gen_server2:reply(From, {ok, Count}) || + {_, {From, Count}} <- Ops, From =/= undefined], + State1 = State#q{blocked_ops = []}, + case lists:any(fun({Rsn, _}) -> Rsn =:= delete end, Ops) of + true -> {stop, normal, State1}; + false -> noreply(State) + end; + false -> noreply(State) end. already_been_here(_Delivery, #q{dlx = undefined}) -> @@ -1243,7 +1245,8 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, reply({ok, Count}, State#q{backing_queue_state = BQS1}); handle_call(purge, From, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS, + blocked_ops = Ops}) -> BQS1 = BQ:dropwhile(fun (_) -> true end, mk_dead_letter_fun(queue_purged, State), BQS), @@ -1251,7 +1254,7 @@ handle_call(purge, From, State = #q{backing_queue = BQ, 0 -> reply({ok, 0}, State#q{backing_queue_state = BQS1}); _ -> noreply( State#q{backing_queue_state = BQS1, - blocked_op = {purge, {From, BQ:len(BQS)}}}) + blocked_ops = [{purge, {From, BQ:len(BQS)}} | Ops]}) end; handle_call({requeue, AckTags, ChPid}, From, State) -> |
