summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-11-26 21:06:14 +0000
committerMatthias Radestock <matthias@lshift.net>2009-11-26 21:06:14 +0000
commite7fa0917c55237a97d5d98dbb2e7d47fbc3d310e (patch)
tree4cc8c305246d288a05219ae281e22bc8f24bc306
parentcf893fe294da6f173aea8f8e9f5c0f46446b2ad0 (diff)
downloadrabbitmq-server-git-e7fa0917c55237a97d5d98dbb2e7d47fbc3d310e.tar.gz
cosmetic
-rw-r--r--src/rabbit_queue_index.erl74
1 files changed, 41 insertions, 33 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index abf8f57e7e..2f6b6c184c 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -208,9 +208,9 @@ write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict }) ->
sync_seq_ids(SeqIds, SyncAckJournal, State) ->
State1 = case SyncAckJournal of
- true -> {Hdl, State2} = get_journal_handle(State),
- ok = file_handle_cache:sync(Hdl),
- State2;
+ true -> {Hdl, State2} = get_journal_handle(State),
+ ok = file_handle_cache:sync(Hdl),
+ State2;
false -> State
end,
SegNumsSet =
@@ -386,7 +386,8 @@ rev_sort(List) ->
get_journal_handle(State = #qistate { dir = Dir, seg_num_handles = SegHdls }) ->
case dict:find(journal, SegHdls) of
- {ok, Hdl} -> {Hdl, State};
+ {ok, Hdl} ->
+ {Hdl, State};
error ->
Path = filename:join(Dir, ?ACK_JOURNAL_FILENAME),
Mode = [raw, binary, delayed_write, write, read, read_ahead],
@@ -435,7 +436,8 @@ get_counted_handle(SegNumA, State = #qistate { partial_segments = Partials,
get_seg_handle(SegNum, State = #qistate { dir = Dir, seg_num_handles = SegHdls }) ->
case dict:find(SegNum, SegHdls) of
- {ok, Hdl} -> {Hdl, State};
+ {ok, Hdl} ->
+ {Hdl, State};
error ->
new_handle(SegNum, seg_num_to_path(Dir, SegNum),
[binary, raw, read, write,
@@ -449,7 +451,7 @@ delete_segment(SegNum, State = #qistate { dir = Dir,
partial_segments = Partials }) ->
State1 = close_handle(SegNum, State),
ok = case file:delete(seg_num_to_path(Dir, SegNum)) of
- ok -> ok;
+ ok -> ok;
{error, enoent} -> ok
end,
State1 #qistate {seg_ack_counts = dict:erase(SegNum, AckCounts),
@@ -464,7 +466,8 @@ close_handle(Key, State = #qistate { seg_num_handles = SegHdls }) ->
{ok, Hdl} ->
ok = file_handle_cache:close(Hdl),
State #qistate { seg_num_handles = dict:erase(Key, SegHdls) };
- error -> State
+ error ->
+ State
end.
close_all_handles(State = #qistate { seg_num_handles = SegHdls }) ->
@@ -484,7 +487,7 @@ reconstruct_seq_id(SegNum, RelSeq) ->
seg_num_to_path(Dir, SegNum) ->
SegName = integer_to_list(SegNum),
- filename:join(Dir, SegName ++ ?SEGMENT_EXTENSION).
+ filename:join(Dir, SegName ++ ?SEGMENT_EXTENSION).
delete_queue_directory(Dir) ->
{ok, Entries} = file:list_dir(Dir),
@@ -510,19 +513,19 @@ blank_state(QueueName) ->
StrName = queue_name_to_dir_name(QueueName),
Dir = filename:join(queues_dir(), StrName),
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
- #qistate { dir = Dir,
- seg_num_handles = dict:new(),
- journal_count = 0,
+ #qistate { dir = Dir,
+ seg_num_handles = dict:new(),
+ journal_count = 0,
journal_ack_dict = dict:new(),
journal_del_dict = dict:new(),
- seg_ack_counts = dict:new(),
- publish_handle = undefined,
+ seg_ack_counts = dict:new(),
+ publish_handle = undefined,
partial_segments = dict:new()
}.
detect_clean_shutdown(Dir) ->
case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
- ok -> true;
+ ok -> true;
{error, enoent} -> false
end.
@@ -534,10 +537,10 @@ store_clean_shutdown(Dir) ->
seg_entries_from_dict(SegNum, Dict) ->
case dict:find(SegNum, Dict) of
- {ok, Entries} -> Entries;
- error -> []
+ {ok, Entries} -> Entries;
+ error -> []
end.
-
+
%%----------------------------------------------------------------------------
%% Msg Store Startup Delta Function
@@ -567,7 +570,7 @@ queue_index_walker({[], State, SegNums, QueueNames}) ->
queue_index_walker({[{_RelSeq, {MsgId, _IsDelivered, IsPersistent}} | Msgs],
State, SegNums, QueueNames}) ->
case IsPersistent of
- true -> {MsgId, 1, {Msgs, State, SegNums, QueueNames}};
+ true -> {MsgId, 1, {Msgs, State, SegNums, QueueNames}};
false -> queue_index_walker({Msgs, State, SegNums, QueueNames})
end.
@@ -656,7 +659,8 @@ load_journal(Hdl, ADict, DDict) ->
load_journal(Hdl, ADict, add_seqid_to_dict(SeqId, DDict));
{ok, <<?ACK_BIT:1, SeqId:?SEQ_BITS>>} ->
load_journal(Hdl, add_seqid_to_dict(SeqId, ADict), DDict);
- _ErrOrEoF -> {ADict, DDict}
+ _ErrOrEoF ->
+ {ADict, DDict}
end.
replay_journal_to_segment(_SegNum, [], {TotalMsgCount, ADict, State}) ->
@@ -669,18 +673,18 @@ replay_journal_to_segment(SegNum, Dels, {TotalMsgCount, ADict, State}) ->
fun (RelSeq) ->
case dict:find(RelSeq, SDict) of
{ok, {_MsgId, false, _IsPersistent}} -> true;
- _ -> false
+ _ -> false
end
end, sets:from_list(Dels))),
State2 = append_dels_to_segment(SegNum, ValidDels, State1),
Acks = seg_entries_from_dict(SegNum, ADict),
case Acks of
[] -> {TotalMsgCount, ADict, State2};
- _ ->
- ADict1 = dict:erase(SegNum, ADict),
- {Count, State3} = filter_acks_and_append_to_segment(SegNum, SDict,
- Acks, State2),
- {TotalMsgCount - Count, ADict1, State3}
+ _ -> ADict1 = dict:erase(SegNum, ADict),
+ {Count, State3} =
+ filter_acks_and_append_to_segment(SegNum, SDict,
+ Acks, State2),
+ {TotalMsgCount - Count, ADict1, State3}
end.
replay_journal_acks_to_segment(_SegNum, [], {TotalMsgCount, State}) ->
@@ -740,10 +744,11 @@ load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls,
dir = Dir }) ->
SegmentExists = case dict:find(SegNum, SegHdls) of
{ok, _} -> true;
- error -> filelib:is_file(seg_num_to_path(Dir, SegNum))
+ error -> filelib:is_file(seg_num_to_path(Dir, SegNum))
end,
case SegmentExists of
- false -> {dict:new(), 0, 0, 0, State};
+ false ->
+ {dict:new(), 0, 0, 0, State};
true ->
{Hdl, State1 = #qistate { journal_del_dict = JDelDict,
journal_ack_dict = JAckDict }} =
@@ -763,12 +768,13 @@ load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls,
fun (RelSeq, SDict4) ->
case dict:find(RelSeq, SDict4) of
{ok, {MsgId, false, IsPersistent}} ->
- dict:store(RelSeq, {MsgId, true, IsPersistent},
+ dict:store(RelSeq,
+ {MsgId, true, IsPersistent},
SDict4);
- _ -> SDict4
+ _ ->
+ SDict4
end
end, SDict1, seg_entries_from_dict(SegNum, JDelDict)),
-
{SDict3, PubCount, AckCount1, HighRelSeq, State1}
end.
@@ -793,7 +799,8 @@ load_segment_entries(Hdl, SDict, PubCount, AckCount, HighRelSeq) ->
load_segment_entries(
Hdl, dict:store(RelSeq, {MsgId, false, 1 == IsPersistentNum},
SDict), PubCount + 1, AckCount, HighRelSeq1);
- _ErrOrEoF -> {SDict, PubCount, AckCount, HighRelSeq}
+ _ErrOrEoF ->
+ {SDict, PubCount, AckCount, HighRelSeq}
end.
deliver_or_ack_msg(SDict, AckCount, RelSeq) ->
@@ -818,7 +825,7 @@ append_acks_to_segment(SegNum, Acks,
end,
AckTarget = case dict:find(SegNum, Partials) of
{ok, PubCount} -> PubCount;
- error -> ?SEGMENT_ENTRIES_COUNT
+ error -> ?SEGMENT_ENTRIES_COUNT
end,
AckCount2 = AckCount + length(Acks),
append_acks_to_segment(SegNum, AckCount2, Acks, AckTarget, State).
@@ -832,7 +839,8 @@ append_acks_to_segment(SegNum, AckCount, _Acks, AckCount, State =
{SegNum, Hdl, AckCount = ?SEGMENT_ENTRIES_COUNT}
when Hdl /= undefined ->
{SegNum + 1, undefined, 0};
- _ -> PubHdl
+ _ ->
+ PubHdl
end,
delete_segment(SegNum, State #qistate { publish_handle = PubHdl1 });
append_acks_to_segment(_SegNum, _AckCount, [], _AckTarget, State) ->