summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_amqqueue.erl14
-rw-r--r--src/rabbit_exchange.erl48
3 files changed, 36 insertions, 28 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index d0fc87e094..c40c4f2722 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -50,6 +50,8 @@
%% The spec field is made up of an {Queue, Binding, Exchange}
-record(reverse_binding, {spec, value = const}).
+-record(binding, {exchange, key, queue}).
+
-record(listener, {node, protocol, host, port}).
-record(basic_message, {exchange_name, routing_key, content, persistent_key}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d712f61077..897d063d21 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -138,7 +138,7 @@ declare(Resource = #resource{}, Durable, AutoDelete, Args) ->
pid = none}),
case rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({amqqueue, Resource}) of
+ case mnesia:wread({amqqueue, Resource}) of
[] -> ok = recover_queue(Q),
Q;
[ExistingQ] -> ExistingQ
@@ -171,7 +171,7 @@ default_binding_spec(#resource{virtual_host = VHostPath, name = Name}) ->
% #binding_spec{exchange_name = rabbit_misc:r(VHostPath,exchange,<<"">>),
% routing_key = Name,
% arguments = []}.
-
+
recover_bindings(Q = #amqqueue{name = QueueName}) ->
exit(recover_bindings).
% ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q),
@@ -179,7 +179,7 @@ recover_bindings(Q = #amqqueue{name = QueueName}) ->
% ok = rabbit_exchange:add_binding(B, Q)
% end, Specs),
% ok.
-
+
modify_bindings(Queue = #resource{}, X = #resource{}, RoutingKey, Arguments,
SpecPresentFun, SpecAbsentFun) ->
exit(modify_bindings).
@@ -203,7 +203,7 @@ modify_bindings(Queue = #resource{}, X = #resource{}, RoutingKey, Arguments,
% [] -> {error, queue_not_found}
% end
% end).
-
+
update_bindings(Q = #amqqueue{}, Spec,
UpdateSpecFun, UpdateExchangeFun) ->
exit(update_bindings).
@@ -252,7 +252,7 @@ with(Name, F, E) ->
end.
with(Name, F) ->
- with(Name, F, fun () -> {error, not_found} end).
+ with(Name, F, fun () -> {error, not_found} end).
with_or_die(Name, F) ->
with(Name, F, fun () -> rabbit_misc:protocol_error(
not_found, "no ~s", [rabbit_misc:rs(Name)])
@@ -356,8 +356,8 @@ delete_temp(Q = #amqqueue{name = QueueName}) ->
delete_queue(Q = #amqqueue{}) ->
ok = delete_temp(Q).
-
-on_node_down(Node) ->
+
+on_node_down(Node) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
qlc:fold(
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index b2375f0a68..b7113a6f11 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -32,7 +32,7 @@
list_vhost_exchanges/1, list_exchange_bindings/1,
simple_publish/6, simple_publish/3,
route/2]).
--export([add_binding/2, delete_binding/2]).
+-export([add_binding/1, delete_binding/2]).
-export([delete/2]).
-export([check_type/1, assert_type/2, topic_matches/2]).
@@ -218,29 +218,34 @@ delivery_key_for_type(fanout, Name, _RoutingKey) ->
delivery_key_for_type(_Type, Name, RoutingKey) ->
{Name, RoutingKey}.
-call_with_exchange(Name, Fun) ->
- case mnesia:wread({exchange, Name}) of
- [] -> {error, not_found};
- [X] -> Fun(X)
+% Don't really like this double lookup
+% It seems very clunky
+% Can this get refactored to to avoid the duplication of the lookup/1 function?
+call_with_exchange_and_queue(Exchange, Queue, Fun) ->
+ case mnesia:wread({exchange, Exchange}) of
+ [] -> {error, exchange_not_found};
+ [X] ->
+ case mnesia:wread({amqqueue, Queue}) of
+ [] -> {error, queue_not_found};
+ [Q] ->
+ Fun(X,Q)
+ end
end.
+
make_handler(BindingSpec, #amqqueue{name = QueueName, pid = QPid}) ->
exit(make_handler).
%#handler{binding_spec = BindingSpec, queue = QueueName, qpid = QPid}.
-add_binding(BindingSpec %= #binding_spec{exchange_name = ExchangeName,
- % routing_key = RoutingKey},
- ,Q) ->
- exit(add_binding).
- % call_with_exchange(
- % ExchangeName,
- % fun (X) -> if Q#amqqueue.durable and not(X#exchange.durable) ->
- % {error, durability_settings_incompatible};
- % true ->
- % internal_add_binding(
- % X, RoutingKey, make_handler(BindingSpec, Q))
- % end
- % end).
+add_binding(#binding{exchange = Exchange, key = Key, queue = Queue}) ->
+ call_with_exchange_and_queue(
+ Exchange, Queue,
+ fun (X,Q) -> if Q#amqqueue.durable and not(X#exchange.durable) ->
+ {error, durability_settings_incompatible};
+ true ->
+ internal_add_binding(X, Key, Q)
+ end
+ end).
delete_binding(BindingSpec %= #binding_spec{exchange_name = ExchangeName,
% routing_key = RoutingKey},
@@ -275,9 +280,10 @@ handler_qpids(Handlers) ->
%% Must run within a transaction.
internal_add_binding(#exchange{name = ExchangeName, type = Type},
- RoutingKey, Handler) ->
- BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey),
- ok = add_handler_to_binding(BindingKey, Handler).
+ RoutingKey, Queue) ->
+ ok.
+ %BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey),
+ %ok = add_handler_to_binding(BindingKey, Handler).
%% Must run within a transaction.
internal_delete_binding(#exchange{name = ExchangeName, type = Type}, RoutingKey, Handler) ->