summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/rabbitmq.config.example4
-rw-r--r--docs/rabbitmqctl.1.xml20
-rw-r--r--scripts/rabbitmq-env.bat8
-rw-r--r--scripts/rabbitmq-server-ha.ocf17
-rw-r--r--scripts/rabbitmq-server.bat4
-rw-r--r--src/rabbit_amqqueue_process.erl5
-rw-r--r--src/rabbit_control_main.erl6
-rw-r--r--src/vm_memory_monitor.erl28
8 files changed, 67 insertions, 25 deletions
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index dd1a4e6c87..6be9504b8b 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -193,6 +193,10 @@
%%
%% {vm_memory_high_watermark, 0.4},
+ %% Alternatively, we can set a limit (in megabytes) of RAM used by the node.
+ %%
+ %% {vm_memory_high_watermark, {absolute, 1024}},
+
%% Fraction of the high watermark limit at which queues start to
%% page message out to disc in order to free up memory.
%%
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 4a5e315ecd..43d00418e6 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1230,6 +1230,11 @@
queue is non-exclusive.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>exclusive</term>
+ <listitem><para>True if queue is exclusive (i.e. has
+ owner_pid), false otherwise</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>exclusive_consumer_pid</term>
<listitem><para>Id of the Erlang process representing the channel of the
exclusive consumer subscribed to this queue. Empty if
@@ -1925,6 +1930,21 @@
</variablelist>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>set_vm_memory_high_watermark absolute</command> <arg choice="req"><replaceable>memory_limit_mb</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>memory_limit_mb</term>
+ <listitem><para>
+ The new memory limit at which flow control is
+ triggered, expressed in MB as an integer number
+ greater than or equal to 0.
+ </para></listitem>
+ </varlistentry>
+ </variablelist>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect2>
</refsect1>
diff --git a/scripts/rabbitmq-env.bat b/scripts/rabbitmq-env.bat
index 7465072d19..ffc68035dc 100644
--- a/scripts/rabbitmq-env.bat
+++ b/scripts/rabbitmq-env.bat
@@ -214,9 +214,9 @@ REM [ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS}
REM [ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}.log"
if "!RABBITMQ_LOGS!"=="" (
if "!LOGS!"=="" (
- set LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!.log
+ set RABBITMQ_LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!.log
) else (
- set LOGS=!LOGS!
+ set RABBITMQ_LOGS=!LOGS!
)
)
@@ -224,9 +224,9 @@ REM [ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS=${SASL_LOGS}
REM [ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}-sasl.log"
if "!RABBITMQ_SASL_LOGS!"=="" (
if "!SASL_LOGS!"=="" (
- set SASL_LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!-sasl.log
+ set RABBITMQ_SASL_LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!-sasl.log
) else (
- set SASL_LOGS=!SASL_LOGS!
+ set RABBITMQ_SASL_LOGS=!SASL_LOGS!
)
)
diff --git a/scripts/rabbitmq-server-ha.ocf b/scripts/rabbitmq-server-ha.ocf
index 5ccda7b745..6404a7dbd2 100644
--- a/scripts/rabbitmq-server-ha.ocf
+++ b/scripts/rabbitmq-server-ha.ocf
@@ -286,7 +286,7 @@ END
su_rabbit_cmd() {
local timeout
if [ "$1" = "-t" ]; then
- timeout=="/usr/bin/timeout ${OCF_RESKEY_command_timeout} $2"
+ timeout="/usr/bin/timeout ${OCF_RESKEY_command_timeout} $2"
shift 2
else
timeout=$COMMAND_TIMEOUT
@@ -404,7 +404,8 @@ rmq_setup_env() {
# user
for dir in ${PID_DIR} "${OCF_RESKEY_mnesia_base}" "${OCF_RESKEY_log_dir}"; do
if test -e ${dir}; then
- local files=$(su -s /bin/sh - $OCF_RESKEY_username -c "find ${dir} ! -writable")
+ local files
+ files=$(su -s /bin/sh - $OCF_RESKEY_username -c "find ${dir} ! -writable")
if [ "${files}" ]; then
ocf_log warn "Directory ${dir} is not writable by ${OCF_RESKEY_username}, chowning."
chown -R ${OCF_RESKEY_username}:${OCF_RESKEY_groupname} "${dir}"
@@ -539,7 +540,6 @@ get_running_nodes() {
get_all_pacemaker_nodes()
{
echo `crm_node -l | awk '{print $2}' | grep -v "^$" | sed -e '/(null)/d'`
- return $?
}
# Get alive cluster nodes in visible partition, but the specified one
@@ -550,7 +550,6 @@ get_alive_pacemaker_nodes_but()
else
echo `crm_node -l -p | sed -e "s/${1}//g" | sed -e '/(null)/d'`
fi
- return $?
}
check_need_join_to() {
@@ -1108,13 +1107,15 @@ check_timeouts() {
fi
local count
- count=`crm_attribute -N $THIS_PCMK_NODE -l reboot --name $crm_attr_name --query 2>/dev/null | awk '{print $3}' | awk -F "=" '{print $2}' | sed -e '/(null)/d'`
- if [ $? -ne 0 ]; then
+ count=`crm_attribute -N $THIS_PCMK_NODE -l reboot --name $crm_attr_name --query 2>/dev/null`
+ op_rc=$?
+ if [ $op_rc -ne 0 ]; then
# the crm_attribute exited with error. In that case most probably it printed garbage
# instead of the number we need. So defensively assume that it is zero.
count=0
fi
+ count=`echo "${count}" | awk '{print $3}' | awk -F "=" '{print $2}' | sed -e '/(null)/d'`
count=$((count+1))
# There is a slight chance that this piece of code will be executed twice simultaneously.
@@ -1135,8 +1136,8 @@ wait_sync() {
wait_time=$1
queues="${COMMAND_TIMEOUT} ${OCF_RESKEY_ctl} list_queues name state"
- su_rabbit_cmd -t "${wait_time}s" "sh -c \"while $queues | grep -q 'syncing,'; \
- do sleep 1; done\""
+ su_rabbit_cmd -t "${wait_time}" "sh -c \"while ${queues} | grep -q 'syncing,'; \
+ do sleep 2; done\""
return $?
}
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 8f75a486ec..62da2f6256 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -96,8 +96,8 @@ if "!RABBITMQ_IO_THREAD_POOL_SIZE!"=="" (
!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
-sasl errlog_type error ^
-sasl sasl_error_logger false ^
--rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
--rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
+-rabbit error_logger {file,\""!RABBITMQ_LOGS:\=/!"\"} ^
+-rabbit sasl_error_logger {file,\""!RABBITMQ_SASL_LOGS:\=/!"\"} ^
-rabbit enabled_plugins_file \""!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!"\" ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 15fde37c9c..452047fdb2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -92,7 +92,8 @@
durable,
auto_delete,
arguments,
- owner_pid
+ owner_pid,
+ exclusive
]).
-define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]]).
@@ -828,6 +829,8 @@ i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) ->
'';
i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) ->
ExclusiveOwner;
+i(exclusive, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) ->
+ is_pid(ExclusiveOwner);
i(policy, #q{q = Q}) ->
case rabbit_policy:name(Q) of
none -> '';
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index fe0563bbc7..59223dc19b 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -410,6 +410,12 @@ action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) ->
Inform("Setting memory threshold on ~p to ~p", [Node, Frac]),
rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]);
+action(set_vm_memory_high_watermark, Node, ["absolute", Arg], _Opts, Inform) ->
+ Limit = list_to_integer(Arg),
+ Inform("Setting memory threshold on ~p to ~pMB", [Node, Limit]),
+ rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark,
+ [{absolute, Limit}]);
+
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Setting permissions for user \"~s\" in vhost \"~s\"",
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index bf9a77c174..fe68f50239 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -52,7 +52,7 @@
-record(state, {total_memory,
memory_limit,
- memory_fraction,
+ memory_config_limit,
timeout,
timer,
alarmed,
@@ -63,6 +63,7 @@
-ifdef(use_specs).
+-type(vm_memory_high_watermark() :: (float() | {'absolute', integer()})).
-spec(start_link/1 :: (float()) -> rabbit_types:ok_pid_or_error()).
-spec(start_link/3 :: (float(), fun ((any()) -> 'ok'),
fun ((any()) -> 'ok')) -> rabbit_types:ok_pid_or_error()).
@@ -70,8 +71,8 @@
-spec(get_vm_limit/0 :: () -> non_neg_integer()).
-spec(get_check_interval/0 :: () -> non_neg_integer()).
-spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok').
--spec(get_vm_memory_high_watermark/0 :: () -> float()).
--spec(set_vm_memory_high_watermark/1 :: (float()) -> 'ok').
+-spec(get_vm_memory_high_watermark/0 :: () -> vm_memory_high_watermark()).
+-spec(set_vm_memory_high_watermark/1 :: (vm_memory_high_watermark()) -> 'ok').
-spec(get_memory_limit/0 :: () -> non_neg_integer()).
-endif.
@@ -128,11 +129,12 @@ init([MemFraction, AlarmFuns]) ->
alarm_funs = AlarmFuns },
{ok, set_mem_limits(State, MemFraction)}.
-handle_call(get_vm_memory_high_watermark, _From, State) ->
- {reply, State#state.memory_fraction, State};
+handle_call(get_vm_memory_high_watermark, _From,
+ #state{memory_config_limit = MemLimit} = State) ->
+ {reply, MemLimit, State};
-handle_call({set_vm_memory_high_watermark, MemFraction}, _From, State) ->
- {reply, ok, set_mem_limits(State, MemFraction)};
+handle_call({set_vm_memory_high_watermark, MemLimit}, _From, State) ->
+ {reply, ok, set_mem_limits(State, MemLimit)};
handle_call(get_check_interval, _From, State) ->
{reply, State#state.timeout, State};
@@ -166,7 +168,7 @@ code_change(_OldVsn, State, _Extra) ->
%% Server Internals
%%----------------------------------------------------------------------------
-set_mem_limits(State, MemFraction) ->
+set_mem_limits(State, MemLimit) ->
case erlang:system_info(wordsize) of
4 ->
error_logger:warning_msg(
@@ -206,12 +208,18 @@ set_mem_limits(State, MemFraction) ->
_ ->
TotalMemory
end,
- MemLim = trunc(MemFraction * UsableMemory),
+ MemLim = interpret_limit(MemLimit, UsableMemory),
error_logger:info_msg("Memory limit set to ~pMB of ~pMB total.~n",
[trunc(MemLim/?ONE_MB), trunc(TotalMemory/?ONE_MB)]),
internal_update(State #state { total_memory = TotalMemory,
memory_limit = MemLim,
- memory_fraction = MemFraction}).
+ memory_config_limit = MemLimit}).
+
+interpret_limit({'absolute', MemLim}, UsableMemory) ->
+ %% Absolute memory is provided in MB
+ min(MemLim * ?ONE_MB, UsableMemory);
+interpret_limit(MemFraction, UsableMemory) ->
+ trunc(MemFraction * UsableMemory).
internal_update(State = #state { memory_limit = MemLimit,
alarmed = Alarmed,