summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit_backing_queue_spec.hrl5
-rw-r--r--src/rabbit_invariable_queue.erl8
-rw-r--r--src/rabbit_tests.erl5
3 files changed, 15 insertions, 3 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 3e78d571bf..6067ac6262 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -34,6 +34,8 @@
('empty'|{rabbit_types:basic_message(),
boolean(), ack(),
non_neg_integer()})).
+-type(peek_result() :: ('empty'|{rabbit_types:basic_message(),
+ rabbit_types:msg_properties()})).
-type(is_durable() :: boolean()).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
@@ -55,7 +57,8 @@
rabbit_types:msg_properties(), state()) -> {ack(), state()}).
-spec(dropwhile/2 ::
(fun ((rabbit_types:basic_message(), rabbit_types:msg_properties())
- -> boolean()), state()) -> state()).
+ -> boolean()), state()) -> state()).
+-spec(peek/1 :: (state()) -> {peek_result(), state()}).
-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
-spec(ack/2 :: ([ack()], state()) -> state()).
-spec(tx_publish/4 ::
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 3b449efa55..59c678c92a 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -32,7 +32,7 @@
-module(rabbit_invariable_queue).
-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3,
- publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3,
+ publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, peek/1,
dropwhile/2, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1,
idle_timeout/1, handle_pre_hibernate/1, status/1]).
@@ -118,6 +118,12 @@ publish_delivered(true, Msg = #basic_message { guid = Guid },
ok = persist_delivery(QName, IsDurable, false, Msg),
{Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}.
+peek(State = #iv_state { len = 0 }) ->
+ {empty, State};
+peek(State = #iv_state { queue = Q}) ->
+ {value, {Msg, MsgProps, _IsDelivered}} = queue:peek(Q),
+ {{Msg, MsgProps}, State}.
+
dropwhile(_Pred, State = #iv_state { len = 0 }) ->
State;
dropwhile(Pred, State = #iv_state { queue = Q }) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index bc99af5544..37b0916b72 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1899,7 +1899,10 @@ test_peek(VQ0) ->
%% should be able to fetch still
{{_Msg, _, _, _}, VQ3} = rabbit_variable_queue:fetch(false, VQ2),
- VQ3.
+ %% should be empty now
+ {empty, VQ4} = rabbit_variable_queue:peek(VQ3),
+
+ VQ4.
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),