diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2010-06-30 13:13:44 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2010-06-30 13:13:44 +0100 |
| commit | 2b1ae05e3a4df0c48f52233bac905fd45efbfb5d (patch) | |
| tree | ddd9654f945b9e6d8b43e3cc05d91bc9066918e6 | |
| parent | ce658f9bbab8ef4df386b4845a5705a8f0728ff9 (diff) | |
| parent | 7168411651eae2181ab112762b74da08155c7172 (diff) | |
| download | rabbitmq-server-git-2b1ae05e3a4df0c48f52233bac905fd45efbfb5d.tar.gz | |
Merge bug21830 into default
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 99 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 7 |
4 files changed, 59 insertions, 53 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index eebcfcb9a9..f0e536b5ee 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -66,7 +66,7 @@ -spec(start/0 :: () -> 'ok'). -spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), - maybe(pid())) -> amqqueue()). + maybe(pid())) -> {'new' | 'existing', amqqueue()}). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). -spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5fdf0ffa90..70e6e75584 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -137,7 +137,7 @@ declare(Recover, From, backing_queue = BQ, backing_queue_state = undefined}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of not_found -> {stop, normal, not_found, State}; - Q -> gen_server2:reply(From, Q), + Q -> gen_server2:reply(From, {new, Q}), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [self()]), @@ -146,7 +146,7 @@ declare(Recover, From, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), noreply(State#q{backing_queue_state = BQS}); - Q1 -> {stop, normal, Q1, State} + Q1 -> {stop, normal, {existing, Q1}, State} end. terminate_shutdown(Fun, State) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8649ecc7f1..179a9a9de6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -284,20 +284,15 @@ terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> Reader ! {channel_exit, Channel, Reason}, State#ch{state = terminating}. -return_queue_declare_ok(State, NoWait, Q) -> - NewState = State#ch{most_recently_declared_queue = - (Q#amqqueue.name)#resource.name}, +return_queue_declare_ok(#resource{name = ActualName}, + NoWait, MessageCount, ConsumerCount, State) -> + NewState = State#ch{most_recently_declared_queue = ActualName}, case NoWait of true -> {noreply, NewState}; - false -> - {ok, ActualName, MessageCount, ConsumerCount} = - rabbit_misc:with_exit_handler( - fun () -> {ok, Q#amqqueue.name, 0, 0} end, - fun () -> rabbit_amqqueue:stat(Q) end), - Reply = #'queue.declare_ok'{queue = ActualName#resource.name, - message_count = MessageCount, - consumer_count = ConsumerCount}, - {reply, Reply, NewState} + false -> Reply = #'queue.declare_ok'{queue = ActualName, + message_count = MessageCount, + consumer_count = ConsumerCount}, + {reply, Reply, NewState} end. check_resource_access(Username, Resource, Perm) -> @@ -710,13 +705,13 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, return_ok(State, NoWait, #'exchange.delete_ok'{}) end; -handle_method(#'queue.declare'{queue = QueueNameBin, - passive = false, - durable = Durable, - exclusive = ExclusiveDeclare, - auto_delete = AutoDelete, - nowait = NoWait, - arguments = Args}, +handle_method(Declare = #'queue.declare'{queue = QueueNameBin, + passive = false, + durable = Durable, + exclusive = ExclusiveDeclare, + auto_delete = AutoDelete, + nowait = NoWait, + arguments = Args}, _, State = #ch{virtual_host = VHostPath, reader_pid = ReaderPid, queue_collector_pid = CollectorPid}) -> @@ -724,37 +719,43 @@ handle_method(#'queue.declare'{queue = QueueNameBin, true -> ReaderPid; false -> none end, - %% We use this in both branches, because queue_declare may yet return an - %% existing queue. ActualNameBin = case QueueNameBin of <<>> -> rabbit_guid:binstring_guid("amq.gen"); Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - Q = case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, - Args, Owner) of - #amqqueue{name = QueueName, - durable = Durable1, - auto_delete = AutoDelete1} = Q1 - when Durable =:= Durable1, AutoDelete =:= AutoDelete1 -> - check_exclusive_access(Q1, Owner, strict), - check_configure_permitted(QueueName, State), - %% We need to notify the reader within the channel - %% process so that we can be sure there are no - %% outstanding exclusive queues being declared as the - %% connection shuts down. - case Owner of - none -> ok; - _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q1) - end, - Q1; - %% non-equivalence trumps exclusivity arbitrarily - #amqqueue{name = QueueName} -> - rabbit_misc:protocol_error( - precondition_failed, "parameters for ~s not equivalent", - [rabbit_misc:rs(QueueName)]) - end, - return_queue_declare_ok(State, NoWait, Q); + check_configure_permitted(QueueName, State), + case rabbit_amqqueue:with(QueueName, + fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end) of + {{ok, QueueName, MessageCount, ConsumerCount}, + #amqqueue{durable = Durable1, auto_delete = AutoDelete1} = Q} + when Durable =:= Durable1, AutoDelete =:= AutoDelete1 -> + check_exclusive_access(Q, Owner, strict), + return_queue_declare_ok(QueueName, NoWait, MessageCount, + ConsumerCount, State); + {{ok, QueueName, _MessageCount, _ConsumerCount}, #amqqueue{}} -> + rabbit_misc:protocol_error( + precondition_failed, "parameters for ~s not equivalent", + [rabbit_misc:rs(QueueName)]); + {error, not_found} -> + case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, + Args, Owner) of + {new, Q = #amqqueue{}} -> + %% We need to notify the reader within the channel + %% process so that we can be sure there are no + %% outstanding exclusive queues being declared as + %% the connection shuts down. + ok = case Owner of + none -> ok; + _ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) + end, + return_queue_declare_ok(QueueName, NoWait, 0, 0, State); + {existing, _Q} -> + %% must have been created between the stat and the + %% declare. Loop around again. + handle_method(Declare, undefined, State) + end + end; handle_method(#'queue.declare'{queue = QueueNameBin, passive = true, @@ -763,8 +764,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin, reader_pid = ReaderPid}) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), - Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun (Q) -> Q end), - return_queue_declare_ok(State, NoWait, Q); + {{ok, QueueName, MessageCount, ConsumerCount}, #amqqueue{} = Q} = + rabbit_amqqueue:with_or_die( + QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end), + check_exclusive_access(Q, ReaderPid, lax), + return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, + State); handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index cf78249793..34eec12183 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -792,10 +792,11 @@ test_server_status() -> Writer = spawn(fun () -> receive shutdown -> ok end end), Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, self()), - [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare( + [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], + {new, Queue = #amqqueue{}} <- + [rabbit_amqqueue:declare( rabbit_misc:r(<<"/">>, queue, Name), - false, false, [], none) || - Name <- [<<"foo">>, <<"bar">>]], + false, false, [], none)]], ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined, <<"ctag">>, true, undefined), |
