summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/lqueue.erl10
-rw-r--r--src/rabbit_variable_queue.erl18
2 files changed, 18 insertions, 10 deletions
diff --git a/src/lqueue.erl b/src/lqueue.erl
index 4a8164f6db..f07eec1873 100644
--- a/src/lqueue.erl
+++ b/src/lqueue.erl
@@ -17,7 +17,7 @@
-module(lqueue).
-export([new/0, is_empty/1, len/1, in/2, in_r/2, out/1, out_r/1, join/2,
- foldl/3, foldr/3, from_list/1, to_list/1]).
+ foldl/3, foldr/3, from_list/1, to_list/1, peek/1, peek_r/1]).
-define(QUEUE, queue).
@@ -42,6 +42,8 @@
-spec(foldr/3 :: (fun ((value(), B) -> B), B, ?MODULE()) -> B).
-spec(from_list/1 :: ([value()]) -> ?MODULE()).
-spec(to_list/1 :: (?MODULE()) -> [value()]).
+-spec(peek(?MODULE) -> 'empty' | {'value',value()}).
+-spec(peek_r(?MODULE) -> 'empty' | {'value',value()}).
-endif.
@@ -87,3 +89,9 @@ foldr(Fun, Init, Q) ->
len({L, _Q}) ->
L.
+
+peek({0, _Q}) -> empty;
+peek({_L, Q}) -> ?QUEUE:peek(Q).
+
+peek_r({0, _Q}) -> empty;
+peek_r({_L, Q}) -> ?QUEUE:peek_r(Q).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 60c3dfd2d3..775a16648a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -724,17 +724,17 @@ needs_timeout(State) ->
end.
null_gamma_delta(#vqstate { q2 = Q2, q3 = Q3 } = State) ->
- {null_gamma_delta_msg(?QUEUE:out(Q2), ?QUEUE:out(Q2),
+ {null_gamma_delta_msg(?QUEUE:peek(Q2), ?QUEUE:peek(Q2),
fun (SeqId) -> SeqId end) orelse
- null_gamma_delta_msg(?QUEUE:out_r(Q3), ?QUEUE:out(Q3),
+ null_gamma_delta_msg(?QUEUE:peek_r(Q3), ?QUEUE:peek(Q3),
fun rabbit_queue_index:next_segment_boundary/1),
State}.
-null_gamma_delta_msg({{value, #msg_status { seq_id = SeqId1,
- index_on_disk = true }}, _Q},
- {{value, #msg_status { seq_id = SeqId2 }}, _Q2},
+null_gamma_delta_msg({value, #msg_status { seq_id = SeqId1,
+ index_on_disk = true }},
+ {value, #msg_status { seq_id = SeqId2 }},
LimitFun) ->
- SeqId1 >= LimitFun(SeqId2);
+ LimitFun =:= undefined orelse SeqId1 >= LimitFun(SeqId2);
null_gamma_delta_msg(_, _, _) ->
false.
@@ -1376,9 +1376,9 @@ msg_from_pending_ack(SeqId, MsgPropsFun, State) ->
needs_confirming = false } }, State1}.
beta_limit(Q) ->
- case ?QUEUE:out(Q) of
- {{value, #msg_status { seq_id = SeqId }}, _Q} -> SeqId;
- {empty, _Q} -> undefined
+ case ?QUEUE:peek(Q) of
+ {value, #msg_status { seq_id = SeqId }} -> SeqId;
+ empty -> undefined
end.
delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;