summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-03-28 21:20:46 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-03-28 21:20:46 +0100
commite52c5e65eea60e770631d80735cc1bd63d56c12b (patch)
tree59386f4b2090eae29d6398a4e2168f8989ef3a6a /src
parent1c2cfb95baaf6654360c6156776ec6e5a9606a77 (diff)
parent4e16d0c2d08c7ba8935427afcf215de7cc838a7b (diff)
downloadrabbitmq-server-git-e52c5e65eea60e770631d80735cc1bd63d56c12b.tar.gz
merge default into bug23939
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl28
-rw-r--r--src/rabbit_binding.erl89
-rw-r--r--src/rabbit_exchange.erl48
-rw-r--r--src/rabbit_exchange_type.erl7
-rw-r--r--src/rabbit_exchange_type_direct.erl4
-rw-r--r--src/rabbit_exchange_type_fanout.erl4
-rw-r--r--src/rabbit_exchange_type_headers.erl4
-rw-r--r--src/rabbit_exchange_type_topic.erl16
-rw-r--r--src/rabbit_misc.erl17
-rw-r--r--src/rabbit_mnesia.erl4
-rw-r--r--src/rabbit_tests.erl4
-rw-r--r--src/rabbit_upgrade_functions.erl6
12 files changed, 144 insertions, 87 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c7391965d7..167b1a55ac 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -214,7 +214,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
case mnesia:read({rabbit_durable_queue, QueueName}) of
[] -> ok = store_queue(Q),
B = add_default_binding(Q),
- fun (Tx) -> B(Tx), Q end;
+ fun () -> B(), Q end;
%% Q exists on stopped node
[_] -> rabbit_misc:const(not_found)
end;
@@ -222,7 +222,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ);
false -> TailFun = internal_delete(QueueName),
- fun (Tx) -> TailFun(Tx), ExistingQ end
+ fun () -> TailFun(), ExistingQ end
end
end
end).
@@ -433,9 +433,7 @@ internal_delete(QueueName) ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> rabbit_misc:const({error, not_found});
[_] -> Deletions = internal_delete1(QueueName),
- fun (Tx) -> ok = rabbit_binding:process_deletions(
- Deletions, Tx)
- end
+ rabbit_binding:process_deletions(Deletions)
end
end).
@@ -464,18 +462,14 @@ drop_expired(QPid) ->
gen_server2:cast(QPid, drop_expired).
on_node_down(Node) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () -> qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid}
- <- mnesia:table(rabbit_queue),
- node(Pid) == Node]))
- end,
- fun (Deletions, Tx) ->
- rabbit_binding:process_deletions(
- lists:foldl(fun rabbit_binding:combine_deletions/2,
- rabbit_binding:new_deletions(),
- Deletions),
- Tx)
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) ||
+ #amqqueue{name = QueueName, pid = Pid}
+ <- mnesia:table(rabbit_queue),
+ node(Pid) == Node])),
+ rabbit_binding:process_deletions(
+ lists:foldl(fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(), Dels))
end).
delete_queue(QueueName) ->
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 6167790e58..1336223244 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -21,7 +21,7 @@
-export([list_for_source/1, list_for_destination/1,
list_for_source_and_destination/2]).
-export([new_deletions/0, combine_deletions/2, add_deletion/3,
- process_deletions/2]).
+ process_deletions/1]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
%% these must all be run inside a mnesia tx
-export([has_for_source/1, remove_for_source/1,
@@ -77,7 +77,7 @@
(rabbit_types:binding_destination()) -> deletions()).
-spec(remove_transient_for_destination/1 ::
(rabbit_types:binding_destination()) -> deletions()).
--spec(process_deletions/2 :: (deletions(), boolean()) -> 'ok').
+-spec(process_deletions/1 :: (deletions()) -> 'ok').
-spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()).
-spec(add_deletion/3 :: (rabbit_exchange:name(),
{'undefined' | rabbit_types:exchange(),
@@ -122,21 +122,23 @@ add(Binding, InnerFun) ->
case InnerFun(Src, Dst) of
ok ->
case mnesia:read({rabbit_route, B}) of
- [] -> ok = sync_binding(B, all_durable([Src, Dst]),
- fun mnesia:write/3),
- fun (Tx) ->
- ok = rabbit_exchange:callback(
- Src, add_binding, [Tx, Src, B]),
- rabbit_event:notify_if(
- not Tx, binding_created, info(B))
- end;
- [_] -> fun rabbit_misc:const_ok/1
+ [] -> add_notify(Src, Dst, B);
+ [_] -> fun rabbit_misc:const_ok/0
end;
{error, _} = Err ->
rabbit_misc:const(Err)
end
end).
+add_notify(Src, Dst, B) ->
+ ok = sync_binding(B, all_durable([Src, Dst]), fun mnesia:write/3),
+ ok = rabbit_exchange:callback(Src, add_binding, [transaction, Src, B]),
+ Serial = serial(Src),
+ fun () ->
+ ok = rabbit_exchange:callback(Src, add_binding, [Serial, Src, B]),
+ ok = rabbit_event:notify(binding_created, info(B))
+ end.
+
remove(Binding, InnerFun) ->
binding_action(
Binding,
@@ -158,10 +160,8 @@ remove(Binding, InnerFun) ->
end
end,
case Result of
- {error, _} = Err ->
- rabbit_misc:const(Err);
- {ok, Deletions} ->
- fun (Tx) -> ok = process_deletions(Deletions, Tx) end
+ {error, _} = Err -> rabbit_misc:const(Err);
+ {ok, Deletions} -> process_deletions(Deletions)
end
end).
@@ -405,19 +405,46 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
anything_but(not_deleted, Deleted1, Deleted2),
[Bindings1 | Bindings2]}.
-process_deletions(Deletions, Tx) ->
- dict:fold(
- fun (_XName, {X, Deleted, Bindings}, ok) ->
- FlatBindings = lists:flatten(Bindings),
- [rabbit_event:notify_if(not Tx, binding_deleted, info(B)) ||
- B <- FlatBindings],
- case Deleted of
- not_deleted ->
- rabbit_exchange:callback(X, remove_bindings,
- [Tx, X, FlatBindings]);
- deleted ->
- rabbit_event:notify_if(not Tx, exchange_deleted,
- [{name, X#exchange.name}]),
- rabbit_exchange:callback(X, delete, [Tx, X, FlatBindings])
- end
- end, ok, Deletions).
+process_deletions(Deletions) ->
+ Serials = dict:fold(
+ fun (_XName, {X, Deleted, Bindings}, Acc) ->
+ FlatBindings = lists:flatten(Bindings),
+ pd_callback(transaction, X, Deleted, FlatBindings),
+ dict:store(X, serial(X), Acc)
+ end, Deletions, dict:new()),
+ fun() ->
+ dict:fold(
+ fun (XName, {X, Deleted, Bindings}, ok) ->
+ FlatBindings = lists:flatten(Bindings),
+ Serial = dict:fetch(X, Serials),
+ pd_callback(Serial, X, Deleted, FlatBindings),
+ [rabbit_event:notify(binding_deleted, info(B)) ||
+ B <- FlatBindings],
+ case Deleted of
+ deleted -> ok = rabbit_event:notify(
+ exchange_deleted, [{name, XName}]);
+ _ -> ok
+ end
+ end, Deletions, ok)
+ end.
+
+pd_callback(Arg, X, Deleted, Bindings) ->
+ ok = rabbit_exchange:callback(X, case Deleted of
+ not_deleted -> remove_bindings;
+ deleted -> delete
+ end, [Arg, X, Bindings]).
+
+serial(X) ->
+ case rabbit_exchange:serialise_events(X) of
+ true -> next_serial(X);
+ false -> none
+ end.
+
+next_serial(#exchange{name = Name}) ->
+ Serial = case mnesia:read(rabbit_exchange_serial, Name, write) of
+ [] -> 1;
+ [#exchange_serial{serial = S}] -> S + 1
+ end,
+ mnesia:write(rabbit_exchange_serial,
+ #exchange_serial{name = Name, serial = Serial}, write),
+ Serial.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index cab6510bdd..6801705279 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -22,7 +22,7 @@
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
lookup/1, lookup_or_die/1, list/1,
info_keys/0, info/1, info/2, info_all/1, info_all/2,
- publish/2, delete/2]).
+ publish/2, delete/2, serialise_events/1]).
%% this must be run inside a mnesia tx
-export([maybe_auto_delete/1]).
@@ -72,10 +72,10 @@
(name(), boolean())-> 'ok' |
rabbit_types:error('not_found') |
rabbit_types:error('in_use')).
+-spec(serialise_events/1:: (rabbit_types:exchange()) -> boolean()).
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
-
-endif.
%%----------------------------------------------------------------------------
@@ -131,6 +131,13 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
end
end,
fun ({new, Exchange}, Tx) ->
+ S = case Tx of
+ true -> transaction;
+ false -> case serialise_events(Exchange) of
+ true -> 0;
+ false -> none
+ end
+ end,
ok = (type_to_module(Type)):create(Tx, Exchange),
rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
Exchange;
@@ -268,29 +275,37 @@ process_route(#resource{kind = queue} = QName,
{WorkList, SeenXs, QNames}) ->
{WorkList, SeenXs, [QName | QNames]}.
-call_with_exchange(XName, Fun, PrePostCommitFun) ->
- rabbit_misc:execute_mnesia_transaction(
+call_with_exchange(XName, Fun) ->
+ rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> case mnesia:read({rabbit_exchange, XName}) of
- [] -> {error, not_found};
+ [] -> rabbit_misc:const({error, not_found});
[X] -> Fun(X)
end
- end, PrePostCommitFun).
+ end).
delete(XName, IfUnused) ->
+ delete0(XName, case IfUnused of
+ true -> fun conditional_delete/1;
+ false -> fun unconditional_delete/1
+ end).
+
+delete0(XName, Fun) ->
call_with_exchange(
XName,
- case IfUnused of
- true -> fun conditional_delete/1;
- false -> fun unconditional_delete/1
- end,
- fun ({deleted, X, Bs, Deletions}, Tx) ->
- ok = rabbit_binding:process_deletions(
- rabbit_binding:add_deletion(
- XName, {X, deleted, Bs}, Deletions), Tx);
- (Error = {error, _InUseOrNotFound}, _Tx) ->
- Error
+ fun (X) ->
+ case Fun(X) of
+ {deleted, X, Bs, Deletions} ->
+ rabbit_binding:process_deletions(
+ rabbit_binding:add_deletion(
+ XName, {X, deleted, Bs}, Deletions));
+ {error, _InUseOrNotFound} = E ->
+ rabbit_misc:const(E)
+ end
end).
+serialise_events(#exchange{type = XType}) ->
+ apply(type_to_module(XType), serialise_events, []).
+
maybe_auto_delete(#exchange{auto_delete = false}) ->
not_deleted;
maybe_auto_delete(#exchange{auto_delete = true} = X) ->
@@ -308,5 +323,6 @@ conditional_delete(X = #exchange{name = XName}) ->
unconditional_delete(X = #exchange{name = XName}) ->
ok = mnesia:delete({rabbit_durable_exchange, XName}),
ok = mnesia:delete({rabbit_exchange, XName}),
+ ok = mnesia:delete({rabbit_exchange_serial, XName}),
Bindings = rabbit_binding:remove_for_source(XName),
{deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 547583e9ac..d1563a6211 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -21,6 +21,13 @@
behaviour_info(callbacks) ->
[
{description, 0},
+
+ %% Should Rabbit ensure that all binding events that are
+ %% delivered to an individual exchange can be serialised? (they
+ %% might still be delivered out of order, but there'll be a
+ %% serial number).
+ {serialise_events, 0},
+
{route, 2},
%% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 349c2f6ee4..687567a8cd 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -19,7 +19,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, route/2]).
+-export([description/0, serialise_events/0, route/2]).
-export([validate/1, create/2, recover/2, delete/3,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -35,6 +35,8 @@ description() ->
[{name, <<"direct">>},
{description, <<"AMQP direct exchange, as per the AMQP specification">>}].
+serialise_events() -> false.
+
route(#exchange{name = Name},
#delivery{message = #basic_message{routing_keys = Routes}}) ->
rabbit_router:match_routing_key(Name, Routes).
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index bc5293c81d..cbde0dd2c5 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -19,7 +19,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, route/2]).
+-export([description/0, serialise_events/0, route/2]).
-export([validate/1, create/2, recover/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -35,6 +35,8 @@ description() ->
[{name, <<"fanout">>},
{description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
+serialise_events() -> false.
+
route(#exchange{name = Name}, _Delivery) ->
rabbit_router:match_routing_key(Name, ['_']).
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index d3529b0657..89f8fcfbf9 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, route/2]).
+-export([description/0, serialise_events/0, route/2]).
-export([validate/1, create/2, recover/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -41,6 +41,8 @@ description() ->
[{name, <<"headers">>},
{description, <<"AMQP headers exchange, as per the AMQP specification">>}].
+serialise_events() -> false.
+
route(#exchange{name = Name},
#delivery{message = #basic_message{content = Content}}) ->
Headers = case (Content#content.properties)#'P_basic'.headers of
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index ffd1e58395..7f3d83e05c 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, route/2]).
+-export([description/0, serialise_events/0, route/2]).
-export([validate/1, create/2, recover/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -38,6 +38,8 @@ description() ->
[{name, <<"topic">>},
{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
+serialise_events() -> false.
+
%% NB: This may return duplicate results in some situations (that's ok)
route(#exchange{name = X},
#delivery{message = #basic_message{routing_keys = Routes}}) ->
@@ -55,19 +57,19 @@ recover(_Exchange, Bs) ->
lists:foreach(fun (B) -> internal_add_binding(B) end, Bs)
end).
-delete(true, #exchange{name = X}, _Bs) ->
+delete(transaction, #exchange{name = X}, _Bs) ->
trie_remove_all_edges(X),
trie_remove_all_bindings(X),
ok;
-delete(false, _Exchange, _Bs) ->
+delete(none, _Exchange, _Bs) ->
ok.
-add_binding(true, _Exchange, Binding) ->
+add_binding(transaction, _Exchange, Binding) ->
internal_add_binding(Binding);
-add_binding(false, _Exchange, _Binding) ->
+add_binding(none, _Exchange, _Binding) ->
ok.
-remove_bindings(true, #exchange{name = X}, Bs) ->
+remove_bindings(transaction, #exchange{name = X}, Bs) ->
%% The remove process is split into two distinct phases. In the
%% first phase we gather the lists of bindings and edges to
%% delete, then in the second phase we process all the
@@ -86,7 +88,7 @@ remove_bindings(true, #exchange{name = X}, Bs) ->
[trie_remove_edge(X, Parent, Node, W) ||
{Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)],
ok;
-remove_bindings(false, _X, _Bs) ->
+remove_bindings(none, _X, _Bs) ->
ok.
maybe_add_path(_X, [{root, none}], PathAcc) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 2e9563cf3c..45f599993a 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -54,7 +54,7 @@
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
-export([lock_file/1]).
--export([const_ok/1, const/1]).
+-export([const_ok/0, const/1]).
-export([ntoa/1, ntoab/1]).
-export([is_process_alive/1]).
@@ -191,7 +191,7 @@
digraph:vertex(), digraph:vertex()})).
-spec(now_ms/0 :: () -> non_neg_integer()).
-spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')).
--spec(const_ok/1 :: (any()) -> 'ok').
+-spec(const_ok/0 :: () -> 'ok').
-spec(const/1 :: (A) -> const(A)).
-spec(ntoa/1 :: (inet:ip_address()) -> string()).
-spec(ntoab/1 :: (inet:ip_address()) -> string()).
@@ -409,13 +409,8 @@ execute_mnesia_transaction(TxFun, PrePostCommitFun) ->
execute_mnesia_tx_with_tail(TxFun) ->
case mnesia:is_transaction() of
true -> execute_mnesia_transaction(TxFun);
- false -> TailFun = execute_mnesia_transaction(
- fun () ->
- TailFun1 = TxFun(),
- TailFun1(true),
- TailFun1
- end),
- TailFun(false)
+ false -> TailFun = execute_mnesia_transaction(TxFun),
+ TailFun()
end.
ensure_ok(ok, _) -> ok;
@@ -847,8 +842,8 @@ lock_file(Path) ->
ok = file:close(Lock)
end.
-const_ok(_) -> ok.
-const(X) -> fun (_) -> X end.
+const_ok() -> ok.
+const(X) -> fun () -> X end.
%% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see
%% when IPv6 is enabled but not used (i.e. 99% of the time).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index fbcf07ae77..c73f557d03 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -220,6 +220,10 @@ table_definitions() ->
[{record_name, exchange},
{attributes, record_info(fields, exchange)},
{match, #exchange{name = exchange_name_match(), _='_'}}]},
+ {rabbit_exchange_serial,
+ [{record_name, exchange_serial},
+ {attributes, record_info(fields, exchange_serial)},
+ {match, #exchange_serial{name = exchange_name_match(), _='_'}}]},
{rabbit_durable_queue,
[{record_name, amqqueue},
{attributes, record_info(fields, amqqueue)},
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ca046c9198..5a37c31a71 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -694,8 +694,8 @@ test_topic_matching() ->
exchange_op_callback(X, Fun, ExtraArgs) ->
rabbit_misc:execute_mnesia_transaction(
- fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end),
- rabbit_exchange:callback(X, Fun, [false, X] ++ ExtraArgs).
+ fun () -> rabbit_exchange:callback(X, Fun, [transaction, X] ++ ExtraArgs) end),
+ rabbit_exchange:callback(X, Fun, [none, X] ++ ExtraArgs).
test_topic_expect_match(X, List) ->
lists:foreach(
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 7567c29ef3..7c53e99694 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -26,6 +26,7 @@
-rabbit_upgrade({internal_exchanges, mnesia, []}).
-rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}).
-rabbit_upgrade({topic_trie, mnesia, []}).
+-rabbit_upgrade({exchange_event_serial, mnesia, []}).
%% -------------------------------------------------------------------
@@ -37,6 +38,7 @@
-spec(internal_exchanges/0 :: () -> 'ok').
-spec(user_to_internal_user/0 :: () -> 'ok').
-spec(topic_trie/0 :: () -> 'ok').
+-spec(exchange_event_serial/0 :: () -> 'ok').
-endif.
@@ -101,6 +103,10 @@ topic_trie() ->
{attributes, [trie_binding, value]},
{type, ordered_set}]).
+exchange_event_serial() ->
+ create(rabbit_exchange_serial, [{record_name, exchange_serial},
+ {attributes, [name, serial]}]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->