summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-08-26 18:54:44 +0100
committerMatthias Radestock <matthias@lshift.net>2009-08-26 18:54:44 +0100
commit0c8dc46663459832ada1cf3bd4d2ea050cffca73 (patch)
tree26b259e761e622b928f9a9fcc25db1d39d60e053
parent3b8c39f4114b1b28770a65886c182986c2d3dbdf (diff)
downloadrabbitmq-server-git-0c8dc46663459832ada1cf3bd4d2ea050cffca73.tar.gz
cosmetic: place functions in appropriate sections of the file
-rw-r--r--src/rabbit_mixed_queue.erl426
1 files changed, 218 insertions, 208 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 6fc5db619f..d131eea111 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -56,6 +56,8 @@
-define(TO_DISK_MAX_FLUSH_SIZE, 100000).
+%%----------------------------------------------------------------------------
+
-ifdef(use_specs).
-type(mode() :: ( 'disk' | 'mixed' )).
@@ -85,14 +87,11 @@
-spec(tx_rollback/2 :: ([message()], mqstate()) -> okmqs()).
-spec(requeue/2 :: ([{message(), acktag()}], mqstate()) -> okmqs()).
-spec(purge/1 :: (mqstate()) -> okmqs()).
-
-spec(delete_queue/1 :: (mqstate()) -> {'ok', mqstate()}).
-
-spec(len/1 :: (mqstate()) -> non_neg_integer()).
-spec(is_empty/1 :: (mqstate()) -> boolean()).
-spec(set_storage_mode/3 :: (mode(), [message()], mqstate()) -> okmqs()).
-
-spec(estimate_queue_memory_and_reset_counters/1 :: (mqstate()) ->
{mqstate(), non_neg_integer(), non_neg_integer(),
non_neg_integer()}).
@@ -100,6 +99,8 @@
-endif.
+%%----------------------------------------------------------------------------
+
init(Queue, IsDurable) ->
Len = rabbit_disk_queue:len(Queue),
MsgBuf = inc_queue_length(queue:new(), Len),
@@ -113,195 +114,6 @@ init(Queue, IsDurable) ->
memory_size = Size, memory_gain = undefined,
memory_loss = undefined, prefetcher = undefined }}.
-size_of_message(
- #basic_message { content = #content { payload_fragments_rev = Payload,
- properties_bin = PropsBin }})
- when is_binary(PropsBin) ->
- size(PropsBin) + lists:foldl(fun (Frag, SumAcc) ->
- SumAcc + size(Frag)
- end, 0, Payload).
-
-ensure_binary_properties(Msg = #basic_message { content = Content }) ->
- Msg #basic_message {
- content = rabbit_binary_generator:ensure_content_encoded(Content) }.
-
-set_storage_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) ->
- {ok, State};
-set_storage_mode(disk, TxnMessages, State =
- #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
- is_durable = IsDurable, prefetcher = Prefetcher }) ->
- State1 = State #mqstate { mode = disk },
- MsgBuf1 =
- case Prefetcher of
- undefined -> MsgBuf;
- _ ->
- case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of
- empty -> MsgBuf;
- {Fetched, Len} ->
- MsgBuf2 = dec_queue_length(MsgBuf, Len),
- queue:join(Fetched, MsgBuf2)
- end
- end,
- %% We enqueue _everything_ here. This means that should a message
- %% already be in the disk queue we must remove it and add it back
- %% in. Fortunately, by using requeue, we avoid rewriting the
- %% message on disk.
- %% Note we also batch together messages on disk so that we minimise
- %% the calls to requeue.
- {ok, MsgBuf3} =
- send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], [], queue:new()),
- %% tx_publish txn messages. Some of these will have been already
- %% published if they really are durable and persistent which is
- %% why we can't just use our own tx_publish/2 function (would end
- %% up publishing twice, so refcount would go wrong in disk_queue).
- lists:foreach(
- fun (Msg = #basic_message { is_persistent = IsPersistent }) ->
- ok = case IsDurable andalso IsPersistent of
- true -> ok;
- _ -> rabbit_disk_queue:tx_publish(Msg)
- end
- end, TxnMessages),
- garbage_collect(),
- {ok, State1 #mqstate { msg_buf = MsgBuf3, prefetcher = undefined }};
-set_storage_mode(mixed, TxnMessages, State =
- #mqstate { mode = disk, is_durable = IsDurable }) ->
- %% The queue has a token just saying how many msgs are on disk
- %% (this is already built for us when in disk mode).
- %% Don't actually do anything to the disk
- %% Don't start prefetcher just yet because the queue maybe busy -
- %% wait for hibernate timeout in the amqqueue_process.
-
- %% Remove txn messages from disk which are neither persistent and
- %% durable. This is necessary to avoid leaks. This is also pretty
- %% much the inverse behaviour of our own tx_rollback/2 which is why
- %% we're not using it.
- Cancel =
- lists:foldl(
- fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) ->
- case IsDurable andalso IsPersistent of
- true -> Acc;
- false -> [Msg #basic_message.guid | Acc]
- end
- end, [], TxnMessages),
- ok = if Cancel == [] -> ok;
- true -> rabbit_disk_queue:tx_rollback(Cancel)
- end,
- garbage_collect(),
- {ok, State #mqstate { mode = mixed }}.
-
-send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount,
- Commit, Ack, MsgBuf) ->
- case queue:out(Queue) of
- {empty, _Queue} ->
- ok = flush_messages_to_disk_queue(Q, Commit, Ack),
- {[], []} = flush_requeue_to_disk_queue(Q, RequeueCount, [], []),
- {ok, MsgBuf};
- {{value, {Msg = #basic_message { is_persistent = IsPersistent },
- IsDelivered}}, Queue1} ->
- case IsDurable andalso IsPersistent of
- true -> %% it's already in the Q
- send_messages_to_disk(
- IsDurable, Q, Queue1, PublishCount, RequeueCount + 1,
- Commit, Ack, inc_queue_length(MsgBuf, 1));
- false ->
- republish_message_to_disk_queue(
- IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit,
- Ack, MsgBuf, Msg, IsDelivered)
- end;
- {{value, {Msg, IsDelivered, AckTag}}, Queue1} ->
- %% these have come via the prefetcher, so are no longer in
- %% the disk queue so they need to be republished
- republish_message_to_disk_queue(
- IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit,
- [AckTag | Ack], MsgBuf, Msg, IsDelivered);
- {{value, {on_disk, Count}}, Queue1} ->
- send_messages_to_disk(IsDurable, Q, Queue1, PublishCount,
- RequeueCount + Count, Commit, Ack,
- inc_queue_length(MsgBuf, Count))
- end.
-
-republish_message_to_disk_queue(IsDurable, Q, Queue, PublishCount, RequeueCount,
- Commit, Ack, MsgBuf, Msg =
- #basic_message { guid = MsgId }, IsDelivered) ->
- {Commit1, Ack1} = flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack),
- ok = rabbit_disk_queue:tx_publish(Msg),
- {PublishCount1, Commit2, Ack2} =
- case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of
- true -> ok = flush_messages_to_disk_queue(
- Q, [{MsgId, IsDelivered} | Commit1], Ack1),
- {0, [], []};
- false -> {PublishCount + 1, [{MsgId, IsDelivered} | Commit1], Ack1}
- end,
- send_messages_to_disk(IsDurable, Q, Queue, PublishCount1, 0,
- Commit2, Ack2, inc_queue_length(MsgBuf, 1)).
-
-flush_messages_to_disk_queue(_Q, [], []) ->
- ok;
-flush_messages_to_disk_queue(Q, Commit, Ack) ->
- rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), Ack).
-
-flush_requeue_to_disk_queue(_Q, 0, Commit, Ack) ->
- {Commit, Ack};
-flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack) ->
- ok = flush_messages_to_disk_queue(Q, Commit, Ack),
- ok = rabbit_disk_queue:filesync(),
- ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount),
- {[], []}.
-
-gain_memory(Inc, State = #mqstate { memory_size = QSize,
- memory_gain = Gain }) ->
- State #mqstate { memory_size = QSize + Inc,
- memory_gain = Gain + Inc }.
-
-lose_memory(Dec, State = #mqstate { memory_size = QSize,
- memory_loss = Loss }) ->
- State #mqstate { memory_size = QSize - Dec,
- memory_loss = Loss + Dec }.
-
-inc_queue_length(MsgBuf, 0) ->
- MsgBuf;
-inc_queue_length(MsgBuf, Count) ->
- {NewCount, MsgBufTail} =
- case queue:out_r(MsgBuf) of
- {empty, MsgBuf1} -> {Count, MsgBuf1};
- {{value, {on_disk, Len}}, MsgBuf1} -> {Len + Count, MsgBuf1};
- {{value, _}, _MsgBuf1} -> {Count, MsgBuf}
- end,
- queue:in({on_disk, NewCount}, MsgBufTail).
-
-dec_queue_length(MsgBuf, Count) ->
- case queue:out(MsgBuf) of
- {{value, {on_disk, Len}}, MsgBuf1} ->
- case Len of
- Count ->
- MsgBuf1;
- _ when Len > Count ->
- queue:in_r({on_disk, Len-Count}, MsgBuf1)
- end;
- _ -> MsgBuf
- end.
-
-maybe_prefetch(State = #mqstate { prefetcher = undefined,
- mode = mixed,
- msg_buf = MsgBuf,
- queue = Q }) ->
- case queue:peek(MsgBuf) of
- {value, {on_disk, Count}} ->
- %% only prefetch for the next contiguous block on
- %% disk. Beyond there, we either hit the end of the queue,
- %% or the next msg is already in RAM, held by us, the
- %% mixed queue
- {ok, Prefetcher} = rabbit_queue_prefetcher:start_link(Q, Count),
- State #mqstate { prefetcher = Prefetcher };
- _ -> State
- end;
-maybe_prefetch(State) ->
- State.
-
-on_disk(disk, _IsDurable, _IsPersistent) -> true;
-on_disk(mixed, true, true) -> true;
-on_disk(mixed, _IsDurable, _IsPersistent) -> false.
-
publish(Msg = #basic_message { is_persistent = IsPersistent }, State =
#mqstate { queue = Q, mode = Mode, is_durable = IsDurable,
msg_buf = MsgBuf, length = Length }) ->
@@ -390,22 +202,6 @@ fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q,
end)
end.
-maybe_ack(_Q, true, true, AckTag) ->
- AckTag;
-maybe_ack(Q, _, _, AckTag) ->
- ok = rabbit_disk_queue:ack(Q, [AckTag]),
- not_on_disk.
-
-remove_diskless(MsgsWithAcks) ->
- lists:foldl(
- fun ({Msg, AckTag}, {AccAckTags, AccSize}) ->
- Msg1 = ensure_binary_properties(Msg),
- {case AckTag of
- not_on_disk -> AccAckTags;
- _ -> [AckTag | AccAckTags]
- end, size_of_message(Msg1) + AccSize}
- end, {[], 0}, MsgsWithAcks).
-
ack(MsgsWithAcks, State = #mqstate { queue = Q }) ->
{AckTags, ASize} = remove_diskless(MsgsWithAcks),
ok = case AckTags of
@@ -536,9 +332,223 @@ len(#mqstate { length = Length }) ->
is_empty(#mqstate { length = Length }) ->
0 == Length.
+%%----------------------------------------------------------------------------
+%% storage mode management
+%%----------------------------------------------------------------------------
+
+set_storage_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) ->
+ {ok, State};
+set_storage_mode(disk, TxnMessages, State =
+ #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
+ is_durable = IsDurable, prefetcher = Prefetcher }) ->
+ State1 = State #mqstate { mode = disk },
+ MsgBuf1 =
+ case Prefetcher of
+ undefined -> MsgBuf;
+ _ ->
+ case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of
+ empty -> MsgBuf;
+ {Fetched, Len} ->
+ MsgBuf2 = dec_queue_length(MsgBuf, Len),
+ queue:join(Fetched, MsgBuf2)
+ end
+ end,
+ %% We enqueue _everything_ here. This means that should a message
+ %% already be in the disk queue we must remove it and add it back
+ %% in. Fortunately, by using requeue, we avoid rewriting the
+ %% message on disk.
+ %% Note we also batch together messages on disk so that we minimise
+ %% the calls to requeue.
+ {ok, MsgBuf3} =
+ send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], [], queue:new()),
+ %% tx_publish txn messages. Some of these will have been already
+ %% published if they really are durable and persistent which is
+ %% why we can't just use our own tx_publish/2 function (would end
+ %% up publishing twice, so refcount would go wrong in disk_queue).
+ lists:foreach(
+ fun (Msg = #basic_message { is_persistent = IsPersistent }) ->
+ ok = case IsDurable andalso IsPersistent of
+ true -> ok;
+ _ -> rabbit_disk_queue:tx_publish(Msg)
+ end
+ end, TxnMessages),
+ garbage_collect(),
+ {ok, State1 #mqstate { msg_buf = MsgBuf3, prefetcher = undefined }};
+set_storage_mode(mixed, TxnMessages, State =
+ #mqstate { mode = disk, is_durable = IsDurable }) ->
+ %% The queue has a token just saying how many msgs are on disk
+ %% (this is already built for us when in disk mode).
+ %% Don't actually do anything to the disk
+ %% Don't start prefetcher just yet because the queue maybe busy -
+ %% wait for hibernate timeout in the amqqueue_process.
+
+ %% Remove txn messages from disk which are neither persistent and
+ %% durable. This is necessary to avoid leaks. This is also pretty
+ %% much the inverse behaviour of our own tx_rollback/2 which is why
+ %% we're not using it.
+ Cancel =
+ lists:foldl(
+ fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) ->
+ case IsDurable andalso IsPersistent of
+ true -> Acc;
+ false -> [Msg #basic_message.guid | Acc]
+ end
+ end, [], TxnMessages),
+ ok = if Cancel == [] -> ok;
+ true -> rabbit_disk_queue:tx_rollback(Cancel)
+ end,
+ garbage_collect(),
+ {ok, State #mqstate { mode = mixed }}.
+
+send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount,
+ Commit, Ack, MsgBuf) ->
+ case queue:out(Queue) of
+ {empty, _Queue} ->
+ ok = flush_messages_to_disk_queue(Q, Commit, Ack),
+ {[], []} = flush_requeue_to_disk_queue(Q, RequeueCount, [], []),
+ {ok, MsgBuf};
+ {{value, {Msg = #basic_message { is_persistent = IsPersistent },
+ IsDelivered}}, Queue1} ->
+ case IsDurable andalso IsPersistent of
+ true -> %% it's already in the Q
+ send_messages_to_disk(
+ IsDurable, Q, Queue1, PublishCount, RequeueCount + 1,
+ Commit, Ack, inc_queue_length(MsgBuf, 1));
+ false ->
+ republish_message_to_disk_queue(
+ IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit,
+ Ack, MsgBuf, Msg, IsDelivered)
+ end;
+ {{value, {Msg, IsDelivered, AckTag}}, Queue1} ->
+ %% these have come via the prefetcher, so are no longer in
+ %% the disk queue so they need to be republished
+ republish_message_to_disk_queue(
+ IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit,
+ [AckTag | Ack], MsgBuf, Msg, IsDelivered);
+ {{value, {on_disk, Count}}, Queue1} ->
+ send_messages_to_disk(IsDurable, Q, Queue1, PublishCount,
+ RequeueCount + Count, Commit, Ack,
+ inc_queue_length(MsgBuf, Count))
+ end.
+
+republish_message_to_disk_queue(IsDurable, Q, Queue, PublishCount, RequeueCount,
+ Commit, Ack, MsgBuf, Msg =
+ #basic_message { guid = MsgId }, IsDelivered) ->
+ {Commit1, Ack1} = flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack),
+ ok = rabbit_disk_queue:tx_publish(Msg),
+ {PublishCount1, Commit2, Ack2} =
+ case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of
+ true -> ok = flush_messages_to_disk_queue(
+ Q, [{MsgId, IsDelivered} | Commit1], Ack1),
+ {0, [], []};
+ false -> {PublishCount + 1, [{MsgId, IsDelivered} | Commit1], Ack1}
+ end,
+ send_messages_to_disk(IsDurable, Q, Queue, PublishCount1, 0,
+ Commit2, Ack2, inc_queue_length(MsgBuf, 1)).
+
+flush_messages_to_disk_queue(_Q, [], []) ->
+ ok;
+flush_messages_to_disk_queue(Q, Commit, Ack) ->
+ rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), Ack).
+
+flush_requeue_to_disk_queue(_Q, 0, Commit, Ack) ->
+ {Commit, Ack};
+flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack) ->
+ ok = flush_messages_to_disk_queue(Q, Commit, Ack),
+ ok = rabbit_disk_queue:filesync(),
+ ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount),
+ {[], []}.
+
estimate_queue_memory_and_reset_counters(State =
#mqstate { memory_size = Size, memory_gain = Gain, memory_loss = Loss }) ->
{State #mqstate { memory_gain = 0, memory_loss = 0 }, 4 * Size, Gain, Loss}.
storage_mode(#mqstate { mode = Mode }) ->
Mode.
+
+%%----------------------------------------------------------------------------
+%% helpers
+%%----------------------------------------------------------------------------
+
+size_of_message(
+ #basic_message { content = #content { payload_fragments_rev = Payload,
+ properties_bin = PropsBin }})
+ when is_binary(PropsBin) ->
+ size(PropsBin) + lists:foldl(fun (Frag, SumAcc) ->
+ SumAcc + size(Frag)
+ end, 0, Payload).
+
+ensure_binary_properties(Msg = #basic_message { content = Content }) ->
+ Msg #basic_message {
+ content = rabbit_binary_generator:ensure_content_encoded(Content) }.
+
+gain_memory(Inc, State = #mqstate { memory_size = QSize,
+ memory_gain = Gain }) ->
+ State #mqstate { memory_size = QSize + Inc,
+ memory_gain = Gain + Inc }.
+
+lose_memory(Dec, State = #mqstate { memory_size = QSize,
+ memory_loss = Loss }) ->
+ State #mqstate { memory_size = QSize - Dec,
+ memory_loss = Loss + Dec }.
+
+inc_queue_length(MsgBuf, 0) ->
+ MsgBuf;
+inc_queue_length(MsgBuf, Count) ->
+ {NewCount, MsgBufTail} =
+ case queue:out_r(MsgBuf) of
+ {empty, MsgBuf1} -> {Count, MsgBuf1};
+ {{value, {on_disk, Len}}, MsgBuf1} -> {Len + Count, MsgBuf1};
+ {{value, _}, _MsgBuf1} -> {Count, MsgBuf}
+ end,
+ queue:in({on_disk, NewCount}, MsgBufTail).
+
+dec_queue_length(MsgBuf, Count) ->
+ case queue:out(MsgBuf) of
+ {{value, {on_disk, Len}}, MsgBuf1} ->
+ case Len of
+ Count ->
+ MsgBuf1;
+ _ when Len > Count ->
+ queue:in_r({on_disk, Len-Count}, MsgBuf1)
+ end;
+ _ -> MsgBuf
+ end.
+
+maybe_prefetch(State = #mqstate { prefetcher = undefined,
+ mode = mixed,
+ msg_buf = MsgBuf,
+ queue = Q }) ->
+ case queue:peek(MsgBuf) of
+ {value, {on_disk, Count}} ->
+ %% only prefetch for the next contiguous block on
+ %% disk. Beyond there, we either hit the end of the queue,
+ %% or the next msg is already in RAM, held by us, the
+ %% mixed queue
+ {ok, Prefetcher} = rabbit_queue_prefetcher:start_link(Q, Count),
+ State #mqstate { prefetcher = Prefetcher };
+ _ -> State
+ end;
+maybe_prefetch(State) ->
+ State.
+
+maybe_ack(_Q, true, true, AckTag) ->
+ AckTag;
+maybe_ack(Q, _, _, AckTag) ->
+ ok = rabbit_disk_queue:ack(Q, [AckTag]),
+ not_on_disk.
+
+remove_diskless(MsgsWithAcks) ->
+ lists:foldl(
+ fun ({Msg, AckTag}, {AccAckTags, AccSize}) ->
+ Msg1 = ensure_binary_properties(Msg),
+ {case AckTag of
+ not_on_disk -> AccAckTags;
+ _ -> [AckTag | AccAckTags]
+ end, size_of_message(Msg1) + AccSize}
+ end, {[], 0}, MsgsWithAcks).
+
+on_disk(disk, _IsDurable, _IsPersistent) -> true;
+on_disk(mixed, true, true) -> true;
+on_disk(mixed, _IsDurable, _IsPersistent) -> false.
+