summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-06-17 12:29:11 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-06-17 12:29:11 +0100
commit246260869c0703a6b1a98f052eeb252b9885cd27 (patch)
tree9a8a1bdd1cfabe933ced3686420b1e409c25fc6e /src
parent2bf4f4559bca46f10df81cc0a06503e4b80ae203 (diff)
downloadrabbitmq-server-git-246260869c0703a6b1a98f052eeb252b9885cd27.tar.gz
make queue_index:deliver take a list of SeqIds
...rather than just a single SeqId. This brings it in line with 'ack', simplifies the code, and is useful in variable_queue.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl30
-rw-r--r--src/rabbit_tests.erl33
-rw-r--r--src/rabbit_variable_queue.erl38
3 files changed, 47 insertions, 54 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index d062c4fd0a..5123ca21b7 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -191,7 +191,7 @@
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
-spec(publish/4 :: (guid(), seq_id(), boolean(), qistate()) -> qistate()).
--spec(deliver/2 :: (seq_id(), qistate()) -> qistate()).
+-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(sync/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush/1 :: (qistate()) -> qistate()).
@@ -244,22 +244,11 @@ publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) ->
end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]),
maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)).
-deliver(SeqId, State) ->
- {JournalHdl, State1} = get_journal_handle(State),
- ok = file_handle_cache:append(
- JournalHdl, <<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>),
- maybe_flush_journal(add_to_journal(SeqId, del, State1)).
+deliver(SeqIds, State) ->
+ deliver_or_ack(del, ?DEL_JPREFIX, SeqIds, State).
-ack([], State) ->
- State;
ack(SeqIds, State) ->
- {JournalHdl, State1} = get_journal_handle(State),
- ok = file_handle_cache:append(
- JournalHdl, [<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>> ||
- SeqId <- SeqIds]),
- maybe_flush_journal(lists:foldl(fun (SeqId, StateN) ->
- add_to_journal(SeqId, ack, StateN)
- end, State1, SeqIds)).
+ deliver_or_ack(ack, ?ACK_JPREFIX, SeqIds, State).
sync([], State) ->
State;
@@ -659,6 +648,17 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
_ErrOrEoF -> State
end.
+deliver_or_ack(_Kind, _JPrefix, [], State) ->
+ State;
+deliver_or_ack(Kind, JPrefix, SeqIds, State) ->
+ {JournalHdl, State1} = get_journal_handle(State),
+ ok = file_handle_cache:append(
+ JournalHdl,
+ [<<JPrefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>> || SeqId <- SeqIds]),
+ maybe_flush_journal(lists:foldl(fun (SeqId, StateN) ->
+ add_to_journal(SeqId, Kind, StateN)
+ end, State1, SeqIds)).
+
%%----------------------------------------------------------------------------
%% segment manipulation
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index f3df66ca58..8e99780d0f 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1550,13 +1550,6 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
ok = rabbit_msg_store:client_terminate(MSCStateEnd),
{A, B}.
-queue_index_deliver(SeqIds, Qi) ->
- lists:foldl(fun (SeqId, QiN) -> rabbit_queue_index:deliver(SeqId, QiN) end,
- Qi, SeqIds).
-
-queue_index_flush(Qi) ->
- rabbit_queue_index:flush(Qi).
-
verify_read_with_published(_Delivered, _Persistent, [], _) ->
ok;
verify_read_with_published(Delivered, Persistent,
@@ -1605,12 +1598,12 @@ test_queue_index() ->
LenB = length(SeqIdsB),
{LenB, _Terms2, Qi12} = test_queue_init(),
{0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12),
- Qi14 = queue_index_deliver(SeqIdsB, Qi13),
+ Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13),
{ReadC, SegmentSize, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14),
ok = verify_read_with_published(true, true, ReadC,
lists:reverse(SeqIdsGuidsB)),
Qi16 = rabbit_queue_index:ack(SeqIdsB, Qi15),
- Qi17 = queue_index_flush(Qi16),
+ Qi17 = rabbit_queue_index:flush(Qi16),
%% Everything will have gone now because #pubs == #acks
{0, 0, Qi18} = rabbit_queue_index:bounds(Qi17),
_Qi19 = rabbit_queue_index:terminate([], Qi18),
@@ -1628,9 +1621,9 @@ test_queue_index() ->
SeqIdsC = lists:seq(0,trunc(SegmentSize/2)),
{0, _Terms4, Qi22} = test_queue_init(),
{Qi23, _SeqIdsGuidsC} = queue_index_publish(SeqIdsC, false, Qi22),
- Qi24 = queue_index_deliver(SeqIdsC, Qi23),
+ Qi24 = rabbit_queue_index:deliver(SeqIdsC, Qi23),
Qi25 = rabbit_queue_index:ack(SeqIdsC, Qi24),
- Qi26 = queue_index_flush(Qi25),
+ Qi26 = rabbit_queue_index:flush(Qi25),
{Qi27, _SeqIdsGuidsC1} = queue_index_publish([SegmentSize], false, Qi26),
_Qi28 = rabbit_queue_index:delete_and_terminate(Qi27),
ok = stop_msg_store(),
@@ -1639,10 +1632,10 @@ test_queue_index() ->
%% b) partial pub+del, then move to new segment, then ack all in old segment
{0, _Terms5, Qi29} = test_queue_init(),
{Qi30, _SeqIdsGuidsC2} = queue_index_publish(SeqIdsC, false, Qi29),
- Qi31 = queue_index_deliver(SeqIdsC, Qi30),
+ Qi31 = rabbit_queue_index:deliver(SeqIdsC, Qi30),
{Qi32, _SeqIdsGuidsC3} = queue_index_publish([SegmentSize], false, Qi31),
Qi33 = rabbit_queue_index:ack(SeqIdsC, Qi32),
- Qi34 = queue_index_flush(Qi33),
+ Qi34 = rabbit_queue_index:flush(Qi33),
_Qi35 = rabbit_queue_index:delete_and_terminate(Qi34),
ok = stop_msg_store(),
ok = empty_test_queue(),
@@ -1651,9 +1644,9 @@ test_queue_index() ->
SeqIdsD = lists:seq(0,SegmentSize*4),
{0, _Terms6, Qi36} = test_queue_init(),
{Qi37, _SeqIdsGuidsD} = queue_index_publish(SeqIdsD, false, Qi36),
- Qi38 = queue_index_deliver(SeqIdsD, Qi37),
+ Qi38 = rabbit_queue_index:deliver(SeqIdsD, Qi37),
Qi39 = rabbit_queue_index:ack(SeqIdsD, Qi38),
- Qi40 = queue_index_flush(Qi39),
+ Qi40 = rabbit_queue_index:flush(Qi39),
_Qi41 = rabbit_queue_index:delete_and_terminate(Qi40),
ok = stop_msg_store(),
ok = empty_test_queue(),
@@ -1663,11 +1656,11 @@ test_queue_index() ->
%% possibilities in combining the segment with the journal.
{0, _Terms7, Qi42} = test_queue_init(),
{Qi43, [Seven,Five,Four|_]} = queue_index_publish([0,1,2,4,5,7], false, Qi42),
- Qi44 = queue_index_deliver([0,1,4], Qi43),
+ Qi44 = rabbit_queue_index:deliver([0,1,4], Qi43),
Qi45 = rabbit_queue_index:ack([0], Qi44),
- Qi46 = queue_index_flush(Qi45),
+ Qi46 = rabbit_queue_index:flush(Qi45),
{Qi47, [Eight,Six|_]} = queue_index_publish([3,6,8], false, Qi46),
- Qi48 = queue_index_deliver([2,3,5,6], Qi47),
+ Qi48 = rabbit_queue_index:deliver([2,3,5,6], Qi47),
Qi49 = rabbit_queue_index:ack([1,2,3], Qi48),
{[], 4, Qi50} = rabbit_queue_index:read(0, 4, Qi49),
{ReadD, 7, Qi51} = rabbit_queue_index:read(4, 7, Qi50),
@@ -1682,14 +1675,14 @@ test_queue_index() ->
%% exercise journal_minus_segment, not segment_plus_journal.
{0, _Terms8, Qi54} = test_queue_init(),
{Qi55, _SeqIdsGuidsE} = queue_index_publish([0,1,2,4,5,7], true, Qi54),
- Qi56 = queue_index_deliver([0,1,4], Qi55),
+ Qi56 = rabbit_queue_index:deliver([0,1,4], Qi55),
Qi57 = rabbit_queue_index:ack([0], Qi56),
_Qi58 = rabbit_queue_index:terminate([], Qi57),
ok = stop_msg_store(),
ok = rabbit_variable_queue:start([test_queue()]),
{5, _Terms9, Qi59} = test_queue_init(),
{Qi60, _SeqIdsGuidsF} = queue_index_publish([3,6,8], true, Qi59),
- Qi61 = queue_index_deliver([2,3,5,6], Qi60),
+ Qi61 = rabbit_queue_index:deliver([2,3,5,6], Qi60),
Qi62 = rabbit_queue_index:ack([1,2,3], Qi61),
_Qi63 = rabbit_queue_index:terminate([], Qi62),
ok = stop_msg_store(),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 61437229ae..72add2af7c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -718,7 +718,7 @@ read_from_msg_store(MSCState, IsPersistent, Guid) ->
maybe_write_delivered(false, _SeqId, IndexState) ->
IndexState;
maybe_write_delivered(true, SeqId, IndexState) ->
- rabbit_queue_index:deliver(SeqId, IndexState).
+ rabbit_queue_index:deliver([SeqId], IndexState).
accumulate_ack(SeqId, IsPersistent, Guid, {SeqIdsAcc, Dict}) ->
{case IsPersistent of
@@ -987,32 +987,32 @@ purge_betas_and_deltas(State = #vqstate { q3 = Q3,
end.
remove_queue_entries(Fold, Q, IndexState) ->
- {GuidsByStore, SeqIds, IndexState1} =
- Fold(fun remove_queue_entries1/2, {dict:new(), [], IndexState}, Q),
+ {GuidsByStore, Delivers, Acks} =
+ Fold(fun remove_queue_entries1/2, {dict:new(), [], []}, Q),
ok = dict:fold(fun (MsgStore, Guids, ok) ->
rabbit_msg_store:remove(MsgStore, Guids)
end, ok, GuidsByStore),
- rabbit_queue_index:ack(SeqIds, IndexState1).
+ rabbit_queue_index:ack(Acks,
+ rabbit_queue_index:deliver(Delivers, IndexState)).
remove_queue_entries1(
#msg_status { guid = Guid, seq_id = SeqId,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
- {GuidsByStore, SeqIdsAcc, IndexState}) ->
- GuidsByStore1 = case MsgOnDisk of
- true -> rabbit_misc:dict_cons(
- find_msg_store(IsPersistent),
- Guid, GuidsByStore);
- false -> GuidsByStore
- end,
- SeqIdsAcc1 = case IndexOnDisk of
- true -> [SeqId | SeqIdsAcc];
- false -> SeqIdsAcc
- end,
- IndexState1 = maybe_write_delivered(
- IndexOnDisk andalso not IsDelivered,
- SeqId, IndexState),
- {GuidsByStore1, SeqIdsAcc1, IndexState1}.
+ {GuidsByStore, Delivers, Acks}) ->
+ {case MsgOnDisk of
+ true -> rabbit_misc:dict_cons(find_msg_store(IsPersistent), Guid,
+ GuidsByStore);
+ false -> GuidsByStore
+ end,
+ case IndexOnDisk andalso not IsDelivered of
+ true -> [SeqId | Delivers];
+ false -> Delivers
+ end,
+ case IndexOnDisk of
+ true -> [SeqId | Acks];
+ false -> Acks
+ end}.
fetch_from_q3_to_q4(State = #vqstate {
q1 = Q1,