diff options
| -rw-r--r-- | docs/rabbitmq.config.example | 4 | ||||
| -rw-r--r-- | docs/rabbitmqctl.1.xml | 20 | ||||
| -rw-r--r-- | scripts/rabbitmq-env.bat | 8 | ||||
| -rw-r--r-- | scripts/rabbitmq-server-ha.ocf | 17 | ||||
| -rw-r--r-- | scripts/rabbitmq-server.bat | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 6 | ||||
| -rw-r--r-- | src/vm_memory_monitor.erl | 28 |
8 files changed, 67 insertions, 25 deletions
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index dd1a4e6c87..6be9504b8b 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -193,6 +193,10 @@ %% %% {vm_memory_high_watermark, 0.4}, + %% Alternatively, we can set a limit (in megabytes) of RAM used by the node. + %% + %% {vm_memory_high_watermark, {absolute, 1024}}, + %% Fraction of the high watermark limit at which queues start to %% page message out to disc in order to free up memory. %% diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 4a5e315ecd..43d00418e6 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1230,6 +1230,11 @@ queue is non-exclusive.</para></listitem> </varlistentry> <varlistentry> + <term>exclusive</term> + <listitem><para>True if queue is exclusive (i.e. has + owner_pid), false otherwise</para></listitem> + </varlistentry> + <varlistentry> <term>exclusive_consumer_pid</term> <listitem><para>Id of the Erlang process representing the channel of the exclusive consumer subscribed to this queue. Empty if @@ -1925,6 +1930,21 @@ </variablelist> </listitem> </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>set_vm_memory_high_watermark absolute</command> <arg choice="req"><replaceable>memory_limit_mb</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>memory_limit_mb</term> + <listitem><para> + The new memory limit at which flow control is + triggered, expressed in MB as an integer number + greater than or equal to 0. + </para></listitem> + </varlistentry> + </variablelist> + </listitem> + </varlistentry> </variablelist> </refsect2> </refsect1> diff --git a/scripts/rabbitmq-env.bat b/scripts/rabbitmq-env.bat index 7465072d19..ffc68035dc 100644 --- a/scripts/rabbitmq-env.bat +++ b/scripts/rabbitmq-env.bat @@ -214,9 +214,9 @@ REM [ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS} REM [ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}.log" if "!RABBITMQ_LOGS!"=="" ( if "!LOGS!"=="" ( - set LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!.log + set RABBITMQ_LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!.log ) else ( - set LOGS=!LOGS! + set RABBITMQ_LOGS=!LOGS! ) ) @@ -224,9 +224,9 @@ REM [ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS=${SASL_LOGS} REM [ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}-sasl.log" if "!RABBITMQ_SASL_LOGS!"=="" ( if "!SASL_LOGS!"=="" ( - set SASL_LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!-sasl.log + set RABBITMQ_SASL_LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!-sasl.log ) else ( - set SASL_LOGS=!SASL_LOGS! + set RABBITMQ_SASL_LOGS=!SASL_LOGS! ) ) diff --git a/scripts/rabbitmq-server-ha.ocf b/scripts/rabbitmq-server-ha.ocf index 5ccda7b745..6404a7dbd2 100644 --- a/scripts/rabbitmq-server-ha.ocf +++ b/scripts/rabbitmq-server-ha.ocf @@ -286,7 +286,7 @@ END su_rabbit_cmd() { local timeout if [ "$1" = "-t" ]; then - timeout=="/usr/bin/timeout ${OCF_RESKEY_command_timeout} $2" + timeout="/usr/bin/timeout ${OCF_RESKEY_command_timeout} $2" shift 2 else timeout=$COMMAND_TIMEOUT @@ -404,7 +404,8 @@ rmq_setup_env() { # user for dir in ${PID_DIR} "${OCF_RESKEY_mnesia_base}" "${OCF_RESKEY_log_dir}"; do if test -e ${dir}; then - local files=$(su -s /bin/sh - $OCF_RESKEY_username -c "find ${dir} ! -writable") + local files + files=$(su -s /bin/sh - $OCF_RESKEY_username -c "find ${dir} ! -writable") if [ "${files}" ]; then ocf_log warn "Directory ${dir} is not writable by ${OCF_RESKEY_username}, chowning." chown -R ${OCF_RESKEY_username}:${OCF_RESKEY_groupname} "${dir}" @@ -539,7 +540,6 @@ get_running_nodes() { get_all_pacemaker_nodes() { echo `crm_node -l | awk '{print $2}' | grep -v "^$" | sed -e '/(null)/d'` - return $? } # Get alive cluster nodes in visible partition, but the specified one @@ -550,7 +550,6 @@ get_alive_pacemaker_nodes_but() else echo `crm_node -l -p | sed -e "s/${1}//g" | sed -e '/(null)/d'` fi - return $? } check_need_join_to() { @@ -1108,13 +1107,15 @@ check_timeouts() { fi local count - count=`crm_attribute -N $THIS_PCMK_NODE -l reboot --name $crm_attr_name --query 2>/dev/null | awk '{print $3}' | awk -F "=" '{print $2}' | sed -e '/(null)/d'` - if [ $? -ne 0 ]; then + count=`crm_attribute -N $THIS_PCMK_NODE -l reboot --name $crm_attr_name --query 2>/dev/null` + op_rc=$? + if [ $op_rc -ne 0 ]; then # the crm_attribute exited with error. In that case most probably it printed garbage # instead of the number we need. So defensively assume that it is zero. count=0 fi + count=`echo "${count}" | awk '{print $3}' | awk -F "=" '{print $2}' | sed -e '/(null)/d'` count=$((count+1)) # There is a slight chance that this piece of code will be executed twice simultaneously. @@ -1135,8 +1136,8 @@ wait_sync() { wait_time=$1 queues="${COMMAND_TIMEOUT} ${OCF_RESKEY_ctl} list_queues name state" - su_rabbit_cmd -t "${wait_time}s" "sh -c \"while $queues | grep -q 'syncing,'; \ - do sleep 1; done\"" + su_rabbit_cmd -t "${wait_time}" "sh -c \"while ${queues} | grep -q 'syncing,'; \ + do sleep 2; done\"" return $? } diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 8f75a486ec..62da2f6256 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -96,8 +96,8 @@ if "!RABBITMQ_IO_THREAD_POOL_SIZE!"=="" ( !RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
-sasl errlog_type error ^
-sasl sasl_error_logger false ^
--rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
--rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
+-rabbit error_logger {file,\""!RABBITMQ_LOGS:\=/!"\"} ^
+-rabbit sasl_error_logger {file,\""!RABBITMQ_SASL_LOGS:\=/!"\"} ^
-rabbit enabled_plugins_file \""!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!"\" ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 15fde37c9c..452047fdb2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -92,7 +92,8 @@ durable, auto_delete, arguments, - owner_pid + owner_pid, + exclusive ]). -define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]]). @@ -828,6 +829,8 @@ i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) -> ''; i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) -> ExclusiveOwner; +i(exclusive, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) -> + is_pid(ExclusiveOwner); i(policy, #q{q = Q}) -> case rabbit_policy:name(Q) of none -> ''; diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index fe0563bbc7..59223dc19b 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -410,6 +410,12 @@ action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) -> Inform("Setting memory threshold on ~p to ~p", [Node, Frac]), rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]); +action(set_vm_memory_high_watermark, Node, ["absolute", Arg], _Opts, Inform) -> + Limit = list_to_integer(Arg), + Inform("Setting memory threshold on ~p to ~pMB", [Node, Limit]), + rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, + [{absolute, Limit}]); + action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Setting permissions for user \"~s\" in vhost \"~s\"", diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index bf9a77c174..fe68f50239 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -52,7 +52,7 @@ -record(state, {total_memory, memory_limit, - memory_fraction, + memory_config_limit, timeout, timer, alarmed, @@ -63,6 +63,7 @@ -ifdef(use_specs). +-type(vm_memory_high_watermark() :: (float() | {'absolute', integer()})). -spec(start_link/1 :: (float()) -> rabbit_types:ok_pid_or_error()). -spec(start_link/3 :: (float(), fun ((any()) -> 'ok'), fun ((any()) -> 'ok')) -> rabbit_types:ok_pid_or_error()). @@ -70,8 +71,8 @@ -spec(get_vm_limit/0 :: () -> non_neg_integer()). -spec(get_check_interval/0 :: () -> non_neg_integer()). -spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok'). --spec(get_vm_memory_high_watermark/0 :: () -> float()). --spec(set_vm_memory_high_watermark/1 :: (float()) -> 'ok'). +-spec(get_vm_memory_high_watermark/0 :: () -> vm_memory_high_watermark()). +-spec(set_vm_memory_high_watermark/1 :: (vm_memory_high_watermark()) -> 'ok'). -spec(get_memory_limit/0 :: () -> non_neg_integer()). -endif. @@ -128,11 +129,12 @@ init([MemFraction, AlarmFuns]) -> alarm_funs = AlarmFuns }, {ok, set_mem_limits(State, MemFraction)}. -handle_call(get_vm_memory_high_watermark, _From, State) -> - {reply, State#state.memory_fraction, State}; +handle_call(get_vm_memory_high_watermark, _From, + #state{memory_config_limit = MemLimit} = State) -> + {reply, MemLimit, State}; -handle_call({set_vm_memory_high_watermark, MemFraction}, _From, State) -> - {reply, ok, set_mem_limits(State, MemFraction)}; +handle_call({set_vm_memory_high_watermark, MemLimit}, _From, State) -> + {reply, ok, set_mem_limits(State, MemLimit)}; handle_call(get_check_interval, _From, State) -> {reply, State#state.timeout, State}; @@ -166,7 +168,7 @@ code_change(_OldVsn, State, _Extra) -> %% Server Internals %%---------------------------------------------------------------------------- -set_mem_limits(State, MemFraction) -> +set_mem_limits(State, MemLimit) -> case erlang:system_info(wordsize) of 4 -> error_logger:warning_msg( @@ -206,12 +208,18 @@ set_mem_limits(State, MemFraction) -> _ -> TotalMemory end, - MemLim = trunc(MemFraction * UsableMemory), + MemLim = interpret_limit(MemLimit, UsableMemory), error_logger:info_msg("Memory limit set to ~pMB of ~pMB total.~n", [trunc(MemLim/?ONE_MB), trunc(TotalMemory/?ONE_MB)]), internal_update(State #state { total_memory = TotalMemory, memory_limit = MemLim, - memory_fraction = MemFraction}). + memory_config_limit = MemLimit}). + +interpret_limit({'absolute', MemLim}, UsableMemory) -> + %% Absolute memory is provided in MB + min(MemLim * ?ONE_MB, UsableMemory); +interpret_limit(MemFraction, UsableMemory) -> + trunc(MemFraction * UsableMemory). internal_update(State = #state { memory_limit = MemLimit, alarmed = Alarmed, |
