diff options
| author | Essien Ita Essien <essiene@gmail.com> | 2008-12-26 10:29:00 +0100 |
|---|---|---|
| committer | Essien Ita Essien <essiene@gmail.com> | 2008-12-26 10:29:00 +0100 |
| commit | a625e7695d9af4083a137821e52bd3afa09c85c2 (patch) | |
| tree | 11f0aaae33fa1335f1b986f76757b6733d64fe91 /src | |
| parent | 9e4fdee06474001eecfd58cce1ceea7e49fde9b3 (diff) | |
| parent | 4be2257979970f5b697d1e4402ca0bd6bd3ae691 (diff) | |
| download | rabbitmq-server-git-a625e7695d9af4083a137821e52bd3afa09c85c2.tar.gz | |
Merge in upstream changes
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_alarm.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 86 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 2 |
6 files changed, 104 insertions, 63 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 63ececb73e..6891fe736f 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -163,11 +163,8 @@ start(normal, []) -> ok = rabbit_amqqueue:start(), - ok = rabbit_alarm:start( - case application:get_env(start_memsup) of - {ok, Val} -> Val; - undefined -> true - end), + {ok, MemoryAlarms} = application:get_env(memory_alarms), + ok = rabbit_alarm:start(MemoryAlarms), ok = rabbit_binary_generator: check_empty_content_body_frame_size(), diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 43c4ad90c0..7bbed8b715 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -55,12 +55,12 @@ %%---------------------------------------------------------------------------- -start(StartMemsup) -> - ok = alarm_handler:add_alarm_handler(?MODULE), +start(MemoryAlarms) -> + ok = alarm_handler:add_alarm_handler(?MODULE, [MemoryAlarms]), case whereis(memsup) of - undefined -> if StartMemsup -> ok = start_memsup(), - ok = adjust_memsup_interval(); - true -> ok + undefined -> if MemoryAlarms -> ok = start_memsup(), + ok = adjust_memsup_interval(); + true -> ok end; _ -> ok = adjust_memsup_interval() end. @@ -74,9 +74,15 @@ register(Pid, HighMemMFA) -> %%---------------------------------------------------------------------------- -init([]) -> - {ok, #alarms{alertees = dict:new()}}. +init([MemoryAlarms]) -> + {ok, #alarms{alertees = case MemoryAlarms of + true -> dict:new(); + false -> undefined + end}}. +handle_call({register, _Pid, _HighMemMFA}, + State = #alarms{alertees = undefined}) -> + {ok, ok, State}; handle_call({register, Pid, HighMemMFA}, State = #alarms{alertees = Alertess}) -> _MRef = erlang:monitor(process, Pid), @@ -102,6 +108,9 @@ handle_event({clear_alarm, system_memory_high_watermark}, State) -> handle_event(_Event, State) -> {ok, State}. +handle_info({'DOWN', _MRef, process, _Pid, _Reason}, + State = #alarms{alertees = undefined}) -> + {ok, State}; handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #alarms{alertees = Alertess}) -> {ok, State#alarms{alertees = dict:erase(Pid, Alertess)}}; @@ -165,6 +174,8 @@ adjust_memsup_interval() -> {set_check_interval, ?MEMSUP_CHECK_INTERVAL}, infinity). +alert(_Alert, undefined) -> + ok; alert(Alert, Alertees) -> dict:fold(fun (Pid, {M, F, A}, Acc) -> ok = erlang:apply(M, F, A ++ [Pid, Alert]), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5d7fde90ef..19104bcb90 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -111,20 +111,25 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> consumer_mapping = dict:new()}. handle_message({method, Method, Content}, State) -> - case (catch handle_method(Method, Content, State)) of - {reply, Reply, NewState} -> - ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), - NewState; - {noreply, NewState} -> - NewState; - stop -> - exit(normal); - {'EXIT', {amqp, Error, Explanation, none}} -> + try + case handle_method(Method, Content, State) of + {reply, Reply, NewState} -> + ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), + NewState; + {noreply, NewState} -> + NewState; + stop -> + exit(normal) + end + catch + exit:{amqp, Error, Explanation, none} -> terminate({amqp, Error, Explanation, rabbit_misc:method_record_type(Method)}, State); - {'EXIT', Reason} -> - terminate(Reason, State) + exit:normal -> + terminate(normal, State); + _:Reason -> + terminate({Reason, erlang:get_stacktrace()}, State) end; handle_message(terminate, State) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 299747d141..925c335cee 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -238,7 +238,19 @@ route(#exchange{name = Name, type = topic}, RoutingKey) -> %% TODO: This causes a full scan for each entry %% with the same exchange (see bug 19336) topic_matches(BindingKey, RoutingKey)]), - lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])); + lookup_qpids( + try + mnesia:async_dirty(fun qlc:e/1, [Query]) + catch exit:{aborted, {badarg, _}} -> + %% work around OTP-7025, which was fixed in R12B-1, by + %% falling back on a less efficient method + [QName || #route{binding = #binding{queue_name = QName, + key = BindingKey}} <- + mnesia:dirty_match_object( + #route{binding = #binding{exchange_name = Name, + _ = '_'}}), + topic_matches(BindingKey, RoutingKey)] + end); route(X = #exchange{type = fanout}, _) -> route_internal(X, '_'); @@ -256,8 +268,10 @@ route_internal(#exchange{name = Name}, RoutingKey) -> lookup_qpids(Queues) -> sets:fold( fun(Key, Acc) -> - [#amqqueue{pid = QPid}] = mnesia:dirty_read({amqqueue, Key}), - [QPid | Acc] + case mnesia:dirty_read({amqqueue, Key}) of + [#amqqueue{pid = QPid}] -> [QPid | Acc]; + [] -> Acc + end end, [], sets:from_list(Queues)). %% TODO: Should all of the route and binding management not be diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 57dd92563d..d19c37cb44 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -36,9 +36,6 @@ -export([table_names/0]). -%% Called by rabbitmq-mnesia-current script --export([schema_current/0]). - %% create_tables/0 exported for helping embed RabbitMQ in or alongside %% other mnesia-using Erlang applications, such as ejabberd -export([create_tables/0]). @@ -57,7 +54,6 @@ -spec(reset/0 :: () -> 'ok'). -spec(force_reset/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). --spec(schema_current/0 :: () -> bool()). -endif. @@ -101,20 +97,6 @@ cluster(ClusterNodes) -> reset() -> reset(false). force_reset() -> reset(true). -%% This is invoked by rabbitmq-mnesia-current. -schema_current() -> - application:start(mnesia), - ok = ensure_mnesia_running(), - ok = ensure_mnesia_dir(), - ok = init_db(read_cluster_nodes_config()), - try - ensure_schema_integrity(), - true - catch - {error, {schema_integrity_check_failed, _Reason}} -> - false - end. - %%-------------------------------------------------------------------- table_definitions() -> @@ -169,17 +151,12 @@ ensure_mnesia_not_running() -> yes -> throw({error, mnesia_unexpectedly_running}) end. -ensure_schema_integrity() -> - case catch lists:foreach(fun (Tab) -> - mnesia:table_info(Tab, version) - end, - table_names()) of - {'EXIT', Reason} -> throw({error, {schema_integrity_check_failed, - Reason}}); - _ -> ok - end, +check_schema_integrity() -> %%TODO: more thorough checks - ok. + case catch [mnesia:table_info(Tab, version) || Tab <- table_names()] of + {'EXIT', Reason} -> {error, Reason}; + _ -> ok + end. %% The cluster node config file contains some or all of the disk nodes %% that are members of the cluster this node is / should be a part of. @@ -259,7 +236,20 @@ init_db(ClusterNodes) -> case mnesia:change_config(extra_db_nodes, ClusterNodes -- [node()]) of {ok, []} -> if WasDiskNode and IsDiskNode -> - ok; + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + %% NB: we cannot use rabbit_log here since + %% it may not have been started yet + error_logger:warning_msg( + "schema integrity check failed: ~p~n" ++ + "moving database to backup location " ++ + "and recreating schema from scratch~n", + [Reason]), + ok = move_db(), + ok = create_schema() + end; WasDiskNode -> throw({error, {cannot_convert_disk_node_to_ram_node, ClusterNodes}}); @@ -292,6 +282,28 @@ create_schema() -> cannot_start_mnesia), create_tables(). +move_db() -> + mnesia:stop(), + MnesiaDir = filename:dirname(mnesia:system_info(directory) ++ "/"), + {{Year, Month, Day}, {Hour, Minute, Second}} = erlang:universaltime(), + BackupDir = lists:flatten( + io_lib:format("~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w", + [MnesiaDir, + Year, Month, Day, Hour, Minute, Second])), + case file:rename(MnesiaDir, BackupDir) of + ok -> + %% NB: we cannot use rabbit_log here since it may not have + %% been started yet + error_logger:warning_msg("moved database from ~s to ~s~n", + [MnesiaDir, BackupDir]), + ok; + {error, Reason} -> throw({error, {cannot_backup_mnesia, + MnesiaDir, BackupDir, Reason}}) + end, + ok = ensure_mnesia_dir(), + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + ok. + create_tables() -> lists:foreach(fun ({Tab, TabArgs}) -> case mnesia:create_table(Tab, TabArgs) of @@ -353,13 +365,17 @@ create_local_table_copy(Tab, Type) -> ok. wait_for_tables() -> - ok = ensure_schema_integrity(), - case mnesia:wait_for_tables(table_names(), 30000) of - ok -> ok; - {timeout, BadTabs} -> - throw({error, {timeout_waiting_for_tables, BadTabs}}); + case check_schema_integrity() of + ok -> + case mnesia:wait_for_tables(table_names(), 30000) of + ok -> ok; + {timeout, BadTabs} -> + throw({error, {timeout_waiting_for_tables, BadTabs}}); + {error, Reason} -> + throw({error, {failed_waiting_for_tables, Reason}}) + end; {error, Reason} -> - throw({error, {failed_waiting_for_tables, Reason}}) + throw({error, {schema_integrity_check_failed, Reason}}) end. reset(Force) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1853a85511..df2e71d9e6 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -139,8 +139,6 @@ test_topic_matching() -> passed. test_app_management() -> - true = rabbit_mnesia:schema_current(), - %% starting, stopping, status ok = control_action(stop_app, []), ok = control_action(stop_app, []), |
