summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Hood <0x6e6562@gmail.com>2008-09-20 21:07:17 +0100
committerBen Hood <0x6e6562@gmail.com>2008-09-20 21:07:17 +0100
commit468a3335e3934b80792a368567d1deb040e405bd (patch)
treebe3eae2b9e92a5e4c75e8afe55849920fc14f143
parentf998dffa1cb86355d2439fc7da4d6134cc36773f (diff)
downloadrabbitmq-server-git-468a3335e3934b80792a368567d1deb040e405bd.tar.gz
Implemented durable routes
-rw-r--r--src/rabbit_exchange.erl35
-rw-r--r--src/rabbit_mnesia.erl4
2 files changed, 33 insertions, 6 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 2eaf1ad99f..bcd5499a6e 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -85,9 +85,22 @@
%%----------------------------------------------------------------------------
recover() ->
+ % These two functions share commonalities - maybe a refactoring target
ok = recover_durable_exchanges(),
+ ok = recover_durable_routes(),
ok.
+recover_durable_routes() ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ mnesia:foldl(fun (Route, Acc) ->
+ {_, ReverseRoute} = route_with_reverse(Route),
+ ok = mnesia:write(Route),
+ ok = mnesia:write(ReverseRoute),
+ Acc
+ end, ok, durable_routes)
+ end).
+
recover_durable_exchanges() ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -264,10 +277,10 @@ delete_binding(QueueName, ExchangeName, RoutingKey, _Arguments) ->
add_binding(Binding) ->
call_with_exchange_and_queue(
Binding,
- fun (X,Q) -> if Q#amqqueue.durable and not(X#exchange.durable) ->
+ fun (X, Q) -> if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
true ->
- internal_add_binding(Binding)
+ internal_add_binding(Binding, Q#amqqueue.durable)
end
end).
@@ -275,7 +288,7 @@ add_binding(Binding) ->
delete_binding(Binding) ->
call_with_exchange_and_queue(
Binding,
- fun (X, _Q) -> ok = internal_delete_binding(Binding),
+ fun (X, Q) -> ok = internal_delete_binding(Binding, Q#amqqueue.durable),
maybe_auto_delete(X)
end).
@@ -295,16 +308,26 @@ reverse_binding(#binding{exchange_name = Exchange, key = Key, queue_name = Queue
#reverse_binding{exchange_name = Exchange, key = Key, queue_name = Queue}.
%% Must run within a transaction.
-internal_add_binding(Binding) ->
+internal_add_binding(Binding, Durable) ->
+ synchronize_durable_binding(Binding, write, Durable),
[ok, ok] = [mnesia:write(R) || R <- tuple_to_list(route_with_reverse(Binding))],
ok.
%% Must run within a transaction.
-internal_delete_binding(Binding) ->
+internal_delete_binding(Binding, Durable) ->
+ synchronize_durable_binding(Binding, delete, Durable),
[ok,ok] = [mnesia:delete_object(R) || R <- tuple_to_list(route_with_reverse(Binding))],
ok.
-route_with_reverse(Binding) ->
+synchronize_durable_binding(_, _, false) ->
+ ok;
+%% Must run within a transaction.
+synchronize_durable_binding(Binding, Function, true) ->
+ ok = mnesia:Function(durable_routes, #route{binding = Binding}, write).
+
+route_with_reverse(#route{binding = Binding}) ->
+ route_with_reverse(Binding);
+route_with_reverse(Binding = #binding{}) ->
Route = #route{ binding = Binding },
ReverseRoute = #reverse_route{ reverse_binding = reverse_binding(Binding) },
{Route, ReverseRoute}.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 3e031c3b31..958c380b2c 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -105,6 +105,10 @@ table_definitions() ->
{rabbit_config, [{disc_copies, [node()]}]},
{listener, [{type, bag},
{attributes, record_info(fields, listener)}]},
+ {durable_routes, [{disc_copies, [node()]},
+ {type,ordered_set},
+ {record_name, route},
+ {attributes, record_info(fields, route)}]},
{route, [{type,ordered_set},{attributes, record_info(fields, route)}]},
{reverse_route, [{type,ordered_set},{attributes, record_info(fields, reverse_route)}]},
{durable_exchanges, [{disc_copies, [node()]},