summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Bridgen <mikeb@lshift.net>2010-01-14 13:23:59 +0000
committerMichael Bridgen <mikeb@lshift.net>2010-01-14 13:23:59 +0000
commit86fb3e2f089653d3951bd564633eb5c7ae156ad1 (patch)
tree63aed86c356425a6081404d81e9972df929d44c4
parent2778cb7181909ab5efbd46d5191ade2b85718c66 (diff)
downloadrabbitmq-server-git-86fb3e2f089653d3951bd564633eb5c7ae156ad1.tar.gz
Change to (almost) the mooted API, and support recover and validate.
-rw-r--r--include/rabbit_exchange_behaviour_spec.hrl3
-rw-r--r--src/rabbit_exchange.erl27
-rw-r--r--src/rabbit_exchange_behaviour.erl5
-rw-r--r--src/rabbit_exchange_type_direct.erl5
-rw-r--r--src/rabbit_exchange_type_fanout.erl5
-rw-r--r--src/rabbit_exchange_type_headers.erl5
-rw-r--r--src/rabbit_exchange_type_topic.erl5
-rw-r--r--src/rabbit_misc.erl22
8 files changed, 46 insertions, 31 deletions
diff --git a/include/rabbit_exchange_behaviour_spec.hrl b/include/rabbit_exchange_behaviour_spec.hrl
index 7e965fc796..9cd47aaf6b 100644
--- a/include/rabbit_exchange_behaviour_spec.hrl
+++ b/include/rabbit_exchange_behaviour_spec.hrl
@@ -32,8 +32,9 @@
-spec(description/0 :: () -> [{atom(), any()}]).
-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
--spec(declare/1 :: (exchange()) -> 'ok').
+-spec(validate/1 :: (exchange()) -> 'ok').
-spec(init/1 :: (exchange()) -> 'ok').
+-spec(recover/1 :: (exchange()) -> 'ok').
-spec(delete/1 :: (exchange()) -> 'ok').
-spec(add_binding/2 :: (exchange(), binding()) -> 'ok').
-spec(delete_binding/2 :: (exchange(), binding()) -> 'ok').
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 0e7defab0c..fab53c4bb3 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -95,17 +95,23 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
recover() ->
- ok = rabbit_misc:table_foreach(
- fun(Exchange) -> ok = mnesia:write(rabbit_exchange,
- Exchange, write)
- end, rabbit_durable_exchange),
- ok = rabbit_misc:table_foreach(
- fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route),
+ [begin
+ #exchange{ type = Type } = X,
+ Type:recover(X)
+ end || X <-
+ rabbit_misc:table_fold(
+ fun(Exchange, Acc) -> ok = mnesia:write(rabbit_exchange,
+ Exchange, write),
+ [Exchange | Acc]
+ end, [], rabbit_durable_exchange)],
+ ok = rabbit_misc:table_fold(
+ fun(Route, ok) -> {_, ReverseRoute} = route_with_reverse(Route),
ok = mnesia:write(rabbit_route,
Route, write),
ok = mnesia:write(rabbit_reverse_route,
- ReverseRoute, write)
- end, rabbit_durable_route).
+ ReverseRoute, write),
+ ok
+ end, ok, rabbit_durable_route).
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange = #exchange{name = ExchangeName,
@@ -113,6 +119,9 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
durable = Durable,
auto_delete = AutoDelete,
arguments = Args},
+ %% Don't ignore the return value; we want to upset things if it
+ %% isn't ok.
+ ok = Type:validate(Exchange),
case rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_exchange, ExchangeName}) of
@@ -127,7 +136,7 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
end
end) of
{new, X} ->
- ok = Type:declare(X),
+ ok = Type:init(X),
X;
{existing, X} ->
X;
diff --git a/src/rabbit_exchange_behaviour.erl b/src/rabbit_exchange_behaviour.erl
index 4b275c0059..58da4041d4 100644
--- a/src/rabbit_exchange_behaviour.erl
+++ b/src/rabbit_exchange_behaviour.erl
@@ -38,8 +38,9 @@ behaviour_info(callbacks) ->
{description, 0},
{publish, 2},
- {declare, 1}, %% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
- {init, 1}, %% called after declaration when previously absent, or during recovery
+ {validate, 1}, %% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
+ {init, 1}, %% called after declaration when previously absent
+ {recover, 1}, %% called when recovering
{delete, 1}, %% called after exchange deletion
{add_binding, 2}, %% called after a binding has been added
{delete_binding, 2} %% called after a binding has been deleted
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index dff06b2538..dedd5b694c 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -35,7 +35,7 @@
-behaviour(rabbit_exchange_behaviour).
-export([description/0, publish/2]).
--export([declare/1, init/1, delete/1, add_binding/2, delete_binding/2]).
+-export([validate/1, init/1, recover/1, delete/1, add_binding/2, delete_binding/2]).
-include("rabbit_exchange_behaviour_spec.hrl").
description() ->
@@ -46,8 +46,9 @@ publish(#exchange{name = Name},
Delivery = #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), Delivery).
-declare(_X) -> ok.
+validate(_X) -> ok.
init(_X) -> ok.
+recover(_X) -> ok.
delete(_X) -> ok.
add_binding(_X, _B) -> ok.
delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index b4654b0c8a..c4cae9e5af 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -35,7 +35,7 @@
-behaviour(rabbit_exchange_behaviour).
-export([description/0, publish/2]).
--export([declare/1, init/1, delete/1, add_binding/2, delete_binding/2]).
+-export([validate/1, init/1, recover/1, delete/1, add_binding/2, delete_binding/2]).
-include("rabbit_exchange_behaviour_spec.hrl").
description() ->
@@ -45,8 +45,9 @@ description() ->
publish(#exchange{name = Name}, Delivery) ->
rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery).
-declare(_X) -> ok.
+validate(_X) -> ok.
init(_X) -> ok.
+recover(_X) -> ok.
delete(_X) -> ok.
add_binding(_X, _B) -> ok.
delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index f28bfdc758..40f905ab91 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -36,7 +36,7 @@
-behaviour(rabbit_exchange_behaviour).
-export([description/0, publish/2]).
--export([declare/1, init/1, delete/1, add_binding/2, delete_binding/2]).
+-export([validate/1, init/1, recover/1, delete/1, add_binding/2, delete_binding/2]).
-include("rabbit_exchange_behaviour_spec.hrl").
-ifdef(use_specs).
@@ -120,8 +120,9 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
end,
headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
-declare(_X) -> ok.
+validate(_X) -> ok.
init(_X) -> ok.
+recover(_X) -> ok.
delete(_X) -> ok.
add_binding(_X, _B) -> ok.
delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index ecb6580767..b5eac0defe 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -35,7 +35,7 @@
-behaviour(rabbit_exchange_behaviour).
-export([description/0, publish/2]).
--export([declare/1, init/1, delete/1, add_binding/2, delete_binding/2]).
+-export([validate/1, init/1, recover/1, delete/1, add_binding/2, delete_binding/2]).
-include("rabbit_exchange_behaviour_spec.hrl").
-export([topic_matches/2]).
@@ -83,8 +83,9 @@ last_topic_match(P, R, []) ->
last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList).
-declare(_X) -> ok.
+validate(_X) -> ok.
init(_X) -> ok.
+recover(_X) -> ok.
delete(_X) -> ok.
add_binding(_X, _B) -> ok.
delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 97c96fc771..d2b878058c 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -49,7 +49,7 @@
-export([ensure_ok/2]).
-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
-export([intersperse/2, upmap/2, map_in_order/2]).
--export([table_foreach/2]).
+-export([table_fold/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([read_term_file/1, write_term_file/2]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
@@ -113,7 +113,7 @@
-spec(intersperse/2 :: (A, [A]) -> [A]).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
--spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok').
+-spec(table_fold/3 :: (fun ((any()) -> any()), any(), atom()) -> any()).
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
'ok' | 'aborted').
@@ -351,20 +351,20 @@ map_in_order(F, L) ->
lists:reverse(
lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)).
-%% For each entry in a table, execute a function in a transaction.
-%% This is often far more efficient than wrapping a tx around the lot.
+%% Fold over each entry in a table, executing the cons function in a
+%% transaction. This is often far more efficient than wrapping a tx
+%% around the lot.
%%
%% We ignore entries that have been modified or removed.
-table_foreach(F, TableName) ->
- lists:foreach(
- fun (E) -> execute_mnesia_transaction(
+table_fold(F, Acc0, TableName) ->
+ lists:foldl(
+ fun (E, Acc) -> execute_mnesia_transaction(
fun () -> case mnesia:match_object(TableName, E, read) of
- [] -> ok;
- _ -> F(E)
+ [] -> Acc;
+ _ -> F(E, Acc)
end
end)
- end, dirty_read_all(TableName)),
- ok.
+ end, Acc0, dirty_read_all(TableName)).
dirty_read_all(TableName) ->
mnesia:dirty_select(TableName, [{'$1',[],['$1']}]).