summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2016-04-18 16:02:57 -0700
committerMichael Klishin <michael@clojurewerkz.org>2016-04-18 16:02:57 -0700
commitcc258d7a04d336883c12277b1a432f8c9a630e35 (patch)
tree2231bc0e92b2cc0f61c5037e9b399865a642bff5
parente5724d797f18aec8ab86b9a4054e1183a5bc063f (diff)
parent57c4a7af72268b8cf88efd2ec774616dd14aa31f (diff)
downloadrabbitmq-server-git-cc258d7a04d336883c12277b1a432f8c9a630e35.tar.gz
Merge branch 'stable' into rabbitmq-server-698
-rw-r--r--scripts/rabbitmq-defaults.bat4
-rw-r--r--scripts/rabbitmq-env11
-rw-r--r--scripts/rabbitmq-env.bat4
-rwxr-xr-xscripts/rabbitmq-server2
-rwxr-xr-xscripts/rabbitmq-server-ha.ocf5
-rw-r--r--src/gm.erl351
-rw-r--r--src/rabbit_control_main.erl10
-rw-r--r--src/rabbit_log.erl10
-rw-r--r--src/rabbit_vm.erl2
9 files changed, 251 insertions, 148 deletions
diff --git a/scripts/rabbitmq-defaults.bat b/scripts/rabbitmq-defaults.bat
index 27edd0d11e..8fff5ea827 100644
--- a/scripts/rabbitmq-defaults.bat
+++ b/scripts/rabbitmq-defaults.bat
@@ -46,6 +46,4 @@ REM PLUGINS_DIR="${RABBITMQ_HOME}/plugins"
for /f "delims=" %%F in ("!TDP0!..\plugins") do set PLUGINS_DIR=%%~dpsF%%~nF%%~xF
REM CONF_ENV_FILE=${SYS_PREFIX}/etc/rabbitmq/rabbitmq-env.conf
-if "!RABBITMQ_CONF_ENV_FILE!"=="" (
- set RABBITMQ_CONF_ENV_FILE=!RABBITMQ_BASE!\rabbitmq-env-conf.bat
-)
+set CONF_ENV_FILE=!RABBITMQ_BASE!\rabbitmq-env-conf.bat
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index dffed035ea..35239620ca 100644
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -65,20 +65,15 @@ RABBITMQ_HOME="$(rmq_realpath "${RABBITMQ_SCRIPTS_DIR}/..")"
## Common defaults
SERVER_ERL_ARGS="+P 1048576"
-# warn about old rabbitmq.conf file, if no new one
-if [ -f /etc/rabbitmq/rabbitmq.conf ] && \
- [ ! -f ${CONF_ENV_FILE} ] ; then
- echo -n "WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- "
- echo "location has moved to ${CONF_ENV_FILE}"
-fi
-
# We save the current value of $RABBITMQ_PID_FILE in case it was set by
# an init script. If $CONF_ENV_FILE overrides it again, we must ignore
# it and warn the user.
saved_RABBITMQ_PID_FILE=$RABBITMQ_PID_FILE
## Get configuration variables from the configure environment file
-[ -f ${CONF_ENV_FILE} ] && . ${CONF_ENV_FILE} || true
+[ "x" = "x$RABBITMQ_CONF_ENV_FILE" ] && RABBITMQ_CONF_ENV_FILE=${CONF_ENV_FILE}
+
+[ -f ${RABBITMQ_CONF_ENV_FILE} ] && . ${RABBITMQ_CONF_ENV_FILE} || true
if [ "$saved_RABBITMQ_PID_FILE" -a \
"$saved_RABBITMQ_PID_FILE" != "$RABBITMQ_PID_FILE" ]; then
diff --git a/scripts/rabbitmq-env.bat b/scripts/rabbitmq-env.bat
index d5df9ddbd6..ea9cf45962 100644
--- a/scripts/rabbitmq-env.bat
+++ b/scripts/rabbitmq-env.bat
@@ -47,6 +47,10 @@ REM set SERVER_ERL_ARGS=+P 1048576
REM ## Get configuration variables from the configure environment file
REM [ -f ${CONF_ENV_FILE} ] && . ${CONF_ENV_FILE} || true
+if "!RABBITMQ_CONF_ENV_FILE!"=="" (
+ set RABBITMQ_CONF_ENV_FILE=!CONF_ENV_FILE!
+)
+
if exist "!RABBITMQ_CONF_ENV_FILE!" (
call "!RABBITMQ_CONF_ENV_FILE!"
)
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 8ed7319b41..ab2975feb1 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -187,7 +187,7 @@ check_not_empty() {
eval value=\$$name
if [ -z "$value" ]; then
echo "Error: ENV variable should be defined: $1.
- Please check rabbitmq-env, rabbitmq-defaults, and $CONF_ENV_FILE script files"
+ Please check rabbitmq-env, rabbitmq-defaults, and ${RABBITMQ_CONF_ENV_FILE} script files"
exit 78
fi
}
diff --git a/scripts/rabbitmq-server-ha.ocf b/scripts/rabbitmq-server-ha.ocf
index ae7991833b..301f7a1fc7 100755
--- a/scripts/rabbitmq-server-ha.ocf
+++ b/scripts/rabbitmq-server-ha.ocf
@@ -678,8 +678,8 @@ reset_mnesia() {
# remove mnesia files, if required
if $make_amnesia ; then
kill_rmq_and_remove_pid
- ocf_run rm -rf "${MNESIA_FILES}/*"
- ocf_log warn "${LH} Mnesia files appear corrupted and have been removed."
+ ocf_run rm -rf "${MNESIA_FILES}"
+ ocf_log warn "${LH} Mnesia files appear corrupted and have been removed from ${MNESIA_FILES}."
fi
# always return OCF SUCCESS
return $OCF_SUCCESS
@@ -1479,6 +1479,7 @@ get_monitor() {
if [ -n "$master_name" ]; then
ocf_log info "${LH} master exists and rabbit app is not running. Exiting to be restarted by pacemaker"
+ stop_server_process
rc=$OCF_ERR_GENERIC
fi
fi
diff --git a/src/gm.erl b/src/gm.erl
index aeb050e15f..199cf7c4de 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -617,14 +617,20 @@ handle_call({add_on_right, NewMember}, _From,
group_name = GroupName,
members_state = MembersState,
txn_executor = TxnFun }) ->
- Group = record_new_member_in_group(NewMember, Self, GroupName, TxnFun),
- View1 = group_to_view(Group),
- MembersState1 = remove_erased_members(MembersState, View1),
- ok = send_right(NewMember, View1,
- {catchup, Self, prepare_members_state(MembersState1)}),
- {Result, State1} = change_view(View1, State #state {
- members_state = MembersState1 }),
- handle_callback_result({Result, {ok, Group}, State1}).
+ try
+ Group = record_new_member_in_group(
+ NewMember, Self, GroupName, TxnFun),
+ View1 = group_to_view(check_membership(Self, Group)),
+ MembersState1 = remove_erased_members(MembersState, View1),
+ ok = send_right(NewMember, View1,
+ {catchup, Self, prepare_members_state(MembersState1)}),
+ {Result, State1} = change_view(View1, State #state {
+ members_state = MembersState1 }),
+ handle_callback_result({Result, {ok, Group}, State1})
+ catch
+ lost_membership ->
+ {stop, normal, State}
+ end.
%% add_on_right causes a catchup to be sent immediately from the left,
%% so we can never see this from the left neighbour. However, it's
@@ -638,19 +644,28 @@ handle_cast({?TAG, _ReqVer, check_neighbours},
handle_cast({?TAG, ReqVer, Msg},
State = #state { view = View,
+ self = Self,
members_state = MembersState,
group_name = GroupName }) ->
- {Result, State1} =
- case needs_view_update(ReqVer, View) of
- true -> View1 = group_to_view(dirty_read_group(GroupName)),
- MemberState1 = remove_erased_members(MembersState, View1),
- change_view(View1, State #state {
- members_state = MemberState1 });
- false -> {ok, State}
- end,
- handle_callback_result(
- if_callback_success(
- Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));
+ try
+ {Result, State1} =
+ case needs_view_update(ReqVer, View) of
+ true ->
+ View1 = group_to_view(
+ check_membership(Self,
+ dirty_read_group(GroupName))),
+ MemberState1 = remove_erased_members(MembersState, View1),
+ change_view(View1, State #state {
+ members_state = MemberState1 });
+ false -> {ok, State}
+ end,
+ handle_callback_result(
+ if_callback_success(
+ Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1))
+ catch
+ lost_membership ->
+ {stop, normal, State}
+ end;
handle_cast({broadcast, _Msg, _SizeHint},
State = #state { shutting_down = {true, _} }) ->
@@ -724,39 +739,44 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
group_name = GroupName,
confirms = Confirms,
txn_executor = TxnFun }) ->
- Member = case {Left, Right} of
- {{Member1, MRef}, _} -> Member1;
- {_, {Member1, MRef}} -> Member1;
- _ -> undefined
- end,
- case {Member, Reason} of
- {undefined, _} ->
- noreply(State);
- {_, {shutdown, ring_shutdown}} ->
- noreply(State);
- _ ->
- %% In the event of a partial partition we could see another member
- %% go down and then remove them from Mnesia. While they can
- %% recover from this they'd have to restart the queue - not
- %% ideal. So let's sleep here briefly just in case this was caused
- %% by a partial partition; in which case by the time we record the
- %% member death in Mnesia we will probably be in a full
- %% partition and will not be assassinating another member.
- timer:sleep(100),
- View1 = group_to_view(record_dead_member_in_group(
- Member, GroupName, TxnFun)),
- handle_callback_result(
- case alive_view_members(View1) of
- [Self] -> maybe_erase_aliases(
- State #state {
- members_state = blank_member_state(),
- confirms = purge_confirms(Confirms) },
- View1);
- _ -> change_view(View1, State)
- end)
+ try
+ check_membership(GroupName),
+ Member = case {Left, Right} of
+ {{Member1, MRef}, _} -> Member1;
+ {_, {Member1, MRef}} -> Member1;
+ _ -> undefined
+ end,
+ case {Member, Reason} of
+ {undefined, _} ->
+ noreply(State);
+ {_, {shutdown, ring_shutdown}} ->
+ noreply(State);
+ _ ->
+ %% In the event of a partial partition we could see another member
+ %% go down and then remove them from Mnesia. While they can
+ %% recover from this they'd have to restart the queue - not
+ %% ideal. So let's sleep here briefly just in case this was caused
+ %% by a partial partition; in which case by the time we record the
+ %% member death in Mnesia we will probably be in a full
+ %% partition and will not be assassinating another member.
+ timer:sleep(100),
+ View1 = group_to_view(record_dead_member_in_group(Self,
+ Member, GroupName, TxnFun, true)),
+ handle_callback_result(
+ case alive_view_members(View1) of
+ [Self] -> maybe_erase_aliases(
+ State #state {
+ members_state = blank_member_state(),
+ confirms = purge_confirms(Confirms) },
+ View1);
+ _ -> change_view(View1, State)
+ end)
+ end
+ catch
+ lost_membership ->
+ {stop, normal, State}
end.
-
terminate(Reason, #state { module = Module, callback_args = Args }) ->
Module:handle_terminate(Args, Reason).
@@ -841,52 +861,30 @@ handle_msg({catchup, _NotLeft, _MembersState}, State) ->
handle_msg({activity, Left, Activity},
State = #state { self = Self,
+ group_name = GroupName,
left = {Left, _MRefL},
view = View,
members_state = MembersState,
confirms = Confirms })
when MembersState =/= undefined ->
- {MembersState1, {Confirms1, Activity1}} =
- lists:foldl(
- fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) ->
- with_member_acc(
- fun (Member = #member { pending_ack = PA,
- last_pub = LP,
- last_ack = LA },
- {Confirms2, Activity2}) ->
- case is_member_alias(Id, Self, View) of
- true ->
- {ToAck, PA1} =
- find_common(queue_from_pubs(Pubs), PA,
- queue:new()),
- LA1 = last_ack(Acks, LA),
- AckNums = acks_from_queue(ToAck),
- Confirms3 = maybe_confirm(
- Self, Id, Confirms2, AckNums),
- {Member #member { pending_ack = PA1,
- last_ack = LA1 },
- {Confirms3,
- activity_cons(
- Id, [], AckNums, Activity2)}};
- false ->
- PA1 = apply_acks(Acks, join_pubs(PA, Pubs)),
- LA1 = last_ack(Acks, LA),
- LP1 = last_pub(Pubs, LP),
- {Member #member { pending_ack = PA1,
- last_pub = LP1,
- last_ack = LA1 },
- {Confirms2,
- activity_cons(Id, Pubs, Acks, Activity2)}}
- end
- end, Id, MembersStateConfirmsActivity)
- end, {MembersState, {Confirms, activity_nil()}}, Activity),
- State1 = State #state { members_state = MembersState1,
- confirms = Confirms1 },
- Activity3 = activity_finalise(Activity1),
- ok = maybe_send_activity(Activity3, State1),
- {Result, State2} = maybe_erase_aliases(State1, View),
- if_callback_success(
- Result, fun activity_true/3, fun activity_false/3, Activity3, State2);
+ try
+ %% If we have to stop, do it asap so we avoid any ack confirmation
+ %% Membership must be checked again by erase_members_in_group, as the
+ %% node can be marked as dead on the meanwhile
+ check_membership(GroupName),
+ {MembersState1, {Confirms1, Activity1}} =
+ calculate_activity(MembersState, Confirms, Activity, Self, View),
+ State1 = State #state { members_state = MembersState1,
+ confirms = Confirms1 },
+ Activity3 = activity_finalise(Activity1),
+ ok = maybe_send_activity(Activity3, State1),
+ {Result, State2} = maybe_erase_aliases(State1, View),
+ if_callback_success(
+ Result, fun activity_true/3, fun activity_false/3, Activity3, State2)
+ catch
+ lost_membership ->
+ {{stop, normal}, State}
+ end;
handle_msg({activity, _NotLeft, _Activity}, State) ->
{ok, State}.
@@ -1091,8 +1089,8 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
fun () ->
join_group(
Self, GroupName,
- record_dead_member_in_group(
- Left, GroupName, TxnFun),
+ record_dead_member_in_group(Self,
+ Left, GroupName, TxnFun, false),
TxnFun)
end,
try
@@ -1142,47 +1140,84 @@ prune_or_create_group(Self, GroupName, TxnFun) ->
end
end).
-record_dead_member_in_group(Member, GroupName, TxnFun) ->
- TxnFun(
- fun () ->
- Group = #gm_group { members = Members, version = Ver } =
- read_group(GroupName),
- case lists:splitwith(
- fun (Member1) -> Member1 =/= Member end, Members) of
- {_Members1, []} -> %% not found - already recorded dead
- Group;
- {Members1, [Member | Members2]} ->
- Members3 = Members1 ++ [{dead, Member} | Members2],
- write_group(Group #gm_group { members = Members3,
- version = Ver + 1 })
- end
- end).
+record_dead_member_in_group(Self, Member, GroupName, TxnFun, Verify) ->
+ Fun =
+ fun () ->
+ try
+ Group = #gm_group { members = Members, version = Ver } =
+ case Verify of
+ true ->
+ check_membership(Self, read_group(GroupName));
+ false ->
+ read_group(GroupName)
+ end,
+ case lists:splitwith(
+ fun (Member1) -> Member1 =/= Member end, Members) of
+ {_Members1, []} -> %% not found - already recorded dead
+ Group;
+ {Members1, [Member | Members2]} ->
+ Members3 = Members1 ++ [{dead, Member} | Members2],
+ write_group(Group #gm_group { members = Members3,
+ version = Ver + 1 })
+ end
+ catch
+ lost_membership ->
+ %% The transaction must not be abruptly crashed, but
+ %% leave the gen_server to stop normally
+ {error, lost_membership}
+ end
+ end,
+ handle_lost_membership_in_txn(TxnFun, Fun).
+
+handle_lost_membership_in_txn(TxnFun, Fun) ->
+ case TxnFun(Fun) of
+ {error, lost_membership = T} ->
+ throw(T);
+ Any ->
+ Any
+ end.
record_new_member_in_group(NewMember, Left, GroupName, TxnFun) ->
- TxnFun(
- fun () ->
- Group = #gm_group { members = Members, version = Ver } =
- read_group(GroupName),
- {Prefix, [Left | Suffix]} =
- lists:splitwith(fun (M) -> M =/= Left end, Members),
- write_group(Group #gm_group {
- members = Prefix ++ [Left, NewMember | Suffix],
- version = Ver + 1 })
- end).
+ Fun =
+ fun () ->
+ try
+ Group = #gm_group { members = Members, version = Ver } =
+ check_membership(Left, read_group(GroupName)),
+ {Prefix, [Left | Suffix]} =
+ lists:splitwith(fun (M) -> M =/= Left end, Members),
+ write_group(Group #gm_group {
+ members = Prefix ++ [Left, NewMember | Suffix],
+ version = Ver + 1 })
+ catch
+ lost_membership ->
+ %% The transaction must not be abruptly crashed, but
+ %% leave the gen_server to stop normally
+ {error, lost_membership}
+ end
+ end,
+ handle_lost_membership_in_txn(TxnFun, Fun).
-erase_members_in_group(Members, GroupName, TxnFun) ->
+erase_members_in_group(Self, Members, GroupName, TxnFun) ->
DeadMembers = [{dead, Id} || Id <- Members],
- TxnFun(
- fun () ->
- Group = #gm_group { members = [_|_] = Members1, version = Ver } =
- read_group(GroupName),
- case Members1 -- DeadMembers of
- Members1 -> Group;
- Members2 -> write_group(
- Group #gm_group { members = Members2,
- version = Ver + 1 })
+ Fun =
+ fun () ->
+ try
+ Group = #gm_group { members = [_|_] = Members1, version = Ver } =
+ check_membership(Self, read_group(GroupName)),
+ case Members1 -- DeadMembers of
+ Members1 -> Group;
+ Members2 -> write_group(
+ Group #gm_group { members = Members2,
+ version = Ver + 1 })
+ end
+ catch
+ lost_membership ->
+ %% The transaction must not be abruptly crashed, but
+ %% leave the gen_server to stop normally
+ {error, lost_membership}
end
- end).
+ end,
+ handle_lost_membership_in_txn(TxnFun, Fun).
maybe_erase_aliases(State = #state { self = Self,
group_name = GroupName,
@@ -1203,7 +1238,7 @@ maybe_erase_aliases(State = #state { self = Self,
View1 = case Erasable of
[] -> View;
_ -> group_to_view(
- erase_members_in_group(Erasable, GroupName, TxnFun))
+ erase_members_in_group(Self, Erasable, GroupName, TxnFun))
end,
change_view(View1, State #state { members_state = MembersState1 }).
@@ -1378,6 +1413,41 @@ maybe_send_activity(Activity, #state { self = Self,
send_right(Right, View, Msg) ->
ok = neighbour_cast(Right, {?TAG, view_version(View), Msg}).
+calculate_activity(MembersState, Confirms, Activity, Self, View) ->
+ lists:foldl(
+ fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) ->
+ with_member_acc(
+ fun (Member = #member { pending_ack = PA,
+ last_pub = LP,
+ last_ack = LA },
+ {Confirms2, Activity2}) ->
+ case is_member_alias(Id, Self, View) of
+ true ->
+ {ToAck, PA1} =
+ find_common(queue_from_pubs(Pubs), PA,
+ queue:new()),
+ LA1 = last_ack(Acks, LA),
+ AckNums = acks_from_queue(ToAck),
+ Confirms3 = maybe_confirm(
+ Self, Id, Confirms2, AckNums),
+ {Member #member { pending_ack = PA1,
+ last_ack = LA1 },
+ {Confirms3,
+ activity_cons(
+ Id, [], AckNums, Activity2)}};
+ false ->
+ PA1 = apply_acks(Acks, join_pubs(PA, Pubs)),
+ LA1 = last_ack(Acks, LA),
+ LP1 = last_pub(Pubs, LP),
+ {Member #member { pending_ack = PA1,
+ last_pub = LP1,
+ last_ack = LA1 },
+ {Confirms2,
+ activity_cons(Id, Pubs, Acks, Activity2)}}
+ end
+ end, Id, MembersStateConfirmsActivity)
+ end, {MembersState, {Confirms, activity_nil()}}, Activity).
+
callback(Args, Module, Activity) ->
Result =
lists:foldl(
@@ -1530,3 +1600,24 @@ call(Pid, Msg, Timeout) -> gen_server2:call(Pid, Msg, Timeout).
cast(Pid, Msg) -> gen_server2:cast(Pid, Msg).
monitor(Pid) -> erlang:monitor(process, Pid).
demonitor(MRef) -> erlang:demonitor(MRef).
+
+check_membership(Self, #gm_group{members = M} = Group) ->
+ case lists:member(Self, M) of
+ true ->
+ Group;
+ false ->
+ throw(lost_membership)
+ end.
+
+check_membership(GroupName) ->
+ case dirty_read_group(GroupName) of
+ #gm_group{members = M} ->
+ case lists:keymember(self(), 2, M) of
+ true ->
+ ok;
+ false ->
+ throw(lost_membership)
+ end;
+ {error, not_found} ->
+ throw(lost_membership)
+ end.
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index d4d6166276..f63694b657 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -508,9 +508,15 @@ action(set_policy, Node, [Key, Pattern, Defn], Opts, Inform) ->
PriorityArg = proplists:get_value(?PRIORITY_OPT, Opts),
ApplyToArg = list_to_binary(proplists:get_value(?APPLY_TO_OPT, Opts)),
Inform(Msg, [Key, Pattern, Defn, PriorityArg]),
- rpc_call(
+ Res = rpc_call(
Node, rabbit_policy, parse_set,
- [VHostArg, list_to_binary(Key), Pattern, Defn, PriorityArg, ApplyToArg]);
+ [VHostArg, list_to_binary(Key), Pattern, Defn, PriorityArg, ApplyToArg]),
+ case Res of
+ {error, Format, Args} when is_list(Format) andalso is_list(Args) ->
+ {error_string, rabbit_misc:format(Format, Args)};
+ _ ->
+ Res
+ end;
action(clear_policy, Node, [Key], Opts, Inform) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index c6081fad0d..ed73a293ca 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -96,10 +96,18 @@ with_local_io(Fun) ->
Node = node(),
case node(GL) of
Node -> Fun();
- _ -> group_leader(whereis(user), self()),
+ _ -> set_group_leader_to_user(),
try
Fun()
after
group_leader(GL, self())
end
end.
+
+set_group_leader_to_user() ->
+ case whereis(user) of
+ undefined ->
+ warning("the 'user' I/O process has terminated, some features will fail until Erlang VM is restarted");
+ User ->
+ group_leader(User, self())
+ end.
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index d5f7328fec..82effb4fc5 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -45,7 +45,7 @@ memory() ->
Mnesia = mnesia_memory(),
MsgIndexETS = ets_memory([msg_store_persistent, msg_store_transient]),
- MgmtDbETS = ets_memory([rabbit_mgmt_db]),
+ MgmtDbETS = ets_memory([rabbit_mgmt_event_collector]),
[{total, Total},
{processes, Processes},