summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-06-30 13:13:44 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-06-30 13:13:44 +0100
commit2b1ae05e3a4df0c48f52233bac905fd45efbfb5d (patch)
treeddd9654f945b9e6d8b43e3cc05d91bc9066918e6
parentce658f9bbab8ef4df386b4845a5705a8f0728ff9 (diff)
parent7168411651eae2181ab112762b74da08155c7172 (diff)
downloadrabbitmq-server-git-2b1ae05e3a4df0c48f52233bac905fd45efbfb5d.tar.gz
Merge bug21830 into default
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_channel.erl99
-rw-r--r--src/rabbit_tests.erl7
4 files changed, 59 insertions, 53 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index eebcfcb9a9..f0e536b5ee 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -66,7 +66,7 @@
-spec(start/0 :: () -> 'ok').
-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(),
- maybe(pid())) -> amqqueue()).
+ maybe(pid())) -> {'new' | 'existing', amqqueue()}).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5fdf0ffa90..70e6e75584 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -137,7 +137,7 @@ declare(Recover, From,
backing_queue = BQ, backing_queue_state = undefined}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
not_found -> {stop, normal, not_found, State};
- Q -> gen_server2:reply(From, Q),
+ Q -> gen_server2:reply(From, {new, Q}),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use,
[self()]),
@@ -146,7 +146,7 @@ declare(Recover, From,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
noreply(State#q{backing_queue_state = BQS});
- Q1 -> {stop, normal, Q1, State}
+ Q1 -> {stop, normal, {existing, Q1}, State}
end.
terminate_shutdown(Fun, State) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 8649ecc7f1..179a9a9de6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -284,20 +284,15 @@ terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) ->
Reader ! {channel_exit, Channel, Reason},
State#ch{state = terminating}.
-return_queue_declare_ok(State, NoWait, Q) ->
- NewState = State#ch{most_recently_declared_queue =
- (Q#amqqueue.name)#resource.name},
+return_queue_declare_ok(#resource{name = ActualName},
+ NoWait, MessageCount, ConsumerCount, State) ->
+ NewState = State#ch{most_recently_declared_queue = ActualName},
case NoWait of
true -> {noreply, NewState};
- false ->
- {ok, ActualName, MessageCount, ConsumerCount} =
- rabbit_misc:with_exit_handler(
- fun () -> {ok, Q#amqqueue.name, 0, 0} end,
- fun () -> rabbit_amqqueue:stat(Q) end),
- Reply = #'queue.declare_ok'{queue = ActualName#resource.name,
- message_count = MessageCount,
- consumer_count = ConsumerCount},
- {reply, Reply, NewState}
+ false -> Reply = #'queue.declare_ok'{queue = ActualName,
+ message_count = MessageCount,
+ consumer_count = ConsumerCount},
+ {reply, Reply, NewState}
end.
check_resource_access(Username, Resource, Perm) ->
@@ -710,13 +705,13 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
-handle_method(#'queue.declare'{queue = QueueNameBin,
- passive = false,
- durable = Durable,
- exclusive = ExclusiveDeclare,
- auto_delete = AutoDelete,
- nowait = NoWait,
- arguments = Args},
+handle_method(Declare = #'queue.declare'{queue = QueueNameBin,
+ passive = false,
+ durable = Durable,
+ exclusive = ExclusiveDeclare,
+ auto_delete = AutoDelete,
+ nowait = NoWait,
+ arguments = Args},
_, State = #ch{virtual_host = VHostPath,
reader_pid = ReaderPid,
queue_collector_pid = CollectorPid}) ->
@@ -724,37 +719,43 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
true -> ReaderPid;
false -> none
end,
- %% We use this in both branches, because queue_declare may yet return an
- %% existing queue.
ActualNameBin = case QueueNameBin of
<<>> -> rabbit_guid:binstring_guid("amq.gen");
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- Q = case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
- Args, Owner) of
- #amqqueue{name = QueueName,
- durable = Durable1,
- auto_delete = AutoDelete1} = Q1
- when Durable =:= Durable1, AutoDelete =:= AutoDelete1 ->
- check_exclusive_access(Q1, Owner, strict),
- check_configure_permitted(QueueName, State),
- %% We need to notify the reader within the channel
- %% process so that we can be sure there are no
- %% outstanding exclusive queues being declared as the
- %% connection shuts down.
- case Owner of
- none -> ok;
- _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q1)
- end,
- Q1;
- %% non-equivalence trumps exclusivity arbitrarily
- #amqqueue{name = QueueName} ->
- rabbit_misc:protocol_error(
- precondition_failed, "parameters for ~s not equivalent",
- [rabbit_misc:rs(QueueName)])
- end,
- return_queue_declare_ok(State, NoWait, Q);
+ check_configure_permitted(QueueName, State),
+ case rabbit_amqqueue:with(QueueName,
+ fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end) of
+ {{ok, QueueName, MessageCount, ConsumerCount},
+ #amqqueue{durable = Durable1, auto_delete = AutoDelete1} = Q}
+ when Durable =:= Durable1, AutoDelete =:= AutoDelete1 ->
+ check_exclusive_access(Q, Owner, strict),
+ return_queue_declare_ok(QueueName, NoWait, MessageCount,
+ ConsumerCount, State);
+ {{ok, QueueName, _MessageCount, _ConsumerCount}, #amqqueue{}} ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "parameters for ~s not equivalent",
+ [rabbit_misc:rs(QueueName)]);
+ {error, not_found} ->
+ case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
+ Args, Owner) of
+ {new, Q = #amqqueue{}} ->
+ %% We need to notify the reader within the channel
+ %% process so that we can be sure there are no
+ %% outstanding exclusive queues being declared as
+ %% the connection shuts down.
+ ok = case Owner of
+ none -> ok;
+ _ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
+ end,
+ return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
+ {existing, _Q} ->
+ %% must have been created between the stat and the
+ %% declare. Loop around again.
+ handle_method(Declare, undefined, State)
+ end
+ end;
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
@@ -763,8 +764,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
reader_pid = ReaderPid}) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun (Q) -> Q end),
- return_queue_declare_ok(State, NoWait, Q);
+ {{ok, QueueName, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
+ rabbit_amqqueue:with_or_die(
+ QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
+ check_exclusive_access(Q, ReaderPid, lax),
+ return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
+ State);
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index cf78249793..34eec12183 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -792,10 +792,11 @@ test_server_status() ->
Writer = spawn(fun () -> receive shutdown -> ok end end),
Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
self()),
- [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare(
+ [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
+ {new, Queue = #amqqueue{}} <-
+ [rabbit_amqqueue:declare(
rabbit_misc:r(<<"/">>, queue, Name),
- false, false, [], none) ||
- Name <- [<<"foo">>, <<"bar">>]],
+ false, false, [], none)]],
ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined,
<<"ctag">>, true, undefined),