summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-26 15:29:56 +0000
committerMatthew Sackman <matthew@lshift.net>2009-10-26 15:29:56 +0000
commit896dee8c4acffe20e2e3c4dda3358678ca02e5ac (patch)
tree533701b62d894aba49078a94a9c5040a8ed0c7bb
parent595098e4b5c04b5031facfec20c9c92f337191ec (diff)
downloadrabbitmq-server-git-896dee8c4acffe20e2e3c4dda3358678ca02e5ac.tar.gz
Mainly a load of cosmetics, and some minor API changes, but also manage to hook in getting the queue to flush the journal out if the queue is idle and has no work to do. This takes advantage of the ability to incrementally flush out the ack journal.
-rw-r--r--src/rabbit_amqqueue_process.erl29
-rw-r--r--src/rabbit_queue_index.erl39
-rw-r--r--src/rabbit_tests.erl32
-rw-r--r--src/rabbit_variable_queue.erl10
4 files changed, 69 insertions, 41 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index cd94c6e450..180a9f8a07 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -153,28 +153,32 @@ noreply(NewState) ->
{NewState1, Timeout} = next_state(NewState),
{noreply, NewState1, Timeout}.
-next_state(State = #q { variable_queue_state = VQS }) ->
+next_state(State = #q{variable_queue_state = VQS}) ->
next_state1(State, rabbit_variable_queue:needs_sync(VQS)).
-next_state1(State = #q { sync_timer_ref = undefined }, true) ->
+next_state1(State = #q{sync_timer_ref = undefined}, true) ->
{start_sync_timer(State), 0};
next_state1(State, true) ->
{State, 0};
-next_state1(State = #q { sync_timer_ref = undefined }, false) ->
- {State, hibernate};
+next_state1(State = #q{sync_timer_ref = undefined,
+ variable_queue_state = VQS}, false) ->
+ {State, case rabbit_variable_queue:can_flush_journal(VQS) of
+ true -> 0;
+ false -> hibernate
+ end};
next_state1(State, false) ->
{stop_sync_timer(State), 0}.
-start_sync_timer(State = #q { sync_timer_ref = undefined }) ->
+start_sync_timer(State = #q{sync_timer_ref = undefined}) ->
{ok, TRef} = timer:apply_after(?SYNC_INTERVAL, rabbit_amqqueue,
tx_commit_vq_callback, [self()]),
- State #q { sync_timer_ref = TRef }.
+ State#q{sync_timer_ref = TRef}.
-stop_sync_timer(State = #q { sync_timer_ref = TRef }) ->
+stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
- State #q { sync_timer_ref = undefined }.
+ State#q{sync_timer_ref = undefined}.
-assert_invariant(#q { active_consumers = AC, variable_queue_state = VQS }) ->
+assert_invariant(#q{active_consumers = AC, variable_queue_state = VQS}) ->
true = (queue:is_empty(AC) orelse rabbit_variable_queue:is_empty(VQS)).
lookup_ch(ChPid) ->
@@ -868,6 +872,13 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
{stop, NewState} -> {stop, normal, NewState}
end;
+handle_info(timeout, State = #q{variable_queue_state = VQS,
+ sync_timer_ref = undefined}) ->
+ %% if sync_timer_ref is undefined then we must have set the
+ %% timeout to zero because we thought we could flush the journal
+ noreply(State#q{variable_queue_state =
+ rabbit_variable_queue:flush_journal(VQS)});
+
handle_info(timeout, State = #q{variable_queue_state = VQS}) ->
noreply(
run_message_queue(
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index e3057f9cb4..39116b0dd2 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -32,7 +32,7 @@
-module(rabbit_queue_index).
-export([init/1, terminate/1, terminate_and_erase/1, write_published/4,
- write_delivered/2, write_acks/2, flush_journal/1, sync_seq_ids/3,
+ write_delivered/2, write_acks/2, can_flush_journal/1, flush_journal/1, sync_seq_ids/3,
read_segment_entries/2, next_segment_boundary/1, segment_size/0,
find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]).
@@ -136,7 +136,8 @@
-> qistate()).
-spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()).
-spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()).
--spec(flush_journal/1 :: (qistate()) -> {boolean(), qistate()}).
+-spec(flush_journal/1 :: (qistate()) -> qistate()).
+-spec(can_flush_journal/1 :: (qistate()) -> boolean()).
-spec(read_segment_entries/2 :: (seq_id(), qistate()) ->
{( [{msg_id(), seq_id(), boolean(), boolean()}]
| 'not_found'), qistate()}).
@@ -211,12 +212,6 @@ write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict,
false -> State2
end.
-full_flush_journal(State) ->
- case flush_journal(State) of
- {true, State1} -> full_flush_journal(State1);
- {false, State1} -> State1
- end.
-
sync_seq_ids(SeqIds, SyncAckJournal, State) ->
State1 = case SyncAckJournal of
true -> {Hdl, State2} = get_journal_handle(State),
@@ -237,8 +232,13 @@ sync_seq_ids(SeqIds, SyncAckJournal, State) ->
StateM
end, State1, SegNumsSet).
+can_flush_journal(#qistate { journal_ack_count = 0 }) ->
+ false;
+can_flush_journal(_) ->
+ true.
+
flush_journal(State = #qistate { journal_ack_count = 0 }) ->
- {false, State};
+ State;
flush_journal(State = #qistate { journal_ack_dict = JAckDict,
journal_ack_count = JAckCount }) ->
[SegNum|_] = dict:fetch_keys(JAckDict),
@@ -253,11 +253,11 @@ flush_journal(State = #qistate { journal_ack_dict = JAckDict,
ok = file_handle_cache:position(Hdl, bof),
ok = file_handle_cache:truncate(Hdl),
ok = file_handle_cache:sync(Hdl),
- {false, State3};
+ State3;
JAckCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT ->
flush_journal(State2);
true ->
- {true, State2}
+ State2
end.
read_segment_entries(InitSeqId, State) ->
@@ -348,6 +348,13 @@ start_msg_store(DurableQueues) ->
%% Minor Helpers
%%----------------------------------------------------------------------------
+full_flush_journal(State) ->
+ case can_flush_journal(State) of
+ true -> State1 = flush_journal(State),
+ full_flush_journal(State1);
+ false -> State
+ end.
+
queue_name_to_dir_name(Name = #resource { kind = queue }) ->
Bin = term_to_binary(Name),
Size = 8*size(Bin),
@@ -421,6 +428,11 @@ add_ack_to_ack_dict(SeqId, ADict) ->
{SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], ADict).
+all_segment_nums(Dir) ->
+ [list_to_integer(
+ lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName))
+ || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)].
+
%%----------------------------------------------------------------------------
%% Msg Store Startup Delta Function
@@ -454,11 +466,6 @@ queue_index_walker({[{MsgId, _SeqId, IsPersistent, _IsDelivered} | Entries],
%% Startup Functions
%%----------------------------------------------------------------------------
-all_segment_nums(Dir) ->
- [list_to_integer(
- lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName))
- || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)].
-
find_ack_counts_and_deliver_transient_msgs(State = #qistate { dir = Dir }) ->
SegNums = all_segment_nums(Dir),
{TotalMsgCount, State1} =
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 8114247604..ebd8432a20 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1040,31 +1040,33 @@ test_queue_index() ->
ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]),
%% should get length back as 0, as all the msgs were transient
{0, Qi6} = rabbit_queue_index:init(test_queue()),
- {false, Qi7} = rabbit_queue_index:flush_journal(Qi6),
- {0, 10001, Qi8} =
- rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi7),
- {Qi9, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi8),
- {0, 20001, Qi10} =
- rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi9),
- {ReadB, Qi11} = rabbit_queue_index:read_segment_entries(0, Qi10),
+ false = rabbit_queue_index:can_flush_journal(Qi6),
+ {0, 10001, Qi7} =
+ rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6),
+ {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7),
+ {0, 20001, Qi9} =
+ rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi8),
+ {ReadB, Qi10} = rabbit_queue_index:read_segment_entries(0, Qi9),
ok = verify_read_with_published(false, true, ReadB,
lists:reverse(SeqIdsMsgIdsB)),
- _Qi12 = rabbit_queue_index:terminate(Qi11),
+ _Qi11 = rabbit_queue_index:terminate(Qi10),
ok = rabbit_msg_store:stop(),
ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]),
%% should get length back as 10000
LenB = length(SeqIdsB),
- {LenB, Qi13} = rabbit_queue_index:init(test_queue()),
- {0, 20001, Qi14} =
- rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi13),
- Qi15 = lists:foldl(
+ {LenB, Qi12} = rabbit_queue_index:init(test_queue()),
+ {0, 20001, Qi13} =
+ rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi12),
+ Qi14 = lists:foldl(
fun (SeqId, QiN) ->
rabbit_queue_index:write_delivered(SeqId, QiN)
- end, Qi14, SeqIdsB),
- {ReadC, Qi16} = rabbit_queue_index:read_segment_entries(0, Qi15),
+ end, Qi13, SeqIdsB),
+ {ReadC, Qi15} = rabbit_queue_index:read_segment_entries(0, Qi14),
ok = verify_read_with_published(true, true, ReadC,
lists:reverse(SeqIdsMsgIdsB)),
- Qi17 = rabbit_queue_index:write_acks(SeqIdsB, Qi16),
+ Qi16 = rabbit_queue_index:write_acks(SeqIdsB, Qi15),
+ true = rabbit_queue_index:can_flush_journal(Qi16),
+ Qi17 = rabbit_queue_index:flush_journal(Qi16),
{0, 20001, Qi18} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17),
_Qi19 = rabbit_queue_index:terminate(Qi18),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 33e09c113d..da56487e01 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -35,7 +35,8 @@
set_queue_ram_duration_target/2, remeasure_egress_rate/1, fetch/1,
ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1, delete/1,
requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4,
- tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1]).
+ tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1,
+ can_flush_journal/1, flush_journal/1]).
%%----------------------------------------------------------------------------
@@ -412,6 +413,13 @@ needs_sync(#vqstate { on_sync = {_, _, []} }) ->
needs_sync(_) ->
true.
+can_flush_journal(#vqstate { index_state = IndexState }) ->
+ rabbit_queue_index:can_flush_journal(IndexState).
+
+flush_journal(State = #vqstate { index_state = IndexState }) ->
+ State #vqstate { index_state =
+ rabbit_queue_index:flush_journal(IndexState) }.
+
%%----------------------------------------------------------------------------
persistent_msg_ids(Pubs) ->