summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-05-18 07:04:52 +0100
committerMatthias Radestock <matthias@lshift.net>2010-05-18 07:04:52 +0100
commit9fd7e7df4b77abf9c64589fe0622c92042ebf80d (patch)
tree3d1088eee75a7ae5d619fb1a9c61ea28f6b7dc45
parent6781a33feb32355fc43bb987c7093f509bc17aa6 (diff)
downloadrabbitmq-server-git-9fd7e7df4b77abf9c64589fe0622c92042ebf80d.tar.gz
rename some qi funs
-rw-r--r--src/rabbit_queue_index.erl89
-rw-r--r--src/rabbit_tests.erl30
-rw-r--r--src/rabbit_variable_queue.erl128
3 files changed, 117 insertions, 130 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 44df5976da..241766b823 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,8 +31,8 @@
-module(rabbit_queue_index).
--export([init/3, terminate/2, terminate_and_erase/1, write_published/4,
- write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1,
+-export([init/3, terminate/2, terminate_and_erase/1, publish/4,
+ deliver/2, ack/2, sync/2, flush/1,
read_segment_entries/2, next_segment_boundary/1, segment_size/0,
find_lowest_seq_id_seg_and_next_seq_id/1, recover/1]).
@@ -189,12 +189,11 @@
{'undefined' | non_neg_integer(), [any()], qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(terminate_and_erase/1 :: (qistate()) -> qistate()).
--spec(write_published/4 :: (guid(), seq_id(), boolean(), qistate())
- -> qistate()).
--spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()).
--spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()).
--spec(sync_seq_ids/2 :: ([seq_id()], qistate()) -> qistate()).
--spec(flush_journal/1 :: (qistate()) -> qistate()).
+-spec(publish/4 :: (guid(), seq_id(), boolean(), qistate()) -> qistate()).
+-spec(deliver/2 :: (seq_id(), qistate()) -> qistate()).
+-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
+-spec(sync/2 :: ([seq_id()], qistate()) -> qistate()).
+-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read_segment_entries/2 :: (seq_id(), qistate()) ->
{[{guid(), seq_id(), boolean(), boolean()}], qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
@@ -291,7 +290,7 @@ terminate_and_erase(State) ->
ok = rabbit_misc:recursive_delete([State1 #qistate.dir]),
State1.
-write_published(Guid, SeqId, IsPersistent, State) when is_binary(Guid) ->
+publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) ->
?GUID_BYTES = size(Guid),
{JournalHdl, State1} = get_journal_handle(State),
ok = file_handle_cache:append(
@@ -301,15 +300,15 @@ write_published(Guid, SeqId, IsPersistent, State) when is_binary(Guid) ->
end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]),
maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)).
-write_delivered(SeqId, State) ->
+deliver(SeqId, State) ->
{JournalHdl, State1} = get_journal_handle(State),
ok = file_handle_cache:append(
JournalHdl, <<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>),
maybe_flush_journal(add_to_journal(SeqId, del, State1)).
-write_acks([], State) ->
+ack([], State) ->
State;
-write_acks(SeqIds, State) ->
+ack(SeqIds, State) ->
{JournalHdl, State1} = get_journal_handle(State),
ok = file_handle_cache:append(
JournalHdl, [<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>> ||
@@ -318,43 +317,15 @@ write_acks(SeqIds, State) ->
add_to_journal(SeqId, ack, StateN)
end, State1, SeqIds)).
-sync_seq_ids([], State) ->
+sync([], State) ->
State;
-sync_seq_ids(_SeqIds, State = #qistate { journal_handle = undefined }) ->
+sync(_SeqIds, State = #qistate { journal_handle = undefined }) ->
State;
-sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
+sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
ok = file_handle_cache:sync(JournalHdl),
State.
-flush_journal(State = #qistate { dirty_count = 0 }) ->
- State;
-flush_journal(State = #qistate { segments = Segments }) ->
- Segments1 =
- segment_fold(
- fun (_Seg, #segment { journal_entries = JEntries,
- pubs = PubCount,
- acks = AckCount } = Segment, SegmentsN) ->
- case PubCount > 0 andalso PubCount == AckCount of
- true -> ok = delete_segment(Segment),
- SegmentsN;
- false -> segment_store(
- append_journal_to_segment(Segment, JEntries),
- SegmentsN)
- end
- end, segments_new(), Segments),
- {JournalHdl, State1} =
- get_journal_handle(State #qistate { segments = Segments1 }),
- ok = file_handle_cache:clear(JournalHdl),
- State1 #qistate { dirty_count = 0 }.
-
-append_journal_to_segment(Segment, JEntries) ->
- case array:sparse_size(JEntries) of
- 0 -> Segment;
- _ -> {Hdl, Segment1} = get_segment_handle(Segment),
- array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries),
- ok = file_handle_cache:sync(Hdl),
- Segment1 #segment { journal_entries = array_new() }
- end.
+flush(State) -> flush_journal(State).
read_segment_entries(InitSeqId, State = #qistate { segments = Segments,
dir = Dir }) ->
@@ -599,6 +570,36 @@ maybe_flush_journal(State = #qistate { dirty_count = DCount })
maybe_flush_journal(State) ->
State.
+flush_journal(State = #qistate { dirty_count = 0 }) ->
+ State;
+flush_journal(State = #qistate { segments = Segments }) ->
+ Segments1 =
+ segment_fold(
+ fun (_Seg, #segment { journal_entries = JEntries,
+ pubs = PubCount,
+ acks = AckCount } = Segment, SegmentsN) ->
+ case PubCount > 0 andalso PubCount == AckCount of
+ true -> ok = delete_segment(Segment),
+ SegmentsN;
+ false -> segment_store(
+ append_journal_to_segment(Segment, JEntries),
+ SegmentsN)
+ end
+ end, segments_new(), Segments),
+ {JournalHdl, State1} =
+ get_journal_handle(State #qistate { segments = Segments1 }),
+ ok = file_handle_cache:clear(JournalHdl),
+ State1 #qistate { dirty_count = 0 }.
+
+append_journal_to_segment(Segment, JEntries) ->
+ case array:sparse_size(JEntries) of
+ 0 -> Segment;
+ _ -> {Hdl, Segment1} = get_segment_handle(Segment),
+ array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries),
+ ok = file_handle_cache:sync(Hdl),
+ Segment1 #segment { journal_entries = array_new() }
+ end.
+
get_journal_handle(State = #qistate { journal_handle = undefined,
dir = Dir }) ->
Path = filename:join(Dir, ?JOURNAL_FILENAME),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b9f6dfd6a9..97d74fc93f 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1414,8 +1414,8 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
lists:foldl(
fun (SeqId, {QiN, SeqIdsGuidsAcc, MSCStateN}) ->
Guid = rabbit_guid:guid(),
- QiM = rabbit_queue_index:write_published(Guid, SeqId, Persistent,
- QiN),
+ QiM = rabbit_queue_index:publish(
+ Guid, SeqId, Persistent, QiN),
{ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid,
Guid, MSCStateN),
{QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM}
@@ -1425,13 +1425,11 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
{A, B}.
queue_index_deliver(SeqIds, Qi) ->
- lists:foldl(
- fun (SeqId, QiN) ->
- rabbit_queue_index:write_delivered(SeqId, QiN)
- end, Qi, SeqIds).
+ lists:foldl(fun (SeqId, QiN) -> rabbit_queue_index:deliver(SeqId, QiN) end,
+ Qi, SeqIds).
-queue_index_flush_journal(Qi) ->
- rabbit_queue_index:flush_journal(Qi).
+queue_index_flush(Qi) ->
+ rabbit_queue_index:flush(Qi).
verify_read_with_published(_Delivered, _Persistent, [], _) ->
ok;
@@ -1491,8 +1489,8 @@ test_queue_index() ->
{ReadC, Qi15} = rabbit_queue_index:read_segment_entries(0, Qi14),
ok = verify_read_with_published(true, true, ReadC,
lists:reverse(SeqIdsGuidsB)),
- Qi16 = rabbit_queue_index:write_acks(SeqIdsB, Qi15),
- Qi17 = queue_index_flush_journal(Qi16),
+ Qi16 = rabbit_queue_index:ack(SeqIdsB, Qi15),
+ Qi17 = queue_index_flush(Qi16),
%% Everything will have gone now because #pubs == #acks
{0, 0, Qi18} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17),
@@ -1512,8 +1510,8 @@ test_queue_index() ->
{0, _Terms4, Qi22} = test_queue_init(),
{Qi23, _SeqIdsGuidsC} = queue_index_publish(SeqIdsC, false, Qi22),
Qi24 = queue_index_deliver(SeqIdsC, Qi23),
- Qi25 = rabbit_queue_index:write_acks(SeqIdsC, Qi24),
- Qi26 = queue_index_flush_journal(Qi25),
+ Qi25 = rabbit_queue_index:ack(SeqIdsC, Qi24),
+ Qi26 = queue_index_flush(Qi25),
{Qi27, _SeqIdsGuidsC1} = queue_index_publish([SegmentSize], false, Qi26),
_Qi28 = rabbit_queue_index:terminate_and_erase(Qi27),
ok = stop_msg_store(),
@@ -1524,8 +1522,8 @@ test_queue_index() ->
{Qi30, _SeqIdsGuidsC2} = queue_index_publish(SeqIdsC, false, Qi29),
Qi31 = queue_index_deliver(SeqIdsC, Qi30),
{Qi32, _SeqIdsGuidsC3} = queue_index_publish([SegmentSize], false, Qi31),
- Qi33 = rabbit_queue_index:write_acks(SeqIdsC, Qi32),
- Qi34 = queue_index_flush_journal(Qi33),
+ Qi33 = rabbit_queue_index:ack(SeqIdsC, Qi32),
+ Qi34 = queue_index_flush(Qi33),
_Qi35 = rabbit_queue_index:terminate_and_erase(Qi34),
ok = stop_msg_store(),
ok = empty_test_queue(),
@@ -1535,8 +1533,8 @@ test_queue_index() ->
{0, _Terms6, Qi36} = test_queue_init(),
{Qi37, _SeqIdsGuidsD} = queue_index_publish(SeqIdsD, false, Qi36),
Qi38 = queue_index_deliver(SeqIdsD, Qi37),
- Qi39 = rabbit_queue_index:write_acks(SeqIdsD, Qi38),
- Qi40 = queue_index_flush_journal(Qi39),
+ Qi39 = rabbit_queue_index:ack(SeqIdsD, Qi38),
+ Qi40 = queue_index_flush(Qi39),
_Qi41 = rabbit_queue_index:terminate_and_erase(Qi40),
ok = stop_msg_store(),
ok = rabbit_variable_queue:start([]),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 39ef3ec421..079c14eb9a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -360,7 +360,7 @@ delete_and_terminate(State) ->
remove_pending_ack(false, State1),
%% flushing here is good because it deletes all full segments,
%% leaving only partial segments around.
- IndexState1 = rabbit_queue_index:flush_journal(IndexState),
+ IndexState1 = rabbit_queue_index:flush(IndexState),
IndexState2 =
case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(
IndexState1) of
@@ -456,7 +456,7 @@ fetch(AckRequired, State =
%% 1. Mark it delivered if necessary
IndexState1 = case IndexOnDisk andalso not IsDelivered of
- true -> rabbit_queue_index:write_delivered(
+ true -> rabbit_queue_index:deliver(
SeqId, IndexState);
false -> IndexState
end,
@@ -465,29 +465,25 @@ fetch(AckRequired, State =
MsgStore = find_msg_store(IsPersistent, PersistentStore),
IndexState2 =
case MsgOnDisk andalso not AckRequired of
- true -> %% Remove from disk now
- ok = case MsgOnDisk of
- true ->
- rabbit_msg_store:remove(MsgStore, [Guid]);
- false ->
- ok
- end,
- case IndexOnDisk of
- true ->
- rabbit_queue_index:write_acks([SeqId],
- IndexState1);
- false ->
- IndexState1
- end;
- false ->
- IndexState1
+ %% Remove from disk now
+ true -> ok = case MsgOnDisk of
+ true -> rabbit_msg_store:remove(
+ MsgStore, [Guid]);
+ false -> ok
+ end,
+ case IndexOnDisk of
+ true -> rabbit_queue_index:ack(
+ [SeqId], IndexState1);
+ false -> IndexState1
+ end;
+ false -> IndexState1
end,
%% 3. If it's on disk, not persistent and an ack's
%% required then remove it from the queue index only.
IndexState3 =
case IndexOnDisk andalso AckRequired andalso not IsPersistent of
- true -> rabbit_queue_index:write_acks([SeqId], IndexState2);
+ true -> rabbit_queue_index:ack([SeqId], IndexState2);
false -> IndexState2
end,
@@ -539,7 +535,7 @@ ack(AckTags, State = #vqstate { index_state = IndexState,
[SeqId | SeqIds], PAN1}
end
end, {dict:new(), [], PA}, AckTags),
- IndexState1 = rabbit_queue_index:write_acks(SeqIds, IndexState),
+ IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = dict:fold(fun (MsgStore, Guids, ok) ->
rabbit_msg_store:remove(MsgStore, Guids)
end, ok, GuidsByStore),
@@ -635,7 +631,7 @@ requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) ->
StateN3}
end
end, {[], dict:new(), State}, AckTags),
- IndexState1 = rabbit_queue_index:write_acks(SeqIds, IndexState),
+ IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = dict:fold(fun (MsgStore, Guids, ok) ->
rabbit_msg_store:release(MsgStore, Guids)
end, ok, GuidsByStore),
@@ -707,8 +703,7 @@ needs_sync(_) -> true.
sync(State) -> tx_commit_index(State).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
- State #vqstate { index_state =
- rabbit_queue_index:flush_journal(IndexState) }.
+ State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len, on_sync = {_, _, From},
@@ -759,21 +754,19 @@ remove_pending_ack(KeepPersistent,
{SeqIdsAcc, Dict, dict:erase(SeqId, PAN)}
end, {[], dict:new(), PA}, PA),
case KeepPersistent of
- true ->
- State1 = State #vqstate { pending_ack = PA1 },
- case dict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of
- error -> State1;
- {ok, Guids} -> ok = rabbit_msg_store:remove(
- ?TRANSIENT_MSG_STORE, Guids),
- State1
- end;
- false ->
- IndexState1 = rabbit_queue_index:write_acks(SeqIds, IndexState),
- ok = dict:fold(fun (MsgStore, Guids, ok) ->
- rabbit_msg_store:remove(MsgStore, Guids)
- end, ok, GuidsByStore),
- State #vqstate { pending_ack = dict:new(),
- index_state = IndexState1 }
+ true -> State1 = State #vqstate { pending_ack = PA1 },
+ case dict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of
+ error -> State1;
+ {ok, Guids} -> ok = rabbit_msg_store:remove(
+ ?TRANSIENT_MSG_STORE, Guids),
+ State1
+ end;
+ false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
+ ok = dict:fold(fun (MsgStore, Guids, ok) ->
+ rabbit_msg_store:remove(MsgStore, Guids)
+ end, ok, GuidsByStore),
+ State #vqstate { pending_ack = dict:new(),
+ index_state = IndexState1 }
end.
lookup_tx(Txn) ->
@@ -812,30 +805,27 @@ betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) ->
fun ({Guid, SeqId, IsPersistent, IsDelivered},
{FilteredAcc, IndexStateAcc}) ->
case SeqId < TransientThreshold andalso not IsPersistent of
- true ->
- IndexStateAcc1 =
- case IsDelivered of
- false -> rabbit_queue_index:write_delivered(
- SeqId, IndexStateAcc);
- true -> IndexStateAcc
- end,
- {FilteredAcc, rabbit_queue_index:write_acks(
- [SeqId], IndexStateAcc1)};
- false ->
- case SeqId < SeqIdLimit of
- true ->
- {[#msg_status { msg = undefined,
- guid = Guid,
- seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = true,
- index_on_disk = true
- } | FilteredAcc],
- IndexStateAcc};
- false ->
- {FilteredAcc, IndexStateAcc}
- end
+ true -> IndexStateAcc1 =
+ case IsDelivered of
+ false -> rabbit_queue_index:deliver(
+ SeqId, IndexStateAcc);
+ true -> IndexStateAcc
+ end,
+ {FilteredAcc, rabbit_queue_index:ack(
+ [SeqId], IndexStateAcc1)};
+ false -> case SeqId < SeqIdLimit of
+ true -> {[#msg_status {
+ msg = undefined,
+ guid = Guid,
+ seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true
+ } | FilteredAcc],
+ IndexStateAcc};
+ false -> {FilteredAcc, IndexStateAcc}
+ end
end
end, {[], IndexState}, List),
{bpqueue:from_list([{true, Filtered}]), IndexState1}.
@@ -966,8 +956,7 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns},
false -> SeqIdsAcc
end, StateN1}
end, {Acks, State1}, Pubs),
- IndexState1 =
- rabbit_queue_index:sync_seq_ids(SeqIds, IndexState),
+ IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
[ Fun() || Fun <- lists:reverse(SFuns) ],
State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }.
@@ -1024,7 +1013,7 @@ remove_queue_entries(PersistentStore, Fold, Q, IndexState) ->
IndexState2 =
case SeqIds of
[] -> IndexState1;
- _ -> rabbit_queue_index:write_acks(SeqIds, IndexState1)
+ _ -> rabbit_queue_index:ack(SeqIds, IndexState1)
end,
{Count, IndexState2}.
@@ -1047,8 +1036,7 @@ remove_queue_entries1(
false -> SeqIdsAcc
end,
IndexStateN1 = case IndexOnDisk andalso not IsDelivered of
- true -> rabbit_queue_index:write_delivered(
- SeqId, IndexStateN);
+ true -> rabbit_queue_index:deliver(SeqId, IndexStateN);
false -> IndexStateN
end,
{PersistentStore, CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}.
@@ -1298,11 +1286,11 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
is_delivered = IsDelivered }, IndexState)
when Force orelse IsPersistent ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
- IndexState1 = rabbit_queue_index:write_published(
- Guid, SeqId, IsPersistent, IndexState),
+ IndexState1 = rabbit_queue_index:publish(Guid, SeqId, IsPersistent,
+ IndexState),
{MsgStatus #msg_status { index_on_disk = true },
case IsDelivered of
- true -> rabbit_queue_index:write_delivered(SeqId, IndexState1);
+ true -> rabbit_queue_index:deliver(SeqId, IndexState1);
false -> IndexState1
end};
maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->