summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-03-19 15:43:56 +0000
committerMatthias Radestock <matthias@lshift.net>2009-03-19 15:43:56 +0000
commit5981a69b23d1b15840d391cd12aafd6c5bc7503f (patch)
tree8ba8cd1b4f7785c3b6ca639a2883cd2c2356a015
parentf8fa3edf69227d62ab0fd4978affadebaeb89dae (diff)
downloadrabbitmq-server-git-5981a69b23d1b15840d391cd12aafd6c5bc7503f.tar.gz
recover exchanges/bindings/queues in per-item transactions
Because recovering them in large, single transactions is incredibly slow, with complexity that is far worse than linear in the number of entries we recover, presumably due to the way mnesia represents transaction-local storage.
-rw-r--r--src/rabbit_amqqueue.erl39
-rw-r--r--src/rabbit_exchange.erl25
-rw-r--r--src/rabbit_misc.erl17
3 files changed, 52 insertions, 29 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2b9abb2990..69c97dfe8e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -122,19 +122,32 @@ recover() ->
recover_durable_queues() ->
Node = node(),
- %% TODO: use dirty ops instead
- R = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(durable_queues),
- node(Pid) == Node]))
- end),
- Queues = lists:map(fun start_queue_process/1, R),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- lists:foreach(fun store_queue/1, Queues),
- ok
- end).
+ lists:foreach(
+ fun (RecoveredQ) ->
+ Q = start_queue_process(RecoveredQ),
+ %% We need to catch the case where a client connected to
+ %% another node has deleted the queue (and possibly
+ %% re-created it).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () -> case mnesia:match_object(
+ durable_queues, RecoveredQ, read) of
+ [_] -> ok = store_queue(Q),
+ true;
+ [] -> false
+ end
+ end) of
+ true -> ok;
+ false -> exit(Q#amqqueue.pid, shutdown)
+ end
+ end,
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
+ <- mnesia:table(durable_queues),
+ node(Pid) == Node]))
+ end)),
+ ok.
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 925c335cee..e72669accb 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -102,22 +102,15 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
recover() ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- mnesia:foldl(
- fun (Exchange, Acc) ->
- ok = mnesia:write(Exchange),
- Acc
- end, ok, durable_exchanges),
- mnesia:foldl(
- fun (Route, Acc) ->
- {_, ReverseRoute} = route_with_reverse(Route),
- ok = mnesia:write(Route),
- ok = mnesia:write(ReverseRoute),
- Acc
- end, ok, durable_routes),
- ok
- end).
+ ok = rabbit_misc:table_foreach(
+ fun(Exchange) -> ok = mnesia:write(Exchange) end,
+ durable_exchanges),
+ ok = rabbit_misc:table_foreach(
+ fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route),
+ ok = mnesia:write(Route),
+ ok = mnesia:write(ReverseRoute)
+ end, durable_routes),
+ ok.
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange = #exchange{name = ExchangeName,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 053bde54cc..5730fdc003 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -46,6 +46,7 @@
-export([ensure_ok/2]).
-export([localnode/1, tcp_name/3]).
-export([intersperse/2, upmap/2, map_in_order/2]).
+-export([table_foreach/2]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
@@ -97,6 +98,7 @@
-spec(intersperse/2 :: (A, [A]) -> [A]).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
+-spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok').
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
'ok' | 'aborted').
@@ -295,6 +297,21 @@ map_in_order(F, L) ->
lists:reverse(
lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)).
+%% For each entry in a table, execute a function in a transaction.
+%% This is often far more efficient than wrapping a tx around the lot.
+%%
+%% We ignore entries that have been modified or removed.
+table_foreach(F, TableName) ->
+ lists:foreach(
+ fun (E) -> execute_mnesia_transaction(
+ fun () -> case mnesia:match_object(TableName, E, read) of
+ [] -> ok;
+ _ -> F(E)
+ end
+ end)
+ end, dirty_read_all(TableName)),
+ ok.
+
dirty_read_all(TableName) ->
mnesia:dirty_select(TableName, [{'$1',[],['$1']}]).