summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-25 12:04:54 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-25 12:04:54 +0100
commit8fd8e8aa2c70134c8ccf0e88b861026e4d0e1e2b (patch)
tree863b522fc5d6c1ee4084f046f71b7a472c94eb64
parent88689b125975c1fc5a91edf5d3ea39a671ab876e (diff)
downloadrabbitmq-server-git-8fd8e8aa2c70134c8ccf0e88b861026e4d0e1e2b.tar.gz
MQ: Made run length encoding more obvious; Added comment about logic for starting up prefetcher; Tidied API for dec_queue_length and inc_queue_length
-rw-r--r--src/rabbit_mixed_queue.erl73
1 files changed, 38 insertions, 35 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 7cda1004cc..2e8fb333bb 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -102,7 +102,7 @@
init(Queue, IsDurable) ->
Len = rabbit_disk_queue:len(Queue),
- MsgBuf = inc_queue_length(Queue, queue:new(), Len),
+ MsgBuf = inc_queue_length(queue:new(), Len),
Size = rabbit_disk_queue:foldl(
fun (Msg = #basic_message { is_persistent = true },
_AckTag, _IsDelivered, Acc) ->
@@ -125,16 +125,15 @@ set_storage_mode(disk, TxnMessages, State =
#mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
is_durable = IsDurable, prefetcher = Prefetcher }) ->
State1 = State #mqstate { mode = disk },
- {MsgBuf1, State2} =
+ MsgBuf1 =
case Prefetcher of
- undefined -> {MsgBuf, State1};
+ undefined -> MsgBuf;
_ ->
case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of
- empty -> {MsgBuf, State1};
+ empty -> MsgBuf;
{Fetched, Len} ->
- State3 = #mqstate { msg_buf = MsgBuf2 } =
- dec_queue_length(Len, State1),
- {queue:join(Fetched, MsgBuf2), State3}
+ MsgBuf2 = dec_queue_length(MsgBuf, Len),
+ queue:join(Fetched, MsgBuf2)
end
end,
%% We enqueue _everything_ here. This means that should a message
@@ -157,7 +156,7 @@ set_storage_mode(disk, TxnMessages, State =
end
end, TxnMessages),
garbage_collect(),
- {ok, State2 #mqstate { msg_buf = MsgBuf3, prefetcher = undefined }};
+ {ok, State1 #mqstate { msg_buf = MsgBuf3, prefetcher = undefined }};
set_storage_mode(mixed, TxnMessages, State =
#mqstate { mode = disk, is_durable = IsDurable }) ->
%% The queue has a token just saying how many msgs are on disk
@@ -197,7 +196,7 @@ send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount,
true -> %% it's already in the Q
send_messages_to_disk(
IsDurable, Q, Queue1, PublishCount, RequeueCount + 1,
- Commit, Ack, inc_queue_length(Q, MsgBuf, 1));
+ Commit, Ack, inc_queue_length(MsgBuf, 1));
false ->
republish_message_to_disk_queue(
IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit,
@@ -209,10 +208,10 @@ send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount,
republish_message_to_disk_queue(
IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit,
[AckTag | Ack], MsgBuf, Msg, IsDelivered);
- {{value, {Q, Count}}, Queue1} ->
+ {{value, {on_disk, Count}}, Queue1} ->
send_messages_to_disk(IsDurable, Q, Queue1, PublishCount,
RequeueCount + Count, Commit, Ack,
- inc_queue_length(Q, MsgBuf, Count))
+ inc_queue_length(MsgBuf, Count))
end.
republish_message_to_disk_queue(IsDurable, Q, Queue, PublishCount, RequeueCount,
@@ -228,7 +227,7 @@ republish_message_to_disk_queue(IsDurable, Q, Queue, PublishCount, RequeueCount,
false -> {PublishCount + 1, [{MsgId, IsDelivered} | Commit1], Ack1}
end,
send_messages_to_disk(IsDurable, Q, Queue, PublishCount1, 0,
- Commit2, Ack2, inc_queue_length(Q, MsgBuf, 1)).
+ Commit2, Ack2, inc_queue_length(MsgBuf, 1)).
flush_messages_to_disk_queue(_Q, [], []) ->
ok;
@@ -253,28 +252,27 @@ lose_memory(Dec, State = #mqstate { memory_size = QSize,
State #mqstate { memory_size = QSize - Dec,
memory_loss = Loss + Dec }.
-inc_queue_length(_Q, MsgBuf, 0) ->
+inc_queue_length(MsgBuf, 0) ->
MsgBuf;
-inc_queue_length(Q, MsgBuf, Count) ->
+inc_queue_length(MsgBuf, Count) ->
{NewCount, MsgBufTail} =
case queue:out_r(MsgBuf) of
- {empty, MsgBuf1} -> {Count, MsgBuf1};
- {{value, {Q, Len}}, MsgBuf1} -> {Len + Count, MsgBuf1};
- {{value, _}, _MsgBuf1} -> {Count, MsgBuf}
+ {empty, MsgBuf1} -> {Count, MsgBuf1};
+ {{value, {on_disk, Len}}, MsgBuf1} -> {Len + Count, MsgBuf1};
+ {{value, _}, _MsgBuf1} -> {Count, MsgBuf}
end,
- queue:in({Q, NewCount}, MsgBufTail).
+ queue:in({on_disk, NewCount}, MsgBufTail).
-dec_queue_length(Count, State = #mqstate { queue = Q, msg_buf = MsgBuf }) ->
+dec_queue_length(MsgBuf, Count) ->
case queue:out(MsgBuf) of
- {{value, {Q, Len}}, MsgBuf1} ->
+ {{value, {on_disk, Len}}, MsgBuf1} ->
case Len of
Count ->
- State #mqstate { msg_buf = MsgBuf1 };
+ MsgBuf1;
_ when Len > Count ->
- State #mqstate { msg_buf = queue:in_r({Q, Len-Count},
- MsgBuf1)}
+ queue:in_r({on_disk, Len-Count}, MsgBuf1)
end;
- _ -> State
+ _ -> MsgBuf
end.
maybe_prefetch(State = #mqstate { prefetcher = undefined,
@@ -282,9 +280,13 @@ maybe_prefetch(State = #mqstate { prefetcher = undefined,
msg_buf = MsgBuf,
queue = Q }) ->
case queue:peek(MsgBuf) of
- {value, {Q, Count}} -> {ok, Prefetcher} =
- rabbit_queue_prefetcher:start_link(Q, Count),
- State #mqstate { prefetcher = Prefetcher };
+ {value, {on_disk, Count}} ->
+ %% only prefetch for the next contiguous block on
+ %% disk. Beyond there, we either hit the end of the queue,
+ %% or the next msg is already in RAM, held by us, the
+ %% mixed queue
+ {ok, Prefetcher} = rabbit_queue_prefetcher:start_link(Q, Count),
+ State #mqstate { prefetcher = Prefetcher };
_ -> State
end;
maybe_prefetch(State) ->
@@ -292,7 +294,7 @@ maybe_prefetch(State) ->
publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length,
msg_buf = MsgBuf }) ->
- MsgBuf1 = inc_queue_length(Q, MsgBuf, 1),
+ MsgBuf1 = inc_queue_length(MsgBuf, 1),
ok = rabbit_disk_queue:publish(Q, Msg, false),
{ok, gain_memory(size_of_message(Msg),
State #mqstate { msg_buf = MsgBuf1,
@@ -356,19 +358,20 @@ fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q,
{{Msg, IsDelivered, AckTag1, Rem},
State1 #mqstate { msg_buf = MsgBuf1 }};
_ when Prefetcher == undefined ->
- State2 = dec_queue_length(1, State1),
+ MsgBuf2 = dec_queue_length(MsgBuf, 1),
{Msg = #basic_message { is_persistent = IsPersistent },
IsDelivered, AckTag, _PersistRem}
= rabbit_disk_queue:fetch(Q),
AckTag1 = maybe_ack(Q, IsDurable, IsPersistent, AckTag),
- {{Msg, IsDelivered, AckTag1, Rem}, State2};
+ {{Msg, IsDelivered, AckTag1, Rem},
+ State1 #mqstate { msg_buf = MsgBuf2 }};
_ ->
case rabbit_queue_prefetcher:drain(Prefetcher) of
empty -> fetch(State #mqstate { prefetcher = undefined });
{Fetched, Len, Status} ->
- State2 = #mqstate { msg_buf = MsgBuf2 } =
- dec_queue_length(Len, State),
- fetch(State2 #mqstate
+ MsgBuf2 = dec_queue_length(MsgBuf, Len),
+ %% use State, not State1 as we've not dec'd length
+ fetch(State #mqstate
{ msg_buf = queue:join(Fetched, MsgBuf2),
prefetcher = case Status of
finished -> undefined;
@@ -424,7 +427,7 @@ tx_commit(Publishes, MsgsWithAcks,
Len = erlang:length(Publishes),
{ok, lose_memory(ASize, State #mqstate
{ length = Length + Len,
- msg_buf = inc_queue_length(Q, MsgBuf, Len) })};
+ msg_buf = inc_queue_length(MsgBuf, Len) })};
tx_commit(Publishes, MsgsWithAcks,
State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
is_durable = IsDurable, length = Length }) ->
@@ -501,7 +504,7 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q,
ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)),
Len = erlang:length(MessagesWithAckTags),
{ok, State #mqstate { length = Length + Len,
- msg_buf = inc_queue_length(Q, MsgBuf, Len) }};
+ msg_buf = inc_queue_length(MsgBuf, Len) }};
requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
msg_buf = MsgBuf,
is_durable = IsDurable,