summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-04-01 17:22:15 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-04-01 17:22:15 +0100
commit2478fe80910fb07f20cafa80492f1bafffd85274 (patch)
tree16a5001835c637440af47e5255f5ff9a98d326b9
parent6b36086c0735916b62315a72f87682380f0b2eb8 (diff)
parent8118ea9c727e9c46bfc9deb5ef651ed5a42cc557 (diff)
downloadrabbitmq-server-git-2478fe80910fb07f20cafa80492f1bafffd85274.tar.gz
Merge bug24006 into bug24009.
-rw-r--r--src/rabbit_binding.erl33
-rw-r--r--src/rabbit_mnesia.erl5
-rw-r--r--src/rabbit_upgrade_functions.erl6
3 files changed, 31 insertions, 13 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 5ac9c8718b..5cd7a1beac 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -97,6 +97,10 @@
recover(XNames, QNames) ->
XNameSet = sets:from_list(XNames),
QNameSet = sets:from_list(QNames),
+ rabbit_misc:table_fold(
+ fun (Route, ok) ->
+ ok = mnesia:write(rabbit_semi_durable_route, Route, write)
+ end, ok, rabbit_durable_route),
XBs = rabbit_misc:table_fold(
fun (Route = #route{binding = B = #binding{source = Src}}, Acc) ->
case should_recover(B, XNameSet, QNameSet) of
@@ -107,7 +111,7 @@ recover(XNames, QNames) ->
rabbit_misc:dict_cons(Src, B, Acc);
false -> Acc
end
- end, dict:new(), rabbit_durable_route),
+ end, dict:new(), rabbit_semi_durable_route),
rabbit_misc:execute_mnesia_transaction(
fun () -> ok end,
fun (ok, Tx) ->
@@ -149,7 +153,7 @@ add(Binding, InnerFun) ->
case InnerFun(Src, Dst) of
ok ->
case mnesia:read({rabbit_route, B}) of
- [] -> ok = sync_binding(B, all_durable([Src, Dst]),
+ [] -> ok = sync_binding(B, Src, Dst,
fun mnesia:write/3),
fun (Tx) ->
ok = rabbit_exchange:callback(
@@ -177,7 +181,7 @@ remove(Binding, InnerFun) ->
[_] ->
case InnerFun(Src, Dst) of
ok ->
- ok = sync_binding(B, all_durable([Src, Dst]),
+ ok = sync_binding(B, Src, Dst,
fun mnesia:delete_object/3),
{ok, maybe_auto_delete(B#binding.source,
[B], new_deletions())};
@@ -250,7 +254,8 @@ has_for_source(SrcName) ->
%% we need to check for durable routes here too in case a bunch of
%% routes to durable queues have been removed temporarily as a
%% result of a node failure
- contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match).
+ contains(rabbit_route, Match) orelse contains(rabbit_semi_durable_route,
+ Match).
remove_for_source(SrcName) ->
[begin
@@ -272,10 +277,8 @@ remove_transient_for_destination(DstName) ->
%%----------------------------------------------------------------------------
-all_durable(Resources) ->
- lists:all(fun (#exchange{durable = D}) -> D;
- (#amqqueue{durable = D}) -> D
- end, Resources).
+durable(#exchange{durable = D}) -> D;
+durable(#amqqueue{durable = D}) -> D.
binding_action(Binding = #binding{source = SrcName,
destination = DstName,
@@ -287,13 +290,16 @@ binding_action(Binding = #binding{source = SrcName,
Fun(Src, Dst, Binding#binding{args = SortedArgs})
end).
-sync_binding(Binding, Durable, Fun) ->
- ok = case Durable of
- true -> Fun(rabbit_durable_route,
- #route{binding = Binding}, write);
+sync_binding(Binding, Src, Dst, Fun) ->
+ {Route, ReverseRoute} = route_with_reverse(Binding),
+ ok = case durable(Src) andalso durable(Dst) of
+ true -> Fun(rabbit_durable_route, Route, write);
+ false -> ok
+ end,
+ ok = case durable(Dst) of
+ true -> Fun(rabbit_semi_durable_route, Route, write);
false -> ok
end,
- {Route, ReverseRoute} = route_with_reverse(Binding),
ok = Fun(rabbit_route, Route, write),
ok = Fun(rabbit_reverse_route, ReverseRoute, write),
ok.
@@ -374,6 +380,7 @@ maybe_auto_delete(XName, Bindings, Deletions) ->
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
+ ok = mnesia:delete_object(rabbit_semi_durable_route, Route, write),
ok = mnesia:delete_object(rabbit_durable_route, Route, write).
delete_transient_forward_routes(Route) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index fbcf07ae77..77b06d0c08 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -187,6 +187,11 @@ table_definitions() ->
{attributes, record_info(fields, route)},
{disc_copies, [node()]},
{match, #route{binding = binding_match(), _='_'}}]},
+ {rabbit_semi_durable_route,
+ [{record_name, route},
+ {attributes, record_info(fields, route)},
+ {type, ordered_set},
+ {match, #route{binding = binding_match(), _='_'}}]},
{rabbit_route,
[{record_name, route},
{attributes, record_info(fields, route)},
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 7567c29ef3..842c3b4fac 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -26,6 +26,7 @@
-rabbit_upgrade({internal_exchanges, mnesia, []}).
-rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}).
-rabbit_upgrade({topic_trie, mnesia, []}).
+-rabbit_upgrade({semi_durable_route, mnesia, []}).
%% -------------------------------------------------------------------
@@ -37,6 +38,7 @@
-spec(internal_exchanges/0 :: () -> 'ok').
-spec(user_to_internal_user/0 :: () -> 'ok').
-spec(topic_trie/0 :: () -> 'ok').
+-spec(semi_durable_route/0 :: () -> 'ok').
-endif.
@@ -101,6 +103,10 @@ topic_trie() ->
{attributes, [trie_binding, value]},
{type, ordered_set}]).
+semi_durable_route() ->
+ create(rabbit_semi_durable_route, [{record_name, route},
+ {attributes, [binding, value]}]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->