diff options
| author | Rob Harrop <rob@rabbitmq.com> | 2011-06-13 12:02:58 +0100 |
|---|---|---|
| committer | Rob Harrop <rob@rabbitmq.com> | 2011-06-13 12:02:58 +0100 |
| commit | 9aae97b137bb3e5e7f98f0b59c41061577f39fdd (patch) | |
| tree | 39f529e8ef42f5abfa71e87f0755514a931772d4 /src | |
| parent | 76552918fd0e8636eee8087204e1a169f3a9c66e (diff) | |
| download | rabbitmq-server-git-9aae97b137bb3e5e7f98f0b59c41061577f39fdd.tar.gz | |
Updated specs and fixed error in ack that didn't account for different ack storage formats
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_backing_queue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 4 |
3 files changed, 18 insertions, 8 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 3d7fb4895d..dd5f5b0438 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -97,14 +97,18 @@ behaviour_info(callbacks) -> {drain_confirmed, 1}, %% Drop messages from the head of the queue while the supplied - %% predicate returns true. + %% predicate returns true. A callback function is supplied + %% allowing callers access to messages that are about to be + %% dropped. {dropwhile, 3}, %% Produce the next message. {fetch, 2}, %% Acktags supplied are for messages which can now be forgotten - %% about. Must return 1 msg_id per Ack, in the same order as Acks. + %% about. Must return 1 msg_id per Ack, in the same order as + %% Acks. A callback function is supplied allowing callers to + %% access messages that are being acked. {ack, 3}, %% A publish, but in the context of a transaction. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 41053aeee9..a26c43efb8 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2176,7 +2176,7 @@ test_dropwhile(VQ0) -> fun(#message_properties { expiry = Expiry }) -> Expiry =< 5 end, - fun(_Msg) -> ok end, + dummy_msg_lookup_fun(), VQ1), %% fetch five now @@ -2191,6 +2191,9 @@ test_dropwhile(VQ0) -> VQ4. +dummy_msg_lookup_fun() -> + fun(_Fun, State) -> State end. + test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), @@ -2214,7 +2217,8 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, + dummy_msg_lookup_fun(), VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -2224,7 +2228,8 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), + {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], + dummy_msg_lookup_fun(), VQ2), publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> @@ -2258,7 +2263,8 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, + dummy_msg_lookup_fun(), VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f75095346f..cefa06562e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -688,9 +688,9 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { ack(AckTags, Fun, State) -> {MsgIds, State1} = ack(fun msg_store_remove/3, - fun (MsgStatus = #msg_status {}, State0) -> + fun (MsgState, State0) -> {_, State2} = Fun(read_msg_callback(), - {MsgStatus, State0}), + {MsgState, State0}), State2 end, AckTags, State), |
