summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Bridgen <mikeb@lshift.net>2009-11-03 17:13:10 +0000
committerMichael Bridgen <mikeb@lshift.net>2009-11-03 17:13:10 +0000
commitd239db13197fc07b56843c5d0594556ecaa03c82 (patch)
tree0b9787b3dfec5f71cc40af0060a9e2cb5c7d01e6
parent1ce6443849ac8086ac7afb10ee53ff3f91a07104 (diff)
downloadrabbitmq-server-git-d239db13197fc07b56843c5d0594556ecaa03c82.tar.gz
bug 21385: check exclusivity for queue.bind as well. Because I need
to do the check inside the transaction, I had to change rabbit_exchange:add_binding/4 (and delete_binding/4) to take another parameter for the check. I also made the exclusivity check more obviously run for its side-effect.
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_channel.erl25
-rw-r--r--src/rabbit_exchange.erl24
3 files changed, 29 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 52c5475424..b958f306ee 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -191,7 +191,7 @@ start_queue_process(Q) ->
add_default_binding(#amqqueue{name = QueueName}) ->
Exchange = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
- rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []),
+ rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [], fun (_X, _Q) -> ok end),
ok.
lookup(Name) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7403095e82..baa3975221 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -241,7 +241,7 @@ check_write_permitted(Resource, #ch{ username = Username}) ->
check_read_permitted(Resource, #ch{ username = Username}) ->
check_resource_access(Username, Resource, read).
-exclusive_access_or_locked(ReaderPid, Q) ->
+check_queue_exclusivity(ReaderPid, Q) ->
case Q of
#amqqueue{ exclusive_owner = none} -> Q;
#amqqueue{ exclusive_owner = ReaderPid } -> Q;
@@ -422,9 +422,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_or_die(
QueueName,
fun (Q) ->
+ check_queue_exclusivity(ReaderPid, Q),
rabbit_amqqueue:basic_consume(
- exclusive_access_or_locked(ReaderPid, Q),
- NoAck, ReaderPid, self(), LimiterPid,
+ Q, NoAck, ReaderPid, self(), LimiterPid,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@@ -674,7 +674,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
reader_pid = ReaderPid }) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- CheckExclusive = fun(Q) -> exclusive_access_or_locked(ReaderPid, Q) end,
+ CheckExclusive = fun(Q) -> check_queue_exclusivity(ReaderPid, Q) end,
Q = rabbit_amqqueue:with_or_die(QueueName, CheckExclusive),
return_queue_declare_ok(State, NoWait, Q);
@@ -689,8 +689,8 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
case rabbit_amqqueue:with_or_die(
QueueName,
fun (Q) ->
- rabbit_amqqueue:delete(exclusive_access_or_locked(ReaderPid, Q),
- IfUnused, IfEmpty)
+ check_queue_exclusivity(ReaderPid, Q),
+ rabbit_amqqueue:delete(Q, IfUnused, IfEmpty)
end) of
{error, in_use} ->
rabbit_misc:protocol_error(
@@ -709,7 +709,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
routing_key = RoutingKey,
nowait = NoWait,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
NoWait, State);
@@ -717,7 +717,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
false, State);
@@ -729,7 +729,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
{ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die(
QueueName,
fun (Q) ->
- exclusive_access_or_locked(ReaderPid, Q),
+ check_queue_exclusivity(ReaderPid, Q),
rabbit_amqqueue:purge(Q)
end),
return_ok(State, NoWait,
@@ -772,7 +772,9 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
- ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) ->
+ ReturnMethod, NoWait,
+ State = #ch{ virtual_host = VHostPath,
+ reader_pid = ReaderPid }) ->
%% FIXME: connection exception (!) on failure??
%% (see rule named "failure" in spec-XML)
%% FIXME: don't allow binding to internal exchanges -
@@ -783,7 +785,8 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
- case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
+ CheckExclusive = fun(_X, Q) -> check_queue_exclusivity(ReaderPid, Q) end,
+ case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, CheckExclusive) of
{error, exchange_not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, queue_not_found} ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 37a1357d41..4b7a9ac5b0 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -37,7 +37,7 @@
-export([recover/0, declare/4, lookup/1, lookup_or_die/1,
list/1, info/1, info/2, info_all/1, info_all/2,
publish/2]).
--export([add_binding/4, delete_binding/4, list_bindings/1]).
+-export([add_binding/5, delete_binding/5, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]).
@@ -60,6 +60,8 @@
'queue_not_found' |
'exchange_not_found' |
'exchange_and_queue_not_found'}).
+-type(inner_fun() :: fun((exchange(), queue()) -> any())).
+
-spec(recover/0 :: () -> 'ok').
-spec(declare/4 :: (exchange_name(), exchange_type(), boolean(), amqp_table()) -> exchange()).
-spec(check_type/1 :: (binary()) -> atom()).
@@ -72,11 +74,11 @@
-spec(info_all/1 :: (vhost()) -> [[info()]]).
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
--spec(add_binding/4 ::
- (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
+-spec(add_binding/5 ::
+ (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
bind_res() | {'error', 'durability_settings_incompatible'}).
--spec(delete_binding/4 ::
- (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
+-spec(delete_binding/5 ::
+ (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
bind_res() | {'error', 'binding_not_found'}).
-spec(list_bindings/1 :: (vhost()) ->
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
@@ -364,21 +366,23 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
end
end).
-add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
- fun (_X, Q, B) ->
+ fun (X, Q, B) ->
+ InnerFun(X, Q),
ok = sync_binding(B, Q#amqqueue.durable, fun mnesia:write/3)
end).
-delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
- fun (_X, Q, B) ->
+ fun (X, Q, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
[] -> {error, binding_not_found};
- _ -> ok = sync_binding(B, Q#amqqueue.durable,
+ _ -> InnerFun(X, Q),
+ ok = sync_binding(B, Q#amqqueue.durable,
fun mnesia:delete_object/3)
end
end).