diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 16 |
3 files changed, 42 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 06bb18f5d3..eb076e94d6 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -305,28 +305,29 @@ internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of - [] -> {error, not_found}; - [Q] -> - ok = delete_queue(Q), + [] -> {error, not_found}; + [_] -> + ok = rabbit_exchange:delete_queue_bindings(QueueName), + ok = mnesia:delete({rabbit_queue, QueueName}), ok = mnesia:delete({rabbit_durable_queue, QueueName}), ok end end). -delete_queue(#amqqueue{name = QueueName}) -> - ok = rabbit_exchange:delete_bindings_for_queue(QueueName), - ok = mnesia:delete({rabbit_queue, QueueName}), - ok. - on_node_down(Node) -> rabbit_misc:execute_mnesia_transaction( fun () -> qlc:fold( - fun (Q, Acc) -> ok = delete_queue(Q), Acc end, + fun (QueueName, Acc) -> + ok = rabbit_exchange:delete_transient_queue_bindings( + QueueName), + ok = mnesia:delete({rabbit_queue, QueueName}), + Acc + end, ok, - qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) + qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])) end). pseudo_queue(QueueName, Pid) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a57e8076bf..40fdcf4ec4 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -40,7 +40,7 @@ route/3]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). --export([delete_bindings_for_queue/1]). +-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). -export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]). %% EXTENDED API @@ -86,7 +86,8 @@ bind_res() | {'error', 'binding_not_found'}). -spec(list_bindings/1 :: (vhost()) -> [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). --spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok'). +-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok'). +-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok'). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). -spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()). -spec(delete/2 :: (exchange_name(), bool()) -> @@ -293,7 +294,7 @@ lookup_qpids(Queues) -> %% refactored to its own module, especially seeing as unbind will have %% to be implemented for 0.91 ? -delete_bindings_for_exchange(ExchangeName) -> +delete_exchange_bindings(ExchangeName) -> [begin ok = mnesia:delete_object(rabbit_reverse_route, reverse_route(Route), write), @@ -305,10 +306,16 @@ delete_bindings_for_exchange(ExchangeName) -> write)], ok. -delete_bindings_for_queue(QueueName) -> +delete_queue_bindings(QueueName) -> + delete_queue_bindings(QueueName, fun delete_forward_routes/1). + +delete_transient_queue_bindings(QueueName) -> + delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1). + +delete_queue_bindings(QueueName, FwdDeleteFun) -> Exchanges = exchanges_for_queue(QueueName), [begin - ok = delete_forward_routes(reverse_route(Route)), + ok = FwdDeleteFun(reverse_route(Route)), ok = mnesia:delete_object(rabbit_reverse_route, Route, write) end || Route <- mnesia:match_object( rabbit_reverse_route, @@ -326,6 +333,9 @@ delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), ok = mnesia:delete_object(rabbit_durable_route, Route, write). +delete_transient_forward_routes(Route) -> + ok = mnesia:delete_object(rabbit_route, Route, write). + exchanges_for_queue(QueueName) -> MatchHead = reverse_route( #route{binding = #binding{exchange_name = '$1', @@ -558,7 +568,7 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) -> end. unconditional_delete(#exchange{name = ExchangeName}) -> - ok = delete_bindings_for_exchange(ExchangeName), + ok = delete_exchange_bindings(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), ok = mnesia:delete({rabbit_exchange, ExchangeName}). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index de7bc010b2..af3454f206 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -375,9 +375,19 @@ ensure_parent_dirs_exist(Filename) -> end. format_stderr(Fmt, Args) -> - Port = open_port({fd, 0, 2}, [out]), - port_command(Port, io_lib:format(Fmt, Args)), - port_close(Port). + case os:type() of + {unix, _} -> + Port = open_port({fd, 0, 2}, [out]), + port_command(Port, io_lib:format(Fmt, Args)), + port_close(Port); + {win32, _} -> + %% stderr on Windows is buffered and I can't figure out a + %% way to trigger a fflush(stderr) in Erlang. So rather + %% than risk losing output we write to stdout instead, + %% which appears to be unbuffered. + io:format(Fmt, Args) + end, + ok. manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> Iterate(fun (App, Acc) -> |
