summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit_backing_queue_spec.hrl15
-rw-r--r--src/rabbit_backing_queue.erl8
-rw-r--r--src/rabbit_tests.erl14
-rw-r--r--src/rabbit_variable_queue.erl4
4 files changed, 30 insertions, 11 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 295d90394f..cc7cca04e6 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -28,6 +28,13 @@
-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(sync_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok' | 'error')).
+-type(msg_lookup_result() :: {rabbit_types:basic_message(), {any(), state()}}).
+
+-type(msg_lookup_fun() :: fun((any(), state()) -> msg_lookup_result())).
+
+-type(msg_lookup_callback() ::
+ fun((msg_lookup_fun(), {any(), state()}) -> {any(), state()})).
+
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(),
@@ -45,12 +52,14 @@
rabbit_types:message_properties(), pid(), state())
-> {undefined, state()}).
-spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}).
--spec(dropwhile/2 ::
- (fun ((rabbit_types:message_properties()) -> boolean()), state())
+-spec(dropwhile/3 ::
+ (fun ((rabbit_types:message_properties()) -> boolean()),
+ msg_lookup_callback(), state())
-> state()).
-spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}).
--spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
+-spec(ack/3 :: ([ack()], msg_lookup_callback(), state()) ->
+ {[rabbit_guid:guid()], state()}).
-spec(tx_publish/5 :: (rabbit_types:txn(), rabbit_types:basic_message(),
rabbit_types:message_properties(), pid(), state()) ->
state()).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 3d7fb4895d..dd5f5b0438 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -97,14 +97,18 @@ behaviour_info(callbacks) ->
{drain_confirmed, 1},
%% Drop messages from the head of the queue while the supplied
- %% predicate returns true.
+ %% predicate returns true. A callback function is supplied
+ %% allowing callers access to messages that are about to be
+ %% dropped.
{dropwhile, 3},
%% Produce the next message.
{fetch, 2},
%% Acktags supplied are for messages which can now be forgotten
- %% about. Must return 1 msg_id per Ack, in the same order as Acks.
+ %% about. Must return 1 msg_id per Ack, in the same order as
+ %% Acks. A callback function is supplied allowing callers to
+ %% access messages that are being acked.
{ack, 3},
%% A publish, but in the context of a transaction.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 41053aeee9..a26c43efb8 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2176,7 +2176,7 @@ test_dropwhile(VQ0) ->
fun(#message_properties { expiry = Expiry }) ->
Expiry =< 5
end,
- fun(_Msg) -> ok end,
+ dummy_msg_lookup_fun(),
VQ1),
%% fetch five now
@@ -2191,6 +2191,9 @@ test_dropwhile(VQ0) ->
VQ4.
+dummy_msg_lookup_fun() ->
+ fun(_Fun, State) -> State end.
+
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
@@ -2214,7 +2217,8 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags,
+ dummy_msg_lookup_fun(), VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2224,7 +2228,8 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
+ {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag],
+ dummy_msg_lookup_fun(), VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -2258,7 +2263,8 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1,
+ dummy_msg_lookup_fun(), VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f75095346f..cefa06562e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -688,9 +688,9 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
ack(AckTags, Fun, State) ->
{MsgIds, State1} = ack(fun msg_store_remove/3,
- fun (MsgStatus = #msg_status {}, State0) ->
+ fun (MsgState, State0) ->
{_, State2} = Fun(read_msg_callback(),
- {MsgStatus, State0}),
+ {MsgState, State0}),
State2
end,
AckTags, State),