diff options
| -rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 5 | ||||
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 5 |
3 files changed, 15 insertions, 3 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 3e78d571bf..6067ac6262 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -34,6 +34,8 @@ ('empty'|{rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})). +-type(peek_result() :: ('empty'|{rabbit_types:basic_message(), + rabbit_types:msg_properties()})). -type(is_durable() :: boolean()). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). @@ -55,7 +57,8 @@ rabbit_types:msg_properties(), state()) -> {ack(), state()}). -spec(dropwhile/2 :: (fun ((rabbit_types:basic_message(), rabbit_types:msg_properties()) - -> boolean()), state()) -> state()). + -> boolean()), state()) -> state()). +-spec(peek/1 :: (state()) -> {peek_result(), state()}). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). -spec(ack/2 :: ([ack()], state()) -> state()). -spec(tx_publish/4 :: diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 3b449efa55..59c678c92a 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -32,7 +32,7 @@ -module(rabbit_invariable_queue). -export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, - publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, + publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, peek/1, dropwhile/2, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -118,6 +118,12 @@ publish_delivered(true, Msg = #basic_message { guid = Guid }, ok = persist_delivery(QName, IsDurable, false, Msg), {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}. +peek(State = #iv_state { len = 0 }) -> + {empty, State}; +peek(State = #iv_state { queue = Q}) -> + {value, {Msg, MsgProps, _IsDelivered}} = queue:peek(Q), + {{Msg, MsgProps}, State}. + dropwhile(_Pred, State = #iv_state { len = 0 }) -> State; dropwhile(Pred, State = #iv_state { queue = Q }) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index bc99af5544..37b0916b72 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1899,7 +1899,10 @@ test_peek(VQ0) -> %% should be able to fetch still {{_Msg, _, _, _}, VQ3} = rabbit_variable_queue:fetch(false, VQ2), - VQ3. + %% should be empty now + {empty, VQ4} = rabbit_variable_queue:peek(VQ3), + + VQ4. test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), |
