summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-02-09 19:13:10 +0000
committerMatthew Sackman <matthew@lshift.net>2010-02-09 19:13:10 +0000
commit2bce382b64f7f666079236453eb80808ea86326c (patch)
treecf5918a07e41620c64145625d6f03b79d008bf81 /src
parent071ee6ef2004dc906ebc0689538b93518faf1d7e (diff)
parenta193b226765e85f51a4334f7d725c46cfb815d03 (diff)
downloadrabbitmq-server-git-2bce382b64f7f666079236453eb80808ea86326c.tar.gz
Merging default into bug21673
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl6
-rw-r--r--src/rabbit_limiter.erl12
-rw-r--r--src/rabbit_misc.erl34
-rw-r--r--src/rabbit_multi.erl27
-rw-r--r--src/vm_memory_monitor.erl27
5 files changed, 82 insertions, 24 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e98c4366ed..a60a5590b7 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -540,12 +540,14 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
"prefetch_size!=0 (~w)", [Size]);
handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
- _, State = #ch{ limiter_pid = LimiterPid }) ->
+ _, State = #ch{ limiter_pid = LimiterPid,
+ unacked_message_q = UAMQ }) ->
NewLimiterPid = case {LimiterPid, PrefetchCount} of
{undefined, 0} ->
undefined;
{undefined, _} ->
- LPid = rabbit_limiter:start_link(self()),
+ LPid = rabbit_limiter:start_link(self(),
+ queue:len(UAMQ)),
ok = limit_queues(LPid, State),
LPid;
{_, 0} ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 6bd803a27b..83df15ce20 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -35,7 +35,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
--export([start_link/1, shutdown/1]).
+-export([start_link/2, shutdown/1]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
-export([get_limit/1]).
@@ -45,7 +45,7 @@
-type(maybe_pid() :: pid() | 'undefined').
--spec(start_link/1 :: (pid()) -> pid()).
+-spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
@@ -70,8 +70,8 @@
%% API
%%----------------------------------------------------------------------------
-start_link(ChPid) ->
- {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid], []),
+start_link(ChPid, UnackedMsgCount) ->
+ {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []),
Pid.
shutdown(undefined) ->
@@ -117,8 +117,8 @@ get_limit(Pid) ->
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([ChPid]) ->
- {ok, #lim{ch_pid = ChPid} }.
+init([ChPid, UnackedMsgCount]) ->
+ {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}.
handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 92d03789b3..e30e65bf55 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -57,6 +57,7 @@
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([pid_to_string/1, string_to_pid/1]).
+-export([version_compare/2, version_compare/3]).
-import(mnesia).
-import(lists).
@@ -561,3 +562,36 @@ string_to_pid(Str) ->
%% invalid regexp - shouldn't happen
throw(Error)
end.
+
+version_compare(A, B, lte) ->
+ case version_compare(A, B) of
+ eq -> true;
+ lt -> true;
+ gt -> false
+ end;
+version_compare(A, B, gte) ->
+ case version_compare(A, B) of
+ eq -> true;
+ gt -> true;
+ lt -> false
+ end;
+version_compare(A, B, Result) ->
+ Result =:= version_compare(A, B).
+
+version_compare([], []) ->
+ eq;
+version_compare([], _ ) ->
+ lt; %% 2.3 < 2.3.1
+version_compare(_ , []) ->
+ gt; %% 2.3.1 > 2.3
+version_compare(A, B) ->
+ {AStr, ATl} = lists:splitwith(fun (X) -> X =/= $. end, A),
+ {BStr, BTl} = lists:splitwith(fun (X) -> X =/= $. end, B),
+ ANum = list_to_integer(AStr),
+ BNum = list_to_integer(BStr),
+ if ANum =:= BNum -> ATl1 = lists:dropwhile(fun (X) -> X =:= $. end, ATl),
+ BTl1 = lists:dropwhile(fun (X) -> X =:= $. end, BTl),
+ version_compare(ATl1, BTl1);
+ ANum < BNum -> lt;
+ ANum > BNum -> gt
+ end.
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 4e77d7f93f..74e6ca68fc 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -187,7 +187,7 @@ start_node(Node, RpcTimeout) ->
io:format("Starting node ~s...~n", [Node]),
case rpc:call(Node, os, getpid, []) of
{badrpc, _} ->
- Port = run_cmd(script_filename()),
+ Port = run_rabbitmq_server(),
Started = wait_for_rabbit_to_start(Node, RpcTimeout, Port),
Pid = case rpc:call(Node, os, getpid, []) of
{badrpc, _} -> throw(cannot_get_pid);
@@ -217,8 +217,22 @@ wait_for_rabbit_to_start(Node, RpcTimeout, Port) ->
end
end.
-run_cmd(FullPath) ->
- erlang:open_port({spawn, FullPath}, [nouse_stdio]).
+run_rabbitmq_server() ->
+ with_os([{unix, fun run_rabbitmq_server_unix/0},
+ {win32, fun run_rabbitmq_server_win32/0}]).
+
+run_rabbitmq_server_unix() ->
+ FullPath = getenv("RABBITMQ_SCRIPT_HOME") ++ "/rabbitmq-server",
+ erlang:open_port({spawn_executable, FullPath},
+ [{arg0, FullPath}, {args, ["-noinput"]}, nouse_stdio]).
+
+run_rabbitmq_server_win32() ->
+ Cmd = filename:nativename(os:find_executable("cmd")),
+ CmdLine = "\"" ++ getenv("RABBITMQ_SCRIPT_HOME")
+ ++ "\\rabbitmq-server.bat\" -noinput",
+ erlang:open_port({spawn_executable, Cmd},
+ [{arg0, Cmd}, {args, ["/q", "/s", "/c", CmdLine]},
+ nouse_stdio, hide]).
is_rabbit_running(Node, RpcTimeout) ->
case rpc:call(Node, rabbit, status, [], RpcTimeout) of
@@ -236,13 +250,6 @@ with_os(Handlers) ->
Handler -> Handler()
end.
-script_filename() ->
- ScriptHome = getenv("RABBITMQ_SCRIPT_HOME"),
- ScriptName = with_os(
- [{unix , fun () -> "rabbitmq-server" end},
- {win32, fun () -> "rabbitmq-server.bat" end}]),
- ScriptHome ++ "/" ++ ScriptName ++ " -noinput".
-
pids_file() -> getenv("RABBITMQ_PIDS_FILE").
write_pids_file(Pids) ->
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 02bd04991e..0cdb41422f 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -261,14 +261,29 @@ get_total_memory({unix,freebsd}) ->
PageCount * PageSize;
get_total_memory({win32,_OSname}) ->
- %% Due to the Erlang print format bug, on Windows boxes the memory size is
- %% broken. For example Windows 7 64 bit with 4Gigs of RAM we get negative
- %% memory size:
+ %% Due to the Erlang print format bug, on Windows boxes the memory
+ %% size is broken. For example Windows 7 64 bit with 4Gigs of RAM
+ %% we get negative memory size:
%% > os_mon_sysinfo:get_mem_info().
%% ["76 -1658880 1016913920 -1 -1021628416 2147352576 2134794240\n"]
- %% Due to this bug, we don't actually know anything. Even if the number is
- %% postive we can't be sure if it's correct.
- unknown;
+ %% Due to this bug, we don't actually know anything. Even if the
+ %% number is postive we can't be sure if it's correct. This only
+ %% affects us on os_mon versions prior to 2.2.1.
+ case application:get_key(os_mon, vsn) of
+ undefined ->
+ unknown;
+ {ok, Version} ->
+ case rabbit_misc:version_compare(Version, "2.2.1", lt) of
+ true -> %% os_mon is < 2.2.1, so we know nothing
+ unknown;
+ false ->
+ [Result|_] = os_mon_sysinfo:get_mem_info(),
+ {ok, [_MemLoad, TotPhys, _AvailPhys,
+ _TotPage, _AvailPage, _TotV, _AvailV], _RestStr} =
+ io_lib:fread("~d~d~d~d~d~d~d", Result),
+ TotPhys
+ end
+ end;
get_total_memory({unix, linux}) ->
File = read_proc_file("/proc/meminfo"),