summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-12-04 22:12:30 +0000
committerMatthias Radestock <matthias@lshift.net>2009-12-04 22:12:30 +0000
commitff7f229037c78ae167403ddf3d42cf31acd26bfa (patch)
tree747c41506b5432309f6055dc0b43ebe88e6c6748
parent3be853a684363e0deacbd41fead1dc34ddafc292 (diff)
downloadrabbitmq-server-git-ff7f229037c78ae167403ddf3d42cf31acd26bfa.tar.gz
cosmetic and some refactoring
-rw-r--r--src/rabbit_queue_index.erl163
1 files changed, 77 insertions, 86 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 347742d645..eda0a43a78 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -179,31 +179,28 @@ init(Name) ->
array:sparse_foldl(
fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack},
{Segment3, DCountAcc2}) ->
- InMsgStore = rabbit_msg_store:contains(MsgId),
- case {InMsgStore, CleanShutdown} of
- {true, true} ->
- {Segment3, DCountAcc};
- {true, false} when Del == del ->
- {Segment3, DCountAcc};
- {true, false} ->
- {add_to_journal(RelSeq, del, Segment3),
- DCountAcc2 + 1};
- {false, _} when Del == del ->
- {add_to_journal(RelSeq, ack, Segment3),
- DCountAcc2 + 1};
- {false, _} ->
- {add_to_journal(
- RelSeq, ack,
- add_to_journal(
- RelSeq, del, Segment3)),
- DCountAcc2 + 2}
- end
+ {Segment4, DCountDelta} =
+ maybe_add_to_journal(
+ rabbit_msg_store:contains(MsgId),
+ CleanShutdown, Del, RelSeq, Segment3),
+ {Segment4, DCountAcc2 + DCountDelta}
end, {Segment1, DCountAcc}, SegEntries),
{segment_store(Segment2, Segments2),
CountAcc + PubCount - AckCount, DCountAcc1}
end, {Segments, 0, DCount}, AllSegs),
{Count, State2 #qistate { segments = Segments1, dirty_count = DCount1 }}.
+maybe_add_to_journal( true, true, _Del, _RelSeq, Segment) ->
+ {Segment, 0};
+maybe_add_to_journal( true, false, del, _RelSeq, Segment) ->
+ {Segment, 0};
+maybe_add_to_journal( true, false, _Del, RelSeq, Segment) ->
+ {add_to_journal(RelSeq, del, Segment), 1};
+maybe_add_to_journal(false, _, del, RelSeq, Segment) ->
+ {add_to_journal(RelSeq, ack, Segment), 1};
+maybe_add_to_journal(false, _, _Del, RelSeq, Segment) ->
+ {add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)), 2}.
+
terminate(State) ->
terminate(true, State).
@@ -212,34 +209,30 @@ terminate_and_erase(State) ->
ok = delete_queue_directory(State1 #qistate.dir),
State1.
-write_published(MsgId, SeqId, IsPersistent, State)
- when is_binary(MsgId) ->
+write_published(MsgId, SeqId, IsPersistent, State) when is_binary(MsgId) ->
?MSG_ID_BYTES = size(MsgId),
{JournalHdl, State1} = get_journal_handle(State),
- ok = file_handle_cache:append(JournalHdl,
- [<<(case IsPersistent of
- true -> ?PUB_PERSIST_JPREFIX;
- false -> ?PUB_TRANS_JPREFIX
- end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>,
- MsgId]),
+ ok = file_handle_cache:append(
+ JournalHdl, [<<(case IsPersistent of
+ true -> ?PUB_PERSIST_JPREFIX;
+ false -> ?PUB_TRANS_JPREFIX
+ end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, MsgId]),
maybe_flush_journal(add_to_journal(SeqId, {MsgId, IsPersistent}, State1)).
write_delivered(SeqId, State) ->
{JournalHdl, State1} = get_journal_handle(State),
- ok = file_handle_cache:append(JournalHdl,
- <<?DEL_JPREFIX:?JPREFIX_BITS,
- SeqId:?SEQ_BITS>>),
+ ok = file_handle_cache:append(
+ JournalHdl, <<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>),
maybe_flush_journal(add_to_journal(SeqId, del, State1)).
write_acks(SeqIds, State) ->
{JournalHdl, State1} = get_journal_handle(State),
- ok = file_handle_cache:append(JournalHdl,
- [<<?ACK_JPREFIX:?JPREFIX_BITS,
- SeqId:?SEQ_BITS>> || SeqId <- SeqIds]),
- State2 = lists:foldl(fun (SeqId, StateN) ->
- add_to_journal(SeqId, ack, StateN)
- end, State1, SeqIds),
- maybe_flush_journal(State2).
+ ok = file_handle_cache:append(
+ JournalHdl, [<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>> ||
+ SeqId <- SeqIds]),
+ maybe_flush_journal(lists:foldl(fun (SeqId, StateN) ->
+ add_to_journal(SeqId, ack, StateN)
+ end, State1, SeqIds)).
sync_seq_ids(_SeqIds, State = #qistate { journal_handle = undefined }) ->
State;
@@ -255,23 +248,11 @@ flush_journal(State = #qistate { segments = Segments }) ->
fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount,
acks = AckCount } = Segment, SegmentsN) ->
case PubCount > 0 andalso PubCount == AckCount of
- true ->
- ok = delete_segment(Segment),
- SegmentsN;
- false ->
- Segment1 =
- case array:sparse_size(JEntries) of
- 0 -> Segment;
- _ -> {Hdl, Segment2} =
- get_segment_handle(Segment),
- array:sparse_foldl(
- fun write_entry_to_segment/3, Hdl,
- JEntries),
- ok = file_handle_cache:sync(Hdl),
- Segment2 #segment { journal_entries =
- array_new() }
- end,
- segment_store(Segment1, SegmentsN)
+ true -> ok = delete_segment(Segment),
+ SegmentsN;
+ false -> segment_store(
+ append_journal_to_segment(Segment, JEntries),
+ SegmentsN)
end
end, segments_new(), Segments),
{JournalHdl, State1} =
@@ -279,6 +260,15 @@ flush_journal(State = #qistate { segments = Segments }) ->
ok = file_handle_cache:clear(JournalHdl),
State1 #qistate { dirty_count = 0 }.
+append_journal_to_segment(Segment, JEntries) ->
+ case array:sparse_size(JEntries) of
+ 0 -> Segment;
+ _ -> {Hdl, Segment1} = get_segment_handle(Segment),
+ array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries),
+ ok = file_handle_cache:sync(Hdl),
+ Segment1 #segment { journal_entries = array_new() }
+ end.
+
read_segment_entries(InitSeqId, State = #qistate { segments = Segments,
dir = Dir }) ->
{Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId),
@@ -544,8 +534,8 @@ bool_to_int(false) -> 0.
write_entry_to_segment(_RelSeq, {{_MsgId, _IsPersistent}, del, ack}, Hdl) ->
Hdl;
-write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) ->
- ok = case Publish of
+write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
+ ok = case Pub of
no_pub ->
ok;
{MsgId, IsPersistent} ->
@@ -555,14 +545,16 @@ write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) ->
RelSeq:?REL_SEQ_BITS>>, MsgId])
end,
ok = case {Del, Ack} of
- {no_del, no_ack} -> ok;
- _ -> Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>,
- Data = case {Del, Ack} of
- {del, ack} -> [Binary, Binary];
- _ -> Binary
- end,
- file_handle_cache:append(Hdl, Data)
+ {no_del, no_ack} ->
+ ok;
+ _ ->
+ Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>>,
+ file_handle_cache:append(
+ Hdl, case {Del, Ack} of
+ {del, ack} -> [Binary, Binary];
+ _ -> Binary
+ end)
end,
Hdl.
@@ -645,11 +637,11 @@ load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) ->
deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegEntries) ->
case array:get(RelSeq, SegEntries) of
- {PubRecord, no_del, no_ack} ->
- {AckCount, array:set(RelSeq, {PubRecord, del, no_ack}, SegEntries)};
- {PubRecord, del, no_ack} when KeepAcks ->
- {AckCount + 1, array:set(RelSeq, {PubRecord, del, ack}, SegEntries)};
- {_PubRecord, del, no_ack} ->
+ {Pub, no_del, no_ack} ->
+ {AckCount, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)};
+ {Pub, del, no_ack} when KeepAcks ->
+ {AckCount + 1, array:set(RelSeq, {Pub, del, ack}, SegEntries)};
+ {_Pub, del, no_ack} ->
{AckCount + 1, array:reset(RelSeq, SegEntries)}
end.
@@ -700,11 +692,10 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
%% rabbit_msg_file:read_next/2
<<MsgId:?MSG_ID_BYTES/binary>> =
<<MsgIdNum:?MSG_ID_BITS>>,
- Publish = {MsgId,
- case Prefix of
- ?PUB_PERSIST_JPREFIX -> true;
- ?PUB_TRANS_JPREFIX -> false
- end},
+ Publish = {MsgId, case Prefix of
+ ?PUB_PERSIST_JPREFIX -> true;
+ ?PUB_TRANS_JPREFIX -> false
+ end},
load_journal_entries(
add_to_journal(SeqId, Publish, State));
_ErrOrEoF -> %% err, we've lost at least a publish
@@ -747,10 +738,10 @@ add_to_journal(RelSeq, Action, SegJArray) ->
del -> {no_pub, del, no_ack};
ack -> {no_pub, no_del, ack}
end, SegJArray);
- ({PubRecord, no_del, no_ack}) when Action == del ->
- array:set(RelSeq, {PubRecord, del, no_ack}, SegJArray);
- ({PubRecord, Del, no_ack}) when Action == ack ->
- array:set(RelSeq, {PubRecord, Del, ack}, SegJArray)
+ ({Pub, no_del, no_ack}) when Action == del ->
+ array:set(RelSeq, {Pub, del, no_ack}, SegJArray);
+ ({Pub, Del, no_ack}) when Action == ack ->
+ array:set(RelSeq, {Pub, Del, ack}, SegJArray)
end.
%% Combine what we have just read from a segment file with what we're
@@ -784,9 +775,9 @@ journal_plus_segment({{_MsgId, _IsPersistent}, del, ack},
array:reset(RelSeq, Out);
journal_plus_segment({no_pub, del, no_ack},
- {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack},
+ {Pub = {_MsgId, _IsPersistent}, no_del, no_ack},
RelSeq, Out) ->
- array:set(RelSeq, {PubRecord, del, no_ack}, Out);
+ array:set(RelSeq, {Pub, del, no_ack}, Out);
journal_plus_segment({no_pub, del, ack},
{{_MsgId, _IsPersistent}, no_del, no_ack},
@@ -854,8 +845,8 @@ journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack},
not_found,
RelSeq, Out, PubsRemoved, AcksRemoved) ->
{array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved};
-journal_minus_segment({PubRecord, del, no_ack},
- {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack},
+journal_minus_segment({Pub, del, no_ack},
+ {Pub = {_MsgId, _IsPersistent}, no_del, no_ack},
RelSeq, Out, PubsRemoved, AcksRemoved) ->
{array:set(RelSeq, {no_pub, del, no_ack}, Out),
PubsRemoved + 1, AcksRemoved};
@@ -880,13 +871,13 @@ journal_minus_segment({{_MsgId, _IsPersistent}, del, ack},
not_found,
_RelSeq, Out, PubsRemoved, AcksRemoved) ->
{Out, PubsRemoved, AcksRemoved};
-journal_minus_segment({PubRecord, del, ack},
- {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack},
+journal_minus_segment({Pub, del, ack},
+ {Pub = {_MsgId, _IsPersistent}, no_del, no_ack},
RelSeq, Out, PubsRemoved, AcksRemoved) ->
{array:set(RelSeq, {no_pub, del, ack}, Out),
PubsRemoved + 1, AcksRemoved};
-journal_minus_segment({PubRecord, del, ack},
- {PubRecord = {_MsgId, _IsPersistent}, del, no_ack},
+journal_minus_segment({Pub, del, ack},
+ {Pub = {_MsgId, _IsPersistent}, del, no_ack},
RelSeq, Out, PubsRemoved, AcksRemoved) ->
{array:set(RelSeq, {no_pub, no_del, ack}, Out),
PubsRemoved + 1, AcksRemoved}.