diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-23 14:24:24 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-23 14:24:24 +0100 |
| commit | 8f477b067b49152a6e68f2197822002742bf6164 (patch) | |
| tree | c4be0f5415a1b191983fafb25668ddb14d2cbeef | |
| parent | afb63bcd59ea0bd7ba82316a42aba136d14a9963 (diff) | |
| download | rabbitmq-server-git-8f477b067b49152a6e68f2197822002742bf6164.tar.gz | |
only reduce memory size when messages are acked, not when they're delivered.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 106 | ||||
| -rw-r--r-- | src/rabbit_queue_mode_manager.erl | 3 |
3 files changed, 67 insertions, 57 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5e607a4636..f15a58cd07 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -112,7 +112,7 @@ init(Q = #amqqueue { name = QName, durable = Durable }) -> next_msg_id = 1, active_consumers = queue:new(), blocked_consumers = queue:new(), - memory_report_counter = ?MEMORY_REPORT_INTERVAL, + memory_report_counter = 0, old_memory_report = {1, now()} }, ?HIBERNATE_AFTER}. @@ -263,7 +263,7 @@ deliver_from_queue_deliver(AckRequired, {false, AutoAcks}, AutoAcks1 = case AckRequired of true -> AutoAcks; - false -> [AckTag | AutoAcks] + false -> [{Msg, AckTag} | AutoAcks] end, {{Msg, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1}, State #q { mixed_state = MS1 }}. @@ -331,8 +331,8 @@ deliver_or_requeue_n(MsgsWithAcks, State) -> deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) -> -1 < Len. deliver_or_requeue_msgs_deliver( - false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) -> - {{Msg, true, noack}, {Len - 1, [AckTag|AcksAcc], MsgsWithAcks}, State}; + false, {Len, AcksAcc, [(MsgAckTag = {Msg, _}) | MsgsWithAcks]}, State) -> + {{Msg, true, noack}, {Len - 1, [MsgAckTag | AcksAcc], MsgsWithAcks}, State}; deliver_or_requeue_msgs_deliver( true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) -> {{Msg, true, AckTag}, {Len - 1, AcksAcc, MsgsWithAcks}, State}. @@ -487,7 +487,7 @@ commit_transaction(Txn, State) -> {MsgWithAcks, Remaining} = collect_messages(PendingAcksOrdered, UAM), store_ch_record(C#cr{unacked_messages = Remaining}), - [ AckTag || {_Msg, AckTag} <- MsgWithAcks ] + MsgWithAcks end, {ok, MS} = rabbit_mixed_queue:tx_commit( PendingMessagesOrdered, Acks, State #q.mixed_state), @@ -623,7 +623,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, store_ch_record(C#cr{unacked_messages = NewUAM}), {ok, MS1}; false -> - rabbit_mixed_queue:ack([AckTag], MS1) + rabbit_mixed_queue:ack([{Msg, AckTag}], MS1) end, Message = {QName, self(), NextId, IsDelivered, Msg}, reply({ok, Remaining, Message}, @@ -771,9 +771,8 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM), case Txn of none -> - Acks = [ AckTag || {_Msg, AckTag} <- MsgWithAcks ], {ok, MS} = - rabbit_mixed_queue:ack(Acks, State #q.mixed_state), + rabbit_mixed_queue:ack(MsgWithAcks, State #q.mixed_state), store_ch_record(C#cr{unacked_messages = Remaining}), noreply(State #q { mixed_state = MS }); _ -> diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index e700d3d220..243600034b 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -185,12 +185,13 @@ to_mixed_mode(TxnMessages, State = {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1 }}. purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q, - is_durable = IsDurable }) -> + is_durable = IsDurable, + memory_size = QSize }) -> %% iterate through the content on disk, ack anything which isn't %% persistent, accumulate everything else that is persistent and %% requeue it - {Acks, Requeue, Length} = - deliver_all_messages(Q, IsDurable, [], [], 0), + {Acks, Requeue, Length, ASize} = + deliver_all_messages(Q, IsDurable, [], [], 0, 0), ok = if Requeue == [] -> ok; true -> rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue)) @@ -198,22 +199,22 @@ purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q, ok = if Acks == [] -> ok; true -> rabbit_disk_queue:ack(Q, lists:reverse(Acks)) end, - {ok, State #mqstate { length = Length }}. + {ok, State #mqstate { length = Length, memory_size = QSize - ASize }}. -deliver_all_messages(Q, IsDurable, Acks, Requeue, Length) -> +deliver_all_messages(Q, IsDurable, Acks, Requeue, Length, ASize) -> case rabbit_disk_queue:deliver(Q) of - empty -> {Acks, Requeue, Length}; - {#basic_message { is_persistent = IsPersistent }, + empty -> {Acks, Requeue, Length, ASize}; + {Msg = #basic_message { is_persistent = IsPersistent }, _Size, IsDelivered, AckTag, _Remaining} -> OnDisk = IsPersistent andalso IsDurable, - {Acks1, Requeue1, Length1} = - if OnDisk -> {Acks, - [{AckTag, {next, IsDelivered}} | Requeue], - Length + 1 - }; - true -> {[AckTag | Acks], Requeue, Length} + {Acks1, Requeue1, Length1, ASize1} = + if OnDisk -> { Acks, + [{AckTag, {next, IsDelivered}} | Requeue], + Length + 1, ASize }; + true -> { [AckTag | Acks], Requeue, Length, + ASize + size_of_message(Msg) } end, - deliver_all_messages(Q, IsDurable, Acks1, Requeue1, Length1) + deliver_all_messages(Q, IsDurable, Acks1, Requeue1, Length1, ASize1) end. publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length, @@ -237,42 +238,43 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, State = %% attempt_immediate_delivery). publish_delivered(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent}, - State = #mqstate { mode = Mode, is_durable = IsDurable, - queue = Q, length = 0 }) + State = + #mqstate { mode = Mode, is_durable = IsDurable, + queue = Q, length = 0, memory_size = QSize }) when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> rabbit_disk_queue:publish(Q, Msg, false), + State1 = State #mqstate { memory_size = QSize + size_of_message(Msg) }, if IsDurable andalso IsPersistent -> %% must call phantom_deliver otherwise the msg remains at %% the head of the queue. This is synchronous, but %% unavoidable as we need the AckTag {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q), - {ok, AckTag, State}; + {ok, AckTag, State1}; true -> %% in this case, we don't actually care about the ack, so %% auto ack it (asynchronously). ok = rabbit_disk_queue:auto_ack_next_message(Q), - {ok, noack, State} + {ok, noack, State1} end; -publish_delivered(_Msg, State = #mqstate { mode = mixed, length = 0 }) -> - {ok, noack, State}. +publish_delivered(Msg, State = + #mqstate { mode = mixed, length = 0, memory_size = QSize }) -> + {ok, noack, State #mqstate { memory_size = QSize + size_of_message(Msg) }}. deliver(State = #mqstate { length = 0 }) -> {empty, State}; deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable, - length = Length, memory_size = QSize }) -> + length = Length }) -> {Msg = #basic_message { is_persistent = IsPersistent }, _Size, IsDelivered, AckTag, Remaining} = rabbit_disk_queue:deliver(Q), - QSize1 = QSize - size_of_message(Msg), AckTag1 = if IsPersistent andalso IsDurable -> AckTag; true -> ok = rabbit_disk_queue:ack(Q, [AckTag]), noack end, {{Msg, IsDelivered, AckTag1, Remaining}, - State #mqstate { length = Length - 1, memory_size = QSize1 }}; -deliver(State = - #mqstate { mode = mixed, msg_buf = MsgBuf, is_durable = IsDurable, - queue = Q, length = Length, memory_size = QSize }) -> + State #mqstate { length = Length - 1 }}; +deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q, + is_durable = IsDurable, length = Length }) -> {{value, {Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, IsDelivered, OnDisk}}, MsgBuf1} @@ -290,18 +292,24 @@ deliver(State = true -> noack end, Rem = Length - 1, - QSize1 = QSize - size_of_message(Msg), {{Msg, IsDelivered, AckTag, Rem}, - State #mqstate { msg_buf = MsgBuf1, length = Rem, memory_size = QSize1 }}. - -remove_noacks(Acks) -> - lists:filter(fun (A) -> A /= noack end, Acks). - -ack(Acks, State = #mqstate { queue = Q }) -> - case remove_noacks(Acks) of - [] -> {ok, State}; - AckTags -> ok = rabbit_disk_queue:ack(Q, AckTags), - {ok, State} + State #mqstate { msg_buf = MsgBuf1, length = Rem }}. + +remove_noacks(MsgsWithAcks) -> + {AckTags, ASize} = + lists:foldl( + fun ({Msg, noack}, {AccAckTags, AccSize}) -> + {AccAckTags, size_of_message(Msg) + AccSize}; + ({Msg, AckTag}, {AccAckTags, AccSize}) -> + {[AckTag | AccAckTags], size_of_message(Msg) + AccSize} + end, {[], 0}, MsgsWithAcks), + {lists:reverse(AckTags), ASize}. + +ack(MsgsWithAcks, State = #mqstate { queue = Q, memory_size = QSize }) -> + case remove_noacks(MsgsWithAcks) of + {[], ASize} -> {ok, State #mqstate { memory_size = QSize - ASize }}; + {AckTags, ASize} -> ok = rabbit_disk_queue:ack(Q, AckTags), + {ok, State #mqstate { memory_size = QSize - ASize }} end. tx_publish(Msg, State = #mqstate { mode = disk, memory_size = QSize }) -> @@ -320,19 +328,20 @@ tx_publish(Msg, State = #mqstate { mode = mixed, memory_size = QSize }) -> only_msg_ids(Pubs) -> lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs). -tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q, - length = Length }) -> - RealAcks = remove_noacks(Acks), +tx_commit(Publishes, MsgsWithAcks, + State = #mqstate { mode = disk, queue = Q, length = Length, + memory_size = QSize }) -> + {RealAcks, ASize} = remove_noacks(MsgsWithAcks), ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok; true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes), RealAcks) end, - {ok, State #mqstate { length = Length + erlang:length(Publishes) }}; -tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q, - msg_buf = MsgBuf, - is_durable = IsDurable, - length = Length - }) -> + {ok, State #mqstate { length = Length + erlang:length(Publishes), + memory_size = QSize - ASize }}; +tx_commit(Publishes, MsgsWithAcks, + State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, + is_durable = IsDurable, length = Length, + memory_size = QSize }) -> {PersistentPubs, MsgBuf1} = lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent }, {Acc, MsgBuf2}) -> @@ -346,14 +355,15 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q, end, {[], MsgBuf}, Publishes), %% foldl reverses, so re-reverse PersistentPubs to match %% requirements of rabbit_disk_queue (ascending SeqIds) - RealAcks = remove_noacks(Acks), + {RealAcks, ASize} = remove_noacks(MsgsWithAcks), ok = if ([] == PersistentPubs) andalso ([] == RealAcks) -> ok; true -> rabbit_disk_queue:tx_commit( Q, lists:reverse(PersistentPubs), RealAcks) end, {ok, State #mqstate { msg_buf = MsgBuf1, - length = Length + erlang:length(Publishes) }}. + length = Length + erlang:length(Publishes), + memory_size = QSize - ASize }}. tx_cancel(Publishes, State = #mqstate { mode = disk, memory_size = QSize }) -> {MsgIds, CSize} = diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl index cc10074c38..50f66063d8 100644 --- a/src/rabbit_queue_mode_manager.erl +++ b/src/rabbit_queue_mode_manager.erl @@ -81,7 +81,8 @@ handle_call({register, Pid}, _From, end, {reply, {ok, Result}, State #state { queues = dict:store(Pid, 0, Qs) }}. -handle_cast(_Any, State) -> +handle_cast(Any, State) -> + io:format("~w~n", [Any]), {noreply, State}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, |
