summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2011-06-13 12:02:58 +0100
committerRob Harrop <rob@rabbitmq.com>2011-06-13 12:02:58 +0100
commit9aae97b137bb3e5e7f98f0b59c41061577f39fdd (patch)
tree39f529e8ef42f5abfa71e87f0755514a931772d4 /src
parent76552918fd0e8636eee8087204e1a169f3a9c66e (diff)
downloadrabbitmq-server-git-9aae97b137bb3e5e7f98f0b59c41061577f39fdd.tar.gz
Updated specs and fixed error in ack that didn't account for different ack storage formats
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_backing_queue.erl8
-rw-r--r--src/rabbit_tests.erl14
-rw-r--r--src/rabbit_variable_queue.erl4
3 files changed, 18 insertions, 8 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 3d7fb4895d..dd5f5b0438 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -97,14 +97,18 @@ behaviour_info(callbacks) ->
{drain_confirmed, 1},
%% Drop messages from the head of the queue while the supplied
- %% predicate returns true.
+ %% predicate returns true. A callback function is supplied
+ %% allowing callers access to messages that are about to be
+ %% dropped.
{dropwhile, 3},
%% Produce the next message.
{fetch, 2},
%% Acktags supplied are for messages which can now be forgotten
- %% about. Must return 1 msg_id per Ack, in the same order as Acks.
+ %% about. Must return 1 msg_id per Ack, in the same order as
+ %% Acks. A callback function is supplied allowing callers to
+ %% access messages that are being acked.
{ack, 3},
%% A publish, but in the context of a transaction.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 41053aeee9..a26c43efb8 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2176,7 +2176,7 @@ test_dropwhile(VQ0) ->
fun(#message_properties { expiry = Expiry }) ->
Expiry =< 5
end,
- fun(_Msg) -> ok end,
+ dummy_msg_lookup_fun(),
VQ1),
%% fetch five now
@@ -2191,6 +2191,9 @@ test_dropwhile(VQ0) ->
VQ4.
+dummy_msg_lookup_fun() ->
+ fun(_Fun, State) -> State end.
+
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
@@ -2214,7 +2217,8 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags,
+ dummy_msg_lookup_fun(), VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2224,7 +2228,8 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
+ {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag],
+ dummy_msg_lookup_fun(), VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -2258,7 +2263,8 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1,
+ dummy_msg_lookup_fun(), VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f75095346f..cefa06562e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -688,9 +688,9 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
ack(AckTags, Fun, State) ->
{MsgIds, State1} = ack(fun msg_store_remove/3,
- fun (MsgStatus = #msg_status {}, State0) ->
+ fun (MsgState, State0) ->
{_, State2} = Fun(read_msg_callback(),
- {MsgStatus, State0}),
+ {MsgState, State0}),
State2
end,
AckTags, State),