diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-25 12:04:54 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-25 12:04:54 +0100 |
| commit | 8fd8e8aa2c70134c8ccf0e88b861026e4d0e1e2b (patch) | |
| tree | 863b522fc5d6c1ee4084f046f71b7a472c94eb64 | |
| parent | 88689b125975c1fc5a91edf5d3ea39a671ab876e (diff) | |
| download | rabbitmq-server-git-8fd8e8aa2c70134c8ccf0e88b861026e4d0e1e2b.tar.gz | |
MQ: Made run length encoding more obvious; Added comment about logic for starting up prefetcher; Tidied API for dec_queue_length and inc_queue_length
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 73 |
1 files changed, 38 insertions, 35 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 7cda1004cc..2e8fb333bb 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -102,7 +102,7 @@ init(Queue, IsDurable) -> Len = rabbit_disk_queue:len(Queue), - MsgBuf = inc_queue_length(Queue, queue:new(), Len), + MsgBuf = inc_queue_length(queue:new(), Len), Size = rabbit_disk_queue:foldl( fun (Msg = #basic_message { is_persistent = true }, _AckTag, _IsDelivered, Acc) -> @@ -125,16 +125,15 @@ set_storage_mode(disk, TxnMessages, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, is_durable = IsDurable, prefetcher = Prefetcher }) -> State1 = State #mqstate { mode = disk }, - {MsgBuf1, State2} = + MsgBuf1 = case Prefetcher of - undefined -> {MsgBuf, State1}; + undefined -> MsgBuf; _ -> case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of - empty -> {MsgBuf, State1}; + empty -> MsgBuf; {Fetched, Len} -> - State3 = #mqstate { msg_buf = MsgBuf2 } = - dec_queue_length(Len, State1), - {queue:join(Fetched, MsgBuf2), State3} + MsgBuf2 = dec_queue_length(MsgBuf, Len), + queue:join(Fetched, MsgBuf2) end end, %% We enqueue _everything_ here. This means that should a message @@ -157,7 +156,7 @@ set_storage_mode(disk, TxnMessages, State = end end, TxnMessages), garbage_collect(), - {ok, State2 #mqstate { msg_buf = MsgBuf3, prefetcher = undefined }}; + {ok, State1 #mqstate { msg_buf = MsgBuf3, prefetcher = undefined }}; set_storage_mode(mixed, TxnMessages, State = #mqstate { mode = disk, is_durable = IsDurable }) -> %% The queue has a token just saying how many msgs are on disk @@ -197,7 +196,7 @@ send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount, true -> %% it's already in the Q send_messages_to_disk( IsDurable, Q, Queue1, PublishCount, RequeueCount + 1, - Commit, Ack, inc_queue_length(Q, MsgBuf, 1)); + Commit, Ack, inc_queue_length(MsgBuf, 1)); false -> republish_message_to_disk_queue( IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit, @@ -209,10 +208,10 @@ send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount, republish_message_to_disk_queue( IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit, [AckTag | Ack], MsgBuf, Msg, IsDelivered); - {{value, {Q, Count}}, Queue1} -> + {{value, {on_disk, Count}}, Queue1} -> send_messages_to_disk(IsDurable, Q, Queue1, PublishCount, RequeueCount + Count, Commit, Ack, - inc_queue_length(Q, MsgBuf, Count)) + inc_queue_length(MsgBuf, Count)) end. republish_message_to_disk_queue(IsDurable, Q, Queue, PublishCount, RequeueCount, @@ -228,7 +227,7 @@ republish_message_to_disk_queue(IsDurable, Q, Queue, PublishCount, RequeueCount, false -> {PublishCount + 1, [{MsgId, IsDelivered} | Commit1], Ack1} end, send_messages_to_disk(IsDurable, Q, Queue, PublishCount1, 0, - Commit2, Ack2, inc_queue_length(Q, MsgBuf, 1)). + Commit2, Ack2, inc_queue_length(MsgBuf, 1)). flush_messages_to_disk_queue(_Q, [], []) -> ok; @@ -253,28 +252,27 @@ lose_memory(Dec, State = #mqstate { memory_size = QSize, State #mqstate { memory_size = QSize - Dec, memory_loss = Loss + Dec }. -inc_queue_length(_Q, MsgBuf, 0) -> +inc_queue_length(MsgBuf, 0) -> MsgBuf; -inc_queue_length(Q, MsgBuf, Count) -> +inc_queue_length(MsgBuf, Count) -> {NewCount, MsgBufTail} = case queue:out_r(MsgBuf) of - {empty, MsgBuf1} -> {Count, MsgBuf1}; - {{value, {Q, Len}}, MsgBuf1} -> {Len + Count, MsgBuf1}; - {{value, _}, _MsgBuf1} -> {Count, MsgBuf} + {empty, MsgBuf1} -> {Count, MsgBuf1}; + {{value, {on_disk, Len}}, MsgBuf1} -> {Len + Count, MsgBuf1}; + {{value, _}, _MsgBuf1} -> {Count, MsgBuf} end, - queue:in({Q, NewCount}, MsgBufTail). + queue:in({on_disk, NewCount}, MsgBufTail). -dec_queue_length(Count, State = #mqstate { queue = Q, msg_buf = MsgBuf }) -> +dec_queue_length(MsgBuf, Count) -> case queue:out(MsgBuf) of - {{value, {Q, Len}}, MsgBuf1} -> + {{value, {on_disk, Len}}, MsgBuf1} -> case Len of Count -> - State #mqstate { msg_buf = MsgBuf1 }; + MsgBuf1; _ when Len > Count -> - State #mqstate { msg_buf = queue:in_r({Q, Len-Count}, - MsgBuf1)} + queue:in_r({on_disk, Len-Count}, MsgBuf1) end; - _ -> State + _ -> MsgBuf end. maybe_prefetch(State = #mqstate { prefetcher = undefined, @@ -282,9 +280,13 @@ maybe_prefetch(State = #mqstate { prefetcher = undefined, msg_buf = MsgBuf, queue = Q }) -> case queue:peek(MsgBuf) of - {value, {Q, Count}} -> {ok, Prefetcher} = - rabbit_queue_prefetcher:start_link(Q, Count), - State #mqstate { prefetcher = Prefetcher }; + {value, {on_disk, Count}} -> + %% only prefetch for the next contiguous block on + %% disk. Beyond there, we either hit the end of the queue, + %% or the next msg is already in RAM, held by us, the + %% mixed queue + {ok, Prefetcher} = rabbit_queue_prefetcher:start_link(Q, Count), + State #mqstate { prefetcher = Prefetcher }; _ -> State end; maybe_prefetch(State) -> @@ -292,7 +294,7 @@ maybe_prefetch(State) -> publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length, msg_buf = MsgBuf }) -> - MsgBuf1 = inc_queue_length(Q, MsgBuf, 1), + MsgBuf1 = inc_queue_length(MsgBuf, 1), ok = rabbit_disk_queue:publish(Q, Msg, false), {ok, gain_memory(size_of_message(Msg), State #mqstate { msg_buf = MsgBuf1, @@ -356,19 +358,20 @@ fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q, {{Msg, IsDelivered, AckTag1, Rem}, State1 #mqstate { msg_buf = MsgBuf1 }}; _ when Prefetcher == undefined -> - State2 = dec_queue_length(1, State1), + MsgBuf2 = dec_queue_length(MsgBuf, 1), {Msg = #basic_message { is_persistent = IsPersistent }, IsDelivered, AckTag, _PersistRem} = rabbit_disk_queue:fetch(Q), AckTag1 = maybe_ack(Q, IsDurable, IsPersistent, AckTag), - {{Msg, IsDelivered, AckTag1, Rem}, State2}; + {{Msg, IsDelivered, AckTag1, Rem}, + State1 #mqstate { msg_buf = MsgBuf2 }}; _ -> case rabbit_queue_prefetcher:drain(Prefetcher) of empty -> fetch(State #mqstate { prefetcher = undefined }); {Fetched, Len, Status} -> - State2 = #mqstate { msg_buf = MsgBuf2 } = - dec_queue_length(Len, State), - fetch(State2 #mqstate + MsgBuf2 = dec_queue_length(MsgBuf, Len), + %% use State, not State1 as we've not dec'd length + fetch(State #mqstate { msg_buf = queue:join(Fetched, MsgBuf2), prefetcher = case Status of finished -> undefined; @@ -424,7 +427,7 @@ tx_commit(Publishes, MsgsWithAcks, Len = erlang:length(Publishes), {ok, lose_memory(ASize, State #mqstate { length = Length + Len, - msg_buf = inc_queue_length(Q, MsgBuf, Len) })}; + msg_buf = inc_queue_length(MsgBuf, Len) })}; tx_commit(Publishes, MsgsWithAcks, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, is_durable = IsDurable, length = Length }) -> @@ -501,7 +504,7 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q, ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)), Len = erlang:length(MessagesWithAckTags), {ok, State #mqstate { length = Length + Len, - msg_buf = inc_queue_length(Q, MsgBuf, Len) }}; + msg_buf = inc_queue_length(MsgBuf, Len) }}; requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, is_durable = IsDurable, |
