summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-12 11:41:02 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-12 11:41:02 +0100
commit8eadc8328e8a4d046ee7bd9733b915fa1c823074 (patch)
tree59484f7470bf6c6175185069f2368b8b16ba2920 /src
parentc16a63ee4e19b129c438da7c0e0da4c8bf3785c6 (diff)
parent5b920f47ffb3af84853e447291d4e231c163fc12 (diff)
downloadrabbitmq-server-git-8eadc8328e8a4d046ee7bd9733b915fa1c823074.tar.gz
Merging default into bug23095
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_framing_channel.erl75
-rw-r--r--src/rabbit_mnesia.erl128
-rw-r--r--src/rabbit_plugin_activator.erl10
-rw-r--r--src/rabbit_reader.erl64
-rw-r--r--src/supervisor2.erl2
5 files changed, 175 insertions, 104 deletions
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index 00b74ad010..553faaa814 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -35,18 +35,19 @@
-export([start_link/3, process/2, shutdown/1]).
%% internal
--export([mainloop/2]).
+-export([mainloop/3]).
%%--------------------------------------------------------------------
start_link(StartFun, StartArgs, Protocol) ->
+ Parent = self(),
{ok, spawn_link(
fun () ->
%% we trap exits so that a normal termination of
%% the channel or reader process terminates us too.
process_flag(trap_exit, true),
{ok, ChannelPid} = apply(StartFun, StartArgs),
- mainloop(ChannelPid, Protocol)
+ mainloop(Parent, ChannelPid, Protocol)
end)}.
process(Pid, Frame) ->
@@ -73,46 +74,55 @@ read_frame(ChannelPid) ->
Msg -> exit({unexpected_message, Msg})
end.
-mainloop(ChannelPid, Protocol) ->
+mainloop(Parent, ChannelPid, Protocol) ->
case read_frame(ChannelPid) of
{method, MethodName, FieldsBin} ->
Method = Protocol:decode_method_fields(MethodName, FieldsBin),
case Protocol:method_has_content(MethodName) of
true -> {ClassId, _MethodId} = Protocol:method_id(MethodName),
- rabbit_channel:do(ChannelPid, Method,
- collect_content(ChannelPid,
- ClassId,
- Protocol));
- false -> rabbit_channel:do(ChannelPid, Method)
- end,
- ?MODULE:mainloop(ChannelPid, Protocol);
+ case collect_content(ChannelPid, ClassId, Protocol) of
+ {ok, Content} ->
+ rabbit_channel:do(ChannelPid, Method, Content),
+ ?MODULE:mainloop(Parent, ChannelPid, Protocol);
+ {error, Reason} ->
+ channel_exit(Parent, Reason, MethodName)
+ end;
+ false -> rabbit_channel:do(ChannelPid, Method),
+ ?MODULE:mainloop(Parent, ChannelPid, Protocol)
+ end;
_ ->
- unexpected_frame("expected method frame, "
- "got non method frame instead",
- [])
+ channel_exit(Parent, {unexpected_frame,
+ "expected method frame, "
+ "got non method frame instead",
+ []}, none)
end.
collect_content(ChannelPid, ClassId, Protocol) ->
case read_frame(ChannelPid) of
{content_header, ClassId, 0, BodySize, PropertiesBin} ->
- Payload = collect_content_payload(ChannelPid, BodySize, []),
- #content{class_id = ClassId,
- properties = none,
- properties_bin = PropertiesBin,
- protocol = Protocol,
- payload_fragments_rev = Payload};
+ case collect_content_payload(ChannelPid, BodySize, []) of
+ {ok, Payload} -> {ok, #content{
+ class_id = ClassId,
+ properties = none,
+ properties_bin = PropertiesBin,
+ protocol = Protocol,
+ payload_fragments_rev = Payload}};
+ Error -> Error
+ end;
{content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} ->
- unexpected_frame("expected content header for class ~w, "
- "got one for class ~w instead",
- [ClassId, HeaderClassId]);
+ {error, {unexpected_frame,
+ "expected content header for class ~w, "
+ "got one for class ~w instead",
+ [ClassId, HeaderClassId]}};
_ ->
- unexpected_frame("expected content header for class ~w, "
- "got non content header frame instead",
- [ClassId])
+ {error, {unexpected_frame,
+ "expected content header for class ~w, "
+ "got non content header frame instead",
+ [ClassId]}}
end.
collect_content_payload(_ChannelPid, 0, Acc) ->
- Acc;
+ {ok, Acc};
collect_content_payload(ChannelPid, RemainingByteCount, Acc) ->
case read_frame(ChannelPid) of
{content_body, FragmentBin} ->
@@ -120,10 +130,13 @@ collect_content_payload(ChannelPid, RemainingByteCount, Acc) ->
RemainingByteCount - size(FragmentBin),
[FragmentBin | Acc]);
_ ->
- unexpected_frame("expected content body, "
- "got non content body frame instead",
- [])
+ {error, {unexpected_frame,
+ "expected content body, "
+ "got non content body frame instead",
+ []}}
end.
-unexpected_frame(ExplanationFormat, Params) ->
- rabbit_misc:protocol_error(unexpected_frame, ExplanationFormat, Params).
+channel_exit(Parent, {ErrorName, ExplanationFormat, Params}, MethodName) ->
+ Reason = rabbit_misc:amqp_error(ErrorName, ExplanationFormat, Params,
+ MethodName),
+ Parent ! {channel_exit, self(), Reason}.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 505dc28fe8..4a5adfaeed 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -91,7 +91,6 @@ init() ->
ok = ensure_mnesia_running(),
ok = ensure_mnesia_dir(),
ok = init_db(read_cluster_nodes_config(), true),
- ok = wait_for_tables(),
ok.
is_db_empty() ->
@@ -114,7 +113,6 @@ cluster(ClusterNodes, Force) ->
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
try
ok = init_db(ClusterNodes, Force),
- ok = wait_for_tables(),
ok = create_cluster_nodes_config(ClusterNodes)
after
mnesia:stop()
@@ -157,57 +155,87 @@ table_definitions() ->
[{rabbit_user,
[{record_name, user},
{attributes, record_info(fields, user)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #user{_='_'}}]},
{rabbit_user_permission,
[{record_name, user_permission},
{attributes, record_info(fields, user_permission)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #user_permission{user_vhost = #user_vhost{_='_'},
+ permission = #permission{_='_'},
+ _='_'}}]},
{rabbit_vhost,
[{record_name, vhost},
{attributes, record_info(fields, vhost)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #vhost{_='_'}}]},
{rabbit_config,
[{attributes, [key, val]}, % same mnesia's default
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, {rabbit_config, '_', '_'}}]},
{rabbit_listener,
[{record_name, listener},
{attributes, record_info(fields, listener)},
- {type, bag}]},
+ {type, bag},
+ {match, #listener{_='_'}}]},
{rabbit_durable_route,
[{record_name, route},
{attributes, record_info(fields, route)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #route{binding = binding_match(), _='_'}}]},
{rabbit_route,
[{record_name, route},
{attributes, record_info(fields, route)},
- {type, ordered_set}]},
+ {type, ordered_set},
+ {match, #route{binding = binding_match(), _='_'}}]},
{rabbit_reverse_route,
[{record_name, reverse_route},
{attributes, record_info(fields, reverse_route)},
- {type, ordered_set}]},
+ {type, ordered_set},
+ {match, #reverse_route{reverse_binding = reverse_binding_match(),
+ _='_'}}]},
%% Consider the implications to nodes_of_type/1 before altering
%% the next entry.
{rabbit_durable_exchange,
[{record_name, exchange},
{attributes, record_info(fields, exchange)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #exchange{name = exchange_name_match(), _='_'}}]},
{rabbit_exchange,
[{record_name, exchange},
- {attributes, record_info(fields, exchange)}]},
+ {attributes, record_info(fields, exchange)},
+ {match, #exchange{name = exchange_name_match(), _='_'}}]},
{rabbit_durable_queue,
[{record_name, amqqueue},
{attributes, record_info(fields, amqqueue)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #amqqueue{name = queue_name_match(), _='_'}}]},
{rabbit_queue,
[{record_name, amqqueue},
- {attributes, record_info(fields, amqqueue)}]}].
+ {attributes, record_info(fields, amqqueue)},
+ {match, #amqqueue{name = queue_name_match(), _='_'}}]}].
+
+binding_match() ->
+ #binding{queue_name = queue_name_match(),
+ exchange_name = exchange_name_match(),
+ _='_'}.
+reverse_binding_match() ->
+ #reverse_binding{queue_name = queue_name_match(),
+ exchange_name = exchange_name_match(),
+ _='_'}.
+exchange_name_match() ->
+ resource_match(exchange).
+queue_name_match() ->
+ resource_match(queue).
+resource_match(Kind) ->
+ #resource{kind = Kind, _='_'}.
table_names() ->
[Tab || {Tab, _} <- table_definitions()].
replicated_table_names() ->
- [Tab || {Tab, Attrs} <- table_definitions(),
- not lists:member({local_content, true}, Attrs)
+ [Tab || {Tab, TabDef} <- table_definitions(),
+ not lists:member({local_content, true}, TabDef)
].
dir() -> mnesia:system_info(directory).
@@ -232,26 +260,55 @@ ensure_mnesia_not_running() ->
yes -> throw({error, mnesia_unexpectedly_running})
end.
+ensure_schema_integrity() ->
+ case check_schema_integrity() of
+ ok ->
+ ok;
+ {error, Reason} ->
+ throw({error, {schema_integrity_check_failed, Reason}})
+ end.
+
check_schema_integrity() ->
- TabDefs = table_definitions(),
Tables = mnesia:system_info(tables),
- case [Error || Tab <- table_names(),
+ case [Error || {Tab, TabDef} <- table_definitions(),
case lists:member(Tab, Tables) of
false ->
Error = {table_missing, Tab},
true;
true ->
- {_, TabDef} = proplists:lookup(Tab, TabDefs),
{_, ExpAttrs} = proplists:lookup(attributes, TabDef),
Attrs = mnesia:table_info(Tab, attributes),
Error = {table_attributes_mismatch, Tab,
ExpAttrs, Attrs},
Attrs /= ExpAttrs
end] of
- [] -> ok;
+ [] -> check_table_integrity();
Errors -> {error, Errors}
end.
+check_table_integrity() ->
+ ok = wait_for_tables(),
+ case lists:all(fun ({Tab, TabDef}) ->
+ {_, Match} = proplists:lookup(match, TabDef),
+ read_test_table(Tab, Match)
+ end, table_definitions()) of
+ true -> ok;
+ false -> {error, invalid_table_content}
+ end.
+
+read_test_table(Tab, Match) ->
+ case mnesia:dirty_first(Tab) of
+ '$end_of_table' ->
+ true;
+ Key ->
+ ObjList = mnesia:dirty_read(Tab, Key),
+ MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]),
+ case ets:match_spec_run(ObjList, MatchComp) of
+ ObjList -> true;
+ _ -> false
+ end
+ end.
+
%% The cluster node config file contains some or all of the disk nodes
%% that are members of the cluster this node is / should be a part of.
%%
@@ -347,8 +404,9 @@ init_db(ClusterNodes, Force) ->
ok = create_local_table_copies(case IsDiskNode of
true -> disc;
false -> ram
- end)
- end;
+ end),
+ ok = ensure_schema_integrity()
+ end;
{error, Reason} ->
%% one reason we may end up here is if we try to join
%% nodes together that are currently running standalone or
@@ -363,7 +421,9 @@ create_schema() ->
cannot_create_schema),
rabbit_misc:ensure_ok(mnesia:start(),
cannot_start_mnesia),
- create_tables().
+ ok = create_tables(),
+ ok = ensure_schema_integrity(),
+ ok = wait_for_tables().
move_db() ->
mnesia:stop(),
@@ -388,12 +448,13 @@ move_db() ->
ok.
create_tables() ->
- lists:foreach(fun ({Tab, TabArgs}) ->
- case mnesia:create_table(Tab, TabArgs) of
+ lists:foreach(fun ({Tab, TabDef}) ->
+ TabDef1 = proplists:delete(match, TabDef),
+ case mnesia:create_table(Tab, TabDef1) of
{atomic, ok} -> ok;
{aborted, Reason} ->
throw({error, {table_creation_failed,
- Tab, TabArgs, Reason}})
+ Tab, TabDef1, Reason}})
end
end,
table_definitions()),
@@ -448,17 +509,12 @@ wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
wait_for_tables() -> wait_for_tables(table_names()).
wait_for_tables(TableNames) ->
- case check_schema_integrity() of
- ok ->
- case mnesia:wait_for_tables(TableNames, 30000) of
- ok -> ok;
- {timeout, BadTabs} ->
- throw({error, {timeout_waiting_for_tables, BadTabs}});
- {error, Reason} ->
- throw({error, {failed_waiting_for_tables, Reason}})
- end;
+ case mnesia:wait_for_tables(TableNames, 30000) of
+ ok -> ok;
+ {timeout, BadTabs} ->
+ throw({error, {timeout_waiting_for_tables, BadTabs}});
{error, Reason} ->
- throw({error, {schema_integrity_check_failed, Reason}})
+ throw({error, {failed_waiting_for_tables, Reason}})
end.
reset(Force) ->
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index 35b75a23a9..c9f75be030 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -35,7 +35,6 @@
-define(DefaultPluginDir, "plugins").
-define(DefaultUnpackedPluginDir, "priv/plugins").
--define(DefaultRabbitEBin, "ebin").
-define(BaseApps, [rabbit]).
%%----------------------------------------------------------------------------
@@ -52,15 +51,15 @@
%%----------------------------------------------------------------------------
start() ->
+ io:format("Activating RabbitMQ plugins ..."),
%% Ensure Rabbit is loaded so we can access it's environment
application:load(rabbit),
%% Determine our various directories
PluginDir = get_env(plugins_dir, ?DefaultPluginDir),
UnpackedPluginDir = get_env(plugins_expand_dir, ?DefaultUnpackedPluginDir),
- RabbitEBin = get_env(rabbit_ebin, ?DefaultRabbitEBin),
- RootName = RabbitEBin ++ "/rabbit",
+ RootName = UnpackedPluginDir ++ "/rabbit",
%% Unpack any .ez plugins
unpack_ez_plugins(PluginDir, UnpackedPluginDir),
@@ -86,7 +85,7 @@ start() ->
{erts, erlang:system_info(version)},
AppVersions},
- %% Write it out to ebin/rabbit.rel
+ %% Write it out to $RABBITMQ_PLUGINS_EXPAND_DIR/rabbit.rel
file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])),
%% Compile the script
@@ -131,8 +130,9 @@ start() ->
ok -> ok;
error -> error("failed to compile boot script file ~s", [ScriptFile])
end,
- io:format("~n~w plugins activated.~n~n", [length(PluginApps)]),
+ io:format("~n~w plugins activated:~n", [length(PluginApps)]),
[io:format("* ~w~n", [App]) || App <- PluginApps],
+ io:nl(),
halt(),
ok.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index f947cd9053..d5ade90f6f 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -37,7 +37,7 @@
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/1, mainloop/3]).
+-export([init/1, mainloop/2]).
-export([conserve_memory/2, server_properties/0]).
@@ -59,7 +59,7 @@
%---------------------------------------------------------------------------
--record(v1, {sock, connection, callback, recv_length, recv_ref,
+-record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
connection_state, queue_collector, heartbeater, stats_timer}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
@@ -186,7 +186,7 @@ init(Parent) ->
end.
system_continue(Parent, Deb, State) ->
- ?MODULE:mainloop(Parent, Deb, State).
+ ?MODULE:mainloop(Deb, State#v1{parent = Parent}).
system_terminate(Reason, _Parent, _Deb, _State) ->
exit(Reason).
@@ -273,22 +273,23 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
ProfilingValue = setup_profiling(),
{ok, Collector} = rabbit_queue_collector:start_link(),
try
- mainloop(Parent, Deb, switch_callback(
- #v1{sock = ClientSock,
- connection = #connection{
- user = none,
- timeout_sec = ?HANDSHAKE_TIMEOUT,
- frame_max = ?FRAME_MIN_SIZE,
- vhost = none,
+ mainloop(Deb, switch_callback(
+ #v1{parent = Parent,
+ sock = ClientSock,
+ connection = #connection{
+ user = none,
+ timeout_sec = ?HANDSHAKE_TIMEOUT,
+ frame_max = ?FRAME_MIN_SIZE,
+ vhost = none,
client_properties = none,
- protocol = none},
- callback = uninitialized_callback,
- recv_length = 0,
- recv_ref = none,
+ protocol = none},
+ callback = uninitialized_callback,
+ recv_length = 0,
+ recv_ref = none,
connection_state = pre_init,
- queue_collector = Collector,
- heartbeater = none,
- stats_timer =
+ queue_collector = Collector,
+ heartbeater = none,
+ stats_timer =
rabbit_event:init_stats_timer()},
handshake, 8))
catch
@@ -314,15 +315,14 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
end,
done.
-mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
+mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
%%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]),
receive
{inet_async, Sock, Ref, {ok, Data}} ->
{State1, Callback1, Length1} =
handle_input(State#v1.callback, Data,
State#v1{recv_ref = none}),
- mainloop(Parent, Deb,
- switch_callback(State1, Callback1, Length1));
+ mainloop(Deb, switch_callback(State1, Callback1, Length1));
{inet_async, Sock, Ref, {error, closed}} ->
if State#v1.connection_state =:= closed ->
State;
@@ -332,7 +332,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
{inet_async, Sock, Ref, {error, Reason}} ->
throw({inet_error, Reason});
{conserve_memory, Conserve} ->
- mainloop(Parent, Deb, internal_conserve_memory(Conserve, State));
+ mainloop(Deb, internal_conserve_memory(Conserve, State));
{'EXIT', Parent, Reason} ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -348,16 +348,16 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
{channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
throw(E);
{channel_exit, Channel, Reason} ->
- mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State));
+ mainloop(Deb, handle_channel_exit(Channel, Reason, State));
{'EXIT', Pid, Reason} ->
- mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State));
+ mainloop(Deb, handle_dependent_exit(Pid, Reason, State));
terminate_connection ->
State;
handshake_timeout ->
if ?IS_RUNNING(State) orelse
State#v1.connection_state =:= closing orelse
State#v1.connection_state =:= closed ->
- mainloop(Parent, Deb, State);
+ mainloop(Deb, State);
true ->
throw({handshake_timeout, State#v1.callback})
end;
@@ -368,22 +368,21 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
gen_server:reply(From, ok),
case ForceTermination of
force -> ok;
- normal -> mainloop(Parent, Deb, NewState)
+ normal -> mainloop(Deb, NewState)
end;
{'$gen_call', From, info} ->
gen_server:reply(From, infos(?INFO_KEYS, State)),
- mainloop(Parent, Deb, State);
+ mainloop(Deb, State);
{'$gen_call', From, {info, Items}} ->
gen_server:reply(From, try {ok, infos(Items, State)}
catch Error -> {error, Error}
end),
- mainloop(Parent, Deb, State);
+ mainloop(Deb, State);
{'$gen_cast', emit_stats} ->
internal_emit_stats(State),
- mainloop(Parent, Deb,
- State#v1{stats_timer =
- rabbit_event:reset_stats_timer_after(
- State#v1.stats_timer)});
+ mainloop(Deb, State#v1{stats_timer =
+ rabbit_event:reset_stats_timer_after(
+ State#v1.stats_timer)});
{system, From, Request} ->
sys:handle_system_msg(Request, From,
Parent, ?MODULE, Deb, State);
@@ -444,6 +443,9 @@ close_channel(Channel, State) ->
put({channel, Channel}, closing),
State.
+handle_channel_exit(ChPid, Reason, State) when is_pid(ChPid) ->
+ {channel, Channel} = get({chpid, ChPid}),
+ handle_exception(State, Channel, Reason);
handle_channel_exit(Channel, Reason, State) ->
handle_exception(State, Channel, Reason).
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 4d2955d9cd..8788303752 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -563,7 +563,7 @@ restart(Child, State) ->
{terminate, NState} ->
report_error(shutdown, reached_max_restart_intensity,
Child, State#state.name),
- {shutdown, remove_child(Child, NState)}
+ {shutdown, state_del_child(Child, NState)}
end.
restart1(Child, State) ->