summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBen Hood <0x6e6562@gmail.com>2008-07-31 00:44:02 +0100
committerBen Hood <0x6e6562@gmail.com>2008-07-31 00:44:02 +0100
commit0a1d35c5306efbf4e27b298f1a6299b94a8be207 (patch)
tree2bcc82a35873f13e9be2168667e3585dc304965d /src
parentfbab5e9e4950211cbe30365a303687b2935a3eb6 (diff)
downloadrabbitmq-server-git-0a1d35c5306efbf4e27b298f1a6299b94a8be207.tar.gz
Savepoint for changing the routing structure
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl116
-rw-r--r--src/rabbit_exchange.erl231
-rw-r--r--src/rabbit_mnesia.erl3
3 files changed, 185 insertions, 165 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c3fac99864..d712f61077 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -84,7 +84,7 @@
-spec(commit/2 :: (pid(), txn()) -> 'ok').
-spec(rollback/2 :: (pid(), txn()) -> 'ok').
-spec(notify_down/2 :: (amqqueue(), pid()) -> 'ok').
--spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok').
+%-spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok').
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
@@ -135,7 +135,6 @@ declare(Resource = #resource{}, Durable, AutoDelete, Args) ->
durable = Durable,
auto_delete = AutoDelete,
arguments = Args,
- binding_specs = [],
pid = none}),
case rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -168,48 +167,52 @@ recover_queue(Q) ->
ok.
default_binding_spec(#resource{virtual_host = VHostPath, name = Name}) ->
- #binding_spec{exchange_name = rabbit_misc:r(VHostPath,exchange,<<"">>),
- routing_key = Name,
- arguments = []}.
-
-recover_bindings(Q = #amqqueue{name = QueueName, binding_specs = Specs}) ->
- ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q),
- lists:foreach(fun (B) ->
- ok = rabbit_exchange:add_binding(B, Q)
- end, Specs),
- ok.
-
+ exit(default_binding_spec).
+ % #binding_spec{exchange_name = rabbit_misc:r(VHostPath,exchange,<<"">>),
+ % routing_key = Name,
+ % arguments = []}.
+
+recover_bindings(Q = #amqqueue{name = QueueName}) ->
+ exit(recover_bindings).
+ % ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q),
+ % lists:foreach(fun (B) ->
+ % ok = rabbit_exchange:add_binding(B, Q)
+ % end, Specs),
+ % ok.
+
modify_bindings(Queue = #resource{}, X = #resource{}, RoutingKey, Arguments,
SpecPresentFun, SpecAbsentFun) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({amqqueue, Queue}) of
- [Q = #amqqueue{binding_specs = Specs0}] ->
- Spec = #binding_spec{exchange_name = X,
- routing_key = RoutingKey,
- arguments = Arguments},
- case (case lists:member(Spec, Specs0) of
- true -> SpecPresentFun;
- false -> SpecAbsentFun
- end)(Q, Spec) of
- {ok, #amqqueue{binding_specs = Specs}} ->
- {ok, length(Specs)};
- {error, not_found} ->
- {error, exchange_not_found};
- Other -> Other
- end;
- [] -> {error, queue_not_found}
- end
- end).
-
-update_bindings(Q = #amqqueue{binding_specs = Specs0}, Spec,
+ exit(modify_bindings).
+ % rabbit_misc:execute_mnesia_transaction(
+ % fun () ->
+ % case mnesia:wread({amqqueue, Queue}) of
+ % [Q = #amqqueue{binding_specs = Specs0}] ->
+ % Spec = #binding_spec{exchange_name = X,
+ % routing_key = RoutingKey,
+ % arguments = Arguments},
+ % case (case lists:member(Spec, Specs0) of
+ % true -> SpecPresentFun;
+ % false -> SpecAbsentFun
+ % end)(Q, Spec) of
+ % {ok, #amqqueue{binding_specs = Specs}} ->
+ % {ok, length(Specs)};
+ % {error, not_found} ->
+ % {error, exchange_not_found};
+ % Other -> Other
+ % end;
+ % [] -> {error, queue_not_found}
+ % end
+ % end).
+
+update_bindings(Q = #amqqueue{}, Spec,
UpdateSpecFun, UpdateExchangeFun) ->
- Q1 = Q#amqqueue{binding_specs = UpdateSpecFun(Spec, Specs0)},
- case UpdateExchangeFun(Spec, Q1) of
- ok -> store_queue(Q1),
- {ok, Q1};
- Other -> Other
- end.
+ exit(update_bindings).
+ % Q1 = Q#amqqueue{binding_specs = UpdateSpecFun(Spec, Specs0)},
+ % case UpdateExchangeFun(Spec, Q1) of
+ % ok -> store_queue(Q1),
+ % {ok, Q1};
+ % Other -> Other
+ % end.
add_binding(QueueName, ExchangeName, RoutingKey, Arguments) ->
modify_bindings(
@@ -297,15 +300,16 @@ notify_down(#amqqueue{ pid = QPid }, ChPid) ->
gen_server:call(QPid, {notify_down, ChPid}).
binding_forcibly_removed(BindingSpec, QueueName) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({amqqueue, QueueName}) of
- [] -> ok;
- [Q = #amqqueue{binding_specs = Specs}] ->
- store_queue(Q#amqqueue{binding_specs =
- lists:delete(BindingSpec, Specs)})
- end
- end).
+ exit(binding_forcibly_removed).
+ % rabbit_misc:execute_mnesia_transaction(
+ % fun () ->
+ % case mnesia:wread({amqqueue, QueueName}) of
+ % [] -> ok;
+ % [Q = #amqqueue{binding_specs = Specs}] ->
+ % store_queue(Q#amqqueue{binding_specs =
+ % lists:delete(BindingSpec, Specs)})
+ % end
+ % end).
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
gen_server:call(QPid, {claim_queue, ReaderPid}).
@@ -324,11 +328,12 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
notify_sent(QPid, ChPid) ->
gen_server:cast(QPid, {notify_sent, ChPid}).
-delete_bindings(Q = #amqqueue{binding_specs = Specs}) ->
- lists:foreach(fun (BindingSpec) ->
- ok = rabbit_exchange:delete_binding(
- BindingSpec, Q)
- end, Specs).
+delete_bindings(Q = #amqqueue{}) ->
+ exit(delete_bindings).
+ % lists:foreach(fun (BindingSpec) ->
+ % ok = rabbit_exchange:delete_binding(
+ % BindingSpec, Q)
+ % end, Specs).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
@@ -368,5 +373,4 @@ pseudo_queue(NameBin, Pid) ->
durable = false,
auto_delete = false,
arguments = [],
- binding_specs = [],
pid = Pid}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 17d9fd97ad..b2375f0a68 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -64,11 +64,11 @@
publish_res()).
-spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
-spec(route/2 :: (exchange(), routing_key()) -> [pid()]).
--spec(add_binding/2 :: (binding_spec(), amqqueue()) ->
- 'ok' | not_found() |
- {'error', 'durability_settings_incompatible'}).
--spec(delete_binding/2 :: (binding_spec(), amqqueue()) ->
- 'ok' | not_found()).
+% -spec(add_binding/2 :: (binding_spec(), amqqueue()) ->
+% 'ok' | not_found() |
+% {'error', 'durability_settings_incompatible'}).
+% -spec(delete_binding/2 :: (binding_spec(), amqqueue()) ->
+% 'ok' | not_found()).
-spec(topic_matches/2 :: (binary(), binary()) -> bool()).
-spec(delete/2 :: (exchange_name(), bool()) ->
'ok' | not_found() | {'error', 'in_use'}).
@@ -144,15 +144,17 @@ list_vhost_exchanges(VHostPath) ->
#exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
list_exchange_bindings(Name) ->
- [{QueueName, RoutingKey, Arguments} ||
- #binding{handlers = Handlers} <- bindings_for_exchange(Name),
- #handler{binding_spec = #binding_spec{routing_key = RoutingKey,
- arguments = Arguments},
- queue = QueueName} <- Handlers].
-
+ exit(list_exchange_bindings).
+ % [{QueueName, RoutingKey, Arguments} ||
+ % #binding{handlers = Handlers} <- bindings_for_exchange(Name),
+ % #handler{binding_spec = #binding_spec{routing_key = RoutingKey,
+ % arguments = Arguments},
+ % queue = QueueName} <- Handlers].
+
bindings_for_exchange(Name) ->
- qlc:e(qlc:q([B || B = #binding{key = K} <- mnesia:table(binding),
- element(1, K) == Name])).
+ exit(bindings_for_exchange).
+ % qlc:e(qlc:q([B || B = #binding{key = K} <- mnesia:table(binding),
+ % element(1, K) == Name])).
empty_handlers() ->
[].
@@ -189,25 +191,27 @@ simple_publish(Mandatory, Immediate,
%% as many times as a message should be delivered to it. With the
%% current exchange types that is at most once.
route(#exchange{name = Name, type = topic}, RoutingKey) ->
- sets:to_list(
- sets:union(
- mnesia:activity(
- async_dirty,
- fun () ->
- qlc:e(qlc:q([handler_qpids(H) ||
- #binding{key = {Name1, PatternKey},
- handlers = H}
- <- mnesia:table(binding),
- Name == Name1,
- topic_matches(PatternKey, RoutingKey)]))
- end)));
+ exit(route);
+ % sets:to_list(
+ % sets:union(
+ % mnesia:activity(
+ % async_dirty,
+ % fun () ->
+ % qlc:e(qlc:q([handler_qpids(H) ||
+ % #binding{key = {Name1, PatternKey},
+ % handlers = H}
+ % <- mnesia:table(binding),
+ % Name == Name1,
+ % topic_matches(PatternKey, RoutingKey)]))
+ % end)));
route(#exchange{name = Name, type = Type}, RoutingKey) ->
- BindingKey = delivery_key_for_type(Type, Name, RoutingKey),
- case rabbit_misc:dirty_read({binding, BindingKey}) of
- {ok, #binding{handlers = H}} -> sets:to_list(handler_qpids(H));
- {error, not_found} -> []
- end.
+ exit(route).
+ % BindingKey = delivery_key_for_type(Type, Name, RoutingKey),
+ % case rabbit_misc:dirty_read({binding, BindingKey}) of
+ % {ok, #binding{handlers = H}} -> sets:to_list(handler_qpids(H));
+ % {error, not_found} -> []
+ % end.
delivery_key_for_type(fanout, Name, _RoutingKey) ->
{Name, fanout};
@@ -221,28 +225,33 @@ call_with_exchange(Name, Fun) ->
end.
make_handler(BindingSpec, #amqqueue{name = QueueName, pid = QPid}) ->
- #handler{binding_spec = BindingSpec, queue = QueueName, qpid = QPid}.
-
-add_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName,
- routing_key = RoutingKey}, Q) ->
- call_with_exchange(
- ExchangeName,
- fun (X) -> if Q#amqqueue.durable and not(X#exchange.durable) ->
- {error, durability_settings_incompatible};
- true ->
- internal_add_binding(
- X, RoutingKey, make_handler(BindingSpec, Q))
- end
- end).
-
-delete_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName,
- routing_key = RoutingKey}, Q) ->
- call_with_exchange(
- ExchangeName,
- fun (X) -> ok = internal_delete_binding(
- X, RoutingKey, make_handler(BindingSpec, Q)),
- maybe_auto_delete(X)
- end).
+ exit(make_handler).
+ %#handler{binding_spec = BindingSpec, queue = QueueName, qpid = QPid}.
+
+add_binding(BindingSpec %= #binding_spec{exchange_name = ExchangeName,
+ % routing_key = RoutingKey},
+ ,Q) ->
+ exit(add_binding).
+ % call_with_exchange(
+ % ExchangeName,
+ % fun (X) -> if Q#amqqueue.durable and not(X#exchange.durable) ->
+ % {error, durability_settings_incompatible};
+ % true ->
+ % internal_add_binding(
+ % X, RoutingKey, make_handler(BindingSpec, Q))
+ % end
+ % end).
+
+delete_binding(BindingSpec %= #binding_spec{exchange_name = ExchangeName,
+ % routing_key = RoutingKey},
+ ,Q) ->
+ exit(delete_binding).
+ % call_with_exchange(
+ % ExchangeName,
+ % fun (X) -> ok = internal_delete_binding(
+ % X, RoutingKey, make_handler(BindingSpec, Q)),
+ % maybe_auto_delete(X)
+ % end).
%% Must run within a transaction.
maybe_auto_delete(#exchange{auto_delete = false}) ->
@@ -261,7 +270,8 @@ extend_handlers(Handlers, Handler) -> [Handler | Handlers].
delete_handler(Handlers, Handler) -> lists:delete(Handler, Handlers).
handler_qpids(Handlers) ->
- sets:from_list([QPid || #handler{qpid = QPid} <- Handlers]).
+ exit(handler_qpids).
+ %sets:from_list([QPid || #handler{qpid = QPid} <- Handlers]).
%% Must run within a transaction.
internal_add_binding(#exchange{name = ExchangeName, type = Type},
@@ -277,32 +287,34 @@ internal_delete_binding(#exchange{name = ExchangeName, type = Type}, RoutingKey,
%% Must run within a transaction.
add_handler_to_binding(BindingKey, Handler) ->
- ok = case mnesia:wread({binding, BindingKey}) of
- [] ->
- ok = mnesia:write(
- #binding{key = BindingKey,
- handlers = extend_handlers(
- empty_handlers(), Handler)});
- [B = #binding{handlers = H}] ->
- ok = mnesia:write(
- B#binding{handlers = extend_handlers(H, Handler)})
- end.
+ exit(add_handler_to_binding).
+ % ok = case mnesia:wread({binding, BindingKey}) of
+ % [] ->
+ % ok = mnesia:write(
+ % #binding{key = BindingKey,
+ % handlers = extend_handlers(
+ % empty_handlers(), Handler)});
+ % [B = #binding{handlers = H}] ->
+ % ok = mnesia:write(
+ % B#binding{handlers = extend_handlers(H, Handler)})
+ % end.
%% Must run within a transaction.
remove_handler_from_binding(BindingKey, Handler) ->
- case mnesia:wread({binding, BindingKey}) of
- [] -> empty;
- [B = #binding{handlers = H}] ->
- H1 = delete_handler(H, Handler),
- case handlers_isempty(H1) of
- true ->
- ok = mnesia:delete({binding, BindingKey}),
- empty;
- _ ->
- ok = mnesia:write(B#binding{handlers = H1}),
- not_empty
- end
- end.
+ exit(remove_handler_from_binding).
+ % case mnesia:wread({binding, BindingKey}) of
+ % [] -> empty;
+ % [B = #binding{handlers = H}] ->
+ % H1 = delete_handler(H, Handler),
+ % case handlers_isempty(H1) of
+ % true ->
+ % ok = mnesia:delete({binding, BindingKey}),
+ % empty;
+ % _ ->
+ % ok,% = mnesia:write(B#binding{handlers = H1}),
+ % not_empty
+ % end
+ % end.
split_topic_key(Key) ->
{ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
@@ -336,41 +348,44 @@ delete(ExchangeName, IfUnused) ->
fun () -> internal_delete(ExchangeName, IfUnused) end).
internal_delete(ExchangeName, _IfUnused = true) ->
- Bindings = bindings_for_exchange(ExchangeName),
- case Bindings of
- [] -> do_internal_delete(ExchangeName, Bindings);
- _ ->
- case lists:all(fun (#binding{handlers = H}) -> handlers_isempty(H) end,
- Bindings) of
- true ->
- %% There are no handlers anywhere in any of the
- %% bindings for this exchange.
- do_internal_delete(ExchangeName, Bindings);
- false ->
- %% There was at least one real handler
- %% present. It's still in use.
- {error, in_use}
- end
- end;
+ exit(internal_delete);
+ % Bindings = bindings_for_exchange(ExchangeName),
+ % case Bindings of
+ % [] -> do_internal_delete(ExchangeName, Bindings);
+ % _ ->
+ % case lists:all(fun (#binding{handlers = H}) -> handlers_isempty(H) end,
+ % Bindings) of
+ % true ->
+ % %% There are no handlers anywhere in any of the
+ % %% bindings for this exchange.
+ % do_internal_delete(ExchangeName, Bindings);
+ % false ->
+ % %% There was at least one real handler
+ % %% present. It's still in use.
+ % {error, in_use}
+ % end
+ % end;
internal_delete(ExchangeName, false) ->
do_internal_delete(ExchangeName, bindings_for_exchange(ExchangeName)).
forcibly_remove_handlers(Handlers) ->
- lists:foreach(
- fun (#handler{binding_spec = BindingSpec, queue = QueueName}) ->
- ok = rabbit_amqqueue:binding_forcibly_removed(
- BindingSpec, QueueName)
- end, Handlers),
- ok.
+ exit(forcibly_remove_handlers).
+ % lists:foreach(
+ % fun (#handler{binding_spec = BindingSpec, queue = QueueName}) ->
+ % ok = rabbit_amqqueue:binding_forcibly_removed(
+ % BindingSpec, QueueName)
+ % end, Handlers),
+ % ok.
do_internal_delete(ExchangeName, Bindings) ->
- case mnesia:wread({exchange, ExchangeName}) of
- [] -> {error, not_found};
- _ ->
- lists:foreach(fun (#binding{key = K, handlers = H}) ->
- ok = forcibly_remove_handlers(H),
- ok = mnesia:delete({binding, K})
- end, Bindings),
- ok = mnesia:delete({durable_exchanges, ExchangeName}),
- ok = mnesia:delete({exchange, ExchangeName})
- end.
+ exit(do_internal_delete).
+ % case mnesia:wread({exchange, ExchangeName}) of
+ % [] -> {error, not_found};
+ % _ ->
+ % lists:foreach(fun (#binding{key = K, handlers = H}) ->
+ % ok = forcibly_remove_handlers(H),
+ % ok = mnesia:delete({binding, K})
+ % end, Bindings),
+ % ok = mnesia:delete({durable_exchanges, ExchangeName}),
+ % ok = mnesia:delete({exchange, ExchangeName})
+ % end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index b1ab3da241..0420a416d4 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -105,7 +105,8 @@ table_definitions() ->
{rabbit_config, [{disc_copies, [node()]}]},
{listener, [{type, bag},
{attributes, record_info(fields, listener)}]},
- {binding, [{attributes, record_info(fields, binding)}]},
+ {forwards_binding, [{type,ordered_set},{attributes, record_info(fields, forwards_binding)}]},
+ {reverse_binding, [{type,ordered_set},{attributes, record_info(fields, reverse_binding)}]},
{durable_exchanges, [{disc_copies, [node()]},
{record_name, exchange},
{attributes, record_info(fields, exchange)}]},