diff options
| -rw-r--r-- | src/rabbit_exchange.erl | 35 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 4 |
2 files changed, 33 insertions, 6 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 2eaf1ad99f..bcd5499a6e 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -85,9 +85,22 @@ %%---------------------------------------------------------------------------- recover() -> + % These two functions share commonalities - maybe a refactoring target ok = recover_durable_exchanges(), + ok = recover_durable_routes(), ok. +recover_durable_routes() -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + mnesia:foldl(fun (Route, Acc) -> + {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(Route), + ok = mnesia:write(ReverseRoute), + Acc + end, ok, durable_routes) + end). + recover_durable_exchanges() -> rabbit_misc:execute_mnesia_transaction( fun () -> @@ -264,10 +277,10 @@ delete_binding(QueueName, ExchangeName, RoutingKey, _Arguments) -> add_binding(Binding) -> call_with_exchange_and_queue( Binding, - fun (X,Q) -> if Q#amqqueue.durable and not(X#exchange.durable) -> + fun (X, Q) -> if Q#amqqueue.durable and not(X#exchange.durable) -> {error, durability_settings_incompatible}; true -> - internal_add_binding(Binding) + internal_add_binding(Binding, Q#amqqueue.durable) end end). @@ -275,7 +288,7 @@ add_binding(Binding) -> delete_binding(Binding) -> call_with_exchange_and_queue( Binding, - fun (X, _Q) -> ok = internal_delete_binding(Binding), + fun (X, Q) -> ok = internal_delete_binding(Binding, Q#amqqueue.durable), maybe_auto_delete(X) end). @@ -295,16 +308,26 @@ reverse_binding(#binding{exchange_name = Exchange, key = Key, queue_name = Queue #reverse_binding{exchange_name = Exchange, key = Key, queue_name = Queue}. %% Must run within a transaction. -internal_add_binding(Binding) -> +internal_add_binding(Binding, Durable) -> + synchronize_durable_binding(Binding, write, Durable), [ok, ok] = [mnesia:write(R) || R <- tuple_to_list(route_with_reverse(Binding))], ok. %% Must run within a transaction. -internal_delete_binding(Binding) -> +internal_delete_binding(Binding, Durable) -> + synchronize_durable_binding(Binding, delete, Durable), [ok,ok] = [mnesia:delete_object(R) || R <- tuple_to_list(route_with_reverse(Binding))], ok. -route_with_reverse(Binding) -> +synchronize_durable_binding(_, _, false) -> + ok; +%% Must run within a transaction. +synchronize_durable_binding(Binding, Function, true) -> + ok = mnesia:Function(durable_routes, #route{binding = Binding}, write). + +route_with_reverse(#route{binding = Binding}) -> + route_with_reverse(Binding); +route_with_reverse(Binding = #binding{}) -> Route = #route{ binding = Binding }, ReverseRoute = #reverse_route{ reverse_binding = reverse_binding(Binding) }, {Route, ReverseRoute}. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 3e031c3b31..958c380b2c 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -105,6 +105,10 @@ table_definitions() -> {rabbit_config, [{disc_copies, [node()]}]}, {listener, [{type, bag}, {attributes, record_info(fields, listener)}]}, + {durable_routes, [{disc_copies, [node()]}, + {type,ordered_set}, + {record_name, route}, + {attributes, record_info(fields, route)}]}, {route, [{type,ordered_set},{attributes, record_info(fields, route)}]}, {reverse_route, [{type,ordered_set},{attributes, record_info(fields, reverse_route)}]}, {durable_exchanges, [{disc_copies, [node()]}, |
