summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-03-24 10:46:34 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-03-24 10:46:34 +0000
commitf8b346795475c63bcf6b9d289bf57af6a77c6ada (patch)
tree168d052e62e820cab223e505b08b95df69f2a97f /src
parent9b1e99e3c1520798e9b8ca796d7a3bc53bf2ff55 (diff)
parentd055ecafc5ca06b046fe583ee1d48191cf8bd0d0 (diff)
downloadrabbitmq-server-git-f8b346795475c63bcf6b9d289bf57af6a77c6ada.tar.gz
Merge from default
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl12
-rw-r--r--src/rabbit_binding.erl68
-rw-r--r--src/rabbit_exchange.erl33
-rw-r--r--src/rabbit_exchange_type.erl6
-rw-r--r--src/rabbit_exchange_type_direct.erl3
-rw-r--r--src/rabbit_exchange_type_fanout.erl3
-rw-r--r--src/rabbit_exchange_type_headers.erl3
-rw-r--r--src/rabbit_exchange_type_topic.erl15
-rw-r--r--src/rabbit_misc.erl14
-rw-r--r--src/rabbit_mnesia.erl4
-rw-r--r--src/rabbit_tests.erl4
-rw-r--r--src/rabbit_upgrade_functions.erl6
12 files changed, 127 insertions, 44 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c7391965d7..80dcb79a4a 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -214,7 +214,13 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
case mnesia:read({rabbit_durable_queue, QueueName}) of
[] -> ok = store_queue(Q),
B = add_default_binding(Q),
- fun (Tx) -> B(Tx), Q end;
+ fun (Tx) ->
+ R = B(Tx),
+ case Tx of
+ transaction -> R;
+ _ -> Q
+ end
+ end;
%% Q exists on stopped node
[_] -> rabbit_misc:const(not_found)
end;
@@ -433,8 +439,8 @@ internal_delete(QueueName) ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> rabbit_misc:const({error, not_found});
[_] -> Deletions = internal_delete1(QueueName),
- fun (Tx) -> ok = rabbit_binding:process_deletions(
- Deletions, Tx)
+ fun (Tx) -> rabbit_binding:process_deletions(
+ Deletions, Tx)
end
end
end).
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 6167790e58..cc7aea33bb 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -127,8 +127,7 @@ add(Binding, InnerFun) ->
fun (Tx) ->
ok = rabbit_exchange:callback(
Src, add_binding, [Tx, Src, B]),
- rabbit_event:notify_if(
- not Tx, binding_created, info(B))
+ process_addition(Src, B, Tx)
end;
[_] -> fun rabbit_misc:const_ok/1
end;
@@ -161,7 +160,7 @@ remove(Binding, InnerFun) ->
{error, _} = Err ->
rabbit_misc:const(Err);
{ok, Deletions} ->
- fun (Tx) -> ok = process_deletions(Deletions, Tx) end
+ fun (Tx) -> process_deletions(Deletions, Tx) end
end
end).
@@ -405,19 +404,66 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
anything_but(not_deleted, Deleted1, Deleted2),
[Bindings1 | Bindings2]}.
-process_deletions(Deletions, Tx) ->
+process_addition(Src, _B, transaction) ->
+ serial(Src);
+
+process_addition(_Src, B, _Serial) ->
+ ok = rabbit_event:notify(binding_created, info(B)).
+
+process_deletions(Deletions, transaction) ->
+ process_deletions(
+ fun (X, Bindings, Acc) ->
+ pd_callback(transaction, remove_bindings, X, Bindings),
+ dict:store(X, serial(X), Acc)
+ end,
+ fun (X, Bindings, Acc) ->
+ pd_callback(transaction, delete, X, Bindings),
+ dict:store(X, serial(X), Acc)
+ end,
+ Deletions, dict:new(), true);
+
+process_deletions(Deletions, Serials) ->
+ process_deletions(
+ fun (X, Bindings, Acc) ->
+ pd_callback(dict:fetch(X, Serials), remove_bindings, X, Bindings),
+ Acc
+ end,
+ fun (X, Bindings, Acc) ->
+ pd_callback(dict:fetch(X, Serials), delete, X, Bindings),
+ rabbit_event:notify(exchange_deleted, [{name, X#exchange.name}]),
+ Acc
+ end,
+ Deletions, ok, false).
+
+process_deletions(NotDeletedFun, DeletedFun, Deletions, Acc0, Tx) ->
dict:fold(
- fun (_XName, {X, Deleted, Bindings}, ok) ->
+ fun (_XName, {X, Deleted, Bindings}, Acc) ->
FlatBindings = lists:flatten(Bindings),
[rabbit_event:notify_if(not Tx, binding_deleted, info(B)) ||
B <- FlatBindings],
case Deleted of
not_deleted ->
- rabbit_exchange:callback(X, remove_bindings,
- [Tx, X, FlatBindings]);
+ NotDeletedFun(X, FlatBindings, Acc);
deleted ->
- rabbit_event:notify_if(not Tx, exchange_deleted,
- [{name, X#exchange.name}]),
- rabbit_exchange:callback(X, delete, [Tx, X, FlatBindings])
+ DeletedFun(X, FlatBindings, Acc)
end
- end, ok, Deletions).
+ end, Acc0, Deletions).
+
+pd_callback(Arg, CB, X, Bindings) ->
+ ok = rabbit_exchange:callback(X, CB, [Arg, X, Bindings]).
+
+serial(X) ->
+ case rabbit_exchange:callback(X, serialise_events, [X]) of
+ true -> next_serial(X);
+ false -> none
+ end.
+
+next_serial(#exchange{name = Name}) ->
+ Prev = case mnesia:read(rabbit_exchange_serial, Name, write) of
+ [] -> 0;
+ [#exchange_serial{serial = S}] -> S
+ end,
+ Serial = Prev + 1,
+ mnesia:write(rabbit_exchange_serial,
+ #exchange_serial{name = Name, serial = Serial}, write),
+ Serial.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a463e57067..504cf93517 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -72,7 +72,8 @@
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
--spec(callback/3:: (rabbit_types:exchange(), atom(), [any()]) -> 'ok').
+-spec(callback/3:: (rabbit_types:exchange(), atom(), [any()]) ->
+ boolean() | 'ok').
-endif.
@@ -126,7 +127,15 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
end
end,
fun ({new, Exchange}, Tx) ->
- callback(Exchange, create, [Tx, Exchange]),
+ S = case Tx of
+ true -> transaction;
+ false -> case callback(Exchange, serialise_events,
+ [Exchange]) of
+ true -> 0;
+ false -> none
+ end
+ end,
+ callback(Exchange, create, [S, Exchange]),
rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
Exchange;
({existing, Exchange}, _Tx) ->
@@ -264,12 +273,13 @@ process_route(#resource{kind = queue} = QName,
{WorkList, SeenXs, [QName | QNames]}.
call_with_exchange(XName, Fun, PrePostCommitFun) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () -> case mnesia:read({rabbit_exchange, XName}) of
- [] -> {error, not_found};
- [X] -> Fun(X)
- end
- end, PrePostCommitFun).
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun () -> Result = case mnesia:read({rabbit_exchange, XName}) of
+ [] -> {error, not_found};
+ [X] -> Fun(X)
+ end,
+ fun(Tx) -> PrePostCommitFun(Result, Tx) end
+ end).
delete(XName, IfUnused) ->
call_with_exchange(
@@ -279,9 +289,9 @@ delete(XName, IfUnused) ->
false -> fun unconditional_delete/1
end,
fun ({deleted, X, Bs, Deletions}, Tx) ->
- ok = rabbit_binding:process_deletions(
- rabbit_binding:add_deletion(
- XName, {X, deleted, Bs}, Deletions), Tx);
+ rabbit_binding:process_deletions(
+ rabbit_binding:add_deletion(
+ XName, {X, deleted, Bs}, Deletions), Tx);
(Error = {error, _InUseOrNotFound}, _Tx) ->
Error
end).
@@ -306,5 +316,6 @@ conditional_delete(X = #exchange{name = XName}) ->
unconditional_delete(X = #exchange{name = XName}) ->
ok = mnesia:delete({rabbit_durable_exchange, XName}),
ok = mnesia:delete({rabbit_exchange, XName}),
+ ok = mnesia:delete({rabbit_exchange_serial, XName}),
Bindings = rabbit_binding:remove_for_source(XName),
{deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 547583e9ac..670551de08 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -21,6 +21,12 @@
behaviour_info(callbacks) ->
[
{description, 0},
+
+ %% Should Rabbit ensure that all events delivered to an individual exchange
+ %% this can be serialised? (they might still be delivered out
+ %% of order, but there'll be a serial number).
+ {serialise_events, 1},
+
{route, 2},
%% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 349c2f6ee4..bc7a76e30b 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -19,7 +19,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, route/2]).
+-export([description/0, route/2, serialise_events/1]).
-export([validate/1, create/2, recover/2, delete/3,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -39,6 +39,7 @@ route(#exchange{name = Name},
#delivery{message = #basic_message{routing_keys = Routes}}) ->
rabbit_router:match_routing_key(Name, Routes).
+serialise_events(_X) -> false.
validate(_X) -> ok.
create(_Tx, _X) -> ok.
recover(_X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index bc5293c81d..2e70fb24f2 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -19,7 +19,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, route/2]).
+-export([description/0, route/2, serialise_events/1]).
-export([validate/1, create/2, recover/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -38,6 +38,7 @@ description() ->
route(#exchange{name = Name}, _Delivery) ->
rabbit_router:match_routing_key(Name, ['_']).
+serialise_events(_X) -> false.
validate(_X) -> ok.
create(_Tx, _X) -> ok.
recover(_X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index d3529b0657..1e8b0687a0 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, route/2]).
+-export([description/0, route/2, serialise_events/1]).
-export([validate/1, create/2, recover/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -112,6 +112,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
end,
headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
+serialise_events(_X) -> false.
validate(_X) -> ok.
create(_Tx, _X) -> ok.
recover(_X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index ffd1e58395..e3fd9283b1 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, route/2]).
+-export([description/0, route/2, serialise_events/1]).
-export([validate/1, create/2, recover/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -46,6 +46,7 @@ route(#exchange{name = X},
mnesia:async_dirty(fun trie_match/2, [X, Words])
end || RKey <- Routes]).
+serialise_events(_X) -> false.
validate(_X) -> ok.
create(_Tx, _X) -> ok.
@@ -55,19 +56,19 @@ recover(_Exchange, Bs) ->
lists:foreach(fun (B) -> internal_add_binding(B) end, Bs)
end).
-delete(true, #exchange{name = X}, _Bs) ->
+delete(transaction, #exchange{name = X}, _Bs) ->
trie_remove_all_edges(X),
trie_remove_all_bindings(X),
ok;
-delete(false, _Exchange, _Bs) ->
+delete(none, _Exchange, _Bs) ->
ok.
-add_binding(true, _Exchange, Binding) ->
+add_binding(transaction, _Exchange, Binding) ->
internal_add_binding(Binding);
-add_binding(false, _Exchange, _Binding) ->
+add_binding(none, _Exchange, _Binding) ->
ok.
-remove_bindings(true, #exchange{name = X}, Bs) ->
+remove_bindings(transaction, #exchange{name = X}, Bs) ->
%% The remove process is split into two distinct phases. In the
%% first phase we gather the lists of bindings and edges to
%% delete, then in the second phase we process all the
@@ -86,7 +87,7 @@ remove_bindings(true, #exchange{name = X}, Bs) ->
[trie_remove_edge(X, Parent, Node, W) ||
{Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)],
ok;
-remove_bindings(false, _X, _Bs) ->
+remove_bindings(none, _X, _Bs) ->
ok.
maybe_add_path(_X, [{root, none}], PathAcc) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 2e9563cf3c..3f0bc9bb38 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -409,13 +409,13 @@ execute_mnesia_transaction(TxFun, PrePostCommitFun) ->
execute_mnesia_tx_with_tail(TxFun) ->
case mnesia:is_transaction() of
true -> execute_mnesia_transaction(TxFun);
- false -> TailFun = execute_mnesia_transaction(
- fun () ->
- TailFun1 = TxFun(),
- TailFun1(true),
- TailFun1
- end),
- TailFun(false)
+ false -> {TailFun, TailRes} = execute_mnesia_transaction(
+ fun () ->
+ TailFun1 = TxFun(),
+ Res1 = TailFun1(transaction),
+ {TailFun1, Res1}
+ end),
+ TailFun(TailRes)
end.
ensure_ok(ok, _) -> ok;
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index fbcf07ae77..c73f557d03 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -220,6 +220,10 @@ table_definitions() ->
[{record_name, exchange},
{attributes, record_info(fields, exchange)},
{match, #exchange{name = exchange_name_match(), _='_'}}]},
+ {rabbit_exchange_serial,
+ [{record_name, exchange_serial},
+ {attributes, record_info(fields, exchange_serial)},
+ {match, #exchange_serial{name = exchange_name_match(), _='_'}}]},
{rabbit_durable_queue,
[{record_name, amqqueue},
{attributes, record_info(fields, amqqueue)},
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ca046c9198..5a37c31a71 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -694,8 +694,8 @@ test_topic_matching() ->
exchange_op_callback(X, Fun, ExtraArgs) ->
rabbit_misc:execute_mnesia_transaction(
- fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end),
- rabbit_exchange:callback(X, Fun, [false, X] ++ ExtraArgs).
+ fun () -> rabbit_exchange:callback(X, Fun, [transaction, X] ++ ExtraArgs) end),
+ rabbit_exchange:callback(X, Fun, [none, X] ++ ExtraArgs).
test_topic_expect_match(X, List) ->
lists:foreach(
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 7567c29ef3..28aee9c91a 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -26,6 +26,7 @@
-rabbit_upgrade({internal_exchanges, mnesia, []}).
-rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}).
-rabbit_upgrade({topic_trie, mnesia, []}).
+-rabbit_upgrade({exchange_event_serialisation, mnesia, []}).
%% -------------------------------------------------------------------
@@ -37,6 +38,7 @@
-spec(internal_exchanges/0 :: () -> 'ok').
-spec(user_to_internal_user/0 :: () -> 'ok').
-spec(topic_trie/0 :: () -> 'ok').
+-spec(exchange_event_serialisation/0 :: () -> 'ok').
-endif.
@@ -101,6 +103,10 @@ topic_trie() ->
{attributes, [trie_binding, value]},
{type, ordered_set}]).
+exchange_event_serialisation() ->
+ create(rabbit_exchange_serial, [{record_name, exchange_serial},
+ {attributes, [name, serial]}]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->