summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Videla <alvaro@rabbitmq.com>2014-01-03 10:32:19 +0100
committerAlvaro Videla <alvaro@rabbitmq.com>2014-01-03 10:32:19 +0100
commit0186d763494044dfeb436ae3f4ff88f85996dbc0 (patch)
treecaf797bcc053bfca78afa326d66526d7fee89e71
parentbe7748479a93e180511477eec8360dd9c6598276 (diff)
parente4d615a414a4816cf33aff1e97720052f3eb866f (diff)
downloadrabbitmq-server-git-0186d763494044dfeb436ae3f4ff88f85996dbc0.tar.gz
merge default into bug25817
-rw-r--r--src/rabbit_amqqueue_process.erl256
-rw-r--r--src/rabbit_reader.erl35
-rw-r--r--src/rabbit_writer.erl2
-rw-r--r--src/vm_memory_monitor.erl23
4 files changed, 162 insertions, 154 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7002fd367c..cb59edd9b1 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -238,10 +238,11 @@ notify_decorators(Event, Props, State) when Event =:= startup;
notify_decorators(Event, Props, State = #q{active_consumers = ACs,
backing_queue = BQ,
backing_queue_state = BQS}) ->
- decorator_callback(
- qname(State), notify,
- [Event, [{max_active_consumer_priority, priority_queue:highest(ACs)},
- {is_empty, BQ:is_empty(BQS)} | Props]]).
+ P = priority_queue:highest(ACs),
+ decorator_callback(qname(State), notify,
+ [Event, [{max_active_consumer_priority, P},
+ {is_empty, BQ:is_empty(BQS)} |
+ Props]]).
decorator_callback(QName, F, A) ->
%% Look up again in case policy and hence decorators have changed
@@ -326,8 +327,8 @@ terminate_shutdown(Fun, State) ->
_ -> ok = rabbit_memory_monitor:deregister(self()),
QName = qname(State),
notify_decorators(shutdown, [], State),
- [emit_consumer_deleted(Ch, CTag, QName)
- || {Ch, CTag, _} <- consumers(State1)],
+ [emit_consumer_deleted(Ch, CTag, QName) ||
+ {Ch, CTag, _} <- consumers(State1)],
State1#q{backing_queue_state = Fun(BQS)}
end.
@@ -485,12 +486,10 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
deliver_msgs_to_consumers(DeliverFun, false,
- State = #q{active_consumers = ActiveConsumers,
- consumer_use = CUInfo}) ->
+ State = #q{active_consumers = ActiveConsumers}) ->
case priority_queue:out_p(ActiveConsumers) of
{empty, _} ->
- {false,
- State#q{consumer_use = update_consumer_use(CUInfo, inactive)}};
+ {false, update_consumer_use(State, inactive)};
{{value, QEntry, Priority}, Tail} ->
{Stop, State1} = deliver_msg_to_consumer(
DeliverFun, QEntry, Priority,
@@ -537,18 +536,17 @@ deliver_msg_to_consumer0(DeliverFun,
unsent_message_count = Count + 1}),
{Stop, State1}.
-deliver_from_queue_deliver(AckRequired, State) ->
- {Result, State1} = fetch(AckRequired, State),
- {Result, is_empty(State1), State1}.
+update_consumer_use(State = #q{consumer_use = CUInfo}, Use) ->
+ State#q{consumer_use = update_consumer_use1(CUInfo, Use)}.
-update_consumer_use({inactive, _, _, _} = CUInfo, inactive) ->
+update_consumer_use1({inactive, _, _, _} = CUInfo, inactive) ->
CUInfo;
-update_consumer_use({active, _, _} = CUInfo, active) ->
+update_consumer_use1({active, _, _} = CUInfo, active) ->
CUInfo;
-update_consumer_use({active, Since, Avg}, inactive) ->
+update_consumer_use1({active, Since, Avg}, inactive) ->
Now = now_micros(),
{inactive, Now, Now - Since, Avg};
-update_consumer_use({inactive, Since, Active, Avg}, active) ->
+update_consumer_use1({inactive, Since, Active, Avg}, active) ->
Now = now_micros(),
{active, Now, consumer_use_avg(Active, Now - Since, Avg)}.
@@ -607,10 +605,12 @@ discard(#delivery{sender = SenderPid,
State1#q{backing_queue_state = BQS1}.
run_message_queue(State) ->
- {_IsEmpty1, State1} = deliver_msgs_to_consumers(
- fun deliver_from_queue_deliver/2,
- is_empty(State), State),
- State1.
+ {_Active, State3} = deliver_msgs_to_consumers(
+ fun(AckRequired, State1) ->
+ {Result, State2} = fetch(AckRequired, State1),
+ {Result, is_empty(State2), State2}
+ end, is_empty(State), State),
+ State3.
add_consumer({ChPid, Consumer = #consumer{args = Args}}, ActiveConsumers) ->
Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
@@ -622,21 +622,21 @@ add_consumer({ChPid, Consumer = #consumer{args = Args}}, ActiveConsumers) ->
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- case BQ:is_duplicate(Message, BQS) of
- {false, BQS1} ->
- deliver_msgs_to_consumers(
- fun (true, State1 = #q{backing_queue_state = BQS2}) ->
- true = BQ:is_empty(BQS2),
- {AckTag, BQS3} = BQ:publish_delivered(
- Message, Props, SenderPid, BQS2),
- {{Message, Delivered, AckTag},
- true, State1#q{backing_queue_state = BQS3}};
- (false, State1) ->
- {{Message, Delivered, undefined},
- true, discard(Delivery, State1)}
- end, false, State#q{backing_queue_state = BQS1});
- {true, BQS1} ->
- {true, State#q{backing_queue_state = BQS1}}
+ {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
+ State1 = State#q{backing_queue_state = BQS1},
+ case IsDuplicate of
+ false -> deliver_msgs_to_consumers(
+ fun (true, State2 = #q{backing_queue_state = BQS2}) ->
+ true = BQ:is_empty(BQS2),
+ {AckTag, BQS3} = BQ:publish_delivered(
+ Message, Props, SenderPid, BQS2),
+ {{Message, Delivered, AckTag},
+ true, State2#q{backing_queue_state = BQS3}};
+ (false, State2) ->
+ {{Message, Delivered, undefined},
+ true, discard(Delivery, State2)}
+ end, false, State1);
+ true -> {true, State1}
end.
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
@@ -652,7 +652,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
{Dropped, State3 = #q{backing_queue_state = BQS2}} =
- maybe_drop_head(State2#q{backing_queue_state = BQS1}),
+ maybe_drop_head(State2#q{backing_queue_state = BQS1}),
QLen = BQ:len(BQS2),
%% optimisation: it would be perfectly safe to always
%% invoke drop_expired_msgs here, but that is expensive so
@@ -738,7 +738,7 @@ possibly_unblock(State, ChPid, Update) ->
end
end.
-unblock(State = #q{consumer_use = CUInfo}, C = #cr{limiter = Limiter}) ->
+unblock(State, C = #cr{limiter = Limiter}) ->
case lists:partition(
fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
rabbit_limiter:is_consumer_blocked(Limiter, CTag)
@@ -750,49 +750,47 @@ unblock(State = #q{consumer_use = CUInfo}, C = #cr{limiter = Limiter}) ->
BlockedQ = priority_queue:from_list(Blocked),
UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ}),
- State1 = State#q{consumer_use =
- update_consumer_use(CUInfo, active)},
- AC1 = priority_queue:join(State1#q.active_consumers, UnblockedQ),
- State2 = State1#q{active_consumers = AC1},
+ AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ),
+ State1 = update_consumer_use(State#q{active_consumers = AC1},
+ active),
[notify_decorators(
- consumer_unblocked, [{consumer_tag, CTag}], State2) ||
+ consumer_unblocked, [{consumer_tag, CTag}], State1) ||
{_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked],
- run_message_queue(State2)
+ run_message_queue(State1)
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
-handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
+handle_ch_down(DownPid, State = #q{active_consumers = AC,
+ exclusive_consumer = Holder,
senders = Senders}) ->
- Senders1 = case pmon:is_monitored(DownPid, Senders) of
- false -> Senders;
- true -> credit_flow:peer_down(DownPid),
- pmon:demonitor(DownPid, Senders)
- end,
+ State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of
+ false -> Senders;
+ true -> credit_flow:peer_down(DownPid),
+ pmon:demonitor(DownPid, Senders)
+ end},
case lookup_ch(DownPid) of
not_found ->
- {ok, State#q{senders = Senders1}};
+ {ok, State1};
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
blocked_consumers = Blocked} ->
QName = qname(State),
- _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission
+ AC1 = remove_consumers(ChPid, AC, QName),
+ _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission
ok = erase_ch_record(C),
- State1 = State#q{
- exclusive_consumer = case Holder of
- {ChPid, _} -> none;
- Other -> Other
- end,
- active_consumers = remove_consumers(
- ChPid, State#q.active_consumers,
- QName),
- senders = Senders1},
- case should_auto_delete(State1) of
- true -> {stop, State1};
+ Holder1 = case Holder of
+ {DownPid, _} -> none;
+ Other -> Other
+ end,
+ State2 = State1#q{active_consumers = AC1,
+ exclusive_consumer = Holder1},
+ case should_auto_delete(State2) of
+ true -> {stop, State2};
false -> {ok, requeue_and_run(queue:to_list(ChAckTags),
- ensure_expiry_timer(State1))}
+ ensure_expiry_timer(State2))}
end
end.
@@ -1223,64 +1221,66 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
case fetch(AckRequired, State1) of
{empty, State2} ->
reply(empty, State2);
- {{Message, IsDelivered, AckTag}, State2} ->
- State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- case AckRequired of
- true -> C = #cr{acktags = ChAckTags} =
- ch_record(ChPid, LimiterPid),
- ChAckTags1 = queue:in(AckTag, ChAckTags),
- update_ch_record(C#cr{acktags = ChAckTags1}),
- State2;
- false -> State2
- end,
+ {{Message, IsDelivered, AckTag},
+ #q{backing_queue = BQ, backing_queue_state = BQS} = State2} ->
+ case AckRequired of
+ true -> C = #cr{acktags = ChAckTags} =
+ ch_record(ChPid, LimiterPid),
+ ChAckTags1 = queue:in(AckTag, ChAckTags),
+ update_ch_record(C#cr{acktags = ChAckTags1});
+ false -> ok
+ end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, BQ:len(BQS), Msg}, State3)
+ reply({ok, BQ:len(BQS), Msg}, State2)
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg},
- _From, State = #q{exclusive_consumer = Holder}) ->
+ _From, State = #q{active_consumers = AC,
+ exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
- in_use ->
- reply({error, exclusive_consume_unavailable}, State);
- ok ->
- C = #cr{consumer_count = Count,
- limiter = Limiter} = ch_record(ChPid, LimiterPid),
- Limiter1 = case LimiterActive of
- true -> rabbit_limiter:activate(Limiter);
- false -> Limiter
- end,
- Limiter2 = case CreditArgs of
- none -> Limiter1;
- {Crd, Drain} -> rabbit_limiter:credit(
- Limiter1, ConsumerTag, Crd, Drain)
- end,
- C1 = update_ch_record(C#cr{consumer_count = Count + 1,
- limiter = Limiter2}),
- case is_empty(State) of
- true -> send_drained(C1);
- false -> ok
- end,
- Consumer = #consumer{tag = ConsumerTag,
- ack_required = not NoAck,
- args = OtherArgs},
- ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> Holder
- end,
- State1 = State#q{has_had_consumers = true,
- exclusive_consumer = ExclusiveConsumer},
- ok = maybe_send_reply(ChPid, OkMsg),
- emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State1), OtherArgs),
- AC1 = add_consumer({ChPid, Consumer}, State1#q.active_consumers),
- State2 = State1#q{active_consumers = AC1},
- notify_decorators(
- basic_consume, [{consumer_tag, ConsumerTag}], State2),
- reply(ok, run_message_queue(State2))
+ in_use -> reply({error, exclusive_consume_unavailable}, State);
+ ok -> C = #cr{consumer_count = Count,
+ limiter = Limiter} =
+ ch_record(ChPid, LimiterPid),
+ Limiter1 = case LimiterActive of
+ true -> rabbit_limiter:activate(Limiter);
+ false -> Limiter
+ end,
+ Limiter2 = case CreditArgs of
+ none -> Limiter1;
+ {Crd, Drain} -> rabbit_limiter:credit(
+ Limiter1, ConsumerTag,
+ Crd, Drain)
+ end,
+ C1 = update_ch_record(C#cr{consumer_count = Count + 1,
+ limiter = Limiter2}),
+ case is_empty(State) of
+ true -> send_drained(C1);
+ false -> ok
+ end,
+ Consumer = #consumer{tag = ConsumerTag,
+ ack_required = not NoAck,
+ args = OtherArgs},
+ AC1 = add_consumer({ChPid, Consumer}, AC),
+ ExclusiveConsumer =
+ if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> Holder
+ end,
+ State1 = State#q{active_consumers = AC1,
+ has_had_consumers = true,
+ exclusive_consumer = ExclusiveConsumer},
+ ok = maybe_send_reply(ChPid, OkMsg),
+ emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+ not NoAck, qname(State1), OtherArgs),
+ notify_decorators(
+ basic_consume, [{consumer_tag, ConsumerTag}], State1),
+ reply(ok, run_message_queue(State1))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
- State = #q{exclusive_consumer = Holder}) ->
+ State = #q{active_consumers = AC,
+ exclusive_consumer = Holder}) ->
ok = maybe_send_reply(ChPid, OkMsg),
case lookup_ch(ChPid) of
not_found ->
@@ -1288,7 +1288,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
C = #cr{consumer_count = Count,
limiter = Limiter,
blocked_consumers = Blocked} ->
- emit_consumer_deleted(ChPid, ConsumerTag, qname(State)),
+ AC1 = remove_consumer(ChPid, ConsumerTag, AC),
Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
Limiter1 = case Count of
1 -> rabbit_limiter:deactivate(Limiter);
@@ -1298,14 +1298,13 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
update_ch_record(C#cr{consumer_count = Count - 1,
limiter = Limiter2,
blocked_consumers = Blocked1}),
- State1 = State#q{
- exclusive_consumer = case Holder of
- {ChPid, ConsumerTag} -> none;
- _ -> Holder
- end,
- active_consumers = remove_consumer(
- ChPid, ConsumerTag,
- State#q.active_consumers)},
+ Holder1 = case Holder of
+ {ChPid, ConsumerTag} -> none;
+ _ -> Holder
+ end,
+ State1 = State#q{active_consumers = AC1,
+ exclusive_consumer = Holder1},
+ emit_consumer_deleted(ChPid, ConsumerTag, qname(State1)),
notify_decorators(
basic_cancel, [{consumer_tag, ConsumerTag}], State1),
case should_auto_delete(State1) of
@@ -1372,11 +1371,12 @@ handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
QName = qname(State),
+ AllConsumers = consumers(State),
case Exclusive of
none -> [emit_consumer_created(
Ch, CTag, false, AckRequired, QName, Args) ||
- {Ch, CTag, AckRequired, Args} <- consumers(State)];
- {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = consumers(State),
+ {Ch, CTag, AckRequired, Args} <- AllConsumers];
+ {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = AllConsumers,
emit_consumer_created(
Ch, CTag, true, AckRequired, QName, Args)
end,
@@ -1451,21 +1451,21 @@ handle_cast({set_maximum_since_use, Age}, State) ->
noreply(State);
handle_cast(start_mirroring, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS}) ->
%% lookup again to get policy for init_with_existing_bq
{ok, Q} = rabbit_amqqueue:lookup(qname(State)),
true = BQ =/= rabbit_mirror_queue_master, %% assertion
BQ1 = rabbit_mirror_queue_master,
BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
noreply(State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
+ backing_queue_state = BQS1});
handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS}) ->
BQ = rabbit_mirror_queue_master, %% assertion
{BQ1, BQS1} = BQ:stop_mirroring(BQS),
noreply(State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
+ backing_queue_state = BQS1});
handle_cast({credit, ChPid, CTag, Credit, Drain},
State = #q{backing_queue = BQ,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index da22e09266..0aa44d9f47 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -736,6 +736,14 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
Type, Channel, Payload, State)
end;
+handle_input(handshake, <<"AMQP", A, B, C, D>>, State) ->
+ handshake({A, B, C, D}, State);
+handle_input(handshake, Other, #v1{sock = Sock}) ->
+ refuse_connection(Sock, {bad_header, Other});
+
+handle_input(Callback, Data, _State) ->
+ throw({bad_input, Callback, Data}).
+
%% The two rules pertaining to version negotiation:
%%
%% * If the server cannot support the protocol specified in the
@@ -744,37 +752,31 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
%%
%% * The server MUST provide a protocol version that is lower than or
%% equal to that requested by the client in the protocol header.
-handle_input(handshake, <<"AMQP", 0, 0, 9, 1>>, State) ->
+handshake({0, 0, 9, 1}, State) ->
start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State);
%% This is the protocol header for 0-9, which we can safely treat as
%% though it were 0-9-1.
-handle_input(handshake, <<"AMQP", 1, 1, 0, 9>>, State) ->
+handshake({1, 1, 0, 9}, State) ->
start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State);
%% This is what most clients send for 0-8. The 0-8 spec, confusingly,
%% defines the version as 8-0.
-handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) ->
+handshake({1, 1, 8, 0}, State) ->
start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
%% The 0-8 spec as on the AMQP web site actually has this as the
%% protocol header; some libraries e.g., py-amqplib, send it when they
%% want 0-8.
-handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) ->
+handshake({1, 1, 9, 1}, State) ->
start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
-%% ... and finally, the 1.0 spec is crystal clear! Note that the
-handle_input(handshake, <<"AMQP", Id, 1, 0, 0>>, State) ->
+%% ... and finally, the 1.0 spec is crystal clear!
+handshake({Id, 1, 0, 0}, State) ->
become_1_0(Id, State);
-handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) ->
- refuse_connection(Sock, {bad_version, {A, B, C, D}});
-
-handle_input(handshake, Other, #v1{sock = Sock}) ->
- refuse_connection(Sock, {bad_header, Other});
-
-handle_input(Callback, Data, _State) ->
- throw({bad_input, Callback, Data}).
+handshake(Vsn, #v1{sock = Sock}) ->
+ refuse_connection(Sock, {bad_version, Vsn}).
%% Offer a protocol version to the client. Connection.start only
%% includes a major and minor version number, Luckily 0-9 and 0-9-1
@@ -818,7 +820,10 @@ handle_method0(MethodName, FieldsBin,
try
handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
State)
- catch exit:#amqp_error{method = none} = Reason ->
+ catch throw:{writer_inet_error, closed} ->
+ maybe_emit_stats(State),
+ throw(connection_closed_abruptly);
+ exit:#amqp_error{method = none} = Reason ->
handle_exception(State, 0, Reason#amqp_error{method = MethodName});
Type:Reason ->
Stack = erlang:get_stacktrace(),
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 34dd3d3b35..92d48e633f 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -272,7 +272,7 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
[MethodFrame | ContentFrames].
tcp_send(Sock, Data) ->
- rabbit_misc:throw_on_error(inet_error,
+ rabbit_misc:throw_on_error(writer_inet_error,
fun () -> rabbit_net:send(Sock, Data) end).
internal_send_command(Sock, Channel, MethodRecord, Protocol) ->
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 369ec65596..fc4353dcb4 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -173,16 +173,19 @@ set_mem_limits(State, MemFraction) ->
?MEMORY_SIZE_FOR_UNKNOWN_OS;
M -> M
end,
- UsableMemory = case get_vm_limit() of
- Limit when Limit < TotalMemory ->
- error_logger:warning_msg(
- "Only ~pMB of ~pMB memory usable due to "
- "limited address space.~n",
- [trunc(V/?ONE_MB) || V <- [Limit, TotalMemory]]),
- Limit;
- _ ->
- TotalMemory
- end,
+ UsableMemory =
+ case get_vm_limit() of
+ Limit when Limit < TotalMemory ->
+ error_logger:warning_msg(
+ "Only ~pMB of ~pMB memory usable due to "
+ "limited address space.~n"
+ "Crashes due to memory exhaustion are possible - see~n"
+ "http://www.rabbitmq.com/memory.html#address-space~n",
+ [trunc(V/?ONE_MB) || V <- [Limit, TotalMemory]]),
+ Limit;
+ _ ->
+ TotalMemory
+ end,
MemLim = trunc(MemFraction * UsableMemory),
error_logger:info_msg("Memory limit set to ~pMB of ~pMB total.~n",
[trunc(MemLim/?ONE_MB), trunc(TotalMemory/?ONE_MB)]),