diff options
| author | Tony Garnock-Jones <tonyg@lshift.net> | 2009-01-27 12:51:00 +0000 |
|---|---|---|
| committer | Tony Garnock-Jones <tonyg@lshift.net> | 2009-01-27 12:51:00 +0000 |
| commit | 0a90720b0f3e8ad4d8619fd5e24d1601b71bd614 (patch) | |
| tree | 554449232947d3303cfe343a32d0d1555896e31d | |
| parent | 25964d18a56f65e94b64dace82bb409e64dab4e9 (diff) | |
| parent | 7f080eeb74d94c8d61ccda594dc7eadf486f56ca (diff) | |
| download | rabbitmq-server-git-0a90720b0f3e8ad4d8619fd5e24d1601b71bd614.tar.gz | |
merge bug20193 into default
| -rw-r--r-- | src/rabbit_access_control.erl | 51 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 98 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 70 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 8 |
6 files changed, 151 insertions, 103 deletions
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 0d9632b5d8..394eb2b124 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -117,7 +117,7 @@ internal_lookup_vhost_access(Username, VHostPath) -> %% TODO: use dirty ops instead rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:read({user_permission, + case mnesia:read({rabbit_user_permission, #user_vhost{username = Username, virtual_host = VHostPath}}) of [] -> not_found; @@ -149,7 +149,7 @@ check_resource_access(_Username, check_resource_access(Username, R = #resource{virtual_host = VHostPath, name = Name}, Permission) -> - Res = case mnesia:dirty_read({user_permission, + Res = case mnesia:dirty_read({rabbit_user_permission, #user_vhost{username = Username, virtual_host = VHostPath}}) of [] -> @@ -171,10 +171,12 @@ check_resource_access(Username, add_user(Username, Password) -> R = rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:read({user, Username}) of + case mnesia:wread({rabbit_user, Username}) of [] -> - ok = mnesia:write(#user{username = Username, - password = Password}); + ok = mnesia:write(rabbit_user, + #user{username = Username, + password = Password}, + write); _ -> mnesia:abort({user_already_exists, Username}) end @@ -187,13 +189,16 @@ delete_user(Username) -> rabbit_misc:with_user( Username, fun () -> - ok = mnesia:delete({user, Username}), - [ok = mnesia:delete_object(R) || + ok = mnesia:delete({rabbit_user, Username}), + [ok = mnesia:delete_object( + rabbit_user_permissions, R, write) || R <- mnesia:match_object( + rabbit_user_permission, #user_permission{user_vhost = #user_vhost{ username = Username, virtual_host = '_'}, - permission = '_'})], + permission = '_'}, + write)], ok end)), rabbit_log:info("Deleted user ~p~n", [Username]), @@ -204,24 +209,28 @@ change_password(Username, Password) -> rabbit_misc:with_user( Username, fun () -> - ok = mnesia:write(#user{username = Username, - password = Password}) + ok = mnesia:write(rabbit_user, + #user{username = Username, + password = Password}, + write) end)), rabbit_log:info("Changed password for user ~p~n", [Username]), R. list_users() -> - mnesia:dirty_all_keys(user). + mnesia:dirty_all_keys(rabbit_user). lookup_user(Username) -> - rabbit_misc:dirty_read({user, Username}). + rabbit_misc:dirty_read({rabbit_user, Username}). add_vhost(VHostPath) -> R = rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:read({vhost, VHostPath}) of + case mnesia:wread({rabbit_vhost, VHostPath}) of [] -> - ok = mnesia:write(#vhost{virtual_host = VHostPath}), + ok = mnesia:write(rabbit_vhost, + #vhost{virtual_host = VHostPath}, + write), [rabbit_exchange:declare( rabbit_misc:r(VHostPath, exchange, Name), Type, true, false, []) || @@ -267,11 +276,11 @@ internal_delete_vhost(VHostPath) -> ok = clear_permissions(Username, VHostPath) end, list_vhost_permissions(VHostPath)), - ok = mnesia:delete({vhost, VHostPath}), + ok = mnesia:delete({rabbit_vhost, VHostPath}), ok. list_vhosts() -> - mnesia:dirty_all_keys(vhost). + mnesia:dirty_all_keys(rabbit_vhost). validate_regexp(RegexpBin) -> Regexp = binary_to_list(RegexpBin), @@ -287,12 +296,14 @@ set_permissions(Username, VHostPath, ConfigurationPerm, MessagingPerm) -> rabbit_misc:with_user_and_vhost( Username, VHostPath, fun () -> ok = mnesia:write( + rabbit_user_permission, #user_permission{user_vhost = #user_vhost{ username = Username, virtual_host = VHostPath}, permission = #permission{ configuration = ConfigurationPerm, - messaging = MessagingPerm}}) + messaging = MessagingPerm}}, + write) end)). clear_permissions(Username, VHostPath) -> @@ -300,7 +311,7 @@ clear_permissions(Username, VHostPath) -> rabbit_misc:with_user_and_vhost( Username, VHostPath, fun () -> - ok = mnesia:delete({user_permission, + ok = mnesia:delete({rabbit_user_permission, #user_vhost{username = Username, virtual_host = VHostPath}}) end)). @@ -329,8 +340,10 @@ list_permissions(QueryThunk) -> match_user_vhost(Username, VHostPath) -> fun () -> mnesia:match_object( + rabbit_user_permission, #user_permission{user_vhost = #user_vhost{ username = Username, virtual_host = VHostPath}, - permission = '_'}) + permission = '_'}, + read) end. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index abbdce66d1..3018582f94 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -128,7 +128,7 @@ recover_durable_queues() -> R = rabbit_misc:execute_mnesia_transaction( fun () -> qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(durable_queues), + <- mnesia:table(rabbit_durable_queue), node(Pid) == Node])) end), Queues = lists:map(fun start_queue_process/1, R), @@ -146,7 +146,7 @@ declare(QueueName, Durable, AutoDelete, Args) -> pid = none}), case rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({amqqueue, QueueName}) of + case mnesia:wread({rabbit_queue, QueueName}) of [] -> ok = store_queue(Q), ok = add_default_binding(Q), Q; @@ -159,11 +159,11 @@ declare(QueueName, Durable, AutoDelete, Args) -> end. store_queue(Q = #amqqueue{durable = true}) -> - ok = mnesia:write(durable_queues, Q, write), - ok = mnesia:write(Q), + ok = mnesia:write(rabbit_durable_queue, Q, write), + ok = mnesia:write(rabbit_queue, Q, write), ok; store_queue(Q = #amqqueue{durable = false}) -> - ok = mnesia:write(Q), + ok = mnesia:write(rabbit_queue, Q, write), ok. start_queue_process(Q) -> @@ -177,7 +177,7 @@ add_default_binding(#amqqueue{name = QueueName}) -> ok. lookup(Name) -> - rabbit_misc:dirty_read({amqqueue, Name}). + rabbit_misc:dirty_read({rabbit_queue, Name}). with(Name, F, E) -> case lookup(Name) of @@ -194,6 +194,7 @@ with_or_die(Name, F) -> list(VHostPath) -> mnesia:dirty_match_object( + rabbit_queue, #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). @@ -214,7 +215,7 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat). stat_all() -> - lists:map(fun stat/1, rabbit_misc:dirty_read_all(amqqueue)). + lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> gen_server2:call(QPid, {delete, IfUnused, IfEmpty}). @@ -290,18 +291,18 @@ unblock(QPid, ChPid) -> internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({amqqueue, QueueName}) of + case mnesia:wread({rabbit_queue, QueueName}) of [] -> {error, not_found}; [Q] -> ok = delete_queue(Q), - ok = mnesia:delete({durable_queues, QueueName}), + ok = mnesia:delete({rabbit_durable_queue, QueueName}), ok end end). delete_queue(#amqqueue{name = QueueName}) -> ok = rabbit_exchange:delete_bindings_for_queue(QueueName), - ok = mnesia:delete({amqqueue, QueueName}), + ok = mnesia:delete({rabbit_queue, QueueName}), ok. on_node_down(Node) -> @@ -311,7 +312,7 @@ on_node_down(Node) -> fun (Q, Acc) -> ok = delete_queue(Q), Acc end, ok, qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(amqqueue), + <- mnesia:table(rabbit_queue), node(Pid) == Node])) end). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 960e4945fe..19efd9fc22 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -107,16 +107,18 @@ recover() -> fun () -> mnesia:foldl( fun (Exchange, Acc) -> - ok = mnesia:write(Exchange), + ok = mnesia:write(rabbit_exchange, Exchange, write), Acc - end, ok, durable_exchanges), + end, ok, rabbit_durable_exchange), mnesia:foldl( fun (Route, Acc) -> {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(Route), - ok = mnesia:write(ReverseRoute), + ok = mnesia:write(rabbit_route, + Route, write), + ok = mnesia:write(rabbit_reverse_route, + ReverseRoute, write), Acc - end, ok, durable_routes), + end, ok, rabbit_durable_route), ok end). @@ -128,11 +130,11 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> arguments = Args}, rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({exchange, ExchangeName}) of - [] -> ok = mnesia:write(Exchange), + case mnesia:wread({rabbit_exchange, ExchangeName}) of + [] -> ok = mnesia:write(rabbit_exchange, Exchange, write), if Durable -> - ok = mnesia:write( - durable_exchanges, Exchange, write); + ok = mnesia:write(rabbit_durable_exchange, + Exchange, write); true -> ok end, Exchange; @@ -161,7 +163,7 @@ assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) -> [rabbit_misc:rs(Name), ActualType, RequiredType]). lookup(Name) -> - rabbit_misc:dirty_read({exchange, Name}). + rabbit_misc:dirty_read({rabbit_exchange, Name}). lookup_or_die(Name) -> case lookup(Name) of @@ -173,6 +175,7 @@ lookup_or_die(Name) -> list(VHostPath) -> mnesia:dirty_match_object( + rabbit_exchange, #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). map(VHostPath, F) -> @@ -259,7 +262,7 @@ match_bindings(#exchange{name = Name}, Match) -> Query = qlc:q([QName || #route{binding = Binding = #binding{ exchange_name = ExchangeName, queue_name = QName}} <- - mnesia:table(route), + mnesia:table(rabbit_route), ExchangeName == Name, Match(Binding)]), lookup_qpids( @@ -271,6 +274,7 @@ match_bindings(#exchange{name = Name}, Match) -> [QName || #route{binding = Binding = #binding{ queue_name = QName}} <- mnesia:dirty_match_object( + rabbit_route, #route{binding = #binding{exchange_name = Name, _ = '_'}}), Match(Binding)] @@ -281,12 +285,12 @@ match_routing_key(#exchange{name = Name}, RoutingKey) -> queue_name = '$1', key = RoutingKey, _ = '_'}}, - lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])). + lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). lookup_qpids(Queues) -> sets:fold( fun(Key, Acc) -> - case mnesia:dirty_read({amqqueue, Key}) of + case mnesia:dirty_read({rabbit_queue, Key}) of [#amqqueue{pid = QPid}] -> [QPid | Acc]; [] -> Acc end @@ -297,33 +301,37 @@ lookup_qpids(Queues) -> %% to be implemented for 0.91 ? delete_bindings_for_exchange(ExchangeName) -> - indexed_delete( - #route{binding = #binding{exchange_name = ExchangeName, - _ = '_'}}, - fun delete_forward_routes/1, fun mnesia:delete_object/1). + [begin + ok = mnesia:delete_object(rabbit_reverse_route, + reverse_route(Route), write), + ok = delete_forward_routes(Route) + end || Route <- mnesia:match_object( + rabbit_route, + #route{binding = #binding{exchange_name = ExchangeName, + _ = '_'}}, + write)], + ok. delete_bindings_for_queue(QueueName) -> Exchanges = exchanges_for_queue(QueueName), - indexed_delete( - reverse_route(#route{binding = #binding{queue_name = QueueName, - _ = '_'}}), - fun mnesia:delete_object/1, fun delete_forward_routes/1), [begin - [X] = mnesia:read({exchange, ExchangeName}), + ok = delete_forward_routes(reverse_route(Route)), + ok = mnesia:delete_object(rabbit_reverse_route, Route, write) + end || Route <- mnesia:match_object( + rabbit_reverse_route, + reverse_route( + #route{binding = #binding{queue_name = QueueName, + _ = '_'}}), + write)], + [begin + [X] = mnesia:read({rabbit_exchange, ExchangeName}), ok = maybe_auto_delete(X) end || ExchangeName <- Exchanges], ok. -indexed_delete(Match, ForwardsDeleteFun, ReverseDeleteFun) -> - [begin - ok = ReverseDeleteFun(reverse_route(Route)), - ok = ForwardsDeleteFun(Route) - end || Route <- mnesia:match_object(Match)], - ok. - delete_forward_routes(Route) -> - ok = mnesia:delete_object(Route), - ok = mnesia:delete_object(durable_routes, Route, write). + ok = mnesia:delete_object(rabbit_route, Route, write), + ok = mnesia:delete_object(rabbit_durable_route, Route, write). exchanges_for_queue(QueueName) -> MatchHead = reverse_route( @@ -332,17 +340,18 @@ exchanges_for_queue(QueueName) -> _ = '_'}}), sets:to_list( sets:from_list( - mnesia:select(reverse_route, [{MatchHead, [], ['$1']}]))). + mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). has_bindings(ExchangeName) -> MatchHead = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, try - continue(mnesia:select(route, [{MatchHead, [], ['$_']}], 1, read)) + continue(mnesia:select(rabbit_route, [{MatchHead, [], ['$_']}], + 1, read)) catch exit:{aborted, {badarg, _}} -> %% work around OTP-7025, which was fixed in R12B-1, by %% falling back on a less efficient method - case mnesia:match_object(MatchHead) of + case mnesia:match_object(rabbit_route, MatchHead, read) of [] -> false; [_|_] -> true end @@ -354,7 +363,7 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)). call_with_exchange(Exchange, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun() -> case mnesia:read({exchange, Exchange}) of + fun() -> case mnesia:read({rabbit_exchange, Exchange}) of [] -> {error, exchange_not_found}; [X] -> Fun(X) end @@ -363,7 +372,7 @@ call_with_exchange(Exchange, Fun) -> call_with_exchange_and_queue(Exchange, Queue, Fun) -> call_with_exchange( Exchange, - fun(X) -> case mnesia:read({amqqueue, Queue}) of + fun(X) -> case mnesia:read({rabbit_queue, Queue}) of [] -> {error, queue_not_found}; [Q] -> Fun(X, Q) end @@ -397,11 +406,13 @@ sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) -> key = RoutingKey, args = sort_arguments(Arguments)}, ok = case Durable of - true -> Fun(durable_routes, #route{binding = Binding}, write); + true -> Fun(rabbit_durable_route, + #route{binding = Binding}, write); false -> ok end, - [ok, ok] = [Fun(element(1, R), R, write) || - R <- tuple_to_list(route_with_reverse(Binding))], + {Route, ReverseRoute} = route_with_reverse(Binding), + ok = Fun(rabbit_route, Route, write), + ok = Fun(rabbit_reverse_route, ReverseRoute, write), ok. list_bindings(VHostPath) -> @@ -412,6 +423,7 @@ list_bindings(VHostPath) -> queue_name = QueueName, args = Arguments}} <- mnesia:dirty_match_object( + rabbit_route, #route{binding = #binding{ exchange_name = rabbit_misc:r(VHostPath, exchange), _ = '_'}, @@ -554,8 +566,8 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) -> unconditional_delete(#exchange{name = ExchangeName}) -> ok = delete_bindings_for_exchange(ExchangeName), - ok = mnesia:delete({durable_exchanges, ExchangeName}), - ok = mnesia:delete({exchange, ExchangeName}). + ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), + ok = mnesia:delete({rabbit_exchange, ExchangeName}). %%---------------------------------------------------------------------------- %% EXTENDED API @@ -571,7 +583,7 @@ list_exchange_bindings(ExchangeName) -> #route{binding = #binding{queue_name = QueueName, key = RoutingKey, args = Arguments}} - <- mnesia:dirty_match_object(Route)]. + <- mnesia:dirty_match_object(rabbit_route, Route)]. % Refactoring is left as an exercise for the reader list_queue_bindings(QueueName) -> @@ -581,4 +593,4 @@ list_queue_bindings(QueueName) -> #route{binding = #binding{exchange_name = ExchangeName, key = RoutingKey, args = Arguments}} - <- mnesia:dirty_match_object(Route)]. + <- mnesia:dirty_match_object(rabbit_route, Route)]. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 85db50d79d..214c952834 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -237,7 +237,7 @@ filter_exit_map(F, L) -> with_user(Username, Thunk) -> fun () -> - case mnesia:read({user, Username}) of + case mnesia:read({rabbit_user, Username}) of [] -> mnesia:abort({no_such_user, Username}); [_U] -> @@ -247,7 +247,7 @@ with_user(Username, Thunk) -> with_vhost(VHostPath, Thunk) -> fun () -> - case mnesia:read({vhost, VHostPath}) of + case mnesia:read({rabbit_vhost, VHostPath}) of [] -> mnesia:abort({no_such_vhost, VHostPath}); [_V] -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index b7f3dd0a89..c0bf3d254e 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -100,31 +100,51 @@ force_reset() -> reset(true). %%-------------------------------------------------------------------- table_definitions() -> - [{user, [{disc_copies, [node()]}, - {attributes, record_info(fields, user)}]}, - {user_permission, [{disc_copies, [node()]}, - {attributes, record_info(fields, user_permission)}]}, - {vhost, [{disc_copies, [node()]}, - {attributes, record_info(fields, vhost)}]}, - {rabbit_config, [{disc_copies, [node()]}]}, - {listener, [{type, bag}, - {attributes, record_info(fields, listener)}]}, - {durable_routes, [{disc_copies, [node()]}, - {record_name, route}, - {attributes, record_info(fields, route)}]}, - {route, [{type, ordered_set}, - {attributes, record_info(fields, route)}]}, - {reverse_route, [{type, ordered_set}, - {attributes, record_info(fields, reverse_route)}]}, - {durable_exchanges, [{disc_copies, [node()]}, - {record_name, exchange}, - {attributes, record_info(fields, exchange)}]}, - {exchange, [{attributes, record_info(fields, exchange)}]}, - {durable_queues, [{disc_copies, [node()]}, - {record_name, amqqueue}, - {attributes, record_info(fields, amqqueue)}]}, - {amqqueue, [{attributes, record_info(fields, amqqueue)}, - {index, [pid]}]}]. + [{rabbit_user, + [{record_name, user}, + {attributes, record_info(fields, user)}, + {disc_copies, [node()]}]}, + {rabbit_user_permission, + [{record_name, user_permission}, + {attributes, record_info(fields, user_permission)}, + {disc_copies, [node()]}]}, + {rabbit_vhost, + [{record_name, vhost}, + {attributes, record_info(fields, vhost)}, + {disc_copies, [node()]}]}, + {rabbit_config, + [{disc_copies, [node()]}]}, + {rabbit_listener, + [{record_name, listener}, + {attributes, record_info(fields, listener)}, + {type, bag}]}, + {rabbit_durable_route, + [{record_name, route}, + {attributes, record_info(fields, route)}, + {disc_copies, [node()]}]}, + {rabbit_route, + [{record_name, route}, + {attributes, record_info(fields, route)}, + {type, ordered_set}]}, + {rabbit_reverse_route, + [{record_name, reverse_route}, + {attributes, record_info(fields, reverse_route)}, + {type, ordered_set}]}, + {rabbit_durable_exchange, + [{record_name, exchange}, + {attributes, record_info(fields, exchange)}, + {disc_copies, [node()]}]}, + {rabbit_exchange, + [{record_name, exchange}, + {attributes, record_info(fields, exchange)}]}, + {rabbit_durable_queue, + [{record_name, amqqueue}, + {attributes, record_info(fields, amqqueue)}, + {disc_copies, [node()]}]}, + {rabbit_queue, + [{record_name, amqqueue}, + {attributes, record_info(fields, amqqueue)}, + {index, [pid]}]}]. table_names() -> [Tab || {Tab, _} <- table_definitions()]. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 99ea37d884..2dbd5a5af2 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -123,6 +123,7 @@ stop_tcp_listener(Host, Port) -> tcp_listener_started(IPAddress, Port) -> ok = mnesia:dirty_write( + rabbit_listener, #listener{node = node(), protocol = tcp, host = tcp_host(IPAddress), @@ -130,19 +131,20 @@ tcp_listener_started(IPAddress, Port) -> tcp_listener_stopped(IPAddress, Port) -> ok = mnesia:dirty_delete_object( + rabbit_listener, #listener{node = node(), protocol = tcp, host = tcp_host(IPAddress), port = Port}). active_listeners() -> - rabbit_misc:dirty_read_all(listener). + rabbit_misc:dirty_read_all(rabbit_listener). node_listeners(Node) -> - mnesia:dirty_read(listener, Node). + mnesia:dirty_read(rabbit_listener, Node). on_node_down(Node) -> - ok = mnesia:dirty_delete(listener, Node). + ok = mnesia:dirty_delete(rabbit_listener, Node). start_client(Sock) -> {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), |
