diff options
| -rw-r--r-- | include/rabbit.hrl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 48 |
3 files changed, 36 insertions, 28 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index d0fc87e094..c40c4f2722 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -50,6 +50,8 @@ %% The spec field is made up of an {Queue, Binding, Exchange} -record(reverse_binding, {spec, value = const}). +-record(binding, {exchange, key, queue}). + -record(listener, {node, protocol, host, port}). -record(basic_message, {exchange_name, routing_key, content, persistent_key}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d712f61077..897d063d21 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -138,7 +138,7 @@ declare(Resource = #resource{}, Durable, AutoDelete, Args) -> pid = none}), case rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({amqqueue, Resource}) of + case mnesia:wread({amqqueue, Resource}) of [] -> ok = recover_queue(Q), Q; [ExistingQ] -> ExistingQ @@ -171,7 +171,7 @@ default_binding_spec(#resource{virtual_host = VHostPath, name = Name}) -> % #binding_spec{exchange_name = rabbit_misc:r(VHostPath,exchange,<<"">>), % routing_key = Name, % arguments = []}. - + recover_bindings(Q = #amqqueue{name = QueueName}) -> exit(recover_bindings). % ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q), @@ -179,7 +179,7 @@ recover_bindings(Q = #amqqueue{name = QueueName}) -> % ok = rabbit_exchange:add_binding(B, Q) % end, Specs), % ok. - + modify_bindings(Queue = #resource{}, X = #resource{}, RoutingKey, Arguments, SpecPresentFun, SpecAbsentFun) -> exit(modify_bindings). @@ -203,7 +203,7 @@ modify_bindings(Queue = #resource{}, X = #resource{}, RoutingKey, Arguments, % [] -> {error, queue_not_found} % end % end). - + update_bindings(Q = #amqqueue{}, Spec, UpdateSpecFun, UpdateExchangeFun) -> exit(update_bindings). @@ -252,7 +252,7 @@ with(Name, F, E) -> end. with(Name, F) -> - with(Name, F, fun () -> {error, not_found} end). + with(Name, F, fun () -> {error, not_found} end). with_or_die(Name, F) -> with(Name, F, fun () -> rabbit_misc:protocol_error( not_found, "no ~s", [rabbit_misc:rs(Name)]) @@ -356,8 +356,8 @@ delete_temp(Q = #amqqueue{name = QueueName}) -> delete_queue(Q = #amqqueue{}) -> ok = delete_temp(Q). - -on_node_down(Node) -> + +on_node_down(Node) -> rabbit_misc:execute_mnesia_transaction( fun () -> qlc:fold( diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index b2375f0a68..b7113a6f11 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -32,7 +32,7 @@ list_vhost_exchanges/1, list_exchange_bindings/1, simple_publish/6, simple_publish/3, route/2]). --export([add_binding/2, delete_binding/2]). +-export([add_binding/1, delete_binding/2]). -export([delete/2]). -export([check_type/1, assert_type/2, topic_matches/2]). @@ -218,29 +218,34 @@ delivery_key_for_type(fanout, Name, _RoutingKey) -> delivery_key_for_type(_Type, Name, RoutingKey) -> {Name, RoutingKey}. -call_with_exchange(Name, Fun) -> - case mnesia:wread({exchange, Name}) of - [] -> {error, not_found}; - [X] -> Fun(X) +% Don't really like this double lookup +% It seems very clunky +% Can this get refactored to to avoid the duplication of the lookup/1 function? +call_with_exchange_and_queue(Exchange, Queue, Fun) -> + case mnesia:wread({exchange, Exchange}) of + [] -> {error, exchange_not_found}; + [X] -> + case mnesia:wread({amqqueue, Queue}) of + [] -> {error, queue_not_found}; + [Q] -> + Fun(X,Q) + end end. + make_handler(BindingSpec, #amqqueue{name = QueueName, pid = QPid}) -> exit(make_handler). %#handler{binding_spec = BindingSpec, queue = QueueName, qpid = QPid}. -add_binding(BindingSpec %= #binding_spec{exchange_name = ExchangeName, - % routing_key = RoutingKey}, - ,Q) -> - exit(add_binding). - % call_with_exchange( - % ExchangeName, - % fun (X) -> if Q#amqqueue.durable and not(X#exchange.durable) -> - % {error, durability_settings_incompatible}; - % true -> - % internal_add_binding( - % X, RoutingKey, make_handler(BindingSpec, Q)) - % end - % end). +add_binding(#binding{exchange = Exchange, key = Key, queue = Queue}) -> + call_with_exchange_and_queue( + Exchange, Queue, + fun (X,Q) -> if Q#amqqueue.durable and not(X#exchange.durable) -> + {error, durability_settings_incompatible}; + true -> + internal_add_binding(X, Key, Q) + end + end). delete_binding(BindingSpec %= #binding_spec{exchange_name = ExchangeName, % routing_key = RoutingKey}, @@ -275,9 +280,10 @@ handler_qpids(Handlers) -> %% Must run within a transaction. internal_add_binding(#exchange{name = ExchangeName, type = Type}, - RoutingKey, Handler) -> - BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey), - ok = add_handler_to_binding(BindingKey, Handler). + RoutingKey, Queue) -> + ok. + %BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey), + %ok = add_handler_to_binding(BindingKey, Handler). %% Must run within a transaction. internal_delete_binding(#exchange{name = ExchangeName, type = Type}, RoutingKey, Handler) -> |
