diff options
| author | Ben Hood <0x6e6562@gmail.com> | 2008-07-21 19:43:23 +0100 |
|---|---|---|
| committer | Ben Hood <0x6e6562@gmail.com> | 2008-07-21 19:43:23 +0100 |
| commit | 6c3c8aba51c6d56845a1f2b2f373ec959a9d486e (patch) | |
| tree | 0fb0abf6760a46dfb4d43f20c5e3050fa0a01c03 /src | |
| parent | c6210473559eed2ee8d959d30fc08dd60e09a425 (diff) | |
| download | rabbitmq-server-git-6c3c8aba51c6d56845a1f2b2f373ec959a9d486e.tar.gz | |
Re-keyed exchange and queue persistence on the vhost
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_access_control.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_error_logger.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 15 |
6 files changed, 32 insertions, 35 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 2c5fd614cf..2cd04d0ae1 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -213,16 +213,12 @@ insert_default_data() -> {ok, DefaultPass} = application:get_env(default_pass), {ok, DefaultVHost} = application:get_env(default_vhost), ok = rabbit_access_control:add_vhost(DefaultVHost), - ok = insert_default_user(DefaultUser, DefaultPass, - [{DefaultVHost, [<<"/data">>, <<"/admin">>]}]), + ok = insert_default_user(DefaultUser, DefaultPass,DefaultVHost), ok. -insert_default_user(Username, Password, VHostSpecs) -> +insert_default_user(Username, Password, VHostPath) -> ok = rabbit_access_control:add_user(Username, Password), - lists:foreach( - fun (VHostPath) -> - ok = rabbit_access_control:map_user_vhost(Username, VHostPath) - end, VHostSpecs), + ok = rabbit_access_control:map_user_vhost(Username, VHostPath), ok. start_builtin_amq_applications() -> diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 0a41099132..a53ea30795 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -173,10 +173,10 @@ add_vhost(VHostPath) -> case mnesia:read({vhost, VHostPath}) of [] -> ok = mnesia:write(#vhost{virtual_host = VHostPath}), - #exchange{} = rabbit_exchange:declare(<<"">>, direct, true, false, []), - #exchange{} = rabbit_exchange:declare(<<"amq.direct">>, direct, true, false, []), - #exchange{} = rabbit_exchange:declare(<<"amq.topic">>, topic, true, false, []), - #exchange{} = rabbit_exchange:declare(<<"amq.fanout">>, fanout, true, false, []), + #exchange{} = rabbit_exchange:declare(rabbit_misc:r(VHostPath,exchange,<<"">>), direct, true, false, []), + #exchange{} = rabbit_exchange:declare(rabbit_misc:r(VHostPath,exchange,<<"amq.direct">>), direct, true, false, []), + #exchange{} = rabbit_exchange:declare(rabbit_misc:r(VHostPath,exchange,<<"amq.topic">>), topic, true, false, []), + #exchange{} = rabbit_exchange:declare(rabbit_misc:r(VHostPath,exchange,<<"amq.fanout">>), fanout, true, false, []), ok; [_] -> mnesia:abort({vhost_already_exists, VHostPath}) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5a9849df45..7da0ab01ab 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -130,8 +130,8 @@ recover_durable_queues() -> ok end). -declare(NameBin, Durable, AutoDelete, Args) -> - Q = start_queue_process(#amqqueue{name = NameBin, +declare(Resource = #resource{name = Name}, Durable, AutoDelete, Args) -> + Q = start_queue_process(#amqqueue{name = Resource, durable = Durable, auto_delete = AutoDelete, arguments = Args, @@ -139,7 +139,7 @@ declare(NameBin, Durable, AutoDelete, Args) -> pid = none}), case rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({amqqueue, NameBin}) of + case mnesia:wread({amqqueue, Resource}) of [] -> ok = recover_queue(Q), Q; [ExistingQ] -> ExistingQ @@ -172,20 +172,22 @@ default_binding_spec(Name) -> routing_key = Name, arguments = []}. -recover_bindings(Q = #amqqueue{name = QueueName, binding_specs = Specs}) -> - ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q), +recover_bindings(Q = #amqqueue{name = #resource{name = QueueName}, + binding_specs = Specs}) -> + % TODO I don't this should be commented out + %ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q), lists:foreach(fun (B) -> ok = rabbit_exchange:add_binding(B, Q) end, Specs), ok. -modify_bindings(#resource{name = QueueName}, ExchangeName, RoutingKey, Arguments, +modify_bindings(Queue = #resource{name = QueueName}, X = #resource{name = ExchangeName}, RoutingKey, Arguments, SpecPresentFun, SpecAbsentFun) -> rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({amqqueue, QueueName}) of + case mnesia:wread({amqqueue, Queue}) of [Q = #amqqueue{binding_specs = Specs0}] -> - Spec = #binding_spec{exchange_name = ExchangeName, + Spec = #binding_spec{exchange_name = X, routing_key = RoutingKey, arguments = Arguments}, case (case lists:member(Spec, Specs0) of @@ -242,7 +244,6 @@ delete_binding(QueueName, ExchangeName, RoutingKey, Arguments) -> lookup(Name) -> rabbit_misc:dirty_read({amqqueue, Name}). -with(#resource{name = Name}, F, E) -> with(Name, F, E); with(Name, F, E) -> case lookup(Name) of {ok, Q} -> rabbit_misc:with_exit_handler(E, fun () -> F(Q) end); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e1a6b4e63b..05ce07f27b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -161,7 +161,8 @@ return_queue_declare_ok(State, NoWait, Q) -> rabbit_misc:with_exit_handler( fun () -> {ok, Q#amqqueue.name, 0, 0} end, fun () -> rabbit_amqqueue:stat(Q) end), - Reply = #'queue.declare_ok'{queue = ActualName, + QueueName = ActualName#resource.name, + Reply = #'queue.declare_ok'{queue = QueueName, message_count = MessageCount, consumer_count = ConsumerCount}, {reply, Reply, NewState} @@ -462,12 +463,12 @@ handle_method(#'exchange.declare'{ticket = TicketNumber, _, State = #ch{ virtual_host = VHostPath }) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), %% FIXME: clarify spec as per declare wrt differing realms - X = case rabbit_exchange:lookup( - rabbit_misc:r(VHostPath, exchange, ExchangeNameBin)) of + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + X = case rabbit_exchange:lookup(ExchangeName) of {ok, FoundX} -> FoundX; {error, not_found} -> ActualNameBin = check_name('exchange', ExchangeNameBin), - rabbit_exchange:declare(ActualNameBin, + rabbit_exchange:declare(ExchangeName, CheckedType, Durable, AutoDelete, @@ -544,7 +545,7 @@ handle_method(#'queue.declare'{ticket = TicketNumber, <<>> -> rabbit_misc:binstring_guid("amq.gen"); Other -> check_name('queue', Other) end, - Finish(rabbit_amqqueue:declare(ActualNameBin, + Finish(rabbit_amqqueue:declare(rabbit_misc:r(VHostPath, queue, ActualNameBin), Durable, AutoDelete, Args)); @@ -567,7 +568,7 @@ handle_method(#'queue.delete'{ticket = TicketNumber, if_empty = IfEmpty, nowait = NoWait }, - _, State) -> + _, State = #ch{ virtual_host = VHostPath }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), case rabbit_amqqueue:with_or_die( QueueName, diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 5bc538d550..e4ce3aa3df 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -34,7 +34,7 @@ init([DefaultVHost]) -> #exchange{} = rabbit_exchange:declare( - ?LOG_EXCH_NAME, + rabbit_misc:r(DefaultVHost,exchange,?LOG_EXCH_NAME), topic, true, false, []), {ok, #resource{virtual_host = DefaultVHost, kind = exchange, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 380ff24354..970e874a42 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -90,15 +90,15 @@ recover_durable_exchanges() -> end, ok, durable_exchanges) end). -declare(NameBin, Type, Durable, AutoDelete, Args) -> - Exchange = #exchange{name = NameBin, +declare(Resource = #resource{name = Name}, Type, Durable, AutoDelete, Args) -> + Exchange = #exchange{name = Resource, type = Type, durable = Durable, auto_delete = AutoDelete, arguments = Args}, rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({exchange, NameBin}) of + case mnesia:wread({exchange, Resource}) of [] -> ok = mnesia:write(Exchange), if Durable -> ok = mnesia:write( @@ -131,9 +131,8 @@ assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) -> lookup(Name) -> rabbit_misc:dirty_read({exchange, Name}). -lookup_or_die(#resource{name = Name}) -> lookup_or_die(Name); -lookup_or_die(Name) -> - case lookup(Name) of +lookup_or_die(Resource = #resource{name = Name}) -> + case lookup(Resource) of {ok, X} -> X; {error, not_found} -> rabbit_misc:protocol_error( @@ -152,6 +151,7 @@ list_exchange_bindings(Name) -> queue = QueueName} <- Handlers]. bindings_for_exchange(Name) -> + Q1 = qlc:e(qlc:q([B1 || B1 = #binding{} <- mnesia:table(binding)])), qlc:e(qlc:q([B || B = #binding{key = K} <- mnesia:table(binding), element(1, K) == Name])). @@ -216,7 +216,7 @@ delivery_key_for_type(fanout, Name, _RoutingKey) -> delivery_key_for_type(_Type, Name, RoutingKey) -> {Name, RoutingKey}. -call_with_exchange(#resource{name = Name}, Fun) -> call_with_exchange(Name, Fun); +%call_with_exchange(R = #resource{name = Name}, Fun) -> call_with_exchange(R, Fun); call_with_exchange(Name, Fun) -> case mnesia:wread({exchange, Name}) of [] -> {error, not_found}; @@ -334,7 +334,6 @@ last_topic_match(P, R, []) -> last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList). -delete(#resource{name = ExchangeName}, IfUnused) -> delete(ExchangeName, IfUnused); delete(ExchangeName, IfUnused) -> rabbit_misc:execute_mnesia_transaction( fun () -> internal_delete(ExchangeName, IfUnused) end). |
