summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2015-04-27 16:51:26 +0300
committerMichael Klishin <michael@novemberain.com>2015-04-27 16:51:26 +0300
commita5f3380b38be2dbcae0a8844c8086bd5d37a56e6 (patch)
treeb3a1561faaf5720996dd0288b7b0665d279d4bf6 /src
parentca3d9d2742f09bc8aee8f6a57007c6e49df921db (diff)
parent69e6610ff1be091b0571df7b04b8a59c5b6562e8 (diff)
downloadrabbitmq-server-git-a5f3380b38be2dbcae0a8844c8086bd5d37a56e6.tar.gz
Merge pull request #131 from rabbitmq/cosmetics-54
Cosmetics 54
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl83
1 files changed, 48 insertions, 35 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c2c6bdc4aa..ee84005798 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -877,7 +877,7 @@ info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) ->
info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
PersistentBytes;
info(head_message_timestamp, #vqstate{
- q3 = Q3,
+ q3 = Q3,
q4 = Q4,
ram_pending_ack = RPA,
qi_pending_ack = QPA}) ->
@@ -916,43 +916,56 @@ invoke( _, _, State) -> State.
is_duplicate(_Msg, State) -> {false, State}.
-%% Get the Timestamp property of the first msg, if present. This is the one with the oldest timestamp
-%% among the heads of the pending acks and unread queues.
-%% We can't check disk_pending_acks as these are paged out - we assume some will soon be paged in
-%% rather than forcing it to happen.
-%% Pending ack msgs are included as they are regarded as unprocessed until acked, this also prevents
-%% the result apparently oscillating during repeated rejects.
-%% Q3 is only checked when Q4 is empty as any Q4 msg will be earlier.
-head_message_timestamp(Q3, Q4, RPA, QPA) ->
- HeadMsgs = [ HeadMsgStatus#msg_status.msg ||
- HeadMsgStatus <-
- [ case ?QUEUE:is_empty(Q4) of
- false -> {value, MsgStatus} = ?QUEUE:peek(Q4), MsgStatus;
- true -> case ?QUEUE:is_empty(Q3) of
- false -> {value, MsgStatus} = ?QUEUE:peek(Q3), MsgStatus;
- true -> undefined
- end
- end,
- case gb_trees:is_empty(RPA) of
- false -> {_SeqId, MsgStatus} = gb_trees:smallest(RPA), MsgStatus;
- true -> undefined
- end,
- case gb_trees:is_empty(QPA) of
- false -> {_SeqId, MsgStatus} = gb_trees:smallest(QPA), MsgStatus;
- true -> undefined
- end],
- HeadMsgStatus /= undefined ],
- Timestamps =
- [ Timestamp ||
- Timestamp <-
- [ rabbit_basic:extract_timestamp(HeadMsg#basic_message.content) || HeadMsg <-
- HeadMsgs ],
- Timestamp /= undefined ],
- case Timestamps == [] of
+%% Get the Timestamp property of the first msg, if present. This is
+%% the one with the oldest timestamp among the heads of the pending
+%% acks and unread queues. We can't check disk_pending_acks as these
+%% are paged out - we assume some will soon be paged in rather than
+%% forcing it to happen. Pending ack msgs are included as they are
+%% regarded as unprocessed until acked, this also prevents the result
+%% apparently oscillating during repeated rejects. Q3 is only checked
+%% when Q4 is empty as any Q4 msg will be earlier.
+head_message_timestamp(Q3, Q4, RPA, QPA) ->
+ HeadMsgs = [ HeadMsgStatus#msg_status.msg ||
+ HeadMsgStatus <-
+ [ get_qs_head([Q4, Q3]),
+ get_pa_head(RPA),
+ get_pa_head(QPA) ],
+ HeadMsgStatus /= undefined ],
+
+ Timestamps =
+ [Timestamp || HeadMsg <- HeadMsgs,
+ Timestamp <- [rabbit_basic:extract_timestamp(
+ HeadMsg#basic_message.content)],
+ Timestamp /= undefined
+ ],
+
+ case Timestamps == [] of
true -> '';
- false -> lists:min(Timestamps)
+ false -> lists:min(Timestamps)
end.
+get_qs_head(Qs) ->
+ catch lists:foldl(
+ fun (Q, Acc) ->
+ case get_q_head(Q) of
+ undefined -> Acc;
+ Val -> throw(Val)
+ end
+ end, undefined, Qs).
+
+get_q_head(Q) ->
+ get_collection_head(Q, fun ?QUEUE:is_empty/1, fun ?QUEUE:peek/1).
+
+get_pa_head(PA) ->
+ get_collection_head(PA, fun gb_trees:is_empty/1, fun gb_trees:smallest/1).
+
+get_collection_head(Col, IsEmpty, GetVal) ->
+ case IsEmpty(Col) of
+ false ->
+ {_, MsgStatus} = GetVal(Col),
+ MsgStatus;
+ true -> undefined
+ end.
%%----------------------------------------------------------------------------
%% Minor helpers