summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-02 15:43:04 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-02 15:43:04 +0000
commitbf1029db49ed642f270c78f0420e6aec691f1d95 (patch)
tree5ea79f1dcb3c53dd5569006a8075980875129acd /src
parent4eec29a3d5d1f1d0da31c546b060cd59d327fcc4 (diff)
downloadrabbitmq-server-git-bf1029db49ed642f270c78f0420e6aec691f1d95.tar.gz
Still haven't discovered any real bugs in the refactoring that happened to qi on Friday, but quite a lot of cleaning, extra code comments and general polish to qi
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl82
1 files changed, 41 insertions, 41 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index c68a3d0460..4b48df82dc 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -292,21 +292,20 @@ find_lowest_seq_id_seg_and_next_seq_id(State = #qistate { dir = Dir }) ->
%% SegNums is sorted, ascending.
LowSeqIdSeg =
case SegNums of
- [] -> 0;
- _ -> reconstruct_seq_id(hd(SegNums), 0)
+ [] -> 0;
+ [MinSegNum|_] -> reconstruct_seq_id(MinSegNum, 0)
end,
{NextSeqId, State1} =
case SegNums of
[] -> {0, State};
- _ -> SegNum2 = lists:last(SegNums),
+ _ -> MaxSegNum = lists:last(SegNums),
{_SDict, PubCount, _AckCount, HighRelSeq, State2} =
- load_segment(SegNum2, State),
- NextSeqId1 = reconstruct_seq_id(SegNum2, HighRelSeq),
- NextSeqId2 =
- case PubCount of
- 0 -> NextSeqId1;
- _ -> NextSeqId1 + 1
- end,
+ load_segment(MaxSegNum, State),
+ NextSeqId1 = reconstruct_seq_id(MaxSegNum, HighRelSeq),
+ NextSeqId2 = case PubCount of
+ 0 -> NextSeqId1;
+ _ -> NextSeqId1 + 1
+ end,
{NextSeqId2, State2}
end,
{LowSeqIdSeg, NextSeqId, State1}.
@@ -459,8 +458,9 @@ seg_num_to_path(Dir, SegNum) ->
delete_queue_directory(Dir) ->
{ok, Entries} = file:list_dir(Dir),
- lists:foreach(fun file:delete/1,
- [ filename:join(Dir, Entry) || Entry <- Entries ]),
+ ok = lists:foldl(fun (Entry, ok) ->
+ file:delete(filename:join(Dir, Entry))
+ end, ok, Entries),
ok = file:del_dir(Dir).
add_seqid_to_dict(SeqId, Dict) ->
@@ -556,6 +556,10 @@ read_and_prune_segments(State = #qistate { dir = Dir }) ->
load_segment(SegNum, StateN),
StateL = #qistate { seg_ack_counts = AckCounts } =
drop_and_deliver(SegNum, SDict, CleanShutdown, StateM),
+ %% ignore the effect of drop_and_deliver on
+ %% TotalMsgCount and AckCounts, as drop_and_deliver
+ %% will add to the journal dicts, which will then
+ %% effect TotalMsgCount when we scatter the journal
TotalMsgCount2 = TotalMsgCount1 + dict:size(SDict),
AckCounts1 = case AckCount of
0 -> AckCounts;
@@ -589,12 +593,13 @@ scatter_journal(TotalMsgCount, State = #qistate { dir = Dir }) ->
{TotalMsgCount1, ADict1, State3} =
dict:fold(fun replay_journal_to_segment/3,
{TotalMsgCount, ADict,
- %% supply empty dict so that when
- %% replay_journal_acks_to_segment loads segments,
- %% it gets all msgs, and ignores anything we've
- %% found in the journal.
+ %% supply empty dicts so that when
+ %% replay_journal_to_segment loads segments, it
+ %% gets all msgs, and ignores anything we've found
+ %% in the journal.
State2 #qistate { journal_del_dict = dict:new(),
journal_ack_dict = dict:new() }}, DDict),
+ %% replay for segments which only had acks, and no deliveries
{TotalMsgCount2, State4} =
dict:fold(fun replay_journal_acks_to_segment/3,
{TotalMsgCount1, State3}, ADict1),
@@ -767,41 +772,36 @@ append_acks_to_segment(SegNum, Acks,
{ok, AckCount1} -> AckCount1;
error -> 0
end,
- case append_acks_to_segment(SegNum, AckCount, Acks, State) of
- {0, State1} -> State1;
- {?SEGMENT_ENTRIES_COUNT,
- State1 = #qistate { seg_ack_counts = AckCounts1 }} ->
- State1 #qistate { seg_ack_counts = dict:erase(SegNum, AckCounts1) };
- {AckCount2, State1 = #qistate { seg_ack_counts = AckCounts1 }} ->
- State1 #qistate { seg_ack_counts = dict:store(SegNum, AckCount2,
- AckCounts1) }
- end.
-
-append_acks_to_segment(SegNum, AckCount, Acks, State = #qistate { dir = Dir })
- when length(Acks) + AckCount == ?SEGMENT_ENTRIES_COUNT ->
+ AckCount2 = AckCount + length(Acks),
+ AckCounts1 = case AckCount2 of
+ 0 -> AckCounts;
+ ?SEGMENT_ENTRIES_COUNT -> dict:erase(SegNum, AckCounts);
+ _ -> dict:store(SegNum, AckCount2, AckCounts)
+ end,
+ append_acks_to_segment(SegNum, AckCount2, Acks,
+ State #qistate { seg_ack_counts = AckCounts1 }).
+
+append_acks_to_segment(SegNum, AckCount, _Acks, State = #qistate { dir = Dir })
+ when AckCount == ?SEGMENT_ENTRIES_COUNT ->
State1 = close_handle(SegNum, State),
ok = case file:delete(seg_num_to_path(Dir, SegNum)) of
ok -> ok;
{error, enoent} -> ok
end,
- {?SEGMENT_ENTRIES_COUNT, State1};
+ State1;
append_acks_to_segment(SegNum, AckCount, Acks, State)
- when length(Acks) + AckCount < ?SEGMENT_ENTRIES_COUNT ->
- {Count, Hdl, State1} = append_to_segment(SegNum, Acks, State),
+ when AckCount < ?SEGMENT_ENTRIES_COUNT ->
+ {Hdl, State1} = append_to_segment(SegNum, Acks, State),
ok = file_handle_cache:sync(Hdl),
- {AckCount + Count, State1}.
+ State1.
append_dels_to_segment(SegNum, Dels, State) ->
- {_Count, _Hdl, State1} = append_to_segment(SegNum, Dels, State),
+ {_Hdl, State1} = append_to_segment(SegNum, Dels, State),
State1.
append_to_segment(SegNum, AcksOrDels, State) ->
{Hdl, State1} = get_seg_handle(SegNum, State),
- {Count, List} =
- lists:foldl(fun (RelSeq, {Count1, Acc}) ->
- {Count1 + 1,
- [<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>> | Acc]}
- end, {0, []}, AcksOrDels),
- ok = file_handle_cache:append(Hdl, List),
- {Count, Hdl, State1}.
+ ok = file_handle_cache:append(
+ Hdl, [<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>> || RelSeq <- AcksOrDels ]),
+ {Hdl, State1}.