diff options
| author | Rob Harrop <rob@rabbitmq.com> | 2011-05-03 11:59:21 +0100 |
|---|---|---|
| committer | Rob Harrop <rob@rabbitmq.com> | 2011-05-03 11:59:21 +0100 |
| commit | 1a9334108bf69b9e2405ca9e006582f6d7873bdb (patch) | |
| tree | a572cc144facd5f926e23321b8eaf00fe996fbca | |
| parent | 11a0868f91ea3f3263c1dce53a87db6391c0fa81 (diff) | |
| parent | ca1ad94464bde375a054162e2ee91a688ab9e7dd (diff) | |
| download | rabbitmq-server-git-1a9334108bf69b9e2405ca9e006582f6d7873bdb.tar.gz | |
Merge with default
| -rw-r--r-- | packaging/common/rabbitmq-server.init | 3 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 116 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 6 |
4 files changed, 69 insertions, 61 deletions
diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init index f3bdc3d2ad..d8a7a94d56 100644 --- a/packaging/common/rabbitmq-server.init +++ b/packaging/common/rabbitmq-server.init @@ -28,6 +28,7 @@ INIT_LOG_DIR=/var/log/rabbitmq LOCK_FILE= # This is filled in when building packages test -x $DAEMON || exit 0 +test -x $CONTROL || exit 0 RETVAL=0 set -e @@ -94,7 +95,7 @@ status_rabbitmq() { rotate_logs_rabbitmq() { set +e - $DAEMON rotate_logs ${ROTATE_SUFFIX} + $CONTROL rotate_logs ${ROTATE_SUFFIX} if [ $? != 0 ] ; then RETVAL=1 fi diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index c2c8dc1fff..dc119fbd5e 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -97,6 +97,15 @@ recover(XNames, QNames) -> XNameSet = sets:from_list(XNames), QNameSet = sets:from_list(QNames), rabbit_misc:table_filter( + fun (Route) -> + mnesia:read({rabbit_semi_durable_route, Route}) =:= [] + end, + fun (Route, true) -> + ok = mnesia:write(rabbit_semi_durable_route, Route, write); + (_Route, false) -> + ok + end, rabbit_durable_route), + rabbit_misc:table_filter( fun (#route{binding = #binding{destination = Dst = #resource{kind = Kind}}}) -> sets:is_element(Dst, case Kind of @@ -106,13 +115,13 @@ recover(XNames, QNames) -> end, fun (R = #route{binding = B = #binding{source = Src}}, Tx) -> case Tx of - true -> ok = sync_transient_binding(R, fun mnesia:write/3); + true -> ok = sync_transient_route(R, fun mnesia:write/3); false -> ok end, {ok, X} = rabbit_exchange:lookup(Src), rabbit_exchange:callback(X, add_binding, [Tx, X, B]) end, - rabbit_durable_route), + rabbit_semi_durable_route), ok. exists(Binding) -> @@ -140,9 +149,11 @@ add(Binding, InnerFun) -> end). add(Src, Dst, B) -> - Durable = all_durable([Src, Dst]), - case (not Durable orelse mnesia:read({rabbit_durable_route, B}) =:= []) of - true -> ok = sync_binding(B, Durable, fun mnesia:write/3), + [SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]], + case (not (SrcDurable andalso DstDurable) orelse + mnesia:read({rabbit_durable_route, B}) =:= []) of + true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable, + fun mnesia:write/3), fun (Tx) -> ok = rabbit_exchange:callback(Src, add_binding, [Tx, Src, B]), rabbit_event:notify_if(not Tx, binding_created, @@ -167,7 +178,8 @@ remove(Binding, InnerFun) -> end). remove(Src, Dst, B) -> - ok = sync_binding(B, all_durable([Src, Dst]), fun mnesia:delete_object/3), + ok = sync_route(#route{binding = B}, durable(Src), durable(Dst), + fun mnesia:delete_object/3), Deletions = maybe_auto_delete(B#binding.source, [B], new_deletions()), fun (Tx) -> ok = process_deletions(Deletions, Tx) end. @@ -228,32 +240,31 @@ has_for_source(SrcName) -> %% we need to check for durable routes here too in case a bunch of %% routes to durable queues have been removed temporarily as a %% result of a node failure - contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match). + contains(rabbit_route, Match) orelse + contains(rabbit_semi_durable_route, Match). remove_for_source(SrcName) -> + Match = #route{binding = #binding{source = SrcName, _ = '_'}}, + Routes = lists:usort( + mnesia:match_object(rabbit_route, Match, write) ++ + mnesia:match_object(rabbit_durable_route, Match, write)), [begin - ok = mnesia:delete_object(rabbit_reverse_route, - reverse_route(Route), write), - ok = delete_forward_routes(Route), + sync_route(Route, fun mnesia:delete_object/3), Route#route.binding - end || Route <- mnesia:match_object( - rabbit_route, - #route{binding = #binding{source = SrcName, - _ = '_'}}, - write)]. + end || Route <- Routes]. -remove_for_destination(DstName) -> - remove_for_destination(DstName, fun delete_forward_routes/1). +remove_for_destination(Dst) -> + remove_for_destination( + Dst, fun (R) -> sync_route(R, fun mnesia:delete_object/3) end). -remove_transient_for_destination(DstName) -> - remove_for_destination(DstName, fun delete_transient_forward_routes/1). +remove_transient_for_destination(Dst) -> + remove_for_destination( + Dst, fun (R) -> sync_transient_route(R, fun mnesia:delete_object/3) end). %%---------------------------------------------------------------------------- -all_durable(Resources) -> - lists:all(fun (#exchange{durable = D}) -> D; - (#amqqueue{durable = D}) -> D - end, Resources). +durable(#exchange{durable = D}) -> D; +durable(#amqqueue{durable = D}) -> D. binding_action(Binding = #binding{source = SrcName, destination = DstName, @@ -265,17 +276,22 @@ binding_action(Binding = #binding{source = SrcName, Fun(Src, Dst, Binding#binding{args = SortedArgs}) end). -sync_binding(Binding, true, Fun) -> - ok = Fun(rabbit_durable_route, #route{binding = Binding}, write), - ok = sync_transient_binding(Binding, Fun); +sync_route(R, Fun) -> sync_route(R, true, true, Fun). + +sync_route(Route, true, true, Fun) -> + ok = Fun(rabbit_durable_route, Route, write), + sync_route(Route, false, true, Fun); -sync_binding(Binding, false, Fun) -> - ok = sync_transient_binding(Binding, Fun). +sync_route(Route, false, true, Fun) -> + ok = Fun(rabbit_semi_durable_route, Route, write), + sync_route(Route, false, false, Fun); -sync_transient_binding(Binding, Fun) -> - {Route, ReverseRoute} = route_with_reverse(Binding), +sync_route(Route, _SrcDurable, false, Fun) -> + sync_transient_route(Route, Fun). + +sync_transient_route(Route, Fun) -> ok = Fun(rabbit_route, Route, write), - ok = Fun(rabbit_reverse_route, ReverseRoute, write). + ok = Fun(rabbit_reverse_route, reverse_route(Route), write). call_with_source_and_destination(SrcName, DstName, Fun) -> SrcTable = table_for_resource(SrcName), @@ -302,22 +318,15 @@ continue('$end_of_table') -> false; continue({[_|_], _}) -> true; continue({[], Continuation}) -> continue(mnesia:select(Continuation)). -remove_for_destination(DstName, FwdDeleteFun) -> - Bindings = - [begin - Route = reverse_route(ReverseRoute), - ok = FwdDeleteFun(Route), - ok = mnesia:delete_object(rabbit_reverse_route, - ReverseRoute, write), - Route#route.binding - end || ReverseRoute - <- mnesia:match_object( - rabbit_reverse_route, - reverse_route(#route{ - binding = #binding{ - destination = DstName, - _ = '_'}}), - write)], +remove_for_destination(DstName, DeleteFun) -> + Match = reverse_route( + #route{binding = #binding{destination = DstName, _ = '_'}}), + ReverseRoutes = mnesia:match_object(rabbit_reverse_route, Match, write), + Bindings = [begin + Route = reverse_route(ReverseRoute), + ok = DeleteFun(Route), + Route#route.binding + end || ReverseRoute <- ReverseRoutes], group_bindings_fold(fun maybe_auto_delete/3, new_deletions(), lists:keysort(#binding.source, Bindings)). @@ -350,19 +359,6 @@ maybe_auto_delete(XName, Bindings, Deletions) -> end, add_deletion(XName, Entry, Deletions1). -delete_forward_routes(Route) -> - ok = mnesia:delete_object(rabbit_route, Route, write), - ok = mnesia:delete_object(rabbit_durable_route, Route, write). - -delete_transient_forward_routes(Route) -> - ok = mnesia:delete_object(rabbit_route, Route, write). - -route_with_reverse(#route{binding = Binding}) -> - route_with_reverse(Binding); -route_with_reverse(Binding = #binding{}) -> - Route = #route{binding = Binding}, - {Route, reverse_route(Route)}. - reverse_route(#route{binding = Binding}) -> #reverse_route{reverse_binding = reverse_binding(Binding)}; diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index f2f31ef374..745ecc0140 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -187,6 +187,11 @@ table_definitions() -> {attributes, record_info(fields, route)}, {disc_copies, [node()]}, {match, #route{binding = binding_match(), _='_'}}]}, + {rabbit_semi_durable_route, + [{record_name, route}, + {attributes, record_info(fields, route)}, + {type, ordered_set}, + {match, #route{binding = binding_match(), _='_'}}]}, {rabbit_route, [{record_name, route}, {attributes, record_info(fields, route)}, diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 7567c29ef3..842c3b4fac 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -26,6 +26,7 @@ -rabbit_upgrade({internal_exchanges, mnesia, []}). -rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}). -rabbit_upgrade({topic_trie, mnesia, []}). +-rabbit_upgrade({semi_durable_route, mnesia, []}). %% ------------------------------------------------------------------- @@ -37,6 +38,7 @@ -spec(internal_exchanges/0 :: () -> 'ok'). -spec(user_to_internal_user/0 :: () -> 'ok'). -spec(topic_trie/0 :: () -> 'ok'). +-spec(semi_durable_route/0 :: () -> 'ok'). -endif. @@ -101,6 +103,10 @@ topic_trie() -> {attributes, [trie_binding, value]}, {type, ordered_set}]). +semi_durable_route() -> + create(rabbit_semi_durable_route, [{record_name, route}, + {attributes, [binding, value]}]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |
