summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBen Hood <0x6e6562@gmail.com>2008-07-21 19:43:23 +0100
committerBen Hood <0x6e6562@gmail.com>2008-07-21 19:43:23 +0100
commit6c3c8aba51c6d56845a1f2b2f373ec959a9d486e (patch)
tree0fb0abf6760a46dfb4d43f20c5e3050fa0a01c03 /src
parentc6210473559eed2ee8d959d30fc08dd60e09a425 (diff)
downloadrabbitmq-server-git-6c3c8aba51c6d56845a1f2b2f373ec959a9d486e.tar.gz
Re-keyed exchange and queue persistence on the vhost
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl10
-rw-r--r--src/rabbit_access_control.erl8
-rw-r--r--src/rabbit_amqqueue.erl19
-rw-r--r--src/rabbit_channel.erl13
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_exchange.erl15
6 files changed, 32 insertions, 35 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 2c5fd614cf..2cd04d0ae1 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -213,16 +213,12 @@ insert_default_data() ->
{ok, DefaultPass} = application:get_env(default_pass),
{ok, DefaultVHost} = application:get_env(default_vhost),
ok = rabbit_access_control:add_vhost(DefaultVHost),
- ok = insert_default_user(DefaultUser, DefaultPass,
- [{DefaultVHost, [<<"/data">>, <<"/admin">>]}]),
+ ok = insert_default_user(DefaultUser, DefaultPass,DefaultVHost),
ok.
-insert_default_user(Username, Password, VHostSpecs) ->
+insert_default_user(Username, Password, VHostPath) ->
ok = rabbit_access_control:add_user(Username, Password),
- lists:foreach(
- fun (VHostPath) ->
- ok = rabbit_access_control:map_user_vhost(Username, VHostPath)
- end, VHostSpecs),
+ ok = rabbit_access_control:map_user_vhost(Username, VHostPath),
ok.
start_builtin_amq_applications() ->
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 0a41099132..a53ea30795 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -173,10 +173,10 @@ add_vhost(VHostPath) ->
case mnesia:read({vhost, VHostPath}) of
[] ->
ok = mnesia:write(#vhost{virtual_host = VHostPath}),
- #exchange{} = rabbit_exchange:declare(<<"">>, direct, true, false, []),
- #exchange{} = rabbit_exchange:declare(<<"amq.direct">>, direct, true, false, []),
- #exchange{} = rabbit_exchange:declare(<<"amq.topic">>, topic, true, false, []),
- #exchange{} = rabbit_exchange:declare(<<"amq.fanout">>, fanout, true, false, []),
+ #exchange{} = rabbit_exchange:declare(rabbit_misc:r(VHostPath,exchange,<<"">>), direct, true, false, []),
+ #exchange{} = rabbit_exchange:declare(rabbit_misc:r(VHostPath,exchange,<<"amq.direct">>), direct, true, false, []),
+ #exchange{} = rabbit_exchange:declare(rabbit_misc:r(VHostPath,exchange,<<"amq.topic">>), topic, true, false, []),
+ #exchange{} = rabbit_exchange:declare(rabbit_misc:r(VHostPath,exchange,<<"amq.fanout">>), fanout, true, false, []),
ok;
[_] ->
mnesia:abort({vhost_already_exists, VHostPath})
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5a9849df45..7da0ab01ab 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -130,8 +130,8 @@ recover_durable_queues() ->
ok
end).
-declare(NameBin, Durable, AutoDelete, Args) ->
- Q = start_queue_process(#amqqueue{name = NameBin,
+declare(Resource = #resource{name = Name}, Durable, AutoDelete, Args) ->
+ Q = start_queue_process(#amqqueue{name = Resource,
durable = Durable,
auto_delete = AutoDelete,
arguments = Args,
@@ -139,7 +139,7 @@ declare(NameBin, Durable, AutoDelete, Args) ->
pid = none}),
case rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({amqqueue, NameBin}) of
+ case mnesia:wread({amqqueue, Resource}) of
[] -> ok = recover_queue(Q),
Q;
[ExistingQ] -> ExistingQ
@@ -172,20 +172,22 @@ default_binding_spec(Name) ->
routing_key = Name,
arguments = []}.
-recover_bindings(Q = #amqqueue{name = QueueName, binding_specs = Specs}) ->
- ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q),
+recover_bindings(Q = #amqqueue{name = #resource{name = QueueName},
+ binding_specs = Specs}) ->
+ % TODO I don't this should be commented out
+ %ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q),
lists:foreach(fun (B) ->
ok = rabbit_exchange:add_binding(B, Q)
end, Specs),
ok.
-modify_bindings(#resource{name = QueueName}, ExchangeName, RoutingKey, Arguments,
+modify_bindings(Queue = #resource{name = QueueName}, X = #resource{name = ExchangeName}, RoutingKey, Arguments,
SpecPresentFun, SpecAbsentFun) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({amqqueue, QueueName}) of
+ case mnesia:wread({amqqueue, Queue}) of
[Q = #amqqueue{binding_specs = Specs0}] ->
- Spec = #binding_spec{exchange_name = ExchangeName,
+ Spec = #binding_spec{exchange_name = X,
routing_key = RoutingKey,
arguments = Arguments},
case (case lists:member(Spec, Specs0) of
@@ -242,7 +244,6 @@ delete_binding(QueueName, ExchangeName, RoutingKey, Arguments) ->
lookup(Name) ->
rabbit_misc:dirty_read({amqqueue, Name}).
-with(#resource{name = Name}, F, E) -> with(Name, F, E);
with(Name, F, E) ->
case lookup(Name) of
{ok, Q} -> rabbit_misc:with_exit_handler(E, fun () -> F(Q) end);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e1a6b4e63b..05ce07f27b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -161,7 +161,8 @@ return_queue_declare_ok(State, NoWait, Q) ->
rabbit_misc:with_exit_handler(
fun () -> {ok, Q#amqqueue.name, 0, 0} end,
fun () -> rabbit_amqqueue:stat(Q) end),
- Reply = #'queue.declare_ok'{queue = ActualName,
+ QueueName = ActualName#resource.name,
+ Reply = #'queue.declare_ok'{queue = QueueName,
message_count = MessageCount,
consumer_count = ConsumerCount},
{reply, Reply, NewState}
@@ -462,12 +463,12 @@ handle_method(#'exchange.declare'{ticket = TicketNumber,
_, State = #ch{ virtual_host = VHostPath }) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
%% FIXME: clarify spec as per declare wrt differing realms
- X = case rabbit_exchange:lookup(
- rabbit_misc:r(VHostPath, exchange, ExchangeNameBin)) of
+ ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ X = case rabbit_exchange:lookup(ExchangeName) of
{ok, FoundX} -> FoundX;
{error, not_found} ->
ActualNameBin = check_name('exchange', ExchangeNameBin),
- rabbit_exchange:declare(ActualNameBin,
+ rabbit_exchange:declare(ExchangeName,
CheckedType,
Durable,
AutoDelete,
@@ -544,7 +545,7 @@ handle_method(#'queue.declare'{ticket = TicketNumber,
<<>> -> rabbit_misc:binstring_guid("amq.gen");
Other -> check_name('queue', Other)
end,
- Finish(rabbit_amqqueue:declare(ActualNameBin,
+ Finish(rabbit_amqqueue:declare(rabbit_misc:r(VHostPath, queue, ActualNameBin),
Durable,
AutoDelete,
Args));
@@ -567,7 +568,7 @@ handle_method(#'queue.delete'{ticket = TicketNumber,
if_empty = IfEmpty,
nowait = NoWait
},
- _, State) ->
+ _, State = #ch{ virtual_host = VHostPath }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
case rabbit_amqqueue:with_or_die(
QueueName,
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 5bc538d550..e4ce3aa3df 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -34,7 +34,7 @@
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
- ?LOG_EXCH_NAME,
+ rabbit_misc:r(DefaultVHost,exchange,?LOG_EXCH_NAME),
topic, true, false, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 380ff24354..970e874a42 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -90,15 +90,15 @@ recover_durable_exchanges() ->
end, ok, durable_exchanges)
end).
-declare(NameBin, Type, Durable, AutoDelete, Args) ->
- Exchange = #exchange{name = NameBin,
+declare(Resource = #resource{name = Name}, Type, Durable, AutoDelete, Args) ->
+ Exchange = #exchange{name = Resource,
type = Type,
durable = Durable,
auto_delete = AutoDelete,
arguments = Args},
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({exchange, NameBin}) of
+ case mnesia:wread({exchange, Resource}) of
[] -> ok = mnesia:write(Exchange),
if Durable ->
ok = mnesia:write(
@@ -131,9 +131,8 @@ assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) ->
lookup(Name) ->
rabbit_misc:dirty_read({exchange, Name}).
-lookup_or_die(#resource{name = Name}) -> lookup_or_die(Name);
-lookup_or_die(Name) ->
- case lookup(Name) of
+lookup_or_die(Resource = #resource{name = Name}) ->
+ case lookup(Resource) of
{ok, X} -> X;
{error, not_found} ->
rabbit_misc:protocol_error(
@@ -152,6 +151,7 @@ list_exchange_bindings(Name) ->
queue = QueueName} <- Handlers].
bindings_for_exchange(Name) ->
+ Q1 = qlc:e(qlc:q([B1 || B1 = #binding{} <- mnesia:table(binding)])),
qlc:e(qlc:q([B ||
B = #binding{key = K} <- mnesia:table(binding),
element(1, K) == Name])).
@@ -216,7 +216,7 @@ delivery_key_for_type(fanout, Name, _RoutingKey) ->
delivery_key_for_type(_Type, Name, RoutingKey) ->
{Name, RoutingKey}.
-call_with_exchange(#resource{name = Name}, Fun) -> call_with_exchange(Name, Fun);
+%call_with_exchange(R = #resource{name = Name}, Fun) -> call_with_exchange(R, Fun);
call_with_exchange(Name, Fun) ->
case mnesia:wread({exchange, Name}) of
[] -> {error, not_found};
@@ -334,7 +334,6 @@ last_topic_match(P, R, []) ->
last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList).
-delete(#resource{name = ExchangeName}, IfUnused) -> delete(ExchangeName, IfUnused);
delete(ExchangeName, IfUnused) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> internal_delete(ExchangeName, IfUnused) end).