diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-30 15:28:25 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-30 15:28:25 +0000 |
| commit | 6c2709a85d3a1a16ebc1da263ceef42b2e5a54cb (patch) | |
| tree | d7d46496b7a22ba19daab507123a5874d4c73b56 /src | |
| parent | 2c8ccb231bd7dcc635bc8e2b6c866a052c33c0fb (diff) | |
| download | rabbitmq-server-git-6c2709a85d3a1a16ebc1da263ceef42b2e5a54cb.tar.gz | |
Made the qi keep counters for the publishes and deliveries and flush to disk (minus sync) at the right points. All tests pass. However...
...it's actually wrong. Deliveries really don't have to happen in the right order. For example, if you deliver a load of messages, then publish a load, then requeue the initial lot, then you'll be publishing those msgs with new seqids, and marking them delivered. That could be in a different segment file, thus upsetting the counters.
So really deliveries need journalling too. In theory that looks straight forward because deliveries and acks look exactly the same in segment files, and so just using the very same code for acking and deliveries should be totally fine. But, it's not because of the issue of duplicates. During scattering of the journal out to the segments, there's a period in which there could be three entries (pub + del + ack) for a msg in the segment file, *and* an entry in the journal (ack). This is fine, and we make sure when scattering at startup that we don't reack msgs that have already been acked in the segment. However, we could now have two entries in the segment file (pub + del) and an entry in the journal, and not know whether that entry is the ack or a dup of the del. So we have a choice - either use 1 bit in the journal to indicate whether the entry is a del or an ack (bringing the space for seqids down to 2^63), or use two journals, one for acks and one for dels. Assuming Rabbit can do 50kHz, 2^63 will still last 5.8million years, or 2.9million years at 100kHz. I think this is still fine, so will take 1 bit from the seq id.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 133 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 7 |
3 files changed, 112 insertions, 47 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index e0634bee55..62c6af5384 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -114,7 +114,9 @@ seg_num_handles, journal_ack_count, journal_ack_dict, - seg_ack_counts + seg_ack_counts, + publish_handle, + deliver_handle }). -include("rabbit.hrl"). @@ -123,13 +125,18 @@ -ifdef(use_specs). +-type(hdl() :: ('undefined' | any())). -type(msg_id() :: binary()). -type(seq_id() :: integer()). +-type(hdl_and_count() :: ('undefined' | + {non_neg_integer(), hdl(), non_neg_integer()})). -type(qistate() :: #qistate { dir :: file_path(), seg_num_handles :: dict(), journal_ack_count :: integer(), journal_ack_dict :: dict(), - seg_ack_counts :: dict() + seg_ack_counts :: dict(), + publish_handle :: hdl_and_count(), + deliver_handle :: hdl_and_count() }). -spec(init/1 :: (queue_name()) -> {non_neg_integer(), qistate()}). @@ -179,7 +186,7 @@ write_published(MsgId, SeqId, IsPersistent, State) when is_binary(MsgId) -> ?MSG_ID_BYTES = size(MsgId), {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - {Hdl, State1} = get_seg_handle(SegNum, State), + {Hdl, State1} = get_pub_handle(SegNum, State), ok = file_handle_cache:append(Hdl, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, (bool_to_int(IsPersistent)):1, @@ -188,7 +195,7 @@ write_published(MsgId, SeqId, IsPersistent, State) write_delivered(SeqId, State) -> {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - {Hdl, State1} = get_seg_handle(SegNum, State), + {Hdl, State1} = get_del_handle(SegNum, State), ok = file_handle_cache:append( Hdl, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>), @@ -260,7 +267,8 @@ flush_journal(State = #qistate { journal_ack_dict = JAckDict, read_segment_entries(InitSeqId, State) -> {SegNum, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), - {SDict, _AckCount, _HighRelSeq, State1} = load_segment(SegNum, State), + {SDict, _PubCount, _DelCount, _AckCount, _HighRelSeq, State1} = + load_segment(SegNum, State), %% deliberately sort the list desc, because foldl will reverse it RelSeqs = rev_sort(dict:fetch_keys(SDict)), {lists:foldl(fun (RelSeq, Acc) -> @@ -284,23 +292,24 @@ find_lowest_seq_id_seg_and_next_seq_id(State = #qistate { dir = Dir }) -> %% of the lowest segment. That seq_id may not actually exist, but %% that's fine. The important thing is that the segment exists and %% the seq_id reported is on a segment boundary. + + %% SegNums is sorted, ascending. LowSeqIdSeg = case SegNums of [] -> 0; - _ -> reconstruct_seq_id(lists:min(SegNums), 0) + _ -> reconstruct_seq_id(hd(SegNums), 0) end, {NextSeqId, State1} = case SegNums of [] -> {0, State}; - _ -> SegNum2 = lists:max(SegNums), - {SDict, AckCount, HighRelSeq, State2} = + _ -> SegNum2 = lists:last(SegNums), + {_SDict, PubCount, _DelCount, _AckCount, HighRelSeq, State2} = load_segment(SegNum2, State), NextSeqId1 = reconstruct_seq_id(SegNum2, HighRelSeq), NextSeqId2 = - case 0 == AckCount andalso 0 == HighRelSeq andalso - 0 == dict:size(SDict) of - true -> NextSeqId1; - false -> NextSeqId1 + 1 + case PubCount of + 0 -> NextSeqId1; + _ -> NextSeqId1 + 1 end, {NextSeqId2, State2} end, @@ -374,6 +383,30 @@ get_journal_handle(State = #qistate { dir = Dir, seg_num_handles = SegHdls }) -> new_handle(journal, Path, Mode, State) end. +get_pub_handle(SegNum, State = #qistate { publish_handle = PubHandle }) -> + {State1, PubHandle1 = {_SegNum, Hdl, _Count}} = + get_counted_handle(SegNum, State, PubHandle), + {Hdl, State1 #qistate { publish_handle = PubHandle1 }}. + +get_del_handle(SegNum, State = #qistate { deliver_handle = DelHandle }) -> + {State1, DelHandle1 = {_SegNum, Hdl, _Count}} = + get_counted_handle(SegNum, State, DelHandle), + {Hdl, State1 #qistate { deliver_handle = DelHandle1 }}. + +get_counted_handle(SegNum, State, undefined) -> + {Hdl, State1} = get_seg_handle(SegNum, State), + {State1, {SegNum, Hdl, 1}}; +get_counted_handle(SegNum, State, {SegNum, undefined, Count}) -> + {Hdl, State1} = get_seg_handle(SegNum, State), + {State1, {SegNum, Hdl, Count + 1}}; +get_counted_handle(SegNum, State, {SegNum, Hdl, Count}) + when Count < ?SEGMENT_ENTRIES_COUNT -> + {State, {SegNum, Hdl, Count + 1}}; +get_counted_handle(SegNumA, State, {SegNumB, Hdl, ?SEGMENT_ENTRIES_COUNT}) + when SegNumA == SegNumB + 1 -> + ok = file_handle_cache:append_write_buffer(Hdl), + get_counted_handle(SegNumA, State, undefined). + get_seg_handle(SegNum, State = #qistate { dir = Dir, seg_num_handles = SegHdls }) -> case dict:find(SegNum, SegHdls) of {ok, Hdl} -> {Hdl, State}; @@ -427,9 +460,10 @@ add_ack_to_ack_dict(SeqId, ADict) -> dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], ADict). all_segment_nums(Dir) -> - [list_to_integer( - lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName)) - || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)]. + lists:sort( + [list_to_integer( + lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName)) + || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)]). blank_state(QueueName) -> StrName = queue_name_to_dir_name(QueueName), @@ -439,7 +473,10 @@ blank_state(QueueName) -> seg_num_handles = dict:new(), journal_ack_count = 0, journal_ack_dict = dict:new(), - seg_ack_counts = dict:new() }. + seg_ack_counts = dict:new(), + publish_handle = undefined, + deliver_handle = undefined + }. detect_clean_shutdown(Dir) -> case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of @@ -473,7 +510,8 @@ queue_index_walker({[], State, QueueNames}) -> _State = terminate(State), queue_index_walker(QueueNames); queue_index_walker({[SegNum | SegNums], State, QueueNames}) -> - {SDict, _AckCount, _HighRelSeq, State1} = load_segment(SegNum, State), + {SDict, _PubCount, _DelCount, _AckCount, _HighRelSeq, State1} = + load_segment(SegNum, State), queue_index_walker({dict:to_list(SDict), State1, SegNums, QueueNames}); queue_index_walker({[], State, SegNums, QueueNames}) -> @@ -495,8 +533,10 @@ read_and_prune_segments(State = #qistate { dir = Dir }) -> CleanShutdown = detect_clean_shutdown(Dir), {TotalMsgCount, State1} = lists:foldl( - fun (SegNum, {TotalMsgCount1, StateN}) -> - {SDict, AckCount, _HighRelSeq, StateM} = + fun (SegNum, {TotalMsgCount1, StateN = + #qistate { publish_handle = PublishHandle, + deliver_handle = DeliverHandle }}) -> + {SDict, PubCount, DelCount, AckCount, _HighRelSeq, StateM} = load_segment(SegNum, StateN), {TransientMsgsAcks, StateL = #qistate { seg_ack_counts = AckCounts, @@ -517,9 +557,27 @@ read_and_prune_segments(State = #qistate { dir = Dir }) -> [] -> JAckDict; _ -> dict:store(SegNum, TransientMsgsAcks, JAckDict) end, + %% In each of the following, there should only be + %% one segment that matches the 3rd case. All other + %% segments should either be full or empty. There + %% could be no partial segments. + PublishHandle1 = case PubCount of + ?SEGMENT_ENTRIES_COUNT -> PublishHandle; + 0 -> PublishHandle; + _ when PublishHandle == undefined -> + {SegNum, undefined, PubCount} + end, + DeliverHandle1 = case DelCount of + ?SEGMENT_ENTRIES_COUNT -> DeliverHandle; + 0 -> DeliverHandle; + _ when DeliverHandle == undefined -> + {SegNum, undefined, DelCount} + end, {TotalMsgCount2, StateL #qistate { seg_ack_counts = AckCounts1, - journal_ack_dict = JAckDict1 }} + journal_ack_dict = JAckDict1, + publish_handle = PublishHandle1, + deliver_handle = DeliverHandle1 }} end, {0, State}, SegNums), {TotalMsgCount, State1}. @@ -552,7 +610,8 @@ load_journal(Hdl, ADict) -> replay_journal_acks_to_segment(_, [], Acc) -> Acc; replay_journal_acks_to_segment(SegNum, Acks, {TotalMsgCount, State}) -> - {SDict, _AckCount, _HighRelSeq, State1} = load_segment(SegNum, State), + {SDict, _PubCount, _DelCount, _AckCount, _HighRelSeq, State1} = + load_segment(SegNum, State), ValidRelSeqIds = dict:fetch_keys(SDict), ValidAcks = sets:to_list(sets:intersection(sets:from_list(ValidRelSeqIds), sets:from_list(Acks))), @@ -605,13 +664,13 @@ load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls, error -> filelib:is_file(seg_num_to_path(Dir, SegNum)) end, case SegmentExists of - false -> {dict:new(), 0, 0, State}; + false -> {dict:new(), 0, 0, 0, 0, State}; true -> {Hdl, State1 = #qistate { journal_ack_dict = JAckDict }} = get_seg_handle(SegNum, State), ok = file_handle_cache:position(Hdl, bof), - {SDict, AckCount, HighRelSeq} = - load_segment_entries(Hdl, dict:new(), 0, 0), + {SDict, PubCount, DelCount, AckCount, HighRelSeq} = + load_segment_entries(Hdl, dict:new(), 0, 0, 0, 0), RelSeqs = case dict:find(SegNum, JAckDict) of {ok, RelSeqs1} -> RelSeqs1; error -> [] @@ -620,18 +679,20 @@ load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls, lists:foldl(fun (RelSeq, {SDict2, AckCount2}) -> {dict:erase(RelSeq, SDict2), AckCount2 + 1} end, {SDict, AckCount}, RelSeqs), - {SDict1, AckCount1, HighRelSeq, State1} + {SDict1, PubCount, DelCount, AckCount1, HighRelSeq, State1} end. -load_segment_entries(Hdl, SDict, AckCount, HighRelSeq) -> +load_segment_entries(Hdl, SDict, PubCount, DelCount, AckCount, HighRelSeq) -> case file_handle_cache:read(Hdl, 1) of {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>} -> {ok, LSB} = file_handle_cache:read( Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1), <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>, - {SDict1, AckCount1} = deliver_or_ack_msg(SDict, AckCount, RelSeq), - load_segment_entries(Hdl, SDict1, AckCount1, HighRelSeq); + {SDict1, DelCount1, AckCount1} = + deliver_or_ack_msg(SDict, DelCount, AckCount, RelSeq), + load_segment_entries( + Hdl, SDict1, PubCount, DelCount1, AckCount1, HighRelSeq); {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>} -> %% because we specify /binary, and binaries are complete @@ -641,18 +702,20 @@ load_segment_entries(Hdl, SDict, AckCount, HighRelSeq) -> Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1), <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>, HighRelSeq1 = lists:max([RelSeq, HighRelSeq]), - load_segment_entries(Hdl, dict:store(RelSeq, {MsgId, false, - 1 == IsPersistentNum}, - SDict), AckCount, HighRelSeq1); - _ErrOrEoF -> {SDict, AckCount, HighRelSeq} + load_segment_entries( + Hdl, dict:store(RelSeq, {MsgId, false, 1 == IsPersistentNum}, + SDict), + PubCount + 1, DelCount, AckCount, HighRelSeq1); + _ErrOrEoF -> {SDict, PubCount, DelCount, AckCount, HighRelSeq} end. -deliver_or_ack_msg(SDict, AckCount, RelSeq) -> +deliver_or_ack_msg(SDict, DelCount, AckCount, RelSeq) -> case dict:find(RelSeq, SDict) of {ok, {MsgId, false, IsPersistent}} -> - {dict:store(RelSeq, {MsgId, true, IsPersistent}, SDict), AckCount}; + {dict:store(RelSeq, {MsgId, true, IsPersistent}, SDict), + DelCount + 1, AckCount}; {ok, {_MsgId, true, _IsPersistent}} -> - {dict:erase(RelSeq, SDict), AckCount + 1} + {dict:erase(RelSeq, SDict), DelCount, AckCount + 1} end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3bf6dd36f1..e3f8ddacdf 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1031,13 +1031,13 @@ verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> test_queue_index() -> stop_msg_store(), ok = empty_test_queue(), - SeqIdsA = lists:seq(1,10000), - SeqIdsB = lists:seq(10001,20000), + SeqIdsA = lists:seq(0,9999), + SeqIdsB = lists:seq(10000,19999), {0, Qi0} = rabbit_queue_index:init(test_queue()), {0, 0, Qi1} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi0), {Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1), - {0, 10001, Qi3} = + {0, 10000, Qi3} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi2), {ReadA, Qi4} = rabbit_queue_index:read_segment_entries(0, Qi3), ok = verify_read_with_published(false, false, ReadA, @@ -1049,10 +1049,10 @@ test_queue_index() -> %% should get length back as 0, as all the msgs were transient {0, Qi6} = rabbit_queue_index:init(test_queue()), false = rabbit_queue_index:can_flush_journal(Qi6), - {0, 10001, Qi7} = + {0, 10000, Qi7} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6), {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7), - {0, 20001, Qi9} = + {0, 20000, Qi9} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi8), {ReadB, Qi10} = rabbit_queue_index:read_segment_entries(0, Qi9), ok = verify_read_with_published(false, true, ReadB, @@ -1063,7 +1063,7 @@ test_queue_index() -> %% should get length back as 10000 LenB = length(SeqIdsB), {LenB, Qi12} = rabbit_queue_index:init(test_queue()), - {0, 20001, Qi13} = + {0, 20000, Qi13} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi12), Qi14 = lists:foldl( fun (SeqId, QiN) -> @@ -1075,7 +1075,10 @@ test_queue_index() -> Qi16 = rabbit_queue_index:write_acks(SeqIdsB, Qi15), true = rabbit_queue_index:can_flush_journal(Qi16), Qi17 = rabbit_queue_index:flush_journal(Qi16), - {0, 20001, Qi18} = + %% the entire first segment will have gone as they were firstly + %% transient, and secondly ack'd + SegmentSize = rabbit_queue_index:segment_size(), + {SegmentSize, 20000, Qi18} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17), _Qi19 = rabbit_queue_index:terminate(Qi18), ok = stop_msg_store(), @@ -1086,7 +1089,7 @@ test_queue_index() -> ok = stop_msg_store(), ok = empty_test_queue(), %% this next bit is just to hit the auto deletion of segment files - SeqIdsC = lists:seq(1,65536), + SeqIdsC = lists:seq(0,65535), {0, Qi22} = rabbit_queue_index:init(test_queue()), {Qi23, _SeqIdsMsgIdsC} = queue_index_publish(SeqIdsC, false, Qi22), Qi24 = lists:foldl( diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index da56487e01..7851d8f6a6 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -475,10 +475,9 @@ remove_queue_entries(Q, IndexState) -> false -> MsgIdsAcc end, {CountN + 1, MsgIdsAcc1, SeqIdsAcc1, IndexStateN1} - %% the foldl is going to reverse the result lists, so start - %% by reversing so that we maintain doing things in - %% ascending seqid order - end, {0, [], [], IndexState}, lists:reverse(queue:to_list(Q))), + %% we need to write the delivered records in order otherwise + %% we upset the qi. So don't reverse. + end, {0, [], [], IndexState}, queue:to_list(Q)), ok = case MsgIds of [] -> ok; _ -> rabbit_msg_store:remove(MsgIds) |
