summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-06-29 13:51:09 +0100
committerEmile Joubert <emile@rabbitmq.com>2012-06-29 13:51:09 +0100
commita9dc9a1de77dbed1fa79fa4a36c8878239f17ac1 (patch)
treed3b64592b4a5418adb1d63fb6d355577a2c6f9c7 /src
parent563ba9852974db391a35d26b420dd79a89d6489a (diff)
parenta54198f5a64dd49f5906a784f2c768ddc8110966 (diff)
downloadrabbitmq-server-git-a9dc9a1de77dbed1fa79fa4a36c8878239f17ac1.tar.gz
Merged bug24826 into default
Diffstat (limited to 'src')
-rw-r--r--src/gm.erl97
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_control_main.erl5
-rw-r--r--src/rabbit_disk_monitor.erl6
-rw-r--r--src/rabbit_file.erl17
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl8
-rw-r--r--src/rabbit_mirror_queue_master.erl11
-rw-r--r--src/rabbit_mirror_queue_slave.erl17
-rw-r--r--src/rabbit_reader.erl8
10 files changed, 90 insertions, 89 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 97c81ec635..f88ed18fbf 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -558,7 +558,7 @@ handle_call(group_members, _From,
reply(not_joined, State);
handle_call(group_members, _From, State = #state { view = View }) ->
- reply(alive_view_members(View), State);
+ reply(get_pids(alive_view_members(View)), State);
handle_call({add_on_right, _NewMember}, _From,
State = #state { members_state = undefined }) ->
@@ -647,7 +647,7 @@ handle_info(flush, State) ->
noreply(
flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
-handle_info({'DOWN', MRef, process, _Pid, _Reason},
+handle_info({'DOWN', MRef, process, _Pid, Reason},
State = #state { self = Self,
left = Left,
right = Right,
@@ -661,8 +661,10 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason},
{_, {Member1, MRef}} -> Member1;
_ -> undefined
end,
- case Member of
- undefined ->
+ case {Member, Reason} of
+ {undefined, _} ->
+ noreply(State);
+ {_, {shutdown, ring_shutdown}} ->
noreply(State);
_ ->
View1 =
@@ -876,11 +878,9 @@ flush_broadcast_buffer(State = #state { self = Self,
%% View construction and inspection
%% ---------------------------------------------------------------------------
-needs_view_update(ReqVer, {Ver, _View}) ->
- Ver < ReqVer.
+needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer.
-view_version({Ver, _View}) ->
- Ver.
+view_version({Ver, _View}) -> Ver.
is_member_alive({dead, _Member}) -> false;
is_member_alive(_) -> true.
@@ -899,17 +899,13 @@ store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
with_view_member(Fun, View, Id) ->
store_view_member(Fun(fetch_view_member(Id, View)), View).
-fetch_view_member(Id, {_Ver, View}) ->
- ?DICT:fetch(Id, View).
+fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View).
-find_view_member(Id, {_Ver, View}) ->
- ?DICT:find(Id, View).
+find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View).
-blank_view(Ver) ->
- {Ver, ?DICT:new()}.
+blank_view(Ver) -> {Ver, ?DICT:new()}.
-alive_view_members({_Ver, View}) ->
- ?DICT:fetch_keys(View).
+alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View).
all_known_members({_Ver, View}) ->
?DICT:fold(
@@ -1150,10 +1146,8 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
end,
{Neighbour, maybe_monitor(Neighbour, Self)}.
-maybe_monitor(Self, Self) ->
- undefined;
-maybe_monitor(Other, _Self) ->
- erlang:monitor(process, get_pid(Other)).
+maybe_monitor( Self, Self) -> undefined;
+maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)).
check_neighbours(State = #state { self = Self,
left = Left,
@@ -1242,23 +1236,19 @@ find_member_or_blank(Id, MembersState) ->
error -> blank_member()
end.
-erase_member(Id, MembersState) ->
- ?DICT:erase(Id, MembersState).
+erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState).
blank_member() ->
#member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.
-blank_member_state() ->
- ?DICT:new().
+blank_member_state() -> ?DICT:new().
store_member(Id, MemberState, MembersState) ->
?DICT:store(Id, MemberState, MembersState).
-prepare_members_state(MembersState) ->
- ?DICT:to_list(MembersState).
+prepare_members_state(MembersState) -> ?DICT:to_list(MembersState).
-build_members_state(MembersStateList) ->
- ?DICT:from_list(MembersStateList).
+build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList).
make_member(GroupName) ->
{case read_group(GroupName) of
@@ -1280,16 +1270,12 @@ get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
%% Activity assembly
%% ---------------------------------------------------------------------------
-activity_nil() ->
- queue:new().
+activity_nil() -> queue:new().
-activity_cons(_Id, [], [], Tail) ->
- Tail;
-activity_cons(Sender, Pubs, Acks, Tail) ->
- queue:in({Sender, Pubs, Acks}, Tail).
+activity_cons( _Id, [], [], Tail) -> Tail;
+activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail).
-activity_finalise(Activity) ->
- queue:to_list(Activity).
+activity_finalise(Activity) -> queue:to_list(Activity).
maybe_send_activity([], _State) ->
ok;
@@ -1393,34 +1379,25 @@ purge_confirms(Confirms) ->
%% Msg transformation
%% ---------------------------------------------------------------------------
-acks_from_queue(Q) ->
- [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
+acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
-pubs_from_queue(Q) ->
- queue:to_list(Q).
+pubs_from_queue(Q) -> queue:to_list(Q).
-queue_from_pubs(Pubs) ->
- queue:from_list(Pubs).
+queue_from_pubs(Pubs) -> queue:from_list(Pubs).
-apply_acks([], Pubs) ->
- Pubs;
-apply_acks(List, Pubs) ->
- {_, Pubs1} = queue:split(length(List), Pubs),
- Pubs1.
+apply_acks( [], Pubs) -> Pubs;
+apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs),
+ Pubs1.
join_pubs(Q, []) -> Q;
join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)).
-last_ack([], LA) ->
- LA;
-last_ack(List, LA) ->
- LA1 = lists:last(List),
- true = LA1 > LA, %% ASSERTION
- LA1.
-
-last_pub([], LP) ->
- LP;
-last_pub(List, LP) ->
- {PubNum, _Msg} = lists:last(List),
- true = PubNum > LP, %% ASSERTION
- PubNum.
+last_ack( [], LA) -> LA;
+last_ack(List, LA) -> LA1 = lists:last(List),
+ true = LA1 > LA, %% ASSERTION
+ LA1.
+
+last_pub( [], LP) -> LP;
+last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List),
+ true = PubNum > LP, %% ASSERTION
+ PubNum.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index fc5d4e934e..fda489fe61 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -323,8 +323,6 @@ boot() ->
start_it(StartFun) ->
try
StartFun()
- catch _:Reason ->
- boot_error("Error description:~n~n~p~n~n", [Reason])
after
%% give the error loggers some time to catch up
timer:sleep(100)
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 637a7d4d0b..afbaea651b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -171,6 +171,9 @@
[queue_name, channel_pid, consumer_tag, ack_required]).
start() ->
+ %% Clear out remnants of old incarnation, in case we restarted
+ %% faster than other nodes handled DOWN messages from us.
+ on_node_down(node()),
DurableQueues = find_durable_queues(),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
@@ -530,7 +533,7 @@ basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter,
Limiter, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
+ delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
Key = {consumer_credit_to, QPid},
@@ -595,7 +598,8 @@ on_node_down(Node) ->
#amqqueue{name = QName, pid = Pid,
slave_pids = []}
<- mnesia:table(rabbit_queue),
- node(Pid) == Node])),
+ node(Pid) == Node andalso
+ not is_process_alive(Pid)])),
{Qs, Dels} = lists:unzip(QsDels),
T = rabbit_binding:process_deletions(
lists:foldl(fun rabbit_binding:combine_deletions/2,
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 2e163cfbe1..b23088cc7a 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -156,6 +156,11 @@ start() ->
{'EXIT', {badarg, _}} ->
print_error("invalid parameter: ~p", [Args]),
usage();
+ {error, {Problem, Reason}} when is_atom(Problem); is_binary(Reason) ->
+ %% We handle this common case specially to avoid ~p since
+ %% that has i18n issues
+ print_error("~s: ~s", [Problem, Reason]),
+ rabbit_misc:quit(2);
{error, Reason} ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index d9e8e8e469..58375abb45 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -27,7 +27,7 @@
set_check_interval/1, get_disk_free/0]).
-define(SERVER, ?MODULE).
--define(DEFAULT_DISK_CHECK_INTERVAL, 60000).
+-define(DEFAULT_DISK_CHECK_INTERVAL, 10000).
-record(state, {dir,
limit,
@@ -168,8 +168,8 @@ get_disk_free(Dir, {unix, _}) ->
parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir));
get_disk_free(Dir, {win32, _}) ->
parse_free_win32(os:cmd("dir /-C /W \"" ++ Dir ++ [$"]));
-get_disk_free(_, _) ->
- unknown.
+get_disk_free(_, Platform) ->
+ {unknown, Platform}.
parse_free_unix(CommandResult) ->
[_, Stats | _] = string:tokens(CommandResult, "\n"),
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index 59df14f318..a95f8f269d 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -102,9 +102,12 @@ read_file_info(File) ->
with_fhc_handle(fun () -> prim_file:read_file_info(File) end).
with_fhc_handle(Fun) ->
- ok = file_handle_cache:obtain(),
+ with_fhc_handle(1, Fun).
+
+with_fhc_handle(N, Fun) ->
+ [ ok = file_handle_cache:obtain() || _ <- lists:seq(1, N)],
try Fun()
- after ok = file_handle_cache:release()
+ after [ ok = file_handle_cache:release() || _ <- lists:seq(1, N)]
end.
read_term_file(File) ->
@@ -165,7 +168,7 @@ make_binary(List) ->
{error, Reason}
end.
-
+%% TODO the semantics of this function are rather odd. But see bug 25021.
append_file(File, Suffix) ->
case read_file_info(File) of
{ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix);
@@ -183,9 +186,11 @@ append_file(File, 0, Suffix) ->
end
end);
append_file(File, _, Suffix) ->
- case with_fhc_handle(fun () -> prim_file:read_file(File) end) of
- {ok, Data} -> write_file([File, Suffix], Data, [append]);
- Error -> Error
+ case with_fhc_handle(2, fun () ->
+ file:copy(File, {[File, Suffix], [append]})
+ end) of
+ {ok, _BytesCopied} -> ok;
+ Error -> Error
end.
ensure_parent_dirs_exist(Filename) ->
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 17e2ffb472..3e058793ec 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -354,7 +354,10 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
noreply(State);
handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
- noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }).
+ noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) });
+
+handle_cast({delete_and_terminate, Reason}, State) ->
+ {stop, Reason, State}.
handle_info(send_gm_heartbeat, State = #state { gm = GM }) ->
gm:broadcast(GM, heartbeat),
@@ -402,6 +405,9 @@ handle_msg([CPid], _From, request_length = Msg) ->
ok = gen_server2:cast(CPid, Msg);
handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
ok = gen_server2:cast(CPid, Msg);
+handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
+ ok = gen_server2:cast(CPid, Msg),
+ {stop, {shutdown, ring_shutdown}};
handle_msg([_CPid], _From, _Msg) ->
ok.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 4e71cc43db..750bcd56e2 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -127,10 +127,21 @@ terminate(Reason,
delete_and_terminate(Reason, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
+ Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()],
+ MRefs = [erlang:monitor(process, S) || S <- Slaves],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
+ monitor_wait(MRefs),
State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
set_delivered = 0 }.
+monitor_wait([]) ->
+ ok;
+monitor_wait([MRef | MRefs]) ->
+ receive({'DOWN', MRef, process, _Pid, _Info}) ->
+ ok
+ end,
+ monitor_wait(MRefs).
+
purge(State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index e412fbbc5d..03fafc3e5a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -351,20 +351,17 @@ handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) ->
ok;
handle_msg([SPid], _From, {process_death, Pid}) ->
inform_deaths(SPid, [Pid]);
+handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
+ ok = gen_server2:cast(CPid, {gm, Msg}),
+ {stop, {shutdown, ring_shutdown}};
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
inform_deaths(SPid, Deaths) ->
- rabbit_misc:with_exit_handler(
- fun () -> {stop, normal} end,
- fun () ->
- case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of
- ok ->
- ok;
- {promote, CPid} ->
- {become, rabbit_mirror_queue_coordinator, [CPid]}
- end
- end).
+ case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of
+ ok -> ok;
+ {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
+ end.
%% ---------------------------------------------------------------------------
%% Others
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b773f83ba9..bd5cf58845 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -313,8 +313,8 @@ handle_other(handshake_timeout, _Deb, State) ->
throw({handshake_timeout, State#v1.callback});
handle_other(timeout, Deb, State = #v1{connection_state = closed}) ->
mainloop(Deb, State);
-handle_other(timeout, _Deb, #v1{connection_state = S}) ->
- throw({timeout, S});
+handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) ->
+ throw({heartbeat_timeout, S});
handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) ->
{ForceTermination, NewState} = terminate(Explanation, State),
gen_server:reply(From, ok),
@@ -683,7 +683,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
Frame = rabbit_binary_generator:build_heartbeat_frame(),
SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
Parent = self(),
- ReceiveFun = fun() -> Parent ! timeout end,
+ ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
Heartbeater = SHF(Sock, ClientHeartbeat, SendFun,
ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
@@ -736,8 +736,6 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
-%% Compute frame_max for this instance. Could simply use 0, but breaks
-%% QPid Java client.
server_frame_max() ->
{ok, FrameMax} = application:get_env(rabbit, frame_max),
FrameMax.