diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-17 12:29:11 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-17 12:29:11 +0100 |
| commit | 246260869c0703a6b1a98f052eeb252b9885cd27 (patch) | |
| tree | 9a8a1bdd1cfabe933ced3686420b1e409c25fc6e /src | |
| parent | 2bf4f4559bca46f10df81cc0a06503e4b80ae203 (diff) | |
| download | rabbitmq-server-git-246260869c0703a6b1a98f052eeb252b9885cd27.tar.gz | |
make queue_index:deliver take a list of SeqIds
...rather than just a single SeqId. This brings it in line with 'ack',
simplifies the code, and is useful in variable_queue.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 30 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 38 |
3 files changed, 47 insertions, 54 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index d062c4fd0a..5123ca21b7 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -191,7 +191,7 @@ -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). -spec(publish/4 :: (guid(), seq_id(), boolean(), qistate()) -> qistate()). --spec(deliver/2 :: (seq_id(), qistate()) -> qistate()). +-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush/1 :: (qistate()) -> qistate()). @@ -244,22 +244,11 @@ publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]), maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)). -deliver(SeqId, State) -> - {JournalHdl, State1} = get_journal_handle(State), - ok = file_handle_cache:append( - JournalHdl, <<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>), - maybe_flush_journal(add_to_journal(SeqId, del, State1)). +deliver(SeqIds, State) -> + deliver_or_ack(del, ?DEL_JPREFIX, SeqIds, State). -ack([], State) -> - State; ack(SeqIds, State) -> - {JournalHdl, State1} = get_journal_handle(State), - ok = file_handle_cache:append( - JournalHdl, [<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>> || - SeqId <- SeqIds]), - maybe_flush_journal(lists:foldl(fun (SeqId, StateN) -> - add_to_journal(SeqId, ack, StateN) - end, State1, SeqIds)). + deliver_or_ack(ack, ?ACK_JPREFIX, SeqIds, State). sync([], State) -> State; @@ -659,6 +648,17 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> _ErrOrEoF -> State end. +deliver_or_ack(_Kind, _JPrefix, [], State) -> + State; +deliver_or_ack(Kind, JPrefix, SeqIds, State) -> + {JournalHdl, State1} = get_journal_handle(State), + ok = file_handle_cache:append( + JournalHdl, + [<<JPrefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>> || SeqId <- SeqIds]), + maybe_flush_journal(lists:foldl(fun (SeqId, StateN) -> + add_to_journal(SeqId, Kind, StateN) + end, State1, SeqIds)). + %%---------------------------------------------------------------------------- %% segment manipulation %%---------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f3df66ca58..8e99780d0f 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1550,13 +1550,6 @@ queue_index_publish(SeqIds, Persistent, Qi) -> ok = rabbit_msg_store:client_terminate(MSCStateEnd), {A, B}. -queue_index_deliver(SeqIds, Qi) -> - lists:foldl(fun (SeqId, QiN) -> rabbit_queue_index:deliver(SeqId, QiN) end, - Qi, SeqIds). - -queue_index_flush(Qi) -> - rabbit_queue_index:flush(Qi). - verify_read_with_published(_Delivered, _Persistent, [], _) -> ok; verify_read_with_published(Delivered, Persistent, @@ -1605,12 +1598,12 @@ test_queue_index() -> LenB = length(SeqIdsB), {LenB, _Terms2, Qi12} = test_queue_init(), {0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12), - Qi14 = queue_index_deliver(SeqIdsB, Qi13), + Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13), {ReadC, SegmentSize, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14), ok = verify_read_with_published(true, true, ReadC, lists:reverse(SeqIdsGuidsB)), Qi16 = rabbit_queue_index:ack(SeqIdsB, Qi15), - Qi17 = queue_index_flush(Qi16), + Qi17 = rabbit_queue_index:flush(Qi16), %% Everything will have gone now because #pubs == #acks {0, 0, Qi18} = rabbit_queue_index:bounds(Qi17), _Qi19 = rabbit_queue_index:terminate([], Qi18), @@ -1628,9 +1621,9 @@ test_queue_index() -> SeqIdsC = lists:seq(0,trunc(SegmentSize/2)), {0, _Terms4, Qi22} = test_queue_init(), {Qi23, _SeqIdsGuidsC} = queue_index_publish(SeqIdsC, false, Qi22), - Qi24 = queue_index_deliver(SeqIdsC, Qi23), + Qi24 = rabbit_queue_index:deliver(SeqIdsC, Qi23), Qi25 = rabbit_queue_index:ack(SeqIdsC, Qi24), - Qi26 = queue_index_flush(Qi25), + Qi26 = rabbit_queue_index:flush(Qi25), {Qi27, _SeqIdsGuidsC1} = queue_index_publish([SegmentSize], false, Qi26), _Qi28 = rabbit_queue_index:delete_and_terminate(Qi27), ok = stop_msg_store(), @@ -1639,10 +1632,10 @@ test_queue_index() -> %% b) partial pub+del, then move to new segment, then ack all in old segment {0, _Terms5, Qi29} = test_queue_init(), {Qi30, _SeqIdsGuidsC2} = queue_index_publish(SeqIdsC, false, Qi29), - Qi31 = queue_index_deliver(SeqIdsC, Qi30), + Qi31 = rabbit_queue_index:deliver(SeqIdsC, Qi30), {Qi32, _SeqIdsGuidsC3} = queue_index_publish([SegmentSize], false, Qi31), Qi33 = rabbit_queue_index:ack(SeqIdsC, Qi32), - Qi34 = queue_index_flush(Qi33), + Qi34 = rabbit_queue_index:flush(Qi33), _Qi35 = rabbit_queue_index:delete_and_terminate(Qi34), ok = stop_msg_store(), ok = empty_test_queue(), @@ -1651,9 +1644,9 @@ test_queue_index() -> SeqIdsD = lists:seq(0,SegmentSize*4), {0, _Terms6, Qi36} = test_queue_init(), {Qi37, _SeqIdsGuidsD} = queue_index_publish(SeqIdsD, false, Qi36), - Qi38 = queue_index_deliver(SeqIdsD, Qi37), + Qi38 = rabbit_queue_index:deliver(SeqIdsD, Qi37), Qi39 = rabbit_queue_index:ack(SeqIdsD, Qi38), - Qi40 = queue_index_flush(Qi39), + Qi40 = rabbit_queue_index:flush(Qi39), _Qi41 = rabbit_queue_index:delete_and_terminate(Qi40), ok = stop_msg_store(), ok = empty_test_queue(), @@ -1663,11 +1656,11 @@ test_queue_index() -> %% possibilities in combining the segment with the journal. {0, _Terms7, Qi42} = test_queue_init(), {Qi43, [Seven,Five,Four|_]} = queue_index_publish([0,1,2,4,5,7], false, Qi42), - Qi44 = queue_index_deliver([0,1,4], Qi43), + Qi44 = rabbit_queue_index:deliver([0,1,4], Qi43), Qi45 = rabbit_queue_index:ack([0], Qi44), - Qi46 = queue_index_flush(Qi45), + Qi46 = rabbit_queue_index:flush(Qi45), {Qi47, [Eight,Six|_]} = queue_index_publish([3,6,8], false, Qi46), - Qi48 = queue_index_deliver([2,3,5,6], Qi47), + Qi48 = rabbit_queue_index:deliver([2,3,5,6], Qi47), Qi49 = rabbit_queue_index:ack([1,2,3], Qi48), {[], 4, Qi50} = rabbit_queue_index:read(0, 4, Qi49), {ReadD, 7, Qi51} = rabbit_queue_index:read(4, 7, Qi50), @@ -1682,14 +1675,14 @@ test_queue_index() -> %% exercise journal_minus_segment, not segment_plus_journal. {0, _Terms8, Qi54} = test_queue_init(), {Qi55, _SeqIdsGuidsE} = queue_index_publish([0,1,2,4,5,7], true, Qi54), - Qi56 = queue_index_deliver([0,1,4], Qi55), + Qi56 = rabbit_queue_index:deliver([0,1,4], Qi55), Qi57 = rabbit_queue_index:ack([0], Qi56), _Qi58 = rabbit_queue_index:terminate([], Qi57), ok = stop_msg_store(), ok = rabbit_variable_queue:start([test_queue()]), {5, _Terms9, Qi59} = test_queue_init(), {Qi60, _SeqIdsGuidsF} = queue_index_publish([3,6,8], true, Qi59), - Qi61 = queue_index_deliver([2,3,5,6], Qi60), + Qi61 = rabbit_queue_index:deliver([2,3,5,6], Qi60), Qi62 = rabbit_queue_index:ack([1,2,3], Qi61), _Qi63 = rabbit_queue_index:terminate([], Qi62), ok = stop_msg_store(), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 61437229ae..72add2af7c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -718,7 +718,7 @@ read_from_msg_store(MSCState, IsPersistent, Guid) -> maybe_write_delivered(false, _SeqId, IndexState) -> IndexState; maybe_write_delivered(true, SeqId, IndexState) -> - rabbit_queue_index:deliver(SeqId, IndexState). + rabbit_queue_index:deliver([SeqId], IndexState). accumulate_ack(SeqId, IsPersistent, Guid, {SeqIdsAcc, Dict}) -> {case IsPersistent of @@ -987,32 +987,32 @@ purge_betas_and_deltas(State = #vqstate { q3 = Q3, end. remove_queue_entries(Fold, Q, IndexState) -> - {GuidsByStore, SeqIds, IndexState1} = - Fold(fun remove_queue_entries1/2, {dict:new(), [], IndexState}, Q), + {GuidsByStore, Delivers, Acks} = + Fold(fun remove_queue_entries1/2, {dict:new(), [], []}, Q), ok = dict:fold(fun (MsgStore, Guids, ok) -> rabbit_msg_store:remove(MsgStore, Guids) end, ok, GuidsByStore), - rabbit_queue_index:ack(SeqIds, IndexState1). + rabbit_queue_index:ack(Acks, + rabbit_queue_index:deliver(Delivers, IndexState)). remove_queue_entries1( #msg_status { guid = Guid, seq_id = SeqId, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, - {GuidsByStore, SeqIdsAcc, IndexState}) -> - GuidsByStore1 = case MsgOnDisk of - true -> rabbit_misc:dict_cons( - find_msg_store(IsPersistent), - Guid, GuidsByStore); - false -> GuidsByStore - end, - SeqIdsAcc1 = case IndexOnDisk of - true -> [SeqId | SeqIdsAcc]; - false -> SeqIdsAcc - end, - IndexState1 = maybe_write_delivered( - IndexOnDisk andalso not IsDelivered, - SeqId, IndexState), - {GuidsByStore1, SeqIdsAcc1, IndexState1}. + {GuidsByStore, Delivers, Acks}) -> + {case MsgOnDisk of + true -> rabbit_misc:dict_cons(find_msg_store(IsPersistent), Guid, + GuidsByStore); + false -> GuidsByStore + end, + case IndexOnDisk andalso not IsDelivered of + true -> [SeqId | Delivers]; + false -> Delivers + end, + case IndexOnDisk of + true -> [SeqId | Acks]; + false -> Acks + end}. fetch_from_q3_to_q4(State = #vqstate { q1 = Q1, |
