diff options
| author | Matthias Radestock <matthias@lshift.net> | 2009-11-26 21:06:14 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2009-11-26 21:06:14 +0000 |
| commit | e7fa0917c55237a97d5d98dbb2e7d47fbc3d310e (patch) | |
| tree | 4cc8c305246d288a05219ae281e22bc8f24bc306 | |
| parent | cf893fe294da6f173aea8f8e9f5c0f46446b2ad0 (diff) | |
| download | rabbitmq-server-git-e7fa0917c55237a97d5d98dbb2e7d47fbc3d310e.tar.gz | |
cosmetic
| -rw-r--r-- | src/rabbit_queue_index.erl | 74 |
1 files changed, 41 insertions, 33 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index abf8f57e7e..2f6b6c184c 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -208,9 +208,9 @@ write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict }) -> sync_seq_ids(SeqIds, SyncAckJournal, State) -> State1 = case SyncAckJournal of - true -> {Hdl, State2} = get_journal_handle(State), - ok = file_handle_cache:sync(Hdl), - State2; + true -> {Hdl, State2} = get_journal_handle(State), + ok = file_handle_cache:sync(Hdl), + State2; false -> State end, SegNumsSet = @@ -386,7 +386,8 @@ rev_sort(List) -> get_journal_handle(State = #qistate { dir = Dir, seg_num_handles = SegHdls }) -> case dict:find(journal, SegHdls) of - {ok, Hdl} -> {Hdl, State}; + {ok, Hdl} -> + {Hdl, State}; error -> Path = filename:join(Dir, ?ACK_JOURNAL_FILENAME), Mode = [raw, binary, delayed_write, write, read, read_ahead], @@ -435,7 +436,8 @@ get_counted_handle(SegNumA, State = #qistate { partial_segments = Partials, get_seg_handle(SegNum, State = #qistate { dir = Dir, seg_num_handles = SegHdls }) -> case dict:find(SegNum, SegHdls) of - {ok, Hdl} -> {Hdl, State}; + {ok, Hdl} -> + {Hdl, State}; error -> new_handle(SegNum, seg_num_to_path(Dir, SegNum), [binary, raw, read, write, @@ -449,7 +451,7 @@ delete_segment(SegNum, State = #qistate { dir = Dir, partial_segments = Partials }) -> State1 = close_handle(SegNum, State), ok = case file:delete(seg_num_to_path(Dir, SegNum)) of - ok -> ok; + ok -> ok; {error, enoent} -> ok end, State1 #qistate {seg_ack_counts = dict:erase(SegNum, AckCounts), @@ -464,7 +466,8 @@ close_handle(Key, State = #qistate { seg_num_handles = SegHdls }) -> {ok, Hdl} -> ok = file_handle_cache:close(Hdl), State #qistate { seg_num_handles = dict:erase(Key, SegHdls) }; - error -> State + error -> + State end. close_all_handles(State = #qistate { seg_num_handles = SegHdls }) -> @@ -484,7 +487,7 @@ reconstruct_seq_id(SegNum, RelSeq) -> seg_num_to_path(Dir, SegNum) -> SegName = integer_to_list(SegNum), - filename:join(Dir, SegName ++ ?SEGMENT_EXTENSION). + filename:join(Dir, SegName ++ ?SEGMENT_EXTENSION). delete_queue_directory(Dir) -> {ok, Entries} = file:list_dir(Dir), @@ -510,19 +513,19 @@ blank_state(QueueName) -> StrName = queue_name_to_dir_name(QueueName), Dir = filename:join(queues_dir(), StrName), ok = filelib:ensure_dir(filename:join(Dir, "nothing")), - #qistate { dir = Dir, - seg_num_handles = dict:new(), - journal_count = 0, + #qistate { dir = Dir, + seg_num_handles = dict:new(), + journal_count = 0, journal_ack_dict = dict:new(), journal_del_dict = dict:new(), - seg_ack_counts = dict:new(), - publish_handle = undefined, + seg_ack_counts = dict:new(), + publish_handle = undefined, partial_segments = dict:new() }. detect_clean_shutdown(Dir) -> case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of - ok -> true; + ok -> true; {error, enoent} -> false end. @@ -534,10 +537,10 @@ store_clean_shutdown(Dir) -> seg_entries_from_dict(SegNum, Dict) -> case dict:find(SegNum, Dict) of - {ok, Entries} -> Entries; - error -> [] + {ok, Entries} -> Entries; + error -> [] end. - + %%---------------------------------------------------------------------------- %% Msg Store Startup Delta Function @@ -567,7 +570,7 @@ queue_index_walker({[], State, SegNums, QueueNames}) -> queue_index_walker({[{_RelSeq, {MsgId, _IsDelivered, IsPersistent}} | Msgs], State, SegNums, QueueNames}) -> case IsPersistent of - true -> {MsgId, 1, {Msgs, State, SegNums, QueueNames}}; + true -> {MsgId, 1, {Msgs, State, SegNums, QueueNames}}; false -> queue_index_walker({Msgs, State, SegNums, QueueNames}) end. @@ -656,7 +659,8 @@ load_journal(Hdl, ADict, DDict) -> load_journal(Hdl, ADict, add_seqid_to_dict(SeqId, DDict)); {ok, <<?ACK_BIT:1, SeqId:?SEQ_BITS>>} -> load_journal(Hdl, add_seqid_to_dict(SeqId, ADict), DDict); - _ErrOrEoF -> {ADict, DDict} + _ErrOrEoF -> + {ADict, DDict} end. replay_journal_to_segment(_SegNum, [], {TotalMsgCount, ADict, State}) -> @@ -669,18 +673,18 @@ replay_journal_to_segment(SegNum, Dels, {TotalMsgCount, ADict, State}) -> fun (RelSeq) -> case dict:find(RelSeq, SDict) of {ok, {_MsgId, false, _IsPersistent}} -> true; - _ -> false + _ -> false end end, sets:from_list(Dels))), State2 = append_dels_to_segment(SegNum, ValidDels, State1), Acks = seg_entries_from_dict(SegNum, ADict), case Acks of [] -> {TotalMsgCount, ADict, State2}; - _ -> - ADict1 = dict:erase(SegNum, ADict), - {Count, State3} = filter_acks_and_append_to_segment(SegNum, SDict, - Acks, State2), - {TotalMsgCount - Count, ADict1, State3} + _ -> ADict1 = dict:erase(SegNum, ADict), + {Count, State3} = + filter_acks_and_append_to_segment(SegNum, SDict, + Acks, State2), + {TotalMsgCount - Count, ADict1, State3} end. replay_journal_acks_to_segment(_SegNum, [], {TotalMsgCount, State}) -> @@ -740,10 +744,11 @@ load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls, dir = Dir }) -> SegmentExists = case dict:find(SegNum, SegHdls) of {ok, _} -> true; - error -> filelib:is_file(seg_num_to_path(Dir, SegNum)) + error -> filelib:is_file(seg_num_to_path(Dir, SegNum)) end, case SegmentExists of - false -> {dict:new(), 0, 0, 0, State}; + false -> + {dict:new(), 0, 0, 0, State}; true -> {Hdl, State1 = #qistate { journal_del_dict = JDelDict, journal_ack_dict = JAckDict }} = @@ -763,12 +768,13 @@ load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls, fun (RelSeq, SDict4) -> case dict:find(RelSeq, SDict4) of {ok, {MsgId, false, IsPersistent}} -> - dict:store(RelSeq, {MsgId, true, IsPersistent}, + dict:store(RelSeq, + {MsgId, true, IsPersistent}, SDict4); - _ -> SDict4 + _ -> + SDict4 end end, SDict1, seg_entries_from_dict(SegNum, JDelDict)), - {SDict3, PubCount, AckCount1, HighRelSeq, State1} end. @@ -793,7 +799,8 @@ load_segment_entries(Hdl, SDict, PubCount, AckCount, HighRelSeq) -> load_segment_entries( Hdl, dict:store(RelSeq, {MsgId, false, 1 == IsPersistentNum}, SDict), PubCount + 1, AckCount, HighRelSeq1); - _ErrOrEoF -> {SDict, PubCount, AckCount, HighRelSeq} + _ErrOrEoF -> + {SDict, PubCount, AckCount, HighRelSeq} end. deliver_or_ack_msg(SDict, AckCount, RelSeq) -> @@ -818,7 +825,7 @@ append_acks_to_segment(SegNum, Acks, end, AckTarget = case dict:find(SegNum, Partials) of {ok, PubCount} -> PubCount; - error -> ?SEGMENT_ENTRIES_COUNT + error -> ?SEGMENT_ENTRIES_COUNT end, AckCount2 = AckCount + length(Acks), append_acks_to_segment(SegNum, AckCount2, Acks, AckTarget, State). @@ -832,7 +839,8 @@ append_acks_to_segment(SegNum, AckCount, _Acks, AckCount, State = {SegNum, Hdl, AckCount = ?SEGMENT_ENTRIES_COUNT} when Hdl /= undefined -> {SegNum + 1, undefined, 0}; - _ -> PubHdl + _ -> + PubHdl end, delete_segment(SegNum, State #qistate { publish_handle = PubHdl1 }); append_acks_to_segment(_SegNum, _AckCount, [], _AckTarget, State) -> |
