diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 17 |
3 files changed, 53 insertions, 30 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3018582f94..c941f30786 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -124,19 +124,32 @@ recover() -> recover_durable_queues() -> Node = node(), - %% TODO: use dirty ops instead - R = rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} + lists:foreach( + fun (RecoveredQ) -> + Q = start_queue_process(RecoveredQ), + %% We need to catch the case where a client connected to + %% another node has deleted the queue (and possibly + %% re-created it). + case rabbit_misc:execute_mnesia_transaction( + fun () -> case mnesia:match_object( + rabbit_durable_queue, RecoveredQ, read) of + [_] -> ok = store_queue(Q), + true; + [] -> false + end + end) of + true -> ok; + false -> exit(Q#amqqueue.pid, shutdown) + end + end, + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} <- mnesia:table(rabbit_durable_queue), - node(Pid) == Node])) - end), - Queues = lists:map(fun start_queue_process/1, R), - rabbit_misc:execute_mnesia_transaction( - fun () -> - lists:foreach(fun store_queue/1, Queues), - ok - end). + node(Pid) == Node])) + end)), + ok. declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 19efd9fc22..a57e8076bf 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -103,24 +103,17 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, arguments]. recover() -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - mnesia:foldl( - fun (Exchange, Acc) -> - ok = mnesia:write(rabbit_exchange, Exchange, write), - Acc - end, ok, rabbit_durable_exchange), - mnesia:foldl( - fun (Route, Acc) -> - {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(rabbit_route, - Route, write), - ok = mnesia:write(rabbit_reverse_route, - ReverseRoute, write), - Acc - end, ok, rabbit_durable_route), - ok - end). + ok = rabbit_misc:table_foreach( + fun(Exchange) -> ok = mnesia:write(rabbit_exchange, + Exchange, write) + end, rabbit_durable_exchange), + ok = rabbit_misc:table_foreach( + fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(rabbit_route, + Route, write), + ok = mnesia:write(rabbit_reverse_route, + ReverseRoute, write) + end, rabbit_durable_route). declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange = #exchange{name = ExchangeName, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 5d176f8fac..de7bc010b2 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -46,6 +46,7 @@ -export([ensure_ok/2]). -export([localnode/1, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). +-export([table_foreach/2]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). @@ -98,6 +99,7 @@ -spec(intersperse/2 :: (A, [A]) -> [A]). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). +-spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok'). -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). @@ -298,6 +300,21 @@ map_in_order(F, L) -> lists:reverse( lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)). +%% For each entry in a table, execute a function in a transaction. +%% This is often far more efficient than wrapping a tx around the lot. +%% +%% We ignore entries that have been modified or removed. +table_foreach(F, TableName) -> + lists:foreach( + fun (E) -> execute_mnesia_transaction( + fun () -> case mnesia:match_object(TableName, E, read) of + [] -> ok; + _ -> F(E) + end + end) + end, dirty_read_all(TableName)), + ok. + dirty_read_all(TableName) -> mnesia:dirty_select(TableName, [{'$1',[],['$1']}]). |
