diff options
| author | Ben Hood <0x6e6562@gmail.com> | 2008-07-31 00:44:02 +0100 |
|---|---|---|
| committer | Ben Hood <0x6e6562@gmail.com> | 2008-07-31 00:44:02 +0100 |
| commit | 0a1d35c5306efbf4e27b298f1a6299b94a8be207 (patch) | |
| tree | 2bcc82a35873f13e9be2168667e3585dc304965d /src | |
| parent | fbab5e9e4950211cbe30365a303687b2935a3eb6 (diff) | |
| download | rabbitmq-server-git-0a1d35c5306efbf4e27b298f1a6299b94a8be207.tar.gz | |
Savepoint for changing the routing structure
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 116 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 231 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 3 |
3 files changed, 185 insertions, 165 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c3fac99864..d712f61077 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -84,7 +84,7 @@ -spec(commit/2 :: (pid(), txn()) -> 'ok'). -spec(rollback/2 :: (pid(), txn()) -> 'ok'). -spec(notify_down/2 :: (amqqueue(), pid()) -> 'ok'). --spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok'). +%-spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok'). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). @@ -135,7 +135,6 @@ declare(Resource = #resource{}, Durable, AutoDelete, Args) -> durable = Durable, auto_delete = AutoDelete, arguments = Args, - binding_specs = [], pid = none}), case rabbit_misc:execute_mnesia_transaction( fun () -> @@ -168,48 +167,52 @@ recover_queue(Q) -> ok. default_binding_spec(#resource{virtual_host = VHostPath, name = Name}) -> - #binding_spec{exchange_name = rabbit_misc:r(VHostPath,exchange,<<"">>), - routing_key = Name, - arguments = []}. - -recover_bindings(Q = #amqqueue{name = QueueName, binding_specs = Specs}) -> - ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q), - lists:foreach(fun (B) -> - ok = rabbit_exchange:add_binding(B, Q) - end, Specs), - ok. - + exit(default_binding_spec). + % #binding_spec{exchange_name = rabbit_misc:r(VHostPath,exchange,<<"">>), + % routing_key = Name, + % arguments = []}. + +recover_bindings(Q = #amqqueue{name = QueueName}) -> + exit(recover_bindings). + % ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q), + % lists:foreach(fun (B) -> + % ok = rabbit_exchange:add_binding(B, Q) + % end, Specs), + % ok. + modify_bindings(Queue = #resource{}, X = #resource{}, RoutingKey, Arguments, SpecPresentFun, SpecAbsentFun) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({amqqueue, Queue}) of - [Q = #amqqueue{binding_specs = Specs0}] -> - Spec = #binding_spec{exchange_name = X, - routing_key = RoutingKey, - arguments = Arguments}, - case (case lists:member(Spec, Specs0) of - true -> SpecPresentFun; - false -> SpecAbsentFun - end)(Q, Spec) of - {ok, #amqqueue{binding_specs = Specs}} -> - {ok, length(Specs)}; - {error, not_found} -> - {error, exchange_not_found}; - Other -> Other - end; - [] -> {error, queue_not_found} - end - end). - -update_bindings(Q = #amqqueue{binding_specs = Specs0}, Spec, + exit(modify_bindings). + % rabbit_misc:execute_mnesia_transaction( + % fun () -> + % case mnesia:wread({amqqueue, Queue}) of + % [Q = #amqqueue{binding_specs = Specs0}] -> + % Spec = #binding_spec{exchange_name = X, + % routing_key = RoutingKey, + % arguments = Arguments}, + % case (case lists:member(Spec, Specs0) of + % true -> SpecPresentFun; + % false -> SpecAbsentFun + % end)(Q, Spec) of + % {ok, #amqqueue{binding_specs = Specs}} -> + % {ok, length(Specs)}; + % {error, not_found} -> + % {error, exchange_not_found}; + % Other -> Other + % end; + % [] -> {error, queue_not_found} + % end + % end). + +update_bindings(Q = #amqqueue{}, Spec, UpdateSpecFun, UpdateExchangeFun) -> - Q1 = Q#amqqueue{binding_specs = UpdateSpecFun(Spec, Specs0)}, - case UpdateExchangeFun(Spec, Q1) of - ok -> store_queue(Q1), - {ok, Q1}; - Other -> Other - end. + exit(update_bindings). + % Q1 = Q#amqqueue{binding_specs = UpdateSpecFun(Spec, Specs0)}, + % case UpdateExchangeFun(Spec, Q1) of + % ok -> store_queue(Q1), + % {ok, Q1}; + % Other -> Other + % end. add_binding(QueueName, ExchangeName, RoutingKey, Arguments) -> modify_bindings( @@ -297,15 +300,16 @@ notify_down(#amqqueue{ pid = QPid }, ChPid) -> gen_server:call(QPid, {notify_down, ChPid}). binding_forcibly_removed(BindingSpec, QueueName) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({amqqueue, QueueName}) of - [] -> ok; - [Q = #amqqueue{binding_specs = Specs}] -> - store_queue(Q#amqqueue{binding_specs = - lists:delete(BindingSpec, Specs)}) - end - end). + exit(binding_forcibly_removed). + % rabbit_misc:execute_mnesia_transaction( + % fun () -> + % case mnesia:wread({amqqueue, QueueName}) of + % [] -> ok; + % [Q = #amqqueue{binding_specs = Specs}] -> + % store_queue(Q#amqqueue{binding_specs = + % lists:delete(BindingSpec, Specs)}) + % end + % end). claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> gen_server:call(QPid, {claim_queue, ReaderPid}). @@ -324,11 +328,12 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> notify_sent(QPid, ChPid) -> gen_server:cast(QPid, {notify_sent, ChPid}). -delete_bindings(Q = #amqqueue{binding_specs = Specs}) -> - lists:foreach(fun (BindingSpec) -> - ok = rabbit_exchange:delete_binding( - BindingSpec, Q) - end, Specs). +delete_bindings(Q = #amqqueue{}) -> + exit(delete_bindings). + % lists:foreach(fun (BindingSpec) -> + % ok = rabbit_exchange:delete_binding( + % BindingSpec, Q) + % end, Specs). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( @@ -368,5 +373,4 @@ pseudo_queue(NameBin, Pid) -> durable = false, auto_delete = false, arguments = [], - binding_specs = [], pid = Pid}. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 17d9fd97ad..b2375f0a68 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -64,11 +64,11 @@ publish_res()). -spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). -spec(route/2 :: (exchange(), routing_key()) -> [pid()]). --spec(add_binding/2 :: (binding_spec(), amqqueue()) -> - 'ok' | not_found() | - {'error', 'durability_settings_incompatible'}). --spec(delete_binding/2 :: (binding_spec(), amqqueue()) -> - 'ok' | not_found()). +% -spec(add_binding/2 :: (binding_spec(), amqqueue()) -> +% 'ok' | not_found() | +% {'error', 'durability_settings_incompatible'}). +% -spec(delete_binding/2 :: (binding_spec(), amqqueue()) -> +% 'ok' | not_found()). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). -spec(delete/2 :: (exchange_name(), bool()) -> 'ok' | not_found() | {'error', 'in_use'}). @@ -144,15 +144,17 @@ list_vhost_exchanges(VHostPath) -> #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). list_exchange_bindings(Name) -> - [{QueueName, RoutingKey, Arguments} || - #binding{handlers = Handlers} <- bindings_for_exchange(Name), - #handler{binding_spec = #binding_spec{routing_key = RoutingKey, - arguments = Arguments}, - queue = QueueName} <- Handlers]. - + exit(list_exchange_bindings). + % [{QueueName, RoutingKey, Arguments} || + % #binding{handlers = Handlers} <- bindings_for_exchange(Name), + % #handler{binding_spec = #binding_spec{routing_key = RoutingKey, + % arguments = Arguments}, + % queue = QueueName} <- Handlers]. + bindings_for_exchange(Name) -> - qlc:e(qlc:q([B || B = #binding{key = K} <- mnesia:table(binding), - element(1, K) == Name])). + exit(bindings_for_exchange). + % qlc:e(qlc:q([B || B = #binding{key = K} <- mnesia:table(binding), + % element(1, K) == Name])). empty_handlers() -> []. @@ -189,25 +191,27 @@ simple_publish(Mandatory, Immediate, %% as many times as a message should be delivered to it. With the %% current exchange types that is at most once. route(#exchange{name = Name, type = topic}, RoutingKey) -> - sets:to_list( - sets:union( - mnesia:activity( - async_dirty, - fun () -> - qlc:e(qlc:q([handler_qpids(H) || - #binding{key = {Name1, PatternKey}, - handlers = H} - <- mnesia:table(binding), - Name == Name1, - topic_matches(PatternKey, RoutingKey)])) - end))); + exit(route); + % sets:to_list( + % sets:union( + % mnesia:activity( + % async_dirty, + % fun () -> + % qlc:e(qlc:q([handler_qpids(H) || + % #binding{key = {Name1, PatternKey}, + % handlers = H} + % <- mnesia:table(binding), + % Name == Name1, + % topic_matches(PatternKey, RoutingKey)])) + % end))); route(#exchange{name = Name, type = Type}, RoutingKey) -> - BindingKey = delivery_key_for_type(Type, Name, RoutingKey), - case rabbit_misc:dirty_read({binding, BindingKey}) of - {ok, #binding{handlers = H}} -> sets:to_list(handler_qpids(H)); - {error, not_found} -> [] - end. + exit(route). + % BindingKey = delivery_key_for_type(Type, Name, RoutingKey), + % case rabbit_misc:dirty_read({binding, BindingKey}) of + % {ok, #binding{handlers = H}} -> sets:to_list(handler_qpids(H)); + % {error, not_found} -> [] + % end. delivery_key_for_type(fanout, Name, _RoutingKey) -> {Name, fanout}; @@ -221,28 +225,33 @@ call_with_exchange(Name, Fun) -> end. make_handler(BindingSpec, #amqqueue{name = QueueName, pid = QPid}) -> - #handler{binding_spec = BindingSpec, queue = QueueName, qpid = QPid}. - -add_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName, - routing_key = RoutingKey}, Q) -> - call_with_exchange( - ExchangeName, - fun (X) -> if Q#amqqueue.durable and not(X#exchange.durable) -> - {error, durability_settings_incompatible}; - true -> - internal_add_binding( - X, RoutingKey, make_handler(BindingSpec, Q)) - end - end). - -delete_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName, - routing_key = RoutingKey}, Q) -> - call_with_exchange( - ExchangeName, - fun (X) -> ok = internal_delete_binding( - X, RoutingKey, make_handler(BindingSpec, Q)), - maybe_auto_delete(X) - end). + exit(make_handler). + %#handler{binding_spec = BindingSpec, queue = QueueName, qpid = QPid}. + +add_binding(BindingSpec %= #binding_spec{exchange_name = ExchangeName, + % routing_key = RoutingKey}, + ,Q) -> + exit(add_binding). + % call_with_exchange( + % ExchangeName, + % fun (X) -> if Q#amqqueue.durable and not(X#exchange.durable) -> + % {error, durability_settings_incompatible}; + % true -> + % internal_add_binding( + % X, RoutingKey, make_handler(BindingSpec, Q)) + % end + % end). + +delete_binding(BindingSpec %= #binding_spec{exchange_name = ExchangeName, + % routing_key = RoutingKey}, + ,Q) -> + exit(delete_binding). + % call_with_exchange( + % ExchangeName, + % fun (X) -> ok = internal_delete_binding( + % X, RoutingKey, make_handler(BindingSpec, Q)), + % maybe_auto_delete(X) + % end). %% Must run within a transaction. maybe_auto_delete(#exchange{auto_delete = false}) -> @@ -261,7 +270,8 @@ extend_handlers(Handlers, Handler) -> [Handler | Handlers]. delete_handler(Handlers, Handler) -> lists:delete(Handler, Handlers). handler_qpids(Handlers) -> - sets:from_list([QPid || #handler{qpid = QPid} <- Handlers]). + exit(handler_qpids). + %sets:from_list([QPid || #handler{qpid = QPid} <- Handlers]). %% Must run within a transaction. internal_add_binding(#exchange{name = ExchangeName, type = Type}, @@ -277,32 +287,34 @@ internal_delete_binding(#exchange{name = ExchangeName, type = Type}, RoutingKey, %% Must run within a transaction. add_handler_to_binding(BindingKey, Handler) -> - ok = case mnesia:wread({binding, BindingKey}) of - [] -> - ok = mnesia:write( - #binding{key = BindingKey, - handlers = extend_handlers( - empty_handlers(), Handler)}); - [B = #binding{handlers = H}] -> - ok = mnesia:write( - B#binding{handlers = extend_handlers(H, Handler)}) - end. + exit(add_handler_to_binding). + % ok = case mnesia:wread({binding, BindingKey}) of + % [] -> + % ok = mnesia:write( + % #binding{key = BindingKey, + % handlers = extend_handlers( + % empty_handlers(), Handler)}); + % [B = #binding{handlers = H}] -> + % ok = mnesia:write( + % B#binding{handlers = extend_handlers(H, Handler)}) + % end. %% Must run within a transaction. remove_handler_from_binding(BindingKey, Handler) -> - case mnesia:wread({binding, BindingKey}) of - [] -> empty; - [B = #binding{handlers = H}] -> - H1 = delete_handler(H, Handler), - case handlers_isempty(H1) of - true -> - ok = mnesia:delete({binding, BindingKey}), - empty; - _ -> - ok = mnesia:write(B#binding{handlers = H1}), - not_empty - end - end. + exit(remove_handler_from_binding). + % case mnesia:wread({binding, BindingKey}) of + % [] -> empty; + % [B = #binding{handlers = H}] -> + % H1 = delete_handler(H, Handler), + % case handlers_isempty(H1) of + % true -> + % ok = mnesia:delete({binding, BindingKey}), + % empty; + % _ -> + % ok,% = mnesia:write(B#binding{handlers = H1}), + % not_empty + % end + % end. split_topic_key(Key) -> {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), @@ -336,41 +348,44 @@ delete(ExchangeName, IfUnused) -> fun () -> internal_delete(ExchangeName, IfUnused) end). internal_delete(ExchangeName, _IfUnused = true) -> - Bindings = bindings_for_exchange(ExchangeName), - case Bindings of - [] -> do_internal_delete(ExchangeName, Bindings); - _ -> - case lists:all(fun (#binding{handlers = H}) -> handlers_isempty(H) end, - Bindings) of - true -> - %% There are no handlers anywhere in any of the - %% bindings for this exchange. - do_internal_delete(ExchangeName, Bindings); - false -> - %% There was at least one real handler - %% present. It's still in use. - {error, in_use} - end - end; + exit(internal_delete); + % Bindings = bindings_for_exchange(ExchangeName), + % case Bindings of + % [] -> do_internal_delete(ExchangeName, Bindings); + % _ -> + % case lists:all(fun (#binding{handlers = H}) -> handlers_isempty(H) end, + % Bindings) of + % true -> + % %% There are no handlers anywhere in any of the + % %% bindings for this exchange. + % do_internal_delete(ExchangeName, Bindings); + % false -> + % %% There was at least one real handler + % %% present. It's still in use. + % {error, in_use} + % end + % end; internal_delete(ExchangeName, false) -> do_internal_delete(ExchangeName, bindings_for_exchange(ExchangeName)). forcibly_remove_handlers(Handlers) -> - lists:foreach( - fun (#handler{binding_spec = BindingSpec, queue = QueueName}) -> - ok = rabbit_amqqueue:binding_forcibly_removed( - BindingSpec, QueueName) - end, Handlers), - ok. + exit(forcibly_remove_handlers). + % lists:foreach( + % fun (#handler{binding_spec = BindingSpec, queue = QueueName}) -> + % ok = rabbit_amqqueue:binding_forcibly_removed( + % BindingSpec, QueueName) + % end, Handlers), + % ok. do_internal_delete(ExchangeName, Bindings) -> - case mnesia:wread({exchange, ExchangeName}) of - [] -> {error, not_found}; - _ -> - lists:foreach(fun (#binding{key = K, handlers = H}) -> - ok = forcibly_remove_handlers(H), - ok = mnesia:delete({binding, K}) - end, Bindings), - ok = mnesia:delete({durable_exchanges, ExchangeName}), - ok = mnesia:delete({exchange, ExchangeName}) - end. + exit(do_internal_delete). + % case mnesia:wread({exchange, ExchangeName}) of + % [] -> {error, not_found}; + % _ -> + % lists:foreach(fun (#binding{key = K, handlers = H}) -> + % ok = forcibly_remove_handlers(H), + % ok = mnesia:delete({binding, K}) + % end, Bindings), + % ok = mnesia:delete({durable_exchanges, ExchangeName}), + % ok = mnesia:delete({exchange, ExchangeName}) + % end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index b1ab3da241..0420a416d4 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -105,7 +105,8 @@ table_definitions() -> {rabbit_config, [{disc_copies, [node()]}]}, {listener, [{type, bag}, {attributes, record_info(fields, listener)}]}, - {binding, [{attributes, record_info(fields, binding)}]}, + {forwards_binding, [{type,ordered_set},{attributes, record_info(fields, forwards_binding)}]}, + {reverse_binding, [{type,ordered_set},{attributes, record_info(fields, reverse_binding)}]}, {durable_exchanges, [{disc_copies, [node()]}, {record_name, exchange}, {attributes, record_info(fields, exchange)}]}, |
