diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-02-09 19:13:10 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-02-09 19:13:10 +0000 |
| commit | 2bce382b64f7f666079236453eb80808ea86326c (patch) | |
| tree | cf5918a07e41620c64145625d6f03b79d008bf81 /src | |
| parent | 071ee6ef2004dc906ebc0689538b93518faf1d7e (diff) | |
| parent | a193b226765e85f51a4334f7d725c46cfb815d03 (diff) | |
| download | rabbitmq-server-git-2bce382b64f7f666079236453eb80808ea86326c.tar.gz | |
Merging default into bug21673
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_multi.erl | 27 | ||||
| -rw-r--r-- | src/vm_memory_monitor.erl | 27 |
5 files changed, 82 insertions, 24 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e98c4366ed..a60a5590b7 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -540,12 +540,14 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> "prefetch_size!=0 (~w)", [Size]); handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, - _, State = #ch{ limiter_pid = LimiterPid }) -> + _, State = #ch{ limiter_pid = LimiterPid, + unacked_message_q = UAMQ }) -> NewLimiterPid = case {LimiterPid, PrefetchCount} of {undefined, 0} -> undefined; {undefined, _} -> - LPid = rabbit_limiter:start_link(self()), + LPid = rabbit_limiter:start_link(self(), + queue:len(UAMQ)), ok = limit_queues(LPid, State), LPid; {_, 0} -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 6bd803a27b..83df15ce20 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -35,7 +35,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --export([start_link/1, shutdown/1]). +-export([start_link/2, shutdown/1]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). -export([get_limit/1]). @@ -45,7 +45,7 @@ -type(maybe_pid() :: pid() | 'undefined'). --spec(start_link/1 :: (pid()) -> pid()). +-spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). @@ -70,8 +70,8 @@ %% API %%---------------------------------------------------------------------------- -start_link(ChPid) -> - {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid], []), +start_link(ChPid, UnackedMsgCount) -> + {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []), Pid. shutdown(undefined) -> @@ -117,8 +117,8 @@ get_limit(Pid) -> %% gen_server callbacks %%---------------------------------------------------------------------------- -init([ChPid]) -> - {ok, #lim{ch_pid = ChPid} }. +init([ChPid, UnackedMsgCount]) -> + {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}. handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 92d03789b3..e30e65bf55 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -57,6 +57,7 @@ -export([start_applications/1, stop_applications/1]). -export([unfold/2, ceil/1, queue_fold/3]). -export([pid_to_string/1, string_to_pid/1]). +-export([version_compare/2, version_compare/3]). -import(mnesia). -import(lists). @@ -561,3 +562,36 @@ string_to_pid(Str) -> %% invalid regexp - shouldn't happen throw(Error) end. + +version_compare(A, B, lte) -> + case version_compare(A, B) of + eq -> true; + lt -> true; + gt -> false + end; +version_compare(A, B, gte) -> + case version_compare(A, B) of + eq -> true; + gt -> true; + lt -> false + end; +version_compare(A, B, Result) -> + Result =:= version_compare(A, B). + +version_compare([], []) -> + eq; +version_compare([], _ ) -> + lt; %% 2.3 < 2.3.1 +version_compare(_ , []) -> + gt; %% 2.3.1 > 2.3 +version_compare(A, B) -> + {AStr, ATl} = lists:splitwith(fun (X) -> X =/= $. end, A), + {BStr, BTl} = lists:splitwith(fun (X) -> X =/= $. end, B), + ANum = list_to_integer(AStr), + BNum = list_to_integer(BStr), + if ANum =:= BNum -> ATl1 = lists:dropwhile(fun (X) -> X =:= $. end, ATl), + BTl1 = lists:dropwhile(fun (X) -> X =:= $. end, BTl), + version_compare(ATl1, BTl1); + ANum < BNum -> lt; + ANum > BNum -> gt + end. diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 4e77d7f93f..74e6ca68fc 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -187,7 +187,7 @@ start_node(Node, RpcTimeout) -> io:format("Starting node ~s...~n", [Node]), case rpc:call(Node, os, getpid, []) of {badrpc, _} -> - Port = run_cmd(script_filename()), + Port = run_rabbitmq_server(), Started = wait_for_rabbit_to_start(Node, RpcTimeout, Port), Pid = case rpc:call(Node, os, getpid, []) of {badrpc, _} -> throw(cannot_get_pid); @@ -217,8 +217,22 @@ wait_for_rabbit_to_start(Node, RpcTimeout, Port) -> end end. -run_cmd(FullPath) -> - erlang:open_port({spawn, FullPath}, [nouse_stdio]). +run_rabbitmq_server() -> + with_os([{unix, fun run_rabbitmq_server_unix/0}, + {win32, fun run_rabbitmq_server_win32/0}]). + +run_rabbitmq_server_unix() -> + FullPath = getenv("RABBITMQ_SCRIPT_HOME") ++ "/rabbitmq-server", + erlang:open_port({spawn_executable, FullPath}, + [{arg0, FullPath}, {args, ["-noinput"]}, nouse_stdio]). + +run_rabbitmq_server_win32() -> + Cmd = filename:nativename(os:find_executable("cmd")), + CmdLine = "\"" ++ getenv("RABBITMQ_SCRIPT_HOME") + ++ "\\rabbitmq-server.bat\" -noinput", + erlang:open_port({spawn_executable, Cmd}, + [{arg0, Cmd}, {args, ["/q", "/s", "/c", CmdLine]}, + nouse_stdio, hide]). is_rabbit_running(Node, RpcTimeout) -> case rpc:call(Node, rabbit, status, [], RpcTimeout) of @@ -236,13 +250,6 @@ with_os(Handlers) -> Handler -> Handler() end. -script_filename() -> - ScriptHome = getenv("RABBITMQ_SCRIPT_HOME"), - ScriptName = with_os( - [{unix , fun () -> "rabbitmq-server" end}, - {win32, fun () -> "rabbitmq-server.bat" end}]), - ScriptHome ++ "/" ++ ScriptName ++ " -noinput". - pids_file() -> getenv("RABBITMQ_PIDS_FILE"). write_pids_file(Pids) -> diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 02bd04991e..0cdb41422f 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -261,14 +261,29 @@ get_total_memory({unix,freebsd}) -> PageCount * PageSize; get_total_memory({win32,_OSname}) -> - %% Due to the Erlang print format bug, on Windows boxes the memory size is - %% broken. For example Windows 7 64 bit with 4Gigs of RAM we get negative - %% memory size: + %% Due to the Erlang print format bug, on Windows boxes the memory + %% size is broken. For example Windows 7 64 bit with 4Gigs of RAM + %% we get negative memory size: %% > os_mon_sysinfo:get_mem_info(). %% ["76 -1658880 1016913920 -1 -1021628416 2147352576 2134794240\n"] - %% Due to this bug, we don't actually know anything. Even if the number is - %% postive we can't be sure if it's correct. - unknown; + %% Due to this bug, we don't actually know anything. Even if the + %% number is postive we can't be sure if it's correct. This only + %% affects us on os_mon versions prior to 2.2.1. + case application:get_key(os_mon, vsn) of + undefined -> + unknown; + {ok, Version} -> + case rabbit_misc:version_compare(Version, "2.2.1", lt) of + true -> %% os_mon is < 2.2.1, so we know nothing + unknown; + false -> + [Result|_] = os_mon_sysinfo:get_mem_info(), + {ok, [_MemLoad, TotPhys, _AvailPhys, + _TotPage, _AvailPage, _TotV, _AvailV], _RestStr} = + io_lib:fread("~d~d~d~d~d~d~d", Result), + TotPhys + end + end; get_total_memory({unix, linux}) -> File = read_proc_file("/proc/meminfo"), |
