diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-12 11:41:02 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-12 11:41:02 +0100 |
| commit | 8eadc8328e8a4d046ee7bd9733b915fa1c823074 (patch) | |
| tree | 59484f7470bf6c6175185069f2368b8b16ba2920 /src | |
| parent | c16a63ee4e19b129c438da7c0e0da4c8bf3785c6 (diff) | |
| parent | 5b920f47ffb3af84853e447291d4e231c163fc12 (diff) | |
| download | rabbitmq-server-git-8eadc8328e8a4d046ee7bd9733b915fa1c823074.tar.gz | |
Merging default into bug23095
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_framing_channel.erl | 75 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 128 | ||||
| -rw-r--r-- | src/rabbit_plugin_activator.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 64 | ||||
| -rw-r--r-- | src/supervisor2.erl | 2 |
5 files changed, 175 insertions, 104 deletions
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 00b74ad010..553faaa814 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -35,18 +35,19 @@ -export([start_link/3, process/2, shutdown/1]). %% internal --export([mainloop/2]). +-export([mainloop/3]). %%-------------------------------------------------------------------- start_link(StartFun, StartArgs, Protocol) -> + Parent = self(), {ok, spawn_link( fun () -> %% we trap exits so that a normal termination of %% the channel or reader process terminates us too. process_flag(trap_exit, true), {ok, ChannelPid} = apply(StartFun, StartArgs), - mainloop(ChannelPid, Protocol) + mainloop(Parent, ChannelPid, Protocol) end)}. process(Pid, Frame) -> @@ -73,46 +74,55 @@ read_frame(ChannelPid) -> Msg -> exit({unexpected_message, Msg}) end. -mainloop(ChannelPid, Protocol) -> +mainloop(Parent, ChannelPid, Protocol) -> case read_frame(ChannelPid) of {method, MethodName, FieldsBin} -> Method = Protocol:decode_method_fields(MethodName, FieldsBin), case Protocol:method_has_content(MethodName) of true -> {ClassId, _MethodId} = Protocol:method_id(MethodName), - rabbit_channel:do(ChannelPid, Method, - collect_content(ChannelPid, - ClassId, - Protocol)); - false -> rabbit_channel:do(ChannelPid, Method) - end, - ?MODULE:mainloop(ChannelPid, Protocol); + case collect_content(ChannelPid, ClassId, Protocol) of + {ok, Content} -> + rabbit_channel:do(ChannelPid, Method, Content), + ?MODULE:mainloop(Parent, ChannelPid, Protocol); + {error, Reason} -> + channel_exit(Parent, Reason, MethodName) + end; + false -> rabbit_channel:do(ChannelPid, Method), + ?MODULE:mainloop(Parent, ChannelPid, Protocol) + end; _ -> - unexpected_frame("expected method frame, " - "got non method frame instead", - []) + channel_exit(Parent, {unexpected_frame, + "expected method frame, " + "got non method frame instead", + []}, none) end. collect_content(ChannelPid, ClassId, Protocol) -> case read_frame(ChannelPid) of {content_header, ClassId, 0, BodySize, PropertiesBin} -> - Payload = collect_content_payload(ChannelPid, BodySize, []), - #content{class_id = ClassId, - properties = none, - properties_bin = PropertiesBin, - protocol = Protocol, - payload_fragments_rev = Payload}; + case collect_content_payload(ChannelPid, BodySize, []) of + {ok, Payload} -> {ok, #content{ + class_id = ClassId, + properties = none, + properties_bin = PropertiesBin, + protocol = Protocol, + payload_fragments_rev = Payload}}; + Error -> Error + end; {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} -> - unexpected_frame("expected content header for class ~w, " - "got one for class ~w instead", - [ClassId, HeaderClassId]); + {error, {unexpected_frame, + "expected content header for class ~w, " + "got one for class ~w instead", + [ClassId, HeaderClassId]}}; _ -> - unexpected_frame("expected content header for class ~w, " - "got non content header frame instead", - [ClassId]) + {error, {unexpected_frame, + "expected content header for class ~w, " + "got non content header frame instead", + [ClassId]}} end. collect_content_payload(_ChannelPid, 0, Acc) -> - Acc; + {ok, Acc}; collect_content_payload(ChannelPid, RemainingByteCount, Acc) -> case read_frame(ChannelPid) of {content_body, FragmentBin} -> @@ -120,10 +130,13 @@ collect_content_payload(ChannelPid, RemainingByteCount, Acc) -> RemainingByteCount - size(FragmentBin), [FragmentBin | Acc]); _ -> - unexpected_frame("expected content body, " - "got non content body frame instead", - []) + {error, {unexpected_frame, + "expected content body, " + "got non content body frame instead", + []}} end. -unexpected_frame(ExplanationFormat, Params) -> - rabbit_misc:protocol_error(unexpected_frame, ExplanationFormat, Params). +channel_exit(Parent, {ErrorName, ExplanationFormat, Params}, MethodName) -> + Reason = rabbit_misc:amqp_error(ErrorName, ExplanationFormat, Params, + MethodName), + Parent ! {channel_exit, self(), Reason}. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 505dc28fe8..4a5adfaeed 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -91,7 +91,6 @@ init() -> ok = ensure_mnesia_running(), ok = ensure_mnesia_dir(), ok = init_db(read_cluster_nodes_config(), true), - ok = wait_for_tables(), ok. is_db_empty() -> @@ -114,7 +113,6 @@ cluster(ClusterNodes, Force) -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), try ok = init_db(ClusterNodes, Force), - ok = wait_for_tables(), ok = create_cluster_nodes_config(ClusterNodes) after mnesia:stop() @@ -157,57 +155,87 @@ table_definitions() -> [{rabbit_user, [{record_name, user}, {attributes, record_info(fields, user)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #user{_='_'}}]}, {rabbit_user_permission, [{record_name, user_permission}, {attributes, record_info(fields, user_permission)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #user_permission{user_vhost = #user_vhost{_='_'}, + permission = #permission{_='_'}, + _='_'}}]}, {rabbit_vhost, [{record_name, vhost}, {attributes, record_info(fields, vhost)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #vhost{_='_'}}]}, {rabbit_config, [{attributes, [key, val]}, % same mnesia's default - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, {rabbit_config, '_', '_'}}]}, {rabbit_listener, [{record_name, listener}, {attributes, record_info(fields, listener)}, - {type, bag}]}, + {type, bag}, + {match, #listener{_='_'}}]}, {rabbit_durable_route, [{record_name, route}, {attributes, record_info(fields, route)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #route{binding = binding_match(), _='_'}}]}, {rabbit_route, [{record_name, route}, {attributes, record_info(fields, route)}, - {type, ordered_set}]}, + {type, ordered_set}, + {match, #route{binding = binding_match(), _='_'}}]}, {rabbit_reverse_route, [{record_name, reverse_route}, {attributes, record_info(fields, reverse_route)}, - {type, ordered_set}]}, + {type, ordered_set}, + {match, #reverse_route{reverse_binding = reverse_binding_match(), + _='_'}}]}, %% Consider the implications to nodes_of_type/1 before altering %% the next entry. {rabbit_durable_exchange, [{record_name, exchange}, {attributes, record_info(fields, exchange)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #exchange{name = exchange_name_match(), _='_'}}]}, {rabbit_exchange, [{record_name, exchange}, - {attributes, record_info(fields, exchange)}]}, + {attributes, record_info(fields, exchange)}, + {match, #exchange{name = exchange_name_match(), _='_'}}]}, {rabbit_durable_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #amqqueue{name = queue_name_match(), _='_'}}]}, {rabbit_queue, [{record_name, amqqueue}, - {attributes, record_info(fields, amqqueue)}]}]. + {attributes, record_info(fields, amqqueue)}, + {match, #amqqueue{name = queue_name_match(), _='_'}}]}]. + +binding_match() -> + #binding{queue_name = queue_name_match(), + exchange_name = exchange_name_match(), + _='_'}. +reverse_binding_match() -> + #reverse_binding{queue_name = queue_name_match(), + exchange_name = exchange_name_match(), + _='_'}. +exchange_name_match() -> + resource_match(exchange). +queue_name_match() -> + resource_match(queue). +resource_match(Kind) -> + #resource{kind = Kind, _='_'}. table_names() -> [Tab || {Tab, _} <- table_definitions()]. replicated_table_names() -> - [Tab || {Tab, Attrs} <- table_definitions(), - not lists:member({local_content, true}, Attrs) + [Tab || {Tab, TabDef} <- table_definitions(), + not lists:member({local_content, true}, TabDef) ]. dir() -> mnesia:system_info(directory). @@ -232,26 +260,55 @@ ensure_mnesia_not_running() -> yes -> throw({error, mnesia_unexpectedly_running}) end. +ensure_schema_integrity() -> + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + throw({error, {schema_integrity_check_failed, Reason}}) + end. + check_schema_integrity() -> - TabDefs = table_definitions(), Tables = mnesia:system_info(tables), - case [Error || Tab <- table_names(), + case [Error || {Tab, TabDef} <- table_definitions(), case lists:member(Tab, Tables) of false -> Error = {table_missing, Tab}, true; true -> - {_, TabDef} = proplists:lookup(Tab, TabDefs), {_, ExpAttrs} = proplists:lookup(attributes, TabDef), Attrs = mnesia:table_info(Tab, attributes), Error = {table_attributes_mismatch, Tab, ExpAttrs, Attrs}, Attrs /= ExpAttrs end] of - [] -> ok; + [] -> check_table_integrity(); Errors -> {error, Errors} end. +check_table_integrity() -> + ok = wait_for_tables(), + case lists:all(fun ({Tab, TabDef}) -> + {_, Match} = proplists:lookup(match, TabDef), + read_test_table(Tab, Match) + end, table_definitions()) of + true -> ok; + false -> {error, invalid_table_content} + end. + +read_test_table(Tab, Match) -> + case mnesia:dirty_first(Tab) of + '$end_of_table' -> + true; + Key -> + ObjList = mnesia:dirty_read(Tab, Key), + MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]), + case ets:match_spec_run(ObjList, MatchComp) of + ObjList -> true; + _ -> false + end + end. + %% The cluster node config file contains some or all of the disk nodes %% that are members of the cluster this node is / should be a part of. %% @@ -347,8 +404,9 @@ init_db(ClusterNodes, Force) -> ok = create_local_table_copies(case IsDiskNode of true -> disc; false -> ram - end) - end; + end), + ok = ensure_schema_integrity() + end; {error, Reason} -> %% one reason we may end up here is if we try to join %% nodes together that are currently running standalone or @@ -363,7 +421,9 @@ create_schema() -> cannot_create_schema), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - create_tables(). + ok = create_tables(), + ok = ensure_schema_integrity(), + ok = wait_for_tables(). move_db() -> mnesia:stop(), @@ -388,12 +448,13 @@ move_db() -> ok. create_tables() -> - lists:foreach(fun ({Tab, TabArgs}) -> - case mnesia:create_table(Tab, TabArgs) of + lists:foreach(fun ({Tab, TabDef}) -> + TabDef1 = proplists:delete(match, TabDef), + case mnesia:create_table(Tab, TabDef1) of {atomic, ok} -> ok; {aborted, Reason} -> throw({error, {table_creation_failed, - Tab, TabArgs, Reason}}) + Tab, TabDef1, Reason}}) end end, table_definitions()), @@ -448,17 +509,12 @@ wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). wait_for_tables() -> wait_for_tables(table_names()). wait_for_tables(TableNames) -> - case check_schema_integrity() of - ok -> - case mnesia:wait_for_tables(TableNames, 30000) of - ok -> ok; - {timeout, BadTabs} -> - throw({error, {timeout_waiting_for_tables, BadTabs}}); - {error, Reason} -> - throw({error, {failed_waiting_for_tables, Reason}}) - end; + case mnesia:wait_for_tables(TableNames, 30000) of + ok -> ok; + {timeout, BadTabs} -> + throw({error, {timeout_waiting_for_tables, BadTabs}}); {error, Reason} -> - throw({error, {schema_integrity_check_failed, Reason}}) + throw({error, {failed_waiting_for_tables, Reason}}) end. reset(Force) -> diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index 35b75a23a9..c9f75be030 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -35,7 +35,6 @@ -define(DefaultPluginDir, "plugins"). -define(DefaultUnpackedPluginDir, "priv/plugins"). --define(DefaultRabbitEBin, "ebin"). -define(BaseApps, [rabbit]). %%---------------------------------------------------------------------------- @@ -52,15 +51,15 @@ %%---------------------------------------------------------------------------- start() -> + io:format("Activating RabbitMQ plugins ..."), %% Ensure Rabbit is loaded so we can access it's environment application:load(rabbit), %% Determine our various directories PluginDir = get_env(plugins_dir, ?DefaultPluginDir), UnpackedPluginDir = get_env(plugins_expand_dir, ?DefaultUnpackedPluginDir), - RabbitEBin = get_env(rabbit_ebin, ?DefaultRabbitEBin), - RootName = RabbitEBin ++ "/rabbit", + RootName = UnpackedPluginDir ++ "/rabbit", %% Unpack any .ez plugins unpack_ez_plugins(PluginDir, UnpackedPluginDir), @@ -86,7 +85,7 @@ start() -> {erts, erlang:system_info(version)}, AppVersions}, - %% Write it out to ebin/rabbit.rel + %% Write it out to $RABBITMQ_PLUGINS_EXPAND_DIR/rabbit.rel file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])), %% Compile the script @@ -131,8 +130,9 @@ start() -> ok -> ok; error -> error("failed to compile boot script file ~s", [ScriptFile]) end, - io:format("~n~w plugins activated.~n~n", [length(PluginApps)]), + io:format("~n~w plugins activated:~n", [length(PluginApps)]), [io:format("* ~w~n", [App]) || App <- PluginApps], + io:nl(), halt(), ok. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index f947cd9053..d5ade90f6f 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -37,7 +37,7 @@ -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/1, mainloop/3]). +-export([init/1, mainloop/2]). -export([conserve_memory/2, server_properties/0]). @@ -59,7 +59,7 @@ %--------------------------------------------------------------------------- --record(v1, {sock, connection, callback, recv_length, recv_ref, +-record(v1, {parent, sock, connection, callback, recv_length, recv_ref, connection_state, queue_collector, heartbeater, stats_timer}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, @@ -186,7 +186,7 @@ init(Parent) -> end. system_continue(Parent, Deb, State) -> - ?MODULE:mainloop(Parent, Deb, State). + ?MODULE:mainloop(Deb, State#v1{parent = Parent}). system_terminate(Reason, _Parent, _Deb, _State) -> exit(Reason). @@ -273,22 +273,23 @@ start_connection(Parent, Deb, Sock, SockTransform) -> ProfilingValue = setup_profiling(), {ok, Collector} = rabbit_queue_collector:start_link(), try - mainloop(Parent, Deb, switch_callback( - #v1{sock = ClientSock, - connection = #connection{ - user = none, - timeout_sec = ?HANDSHAKE_TIMEOUT, - frame_max = ?FRAME_MIN_SIZE, - vhost = none, + mainloop(Deb, switch_callback( + #v1{parent = Parent, + sock = ClientSock, + connection = #connection{ + user = none, + timeout_sec = ?HANDSHAKE_TIMEOUT, + frame_max = ?FRAME_MIN_SIZE, + vhost = none, client_properties = none, - protocol = none}, - callback = uninitialized_callback, - recv_length = 0, - recv_ref = none, + protocol = none}, + callback = uninitialized_callback, + recv_length = 0, + recv_ref = none, connection_state = pre_init, - queue_collector = Collector, - heartbeater = none, - stats_timer = + queue_collector = Collector, + heartbeater = none, + stats_timer = rabbit_event:init_stats_timer()}, handshake, 8)) catch @@ -314,15 +315,14 @@ start_connection(Parent, Deb, Sock, SockTransform) -> end, done. -mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> +mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]), receive {inet_async, Sock, Ref, {ok, Data}} -> {State1, Callback1, Length1} = handle_input(State#v1.callback, Data, State#v1{recv_ref = none}), - mainloop(Parent, Deb, - switch_callback(State1, Callback1, Length1)); + mainloop(Deb, switch_callback(State1, Callback1, Length1)); {inet_async, Sock, Ref, {error, closed}} -> if State#v1.connection_state =:= closed -> State; @@ -332,7 +332,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> {inet_async, Sock, Ref, {error, Reason}} -> throw({inet_error, Reason}); {conserve_memory, Conserve} -> - mainloop(Parent, Deb, internal_conserve_memory(Conserve, State)); + mainloop(Deb, internal_conserve_memory(Conserve, State)); {'EXIT', Parent, Reason} -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), @@ -348,16 +348,16 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> {channel_exit, _Chan, E = {writer, send_failed, _Error}} -> throw(E); {channel_exit, Channel, Reason} -> - mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); + mainloop(Deb, handle_channel_exit(Channel, Reason, State)); {'EXIT', Pid, Reason} -> - mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State)); + mainloop(Deb, handle_dependent_exit(Pid, Reason, State)); terminate_connection -> State; handshake_timeout -> if ?IS_RUNNING(State) orelse State#v1.connection_state =:= closing orelse State#v1.connection_state =:= closed -> - mainloop(Parent, Deb, State); + mainloop(Deb, State); true -> throw({handshake_timeout, State#v1.callback}) end; @@ -368,22 +368,21 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> gen_server:reply(From, ok), case ForceTermination of force -> ok; - normal -> mainloop(Parent, Deb, NewState) + normal -> mainloop(Deb, NewState) end; {'$gen_call', From, info} -> gen_server:reply(From, infos(?INFO_KEYS, State)), - mainloop(Parent, Deb, State); + mainloop(Deb, State); {'$gen_call', From, {info, Items}} -> gen_server:reply(From, try {ok, infos(Items, State)} catch Error -> {error, Error} end), - mainloop(Parent, Deb, State); + mainloop(Deb, State); {'$gen_cast', emit_stats} -> internal_emit_stats(State), - mainloop(Parent, Deb, - State#v1{stats_timer = - rabbit_event:reset_stats_timer_after( - State#v1.stats_timer)}); + mainloop(Deb, State#v1{stats_timer = + rabbit_event:reset_stats_timer_after( + State#v1.stats_timer)}); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); @@ -444,6 +443,9 @@ close_channel(Channel, State) -> put({channel, Channel}, closing), State. +handle_channel_exit(ChPid, Reason, State) when is_pid(ChPid) -> + {channel, Channel} = get({chpid, ChPid}), + handle_exception(State, Channel, Reason); handle_channel_exit(Channel, Reason, State) -> handle_exception(State, Channel, Reason). diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 4d2955d9cd..8788303752 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -563,7 +563,7 @@ restart(Child, State) -> {terminate, NState} -> report_error(shutdown, reached_max_restart_intensity, Child, State#state.name), - {shutdown, remove_child(Child, NState)} + {shutdown, state_del_child(Child, NState)} end. restart1(Child, State) -> |
