summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-23 14:24:24 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-23 14:24:24 +0100
commit8f477b067b49152a6e68f2197822002742bf6164 (patch)
treec4be0f5415a1b191983fafb25668ddb14d2cbeef
parentafb63bcd59ea0bd7ba82316a42aba136d14a9963 (diff)
downloadrabbitmq-server-git-8f477b067b49152a6e68f2197822002742bf6164.tar.gz
only reduce memory size when messages are acked, not when they're delivered.
-rw-r--r--src/rabbit_amqqueue_process.erl15
-rw-r--r--src/rabbit_mixed_queue.erl106
-rw-r--r--src/rabbit_queue_mode_manager.erl3
3 files changed, 67 insertions, 57 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5e607a4636..f15a58cd07 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -112,7 +112,7 @@ init(Q = #amqqueue { name = QName, durable = Durable }) ->
next_msg_id = 1,
active_consumers = queue:new(),
blocked_consumers = queue:new(),
- memory_report_counter = ?MEMORY_REPORT_INTERVAL,
+ memory_report_counter = 0,
old_memory_report = {1, now()}
}, ?HIBERNATE_AFTER}.
@@ -263,7 +263,7 @@ deliver_from_queue_deliver(AckRequired, {false, AutoAcks},
AutoAcks1 =
case AckRequired of
true -> AutoAcks;
- false -> [AckTag | AutoAcks]
+ false -> [{Msg, AckTag} | AutoAcks]
end,
{{Msg, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1},
State #q { mixed_state = MS1 }}.
@@ -331,8 +331,8 @@ deliver_or_requeue_n(MsgsWithAcks, State) ->
deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) ->
-1 < Len.
deliver_or_requeue_msgs_deliver(
- false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) ->
- {{Msg, true, noack}, {Len - 1, [AckTag|AcksAcc], MsgsWithAcks}, State};
+ false, {Len, AcksAcc, [(MsgAckTag = {Msg, _}) | MsgsWithAcks]}, State) ->
+ {{Msg, true, noack}, {Len - 1, [MsgAckTag | AcksAcc], MsgsWithAcks}, State};
deliver_or_requeue_msgs_deliver(
true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) ->
{{Msg, true, AckTag}, {Len - 1, AcksAcc, MsgsWithAcks}, State}.
@@ -487,7 +487,7 @@ commit_transaction(Txn, State) ->
{MsgWithAcks, Remaining} =
collect_messages(PendingAcksOrdered, UAM),
store_ch_record(C#cr{unacked_messages = Remaining}),
- [ AckTag || {_Msg, AckTag} <- MsgWithAcks ]
+ MsgWithAcks
end,
{ok, MS} = rabbit_mixed_queue:tx_commit(
PendingMessagesOrdered, Acks, State #q.mixed_state),
@@ -623,7 +623,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
store_ch_record(C#cr{unacked_messages = NewUAM}),
{ok, MS1};
false ->
- rabbit_mixed_queue:ack([AckTag], MS1)
+ rabbit_mixed_queue:ack([{Msg, AckTag}], MS1)
end,
Message = {QName, self(), NextId, IsDelivered, Msg},
reply({ok, Remaining, Message},
@@ -771,9 +771,8 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
{MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM),
case Txn of
none ->
- Acks = [ AckTag || {_Msg, AckTag} <- MsgWithAcks ],
{ok, MS} =
- rabbit_mixed_queue:ack(Acks, State #q.mixed_state),
+ rabbit_mixed_queue:ack(MsgWithAcks, State #q.mixed_state),
store_ch_record(C#cr{unacked_messages = Remaining}),
noreply(State #q { mixed_state = MS });
_ ->
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index e700d3d220..243600034b 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -185,12 +185,13 @@ to_mixed_mode(TxnMessages, State =
{ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1 }}.
purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q,
- is_durable = IsDurable }) ->
+ is_durable = IsDurable,
+ memory_size = QSize }) ->
%% iterate through the content on disk, ack anything which isn't
%% persistent, accumulate everything else that is persistent and
%% requeue it
- {Acks, Requeue, Length} =
- deliver_all_messages(Q, IsDurable, [], [], 0),
+ {Acks, Requeue, Length, ASize} =
+ deliver_all_messages(Q, IsDurable, [], [], 0, 0),
ok = if Requeue == [] -> ok;
true ->
rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue))
@@ -198,22 +199,22 @@ purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q,
ok = if Acks == [] -> ok;
true -> rabbit_disk_queue:ack(Q, lists:reverse(Acks))
end,
- {ok, State #mqstate { length = Length }}.
+ {ok, State #mqstate { length = Length, memory_size = QSize - ASize }}.
-deliver_all_messages(Q, IsDurable, Acks, Requeue, Length) ->
+deliver_all_messages(Q, IsDurable, Acks, Requeue, Length, ASize) ->
case rabbit_disk_queue:deliver(Q) of
- empty -> {Acks, Requeue, Length};
- {#basic_message { is_persistent = IsPersistent },
+ empty -> {Acks, Requeue, Length, ASize};
+ {Msg = #basic_message { is_persistent = IsPersistent },
_Size, IsDelivered, AckTag, _Remaining} ->
OnDisk = IsPersistent andalso IsDurable,
- {Acks1, Requeue1, Length1} =
- if OnDisk -> {Acks,
- [{AckTag, {next, IsDelivered}} | Requeue],
- Length + 1
- };
- true -> {[AckTag | Acks], Requeue, Length}
+ {Acks1, Requeue1, Length1, ASize1} =
+ if OnDisk -> { Acks,
+ [{AckTag, {next, IsDelivered}} | Requeue],
+ Length + 1, ASize };
+ true -> { [AckTag | Acks], Requeue, Length,
+ ASize + size_of_message(Msg) }
end,
- deliver_all_messages(Q, IsDurable, Acks1, Requeue1, Length1)
+ deliver_all_messages(Q, IsDurable, Acks1, Requeue1, Length1, ASize1)
end.
publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length,
@@ -237,42 +238,43 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, State =
%% attempt_immediate_delivery).
publish_delivered(Msg =
#basic_message { guid = MsgId, is_persistent = IsPersistent},
- State = #mqstate { mode = Mode, is_durable = IsDurable,
- queue = Q, length = 0 })
+ State =
+ #mqstate { mode = Mode, is_durable = IsDurable,
+ queue = Q, length = 0, memory_size = QSize })
when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
rabbit_disk_queue:publish(Q, Msg, false),
+ State1 = State #mqstate { memory_size = QSize + size_of_message(Msg) },
if IsDurable andalso IsPersistent ->
%% must call phantom_deliver otherwise the msg remains at
%% the head of the queue. This is synchronous, but
%% unavoidable as we need the AckTag
{MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q),
- {ok, AckTag, State};
+ {ok, AckTag, State1};
true ->
%% in this case, we don't actually care about the ack, so
%% auto ack it (asynchronously).
ok = rabbit_disk_queue:auto_ack_next_message(Q),
- {ok, noack, State}
+ {ok, noack, State1}
end;
-publish_delivered(_Msg, State = #mqstate { mode = mixed, length = 0 }) ->
- {ok, noack, State}.
+publish_delivered(Msg, State =
+ #mqstate { mode = mixed, length = 0, memory_size = QSize }) ->
+ {ok, noack, State #mqstate { memory_size = QSize + size_of_message(Msg) }}.
deliver(State = #mqstate { length = 0 }) ->
{empty, State};
deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable,
- length = Length, memory_size = QSize }) ->
+ length = Length }) ->
{Msg = #basic_message { is_persistent = IsPersistent },
_Size, IsDelivered, AckTag, Remaining}
= rabbit_disk_queue:deliver(Q),
- QSize1 = QSize - size_of_message(Msg),
AckTag1 = if IsPersistent andalso IsDurable -> AckTag;
true -> ok = rabbit_disk_queue:ack(Q, [AckTag]),
noack
end,
{{Msg, IsDelivered, AckTag1, Remaining},
- State #mqstate { length = Length - 1, memory_size = QSize1 }};
-deliver(State =
- #mqstate { mode = mixed, msg_buf = MsgBuf, is_durable = IsDurable,
- queue = Q, length = Length, memory_size = QSize }) ->
+ State #mqstate { length = Length - 1 }};
+deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q,
+ is_durable = IsDurable, length = Length }) ->
{{value, {Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
IsDelivered, OnDisk}}, MsgBuf1}
@@ -290,18 +292,24 @@ deliver(State =
true -> noack
end,
Rem = Length - 1,
- QSize1 = QSize - size_of_message(Msg),
{{Msg, IsDelivered, AckTag, Rem},
- State #mqstate { msg_buf = MsgBuf1, length = Rem, memory_size = QSize1 }}.
-
-remove_noacks(Acks) ->
- lists:filter(fun (A) -> A /= noack end, Acks).
-
-ack(Acks, State = #mqstate { queue = Q }) ->
- case remove_noacks(Acks) of
- [] -> {ok, State};
- AckTags -> ok = rabbit_disk_queue:ack(Q, AckTags),
- {ok, State}
+ State #mqstate { msg_buf = MsgBuf1, length = Rem }}.
+
+remove_noacks(MsgsWithAcks) ->
+ {AckTags, ASize} =
+ lists:foldl(
+ fun ({Msg, noack}, {AccAckTags, AccSize}) ->
+ {AccAckTags, size_of_message(Msg) + AccSize};
+ ({Msg, AckTag}, {AccAckTags, AccSize}) ->
+ {[AckTag | AccAckTags], size_of_message(Msg) + AccSize}
+ end, {[], 0}, MsgsWithAcks),
+ {lists:reverse(AckTags), ASize}.
+
+ack(MsgsWithAcks, State = #mqstate { queue = Q, memory_size = QSize }) ->
+ case remove_noacks(MsgsWithAcks) of
+ {[], ASize} -> {ok, State #mqstate { memory_size = QSize - ASize }};
+ {AckTags, ASize} -> ok = rabbit_disk_queue:ack(Q, AckTags),
+ {ok, State #mqstate { memory_size = QSize - ASize }}
end.
tx_publish(Msg, State = #mqstate { mode = disk, memory_size = QSize }) ->
@@ -320,19 +328,20 @@ tx_publish(Msg, State = #mqstate { mode = mixed, memory_size = QSize }) ->
only_msg_ids(Pubs) ->
lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs).
-tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q,
- length = Length }) ->
- RealAcks = remove_noacks(Acks),
+tx_commit(Publishes, MsgsWithAcks,
+ State = #mqstate { mode = disk, queue = Q, length = Length,
+ memory_size = QSize }) ->
+ {RealAcks, ASize} = remove_noacks(MsgsWithAcks),
ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok;
true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes),
RealAcks)
end,
- {ok, State #mqstate { length = Length + erlang:length(Publishes) }};
-tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q,
- msg_buf = MsgBuf,
- is_durable = IsDurable,
- length = Length
- }) ->
+ {ok, State #mqstate { length = Length + erlang:length(Publishes),
+ memory_size = QSize - ASize }};
+tx_commit(Publishes, MsgsWithAcks,
+ State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
+ is_durable = IsDurable, length = Length,
+ memory_size = QSize }) ->
{PersistentPubs, MsgBuf1} =
lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent },
{Acc, MsgBuf2}) ->
@@ -346,14 +355,15 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q,
end, {[], MsgBuf}, Publishes),
%% foldl reverses, so re-reverse PersistentPubs to match
%% requirements of rabbit_disk_queue (ascending SeqIds)
- RealAcks = remove_noacks(Acks),
+ {RealAcks, ASize} = remove_noacks(MsgsWithAcks),
ok = if ([] == PersistentPubs) andalso ([] == RealAcks) -> ok;
true ->
rabbit_disk_queue:tx_commit(
Q, lists:reverse(PersistentPubs), RealAcks)
end,
{ok, State #mqstate { msg_buf = MsgBuf1,
- length = Length + erlang:length(Publishes) }}.
+ length = Length + erlang:length(Publishes),
+ memory_size = QSize - ASize }}.
tx_cancel(Publishes, State = #mqstate { mode = disk, memory_size = QSize }) ->
{MsgIds, CSize} =
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
index cc10074c38..50f66063d8 100644
--- a/src/rabbit_queue_mode_manager.erl
+++ b/src/rabbit_queue_mode_manager.erl
@@ -81,7 +81,8 @@ handle_call({register, Pid}, _From,
end,
{reply, {ok, Result}, State #state { queues = dict:store(Pid, 0, Qs) }}.
-handle_cast(_Any, State) ->
+handle_cast(Any, State) ->
+ io:format("~w~n", [Any]),
{noreply, State}.
handle_info({'DOWN', _MRef, process, Pid, _Reason},