summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2011-05-03 11:59:21 +0100
committerRob Harrop <rob@rabbitmq.com>2011-05-03 11:59:21 +0100
commit1a9334108bf69b9e2405ca9e006582f6d7873bdb (patch)
treea572cc144facd5f926e23321b8eaf00fe996fbca
parent11a0868f91ea3f3263c1dce53a87db6391c0fa81 (diff)
parentca1ad94464bde375a054162e2ee91a688ab9e7dd (diff)
downloadrabbitmq-server-git-1a9334108bf69b9e2405ca9e006582f6d7873bdb.tar.gz
Merge with default
-rw-r--r--packaging/common/rabbitmq-server.init3
-rw-r--r--src/rabbit_binding.erl116
-rw-r--r--src/rabbit_mnesia.erl5
-rw-r--r--src/rabbit_upgrade_functions.erl6
4 files changed, 69 insertions, 61 deletions
diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init
index f3bdc3d2ad..d8a7a94d56 100644
--- a/packaging/common/rabbitmq-server.init
+++ b/packaging/common/rabbitmq-server.init
@@ -28,6 +28,7 @@ INIT_LOG_DIR=/var/log/rabbitmq
LOCK_FILE= # This is filled in when building packages
test -x $DAEMON || exit 0
+test -x $CONTROL || exit 0
RETVAL=0
set -e
@@ -94,7 +95,7 @@ status_rabbitmq() {
rotate_logs_rabbitmq() {
set +e
- $DAEMON rotate_logs ${ROTATE_SUFFIX}
+ $CONTROL rotate_logs ${ROTATE_SUFFIX}
if [ $? != 0 ] ; then
RETVAL=1
fi
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index c2c8dc1fff..dc119fbd5e 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -97,6 +97,15 @@ recover(XNames, QNames) ->
XNameSet = sets:from_list(XNames),
QNameSet = sets:from_list(QNames),
rabbit_misc:table_filter(
+ fun (Route) ->
+ mnesia:read({rabbit_semi_durable_route, Route}) =:= []
+ end,
+ fun (Route, true) ->
+ ok = mnesia:write(rabbit_semi_durable_route, Route, write);
+ (_Route, false) ->
+ ok
+ end, rabbit_durable_route),
+ rabbit_misc:table_filter(
fun (#route{binding = #binding{destination = Dst =
#resource{kind = Kind}}}) ->
sets:is_element(Dst, case Kind of
@@ -106,13 +115,13 @@ recover(XNames, QNames) ->
end,
fun (R = #route{binding = B = #binding{source = Src}}, Tx) ->
case Tx of
- true -> ok = sync_transient_binding(R, fun mnesia:write/3);
+ true -> ok = sync_transient_route(R, fun mnesia:write/3);
false -> ok
end,
{ok, X} = rabbit_exchange:lookup(Src),
rabbit_exchange:callback(X, add_binding, [Tx, X, B])
end,
- rabbit_durable_route),
+ rabbit_semi_durable_route),
ok.
exists(Binding) ->
@@ -140,9 +149,11 @@ add(Binding, InnerFun) ->
end).
add(Src, Dst, B) ->
- Durable = all_durable([Src, Dst]),
- case (not Durable orelse mnesia:read({rabbit_durable_route, B}) =:= []) of
- true -> ok = sync_binding(B, Durable, fun mnesia:write/3),
+ [SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]],
+ case (not (SrcDurable andalso DstDurable) orelse
+ mnesia:read({rabbit_durable_route, B}) =:= []) of
+ true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
+ fun mnesia:write/3),
fun (Tx) -> ok = rabbit_exchange:callback(Src, add_binding,
[Tx, Src, B]),
rabbit_event:notify_if(not Tx, binding_created,
@@ -167,7 +178,8 @@ remove(Binding, InnerFun) ->
end).
remove(Src, Dst, B) ->
- ok = sync_binding(B, all_durable([Src, Dst]), fun mnesia:delete_object/3),
+ ok = sync_route(#route{binding = B}, durable(Src), durable(Dst),
+ fun mnesia:delete_object/3),
Deletions = maybe_auto_delete(B#binding.source, [B], new_deletions()),
fun (Tx) -> ok = process_deletions(Deletions, Tx) end.
@@ -228,32 +240,31 @@ has_for_source(SrcName) ->
%% we need to check for durable routes here too in case a bunch of
%% routes to durable queues have been removed temporarily as a
%% result of a node failure
- contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match).
+ contains(rabbit_route, Match) orelse
+ contains(rabbit_semi_durable_route, Match).
remove_for_source(SrcName) ->
+ Match = #route{binding = #binding{source = SrcName, _ = '_'}},
+ Routes = lists:usort(
+ mnesia:match_object(rabbit_route, Match, write) ++
+ mnesia:match_object(rabbit_durable_route, Match, write)),
[begin
- ok = mnesia:delete_object(rabbit_reverse_route,
- reverse_route(Route), write),
- ok = delete_forward_routes(Route),
+ sync_route(Route, fun mnesia:delete_object/3),
Route#route.binding
- end || Route <- mnesia:match_object(
- rabbit_route,
- #route{binding = #binding{source = SrcName,
- _ = '_'}},
- write)].
+ end || Route <- Routes].
-remove_for_destination(DstName) ->
- remove_for_destination(DstName, fun delete_forward_routes/1).
+remove_for_destination(Dst) ->
+ remove_for_destination(
+ Dst, fun (R) -> sync_route(R, fun mnesia:delete_object/3) end).
-remove_transient_for_destination(DstName) ->
- remove_for_destination(DstName, fun delete_transient_forward_routes/1).
+remove_transient_for_destination(Dst) ->
+ remove_for_destination(
+ Dst, fun (R) -> sync_transient_route(R, fun mnesia:delete_object/3) end).
%%----------------------------------------------------------------------------
-all_durable(Resources) ->
- lists:all(fun (#exchange{durable = D}) -> D;
- (#amqqueue{durable = D}) -> D
- end, Resources).
+durable(#exchange{durable = D}) -> D;
+durable(#amqqueue{durable = D}) -> D.
binding_action(Binding = #binding{source = SrcName,
destination = DstName,
@@ -265,17 +276,22 @@ binding_action(Binding = #binding{source = SrcName,
Fun(Src, Dst, Binding#binding{args = SortedArgs})
end).
-sync_binding(Binding, true, Fun) ->
- ok = Fun(rabbit_durable_route, #route{binding = Binding}, write),
- ok = sync_transient_binding(Binding, Fun);
+sync_route(R, Fun) -> sync_route(R, true, true, Fun).
+
+sync_route(Route, true, true, Fun) ->
+ ok = Fun(rabbit_durable_route, Route, write),
+ sync_route(Route, false, true, Fun);
-sync_binding(Binding, false, Fun) ->
- ok = sync_transient_binding(Binding, Fun).
+sync_route(Route, false, true, Fun) ->
+ ok = Fun(rabbit_semi_durable_route, Route, write),
+ sync_route(Route, false, false, Fun);
-sync_transient_binding(Binding, Fun) ->
- {Route, ReverseRoute} = route_with_reverse(Binding),
+sync_route(Route, _SrcDurable, false, Fun) ->
+ sync_transient_route(Route, Fun).
+
+sync_transient_route(Route, Fun) ->
ok = Fun(rabbit_route, Route, write),
- ok = Fun(rabbit_reverse_route, ReverseRoute, write).
+ ok = Fun(rabbit_reverse_route, reverse_route(Route), write).
call_with_source_and_destination(SrcName, DstName, Fun) ->
SrcTable = table_for_resource(SrcName),
@@ -302,22 +318,15 @@ continue('$end_of_table') -> false;
continue({[_|_], _}) -> true;
continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
-remove_for_destination(DstName, FwdDeleteFun) ->
- Bindings =
- [begin
- Route = reverse_route(ReverseRoute),
- ok = FwdDeleteFun(Route),
- ok = mnesia:delete_object(rabbit_reverse_route,
- ReverseRoute, write),
- Route#route.binding
- end || ReverseRoute
- <- mnesia:match_object(
- rabbit_reverse_route,
- reverse_route(#route{
- binding = #binding{
- destination = DstName,
- _ = '_'}}),
- write)],
+remove_for_destination(DstName, DeleteFun) ->
+ Match = reverse_route(
+ #route{binding = #binding{destination = DstName, _ = '_'}}),
+ ReverseRoutes = mnesia:match_object(rabbit_reverse_route, Match, write),
+ Bindings = [begin
+ Route = reverse_route(ReverseRoute),
+ ok = DeleteFun(Route),
+ Route#route.binding
+ end || ReverseRoute <- ReverseRoutes],
group_bindings_fold(fun maybe_auto_delete/3, new_deletions(),
lists:keysort(#binding.source, Bindings)).
@@ -350,19 +359,6 @@ maybe_auto_delete(XName, Bindings, Deletions) ->
end,
add_deletion(XName, Entry, Deletions1).
-delete_forward_routes(Route) ->
- ok = mnesia:delete_object(rabbit_route, Route, write),
- ok = mnesia:delete_object(rabbit_durable_route, Route, write).
-
-delete_transient_forward_routes(Route) ->
- ok = mnesia:delete_object(rabbit_route, Route, write).
-
-route_with_reverse(#route{binding = Binding}) ->
- route_with_reverse(Binding);
-route_with_reverse(Binding = #binding{}) ->
- Route = #route{binding = Binding},
- {Route, reverse_route(Route)}.
-
reverse_route(#route{binding = Binding}) ->
#reverse_route{reverse_binding = reverse_binding(Binding)};
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index f2f31ef374..745ecc0140 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -187,6 +187,11 @@ table_definitions() ->
{attributes, record_info(fields, route)},
{disc_copies, [node()]},
{match, #route{binding = binding_match(), _='_'}}]},
+ {rabbit_semi_durable_route,
+ [{record_name, route},
+ {attributes, record_info(fields, route)},
+ {type, ordered_set},
+ {match, #route{binding = binding_match(), _='_'}}]},
{rabbit_route,
[{record_name, route},
{attributes, record_info(fields, route)},
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 7567c29ef3..842c3b4fac 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -26,6 +26,7 @@
-rabbit_upgrade({internal_exchanges, mnesia, []}).
-rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}).
-rabbit_upgrade({topic_trie, mnesia, []}).
+-rabbit_upgrade({semi_durable_route, mnesia, []}).
%% -------------------------------------------------------------------
@@ -37,6 +38,7 @@
-spec(internal_exchanges/0 :: () -> 'ok').
-spec(user_to_internal_user/0 :: () -> 'ok').
-spec(topic_trie/0 :: () -> 'ok').
+-spec(semi_durable_route/0 :: () -> 'ok').
-endif.
@@ -101,6 +103,10 @@ topic_trie() ->
{attributes, [trie_binding, value]},
{type, ordered_set}]).
+semi_durable_route() ->
+ create(rabbit_semi_durable_route, [{record_name, route},
+ {attributes, [binding, value]}]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->