diff options
| author | Matthias Radestock <matthias@lshift.net> | 2009-08-26 18:54:44 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2009-08-26 18:54:44 +0100 |
| commit | 0c8dc46663459832ada1cf3bd4d2ea050cffca73 (patch) | |
| tree | 26b259e761e622b928f9a9fcc25db1d39d60e053 | |
| parent | 3b8c39f4114b1b28770a65886c182986c2d3dbdf (diff) | |
| download | rabbitmq-server-git-0c8dc46663459832ada1cf3bd4d2ea050cffca73.tar.gz | |
cosmetic: place functions in appropriate sections of the file
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 426 |
1 files changed, 218 insertions, 208 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 6fc5db619f..d131eea111 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -56,6 +56,8 @@ -define(TO_DISK_MAX_FLUSH_SIZE, 100000). +%%---------------------------------------------------------------------------- + -ifdef(use_specs). -type(mode() :: ( 'disk' | 'mixed' )). @@ -85,14 +87,11 @@ -spec(tx_rollback/2 :: ([message()], mqstate()) -> okmqs()). -spec(requeue/2 :: ([{message(), acktag()}], mqstate()) -> okmqs()). -spec(purge/1 :: (mqstate()) -> okmqs()). - -spec(delete_queue/1 :: (mqstate()) -> {'ok', mqstate()}). - -spec(len/1 :: (mqstate()) -> non_neg_integer()). -spec(is_empty/1 :: (mqstate()) -> boolean()). -spec(set_storage_mode/3 :: (mode(), [message()], mqstate()) -> okmqs()). - -spec(estimate_queue_memory_and_reset_counters/1 :: (mqstate()) -> {mqstate(), non_neg_integer(), non_neg_integer(), non_neg_integer()}). @@ -100,6 +99,8 @@ -endif. +%%---------------------------------------------------------------------------- + init(Queue, IsDurable) -> Len = rabbit_disk_queue:len(Queue), MsgBuf = inc_queue_length(queue:new(), Len), @@ -113,195 +114,6 @@ init(Queue, IsDurable) -> memory_size = Size, memory_gain = undefined, memory_loss = undefined, prefetcher = undefined }}. -size_of_message( - #basic_message { content = #content { payload_fragments_rev = Payload, - properties_bin = PropsBin }}) - when is_binary(PropsBin) -> - size(PropsBin) + lists:foldl(fun (Frag, SumAcc) -> - SumAcc + size(Frag) - end, 0, Payload). - -ensure_binary_properties(Msg = #basic_message { content = Content }) -> - Msg #basic_message { - content = rabbit_binary_generator:ensure_content_encoded(Content) }. - -set_storage_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) -> - {ok, State}; -set_storage_mode(disk, TxnMessages, State = - #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, - is_durable = IsDurable, prefetcher = Prefetcher }) -> - State1 = State #mqstate { mode = disk }, - MsgBuf1 = - case Prefetcher of - undefined -> MsgBuf; - _ -> - case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of - empty -> MsgBuf; - {Fetched, Len} -> - MsgBuf2 = dec_queue_length(MsgBuf, Len), - queue:join(Fetched, MsgBuf2) - end - end, - %% We enqueue _everything_ here. This means that should a message - %% already be in the disk queue we must remove it and add it back - %% in. Fortunately, by using requeue, we avoid rewriting the - %% message on disk. - %% Note we also batch together messages on disk so that we minimise - %% the calls to requeue. - {ok, MsgBuf3} = - send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], [], queue:new()), - %% tx_publish txn messages. Some of these will have been already - %% published if they really are durable and persistent which is - %% why we can't just use our own tx_publish/2 function (would end - %% up publishing twice, so refcount would go wrong in disk_queue). - lists:foreach( - fun (Msg = #basic_message { is_persistent = IsPersistent }) -> - ok = case IsDurable andalso IsPersistent of - true -> ok; - _ -> rabbit_disk_queue:tx_publish(Msg) - end - end, TxnMessages), - garbage_collect(), - {ok, State1 #mqstate { msg_buf = MsgBuf3, prefetcher = undefined }}; -set_storage_mode(mixed, TxnMessages, State = - #mqstate { mode = disk, is_durable = IsDurable }) -> - %% The queue has a token just saying how many msgs are on disk - %% (this is already built for us when in disk mode). - %% Don't actually do anything to the disk - %% Don't start prefetcher just yet because the queue maybe busy - - %% wait for hibernate timeout in the amqqueue_process. - - %% Remove txn messages from disk which are neither persistent and - %% durable. This is necessary to avoid leaks. This is also pretty - %% much the inverse behaviour of our own tx_rollback/2 which is why - %% we're not using it. - Cancel = - lists:foldl( - fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) -> - case IsDurable andalso IsPersistent of - true -> Acc; - false -> [Msg #basic_message.guid | Acc] - end - end, [], TxnMessages), - ok = if Cancel == [] -> ok; - true -> rabbit_disk_queue:tx_rollback(Cancel) - end, - garbage_collect(), - {ok, State #mqstate { mode = mixed }}. - -send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount, - Commit, Ack, MsgBuf) -> - case queue:out(Queue) of - {empty, _Queue} -> - ok = flush_messages_to_disk_queue(Q, Commit, Ack), - {[], []} = flush_requeue_to_disk_queue(Q, RequeueCount, [], []), - {ok, MsgBuf}; - {{value, {Msg = #basic_message { is_persistent = IsPersistent }, - IsDelivered}}, Queue1} -> - case IsDurable andalso IsPersistent of - true -> %% it's already in the Q - send_messages_to_disk( - IsDurable, Q, Queue1, PublishCount, RequeueCount + 1, - Commit, Ack, inc_queue_length(MsgBuf, 1)); - false -> - republish_message_to_disk_queue( - IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit, - Ack, MsgBuf, Msg, IsDelivered) - end; - {{value, {Msg, IsDelivered, AckTag}}, Queue1} -> - %% these have come via the prefetcher, so are no longer in - %% the disk queue so they need to be republished - republish_message_to_disk_queue( - IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit, - [AckTag | Ack], MsgBuf, Msg, IsDelivered); - {{value, {on_disk, Count}}, Queue1} -> - send_messages_to_disk(IsDurable, Q, Queue1, PublishCount, - RequeueCount + Count, Commit, Ack, - inc_queue_length(MsgBuf, Count)) - end. - -republish_message_to_disk_queue(IsDurable, Q, Queue, PublishCount, RequeueCount, - Commit, Ack, MsgBuf, Msg = - #basic_message { guid = MsgId }, IsDelivered) -> - {Commit1, Ack1} = flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack), - ok = rabbit_disk_queue:tx_publish(Msg), - {PublishCount1, Commit2, Ack2} = - case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of - true -> ok = flush_messages_to_disk_queue( - Q, [{MsgId, IsDelivered} | Commit1], Ack1), - {0, [], []}; - false -> {PublishCount + 1, [{MsgId, IsDelivered} | Commit1], Ack1} - end, - send_messages_to_disk(IsDurable, Q, Queue, PublishCount1, 0, - Commit2, Ack2, inc_queue_length(MsgBuf, 1)). - -flush_messages_to_disk_queue(_Q, [], []) -> - ok; -flush_messages_to_disk_queue(Q, Commit, Ack) -> - rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), Ack). - -flush_requeue_to_disk_queue(_Q, 0, Commit, Ack) -> - {Commit, Ack}; -flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack) -> - ok = flush_messages_to_disk_queue(Q, Commit, Ack), - ok = rabbit_disk_queue:filesync(), - ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount), - {[], []}. - -gain_memory(Inc, State = #mqstate { memory_size = QSize, - memory_gain = Gain }) -> - State #mqstate { memory_size = QSize + Inc, - memory_gain = Gain + Inc }. - -lose_memory(Dec, State = #mqstate { memory_size = QSize, - memory_loss = Loss }) -> - State #mqstate { memory_size = QSize - Dec, - memory_loss = Loss + Dec }. - -inc_queue_length(MsgBuf, 0) -> - MsgBuf; -inc_queue_length(MsgBuf, Count) -> - {NewCount, MsgBufTail} = - case queue:out_r(MsgBuf) of - {empty, MsgBuf1} -> {Count, MsgBuf1}; - {{value, {on_disk, Len}}, MsgBuf1} -> {Len + Count, MsgBuf1}; - {{value, _}, _MsgBuf1} -> {Count, MsgBuf} - end, - queue:in({on_disk, NewCount}, MsgBufTail). - -dec_queue_length(MsgBuf, Count) -> - case queue:out(MsgBuf) of - {{value, {on_disk, Len}}, MsgBuf1} -> - case Len of - Count -> - MsgBuf1; - _ when Len > Count -> - queue:in_r({on_disk, Len-Count}, MsgBuf1) - end; - _ -> MsgBuf - end. - -maybe_prefetch(State = #mqstate { prefetcher = undefined, - mode = mixed, - msg_buf = MsgBuf, - queue = Q }) -> - case queue:peek(MsgBuf) of - {value, {on_disk, Count}} -> - %% only prefetch for the next contiguous block on - %% disk. Beyond there, we either hit the end of the queue, - %% or the next msg is already in RAM, held by us, the - %% mixed queue - {ok, Prefetcher} = rabbit_queue_prefetcher:start_link(Q, Count), - State #mqstate { prefetcher = Prefetcher }; - _ -> State - end; -maybe_prefetch(State) -> - State. - -on_disk(disk, _IsDurable, _IsPersistent) -> true; -on_disk(mixed, true, true) -> true; -on_disk(mixed, _IsDurable, _IsPersistent) -> false. - publish(Msg = #basic_message { is_persistent = IsPersistent }, State = #mqstate { queue = Q, mode = Mode, is_durable = IsDurable, msg_buf = MsgBuf, length = Length }) -> @@ -390,22 +202,6 @@ fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q, end) end. -maybe_ack(_Q, true, true, AckTag) -> - AckTag; -maybe_ack(Q, _, _, AckTag) -> - ok = rabbit_disk_queue:ack(Q, [AckTag]), - not_on_disk. - -remove_diskless(MsgsWithAcks) -> - lists:foldl( - fun ({Msg, AckTag}, {AccAckTags, AccSize}) -> - Msg1 = ensure_binary_properties(Msg), - {case AckTag of - not_on_disk -> AccAckTags; - _ -> [AckTag | AccAckTags] - end, size_of_message(Msg1) + AccSize} - end, {[], 0}, MsgsWithAcks). - ack(MsgsWithAcks, State = #mqstate { queue = Q }) -> {AckTags, ASize} = remove_diskless(MsgsWithAcks), ok = case AckTags of @@ -536,9 +332,223 @@ len(#mqstate { length = Length }) -> is_empty(#mqstate { length = Length }) -> 0 == Length. +%%---------------------------------------------------------------------------- +%% storage mode management +%%---------------------------------------------------------------------------- + +set_storage_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) -> + {ok, State}; +set_storage_mode(disk, TxnMessages, State = + #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, + is_durable = IsDurable, prefetcher = Prefetcher }) -> + State1 = State #mqstate { mode = disk }, + MsgBuf1 = + case Prefetcher of + undefined -> MsgBuf; + _ -> + case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of + empty -> MsgBuf; + {Fetched, Len} -> + MsgBuf2 = dec_queue_length(MsgBuf, Len), + queue:join(Fetched, MsgBuf2) + end + end, + %% We enqueue _everything_ here. This means that should a message + %% already be in the disk queue we must remove it and add it back + %% in. Fortunately, by using requeue, we avoid rewriting the + %% message on disk. + %% Note we also batch together messages on disk so that we minimise + %% the calls to requeue. + {ok, MsgBuf3} = + send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], [], queue:new()), + %% tx_publish txn messages. Some of these will have been already + %% published if they really are durable and persistent which is + %% why we can't just use our own tx_publish/2 function (would end + %% up publishing twice, so refcount would go wrong in disk_queue). + lists:foreach( + fun (Msg = #basic_message { is_persistent = IsPersistent }) -> + ok = case IsDurable andalso IsPersistent of + true -> ok; + _ -> rabbit_disk_queue:tx_publish(Msg) + end + end, TxnMessages), + garbage_collect(), + {ok, State1 #mqstate { msg_buf = MsgBuf3, prefetcher = undefined }}; +set_storage_mode(mixed, TxnMessages, State = + #mqstate { mode = disk, is_durable = IsDurable }) -> + %% The queue has a token just saying how many msgs are on disk + %% (this is already built for us when in disk mode). + %% Don't actually do anything to the disk + %% Don't start prefetcher just yet because the queue maybe busy - + %% wait for hibernate timeout in the amqqueue_process. + + %% Remove txn messages from disk which are neither persistent and + %% durable. This is necessary to avoid leaks. This is also pretty + %% much the inverse behaviour of our own tx_rollback/2 which is why + %% we're not using it. + Cancel = + lists:foldl( + fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) -> + case IsDurable andalso IsPersistent of + true -> Acc; + false -> [Msg #basic_message.guid | Acc] + end + end, [], TxnMessages), + ok = if Cancel == [] -> ok; + true -> rabbit_disk_queue:tx_rollback(Cancel) + end, + garbage_collect(), + {ok, State #mqstate { mode = mixed }}. + +send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount, + Commit, Ack, MsgBuf) -> + case queue:out(Queue) of + {empty, _Queue} -> + ok = flush_messages_to_disk_queue(Q, Commit, Ack), + {[], []} = flush_requeue_to_disk_queue(Q, RequeueCount, [], []), + {ok, MsgBuf}; + {{value, {Msg = #basic_message { is_persistent = IsPersistent }, + IsDelivered}}, Queue1} -> + case IsDurable andalso IsPersistent of + true -> %% it's already in the Q + send_messages_to_disk( + IsDurable, Q, Queue1, PublishCount, RequeueCount + 1, + Commit, Ack, inc_queue_length(MsgBuf, 1)); + false -> + republish_message_to_disk_queue( + IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit, + Ack, MsgBuf, Msg, IsDelivered) + end; + {{value, {Msg, IsDelivered, AckTag}}, Queue1} -> + %% these have come via the prefetcher, so are no longer in + %% the disk queue so they need to be republished + republish_message_to_disk_queue( + IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit, + [AckTag | Ack], MsgBuf, Msg, IsDelivered); + {{value, {on_disk, Count}}, Queue1} -> + send_messages_to_disk(IsDurable, Q, Queue1, PublishCount, + RequeueCount + Count, Commit, Ack, + inc_queue_length(MsgBuf, Count)) + end. + +republish_message_to_disk_queue(IsDurable, Q, Queue, PublishCount, RequeueCount, + Commit, Ack, MsgBuf, Msg = + #basic_message { guid = MsgId }, IsDelivered) -> + {Commit1, Ack1} = flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack), + ok = rabbit_disk_queue:tx_publish(Msg), + {PublishCount1, Commit2, Ack2} = + case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of + true -> ok = flush_messages_to_disk_queue( + Q, [{MsgId, IsDelivered} | Commit1], Ack1), + {0, [], []}; + false -> {PublishCount + 1, [{MsgId, IsDelivered} | Commit1], Ack1} + end, + send_messages_to_disk(IsDurable, Q, Queue, PublishCount1, 0, + Commit2, Ack2, inc_queue_length(MsgBuf, 1)). + +flush_messages_to_disk_queue(_Q, [], []) -> + ok; +flush_messages_to_disk_queue(Q, Commit, Ack) -> + rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), Ack). + +flush_requeue_to_disk_queue(_Q, 0, Commit, Ack) -> + {Commit, Ack}; +flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack) -> + ok = flush_messages_to_disk_queue(Q, Commit, Ack), + ok = rabbit_disk_queue:filesync(), + ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount), + {[], []}. + estimate_queue_memory_and_reset_counters(State = #mqstate { memory_size = Size, memory_gain = Gain, memory_loss = Loss }) -> {State #mqstate { memory_gain = 0, memory_loss = 0 }, 4 * Size, Gain, Loss}. storage_mode(#mqstate { mode = Mode }) -> Mode. + +%%---------------------------------------------------------------------------- +%% helpers +%%---------------------------------------------------------------------------- + +size_of_message( + #basic_message { content = #content { payload_fragments_rev = Payload, + properties_bin = PropsBin }}) + when is_binary(PropsBin) -> + size(PropsBin) + lists:foldl(fun (Frag, SumAcc) -> + SumAcc + size(Frag) + end, 0, Payload). + +ensure_binary_properties(Msg = #basic_message { content = Content }) -> + Msg #basic_message { + content = rabbit_binary_generator:ensure_content_encoded(Content) }. + +gain_memory(Inc, State = #mqstate { memory_size = QSize, + memory_gain = Gain }) -> + State #mqstate { memory_size = QSize + Inc, + memory_gain = Gain + Inc }. + +lose_memory(Dec, State = #mqstate { memory_size = QSize, + memory_loss = Loss }) -> + State #mqstate { memory_size = QSize - Dec, + memory_loss = Loss + Dec }. + +inc_queue_length(MsgBuf, 0) -> + MsgBuf; +inc_queue_length(MsgBuf, Count) -> + {NewCount, MsgBufTail} = + case queue:out_r(MsgBuf) of + {empty, MsgBuf1} -> {Count, MsgBuf1}; + {{value, {on_disk, Len}}, MsgBuf1} -> {Len + Count, MsgBuf1}; + {{value, _}, _MsgBuf1} -> {Count, MsgBuf} + end, + queue:in({on_disk, NewCount}, MsgBufTail). + +dec_queue_length(MsgBuf, Count) -> + case queue:out(MsgBuf) of + {{value, {on_disk, Len}}, MsgBuf1} -> + case Len of + Count -> + MsgBuf1; + _ when Len > Count -> + queue:in_r({on_disk, Len-Count}, MsgBuf1) + end; + _ -> MsgBuf + end. + +maybe_prefetch(State = #mqstate { prefetcher = undefined, + mode = mixed, + msg_buf = MsgBuf, + queue = Q }) -> + case queue:peek(MsgBuf) of + {value, {on_disk, Count}} -> + %% only prefetch for the next contiguous block on + %% disk. Beyond there, we either hit the end of the queue, + %% or the next msg is already in RAM, held by us, the + %% mixed queue + {ok, Prefetcher} = rabbit_queue_prefetcher:start_link(Q, Count), + State #mqstate { prefetcher = Prefetcher }; + _ -> State + end; +maybe_prefetch(State) -> + State. + +maybe_ack(_Q, true, true, AckTag) -> + AckTag; +maybe_ack(Q, _, _, AckTag) -> + ok = rabbit_disk_queue:ack(Q, [AckTag]), + not_on_disk. + +remove_diskless(MsgsWithAcks) -> + lists:foldl( + fun ({Msg, AckTag}, {AccAckTags, AccSize}) -> + Msg1 = ensure_binary_properties(Msg), + {case AckTag of + not_on_disk -> AccAckTags; + _ -> [AckTag | AccAckTags] + end, size_of_message(Msg1) + AccSize} + end, {[], 0}, MsgsWithAcks). + +on_disk(disk, _IsDurable, _IsPersistent) -> true; +on_disk(mixed, true, true) -> true; +on_disk(mixed, _IsDurable, _IsPersistent) -> false. + |
