summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-06-07 18:27:17 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-06-07 18:27:17 +0100
commitff481de59d0562cccaf1514a553f8e333941fa56 (patch)
tree5afd8817c91791dda7ea0153664e6fa335609bc0
parentd371f89d7602ddf38b3beb532fbd7afa21fa5729 (diff)
parentacdf16cf0ca9547832b2a1ba9f535b4fcb8391c6 (diff)
downloadrabbitmq-server-git-ff481de59d0562cccaf1514a553f8e333941fa56.tar.gz
Merging bug 22821 onto default
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_channel.erl69
-rw-r--r--src/rabbit_exchange.erl23
3 files changed, 47 insertions, 48 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1756640a90..764882556f 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -195,7 +195,8 @@ start_queue_process(Q) ->
add_default_binding(#amqqueue{name = QueueName}) ->
Exchange = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
- rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []),
+ rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [],
+ fun (_X, _Q) -> ok end),
ok.
lookup(Name) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 8bc53b4a80..9127c44b62 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -329,21 +329,18 @@ check_write_permitted(Resource, #ch{ username = Username}) ->
check_read_permitted(Resource, #ch{ username = Username}) ->
check_resource_access(Username, Resource, read).
+check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) ->
+ ok;
+check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) ->
+ ok;
+check_exclusive_access(#amqqueue{name = QName}, _ReaderPid, _MatchType) ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain exclusive access to locked ~s", [rabbit_misc:rs(QName)]).
+
with_exclusive_access_or_die(QName, ReaderPid, F) ->
- case rabbit_amqqueue:with_or_die(
- QName, fun (Q = #amqqueue{exclusive_owner = Owner})
- when Owner =:= none orelse Owner =:= ReaderPid ->
- F(Q);
- (_) ->
- {error, wrong_exclusive_owner}
- end) of
- {error, wrong_exclusive_owner} ->
- rabbit_misc:protocol_error(
- resource_locked, "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(QName)]);
- Other ->
- Other
- end.
+ rabbit_amqqueue:with_or_die(
+ QName, fun (Q) -> check_exclusive_access(Q, ReaderPid, lax), F(Q) end).
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
@@ -489,11 +486,12 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
_, State = #ch{ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
next_tag = DeliveryTag }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- case rabbit_amqqueue:with_or_die(
- QueueName,
+ case with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
Msg = {_QName, _QPid, _MsgId, Redelivered,
@@ -735,25 +733,15 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
end,
%% We use this in both branches, because queue_declare may yet return an
%% existing queue.
- Finish =
- fun (#amqqueue{name = QueueName, exclusive_owner = Owner1} = Q)
- when Owner =:= Owner1 ->
- check_configure_permitted(QueueName, State),
- %% We need to notify the reader within the channel
- %% process so that we can be sure there are no
- %% outstanding exclusive queues being declared as the
- %% connection shuts down.
- case Owner of
- none -> ok;
- _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
- end,
- Q;
- (#amqqueue{name = QueueName}) ->
- rabbit_misc:protocol_error(
- resource_locked,
- "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(QueueName)])
- end,
+ Finish = fun (#amqqueue{name = QueueName} = Q) ->
+ check_exclusive_access(Q, Owner, strict),
+ check_configure_permitted(QueueName, State),
+ case Owner of
+ none -> ok;
+ _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
+ end,
+ Q
+ end,
Q = case rabbit_amqqueue:with(
rabbit_misc:r(VHostPath, queue, QueueNameBin),
Finish) of
@@ -807,7 +795,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
routing_key = RoutingKey,
nowait = NoWait,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
NoWait, State);
@@ -815,7 +803,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
false, State);
@@ -918,7 +906,9 @@ issue_flow(Active, State) ->
pending = {Ref, TRef}}}.
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
- ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) ->
+ ReturnMethod, NoWait,
+ State = #ch{virtual_host = VHostPath,
+ reader_pid = ReaderPid}) ->
%% FIXME: connection exception (!) on failure??
%% (see rule named "failure" in spec-XML)
%% FIXME: don't allow binding to internal exchanges -
@@ -929,7 +919,8 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
- case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
+ case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments,
+ fun (_X, Q) -> check_exclusive_access(Q, ReaderPid, lax) end) of
{error, exchange_not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, queue_not_found} ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 835b1468ae..d237134f5d 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -36,7 +36,7 @@
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
publish/2]).
--export([add_binding/4, delete_binding/4, list_bindings/1]).
+-export([add_binding/5, delete_binding/5, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
-export([check_type/1, assert_type/2]).
@@ -58,6 +58,8 @@
'queue_not_found' |
'exchange_not_found' |
'exchange_and_queue_not_found'}).
+-type(inner_fun() :: fun((exchange(), queue()) -> any())).
+
-spec(recover/0 :: () -> 'ok').
-spec(declare/5 :: (exchange_name(), exchange_type(), boolean(), boolean(),
amqp_table()) -> exchange()).
@@ -72,11 +74,11 @@
-spec(info_all/1 :: (vhost()) -> [[info()]]).
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
--spec(add_binding/4 ::
- (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
+-spec(add_binding/5 ::
+ (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
bind_res() | {'error', 'durability_settings_incompatible'}).
--spec(delete_binding/4 ::
- (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
+-spec(delete_binding/5 ::
+ (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
bind_res() | {'error', 'binding_not_found'}).
-spec(list_bindings/1 :: (vhost()) ->
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
@@ -367,10 +369,14 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
end
end).
-add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
case binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
fun (X, Q, B) ->
+ %% this argument is used to check queue exclusivity;
+ %% in general, we want to fail on that in preference to
+ %% failing on e.g., the durability being different.
+ InnerFun(X, Q),
if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
true ->
@@ -392,14 +398,15 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
Err
end.
-delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
case binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
fun (X, Q, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
[] -> {error, binding_not_found};
- _ -> ok = sync_binding(B, Q#amqqueue.durable,
+ _ -> InnerFun(X, Q),
+ ok = sync_binding(B, Q#amqqueue.durable,
fun mnesia:delete_object/3),
{maybe_auto_delete(X), B}
end