summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEssien Ita Essien <essiene@gmail.com>2008-12-26 10:29:00 +0100
committerEssien Ita Essien <essiene@gmail.com>2008-12-26 10:29:00 +0100
commita625e7695d9af4083a137821e52bd3afa09c85c2 (patch)
tree11f0aaae33fa1335f1b986f76757b6733d64fe91 /src
parent9e4fdee06474001eecfd58cce1ceea7e49fde9b3 (diff)
parent4be2257979970f5b697d1e4402ca0bd6bd3ae691 (diff)
downloadrabbitmq-server-git-a625e7695d9af4083a137821e52bd3afa09c85c2.tar.gz
Merge in upstream changes
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl7
-rw-r--r--src/rabbit_alarm.erl25
-rw-r--r--src/rabbit_channel.erl27
-rw-r--r--src/rabbit_exchange.erl20
-rw-r--r--src/rabbit_mnesia.erl86
-rw-r--r--src/rabbit_tests.erl2
6 files changed, 104 insertions, 63 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 63ececb73e..6891fe736f 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -163,11 +163,8 @@ start(normal, []) ->
ok = rabbit_amqqueue:start(),
- ok = rabbit_alarm:start(
- case application:get_env(start_memsup) of
- {ok, Val} -> Val;
- undefined -> true
- end),
+ {ok, MemoryAlarms} = application:get_env(memory_alarms),
+ ok = rabbit_alarm:start(MemoryAlarms),
ok = rabbit_binary_generator:
check_empty_content_body_frame_size(),
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 43c4ad90c0..7bbed8b715 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -55,12 +55,12 @@
%%----------------------------------------------------------------------------
-start(StartMemsup) ->
- ok = alarm_handler:add_alarm_handler(?MODULE),
+start(MemoryAlarms) ->
+ ok = alarm_handler:add_alarm_handler(?MODULE, [MemoryAlarms]),
case whereis(memsup) of
- undefined -> if StartMemsup -> ok = start_memsup(),
- ok = adjust_memsup_interval();
- true -> ok
+ undefined -> if MemoryAlarms -> ok = start_memsup(),
+ ok = adjust_memsup_interval();
+ true -> ok
end;
_ -> ok = adjust_memsup_interval()
end.
@@ -74,9 +74,15 @@ register(Pid, HighMemMFA) ->
%%----------------------------------------------------------------------------
-init([]) ->
- {ok, #alarms{alertees = dict:new()}}.
+init([MemoryAlarms]) ->
+ {ok, #alarms{alertees = case MemoryAlarms of
+ true -> dict:new();
+ false -> undefined
+ end}}.
+handle_call({register, _Pid, _HighMemMFA},
+ State = #alarms{alertees = undefined}) ->
+ {ok, ok, State};
handle_call({register, Pid, HighMemMFA},
State = #alarms{alertees = Alertess}) ->
_MRef = erlang:monitor(process, Pid),
@@ -102,6 +108,9 @@ handle_event({clear_alarm, system_memory_high_watermark}, State) ->
handle_event(_Event, State) ->
{ok, State}.
+handle_info({'DOWN', _MRef, process, _Pid, _Reason},
+ State = #alarms{alertees = undefined}) ->
+ {ok, State};
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #alarms{alertees = Alertess}) ->
{ok, State#alarms{alertees = dict:erase(Pid, Alertess)}};
@@ -165,6 +174,8 @@ adjust_memsup_interval() ->
{set_check_interval, ?MEMSUP_CHECK_INTERVAL},
infinity).
+alert(_Alert, undefined) ->
+ ok;
alert(Alert, Alertees) ->
dict:fold(fun (Pid, {M, F, A}, Acc) ->
ok = erlang:apply(M, F, A ++ [Pid, Alert]),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5d7fde90ef..19104bcb90 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -111,20 +111,25 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
consumer_mapping = dict:new()}.
handle_message({method, Method, Content}, State) ->
- case (catch handle_method(Method, Content, State)) of
- {reply, Reply, NewState} ->
- ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
- NewState;
- {noreply, NewState} ->
- NewState;
- stop ->
- exit(normal);
- {'EXIT', {amqp, Error, Explanation, none}} ->
+ try
+ case handle_method(Method, Content, State) of
+ {reply, Reply, NewState} ->
+ ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
+ NewState;
+ {noreply, NewState} ->
+ NewState;
+ stop ->
+ exit(normal)
+ end
+ catch
+ exit:{amqp, Error, Explanation, none} ->
terminate({amqp, Error, Explanation,
rabbit_misc:method_record_type(Method)},
State);
- {'EXIT', Reason} ->
- terminate(Reason, State)
+ exit:normal ->
+ terminate(normal, State);
+ _:Reason ->
+ terminate({Reason, erlang:get_stacktrace()}, State)
end;
handle_message(terminate, State) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 299747d141..925c335cee 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -238,7 +238,19 @@ route(#exchange{name = Name, type = topic}, RoutingKey) ->
%% TODO: This causes a full scan for each entry
%% with the same exchange (see bug 19336)
topic_matches(BindingKey, RoutingKey)]),
- lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query]));
+ lookup_qpids(
+ try
+ mnesia:async_dirty(fun qlc:e/1, [Query])
+ catch exit:{aborted, {badarg, _}} ->
+ %% work around OTP-7025, which was fixed in R12B-1, by
+ %% falling back on a less efficient method
+ [QName || #route{binding = #binding{queue_name = QName,
+ key = BindingKey}} <-
+ mnesia:dirty_match_object(
+ #route{binding = #binding{exchange_name = Name,
+ _ = '_'}}),
+ topic_matches(BindingKey, RoutingKey)]
+ end);
route(X = #exchange{type = fanout}, _) ->
route_internal(X, '_');
@@ -256,8 +268,10 @@ route_internal(#exchange{name = Name}, RoutingKey) ->
lookup_qpids(Queues) ->
sets:fold(
fun(Key, Acc) ->
- [#amqqueue{pid = QPid}] = mnesia:dirty_read({amqqueue, Key}),
- [QPid | Acc]
+ case mnesia:dirty_read({amqqueue, Key}) of
+ [#amqqueue{pid = QPid}] -> [QPid | Acc];
+ [] -> Acc
+ end
end, [], sets:from_list(Queues)).
%% TODO: Should all of the route and binding management not be
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 57dd92563d..d19c37cb44 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -36,9 +36,6 @@
-export([table_names/0]).
-%% Called by rabbitmq-mnesia-current script
--export([schema_current/0]).
-
%% create_tables/0 exported for helping embed RabbitMQ in or alongside
%% other mnesia-using Erlang applications, such as ejabberd
-export([create_tables/0]).
@@ -57,7 +54,6 @@
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
--spec(schema_current/0 :: () -> bool()).
-endif.
@@ -101,20 +97,6 @@ cluster(ClusterNodes) ->
reset() -> reset(false).
force_reset() -> reset(true).
-%% This is invoked by rabbitmq-mnesia-current.
-schema_current() ->
- application:start(mnesia),
- ok = ensure_mnesia_running(),
- ok = ensure_mnesia_dir(),
- ok = init_db(read_cluster_nodes_config()),
- try
- ensure_schema_integrity(),
- true
- catch
- {error, {schema_integrity_check_failed, _Reason}} ->
- false
- end.
-
%%--------------------------------------------------------------------
table_definitions() ->
@@ -169,17 +151,12 @@ ensure_mnesia_not_running() ->
yes -> throw({error, mnesia_unexpectedly_running})
end.
-ensure_schema_integrity() ->
- case catch lists:foreach(fun (Tab) ->
- mnesia:table_info(Tab, version)
- end,
- table_names()) of
- {'EXIT', Reason} -> throw({error, {schema_integrity_check_failed,
- Reason}});
- _ -> ok
- end,
+check_schema_integrity() ->
%%TODO: more thorough checks
- ok.
+ case catch [mnesia:table_info(Tab, version) || Tab <- table_names()] of
+ {'EXIT', Reason} -> {error, Reason};
+ _ -> ok
+ end.
%% The cluster node config file contains some or all of the disk nodes
%% that are members of the cluster this node is / should be a part of.
@@ -259,7 +236,20 @@ init_db(ClusterNodes) ->
case mnesia:change_config(extra_db_nodes, ClusterNodes -- [node()]) of
{ok, []} ->
if WasDiskNode and IsDiskNode ->
- ok;
+ case check_schema_integrity() of
+ ok ->
+ ok;
+ {error, Reason} ->
+ %% NB: we cannot use rabbit_log here since
+ %% it may not have been started yet
+ error_logger:warning_msg(
+ "schema integrity check failed: ~p~n" ++
+ "moving database to backup location " ++
+ "and recreating schema from scratch~n",
+ [Reason]),
+ ok = move_db(),
+ ok = create_schema()
+ end;
WasDiskNode ->
throw({error, {cannot_convert_disk_node_to_ram_node,
ClusterNodes}});
@@ -292,6 +282,28 @@ create_schema() ->
cannot_start_mnesia),
create_tables().
+move_db() ->
+ mnesia:stop(),
+ MnesiaDir = filename:dirname(mnesia:system_info(directory) ++ "/"),
+ {{Year, Month, Day}, {Hour, Minute, Second}} = erlang:universaltime(),
+ BackupDir = lists:flatten(
+ io_lib:format("~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w",
+ [MnesiaDir,
+ Year, Month, Day, Hour, Minute, Second])),
+ case file:rename(MnesiaDir, BackupDir) of
+ ok ->
+ %% NB: we cannot use rabbit_log here since it may not have
+ %% been started yet
+ error_logger:warning_msg("moved database from ~s to ~s~n",
+ [MnesiaDir, BackupDir]),
+ ok;
+ {error, Reason} -> throw({error, {cannot_backup_mnesia,
+ MnesiaDir, BackupDir, Reason}})
+ end,
+ ok = ensure_mnesia_dir(),
+ rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
+ ok.
+
create_tables() ->
lists:foreach(fun ({Tab, TabArgs}) ->
case mnesia:create_table(Tab, TabArgs) of
@@ -353,13 +365,17 @@ create_local_table_copy(Tab, Type) ->
ok.
wait_for_tables() ->
- ok = ensure_schema_integrity(),
- case mnesia:wait_for_tables(table_names(), 30000) of
- ok -> ok;
- {timeout, BadTabs} ->
- throw({error, {timeout_waiting_for_tables, BadTabs}});
+ case check_schema_integrity() of
+ ok ->
+ case mnesia:wait_for_tables(table_names(), 30000) of
+ ok -> ok;
+ {timeout, BadTabs} ->
+ throw({error, {timeout_waiting_for_tables, BadTabs}});
+ {error, Reason} ->
+ throw({error, {failed_waiting_for_tables, Reason}})
+ end;
{error, Reason} ->
- throw({error, {failed_waiting_for_tables, Reason}})
+ throw({error, {schema_integrity_check_failed, Reason}})
end.
reset(Force) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1853a85511..df2e71d9e6 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -139,8 +139,6 @@ test_topic_matching() ->
passed.
test_app_management() ->
- true = rabbit_mnesia:schema_current(),
-
%% starting, stopping, status
ok = control_action(stop_app, []),
ok = control_action(stop_app, []),