summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-14 22:44:09 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-14 22:44:09 +0000
commita928c53af58efe4a860cc425e5a8dca3578c3845 (patch)
treedd8d4baee52656e6bad1f6e4080a113f9f279406 /src
parent50a345df8f8d165e443ad5c25f57b118f7c4b139 (diff)
downloadrabbitmq-server-git-a928c53af58efe4a860cc425e5a8dca3578c3845.tar.gz
don't hang on multiple queue.purges
Following Matthias' suggestion, we don't ok multiple deletes for the same queue: the first delete gets ok'd, but the other ones will get an exception. Because confirming multiple purges on the same queue is fine, but confirming multiple deletes on one queue is weird (if not wrong).
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl45
1 files changed, 24 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 68032d7641..5841bf6287 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -53,7 +53,7 @@
publish_seqno,
unconfirmed_mq,
unconfirmed_qm,
- blocked_op,
+ blocked_ops,
queue_monitors,
dlx,
dlx_routing_key
@@ -140,7 +140,7 @@ init(Q) ->
publish_seqno = 1,
unconfirmed_mq = gb_trees:empty(),
unconfirmed_qm = gb_trees:empty(),
- blocked_op = undefined,
+ blocked_ops = [],
queue_monitors = dict:new(),
msg_id_to_channel = gb_trees:empty()},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
@@ -166,7 +166,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
publish_seqno = 1,
unconfirmed_mq = gb_trees:empty(),
unconfirmed_qm = gb_trees:empty(),
- blocked_op = undefined,
+ blocked_ops = [],
queue_monitors = dict:new(),
msg_id_to_channel = MTC},
State1 = requeue_and_run(AckTags, process_args(
@@ -740,13 +740,19 @@ dead_letter_deleted_queue(_From, State = #q{dlx = undefined,
backing_queue = BQ}) ->
{stop, normal, {ok, BQ:len(BQS)}, State};
dead_letter_deleted_queue(From, State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+ backing_queue = BQ,
+ blocked_ops = Ops}) ->
case BQ:len(BQS) of
0 -> dead_letter_deleted_queue(From, State#q{dlx = undefined});
_ -> BQS1 = BQ:dropwhile(fun (_) -> true end,
mk_dead_letter_fun(queue_deleted, State),
BQS),
- noreply(State#q{blocked_op = {delete, {From, BQ:len(BQS)}},
+ Ops1 =
+ case lists:any(fun({Rsn, _}) -> Rsn =:= delete end, Ops) of
+ true -> Ops; %% don't queue more than one delete
+ false -> [{delete, {From, BQ:len(BQS)}} | Ops]
+ end,
+ noreply(State#q{blocked_ops = Ops1,
backing_queue_state = BQS1})
end.
@@ -846,21 +852,17 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ,
cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3,
backing_queue_state = BQS3}).
-cleanup_after_confirm(State = #q{blocked_op = Op,
+cleanup_after_confirm(State = #q{blocked_ops = Ops,
unconfirmed_mq = UMQ}) ->
- State1 = State#q{blocked_op = undefined},
- case {gb_trees:is_empty(UMQ), Op} of
- {true, {purge, {From, Count}}} ->
- gen_server2:reply(From, {ok, Count}),
- noreply(State1);
- {true, {delete, {From, Count}}} ->
- case From of
- undefined -> ok;
- _ -> gen_server2:reply(From, {ok, Count})
- end,
- {stop, normal, State1};
- _ ->
- noreply(State)
+ case gb_trees:is_empty(UMQ) andalso Ops =/= [] of
+ true -> [gen_server2:reply(From, {ok, Count}) ||
+ {_, {From, Count}} <- Ops, From =/= undefined],
+ State1 = State#q{blocked_ops = []},
+ case lists:any(fun({Rsn, _}) -> Rsn =:= delete end, Ops) of
+ true -> {stop, normal, State1};
+ false -> noreply(State)
+ end;
+ false -> noreply(State)
end.
already_been_here(_Delivery, #q{dlx = undefined}) ->
@@ -1243,7 +1245,8 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
reply({ok, Count}, State#q{backing_queue_state = BQS1});
handle_call(purge, From, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS,
+ blocked_ops = Ops}) ->
BQS1 = BQ:dropwhile(fun (_) -> true end,
mk_dead_letter_fun(queue_purged, State),
BQS),
@@ -1251,7 +1254,7 @@ handle_call(purge, From, State = #q{backing_queue = BQ,
0 -> reply({ok, 0}, State#q{backing_queue_state = BQS1});
_ -> noreply(
State#q{backing_queue_state = BQS1,
- blocked_op = {purge, {From, BQ:len(BQS)}}})
+ blocked_ops = [{purge, {From, BQ:len(BQS)}} | Ops]})
end;
handle_call({requeue, AckTags, ChPid}, From, State) ->