summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile4
-rw-r--r--docs/rabbitmqctl.1.xml23
-rw-r--r--src/file_handle_cache.erl2
-rw-r--r--src/gen_server2.erl3
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl17
-rw-r--r--src/rabbit_channel.erl82
-rw-r--r--src/rabbit_control.erl2
-rw-r--r--src/rabbit_limiter.erl19
-rw-r--r--src/rabbit_net.erl28
-rw-r--r--src/rabbit_networking.erl23
-rw-r--r--src/rabbit_plugin_activator.erl2
-rw-r--r--src/rabbit_queue_collector.erl2
-rw-r--r--src/rabbit_queue_index.erl43
-rw-r--r--src/rabbit_reader.erl15
-rw-r--r--src/rabbit_ssl.erl173
-rw-r--r--src/rabbit_tests.erl1
-rw-r--r--src/rabbit_variable_queue.erl67
-rw-r--r--src/vm_memory_monitor.erl17
19 files changed, 401 insertions, 130 deletions
diff --git a/Makefile b/Makefile
index 38ec8196de..ee0c083863 100644
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,3 @@
-
TMPDIR ?= /tmp
RABBITMQ_NODENAME ?= rabbit
@@ -92,12 +91,13 @@ endif
all: $(TARGETS)
$(DEPS_FILE): $(SOURCES) $(INCLUDES)
+ rm -f $@
escript generate_deps $(INCLUDE_DIR) $(SOURCE_DIR) \$$\(EBIN_DIR\) $@
$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
escript generate_app $(EBIN_DIR) $@ < $<
-$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(DEPS_FILE)
+$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl | $(DEPS_FILE)
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8)
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 5179eb253c..73882861fc 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -967,6 +967,21 @@
<listitem><para>Peer port.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>peer_cert_subject</term>
+ <listitem><para>The subject of the peer's SSL
+ certificate, in RFC4514 form.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>peer_cert_issuer</term>
+ <listitem><para>The issuer of the peer's SSL
+ certificate, in RFC4514 form.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>peer_cert_validity</term>
+ <listitem><para>The period for which the peer's SSL
+ certificate is valid.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>state</term>
<listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>,
<command>opening</command>, <command>running</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
@@ -1101,6 +1116,14 @@
<term>prefetch_count</term>
<listitem><para>QoS prefetch count limit in force, 0 if unlimited.</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>client_flow_blocked</term>
+ <listitem><para>True if the client issued a
+ <command>channel.flow{active=false}</command>
+ command, blocking the server from delivering
+ messages to the channel's consumers.
+ </para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>channelinfoitem</command>s are specified then pid,
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index d2830a25ca..6a948d49b6 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -1171,7 +1171,7 @@ ulimit() ->
?FILE_HANDLES_LIMIT_WINDOWS;
{unix, _OsName} ->
%% Under Linux, Solaris and FreeBSD, ulimit is a shell
- %% builtin, not a command. In OS X, it's a command.
+ %% builtin, not a command. In OS X and AIX it's a command.
%% Fortunately, os:cmd invokes the cmd in a shell env, so
%% we're safe in all cases.
case os:cmd("ulimit -n") of
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index b0379b95d1..230d1f2aa1 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -1128,7 +1128,8 @@ function_exported_or_default(Mod, Fun, Arity, Default) ->
%%-----------------------------------------------------------------
format_status(Opt, StatusData) ->
[PDict, SysState, Parent, Debug,
- [Name, State, Mod, _Time, _TimeoutState, Queue]] = StatusData,
+ #gs2_state{name = Name, state = State, mod = Mod, queue = Queue}] =
+ StatusData,
NameTag = if is_pid(Name) ->
pid_to_list(Name);
is_atom(Name) ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 3e677c3809..42bddc5e81 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-module(rabbit_amqqueue).
--export([start/0, stop/0, declare/5, delete/3, purge/1]).
+-export([start/0, stop/0, declare/5, delete_exclusive/1, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
update_ram_duration/1, set_ram_duration_target/2,
@@ -115,6 +115,9 @@
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
-spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(delete_exclusive/1 :: (rabbit_types:amqqueue())
+ -> rabbit_types:ok_or_error2(qlen(),
+ 'not_exclusive')).
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
@@ -359,6 +362,9 @@ stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
emit_stats(#amqqueue{pid = QPid}) ->
delegate_cast(QPid, emit_stats).
+delete_exclusive(#amqqueue{ pid = QPid }) ->
+ gen_server2:call(QPid, delete_exclusive, infinity).
+
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d15a6eb3a9..2c53a8e319 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -43,7 +43,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2]).
+ prioritise_cast/2, prioritise_info/2]).
-import(queue).
-import(erlang).
@@ -593,6 +593,7 @@ prioritise_call(Msg, _From, _State) ->
info -> 9;
{info, _Items} -> 9;
consumers -> 9;
+ delete_exclusive -> 8;
{maybe_run_queue_via_backing_queue, _Fun} -> 6;
_ -> 0
end.
@@ -611,6 +612,10 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
+prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
+ #q{q = #amqqueue{exclusive_owner = DownPid}}) -> 8;
+prioritise_info(_Msg, _State) -> 0.
+
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
declare(Recover, From, State);
@@ -780,6 +785,16 @@ handle_call(stat, _From, State = #q{backing_queue = BQ,
active_consumers = ActiveConsumers}) ->
reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
+handle_call(delete_exclusive, _From,
+ State = #q{ backing_queue_state = BQS,
+ backing_queue = BQ,
+ q = #amqqueue{exclusive_owner = Owner}
+ }) when Owner =/= none ->
+ {stop, normal, {ok, BQ:len(BQS)}, State};
+
+handle_call(delete_exclusive, _From, State) ->
+ reply({error, not_exclusive}, State);
+
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
IsEmpty = BQ:is_empty(BQS),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index bde11f00e0..fe36cef923 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -58,7 +58,8 @@
consumer_count,
messages_unacknowledged,
acks_uncommitted,
- prefetch_count]).
+ prefetch_count,
+ client_flow_blocked]).
-define(CREATION_EVENT_KEYS,
[pid,
@@ -314,14 +315,10 @@ terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) ->
return_queue_declare_ok(#resource{name = ActualName},
NoWait, MessageCount, ConsumerCount, State) ->
- NewState = State#ch{most_recently_declared_queue = ActualName},
- case NoWait of
- true -> {noreply, NewState};
- false -> Reply = #'queue.declare_ok'{queue = ActualName,
- message_count = MessageCount,
- consumer_count = ConsumerCount},
- {reply, Reply, NewState}
- end.
+ return_ok(State#ch{most_recently_declared_queue = ActualName}, NoWait,
+ #'queue.declare_ok'{queue = ActualName,
+ message_count = MessageCount,
+ consumer_count = ConsumerCount}).
check_resource_access(Username, Resource, Perm) ->
V = {Resource, Perm},
@@ -343,30 +340,30 @@ clear_permission_cache() ->
erase(permission_cache),
ok.
-check_configure_permitted(Resource, #ch{ username = Username}) ->
+check_configure_permitted(Resource, #ch{username = Username}) ->
check_resource_access(Username, Resource, configure).
-check_write_permitted(Resource, #ch{ username = Username}) ->
+check_write_permitted(Resource, #ch{username = Username}) ->
check_resource_access(Username, Resource, write).
-check_read_permitted(Resource, #ch{ username = Username}) ->
+check_read_permitted(Resource, #ch{username = Username}) ->
check_resource_access(Username, Resource, read).
-expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
+expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
rabbit_misc:protocol_error(
not_found, "no previously declared queue", []);
-expand_queue_name_shortcut(<<>>, #ch{ virtual_host = VHostPath,
- most_recently_declared_queue = MRDQ }) ->
+expand_queue_name_shortcut(<<>>, #ch{virtual_host = VHostPath,
+ most_recently_declared_queue = MRDQ}) ->
rabbit_misc:r(VHostPath, queue, MRDQ);
-expand_queue_name_shortcut(QueueNameBin, #ch{ virtual_host = VHostPath }) ->
+expand_queue_name_shortcut(QueueNameBin, #ch{virtual_host = VHostPath}) ->
rabbit_misc:r(VHostPath, queue, QueueNameBin).
expand_routing_key_shortcut(<<>>, <<>>,
- #ch{ most_recently_declared_queue = <<>> }) ->
+ #ch{most_recently_declared_queue = <<>>}) ->
rabbit_misc:protocol_error(
not_found, "no previously declared queue", []);
expand_routing_key_shortcut(<<>>, <<>>,
- #ch{ most_recently_declared_queue = MRDQ }) ->
+ #ch{most_recently_declared_queue = MRDQ}) ->
MRDQ;
expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) ->
RoutingKey.
@@ -437,11 +434,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
IsPersistent = is_message_persistent(DecodedContent),
- Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = DecodedContent,
- guid = rabbit_guid:guid(),
- is_persistent = IsPersistent},
+ Message = #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKey,
+ content = DecodedContent,
+ guid = rabbit_guid:guid(),
+ is_persistent = IsPersistent},
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
@@ -480,9 +477,9 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
- _, State = #ch{ writer_pid = WriterPid,
- reader_pid = ReaderPid,
- next_tag = DeliveryTag }) ->
+ _, State = #ch{writer_pid = WriterPid,
+ reader_pid = ReaderPid,
+ next_tag = DeliveryTag}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
@@ -512,15 +509,15 @@ handle_method(#'basic.get'{queue = QueueNameBin,
{reply, #'basic.get_empty'{}, State}
end;
-handle_method(#'basic.consume'{queue = QueueNameBin,
+handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ConsumerTag,
- no_local = _, % FIXME: implement
- no_ack = NoAck,
- exclusive = ExclusiveConsume,
- nowait = NoWait},
- _, State = #ch{ reader_pid = ReaderPid,
- limiter_pid = LimiterPid,
- consumer_mapping = ConsumerMapping }) ->
+ no_local = _, % FIXME: implement
+ no_ack = NoAck,
+ exclusive = ExclusiveConsume,
+ nowait = NoWait},
+ _, State = #ch{reader_pid = ReaderPid,
+ limiter_pid = LimiterPid,
+ consumer_mapping = ConsumerMapping }) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
@@ -615,7 +612,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
{reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}};
handle_method(#'basic.recover_async'{requeue = true},
- _, State = #ch{ unacked_message_q = UAMQ }) ->
+ _, State = #ch{unacked_message_q = UAMQ}) ->
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
%% The Qpid python test suite incorrectly assumes
@@ -630,8 +627,8 @@ handle_method(#'basic.recover_async'{requeue = true},
{noreply, State#ch{unacked_message_q = queue:new()}};
handle_method(#'basic.recover_async'{requeue = false},
- _, State = #ch{ writer_pid = WriterPid,
- unacked_message_q = UAMQ }) ->
+ _, State = #ch{writer_pid = WriterPid,
+ unacked_message_q = UAMQ}) ->
ok = rabbit_misc:queue_fold(
fun ({_DeliveryTag, none, _Msg}, ok) ->
%% Was sent as a basic.get_ok. Don't redeliver
@@ -664,7 +661,7 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
handle_method(#'basic.reject'{delivery_tag = DeliveryTag,
requeue = Requeue},
- _, State = #ch{ unacked_message_q = UAMQ}) ->
+ _, State = #ch{unacked_message_q = UAMQ}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false),
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
@@ -681,7 +678,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
internal = false,
nowait = NoWait,
arguments = Args},
- _, State = #ch{ virtual_host = VHostPath }) ->
+ _, State = #ch{virtual_host = VHostPath}) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_configure_permitted(ExchangeName, State),
@@ -709,7 +706,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
passive = true,
nowait = NoWait},
- _, State = #ch{ virtual_host = VHostPath }) ->
+ _, State = #ch{virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_configure_permitted(ExchangeName, State),
_ = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -718,7 +715,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
if_unused = IfUnused,
nowait = NoWait},
- _, State = #ch { virtual_host = VHostPath }) ->
+ _, State = #ch{virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_configure_permitted(ExchangeName, State),
case rabbit_exchange:delete(ExchangeName, IfUnused) of
@@ -872,6 +869,7 @@ handle_method(#'channel.flow'{active = true}, _,
end,
{reply, #'channel.flow_ok'{active = true},
State#ch{limiter_pid = LimiterPid1}};
+
handle_method(#'channel.flow'{active = false}, _,
State = #ch{limiter_pid = LimiterPid,
consumer_mapping = Consumers}) ->
@@ -1127,6 +1125,8 @@ i(acks_uncommitted, #ch{uncommitted_ack_q = UAQ}) ->
queue:len(UAQ);
i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
rabbit_limiter:get_limit(LimiterPid);
+i(client_flow_blocked, #ch{limiter_pid = LimiterPid}) ->
+ rabbit_limiter:is_blocked(LimiterPid);
i(Item, _) ->
throw({bad_argument, Item}).
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index a3b6f369e3..57efe7cc51 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -347,6 +347,8 @@ format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] =
Value) when is_binary(TableEntryKey) andalso
is_atom(TableEntryType) ->
io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
+format_info_item([C|_] = Value) when is_number(C), C >= 32, C =< 255 ->
+ Value;
format_info_item(Value) ->
io_lib:format("~w", [Value]).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index c323d7cef0..be1dcad1a6 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -37,7 +37,7 @@
handle_info/2, prioritise_call/3]).
-export([start_link/2]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
--export([get_limit/1, block/1, unblock/1]).
+-export([get_limit/1, block/1, unblock/1, is_blocked/1]).
%%----------------------------------------------------------------------------
@@ -55,6 +55,7 @@
-spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()).
-spec(block/1 :: (maybe_pid()) -> 'ok').
-spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped').
+-spec(is_blocked/1 :: (maybe_pid()) -> boolean()).
-endif.
@@ -119,6 +120,11 @@ unblock(undefined) ->
unblock(LimiterPid) ->
gen_server2:call(LimiterPid, unblock, infinity).
+is_blocked(undefined) ->
+ false;
+is_blocked(LimiterPid) ->
+ gen_server2:call(LimiterPid, is_blocked, infinity).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -157,7 +163,10 @@ handle_call(unblock, _From, State) ->
case maybe_notify(State, State#lim{blocked = false}) of
{cont, State1} -> {reply, ok, State1};
{stop, State1} -> {stop, normal, stopped, State1}
- end.
+ end;
+
+handle_call(is_blocked, _From, State) ->
+ {reply, blocked(State), State}.
handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
@@ -186,8 +195,8 @@ code_change(_, State, _) ->
%%----------------------------------------------------------------------------
maybe_notify(OldState, NewState) ->
- case (limit_reached(OldState) orelse is_blocked(OldState)) andalso
- not (limit_reached(NewState) orelse is_blocked(NewState)) of
+ case (limit_reached(OldState) orelse blocked(OldState)) andalso
+ not (limit_reached(NewState) orelse blocked(NewState)) of
true -> NewState1 = notify_queues(NewState),
{case NewState1#lim.prefetch_count of
0 -> stop;
@@ -199,7 +208,7 @@ maybe_notify(OldState, NewState) ->
limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.
-is_blocked(#lim{blocked = Blocked}) -> Blocked.
+blocked(#lim{blocked = Blocked}) -> Blocked.
remember_queue(QPid, State = #lim{queues = Queues}) ->
case dict:is_key(QPid, Queues) of
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 2286896b7b..53d0d5cbf3 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
-export([async_recv/3, close/1, controlling_process/2,
- getstat/2, peername/1, port_command/2,
+ getstat/2, peername/1, peercert/1, port_command/2,
send/2, sockname/1]).
%%---------------------------------------------------------------------------
@@ -45,28 +45,29 @@
-type(stat_option() ::
'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' |
'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend').
--type(error() :: rabbit_types:error(any())).
+-type(ok_val_or_error(A) :: rabbit_types:ok_or_error2(A, any())).
+-type(ok_or_any_error() :: rabbit_types:ok_or_error(any())).
-type(socket() :: port() | #ssl_socket{}).
-spec(async_recv/3 ::
(socket(), integer(), timeout()) -> rabbit_types:ok(any())).
--spec(close/1 :: (socket()) -> rabbit_types:ok_or_error(any())).
--spec(controlling_process/2 ::
- (socket(), pid()) -> rabbit_types:ok_or_error(any())).
+-spec(close/1 :: (socket()) -> ok_or_any_error()).
+-spec(controlling_process/2 :: (socket(), pid()) -> ok_or_any_error()).
-spec(port_command/2 :: (socket(), iolist()) -> 'true').
-spec(send/2 ::
- (socket(), binary() | iolist()) -> rabbit_types:ok_or_error(any())).
+ (socket(), binary() | iolist()) -> ok_or_any_error()).
-spec(peername/1 ::
(socket())
- -> rabbit_types:ok({inet:ip_address(), rabbit_networking:ip_port()}) |
- error()).
+ -> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})).
+-spec(peercert/1 ::
+ (socket())
+ -> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())).
-spec(sockname/1 ::
(socket())
- -> rabbit_types:ok({inet:ip_address(), rabbit_networking:ip_port()}) |
- error()).
+ -> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})).
-spec(getstat/2 ::
(socket(), [stat_option()])
- -> rabbit_types:ok([{stat_option(), integer()}]) | error()).
+ -> ok_val_or_error([{stat_option(), integer()}])).
-endif.
@@ -108,6 +109,11 @@ peername(Sock) when ?IS_SSL(Sock) ->
peername(Sock) when is_port(Sock) ->
inet:peername(Sock).
+peercert(Sock) when ?IS_SSL(Sock) ->
+ ssl:peercert(Sock#ssl_socket.ssl);
+peercert(Sock) when is_port(Sock) ->
+ nossl.
+
port_command(Sock, Data) when ?IS_SSL(Sock) ->
case ssl:send(Sock#ssl_socket.ssl, Data) of
ok -> self() ! {inet_reply, Sock, ok},
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 6dbd54d2bc..db5c71f62a 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -46,8 +46,6 @@
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
--include_lib("ssl/src/ssl_record.hrl").
-
-define(RABBIT_TCP_OPTS, [
binary,
@@ -120,26 +118,7 @@ boot_ssl() ->
end}
| SslOptsConfig]
end,
- % In R13B04 and R14A (at least), rc4 is incorrectly implemented.
- CipherSuites = proplists:get_value(ciphers,
- SslOpts,
- ssl:cipher_suites()),
- FilteredCipherSuites =
- [C || C <- CipherSuites,
- begin
- SuiteCode =
- if is_tuple(C) -> ssl_cipher:suite(C);
- is_list(C) -> ssl_cipher:openssl_suite(C)
- end,
- SP = ssl_cipher:security_parameters(
- SuiteCode,
- #security_parameters{}),
- SP#security_parameters.bulk_cipher_algorithm =/= ?RC4
- end],
- SslOpts1 = [{ciphers, FilteredCipherSuites}
- | [{K, V} || {K, V} <- SslOpts, K =/= ciphers]],
- [start_ssl_listener(Host, Port, SslOpts1)
- || {Host, Port} <- SslListeners],
+ [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners],
ok
end.
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index b23776cd74..c38ef8d2f8 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -90,7 +90,7 @@ start() ->
%% Compile the script
ScriptFile = RootName ++ ".script",
- case systools:make_script(RootName, [local, silent]) of
+ case systools:make_script(RootName, [local, silent, exref]) of
{ok, Module, Warnings} ->
%% This gets lots of spurious no-source warnings when we
%% have .ez files, so we want to supress them to prevent
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 0a49b94d09..0b8efc8f83 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -81,7 +81,7 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) ->
fun () -> ok end,
fun () ->
erlang:demonitor(MonitorRef),
- rabbit_amqqueue:delete(Q, false, false)
+ rabbit_amqqueue:delete_exclusive(Q)
end)
|| {MonitorRef, Q} <- dict:to_list(Queues)],
{reply, ok, State}.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index d6b8bb2889..0b98290ccd 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -159,7 +159,9 @@
-define(PUB, {_, _}). %% {Guid, IsPersistent}
--define(READ_MODE, [binary, raw, read, {read_ahead, ?SEGMENT_TOTAL_SIZE}]).
+-define(READ_MODE, [binary, raw, read]).
+-define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]).
+-define(WRITE_MODE, [write | ?READ_MODE]).
%%----------------------------------------------------------------------------
@@ -220,8 +222,13 @@
%% public API
%%----------------------------------------------------------------------------
-init(Name, Recover, MsgStoreRecovered, ContainsCheckFun) ->
- State = #qistate { dir = Dir } = blank_state(Name, not Recover),
+init(Name, false, _MsgStoreRecovered, _ContainsCheckFun) ->
+ State = #qistate { dir = Dir } = blank_state(Name),
+ false = filelib:is_file(Dir), %% is_file == is file or dir
+ {0, [], State};
+
+init(Name, true, MsgStoreRecovered, ContainsCheckFun) ->
+ State = #qistate { dir = Dir } = blank_state(Name),
Terms = case read_shutdown_terms(Dir) of
{error, _} -> [];
{ok, Terms1} -> Terms1
@@ -356,15 +363,8 @@ recover(DurableQueues) ->
%% startup and shutdown
%%----------------------------------------------------------------------------
-blank_state(QueueName, EnsureFresh) ->
- StrName = queue_name_to_dir_name(QueueName),
- Dir = filename:join(queues_dir(), StrName),
- ok = case EnsureFresh of
- true -> false = filelib:is_file(Dir), %% is_file == is file or dir
- ok;
- false -> ok
- end,
- ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+blank_state(QueueName) ->
+ Dir = filename:join(queues_dir(), queue_name_to_dir_name(QueueName)),
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
#qistate { dir = Dir,
@@ -373,17 +373,21 @@ blank_state(QueueName, EnsureFresh) ->
dirty_count = 0,
max_journal_entries = MaxJournal }.
+clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
+
detect_clean_shutdown(Dir) ->
- case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
+ case file:delete(clean_file_name(Dir)) of
ok -> true;
{error, enoent} -> false
end.
read_shutdown_terms(Dir) ->
- rabbit_misc:read_term_file(filename:join(Dir, ?CLEAN_FILENAME)).
+ rabbit_misc:read_term_file(clean_file_name(Dir)).
store_clean_shutdown(Terms, Dir) ->
- rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms).
+ CleanFileName = clean_file_name(Dir),
+ ok = filelib:ensure_dir(CleanFileName),
+ rabbit_misc:write_term_file(CleanFileName, Terms).
init_clean(RecoveredCounts, State) ->
%% Load the journal. Since this is a clean recovery this (almost)
@@ -500,7 +504,7 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
queue_index_walker_reader(QueueName, Gatherer) ->
State = #qistate { segments = Segments, dir = Dir } =
- recover_journal(blank_state(QueueName, false)),
+ recover_journal(blank_state(QueueName)),
[ok = segment_entries_foldr(
fun (_RelSeq, {{Guid, true}, _IsDelivered, no_ack}, ok) ->
gatherer:in(Gatherer, {Guid, 1});
@@ -578,7 +582,7 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
path = Path } = Segment) ->
case array:sparse_size(JEntries) of
0 -> Segment;
- _ -> {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE],
+ _ -> {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
[{write_buffer, infinity}]),
array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries),
ok = file_handle_cache:close(Hdl),
@@ -588,7 +592,8 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
get_journal_handle(State = #qistate { journal_handle = undefined,
dir = Dir }) ->
Path = filename:join(Dir, ?JOURNAL_FILENAME),
- {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE],
+ ok = filelib:ensure_dir(Path),
+ {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
[{write_buffer, infinity}]),
{Hdl, State #qistate { journal_handle = Hdl }};
get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
@@ -785,7 +790,7 @@ segment_entries_foldr(Fun, Init,
load_segment(KeepAcked, #segment { path = Path }) ->
case filelib:is_file(Path) of
false -> {array_new(), 0};
- true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []),
+ true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0),
ok = file_handle_cache:close(Hdl),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 745e008349..ff0fb8f778 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -66,6 +66,8 @@
send_pend, state, channels]).
-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port,
+ peer_cert_subject, peer_cert_issuer,
+ peer_cert_validity,
protocol, user, vhost, timeout, frame_max,
client_properties]).
@@ -824,6 +826,12 @@ i(peer_address, #v1{sock = Sock}) ->
i(peer_port, #v1{sock = Sock}) ->
{ok, {_, P}} = rabbit_net:peername(Sock),
P;
+i(peer_cert_issuer, #v1{sock = Sock}) ->
+ cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock);
+i(peer_cert_subject, #v1{sock = Sock}) ->
+ cert_info(fun rabbit_ssl:peer_cert_subject/1, Sock);
+i(peer_cert_validity, #v1{sock = Sock}) ->
+ cert_info(fun rabbit_ssl:peer_cert_validity/1, Sock);
i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
@@ -858,6 +866,13 @@ i(client_properties, #v1{connection = #connection{
i(Item, #v1{}) ->
throw({bad_argument, Item}).
+cert_info(F, Sock) ->
+ case rabbit_net:peercert(Sock) of
+ nossl -> '';
+ {error, no_peercert} -> '';
+ {ok, Cert} -> F(Cert)
+ end.
+
%%--------------------------------------------------------------------------
send_to_new_channel(Channel, AnalyzedFrame, State) ->
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
new file mode 100644
index 0000000000..be451af631
--- /dev/null
+++ b/src/rabbit_ssl.erl
@@ -0,0 +1,173 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_ssl).
+
+-include("rabbit.hrl").
+
+-include_lib("public_key/include/public_key.hrl").
+-include_lib("ssl/src/ssl_int.hrl").
+
+-export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]).
+
+%%--------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-export_type([certificate/0]).
+
+-type(certificate() :: binary()).
+
+-spec(peer_cert_issuer/1 :: (certificate()) -> string()).
+-spec(peer_cert_subject/1 :: (certificate()) -> string()).
+-spec(peer_cert_validity/1 :: (certificate()) -> string()).
+
+-endif.
+
+%%--------------------------------------------------------------------------
+%% High-level functions used by reader
+%%--------------------------------------------------------------------------
+
+%% Return a string describing the certificate's issuer.
+peer_cert_issuer(Cert) ->
+ cert_info(fun(#'OTPCertificate' {
+ tbsCertificate = #'OTPTBSCertificate' {
+ issuer = Issuer }}) ->
+ format_rdn_sequence(Issuer)
+ end, Cert).
+
+%% Return a string describing the certificate's subject, as per RFC4514.
+peer_cert_subject(Cert) ->
+ cert_info(fun(#'OTPCertificate' {
+ tbsCertificate = #'OTPTBSCertificate' {
+ subject = Subject }}) ->
+ format_rdn_sequence(Subject)
+ end, Cert).
+
+%% Return a string describing the certificate's validity.
+peer_cert_validity(Cert) ->
+ cert_info(fun(#'OTPCertificate' {
+ tbsCertificate = #'OTPTBSCertificate' {
+ validity = {'Validity', Start, End} }}) ->
+ lists:flatten(
+ io_lib:format("~s - ~s", [format_asn1_value(Start),
+ format_asn1_value(End)]))
+ end, Cert).
+
+%%--------------------------------------------------------------------------
+
+cert_info(F, Cert) ->
+ F(case public_key:pkix_decode_cert(Cert, otp) of
+ {ok, DecCert} -> DecCert;
+ DecCert -> DecCert
+ end).
+
+%%--------------------------------------------------------------------------
+%% Formatting functions
+%%--------------------------------------------------------------------------
+
+%% Format and rdnSequence as a RFC4514 subject string.
+format_rdn_sequence({rdnSequence, Seq}) ->
+ lists:flatten(
+ rabbit_misc:intersperse(
+ ",", lists:reverse([format_complex_rdn(RDN) || RDN <- Seq]))).
+
+%% Format an RDN set.
+format_complex_rdn(RDNs) ->
+ lists:flatten(
+ rabbit_misc:intersperse("+", [format_rdn(RDN) || RDN <- RDNs])).
+
+%% Format an RDN. If the type name is unknown, use the dotted decimal
+%% representation. See RFC4514, section 2.3.
+format_rdn(#'AttributeTypeAndValue'{type = T, value = V}) ->
+ FV = escape_rdn_value(format_asn1_value(V)),
+ Fmts = [{?'id-at-surname' , "SN"},
+ {?'id-at-givenName' , "GIVENNAME"},
+ {?'id-at-initials' , "INITIALS"},
+ {?'id-at-generationQualifier' , "GENERATIONQUALIFIER"},
+ {?'id-at-commonName' , "CN"},
+ {?'id-at-localityName' , "L"},
+ {?'id-at-stateOrProvinceName' , "ST"},
+ {?'id-at-organizationName' , "O"},
+ {?'id-at-organizationalUnitName' , "OU"},
+ {?'id-at-title' , "TITLE"},
+ {?'id-at-countryName' , "C"},
+ {?'id-at-serialNumber' , "SERIALNUMBER"},
+ {?'id-at-pseudonym' , "PSEUDONYM"},
+ {?'id-domainComponent' , "DC"},
+ {?'id-emailAddress' , "EMAILADDRESS"},
+ {?'street-address' , "STREET"}],
+ case proplists:lookup(T, Fmts) of
+ {_, Fmt} ->
+ io_lib:format(Fmt ++ "=~s", [FV]);
+ none when is_tuple(T) ->
+ TypeL = [io_lib:format("~w", [X]) || X <- tuple_to_list(T)],
+ io_lib:format("~s:~s", [rabbit_misc:intersperse(".", TypeL), FV]);
+ none ->
+ io_lib:format("~p:~s", [T, FV])
+ end.
+
+%% Escape a string as per RFC4514.
+escape_rdn_value(V) ->
+ escape_rdn_value(V, start).
+
+escape_rdn_value([], _) ->
+ [];
+escape_rdn_value([C | S], start) when C =:= $ ; C =:= $# ->
+ [$\\, C | escape_rdn_value(S, middle)];
+escape_rdn_value(S, start) ->
+ escape_rdn_value(S, middle);
+escape_rdn_value([$ ], middle) ->
+ [$\\, $ ];
+escape_rdn_value([C | S], middle) when C =:= $"; C =:= $+; C =:= $,; C =:= $;;
+ C =:= $<; C =:= $>; C =:= $\\ ->
+ [$\\, C | escape_rdn_value(S, middle)];
+escape_rdn_value([C | S], middle) when C < 32 ; C =:= 127 ->
+ %% only U+0000 needs escaping, but for display purposes it's handy
+ %% to escape all non-printable chars
+ lists:flatten(io_lib:format("\\~2.16.0B", [C])) ++
+ escape_rdn_value(S, middle);
+escape_rdn_value([C | S], middle) ->
+ [C | escape_rdn_value(S, middle)].
+
+%% Get the string representation of an OTPCertificate field.
+format_asn1_value({ST, S}) when ST =:= teletexString; ST =:= printableString;
+ ST =:= universalString; ST =:= utf8String;
+ ST =:= bmpString ->
+ if is_binary(S) -> binary_to_list(S);
+ true -> S
+ end;
+format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2,
+ Min1, Min2, S1, S2, $Z]}) ->
+ io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ",
+ [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]);
+format_asn1_value(V) ->
+ io_lib:format("~p", [V]).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a72656b73b..b36ee0be95 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -75,7 +75,6 @@ all_tests() ->
passed = maybe_run_cluster_dependent_tests(),
passed.
-
maybe_run_cluster_dependent_tests() ->
SecondaryNode = rabbit_misc:makenode("hare"),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 30d3a8aec1..cbc71bcc5c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -472,23 +472,30 @@ delete_and_terminate(State) ->
a(State2 #vqstate { index_state = IndexState1,
msg_store_clients = undefined }).
-purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
+purge(State = #vqstate { q4 = Q4,
+ index_state = IndexState,
+ len = Len,
+ persistent_count = PCount }) ->
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
- IndexState1 = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4,
- IndexState),
- State1 = #vqstate { q1 = Q1, index_state = IndexState2 } =
- purge_betas_and_deltas(State #vqstate { q4 = queue:new(),
+ {LensByStore, IndexState1} = remove_queue_entries(
+ fun rabbit_misc:queue_fold/3, Q4,
+ orddict:new(), IndexState),
+ {LensByStore1, State1 = #vqstate { q1 = Q1, index_state = IndexState2 }} =
+ purge_betas_and_deltas(LensByStore,
+ State #vqstate { q4 = queue:new(),
index_state = IndexState1 }),
- IndexState3 = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q1,
- IndexState2),
+ {LensByStore2, IndexState3} = remove_queue_entries(
+ fun rabbit_misc:queue_fold/3, Q1,
+ LensByStore1, IndexState2),
+ PCount1 = PCount - find_persistent_count(LensByStore2),
{Len, a(State1 #vqstate { q1 = queue:new(),
index_state = IndexState3,
len = 0,
ram_msg_count = 0,
ram_index_count = 0,
- persistent_count = 0 })}.
+ persistent_count = PCount1 })}.
publish(Msg, State) ->
{_SeqId, State1} = publish(Msg, false, false, State),
@@ -957,26 +964,30 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
reduce_memory_use(
State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }).
-purge_betas_and_deltas(State = #vqstate { q3 = Q3,
+purge_betas_and_deltas(LensByStore,
+ State = #vqstate { q3 = Q3,
index_state = IndexState }) ->
case bpqueue:is_empty(Q3) of
- true -> State;
- false -> IndexState1 = remove_queue_entries(fun beta_fold/3, Q3,
- IndexState),
- purge_betas_and_deltas(
- maybe_deltas_to_betas(
- State #vqstate { q3 = bpqueue:new(),
- index_state = IndexState1 }))
+ true -> {LensByStore, State};
+ false -> {LensByStore1, IndexState1} = remove_queue_entries(
+ fun beta_fold/3, Q3,
+ LensByStore, IndexState),
+ purge_betas_and_deltas(LensByStore1,
+ maybe_deltas_to_betas(
+ State #vqstate {
+ q3 = bpqueue:new(),
+ index_state = IndexState1 }))
end.
-remove_queue_entries(Fold, Q, IndexState) ->
+remove_queue_entries(Fold, Q, LensByStore, IndexState) ->
{GuidsByStore, Delivers, Acks} =
Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q),
ok = orddict:fold(fun (MsgStore, Guids, ok) ->
rabbit_msg_store:remove(MsgStore, Guids)
end, ok, GuidsByStore),
- rabbit_queue_index:ack(Acks,
- rabbit_queue_index:deliver(Delivers, IndexState)).
+ {sum_guids_by_store_to_len(LensByStore, GuidsByStore),
+ rabbit_queue_index:ack(Acks,
+ rabbit_queue_index:deliver(Delivers, IndexState))}.
remove_queue_entries1(
#msg_status { guid = Guid, seq_id = SeqId,
@@ -991,6 +1002,12 @@ remove_queue_entries1(
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
cons_if(IndexOnDisk, SeqId, Acks)}.
+sum_guids_by_store_to_len(LensByStore, GuidsByStore) ->
+ orddict:fold(
+ fun (MsgStore, Guids, LensByStore1) ->
+ orddict:update_counter(MsgStore, length(Guids), LensByStore1)
+ end, LensByStore, GuidsByStore).
+
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
@@ -1117,10 +1134,8 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
ok = orddict:fold(fun (MsgStore, Guids, ok) ->
MsgStoreFun(MsgStore, Guids)
end, ok, GuidsByStore),
- PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
- error -> 0;
- {ok, Guids} -> length(Guids)
- end,
+ PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
+ orddict:new(), GuidsByStore)),
State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1 }.
@@ -1132,6 +1147,12 @@ accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) ->
{cons_if(IsPersistent, SeqId, SeqIdsAcc),
rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}.
+find_persistent_count(LensByStore) ->
+ case orddict:find(?PERSISTENT_MSG_STORE, LensByStore) of
+ error -> 0;
+ {ok, Len} -> Len
+ end.
+
%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index e658f005a3..067fa9f2d8 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -296,6 +296,12 @@ get_total_memory({unix, sunos}) ->
Dict = dict:from_list(lists:map(fun parse_line_sunos/1, Lines)),
dict:fetch('Memory size', Dict);
+get_total_memory({unix, aix}) ->
+ File = cmd("/usr/bin/vmstat -v"),
+ Lines = string:tokens(File, "\n"),
+ Dict = dict:from_list(lists:map(fun parse_line_aix/1, Lines)),
+ dict:fetch('memory pages', Dict) * 4096;
+
get_total_memory(_OsType) ->
unknown.
@@ -341,6 +347,17 @@ parse_line_sunos(Line) ->
[Name] -> {list_to_atom(Name), none}
end.
+%% Lines look like " 12345 memory pages"
+%% or " 80.1 maxpin percentage"
+parse_line_aix(Line) ->
+ [Value | NameWords] = string:tokens(Line, " "),
+ Name = string:join(NameWords, " "),
+ {list_to_atom(Name),
+ case lists:member($., Value) of
+ true -> trunc(list_to_float(Value));
+ false -> list_to_integer(Value)
+ end}.
+
freebsd_sysctl(Def) ->
list_to_integer(cmd("/sbin/sysctl -n " ++ Def) -- "\n").