diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-07 18:27:17 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-07 18:27:17 +0100 |
| commit | ff481de59d0562cccaf1514a553f8e333941fa56 (patch) | |
| tree | 5afd8817c91791dda7ea0153664e6fa335609bc0 | |
| parent | d371f89d7602ddf38b3beb532fbd7afa21fa5729 (diff) | |
| parent | acdf16cf0ca9547832b2a1ba9f535b4fcb8391c6 (diff) | |
| download | rabbitmq-server-git-ff481de59d0562cccaf1514a553f8e333941fa56.tar.gz | |
Merging bug 22821 onto default
| -rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 69 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 23 |
3 files changed, 47 insertions, 48 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1756640a90..764882556f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -195,7 +195,8 @@ start_queue_process(Q) -> add_default_binding(#amqqueue{name = QueueName}) -> Exchange = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, - rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []), + rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [], + fun (_X, _Q) -> ok end), ok. lookup(Name) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8bc53b4a80..9127c44b62 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -329,21 +329,18 @@ check_write_permitted(Resource, #ch{ username = Username}) -> check_read_permitted(Resource, #ch{ username = Username}) -> check_resource_access(Username, Resource, read). +check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) -> + ok; +check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) -> + ok; +check_exclusive_access(#amqqueue{name = QName}, _ReaderPid, _MatchType) -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", [rabbit_misc:rs(QName)]). + with_exclusive_access_or_die(QName, ReaderPid, F) -> - case rabbit_amqqueue:with_or_die( - QName, fun (Q = #amqqueue{exclusive_owner = Owner}) - when Owner =:= none orelse Owner =:= ReaderPid -> - F(Q); - (_) -> - {error, wrong_exclusive_owner} - end) of - {error, wrong_exclusive_owner} -> - rabbit_misc:protocol_error( - resource_locked, "cannot obtain exclusive access to locked ~s", - [rabbit_misc:rs(QName)]); - Other -> - Other - end. + rabbit_amqqueue:with_or_die( + QName, fun (Q) -> check_exclusive_access(Q, ReaderPid, lax), F(Q) end). expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( @@ -489,11 +486,12 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, _, State = #ch{ writer_pid = WriterPid, + reader_pid = ReaderPid, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - case rabbit_amqqueue:with_or_die( - QueueName, + case with_exclusive_access_or_die( + QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, Msg = {_QName, _QPid, _MsgId, Redelivered, @@ -735,25 +733,15 @@ handle_method(#'queue.declare'{queue = QueueNameBin, end, %% We use this in both branches, because queue_declare may yet return an %% existing queue. - Finish = - fun (#amqqueue{name = QueueName, exclusive_owner = Owner1} = Q) - when Owner =:= Owner1 -> - check_configure_permitted(QueueName, State), - %% We need to notify the reader within the channel - %% process so that we can be sure there are no - %% outstanding exclusive queues being declared as the - %% connection shuts down. - case Owner of - none -> ok; - _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) - end, - Q; - (#amqqueue{name = QueueName}) -> - rabbit_misc:protocol_error( - resource_locked, - "cannot obtain exclusive access to locked ~s", - [rabbit_misc:rs(QueueName)]) - end, + Finish = fun (#amqqueue{name = QueueName} = Q) -> + check_exclusive_access(Q, Owner, strict), + check_configure_permitted(QueueName, State), + case Owner of + none -> ok; + _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) + end, + Q + end, Q = case rabbit_amqqueue:with( rabbit_misc:r(VHostPath, queue, QueueNameBin), Finish) of @@ -807,7 +795,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin, routing_key = RoutingKey, nowait = NoWait, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin, + binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, NoWait, State); @@ -815,7 +803,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin, + binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, false, State); @@ -918,7 +906,9 @@ issue_flow(Active, State) -> pending = {Ref, TRef}}}. binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, - ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) -> + ReturnMethod, NoWait, + State = #ch{virtual_host = VHostPath, + reader_pid = ReaderPid}) -> %% FIXME: connection exception (!) on failure?? %% (see rule named "failure" in spec-XML) %% FIXME: don't allow binding to internal exchanges - @@ -929,7 +919,8 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), - case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of + case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, + fun (_X, Q) -> check_exclusive_access(Q, ReaderPid, lax) end) of {error, exchange_not_found} -> rabbit_misc:not_found(ExchangeName); {error, queue_not_found} -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 835b1468ae..d237134f5d 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -36,7 +36,7 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2]). --export([add_binding/4, delete_binding/4, list_bindings/1]). +-export([add_binding/5, delete_binding/5, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). -export([check_type/1, assert_type/2]). @@ -58,6 +58,8 @@ 'queue_not_found' | 'exchange_not_found' | 'exchange_and_queue_not_found'}). +-type(inner_fun() :: fun((exchange(), queue()) -> any())). + -spec(recover/0 :: () -> 'ok'). -spec(declare/5 :: (exchange_name(), exchange_type(), boolean(), boolean(), amqp_table()) -> exchange()). @@ -72,11 +74,11 @@ -spec(info_all/1 :: (vhost()) -> [[info()]]). -spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). -spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). --spec(add_binding/4 :: - (exchange_name(), queue_name(), routing_key(), amqp_table()) -> +-spec(add_binding/5 :: + (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) -> bind_res() | {'error', 'durability_settings_incompatible'}). --spec(delete_binding/4 :: - (exchange_name(), queue_name(), routing_key(), amqp_table()) -> +-spec(delete_binding/5 :: + (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) -> bind_res() | {'error', 'binding_not_found'}). -spec(list_bindings/1 :: (vhost()) -> [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). @@ -367,10 +369,14 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) -> end end). -add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> +add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> case binding_action( ExchangeName, QueueName, RoutingKey, Arguments, fun (X, Q, B) -> + %% this argument is used to check queue exclusivity; + %% in general, we want to fail on that in preference to + %% failing on e.g., the durability being different. + InnerFun(X, Q), if Q#amqqueue.durable and not(X#exchange.durable) -> {error, durability_settings_incompatible}; true -> @@ -392,14 +398,15 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> Err end. -delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> +delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> case binding_action( ExchangeName, QueueName, RoutingKey, Arguments, fun (X, Q, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of [] -> {error, binding_not_found}; - _ -> ok = sync_binding(B, Q#amqqueue.durable, + _ -> InnerFun(X, Q), + ok = sync_binding(B, Q#amqqueue.durable, fun mnesia:delete_object/3), {maybe_auto_delete(X), B} end |
