diff options
| -rw-r--r-- | src/lqueue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 18 |
2 files changed, 18 insertions, 10 deletions
diff --git a/src/lqueue.erl b/src/lqueue.erl index 4a8164f6db..f07eec1873 100644 --- a/src/lqueue.erl +++ b/src/lqueue.erl @@ -17,7 +17,7 @@ -module(lqueue). -export([new/0, is_empty/1, len/1, in/2, in_r/2, out/1, out_r/1, join/2, - foldl/3, foldr/3, from_list/1, to_list/1]). + foldl/3, foldr/3, from_list/1, to_list/1, peek/1, peek_r/1]). -define(QUEUE, queue). @@ -42,6 +42,8 @@ -spec(foldr/3 :: (fun ((value(), B) -> B), B, ?MODULE()) -> B). -spec(from_list/1 :: ([value()]) -> ?MODULE()). -spec(to_list/1 :: (?MODULE()) -> [value()]). +-spec(peek(?MODULE) -> 'empty' | {'value',value()}). +-spec(peek_r(?MODULE) -> 'empty' | {'value',value()}). -endif. @@ -87,3 +89,9 @@ foldr(Fun, Init, Q) -> len({L, _Q}) -> L. + +peek({0, _Q}) -> empty; +peek({_L, Q}) -> ?QUEUE:peek(Q). + +peek_r({0, _Q}) -> empty; +peek_r({_L, Q}) -> ?QUEUE:peek_r(Q). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 60c3dfd2d3..775a16648a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -724,17 +724,17 @@ needs_timeout(State) -> end. null_gamma_delta(#vqstate { q2 = Q2, q3 = Q3 } = State) -> - {null_gamma_delta_msg(?QUEUE:out(Q2), ?QUEUE:out(Q2), + {null_gamma_delta_msg(?QUEUE:peek(Q2), ?QUEUE:peek(Q2), fun (SeqId) -> SeqId end) orelse - null_gamma_delta_msg(?QUEUE:out_r(Q3), ?QUEUE:out(Q3), + null_gamma_delta_msg(?QUEUE:peek_r(Q3), ?QUEUE:peek(Q3), fun rabbit_queue_index:next_segment_boundary/1), State}. -null_gamma_delta_msg({{value, #msg_status { seq_id = SeqId1, - index_on_disk = true }}, _Q}, - {{value, #msg_status { seq_id = SeqId2 }}, _Q2}, +null_gamma_delta_msg({value, #msg_status { seq_id = SeqId1, + index_on_disk = true }}, + {value, #msg_status { seq_id = SeqId2 }}, LimitFun) -> - SeqId1 >= LimitFun(SeqId2); + LimitFun =:= undefined orelse SeqId1 >= LimitFun(SeqId2); null_gamma_delta_msg(_, _, _) -> false. @@ -1376,9 +1376,9 @@ msg_from_pending_ack(SeqId, MsgPropsFun, State) -> needs_confirming = false } }, State1}. beta_limit(Q) -> - case ?QUEUE:out(Q) of - {{value, #msg_status { seq_id = SeqId }}, _Q} -> SeqId; - {empty, _Q} -> undefined + case ?QUEUE:peek(Q) of + {value, #msg_status { seq_id = SeqId }} -> SeqId; + empty -> undefined end. delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; |
